每n秒运行一段代码
有没有办法打印Hello World!
每n秒? 例如,程序会通过我有的任何代码,然后一旦它已经5秒( time.sleep()
)它将执行该代码。 我会用这个来更新一个文件,而不是打印Hello World。
例如:
startrepeat("print('Hello World')", .01) # Repeats print('Hello World') ever .01 seconds for i in range(5): print(i) >> Hello World! >> 0 >> 1 >> 2 >> Hello World! >> 3 >> Hello World! >> 4
import threading def printit(): threading.Timer(5.0, printit).start() print "Hello, World!" printit() # continue with the rest of your code
我谦卑的承担了这个问题,一个普遍的亚历克斯Martelli的答案,开始()和停止()控制:
from threading import Timer class RepeatedTimer(object): def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.function(*self.args, **self.kwargs) def start(self): if not self.is_running: self._timer = Timer(self.interval, self._run) self._timer.start() self.is_running = True def stop(self): self._timer.cancel() self.is_running = False
用法:
from time import sleep def hello(name): print "Hello %s!" % name print "starting..." rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() try: sleep(5) # your long-running job goes here... finally: rt.stop() # better in a try/finally block to make sure the program ends!
特征:
- 只有标准库,没有外部依赖
- 即使定时器已经开始/停止,
start()
和stop()
也可以安全地多次调用 - 函数被调用可以有位置和命名参数
- 您可以随时更改
interval
,下次运行后生效。 同样的args
,kwargs
甚至function
!
保存一个精神分裂的情节,并使用高级Python调度: http : //pythonhosted.org/APScheduler
代码非常简单:
from apscheduler.scheduler import Scheduler sched = Scheduler() sched.start() def some_job(): print "Every 10 seconds" sched.add_interval_job(some_job, seconds = 10) .... sched.shutdown()
def update(): import time while True: print 'Hello World!' time.sleep(5)
这将作为一个function运行。 while True:
使它永远运行。 如果你需要,你可以随时把它拿出来。
这是一个简单的例子与APScheduler 3.00+兼容:
# note that there are many other schedulers available from apscheduler.schedulers.background import BackgroundScheduler sched = BackgroundScheduler() def some_job(): print('Every 10 seconds') # seconds can be replaced with minutes, hours, or days sched.add_job(some_job, 'interval', seconds=10) sched.start() ... sched.shutdown()
或者,您可以使用以下内容。 与其他许多select不同的是,这个定时器将精确地每n秒执行一次所需的代码(不pipe代码执行的时间如何)。 所以这是一个很好的select,如果你无法承受任何漂移。
import time from threading import Event, Thread class RepeatedTimer: """Repeat `function` every `interval` seconds.""" def __init__(self, interval, function, *args, **kwargs): self.interval = interval self.function = function self.args = args self.kwargs = kwargs self.start = time.time() self.event = Event() self.thread = Thread(target=self._target) self.thread.start() def _target(self): while not self.event.wait(self._time): self.function(*self.args, **self.kwargs) @property def _time(self): return self.interval - ((time.time() - self.start) % self.interval) def stop(self): self.event.set() self.thread.join() # start timer timer = RepeatedTimer(10, print, 'Hello world') # stop timer timer.stop()
这是一个不会每隔n
秒创build一个新线程的版本:
from threading import Event, Thread def call_repeatedly(interval, func, *args): stopped = Event() def loop(): while not stopped.wait(interval): # the first call is in `interval` secs func(*args) Thread(target=loop).start() return stopped.set
该事件用于停止重复:
cancel_future_calls = call_repeatedly(5, print, "Hello, World") # do something else here... cancel_future_calls() # stop future calls
请参阅改进setInterval python的当前实现
你可以开始一个单独的线程,其唯一的职责是计数5秒,更新文件,重复。 你不希望这个单独的线程干扰你的主线程。