Asynchronous method calls in Python?
I was wondering whether there is any library for asynchronous method calls in Python. It would be great if you could do something like
@async
def longComputation():
    <code>

token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
if token.finished():
    result = token.result()
Or to call a non-asynchronous routine asynchronously:
def longComputation()
    <code>

token = asynccall(longComputation())
It would be great to have a more refined strategy as part of the language core. Has this been considered?
You can use the multiprocessing module added in Python 2.6. You can use a pool of processes and then get results asynchronously with:
apply_async(func[, args[, kwds[, callback]]])
For example:
from multiprocessing import Pool

def f(x):
    return x*x

def callback(result):
    # Receives the return value of f once the worker finishes.
    print(result)

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker process.
    # Evaluate "f(10)" asynchronously, calling callback when finished.
    result = pool.apply_async(f, [10], callback=callback)
    pool.close()
    pool.join()
This is just one option; the module provides lots of facilities to achieve what you want. It would also be really easy to build a decorator on top of this.
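For instance, here is a rough sketch of such a wrapper built on nothing but Pool.apply_async; make_async and f_async are illustrative names, not part of multiprocessing:

from multiprocessing import Pool

def f(x):
    return x * x

def make_async(pool, func):
    # Hypothetical helper: returns a callable that submits func to the pool
    # and hands back an AsyncResult instead of blocking on the call.
    def submit(*args, **kwargs):
        return pool.apply_async(func, args, kwargs)
    return submit

if __name__ == '__main__':
    pool = Pool(processes=2)
    f_async = make_async(pool, f)   # f keeps its module-level name, so the
                                    # workers can still unpickle it by reference
    res = f_async(10)               # returns immediately with an AsyncResult
    print(res.get(timeout=5))       # blocks until the value (100) is ready
    pool.close()
    pool.join()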
What about something like:
import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start()       # will run "foo"
....
thr.is_alive()    # will return whether foo is running currently
....
thr.join()        # will wait till "foo" is done
See the documentation at https://docs.python.org/2/library/threading.html#module-threading for more details; this code should work for Python 3 as well.
Starting with Python 3.5, you can use enhanced generators for asynchronous functions.
import asyncio
import datetime
Enhanced generator syntax:
@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
New async/await syntax:
async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
It is not in the language core, but a very mature library that does what you want is Twisted. It introduces the Deferred object, to which you can attach callbacks or error handlers ("errbacks"). A Deferred is basically a "promise" that a function will eventually have a result.
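A minimal sketch of that style, assuming Twisted is installed; long_computation, on_result and on_error are illustrative names, and deferToThread is used here just to run an ordinary blocking function off the reactor thread:

from twisted.internet import reactor
from twisted.internet.threads import deferToThread

def long_computation():
    # Stand-in for some slow, blocking work.
    return 42

def on_result(result):
    print("got", result)
    reactor.stop()

def on_error(failure):
    print("failed:", failure)
    reactor.stop()

d = deferToThread(long_computation)   # returns a Deferred immediately
d.addCallback(on_result)              # fires with the return value
d.addErrback(on_error)                # fires instead if an exception is raised

reactor.run()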
You can implement a decorator to make your functions asynchronous, though that's a bit tricky. The multiprocessing module is full of little quirks and seemingly arbitrary restrictions, which is all the more reason to encapsulate it behind a friendly interface.
from inspect import getmodule
from multiprocessing import Pool

def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or
        block until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable,
        but good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send
The code below illustrates how to use the decorator:
@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)

if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())
In a real-world case I would elaborate a bit more on the decorator, providing some way to turn it off for debugging (while keeping the future interface in place), or maybe a facility for dealing with exceptions; but I think this demonstrates the principle well enough.
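One hedged sketch of what "turning it off while keeping the future interface" could look like; SynchronousResult and the async.synchronous flag are purely illustrative, not part of multiprocessing:

class SynchronousResult(object):
    '''Mimics the blocking part of AsyncResult for an already-computed value.'''
    def __init__(self, value):
        self._value = value

    def get(self, timeout=None):
        return self._value

# Inside the decorator's send() wrapper, a debug switch could then bypass the pool:
#
#     def send(*args, **opts):
#         if getattr(async, 'synchronous', False):
#             return SynchronousResult(decorated(*args, **opts))
#         return async.pool.apply_async(decorated, args, opts)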
Just
import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
You can use eventlet. It lets you write what appears to be synchronous code, but have it operate asynchronously over the network.
Here's an example of a super minimal crawler:
urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
        "https://wiki.secondlife.com/w/images/secondlife.jpg",
        "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print "got body", len(body)
My solution is:
import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)
And it works as requested:
@Async
def fnc():
    pass
Something like this works for me; you can then call the function, and it will dispatch itself onto a new thread.
import time
from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False,)                   # must be a tuple
        start_new_thread(dowork, args)    # Call itself on a new thread.
    else:
        while True:
            # do something...
            time.sleep(60)                # sleep for a minute
    return
Is there any reason not to use threads? You can use the threading class. Instead of the finished() function, use isAlive(). The result() function could join() the thread and retrieve the result. And, if you can, override the run() and __init__ functions to call the function specified in the constructor and save the value somewhere in the instance of the class.
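A minimal sketch of that idea, assuming the computed value is simply stored on the instance; LongComputationThread and get_result are illustrative names, not part of the threading API:

import threading

class LongComputationThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        super(LongComputationThread, self).__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.result = None

    def run(self):
        # Runs in the background thread; stash the value on the instance.
        self.result = self.func(*self.args, **self.kwargs)

    def get_result(self):
        # The equivalent of the question's token.result(): join, then return.
        self.join()
        return self.result

thr = LongComputationThread(sum, range(1000000))
thr.start()
# ... do something else while it runs ...
print(thr.is_alive())       # polling, like the question's token.finished()
print(thr.get_result())     # blocks until the thread is done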
You can use concurrent.futures (added in Python 3.2).
import time
from concurrent.futures import ThreadPoolExecutor

def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2

print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')
executor.shutdown(False)  # non-blocking
print('shutdown invoked')