什么是在Python中每隔x秒重复执行一次函数的最好方法?
我想每隔60秒一次在Python中反复执行一个函数(就像Objective C中的NSTimer一样)。 这段代码将作为守护进程运行,实际上就像使用cron每分钟调用python脚本一样,但不需要由用户设置。
在这个关于在Python中实现的cron的问题中 ,该解决scheme似乎只能在x秒内睡眠() 。 我不需要这样的高级function,所以也许这样的事情会起作用
while True: # Code executed here time.sleep(60)
这个代码有没有可预见的问题?
使用调度模块,它实现了一个通用的事件调度器。
import sched, time s = sched.scheduler(time.time, time.sleep) def do_something(sc): print "Doing stuff..." # do your stuff s.enter(60, 1, do_something, (sc,)) s.enter(60, 1, do_something, (s,)) s.run()
只要locking你的时间循环到系统时钟。 简单。
import time starttime=time.time() while True: print "tick" time.sleep(60.0 - ((time.time() - starttime) % 60.0))
你可能想考虑Twisted ,它是一个实现Reactor模式的pythonnetworking库。
from twisted.internet import task from twisted.internet import reactor timeout = 60.0 # Sixty seconds def doWork(): #do work here pass l = task.LoopingCall(doWork) l.start(timeout) # call every sixty seconds reactor.run()
虽然“True:sleep(60)”可能会起作用Twisted可能已经实现了许多您最终需要的function(如bobince指出的守护进程,日志logging或exception处理),并且可能会是一个更强大的解决scheme
我认为更简单的方法是:
import time def executeSomething(): #code here time.sleep(60) while True: executeSomething()
这样你的代码被执行,然后等待60秒,然后再次执行,等待,执行等等…不需要复杂的事情:D
如果你想要一个非阻塞的方式来定期执行你的函数,而不是阻塞无限循环,我会使用线程计时器。 这样你的代码就可以继续运行并执行其他任务,并且每n秒钟都会调用你的函数。 我使用这种技术在很长的CPU /磁盘/networking密集型任务上打印进度信息。
下面是我在类似问题中发布的代码,使用start()和stop()控件:
from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False
用法:
from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!
特征:
- 只有标准库,没有外部依赖
- 即使定时器已经开始/停止,
start()
和stop()
也可以安全地多次调用 - 函数被调用可以有位置和命名参数
- 您可以随时更改
interval
,下次运行后生效。 同样的args
,kwargs
甚至function
!
以下是MestreLion代码的更新,可以避免一段时间的stream失:
class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.next_call = time.time() self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self.next_call += self.interval self._timer = threading.Timer(self.next_call - time.time(), self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False
我后来遇到了类似的问题。 可能是http://cronus.readthedocs.org可能有帮助吗?;
对于v0.2,以下片段起作用
import cronus.beat as beat beat.set_rate(2) # 2 Hz while beat.true(): # do some time consuming work here beat.sleep() # total loop duration would be 0.5 sec
那和cron之间的主要区别是,一个例外会杀死守护进程。 你可能想要包装一个exception捕获器和logging器。
一个可能的答案:
import time t=time.time() while True: if time.time()-t>10: #run your task here t=time.time()
我用这个来引起每小时60个事件,大部分事件发生在整个分钟后相同的秒数上:
import math import time import random TICK = 60 # one minute tick size TICK_TIMING = 59 # execute on 59th second of the tick TICK_MINIMUM = 30 # minimum catch up tick size when lagging def set_timing(): now = time.time() elapsed = now - info['begin'] minutes = math.floor(elapsed/TICK) tick_elapsed = now - info['completion_time'] if (info['tick']+1) > minutes: wait = max(0,(TICK_TIMING-(time.time() % TICK))) print ('standard wait: %.2f' % wait) time.sleep(wait) elif tick_elapsed < TICK_MINIMUM: wait = TICK_MINIMUM-tick_elapsed print ('minimum wait: %.2f' % wait) time.sleep(wait) else: print ('skip set_timing(); no wait') drift = ((time.time() - info['begin']) - info['tick']*TICK - TICK_TIMING + info['begin']%TICK) print ('drift: %.6f' % drift) info['tick'] = 0 info['begin'] = time.time() info['completion_time'] = info['begin'] - TICK while 1: set_timing() print('hello world') #random real world event time.sleep(random.random()*TICK_MINIMUM) info['tick'] += 1 info['completion_time'] = time.time()
根据实际情况,您可能会得到长度的刻度:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
但在60分钟结束时,你会有60个滴答声; 他们中的大多数将会以您喜欢的分钟的正确偏移量出现。
在我的系统上,我得到典型的<1/20秒的漂移,直到需要更正。
这种方法的优点是时钟漂移的分辨率; 这可能会导致问题,如果你正在做的事情,例如每个滴答附加一个项目,你期望每小时追加60个项目。 不考虑漂移可能会导致二次显示,如移动平均值将数据视为太深,导致错误的输出。
例如显示当前本地时间
import datetime import glib import logger def get_local_time(): current_time = datetime.datetime.now().strftime("%H:%M") logger.info("get_local_time(): %s",current_time) return str(current_time) def display_local_time(): logger.info("Current time is: %s", get_local_time()) # call every minute glib.timeout_add(60*1000, display_local_time)
- 有没有更准确的方法来创build比setTimeout的Javascript计时器?
- System.Threading.Timer在C#中似乎不工作。 它运行速度非常快,每3秒
- 如果一个计时器在新周期时间到来之前无法完成所有工作,该怎么办?
- gettimeofday()保证微秒的分辨率?
- System.Timers.Timer / Threading.Timer与线程与WhileLoop + Thread.Sleep的周期性任务
- C ++在Linux上获取毫秒时间 – clock()似乎不能正常工作
- 弹簧行为模拟
- Timer和TimerTask与Java中的Thread + sleep
- Timertask或Handler