使用POST脚本从Python脚本发送文件
有没有办法从Python脚本使用POST发送文件?
从http://docs.python-requests.org/en/latest/user/quickstart/#post-a-multipart-encoded-file
请求使得上传Multipart编码的文件变得非常简单:
>>> with open('report.xls', 'rb') as f: r = requests.post('http://httpbin.org/post', files={'report.xls': f})
而已。 我不是在开玩笑 – 这是一行代码。 文件已发送。 让我们检查:
>>> r.text { "origin": "179.13.100.4", "files": { "report.xls": "<censored...binary...data>" }, "form": {}, "url": "http://httpbin.org/post", "args": {}, "headers": { "Content-Length": "3196", "Accept-Encoding": "identity, deflate, compress, gzip", "Accept": "*/*", "User-Agent": "python-requests/0.8.0", "Host": "httpbin.org:80", "Content-Type": "multipart/form-data; boundary=127.0.0.1.502.21746.1321131593.786.1" }, "data": "" }
是。 您将使用urllib2
模块,并使用multipart/form-data
内容types进行编码。 下面是一些示例代码,让你开始 – 这不仅仅是file upload,但是你应该能够通读它,看看它是如何工作的:
user_agent = "image uploader" default_message = "Image $current of $total" import logging import os from os.path import abspath, isabs, isdir, isfile, join import random import string import sys import mimetypes import urllib2 import httplib import time import re def random_string (length): return ''.join (random.choice (string.letters) for ii in range (length + 1)) def encode_multipart_data (data, files): boundary = random_string (30) def get_content_type (filename): return mimetypes.guess_type (filename)[0] or 'application/octet-stream' def encode_field (field_name): return ('--' + boundary, 'Content-Disposition: form-data; name="%s"' % field_name, '', str (data [field_name])) def encode_file (field_name): filename = files [field_name] return ('--' + boundary, 'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, filename), 'Content-Type: %s' % get_content_type(filename), '', open (filename, 'rb').read ()) lines = [] for name in data: lines.extend (encode_field (name)) for name in files: lines.extend (encode_file (name)) lines.extend (('--%s--' % boundary, '')) body = '\r\n'.join (lines) headers = {'content-type': 'multipart/form-data; boundary=' + boundary, 'content-length': str (len (body))} return body, headers def send_post (url, data, files): req = urllib2.Request (url) connection = httplib.HTTPConnection (req.get_host ()) connection.request ('POST', req.get_selector (), *encode_multipart_data (data, files)) response = connection.getresponse () logging.debug ('response = %s', response.read ()) logging.debug ('Code: %s %s', response.status, response.reason) def make_upload_file (server, thread, delay = 15, message = None, username = None, email = None, password = None): delay = max (int (delay or '0'), 15) def upload_file (path, current, total): assert isabs (path) assert isfile (path) logging.debug ('Uploading %r to %r', path, server) message_template = string.Template (message or default_message) data = {'MAX_FILE_SIZE': '3145728', 'sub': '', 'mode': 'regist', 'com': message_template.safe_substitute (current = current, total = total), 'resto': thread, 'name': username or '', 'email': email or '', 'pwd': password or random_string (20),} files = {'upfile': path} send_post (server, data, files) logging.info ('Uploaded %r', path) rand_delay = random.randint (delay, delay + 5) logging.debug ('Sleeping for %.2f seconds------------------------------\n\n', rand_delay) time.sleep (rand_delay) return upload_file def upload_directory (path, upload_file): assert isabs (path) assert isdir (path) matching_filenames = [] file_matcher = re.compile (r'\.(?:jpe?g|gif|png)$', re.IGNORECASE) for dirpath, dirnames, filenames in os.walk (path): for name in filenames: file_path = join (dirpath, name) logging.debug ('Testing file_path %r', file_path) if file_matcher.search (file_path): matching_filenames.append (file_path) else: logging.info ('Ignoring non-image file %r', path) total_count = len (matching_filenames) for index, file_path in enumerate (matching_filenames): upload_file (file_path, index + 1, total_count) def run_upload (options, paths): upload_file = make_upload_file (**options) for arg in paths: path = abspath (arg) if isdir (path): upload_directory (path, upload_file) elif isfile (path): upload_file (path) else: logging.error ('No such path: %r' % path) logging.info ('Done!')
唯一阻止你直接在文件对象上使用urlopen的事实是内build文件对象缺lesslen定义。 一个简单的方法是创build一个子类,它提供了正确的文件urlopen。 我也修改了下面的文件中的Content-Type标题。
import os import urllib2 class EnhancedFile(file): def __init__(self, *args, **keyws): file.__init__(self, *args, **keyws) def __len__(self): return int(os.fstat(self.fileno())[6]) theFile = EnhancedFile('a.xml', 'r') theUrl = "http://example.com/abcde" theHeaders= {'Content-Type': 'text/xml'} theRequest = urllib2.Request(theUrl, theFile, theHeaders) response = urllib2.urlopen(theRequest) theFile.close() for line in response: print line
看起来像python请求不处理非常大的多部分文件。
文档build议您查看requests-toolbelt
。
以下是他们文档中的相关页面 。
Chris Atlee的海报库非常适合这个(特别是便利functionposter.encode.multipart_encode()
)。 作为奖励,它支持大文件的stream式传输,而不需要将整个文件加载到内存中。 另请参阅Python问题3244 。
我正在尝试django rest api和它为我工作:
def test_upload_file(self): filename = "/Users/Ranvijay/tests/test_price_matrix.csv" data = {'file': open(filename, 'rb')} client = APIClient() # client.credentials(HTTP_AUTHORIZATION='Token ' + token.key) response = client.post(reverse('price-matrix-csv'), data, format='multipart') print response self.assertEqual(response.status_code, status.HTTP_200_OK)
你可能也想看一下httplib2和示例 。 我发现使用httplib2比使用内置的HTTP模块更简洁。
def visit_v2(device_code, camera_code): image1 = MultipartParam.from_file("files", "/home/yuzx/1.txt") image2 = MultipartParam.from_file("files", "/home/yuzx/2.txt") datagen, headers = multipart_encode([('device_code', device_code), ('position', 3), ('person_data', person_data), image1, image2]) print "".join(datagen) if server_port == 80: port_str = "" else: port_str = ":%s" % (server_port,) url_str = "http://" + server_ip + port_str + "/adopen/device/visit_v2" headers['nothing'] = 'nothing' request = urllib2.Request(url_str, datagen, headers) try: response = urllib2.urlopen(request) resp = response.read() print "http_status =", response.code result = json.loads(resp) print resp return result except urllib2.HTTPError, e: print "http_status =", e.code print e.read()