如何限制Python中函数调用的执行时间
在我的代码中有一个与套接字相关的函数调用,那个函数来自另一个模块,因此不在我的控制之下,问题是它偶尔会阻塞几个小时,这是完全不可接受的,我怎样才能限制我的代码中的函数执行时间? 我想解决scheme必须利用另一个线程。
我不确定这可能是跨平台的,但使用信号和警报可能是一个很好的方法来看待这个问题。 通过一些小小的工作,你可以在任何情况下都做到这一点。
http://docs.python.org/library/signal.html
所以你的代码会看起来像这样。
import signal def signal_handler(signum, frame): raise Exception("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(10) # Ten seconds try: long_function_call() except Exception, msg: print "Timed out!"
@ rik.the.vik的答案的一个改进是使用with
语句给超时函数一些语法糖:
import signal from contextlib import contextmanager class TimeoutException(Exception): pass @contextmanager def time_limit(seconds): def signal_handler(signum, frame): raise TimeoutException("Timed out!") signal.signal(signal.SIGALRM, signal_handler) signal.alarm(seconds) try: yield finally: signal.alarm(0) try: with time_limit(10): long_function_call() except TimeoutException as e: print("Timed out!")
以下是限制一个函数运行时间的Linux / OSX方法。 这是在你不想使用线程的情况下,并希望你的程序等待,直到函数结束,或者时间限制到期。
from multiprocessing import Process from time import sleep def f(time): sleep(time) def run_with_limited_time(func, args, kwargs, time): """Runs a function with time limit :param func: The function to run :param args: The functions args, given as tuple :param kwargs: The functions keywords, given as dict :param time: The time limit in seconds :return: True if the function ended successfully. False if it was terminated. """ p = Process(target=func, args=args, kwargs=kwargs) p.start() p.join(time) if p.is_alive(): p.terminate() return False return True if __name__ == '__main__': print run_with_limited_time(f, (1.5, ), {}, 2.5) # True print run_with_limited_time(f, (3.5, ), {}, 2.5) # False
在信号处理程序中执行此操作是非常危险的:在引发exception时,您可能在exception处理程序中,并使事情处于中断状态。 例如,
def function_with_enforced_timeout(): f = open_temporary_file() try: ... finally: here() unlink(f.filename)
如果您的exception在此处引发(),则临时文件将永远不会被删除。
这里的解决scheme是推迟asynchronousexception,直到代码不在exception处理代码(除了finally块)之外,但Python不会这样做。
请注意,执行本机代码时不会中断任何操作。 它只会在函数返回时中断它,所以这可能不会帮助这个特定的情况。 (SIGALRM本身可能会中断阻塞的调用 – 但是套接字代码通常在EINTR之后重试。)
这样做与线程是一个更好的主意,因为它比信号更便携。 由于您正在启动一个工作线程并阻塞,直到完成,所以没有任何常见的并发问题。 不幸的是,没有办法asynchronous传递exception到Python中的另一个线程(其他线程API可以做到这一点)。 在exception处理程序中发送exception也会有同样的问题,并且需要相同的修复。
以任何语言来做这个事情的唯一“安全”方法是使用辅助进程来完成超时工作,否则就需要以自己安全的方式构build代码,例如通过检查循环或相似的时间。 如果改变方法不是一个选项,一个线程是不够的。
为什么? 因为当你这样做的时候,你会冒险把事情弄糟。 如果线程在方法中被简单地杀死,锁等等将被保存,并且不能被释放。
所以看stream程的方式,不要看线程的方式。
我更喜欢上下文pipe理器方法,因为它允许在with time_limit
语句中执行多个python语句。 因为windows系统没有SIGALARM
,所以更便携,也许更简单的方法是使用Timer
from contextlib import contextmanager import threading import _thread class TimeoutException(Exception): def __init__(self, msg=''): self.msg = msg @contextmanager def time_limit(seconds, msg=''): timer = threading.Timer(seconds, lambda: _thread.interrupt_main()) timer.start() try: yield except KeyboardInterrupt: raise TimeoutException("Timed out for operation {}".format(msg)) finally: # if the action ends in specified time, timer is canceled timer.cancel() import time # ends after 5 seconds with time_limit(5, 'sleep'): for i in range(10): time.sleep(1) # this will actually end after 10 seconds with time_limit(5, 'sleep'): time.sleep(10)
这里的关键技术是使用_thread.interrupt_main
从计时器线程中断主线程。 一个需要注意的是,主线程并不总是响应由Timer
引发的KeyboardInterrupt
。 例如, time.sleep()
调用一个系统函数,以便在sleep
呼叫之后处理KeyboardInterrupt
。
你不必使用线程。 您可以使用其他进程来执行阻塞工作,例如可能使用subprocess模块。 如果你想在你的程序的不同部分之间共享数据结构, Twisted是一个伟大的库,可以让你控制这个,如果你关心阻塞,并且期望遇到这个麻烦,我会推荐它。 Twisted的坏消息是你必须重写你的代码,以避免任何阻塞,并有一个公平的学习曲线。
你可以使用线程来避免阻塞,但我认为这是最后的手段,因为它使你暴露在一个痛苦的世界里。 甚至在考虑在生产中使用线程(例如Jean Bacon的“并发系统”)之前,请阅读一本关于并发性的好书。 我和一群真正用线程做酷的高性能的人一起工作,除非我们真的需要它们,否则我们不会把线程引入到项目中。
这是一个超时function,我想我通过谷歌发现,它适用于我。
来自: http : //code.activestate.com/recipes/473878/
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None): '''This function will spwan a thread and run the given function using the args, kwargs and return the given default value if the timeout_duration is exceeded ''' import threading class InterruptableThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.result = default def run(self): try: self.result = func(*args, **kwargs) except: self.result = default it = InterruptableThread() it.start() it.join(timeout_duration) if it.isAlive(): return it.result else: return it.result
我通常更喜欢使用@ josh-leebuild议的上下文pipe理器
但是如果有人有兴趣把这个实现成一个装饰器,这里有一个select。
以下是它的样子:
import time from timeout import timeout class Test(object): @timeout(2) def test_a(self, foo, bar): print foo time.sleep(1) print bar return 'A Done' @timeout(2) def test_b(self, foo, bar): print foo time.sleep(3) print bar return 'B Done' t = Test() print t.test_a('python', 'rocks') print t.test_b('timing', 'out')
这是timeout.py
模块:
import threading class TimeoutError(Exception): pass class InterruptableThread(threading.Thread): def __init__(self, func, *args, **kwargs): threading.Thread.__init__(self) self._func = func self._args = args self._kwargs = kwargs self._result = None def run(self): self._result = self._func(*self._args, **self._kwargs) @property def result(self): return self._result class timeout(object): def __init__(self, sec): self._sec = sec def __call__(self, f): def wrapped_f(*args, **kwargs): it = InterruptableThread(f, *args, **kwargs) it.start() it.join(self._sec) if not it.is_alive(): return it.result raise TimeoutError('execution expired') return wrapped_f
输出:
python rocks A Done timing Traceback (most recent call last): ... timeout.TimeoutError: execution expired out
请注意,即使引发了TimeoutError
,装饰的方法也将继续在不同的线程中运行。 如果你还想让这个线程“停止”,请参阅: 有什么办法杀死Python中的线程?