如何从python的线程得到返回值?
如何获得线程返回的值'foo'
?
from threading import Thread def foo(bar): print 'hello {}'.format(bar) return 'foo' thread = Thread(target=foo, args=('world!',)) thread.start() ret = thread.join() print ret
上面显示的一个显而易见的方法是返回None
。
FWIW, multiprocessing
模块使用Pool
类有一个很好的界面。 如果你想坚持使用线程而不是进程,你可以使用multiprocessing.pool.ThreadPool
类作为一个插件替换。
def foo(bar, baz): print 'hello {0}'.format(bar) return 'foo' + baz from multiprocessing.pool import ThreadPool pool = ThreadPool(processes=1) async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo # do some other stuff in the main process return_val = async_result.get() # get the return value from your function.
我见过的一种方法是将可变对象(如列表或字典)传递给线程的构造函数,以及某种索引或其他标识符。 线程然后可以将其结果存储在该对象的专用槽中。 例如:
def foo(bar, result, index): print 'hello {0}'.format(bar) result[index] = "foo" from threading import Thread threads = [None] * 10 results = [None] * 10 for i in range(len(threads)): threads[i] = Thread(target=foo, args=('world!', results, i)) threads[i].start() # do some other stuff for i in range(len(threads)): threads[i].join() print " ".join(results) # what sound does a metasyntactic locomotive make?
如果你真的想让join()
返回被调用函数的返回值,那么可以使用如下所示的Thread
子类来实现:
from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return "foo" class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): Thread.__init__(self, group, target, name, args, kwargs, Verbose) self._return = None def run(self): if self._Thread__target is not None: self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) def join(self): Thread.join(self) return self._return twrv = ThreadWithReturnValue(target=foo, args=('world!',)) twrv.start() print twrv.join() # prints foo
由于某些名称的改变,这会变得有点毛骨悚然,并且它访问专用于Thread
实现的“私有”数据结构…但是它起作用。
杰克的答案是好的,但如果你不想使用线程池(你不知道需要多少线程,但根据需要创建它们),那么在线程之间传输信息的好方法是内置Queue.Queue类,因为它提供了线程安全性。
我创建了下面的装饰器,使其与线程池的行为类似:
def threaded(f, daemon=False): import Queue def wrapped_f(q, *args, **kwargs): '''this function calls the decorated function and puts the result in a queue''' ret = f(*args, **kwargs) q.put(ret) def wrap(*args, **kwargs): '''this is the function returned from the decorator. It fires off wrapped_f in a new thread and returns the thread object with the result queue attached''' q = Queue.Queue() t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs) t.daemon = daemon t.start() t.result_queue = q return t return wrap
那么你只是使用它:
@threaded def long_task(x): import time x = x + 5 time.sleep(5) return x # does not block, returns Thread object y = long_task(10) print y # this blocks, waiting for the result result = y.result_queue.get() print result
装饰的函数每次调用时都会创建一个新线程,并返回一个包含将接收结果的队列的Thread对象。
我偷了所有的答案,并把它清理了一下。
关键部分是将* args和** kwargs添加到join()中以处理超时
class threadWithReturn(Thread): def __init__(self, *args, **kwargs): super(threadWithReturn, self).__init__(*args, **kwargs) self._return = None def run(self): if self._Thread__target is not None: self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) def join(self, *args, **kwargs): super(threadWithReturn, self).join(*args, **kwargs) return self._return
另一个不需要更改现有代码的解决方案:
import Queue from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return 'foo' que = Queue.Queue() t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) t.start() t.join() result = que.get() print result
它也可以很容易地调整到多线程环境:
import Queue from threading import Thread def foo(bar): print 'hello {0}'.format(bar) return 'foo' que = Queue.Queue() threads_list = list() t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) t.start() threads_list.append(t) # Add more threads here ... threads_list.append(t2) ... threads_list.append(t3) ... # Join all the threads for t in threads_list: t.join() # Check thread's return value while not que.empty(): result = que.get() print result
Parris / kindall的答案 join
/ return
移植到Python 3的答案:
from threading import Thread def foo(bar): print('hello {0}'.format(bar)) return "foo" class ThreadWithReturnValue(Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon) self._return = None def run(self): if self._target is not None: self._return = self._target(*self._args, **self._kwargs) def join(self): Thread.join(self) return self._return twrv = ThreadWithReturnValue(target=foo, args=('world!',)) twrv.start() print(twrv.join()) # prints foo
请注意, Thread
类在Python 3中以不同的方式实现。
我解决这个问题的方法是将函数和线程包装到一个类中。 不需要使用池,队列或c类型变量传递。 它也是非阻塞的。 你检查状态,而不是。 查看如何在代码结束时使用它的示例。
import threading class ThreadWorker(): ''' The basic idea is given a function create an object. The object can then run the function in a thread. It provides a wrapper to start it,check its status,and get data out the function. ''' def __init__(self,func): self.thread = None self.data = None self.func = self.save_data(func) def save_data(self,func): '''modify function to save its returned data''' def new_func(*args, **kwargs): self.data=func(*args, **kwargs) return new_func def start(self,params): self.data = None if self.thread is not None: if self.thread.isAlive(): return 'running' #could raise exception here #unless thread exists and is alive start or restart it self.thread = threading.Thread(target=self.func,args=params) self.thread.start() return 'started' def status(self): if self.thread is None: return 'not_started' else: if self.thread.isAlive(): return 'running' else: return 'finished' def get_results(self): if self.thread is None: return 'not_started' #could return exception else: if self.thread.isAlive(): return 'running' else: return self.data def add(x,y): return x +y add_worker = ThreadWorker(add) print add_worker.start((1,2,)) print add_worker.status() print add_worker.get_results()
您可以使用Pool作为工作进程池,如下所示:
from multiprocessing import Pool def f1(x, y): return x*y if __name__ == '__main__': with Pool(processes=10) as pool: result = pool.apply(f1, (2, 3)) print(result)
join
总是返回None
,我想你应该子类Thread
来处理返回码等等。
定义你的目标
1)拿一个参数q
2)用q.put(foo); return
替换任何return foo
语句q.put(foo); return
q.put(foo); return
所以一个功能
def func(a): ans = a * a return ans
会成为
def func(a, q): ans = a * a q.put(ans) return
然后你会这样做
from Queue import Queue from threading import Thread ans_q = Queue() arg_tups = [(i, ans_q) for i in xrange(10)] threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups] _ = [t.start() for t in threads] _ = [t.join() for t in threads] results = [q.get() for _ in xrange(len(threads))]
你可以使用函数装饰器/包装器来创建它,所以你可以使用你现有的函数作为target
而不用修改它们,但是遵循这个基本的方案。
你可以在线程函数的作用域之上定义一个mutable,并将结果添加到该函数中。 (我也修改了python3兼容的代码)
returns = {} def foo(bar): print('hello {0}'.format(bar)) returns[bar] = 'foo' from threading import Thread t = Thread(target=foo, args=('world!',)) t.start() t.join() print(returns)
这返回{'world!': 'foo'}
如果使用函数输入作为结果字典的关键字,则每个唯一的输入都会保证在结果中输入一个条目
我正在使用这个包装器,它可以舒适地转换Thread
运行的任何函数 – 处理返回值或异常。 它不会添加Queue
开销。
def threading_func(f): """Decorator for running a function in a thread and handling its return value or exception""" def start(*args, **kw): def run(): try: th.ret = f(*args, **kw) except: th.exc = sys.exc_info() def get(timeout=None): th.join(timeout) if th.exc: raise th.exc[0], th.exc[1], th.exc[2] # py2 ##raise th.exc[1] #py3 return th.ret th = threading.Thread(None, run) th.exc = None th.get = get th.start() return th return start
用法示例
def f(x): return 2.5 * x th = threading_func(f)(4) print("still running?:", th.is_alive()) print("result:", th.get(timeout=1.0)) @threading_func def th_mul(a, b): return a * b th = th_mul("text", 2.5) try: print(th.get()) except TypeError: print("exception thrown ok.")
threading
模块的注意事项
线程函数的舒适返回值和异常处理是一种频繁的“Pythonic”需求,应该已经由threading
模块提供 – 可能直接在标准Thread
类中提供。 ThreadPool
对于简单的任务有太多的开销 – 3管理线程,很多官僚作风。 不幸的是, Thread
的布局最初是从Java复制而来的 – 例如,您可以从仍然无用的1st(!)构造函数参数group
看到该布局。
一个通常的解决方案是用你的装饰器来包装你的函数foo
result = queue.Queue() def task_wrapper(*args): result.put(target(*args))
那么整个代码可能看起来像这样
result = queue.Queue() def task_wrapper(*args): result.put(target(*args)) threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list] for t in threads: t.start() while(True): if(len(threading.enumerate()) < max_num): break for t in threads: t.join() return result
注意
一个重要的问题是,返回值可能是无序的 。 (其实return value
并不一定保存到queue
,因为你可以选择任意线程安全的数据结构)
为什么不使用全局变量?
import threading class myThread(threading.Thread): def __init__(self, ind, lock): threading.Thread.__init__(self) self.ind = ind self.lock = lock def run(self): global results with self.lock: results.append(self.ind) results = [] lock = threading.Lock() threads = [myThread(x, lock) for x in range(1, 4)] for t in threads: t.start() for t in threads: t.join() print(results)