超时使用模块“subprocess”
下面是运行任意命令返回stdout
数据的Python代码,或者引发非零退出代码的exception:
proc = subprocess.Popen( cmd, stderr=subprocess.STDOUT, # Merge stdout and stderr stdout=subprocess.PIPE, shell=True)
communicate
是用来等待进程退出:
stdoutdata, stderrdata = proc.communicate()
subprocess
进程模块不支持超时 – 杀死运行超过X秒的进程的能力 – 因此, communicate
可能需要永远运行。
在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?
在Python 3.3+中:
from subprocess import STDOUT, check_output output = check_output(cmd, stderr=STDOUT, timeout=seconds)
output
是一个包含命令的合并标准输出stderr数据的字节string。
与proc.communicate()
方法不同,此代码在问题文本中指定的非零退出状态上引发CalledProcessError
。
我已经删除了shell=True
因为它经常被不必要地使用。 如果cmd
确实需要的话,你可以随时添加它。 如果你添加shell=True
,如果subprocess产生自己的后代, check_output()
返回的时间可能会比timeout指示晚得多,请参阅subprocess超时失败 。
超时function在Python 2.x上通过3.2+subprocess模块的subprocess32 backport提供。
我对低层次的细节知之甚less, 但是,考虑到在Python 2.6中,API提供了等待线程和终止进程的能力,那么在单独的线程中运行进程呢?
import subprocess, threading class Command(object): def __init__(self, cmd): self.cmd = cmd self.process = None def run(self, timeout): def target(): print 'Thread started' self.process = subprocess.Popen(self.cmd, shell=True) self.process.communicate() print 'Thread finished' thread = threading.Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): print 'Terminating process' self.process.terminate() thread.join() print self.process.returncode command = Command("echo 'Process started'; sleep 2; echo 'Process finished'") command.run(timeout=3) command.run(timeout=1)
这个片段在我的机器上的输出是:
Thread started Process started Process finished Thread finished 0 Thread started Process started Terminating process Thread finished -15
在这里可以看出,在第一次执行中,进程正确地完成(返回代码0),而在第二次执行中,进程终止(返回码-15)。
我没有在窗口中testing; 但除了更新示例命令,我认为它应该工作,因为我没有在文档中发现任何说thread.join或process.terminate不支持。
如果你在Unix上,
import signal ... class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(5*60) # 5 minutes try: stdoutdata, stderrdata = proc.communicate() signal.alarm(0) # reset the alarm except Alarm: print "Oops, taking too long!" # whatever else
jcollado的答案可以使用threading.Timer类来简化:
import subprocess, shlex from threading import Timer def run(cmd, timeout_sec): proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) kill_proc = lambda p: p.kill() timer = Timer(timeout_sec, kill_proc, [proc]) try: timer.start() stdout,stderr = proc.communicate() finally: timer.cancel()
这里是Alex Martelli的解决scheme,作为一个适当的进程查杀模块。 其他方法不起作用,因为它们不使用proc.communicate()。 所以如果你有一个产生大量输出的进程,它将填充它的输出缓冲区,然后阻塞,直到你从中读取一些东西。
from os import kill from signal import alarm, signal, SIGALRM, SIGKILL from subprocess import PIPE, Popen def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None): ''' Run a command with a timeout after which it will be forcibly killed. ''' class Alarm(Exception): pass def alarm_handler(signum, frame): raise Alarm p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env) if timeout != -1: signal(SIGALRM, alarm_handler) alarm(timeout) try: stdout, stderr = p.communicate() if timeout != -1: alarm(0) except Alarm: pids = [p.pid] if kill_tree: pids.extend(get_process_children(p.pid)) for pid in pids: # process might have died before getting to this line # so wrap to avoid OSError: no such process try: kill(pid, SIGKILL) except OSError: pass return -9, '', '' return p.returncode, stdout, stderr def get_process_children(pid): p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True, stdout = PIPE, stderr = PIPE) stdout, stderr = p.communicate() return [int(p) for p in stdout.split()] if __name__ == '__main__': print run('find /', shell = True, timeout = 3) print run('find', shell = True)
我修改了sussudio的答案。 现在函数返回:( returncode
, stdout
, stderr
, timeout
) – stdout
和stderr
被解码为utf-8string
def kill_proc(proc, timeout): timeout["value"] = True proc.kill() def run(cmd, timeout_sec): proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) timeout = {"value": False} timer = Timer(timeout_sec, kill_proc, [proc, timeout]) timer.start() stdout, stderr = proc.communicate() timer.cancel() return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]
惊讶没有人提到使用timeout
timeout 5 ping -c 3 somehost
这显然不适合每个用例的工作,但如果你处理一个简单的脚本,这是很难打败的。
对于mac用户,也可以通过homebrew
在coreutils中作为gtimeout使用。
另一个select是写入临时文件,以防止stdout阻塞,而不是通过poll()进行轮询。 这对其他答案没有的地方是有效的。 例如在窗户上。
outFile = tempfile.SpooledTemporaryFile() errFile = tempfile.SpooledTemporaryFile() proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False) wait_remaining_sec = timeout while proc.poll() is None and wait_remaining_sec > 0: time.sleep(1) wait_remaining_sec -= 1 if wait_remaining_sec <= 0: killProc(proc.pid) raise ProcessIncompleteError(proc, timeout) # read temp streams from start outFile.seek(0); errFile.seek(0); out = outFile.read() err = errFile.read() outFile.close() errFile.close()
timeout
现在由subprocess模块中的call()
和communicate()
(从Python3.3开始)支持:
import subprocess subprocess.call("command", timeout=20, shell=True)
这将调用命令并引发exception
subprocess.TimeoutExpired
如果命令20秒后没有完成。
然后你可以处理这个exception来继续你的代码,如下所示:
try: subprocess.call("command", timeout=20, shell=True) except subprocess.TimeoutExpired: # insert code here
希望这可以帮助。
这是我的解决scheme,我正在使用线程和事件:
import subprocess from threading import Thread, Event def kill_on_timeout(done, timeout, proc): if not done.wait(timeout): proc.kill() def exec_command(command, timeout): done = Event() proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc)) watcher.daemon = True watcher.start() data, stderr = proc.communicate() done.set() return data, stderr, proc.returncode
在行动:
In [2]: exec_command(['sleep', '10'], 5) Out[2]: ('', '', -9) In [3]: exec_command(['sleep', '10'], 11) Out[3]: ('', '', 0)
我使用的解决scheme是用timelimit作为shell命令的前缀。 如果指令太长,timelimit会停止,并且Popen会有一个由timelimit设置的返回码。 如果是> 128,则意味着时间限制杀死了进程。
另请参见pythonsubprocess超时和大输出(> 64K)
我添加了从jcollado
到Python模块easyprocess的线程解决scheme。
安装:
pip install easyprocess
例:
from easyprocess import Proc # shell is not supported! stdout=Proc('ping localhost').call(timeout=1.5).stdout print stdout
我已经实现了我可以从其中几个收集的东西。 这适用于Windows,由于这是一个社区维基,我想我也会分享我的代码:
class Command(threading.Thread): def __init__(self, cmd, outFile, errFile, timeout): threading.Thread.__init__(self) self.cmd = cmd self.process = None self.outFile = outFile self.errFile = errFile self.timed_out = False self.timeout = timeout def run(self): self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \ stderr = self.errFile) while (self.process.poll() is None and self.timeout > 0): time.sleep(1) self.timeout -= 1 if not self.timeout > 0: self.process.terminate() self.timed_out = True else: self.timed_out = False
然后从另一个课程或文件:
outFile = tempfile.SpooledTemporaryFile() errFile = tempfile.SpooledTemporaryFile() executor = command.Command(c, outFile, errFile, timeout) executor.daemon = True executor.start() executor.join() if executor.timed_out: out = 'timed out' else: outFile.seek(0) errFile.seek(0) out = outFile.read() err = errFile.read() outFile.close() errFile.close()
如果你使用Python 2,试试看
import subprocess32 try: output = subprocess32.check_output(command, shell=True, timeout=3) except subprocess32.TimeoutExpired as e: print e
我已经成功地在Windows,Linux和Mac上使用killableprocess 。 如果你正在使用Cygwin Python,你将需要OSAF的killableprocess版本,因为否则本机Windows进程不会被杀死。
虽然我没有广泛地看过,但是我在ActiveState中find的这个装饰器似乎对这类事情非常有用。 随着subprocess.Popen(..., close_fds=True)
,至less我已经准备好在Python的shell脚本。
一旦你了解了* unix中的完整进程运行机制,你将很容易find更简单的解决scheme:
考虑这个简单的例子,如何使用select.select()(现在几乎可以在* nix上的每一处都可用)进行可超时交stream()方法。 这也可以用epoll / poll / kqueue来写,但是select.select()变体可能是一个很好的例子。 而select.select()(速度和1024最大fds)的主要限制是不适用于您的任务。
这在* nix下工作,不创build线程,不使用信号,可以从任何线程(不仅主要),并快速读取我的机器上的标准输出(i5 2.3ghz)250mb / s的数据。
沟通结束时,joinstdout / stderr存在问题。 如果你有巨大的程序输出,这可能会导致大量的内存使用。 但是你可以用较小的超时来调用通信()几次。
class Popen(subprocess.Popen): def communicate(self, input=None, timeout=None): if timeout is None: return subprocess.Popen.communicate(self, input) if self.stdin: # Flush stdio buffer, this might block if user # has been writing to .stdin in an uncontrolled # fashion. self.stdin.flush() if not input: self.stdin.close() read_set, write_set = [], [] stdout = stderr = None if self.stdin and input: write_set.append(self.stdin) if self.stdout: read_set.append(self.stdout) stdout = [] if self.stderr: read_set.append(self.stderr) stderr = [] input_offset = 0 deadline = time.time() + timeout while read_set or write_set: try: rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time())) except select.error as ex: if ex.args[0] == errno.EINTR: continue raise if not (rlist or wlist): # Just break if timeout # Since we do not close stdout/stderr/stdin, we can call # communicate() several times reading data by smaller pieces. break if self.stdin in wlist: chunk = input[input_offset:input_offset + subprocess._PIPE_BUF] try: bytes_written = os.write(self.stdin.fileno(), chunk) except OSError as ex: if ex.errno == errno.EPIPE: self.stdin.close() write_set.remove(self.stdin) else: raise else: input_offset += bytes_written if input_offset >= len(input): self.stdin.close() write_set.remove(self.stdin) # Read stdout / stderr by 1024 bytes for fn, tgt in ( (self.stdout, stdout), (self.stderr, stderr), ): if fn in rlist: data = os.read(fn.fileno(), 1024) if data == '': fn.close() read_set.remove(fn) tgt.append(data) if stdout is not None: stdout = ''.join(stdout) if stderr is not None: stderr = ''.join(stderr) return (stdout, stderr)
你可以使用select
来做到这一点
import subprocess from datetime import datetime from select import select def call_with_timeout(cmd, timeout): started = datetime.now() sp = subprocess.Popen(cmd, stdout=subprocess.PIPE) while True: p = select([sp.stdout], [], [], timeout) if p[0]: p[0][0].read() ret = sp.poll() if ret is not None: return ret if (datetime.now()-started).total_seconds() > timeout: sp.kill() return None
不幸的是,我受到雇主披露源代码的严格限制,所以我不能提供实际的代码。 但是对于我的口味来说,最好的解决scheme是创build一个覆盖Popen.wait()
的子类来轮询而不是无限等待,而Popen.__init__
接受一个timeout参数。 一旦你这样做,所有其他的Popen
方法(呼叫wait
)将按预期工作,包括communicate
。
https://pypi.python.org/pypi/python-subprocess2提供了subprocess模块的扩展,允许您等待一段时间,否则终止。;
所以,等待最多10秒的过程终止,否则杀:
pipe = subprocess.Popen('...') timeout = 10 results = pipe.waitOrTerminate(timeout)
这与windows和unix兼容。 “结果”是一个字典,它包含“returnCode”,它是应用程序的返回(或者如果必须被杀死,则为None)以及“actionTaken”。 如果进程正常完成,那么它将是“SUBPROCESS2_PROCESS_COMPLETED”,或者取决于所采取的动作的掩码是“SUBPROCESS2_PROCESS_TERMINATED”和SUBPROCESS2_PROCESS_KILLED(详情请参阅文档)
此解决scheme在shell = True的情况下杀死进程树,将parameter passing给进程(或不进行),有超时并获取callback的stdout,stderr和进程输出(它使用psutil作为kill_proc_tree)。 这是基于几个解决scheme张贴在SO包括jcollado的。 张贴回应Anson和jradla在jcollado的回答中的评论。 testing在Windows Srvr 2012和Ubuntu 14.04。 请注意,对于Ubuntu,您需要将parent.children(…)调用更改为parent.get_children(…)。
def kill_proc_tree(pid, including_parent=True): parent = psutil.Process(pid) children = parent.children(recursive=True) for child in children: child.kill() psutil.wait_procs(children, timeout=5) if including_parent: parent.kill() parent.wait(5) def run_with_timeout(cmd, current_dir, cmd_parms, timeout): def target(): process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) # wait for the process to terminate if (cmd_parms == ""): out, err = process.communicate() else: out, err = process.communicate(cmd_parms) errcode = process.returncode thread = Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): me = os.getpid() kill_proc_tree(me, including_parent=False) thread.join()
有一个想法是inheritancePopen类,并用一些简单的方法装饰器来扩展它。 我们称之为ExpirablePopen。
from logging import error from subprocess import Popen from threading import Event from threading import Thread class ExpirablePopen(Popen): def __init__(self, *args, **kwargs): self.timeout = kwargs.pop('timeout', 0) self.timer = None self.done = Event() Popen.__init__(self, *args, **kwargs) def __tkill(self): timeout = self.timeout if not self.done.wait(timeout): error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout)) self.kill() def expirable(func): def wrapper(self, *args, **kwargs): # zero timeout means call of parent method if self.timeout == 0: return func(self, *args, **kwargs) # if timer is None, need to start it if self.timer is None: self.timer = thr = Thread(target=self.__tkill) thr.daemon = True thr.start() result = func(self, *args, **kwargs) self.done.set() return result return wrapper wait = expirable(Popen.wait) communicate = expirable(Popen.communicate) if __name__ == '__main__': from subprocess import PIPE print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()
import subprocess, optparse, os, sys, re, datetime, threading, time, glob, shutil, xml.dom.minidom, traceback class OutputManager: def __init__(self, filename, mode, console, logonly): self.con = console self.logtoconsole = True self.logtofile = False if filename: try: self.f = open(filename, mode) self.logtofile = True if logonly == True: self.logtoconsole = False except IOError: print (sys.exc_value) print ("Switching to console only output...\n") self.logtofile = False self.logtoconsole = True def write(self, data): if self.logtoconsole == True: self.con.write(data) if self.logtofile == True: self.f.write(data) sys.stdout.flush() def getTimeString(): return time.strftime("%Y-%m-%d", time.gmtime()) def runCommand(command): ''' Execute a command in new thread and return the stdout and stderr content of it. ''' try: Output = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True).communicate()[0] except Exception as e: print ("runCommand failed :%s" % (command)) print (str(e)) sys.stdout.flush() return None return Output def GetOs(): Os = "" if sys.platform.startswith('win32'): Os = "win" elif sys.platform.startswith('linux'): Os = "linux" elif sys.platform.startswith('darwin'): Os = "mac" return Os def check_output(*popenargs, **kwargs): try: if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') # Get start time. startTime = datetime.datetime.now() timeoutValue=3600 cmd = popenargs[0] if sys.platform.startswith('win32'): process = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True) elif sys.platform.startswith('linux'): process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) elif sys.platform.startswith('darwin'): process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) stdoutdata, stderrdata = process.communicate( timeout = timeoutValue ) retcode = process.poll() #################################### # Catch crash error and log it. #################################### OutputHandle = None try: if retcode >= 1: OutputHandle = OutputManager( 'CrashJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False) OutputHandle.write( cmd ) print (stdoutdata) print (stderrdata) sys.stdout.flush() except Exception as e: print (str(e)) except subprocess.TimeoutExpired: #################################### # Catch time out error and log it. #################################### Os = GetOs() if Os == 'win': killCmd = "taskkill /FI \"IMAGENAME eq {0}\" /T /F" elif Os == 'linux': killCmd = "pkill {0)" elif Os == 'mac': # Linux, Mac OS killCmd = "killall -KILL {0}" runCommand(killCmd.format("java")) runCommand(killCmd.format("YouApp")) OutputHandle = None try: OutputHandle = OutputManager( 'KillJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False) OutputHandle.write( cmd ) except Exception as e: print (str(e)) except Exception as e: for frame in traceback.extract_tb(sys.exc_info()[2]): fname,lineno,fn,text = frame print "Error in %s on line %d" % (fname, lineno)
只是想写一些简单的东西。
#!/usr/bin/python from subprocess import Popen, PIPE import datetime import time popen = Popen(["/bin/sleep", "10"]); pid = popen.pid sttime = time.time(); waittime = 3 print "Start time %s"%(sttime) while True: popen.poll(); time.sleep(1) rcode = popen.returncode now = time.time(); if [ rcode is None ] and [ now > (sttime + waittime) ] : print "Killing it now" popen.kill()