Files
mvvm--learn/demo/mvvm/framework.py
2026-05-18 11:33:59 +08:00

211 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
MVVM 小框架核心模块。
这个文件实现了一个极简的“发布-订阅”事件通道,并在它之上封装
Model 与 View 的绑定关系。demo 中的界面刷新不是由 ViewModel 直接
调用 View 完成,而是由 Model 发布事件,再由订阅者 View 响应事件。
"""
from threading import Thread, Lock
from queue import Queue
from enum import Enum, unique
class Subscriber(object):
"""
订阅者。
一个订阅者由两部分组成:
1. topic它关心的事件主题。
2. callback主题事件发生时要执行的回调函数。
"""
def __init__(self, topic, callback):
# 保存订阅主题,例如 "log_to_view"。
self._topic = topic
# 保存事件到来时要调用的函数,例如 view.signal_proxy。
self._callback = callback
@property
def topic(self):
"""返回订阅者关注的主题。"""
return self._topic
@property
def callback(self):
"""返回事件触发时要执行的回调函数。"""
return self._callback
class Publisher(object):
"""
发布者/事件对象。
这里的 Publisher 不是一个长期存在的对象,而是一次事件发布时
放进队列里的消息包topic 表示发给谁content 表示传什么数据。
"""
def __init__(self, topic, content):
# 事件主题,决定这条消息会被哪些订阅者消费。
self._topic = topic
# 事件内容,也就是回调函数收到的参数。
self._content = content
@property
def topic(self):
"""返回事件主题。"""
return self._topic
@property
def content(self):
"""返回事件内容。"""
return self._content
# 保留主题:当前 demo 没有使用,可作为以后“不触发实际业务”的占位主题。
KEEP_TOPIC = 'keep'
class EventChannel(Thread):
"""
事件通道。
它继承 Thread内部维护一个 Queue。发布事件时先把事件放入队列
线程运行后不断从队列取事件,再根据 topic 找到对应订阅者并执行回调。
"""
def __init__(self):
super(EventChannel, self).__init__()
# 控制事件循环是否继续运行。
self._working = True
# 保护订阅关系表,避免多个线程同时读写造成状态错乱。
self._mutex = Lock()
# 存放待处理事件的线程安全队列。
self._queue = Queue()
# 订阅关系表:{"topic": [Subscriber, Subscriber, ...]}。
self._resigster_container = {}
def __del__(self):
# 对象销毁时尝试停止循环。注意:如果线程正阻塞在 queue.get()
# 仅设置 False 不一定能立即退出,这里只作为简单 demo 处理。
self._working = False
def run(self) -> None:
"""线程入口:持续消费事件队列,并通知对应订阅者。"""
while self._working:
# Queue.get() 会阻塞等待新事件,因此线程不会空转占用 CPU。
puber = self._queue.get()
with self._mutex:
print('pop event {} : {}'.format(puber.topic, puber.content))
# 找到该 topic 下的所有订阅者;无人订阅时返回空列表。
group = self._resigster_container.get(puber.topic, [])
for suber in group:
# 将事件内容传给订阅者的回调函数。
suber.callback(puber.content)
def publish(self, topic: str, content):
"""发布事件:把主题和内容包装成 Publisher 后放入队列。"""
puber = Publisher(topic, content)
self._queue.put(puber)
def subscribe(self, topic: str, callback):
"""订阅事件:把某个 topic 与对应回调函数登记到订阅关系表中。"""
suber = Subscriber(topic, callback)
with self._mutex:
group = self._resigster_container.get(suber.topic, [])
group.append(suber)
self._resigster_container[suber.topic] = group
class BaseModel(object):
"""
Model 基类。
子类修改自身数据时,一般会通过 fw_proxy.channel.publish 发布事件;
当 View 的事件反向更新 Model 时,则由 signal_proxy 接收新值。
"""
def __init__(self, topic: str):
# 每个 Model 都有自己的主题,用来向外通知“我变了”。
self._topic = topic
@property
def topic(self):
"""返回 Model 对应的发布主题。"""
return self._topic
def signal_proxy(self, *args):
"""事件代理函数。子类通常会重写它来更新自身状态。"""
print('{} event proxy run'.format(type(self)))
class BaseView(object):
"""
View 基类。
View 负责界面显示。它收到 Model 发布的事件后,在 signal_proxy 中
把数据刷新到具体控件上。
"""
def __init__(self, topic: str):
# 每个 View 也有自己的主题,做双向绑定时可用来通知 Model。
self._topic = topic
@property
def topic(self):
"""返回 View 对应的发布主题。"""
return self._topic
def signal_proxy(self, *args):
"""事件代理函数。子类通常会重写它来刷新界面控件。"""
print('{} event proxy run'.format(type(self)))
@unique
class BindingStyle(Enum):
"""绑定方式枚举。"""
DOUBLE_BINDING = 0 # 双向绑定Model -> View同时 View -> Model。
MODEL_TO_VIEW = 1 # 单向绑定Model 改变后刷新 View。
VIEW_TO_MODEL = 2 # 单向绑定View 改变后更新 Model。
class Framework(object):
"""框架入口对象,负责启动事件通道并建立绑定关系。"""
def __init__(self):
# 整个框架共用一个事件通道。
self.channel = EventChannel()
def work(self):
"""启动事件通道线程。"""
# 设为守护线程:主程序退出时,该事件线程也会随之结束。
self.channel.daemon = True
self.channel.start()
def binding(self, model: BaseModel, view: BaseView, style=BindingStyle.DOUBLE_BINDING):
"""
建立 Model 与 View 的绑定。
本质是把某一方的 topic 订阅到另一方的 signal_proxy。
"""
if style == BindingStyle.DOUBLE_BINDING:
# Model 发布变化时View 响应并刷新界面。
self.channel.subscribe(model.topic, view.signal_proxy)
# View 发布变化时Model 响应并更新数据。
self.channel.subscribe(view.topic, model.signal_proxy)
elif style == BindingStyle.MODEL_TO_VIEW:
# 模型改变引起视图改变。
self.channel.subscribe(model.topic, view.signal_proxy)
elif style == BindingStyle.VIEW_TO_MODEL:
# 视图改变引起模型改变。
self.channel.subscribe(view.topic, model.signal_proxy)
else:
raise RuntimeWarning('binding is wrong')
# 框架单例对象:其他模块通过 fw_proxy 使用同一个事件通道。
fw_proxy = Framework()