什么是在Python中每隔x秒重复执行一次函数的最好方法?

我想每隔60秒一次在Python中反复执行一个函数(就像Objective C中的NSTimer一样)。 这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要由用户设置。

在这个关于在Python中实现的cron的问题中 ,该解决scheme似乎只能在x秒内睡眠() 。 我不需要这样的高级function,所以也许这样的事情会起作用

while True: # Code executed here time.sleep(60) 

这个代码有没有可预见的问题?

使用调度模块,它实现了一个通用的事件调度器。

 import sched, time s = sched.scheduler(time.time, time.sleep) def do_something(sc): print "Doing stuff..." # do your stuff s.enter(60, 1, do_something, (sc,)) s.enter(60, 1, do_something, (s,)) s.run() 

只要locking你的时间循环到系统时钟。 简单。

 import time starttime=time.time() while True: print "tick" time.sleep(60.0 - ((time.time() - starttime) % 60.0)) 

你可能想考虑Twisted ,它是一个实现Reactor模式的pythonnetworking库。

 from twisted.internet import task from twisted.internet import reactor timeout = 60.0 # Sixty seconds def doWork(): #do work here pass l = task.LoopingCall(doWork) l.start(timeout) # call every sixty seconds reactor.run() 

虽然“True:sleep(60)”可能会起作用Twisted可能已经实现了许多您最终需要的function(如bobince指出的守护进程,日志logging或exception处理),并且可能会是一个更强大的解决scheme

我认为更简单的方法是:

 import time def executeSomething(): #code here time.sleep(60) while True: executeSomething() 

这样你的代码被执行,然后等待60秒,然后再次执行,等待,执行等等…不需要复杂的事情:D

如果你想要一个非阻塞的方式来定期执行你的函数,而不是阻塞无限循环,我会使用线程计时器。 这样你的代码就可以继续运行并执行其他任务,并且每n秒钟都会调用你的函数。 我使用这种技术在很长的CPU /磁盘/networking密集型任务上打印进度信息。

下面是我在类似问题中发布的代码,使用start()和stop()控件:

 from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False 

用法:

 from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends! 

特征:

  • 只有标准库,没有外部依赖
  • 即使定时器已经开始/停止, start()stop()也可以安全地多次调用
  • 函数被调用可以有位置和命名参数
  • 您可以随时更改interval ,下次运行后生效。 同样的argskwargs甚至function

以下是MestreLion代码的更新,可以避免一段时间的stream失:

 class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.next_call = time.time() self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self.next_call += self.interval self._timer = threading.Timer(self.next_call - time.time(), self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False 

我后来遇到了类似的问题。 可能是http://cronus.readthedocs.org可能有帮助吗?;

对于v0.2,以下片段起作用

 import cronus.beat as beat beat.set_rate(2) # 2 Hz while beat.true(): # do some time consuming work here beat.sleep() # total loop duration would be 0.5 sec 

那和cron之间的主要区别是,一个例外会杀死守护进程。 你可能想要包装一个exception捕获器和logging器。

一个可能的答案:

 import time t=time.time() while True: if time.time()-t>10: #run your task here t=time.time() 

我用这个来引起每小时60个事件,大部分事件发生在整个分钟后相同的秒数上:

 import math import time import random TICK = 60 # one minute tick size TICK_TIMING = 59 # execute on 59th second of the tick TICK_MINIMUM = 30 # minimum catch up tick size when lagging def set_timing(): now = time.time() elapsed = now - info['begin'] minutes = math.floor(elapsed/TICK) tick_elapsed = now - info['completion_time'] if (info['tick']+1) > minutes: wait = max(0,(TICK_TIMING-(time.time() % TICK))) print ('standard wait: %.2f' % wait) time.sleep(wait) elif tick_elapsed < TICK_MINIMUM: wait = TICK_MINIMUM-tick_elapsed print ('minimum wait: %.2f' % wait) time.sleep(wait) else: print ('skip set_timing(); no wait') drift = ((time.time() - info['begin']) - info['tick']*TICK - TICK_TIMING + info['begin']%TICK) print ('drift: %.6f' % drift) info['tick'] = 0 info['begin'] = time.time() info['completion_time'] = info['begin'] - TICK while 1: set_timing() print('hello world') #random real world event time.sleep(random.random()*TICK_MINIMUM) info['tick'] += 1 info['completion_time'] = time.time() 

根据实际情况,您可能会得到长度的刻度:

 60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc. 

但在60分钟结束时,你会有60个滴答声; 他们中的大多数将会以您喜欢的分钟的正确偏移量出现。

在我的系统上,我得到典型的<1/20秒的漂移,直到需要更正。

这种方法的优点是时钟漂移的分辨率; 这可能会导致问题,如果你正在做的事情,例如每个滴答附加一个项目,你期望每小时追加60个项目。 不考虑漂移可能会导致二次显示,如移动平均值将数据视为太深,导致错误的输出。

例如显示当前本地时间

 import datetime import glib import logger def get_local_time(): current_time = datetime.datetime.now().strftime("%H:%M") logger.info("get_local_time(): %s",current_time) return str(current_time) def display_local_time(): logger.info("Current time is: %s", get_local_time()) # call every minute glib.timeout_add(60*1000, display_local_time)