#!/usr/bin/python # -*- coding: utf-8 -*- """ MVVM 小框架核心模块。 这个根目录版本与 demo/mvvm/framework.py 作用相同,适合复制到其他项目中 单独使用。它提供发布-订阅事件通道,以及 Model/View 的绑定入口。 """ from threading import Thread, Lock from queue import Queue from enum import Enum, unique class Subscriber(object): """订阅者:记录一个主题以及主题触发时要执行的回调函数。""" def __init__(self, topic, callback): # 订阅的事件主题。 self._topic = topic # 主题事件发生时执行的函数。 self._callback = callback @property def topic(self): """返回订阅主题。""" return self._topic @property def callback(self): """返回事件回调函数。""" return self._callback class Publisher(object): """发布事件时放入队列的消息对象。""" def __init__(self, topic, content): # 事件主题,用于匹配订阅者。 self._topic = topic # 事件内容,会作为参数传给订阅回调。 self._content = content @property def topic(self): """返回事件主题。""" return self._topic @property def content(self): """返回事件内容。""" return self._content # 保留主题:当前没有使用,预留给未来的占位或空事件。 KEEP_TOPIC = 'keep' class EventChannel(Thread): """事件通道线程:从队列取事件,并分发给对应 topic 的订阅者。""" def __init__(self): super(EventChannel, self).__init__() # 控制线程循环。 self._working = True # 保护订阅关系表的互斥锁。 self._mutex = Lock() # 线程安全事件队列。 self._queue = Queue() # topic 到订阅者列表的映射。 self._resigster_container = {} def __del__(self): # 简单停止标记;demo 级写法,不保证立即唤醒阻塞中的 get()。 self._working = False def run(self) -> None: """线程主循环:消费事件并执行订阅回调。""" while self._working: puber = self._queue.get() with self._mutex: print('pop event {} : {}'.format(puber.topic, puber.content)) group = self._resigster_container.get(puber.topic, []) for suber in group: suber.callback(puber.content) def publish(self, topic: str, content): """发布事件:把 topic 和 content 包成 Publisher 放入队列。""" puber = Publisher(topic, content) self._queue.put(puber) def subscribe(self, topic: str, callback): """订阅事件:登记 topic 与回调函数的关系。""" suber = Subscriber(topic, callback) with self._mutex: group = self._resigster_container.get(suber.topic, []) group.append(suber) self._resigster_container[suber.topic] = group class BaseModel(object): """Model 基类:保存主题,并提供可被 View 事件调用的代理方法。""" def __init__(self, topic: str): # Model 变化时发布的主题。 self._topic = topic @property def topic(self): """返回 Model 对应主题。""" return self._topic def signal_proxy(self, *args): """事件代理函数,具体 Model 可重写。""" print('{} event proxy run'.format(type(self))) class BaseView(object): """View 基类:保存主题,并提供可被 Model 事件调用的代理方法。""" def __init__(self, topic: str): # View 变化时发布的主题。 self._topic = topic @property def topic(self): """返回 View 对应主题。""" return self._topic def signal_proxy(self, *args): """事件代理函数,具体 View 可重写。""" print('{} event proxy run'.format(type(self))) @unique class BindingStyle(Enum): """Model 与 View 的绑定方向。""" DOUBLE_BINDING = 0 # 双向绑定。 MODEL_TO_VIEW = 1 # Model -> View。 VIEW_TO_MODEL = 2 # View -> Model。 class Framework(object): """框架门面对象:负责启动通道和建立绑定。""" def __init__(self): # 框架内部唯一事件通道。 self.channel = EventChannel() def work(self): """启动事件通道线程。""" self.channel.daemon = True self.channel.start() def binding(self, model: BaseModel, view: BaseView, style=BindingStyle.DOUBLE_BINDING): """按指定方向绑定 Model 与 View。""" if style == BindingStyle.DOUBLE_BINDING: self.channel.subscribe(model.topic, view.signal_proxy) self.channel.subscribe(view.topic, model.signal_proxy) elif style == BindingStyle.MODEL_TO_VIEW: # 模型改变引起视图改变。 self.channel.subscribe(model.topic, view.signal_proxy) elif style == BindingStyle.VIEW_TO_MODEL: # 视图改变引起模型改变。 self.channel.subscribe(view.topic, model.signal_proxy) else: raise RuntimeWarning('binding is wrong') # 框架单例对象,业务代码通过它发布、订阅和绑定。 fw_proxy = Framework()