Python中的事件系统

你使用什么Python事件系统? 我已经知道pydispatcher ,但我想知道还有什么可以find,或者是常用的?

我对作为大型框架一部分的事件pipe理者不感兴趣,我宁愿使用一个我可以轻松扩展的简单的解决scheme。

总结回答中提到的各种事件系统:

事件系统的最基本的风格是“处理器方法包”,它是观察者模式的简单实现。 基本上,处理器方法(可调用)存储在一个数组中,并在事件“触发”时分别被调用。

  • zope.event显示了这是如何工作的骨架(请参阅Lennart的答案 )。 注意:这个例子甚至不支持处理器参数。
  • LongPoke的“可调用列表”实现表明,这样一个事件系统可以通过子类化list非常简单地实现。
  • spassig的EventHook (Michael Foord的事件模式)是一个简单的实现。
  • Josip的Valued Lessons Event类基本上是相同的,但是使用一个set而不是一个list来存储这个包,并且实现了__call__ ,这两个都是合理的补充。
  • PyNotify在概念上是相似的,并且还提供了variables和条件的附加概念(“变化的事件”)。
  • Axel基本上是一个处理程序与更多的function相关的线程,error handling,… … –

这些事件系统的缺点是只能在实际的事件对象(或处理程序列表)上注册处理程序。 所以在注册时间事件已经存在。

这就是为什么存在第二种types的事件系统: 发布 – 订阅模式 。 在这里,处理程序不在事件对象(或处理程序列表)上注册,而是在中央调度程序上注册。 通知者也只和调度员交谈。 听什么或发表什么是由“信号”决定的,它只不过是一个名字(string)。

  • 闪光灯有一些漂亮的function,如基于发件人的自动断开和过滤。
  • PyPubSub一见钟情似乎很简单。
  • PyDispatcher似乎强调多对多的出版物等方面的灵活性。
  • louie是一个重新编写的PyDispatcher“提供包括Twisted和PyQt特定支持的插件基础结构”。 2016年1月以后似乎已经失去维护。
  • django.dispatch是一个重写的PyDispatcher,“界面更有限,但性能更高”。
  • Qt的信号和插槽可以从PyQt或PySide获得 。 它们在同一个线程中使用时作为callback,或者在两个不同线程之间作为事件(使用事件循环)。 信号和插槽有限制,他们只能在派生自QObject的类的对象中工作。

注意: threading.Event不是上述意义上的“事件系统”。 这是一个线程同步系统,其中一个线程等待另一个线程“发送”Event对象。

我一直在这样做:

 class Event(list): """Event subscription. A list of callable objects. Calling an instance of this will cause a call to each item in the list in ascending order by index. Example Usage: >>> def f(x): ... print 'f(%s)' % x >>> def g(x): ... print 'g(%s)' % x >>> e = Event() >>> e() >>> e.append(f) >>> e(123) f(123) >>> e.remove(f) >>> e() >>> e += (f, g) >>> e(10) f(10) g(10) >>> del e[0] >>> e(2) g(2) """ def __call__(self, *args, **kwargs): for f in self: f(*args, **kwargs) def __repr__(self): return "Event(%s)" % list.__repr__(self) 

不过,就像我看到的其他所有东西一样,没有自动生成的pydoc,也没有签名,真的很糟糕。

我们使用Michael Foord在他的Event Pattern中提出的一个EventHook:

只需添加EventHooks到您的类与:

 class MyBroadcaster() def __init__(): self.onChange = EventHook() theBroadcaster = MyBroadcaster() # add a listener to the event theBroadcaster.onChange += myFunction # remove listener from the event theBroadcaster.onChange -= myFunction # fire event theBroadcaster.onChange.fire() 

我们增加了将所有的监听器从一个对象中移除到Michaels类中的function,并以此结束:

 class EventHook(object): def __init__(self): self.__handlers = [] def __iadd__(self, handler): self.__handlers.append(handler) return self def __isub__(self, handler): self.__handlers.remove(handler) return self def fire(self, *args, **keywargs): for handler in self.__handlers: handler(*args, **keywargs) def clearObjectHandlers(self, inObject): for theHandler in self.__handlers: if theHandler.im_self == inObject: self -= theHandler 

