zeromq:如何防止无限等待?

我刚刚开始使用ZMQ。 我正在devise一个应用程序的工作stream程是:

  1. 许多客户之一(具有随机PULL地址)在5555向服务器推送请求
  2. 服务器永远在等待客户端PUSHes。 当有人来的时候,这个特定的请求会产生一个工作进程。 是的,工作进程可以同时存在。
  3. 当这个过程完成它的任务时,它将结果压入客户端。

我认为PUSH / PULL体系结构适合于此。 请纠正我这一点。


但是,我如何处理这些情况呢?

  1. 当服务器无法响应时,client_receiver.recv()将等待无限的时间。
  2. 客户端可能会发送请求,但之后会立即失败,因此工作进程将永远停留在server_sender.send()。

那么如何在PUSH / PULL模型中设置类似于超时的东西呢?


编辑 :感谢user938949的build议,我得到了一个工作的答案 ,我分享了后代。

如果您使用的是zeromq> = 3.0,则可以设置RCVTIMEO套接字选项:

client_receiver.RCVTIMEO = 1000 # in milliseconds 

但总的来说,你可以使用轮询器:

 poller = zmq.Poller() poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send 

poller.poll()需要超时:

 evts = poller.poll(1000) # wait *up to* one second for a message to arrive. 

如果没有什么可以接收的话,那么evts将是一个空的列表。

您可以使用zmq.POLLOUT进行轮询,以检查发送是否成功。

或者,为了处理可能失败的同伴的情况,a:

 worker.send(msg, zmq.NOBLOCK) 

可能就足够了,这将总是立即返回 – 如果发送无法完成则引发ZMQError(zmq.EAGAIN)。

这是我引用user938949的答案和http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/之后所做的一个;快速入门。 如果你做得更好,请发表你的答案, 我会推荐你​​的答案

对于那些希望持久的可靠性解决scheme ,请参阅http://zguide.zeromq.org/page:all#toc64

zeromq(testing版ATM)版本3.0支持ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中的超时http://api.zeromq.org/3-0:zmq-setsockopt

服务器

zmq.NOBLOCK确保当客户端不存在时,send()不会被阻塞。

 import time import zmq context = zmq.Context() ventilator_send = context.socket(zmq.PUSH) ventilator_send.bind("tcp://127.0.0.1:5557") i=0 while True: i=i+1 time.sleep(0.5) print ">>sending message ",i try: ventilator_send.send(repr(i),zmq.NOBLOCK) print " succeed" except: print " failed" 

客户

poller对象可以监听多个接收套接字(请参阅上面链接的“Python多处理与ZeroMQ”,我只在work_receiver上链接了它,在无限循环中,客户端以1000ms的间隔轮询,如果没有, socks对象返回空消息已经在那个时候收到。

 import time import zmq context = zmq.Context() work_receiver = context.socket(zmq.PULL) work_receiver.connect("tcp://127.0.0.1:5557") poller = zmq.Poller() poller.register(work_receiver, zmq.POLLIN) # Loop and accept messages from both channels, acting accordingly while True: socks = dict(poller.poll(1000)) if socks: if socks.get(work_receiver) == zmq.POLLIN: print "got message ",work_receiver.recv(zmq.NOBLOCK) else: print "error: message timeout" 

如果使用ZMQ_NOBLOCK,发送将不会阻塞,但是如果尝试closures套接字和上下文,则此步骤将阻止程序退出。

原因是套接字等待任何对等体,以确保传出消息被排队。要立即closures套接字并从缓冲区中清除传出消息,请使用ZMQ_LINGER并将其设置为0 ..