Python如何一次读取N行
我正在写一个代码,一次获取一个巨大的文本文件(几GB)N行,处理该批处理,并移动到下一个N行,直到完成整个文件。 (我不在乎最后一批是不是完美的大小)。
我一直在阅读关于使用itertools islice这个操作。 我想我在这里一半:
from itertools import islice N = 16 infile = open("my_very_large_text_file", "r") lines_gen = islice(infile, N) for lines in lines_gen: ...process my lines...
麻烦的是,我想处理下一批16行,但我错过了一些东西
islice()
可用于获取迭代器的下n
项目。 因此, list(islice(f, n))
将返回文件f
的下n
行列表。 在一个循环中使用它会给你n
行文件。 在文件末尾,列表可能会更短,最后这个调用将返回一个空的列表。
with open(...) as f: while True: next_n_lines = list(islice(f, n)) if not next_n_lines: break # process next_n_lines
另一种方法是使用石斑鱼模式 :
with open(...) as f: for next_n_lines in izip_longest(*[f] * n): # process next_n_lines
这个问题似乎假定,一次读取N行数据块中的“巨大文本文件”是有效的。 这为已经高度优化的stdio
库增加了缓冲的应用层,增加了复杂性,并且可能完全没有购买任何东西。
从而:
with open('my_very_large_text_file') as f: for line in f: process(line)
在时间,空间,复杂性和可读性方面可能要优于其他方法。
另请参阅罗伯·派克的前两条规则 , jackson的两条规则和PEP-20“python之禅” 。 如果你真的只是想玩islice
你应该把大文件islice
。
由于要求增加了从文件中select的行的统计分布,我提供了这个简单的方法。
"""randsamp - extract a random subset of n lines from a large file""" import random def scan_linepos(path): """return a list of seek offsets of the beginning of each line""" linepos = [] offset = 0 with open(path) as inf: # WARNING: CPython 2.7 file.tell() is not accurate on file.next() for line in inf: linepos.append(offset) offset += len(line) return linepos def sample_lines(path, linepos, nsamp): """return nsamp lines from path where line offsets are in linepos""" offsets = random.sample(linepos, nsamp) offsets.sort() # this may make file reads more efficient lines = [] with open(path) as inf: for offset in offsets: inf.seek(offset) lines.append(inf.readline()) return lines dataset = 'big_data.txt' nsamp = 5 linepos = scan_linepos(dataset) # the scan only need be done once lines = sample_lines(dataset, linepos, nsamp) print 'selecting %d lines from a file of %d' % (nsamp, len(linepos)) print ''.join(lines)
我testing了一个300万行的模拟数据文件,其中包含磁盘上的1.7GB。 在scan_linepos
热的桌面上, scan_linepos
占据了运行时间大约20秒。
为了检查sample_lines
的性能,我使用了timeit
模块
import timeit t = timeit.Timer('sample_lines(dataset, linepos, nsamp)', 'from __main__ import sample_lines, dataset, linepos, nsamp') trials = 10 ** 4 elapsed = t.timeit(number=trials) print u'%dk trials in %.2f seconds, %.2fµs per trial' % (trials/1000, elapsed, (elapsed/trials) * (10 ** 6))
对于nsamp
各种值, 当nsamp
为100时,单个sample_lines
在460μs sample_lines
完成,并以每次47ms线性缩放至10k个样本。
自然的下一个问题是随机的几乎是随机的? ,答案是“子密码学,但生物信息学肯定是好的”。
这是另一种使用groupby的方法 :
from itertools import count, groupby N = 16 with open('test') as f: for g, group in groupby(f, key=lambda _, c=count(): c.next()/N): print list(group)
怎么运行的:
基本上,groupby()将按照key参数的返回值分组行,key参数是lambda函数lambda _, c=count(): c.next()/N
并使用c参数当函数被定义时绑定到count() ,所以每次groupby()
都会调用lambda函数并计算返回值来确定分组的行数,如下所示:
# 1 iteration. c.next() => 0 0 / 16 => 0 # 2 iteration. c.next() => 1 1 / 16 => 0 ... # Start of the second grouper. c.next() => 16 16/16 => 1 ...
使用chunker函数什么是最“pythonic”的方式来遍历一个大块的列表? :
from itertools import izip_longest def grouper(iterable, n, fillvalue=None): "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return izip_longest(*args, fillvalue=fillvalue) with open(filename) as f: for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk """process lines like lines[0], lines[1] , ... , lines[chunk_size-1]"""
假设“批处理”是指想一次处理所有16个logging,而不是单独处理,则一次读取一个logging并更新一个计数器; 当计数器达到16时,处理该组。
interim_list = [] infile = open("my_very_large_text_file", "r") ctr = 0 for rec in infile: interim_list.append(rec) ctr += 1 if ctr > 15: process_list(interim_list) interim_list = [] ctr = 0
interim_list = [] infile = open("my_very_large_text_file", "r") ctr = 0 for rec in infile: interim_list.append(rec) ctr += 1 if ctr > 15: process_list(interim_list) interim_list = [] ctr = 0
最后一组
process_list(interim_list)