在Python中发送100,000个HTTP请求的最快方式是什么?

我正在打开一个具有100,000个url的文件。 我需要发送一个http请求到每个url并打印状态码。 我正在使用Python 2.6,到目前为止看着Python实现线程/并发的许多令人困惑的方式。 我甚至看了python 并发库,但不知道如何正确编写这个程序。 有没有人遇到类似的问题? 我想通常我需要知道如何在Python中尽可能快地执行数千个任务 – 我想这意味着“同时”。

谢谢,伊戈尔

无扭线解决scheme:

 from urlparse import urlparse from threading import Thread import httplib, sys from Queue import Queue concurrent = 200 def doWork(): while True: url = q.get() status, url = getStatus(url) doSomethingWithResult(status, url) q.task_done() def getStatus(ourl): try: url = urlparse(ourl) conn = httplib.HTTPConnection(url.netloc) conn.request("HEAD", url.path) res = conn.getresponse() return res.status, ourl except: return "error", ourl def doSomethingWithResult(status, url): print status, url q = Queue(concurrent * 2) for i in range(concurrent): t = Thread(target=doWork) t.daemon = True t.start() try: for url in open('urllist.txt'): q.put(url.strip()) q.join() except KeyboardInterrupt: sys.exit(1) 

这个比扭曲的解决scheme略微快一点,并且使用较less的CPU。

一个使用龙卷风asynchronousnetworking库的解决scheme

 from tornado import ioloop, httpclient i = 0 def handle_request(response): print(response.code) global i i -= 1 if i == 0: ioloop.IOLoop.instance().stop() http_client = httpclient.AsyncHTTPClient() for url in open('urls.txt'): i += 1 http_client.fetch(url.strip(), handle_request, method='HEAD') ioloop.IOLoop.instance().start() 

线程绝对不是这里的答案。 他们将提供进程和内核瓶颈,以及吞吐量限制,如果总体目标是“最快的方式”,这是不可接受的。

有点twisted ,它的asynchronousHTTP客户端会给你更好的结果。

使用grequests ,这是请求+ Gevent模块的组合。

GRequests允许您使用Gevent的请求轻松地创buildasynchronous的HTTP请求。

用法很简单:

 import grequests urls = [ 'http://www.heroku.com', 'http://tablib.org', 'http://httpbin.org', 'http://python-requests.org', 'http://kennethreitz.com' ] 

创build一组未发送的请求:

 >>> rs = (grequests.get(u) for u in urls) 

同时发送他们:

 >>> grequests.map(rs) [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>] 

解决这个问题的一个好方法是首先编写获得一个结果所需的代码,然后合并线程代码来并行化应用程序。

在一个完美的世界中,这只是意味着同时启动100,000个线程,将其结果输出到字典或列表中供以后处理,但在实践中限制了以这种方式发出多less个并行HTTP请求。 在本地,您可以同时打开多less个套接字,Python解释器允许执行多less个线程。 从远程来看,如果所有请求都是针对一台服务器,或者多台服务器,则可能会限制同时连接的数量。 这些限制可能会使您必须以这种方式编写脚本,以便在任何时候只轮询一小部分的URL(100,因为提到的另一个海报可能是一个体面的线程池大小,虽然你可能会发现你可以成功部署更多)。

你可以按照这个devise模式解决上述问题:

  1. 启动一个线程,启动新的请求线程,直到当前正在运行的线程的数量(可以通过threading.active_count()跟踪或通过将线程对象推入数据结构)> =最大同时请求数量(比如100) ,然后睡一会儿超时。 当没有更多的URL要处理时,该线程应该终止。 因此,线程将不断醒来,启动新线程,并hibernate,直到完成。
  2. 让请求线程将其结果存储在某个数据结构中,供以后检索和输出。 如果您正在存储结果的结构是CPython中的listdict ,则可以安全地在线程中附加或插入唯一的项目,而无需locking ,但是如果写入文件或需要更复杂的跨线程数据交互,则应该使用互斥锁来保护这个状态免受腐败

我build议你使用线程模块。 您可以使用它来启动和跟踪正在运行的线程。 Python的线程支持是裸露的,但对问题的描述表明它完全满足您的需求。

最后,如果你想看看用Python编写的并行networking应用程序的一个非常简单的应用程序,请检查ssh.py。 这是一个使用Python线程来并行处理多个SSH连接的小型库。 devise足够接近你的要求,你可能会发现它是一个很好的资源。

一个办法:

 from twisted.internet import reactor, threads from urlparse import urlparse import httplib import itertools concurrent = 200 finished=itertools.count(1) reactor.suggestThreadPoolSize(concurrent) def getStatus(ourl): url = urlparse(ourl) conn = httplib.HTTPConnection(url.netloc) conn.request("HEAD", url.path) res = conn.getresponse() return res.status def processResponse(response,url): print response, url processedOne() def processError(error,url): print "error", url#, error processedOne() def processedOne(): if finished.next()==added: reactor.stop() def addTask(url): req = threads.deferToThread(getStatus, url) req.addCallback(processResponse, url) req.addErrback(processError, url) added=0 for url in open('urllist.txt'): added+=1 addTask(url.strip()) try: reactor.run() except KeyboardInterrupt: reactor.stop() 

