Python threading.timer – 每“n”秒重复一次函数
我在python计时器方面遇到困难,非常感谢一些build议或帮助:D
我不太了解线程是如何工作的,但我只想每0.5秒发一个函数,并能够启动和停止并重置计时器。
但是,我不断得到RuntimeError: threads can only be started once
执行RuntimeError: threads can only be started once
当我执行threading.timer.start()
两次。 有没有解决这个问题? 我尝试在每次启动之前应用threading.timer.cancel()
。
伪代码:
t=threading.timer(0.5,function) while True: t.cancel() t.start()
最好的方法是启动一次定时器线程。 在你的计时器线程中,你可以编写以下代码
class MyThread(Thread): def __init__(self, event): Thread.__init__(self) self.stopped = event def run(self): while not self.stopped.wait(0.5): print("my thread") # call a function
在启动定时器的代码中,可以set
停止的事件来停止定时器。
stopFlag = Event() thread = MyThread(stopFlag) thread.start() # this will stop the timer stopFlag.set()
使用计时器线程 –
from threading import Timer,Thread,Event class perpetualTimer(): def __init__(self,t,hFunction): self.t=t self.hFunction = hFunction self.thread = Timer(self.t,self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t,self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): print 'ipsem lorem' t = perpetualTimer(5,printer) t.start()
这可以通过t.cancel()
来停止
从python中的setInterval的等价 :
import threading def setInterval(interval): def decorator(function): def wrapper(*args, **kwargs): stopped = threading.Event() def loop(): # executed in another thread while not stopped.wait(interval): # until stopped function(*args, **kwargs) t = threading.Thread(target=loop) t.daemon = True # stop if the program exits t.start() return stopped return wrapper return decorator
用法:
@setInterval(.5) def function(): "..." stop = function() # start timer, the first call is in .5 seconds stop.set() # stop the loop stop = function() # start new timer # ... stop.set()
或者,这是相同的function,但作为一个独立的function,而不是装饰 :
cancel_future_calls = call_repeatedly(60, print, "Hello, World") # ... cancel_future_calls()
这是如何做到这一点,而不使用线程 。
为了提供一个正确的答案使用计时器作为OP请求,我会改进swapnil jariwala的答案 :
from threading import Timer import time class InfiniteTimer(): """A Timer class that does not stop, unless you want it to.""" def __init__(self, seconds, target): self._should_continue = False self.is_running = False self.seconds = seconds self.target = target self.thread = None def _handle_target(self): self.is_running = True self.target() self.is_running = False self._start_timer() def _start_timer(self): if self._should_continue: # Code could have been running when cancel was called. self.thread = Timer(self.seconds, self._handle_target) self.thread.start() def start(self): if not self._should_continue and not self.is_running: self._should_continue = True self._start_timer() else: print("Timer already started or running, please wait if you're restarting.") def cancel(self): if self.thread is not None: self._should_continue = False # Just in case thread is running and cancel fails. self.thread.cancel() else: print("Timer never started or failed to initialize.") def tick(): print('ipsem lorem') # Example Usage t = InfiniteTimer(0.5, tick) t.start()
导入时间是可选的,没有示例用法。
我已经实现了一个作为计时器的类。
我留下的链接在这里,以防万一需要它: https : //github.com/ivanhalencp/python/tree/master/xTimer
我已经改变了swapnil-jariwala代码中的一些代码来制作一个小控制台时钟。
from threading import Timer, Thread, Event from datetime import datetime class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): tempo = datetime.today() print("{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)) t = perpetualTimer(1, printer) t.start()
OUTPUT
>>> 11:39:11 11:39:12 11:39:13 11:39:14 11:39:15 11:39:16 ...
计时器与tkintergraphics界面
这段代码把时钟定时器放在tkinter的一个小窗口中
from threading import Timer, Thread, Event from datetime import datetime import tkinter as tk app = tk.Tk() lab = tk.Label(app, text="Timer will start in a sec") lab.pack() class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = Timer(self.t, self.handle_function) def handle_function(self): self.hFunction() self.thread = Timer(self.t, self.handle_function) self.thread.start() def start(self): self.thread.start() def cancel(self): self.thread.cancel() def printer(): tempo = datetime.today() clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second) try: lab['text'] = clock except RuntimeError: exit() t = perpetualTimer(1, printer) t.start() app.mainloop()
from threading import Timer def TaskManager(): #do stuff t = Timer( 1, TaskManager ); t.start() TaskManager();
这里是小样本,这将有助于了解它如何运行。 函数taskManager()在最后创build延迟函数调用它自己。
尝试改变“达赖”variables,你将能够看到不同
from threading import Timer, _sleep # ------------------------------------------ DATA = []; dalay = 0.25; # sec counter = 0; allow_run = True; FIFO = True; def taskManager(): global counter, DATA, delay, allow_run; counter += 1; if len(DATA) > 0: if FIFO: print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]"); else: print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]"); else: print("["+str(counter)+"] no data"); if allow_run: #delayed method/function call to it self t = Timer( dalay, taskManager ); t.start(); else: print(" END task-manager: disabled"); # ------------------------------------------ def main(): DATA.append("data from main(): 0"); _sleep(2); DATA.append("data from main(): 1"); _sleep(2); # ------------------------------------------ print(" START task-manager:"); taskManager(); _sleep(2); DATA.append("first data"); _sleep(2); DATA.append("second data"); print(" START main():"); main(); print(" END main():"); _sleep(2); DATA.append("last data"); allow_run = False;
我必须为一个项目做这个。 我最终做的是为这个函数启动一个单独的线程
t = threading.Thread(target =heartbeat, args=(worker,)) t.start()
****心跳是我的function,工作人员是我的一个论据****
在我的心跳function里面:
def heartbeat(worker): while True: time.sleep(5) #all of my code
所以当我启动线程时,函数会重复等待5秒钟,运行我所有的代码,并无限期地执行。 如果你想杀死进程就杀死线程。