我使用zope.event 。 这是你能想象到的最残酷的骨头。 :-)其实这里是完整的源代码:

 subscribers = [] def notify(event): for subscriber in subscribers: subscriber(event) 

请注意,例如,您不能在进程之间发送消息。 这不是一个消息系统,只是一个事件系统,没有什么比这更重要。

我在“重要课程”中find了这个小脚本。 这似乎只是正确的简单/功率比我后。 彼得·撒切尔是以下代码的作者(没有提到许可证)。

 class Event: def __init__(self): self.handlers = set() def handle(self, handler): self.handlers.add(handler) return self def unhandle(self, handler): try: self.handlers.remove(handler) except: raise ValueError("Handler is not handling this event, so cannot unhandle it.") return self def fire(self, *args, **kargs): for handler in self.handlers: handler(*args, **kargs) def getHandlerCount(self): return len(self.handlers) __iadd__ = handle __isub__ = unhandle __call__ = fire __len__ = getHandlerCount class MockFileWatcher: def __init__(self): self.fileChanged = Event() def watchFiles(self): source_path = "foo" self.fileChanged(source_path) def log_file_change(source_path): print "%r changed." % (source_path,) def log_file_change2(source_path): print "%r changed!" % (source_path,) watcher = MockFileWatcher() watcher.fileChanged += log_file_change2 watcher.fileChanged += log_file_change watcher.fileChanged -= log_file_change2 watcher.watchFiles() 

你可以看看pymitter ( pypi )。 它是一个小型单文件(〜250 loc)的方法“提供命名空间,通配符和TTL”。

这是一个基本的例子:

 from pymitter import EventEmitter ee = EventEmitter() # decorator usage @ee.on("myevent") def handler1(arg): print "handler1 called with", arg # callback usage def handler2(arg): print "handler2 called with", arg ee.on("myotherevent", handler2) # emit ee.emit("myevent", "foo") # -> "handler1 called with foo" ee.emit("myotherevent", "bar") # -> "handler2 called with bar" 

我做了一个Longpoke的简约方法的变体,也确保了被调用者和调用者的签名:

 class EventHook(object): ''' A simple implementation of the Observer-Pattern. The user can specify an event signature upon inizializazion, defined by kwargs in the form of argumentname=class (eg id=int). The arguments' types are not checked in this implementation though. Callables with a fitting signature can be added with += or removed with -=. All listeners can be notified by calling the EventHook class with fitting arguments. >>> event = EventHook(id=int, data=dict) >>> event += lambda id, data: print("%d %s" % (id, data)) >>> event(id=5, data={"foo": "bar"}) 5 {'foo': 'bar'} >>> event = EventHook(id=int) >>> event += lambda wrong_name: None Traceback (most recent call last): ... ValueError: Listener must have these arguments: (id=int) >>> event = EventHook(id=int) >>> event += lambda id: None >>> event(wrong_name=0) Traceback (most recent call last): ... ValueError: This EventHook must be called with these arguments: (id=int) ''' def __init__(self, **signature): self._signature = signature self._argnames = set(signature.keys()) self._handlers = [] def _kwargs_str(self): return ", ".join(k+"="+v.__name__ for k, v in self._signature.items()) def __iadd__(self, handler): params = inspect.signature(handler).parameters valid = True argnames = set(n for n in params.keys()) if argnames != self._argnames: valid = False for p in params.values(): if p.kind == p.VAR_KEYWORD: valid = True break if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY): valid = False break if not valid: raise ValueError("Listener must have these arguments: (%s)" % self._kwargs_str()) self._handlers.append(handler) return self def __isub__(self, handler): self._handlers.remove(handler) return self def __call__(self, *args, **kwargs): if args or set(kwargs.keys()) != self._argnames: raise ValueError("This EventHook must be called with these " + "keyword arguments: (%s)" % self._kwargs_str()) for handler in self._handlers[:]: handler(**kwargs) def __repr__(self): return "EventHook(%s)" % self._kwargs_str() 