原料与材料:

 [kalmi@ubi1:~] wc -l urllist.txt 10000 urllist.txt [kalmi@ubi1:~] time python f.py > /dev/null real 1m10.682s user 0m16.020s sys 0m10.330s [kalmi@ubi1:~] head -n 6 urllist.txt http://www.google.com http://www.bix.hu http://www.godaddy.com http://www.google.com http://www.bix.hu http://www.godaddy.com [kalmi@ubi1:~] python f.py | head -n 6 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 

Pingtime:

 bix.hu is ~10 ms away from me godaddy.com: ~170 ms google.com: ~30 ms 

如果您希望尽可能获得最佳性能,则可能需要考虑使用asynchronousI / O而不是线程。 与数千个操作系统线程相关的开销并不重要,Python解释器中的上下文切换在其上增加了更多。 线程将肯定会完成工作,但我怀疑asynchronous路由将提供更好的整体性能。

具体来说,我build议twisted库( http://www.twistedmatrix.com )中的asynchronousWeb客户端。 它有一个公认的陡峭的学习曲线,但它很容易使用,一旦你很好地处理了Twisted的asynchronous编程风格。

扭曲的asynchronousWeb客户端API的一个HowTo可在:

http://twistedmatrix.com/documents/current/web/howto/client.html

事情发生了变化,自2010年以来,当这张贴,我还没有尝试所有其他的答案,但我已经尝试了一些,我发现这对我使用python3.6最好。

我能够在AWS上运行每秒约150个独特的域。

 import pandas as pd import concurrent.futures import requests import time out = [] CONNECTIONS = 100 TIMEOUT = 5 time1 = None time2 = None tlds = open('../data/sample_1k.txt').read().splitlines() urls = ['http://{}'.format(x) for x in tlds[1:]] def load_url(url, timeout): ans = requests.head(url, timeout=timeout) return ans.status_code with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor: future_to_url = {executor.submit(load_url, url, TIMEOUT): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): try: data = future.result() except Exception as exc: data = str(type(exc)) finally: out.append(data) print(str(len(out)),end="\r") if time1 == None: time1 = time.time() if len(out)/len(urls)>=1 and time2==None: time2 = time.time() print('Took {:.2f} s'.format((time2-time1))) print(str(pd.Series(out).value_counts())) 

使用线程池是一个不错的select,并且会使这个过程变得相当简单。 不幸的是,python没有一个标准库,使线程池非常容易。 但是,这是一个体面的图书馆,应该让你开始: http : //www.chrisarndt.de/projects/threadpool/

来自其网站的代码示例:

 pool = ThreadPool(poolsize) requests = makeRequests(some_callable, list_of_args, callback) [pool.putRequest(req) for req in requests] pool.wait() 

希望这可以帮助。

对于你的情况,线程可能会做的伎俩,因为你可能会花大部分时间等待回应。 在标准库中有像Queue这样有用的模块可能会有所帮助。

我做了一个类似的事情,并行下载文件之前,这对我来说已经足够了,但是这并不是你正在谈论的规模。

如果您的任务更受CPU限制,则可能需要查看多处理模块,这将允许您使用更多的CPU /内核/线程(更多的进程不会相互阻塞,因为每个进程的locking都是一样的)

考虑使用风车 ,虽然风车可能不能做那么multithreading。

你可以在5台机器上用一个手动的Python脚本来完成,每台机器使用40000-60000端口连接出口,打开100,000个端口连接。

另外,为了了解每个服务器可以处理多less,可以使用OpenSTA等线程良好的QA应用程序进行示例testing。

另外,尝试使用LWP :: ConnCache类简单的Perl。 你可能会获得更多的性能(更多的连接)。

这个扭曲的asynchronousWeb客户端变得非常快。

 #!/usr/bin/python2.7 from twisted.internet import reactor from twisted.internet.defer import Deferred, DeferredList, DeferredLock from twisted.internet.defer import inlineCallbacks from twisted.web.client import Agent, HTTPConnectionPool from twisted.web.http_headers import Headers from pprint import pprint from collections import defaultdict from urlparse import urlparse from random import randrange import fileinput pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 16 agent = Agent(reactor, pool) locks = defaultdict(DeferredLock) codes = {} def getLock(url, simultaneous = 1): return locks[urlparse(url).netloc, randrange(simultaneous)] @inlineCallbacks def getMapping(url): # Limit ourselves to 4 simultaneous connections per host # Tweak this number, but it should be no larger than pool.maxPersistentPerHost lock = getLock(url,4) yield lock.acquire() try: resp = yield agent.request('HEAD', url) codes[url] = resp.code except Exception as e: codes[url] = str(e) finally: lock.release() dl = DeferredList(getMapping(url.strip()) for url in fileinput.input()) dl.addCallback(lambda _: reactor.stop()) reactor.run() pprint(codes) 

最简单的方法是使用Python的内置线程库。 它们不是“真正的”/内核线程,但是足够好。 你会想要一个队列和线程池。 一种select是在这里 ,但是写你自己的是微不足道的。 您不能并行处理所有100,000个电话,但您可以同时触发100个(或多个)电话。