我创build了一个EventManager类(最后的代码)。 语法如下:

 #Create an event with no listeners assigned to it EventManager.addEvent( eventName = [] ) #Create an event with listeners assigned to it EventManager.addEvent( eventName = [fun1, fun2,...] ) #Create any number event with listeners assigned to them EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... ) #Add or remove listener to an existing event EventManager.eventName += extra_fun EventManager.eventName -= removed_fun #Delete an event del EventManager.eventName #Fire the event EventManager.eventName() 

这是一个例子:

 def hello(name): print "Hello {}".format(name) def greetings(name): print "Greetings {}".format(name) EventManager.addEvent( salute = [greetings] ) EventManager.salute += hello print "\nInitial salute" EventManager.salute('Oscar') print "\nNow remove greetings" EventManager.salute -= greetings EventManager.salute('Oscar') 

输出:

首先致敬
问候奥斯卡
你好,奥斯卡

现在删除问候
你好,奥斯卡

EventManger代码:

 class EventManager: class Event: def __init__(self,functions): if type(functions) is not list: raise ValueError("functions parameter has to be a list") self.functions = functions def __iadd__(self,func): self.functions.append(func) return self def __isub__(self,func): self.functions.remove(func) return self def __call__(self,*args,**kvargs): for func in self.functions : func(*args,**kvargs) @classmethod def addEvent(cls,**kvargs): """ addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... ) creates events using **kvargs to create any number of events. Each event recieves a list of functions, where every function in the list recieves the same parameters. Example: def hello(): print "Hello ", def world(): print "World" EventManager.addEvent( salute = [hello] ) EventManager.salute += world EventManager.salute() Output: Hello World """ for key in kvargs.keys(): if type(kvargs[key]) is not list: raise ValueError("value has to be a list") else: kvargs[key] = cls.Event(kvargs[key]) cls.__dict__.update(kvargs) 

这是一个最小的devise,应该可以正常工作。 你所要做的只是在类中inheritanceObserver ,然后使用observe(event_name, callback_fn)来监听特定的事件。 无论何时在代码中的任何地方(即Event('USB connected') )触发该特定事件,相应的callback将触发。

 class Observer(): _observers = [] def __init__(self): self._observers.append(self) self._observed_events = [] def observe(self, event_name, callback_fn): self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn}) class Event(): def __init__(self, event_name, *callback_args): for observer in Observer._observers: for observable in observer._observed_events: if observable['event_name'] == event_name: observable['callback_fn'](*callback_args) 

例:

 class Room(Observer): def __init__(self): print("Room is ready.") Observer.__init__(self) # DON'T FORGET THIS def someone_arrived(self, who): print(who + " has arrived!") # Observe for specific event room = Room() room.observe('someone arrived', room.someone_arrived) # Fire some events Event('someone left', 'John') Event('someone arrived', 'Lenard') # will output "Lenard has arrived!" Event('someone Farted', 'Lenard') 

如果我在pyQt中执行代码,我使用QT套接字/信号范例,对于Django也一样

如果我正在做asynchronousI / OI使用本机select模块

如果我使用一个SAX Pythonparsing器,我使用SAX提供的事件API。 所以看起来我是底层API的受害者:-)

也许你应该问自己,你对事件框架/模块有什么期望? 我的个人偏好是使用QT的Socket / Signal范式。 更多的信息可以在这里find

这是另一个考虑的模块 。 对于更苛刻的应用来说,这似乎是一个可行的select

Py-notify是一个Python包,提供了实现Observer编程模式的工具。 这些工具包括信号,条件和variables。

信号是发送信号时调用的处理程序列表。 条件基本上是布尔variables,加上条件状态改变时发出的信号。 它们可以使用标准的逻辑运算符(不是等)组合成复合条件。 与条件不同,variables可以容纳任何Python对象,不仅仅是布尔值,而且不能组合。