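How to efficiently parse fixed-width files?
I am trying to find an efficient way to parse files that hold fixed-width lines. For example, the first 20 characters represent one column, characters 21:30 another, and so on.
Assuming a line holds 100 characters, what would be an efficient way to parse a line into several components?
I could use string slicing on each line, but that gets a bit ugly if the line is long. Are there any other fast methods?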
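Using the Python standard library's struct module would be fairly easy as well as quite fast, since it's written in C.
Here's how it could be used to do what you want. It also allows columns of characters to be skipped by specifying a negative value for the number of characters in the field.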
import struct

fieldwidths = (2, -10, 24)  # negative widths represent ignored padding fields
fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's')
                     for fw in fieldwidths)
fieldstruct = struct.Struct(fmtstring)
parse = fieldstruct.unpack_from
print('fmtstring: {!r}, recsize: {} chars'.format(fmtstring, fieldstruct.size))

# Works as-is on Python 2, where str is a byte string;
# on Python 3, struct needs bytes rather than str.
line = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\n'
fields = parse(line)
print('fields: {}'.format(fields))
Output:
fmtstring: '2s 10x 24s', recsize: 36 chars
fields: ('AB', 'MNOPQRSTUVWXYZ0123456789')
The following modifications adapt it to work in Python 2 or 3 (and handle Unicode input):
import sys

fieldstruct = struct.Struct(fmtstring)
if sys.version_info[0] < 3:
    parse = fieldstruct.unpack_from
else:
    # converts unicode input to byte string and results back to unicode string
    # (field widths count encoded bytes, so this assumes single-byte characters)
    unpack = fieldstruct.unpack_from
    parse = lambda line: tuple(s.decode() for s in unpack(line.encode()))
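To run the struct approach over a whole file in Python 3, one option is to open the file in binary mode so that each line is already bytes. The following is a minimal sketch under that assumption; `make_struct`, `parse_lines`, and the in-memory sample data are made up for illustration, and skipping short lines is just one possible policy.

```python
import io
import struct

FIELDWIDTHS = (2, -10, 24)  # negative widths are padding to skip

def make_struct(fieldwidths):
    """Build a precompiled struct from a tuple of signed field widths."""
    fmt = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's')
                   for fw in fieldwidths)
    return struct.Struct(fmt)

def parse_lines(binary_file, fieldwidths=FIELDWIDTHS, encoding='ascii'):
    """Yield a tuple of decoded fields for each sufficiently long line."""
    fieldstruct = make_struct(fieldwidths)
    for raw in binary_file:
        if len(raw) < fieldstruct.size:
            continue  # hypothetical policy: silently skip short lines
        yield tuple(f.decode(encoding) for f in fieldstruct.unpack_from(raw))

# Stand-in for open('filename.txt', 'rb'); the data is invented.
sample = io.BytesIO(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\n'
                    b'too short\n'
                    b'abcdefghijklmnopqrstuvwxyz9876543210\n')
records = list(parse_lines(sample))
for rec in records:
    print(rec)
```

Reading bytes avoids the encode step per line; only the fields you keep are decoded.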
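Here's a way to do it with string slices, which you were considering but worried would get too ugly. The nice thing about it, besides not being all that ugly, is that it works unchanged in both Python 2 and Python 3 and can handle Unicode strings. I haven't benchmarked it, but I suspect it might be competitive with the struct module version speed-wise. It could be sped up slightly by removing the ability to have padding fields.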
try:
    from itertools import izip_longest  # added in Py 2.6
except ImportError:
    from itertools import zip_longest as izip_longest  # name change in Py 3.x

try:
    from itertools import accumulate  # added in Py 3.2
except ImportError:
    def accumulate(iterable):
        'Return running totals (simplified version).'
        iterator = iter(iterable)  # accept any iterable, not only iterators
        total = next(iterator)
        yield total
        for value in iterator:
            total += value
            yield total

def make_parser(fieldwidths):
    cuts = tuple(cut for cut in accumulate(abs(fw) for fw in fieldwidths))
    pads = tuple(fw < 0 for fw in fieldwidths)  # bool values for padding fields
    flds = tuple(izip_longest(pads, (0,)+cuts, cuts))[:-1]  # ignore final one
    parse = lambda line: tuple(line[i:j] for pad, i, j in flds if not pad)
    # optional informational function attributes
    parse.size = sum(abs(fw) for fw in fieldwidths)
    parse.fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's')
                               for fw in fieldwidths)
    return parse

line = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789\n'
fieldwidths = (2, -10, 24)  # negative widths represent ignored padding fields
parse = make_parser(fieldwidths)
fields = parse(line)
print('format: {!r}, rec size: {} chars'.format(parse.fmtstring, parse.size))
print('fields: {}'.format(fields))
Output:
format: '2s 10x 24s', rec size: 36 chars
fields: ('AB', 'MNOPQRSTUVWXYZ0123456789')
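I'm not sure whether this is efficient, but it should be readable (as opposed to doing the slicing by hand). I defined a function slices that takes a string and the column lengths and returns the substrings. I made it a generator, so for very long lines it doesn't build a temporary list of substrings.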
def slices(s, *args):
    position = 0
    for length in args:
        yield s[position:position + length]
        position += length
Example:
In [32]: list(slices('abcdefghijklmnopqrstuvwxyz0123456789', 2))
Out[32]: ['ab']

In [33]: list(slices('abcdefghijklmnopqrstuvwxyz0123456789', 2, 10, 50))
Out[33]: ['ab', 'cdefghijkl', 'mnopqrstuvwxyz0123456789']

In [51]: d,c,h = slices('dogcathouse', 3, 3, 5)

In [52]: d,c,h
Out[52]: ('dog', 'cat', 'house')
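But I think the advantage of the generator is lost if you need all the columns at once. Where you could benefit from it is when you want to process the columns one by one, say in a loop.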
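As a small illustration of that last point, the loop below consumes the generator one column at a time and stops at the first blank field, so the later slices are never created. `slices` is repeated here to keep the example self-contained; the sample line and the stop-on-blank rule are invented for illustration.

```python
def slices(s, *args):
    position = 0
    for length in args:
        yield s[position:position + length]
        position += length

line = 'dogcat     house'
widths = (3, 3, 5, 5)

seen = []
for column in slices(line, *widths):
    if not column.strip():
        break  # blank field: stop here, the remaining slices are never built
    seen.append(column)

print(seen)
```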
Two more options that are easier and prettier than the solutions already mentioned.
The first is using pandas:
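import pandas as pd

path = 'filename.txt'

# Using pandas with a column specification.
# colspecs are half-open [start, end) intervals, so these specs skip the
# characters at offsets 20, 30 and 50.
col_specification = [(0, 20), (21, 30), (31, 50), (51, 100)]
data = pd.read_fwf(path, colspecs=col_specification)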
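If the columns are contiguous, `read_fwf` also accepts a `widths` list instead of explicit `colspecs`. The sketch below reads from an in-memory buffer so it can be run without a file. The sample rows and column names are invented, and `dtype=str` is used on the assumption that you want to keep leading zeros.

```python
import io
import pandas as pd

# Two invented records: 2-char code, 10-char digit string, 5-char label.
text = (
    "AB0123456789alpha\n"
    "CD0000000042beta \n"
)

df = pd.read_fwf(
    io.StringIO(text),
    widths=[2, 10, 5],        # contiguous field widths
    header=None,              # the sample has no header row
    names=["code", "digits", "label"],
    dtype=str,                # keep everything as text
)
print(df)
```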
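The second option is using numpy.loadtxt. Note that loadtxt splits on whitespace by default and parses values as floats, so this only works when the columns are separated by whitespace and hold numeric data:
import numpy as np

# using numpy and letting it figure it out automagically
data_also = np.loadtxt(path)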
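It really depends on how you want to use your data.
The code below gives a sketch of what you might want to do if you have some serious fixed-column-width file handling to do.
"Serious" = multiple record types in each of multiple file types, records up to 1000 bytes, the layout definer and "opposing" producer/consumer is a government department with attitude, layout changes result in unused columns, up to a million records in a file, ...
Features: Precompiles the struct formats. Ignores unwanted columns. Converts input strings to the required data types (the sketch omits error handling). Converts records to object instances (or dicts, or named tuples if you prefer).
Code: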
import struct, datetime, io, pprint

# functions for converting input fields to usable data
cnv_text = str.rstrip
cnv_int = int
cnv_date_dmy = lambda s: datetime.datetime.strptime(s, "%d%m%Y")  # ddmmyyyy
# etc

# field specs (field name, start pos (1-relative), len, converter func)
fieldspecs = [
    ('surname', 11, 20, cnv_text),
    ('given_names', 31, 20, cnv_text),
    ('birth_date', 51, 8, cnv_date_dmy),
    ('start_date', 71, 8, cnv_date_dmy),
]

fieldspecs.sort(key=lambda x: x[1])  # just in case

# build the format for struct.unpack
unpack_len = 0
unpack_fmt = ""
for fieldspec in fieldspecs:
    start = fieldspec[1] - 1
    end = start + fieldspec[2]
    if start > unpack_len:
        unpack_fmt += str(start - unpack_len) + "x"  # skip unused columns
    unpack_fmt += str(end - start) + "s"
    unpack_len = end
field_indices = range(len(fieldspecs))
print(unpack_len, unpack_fmt)
unpacker = struct.Struct(unpack_fmt).unpack_from

class Record(object):
    pass
    # or use named tuples

# struct works on bytes, so the sample data is a bytes literal
raw_data = b"""\
....v....1....v....2....v....3....v....4....v....5....v....6....v....7....v....8
          Featherstonehaugh   Algernon Marmaduke  31121969            01012005XX
"""

f = io.BytesIO(raw_data)
headings = next(f)
for line in f:
    # The guts of this loop would of course be hidden away in a function/method
    # and could be made less ugly
    raw_fields = unpacker(line)
    r = Record()
    for x in field_indices:
        # decode each raw bytes field before converting it
        setattr(r, fieldspecs[x][0], fieldspecs[x][3](raw_fields[x].decode()))
    pprint.pprint(r.__dict__)
    print("Customer name:", r.given_names, r.surname)
Output:
78 10x20s20s8s12x8s
{'birth_date': datetime.datetime(1969, 12, 31, 0, 0),
 'given_names': 'Algernon Marmaduke',
 'start_date': datetime.datetime(2005, 1, 1, 0, 0),
 'surname': 'Featherstonehaugh'}
Customer name: Algernon Marmaduke Featherstonehaugh
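The Record class could be swapped for a named tuple, as the feature list suggests. This is one possible sketch for Python 3, not the author's code: `Person`, `parse_record`, and the sample line are made up for illustration, and the layout is deliberately contiguous, with no skipped columns, to keep it short.

```python
import datetime
import struct
from collections import namedtuple

# (field name, width, converter); fields are contiguous in this sketch
FIELDSPECS = [
    ('surname', 20, str.rstrip),
    ('given_names', 20, str.rstrip),
    ('birth_date', 8,
     lambda s: datetime.datetime.strptime(s, '%d%m%Y').date()),  # ddmmyyyy
]

Person = namedtuple('Person', [name for name, _, _ in FIELDSPECS])
_unpack = struct.Struct(
    ''.join('{}s'.format(width) for _, width, _ in FIELDSPECS)
).unpack_from

def parse_record(line, encoding='ascii'):
    """Convert one fixed-width line (bytes) into a Person named tuple."""
    raw_fields = _unpack(line)
    return Person(*(convert(raw.decode(encoding))
                    for (_, _, convert), raw in zip(FIELDSPECS, raw_fields)))

# Invented sample: 20-char surname, 20-char given names, 8-char date.
line = b'Featherstonehaugh   Algernon Marmaduke  31121969\n'
person = parse_record(line)
print(person.given_names, person.surname, person.birth_date)
```

Named tuples are immutable and lighter than instances with a `__dict__`, which may matter when a file holds on the order of a million records.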
>>> s = '1234567890'
>>> w = [0, 2, 5, 7, 10]
>>> [s[w[i-1]:w[i]] for i in range(1, len(w))]
['12', '345', '67', '890']
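The same idea can be written by pairing each boundary with the next one, which avoids the index arithmetic. A minimal sketch; the name `split_at` is invented here.

```python
def split_at(text, bounds):
    """Slice text between each consecutive pair of boundary offsets."""
    return [text[start:end] for start, end in zip(bounds, bounds[1:])]

parts = split_at('1234567890', [0, 2, 5, 7, 10])
print(parts)  # ['12', '345', '67', '890']
```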
Here's a simple module for Python 3, based on John Machin's answer; adapt it as needed :)
"""
fixedwidth

Parse and iterate through a fixedwidth text file, returning record objects.

Adapted from https://stackoverflow.com/a/4916375/243392


USAGE

    import fixedwidth, pprint

    # define the fixed width fields we want
    # fieldspecs is a list of [name, description, start, width, type] arrays.
    fieldspecs = [
        ["FILEID", "File Identification", 1, 6, "A/N"],
        ["STUSAB", "State/US Abbreviation (USPS)", 7, 2, "A"],
        ["SUMLEV", "Summary Level", 9, 3, "A/N"],
        ["LOGRECNO", "Logical Record Number", 19, 7, "N"],
        ["POP100", "Population Count (100%)", 30, 9, "N"],
    ]

    # define the fieldtype conversion functions
    fieldtype_fns = {
        'A': str.rstrip,
        'A/N': str.rstrip,
        'N': int,
    }

    # iterate over record objects in the file (path is your input file)
    with open(path, 'rb') as f:
        for record in fixedwidth.reader(f, fieldspecs, fieldtype_fns):
            pprint.pprint(record.__dict__)

    # output:
    {'FILEID': 'SF1ST', 'LOGRECNO': 2, 'POP100': 1, 'STUSAB': 'TX', 'SUMLEV': '040'}
    {'FILEID': 'SF1ST', 'LOGRECNO': 3, 'POP100': 2, 'STUSAB': 'TX', 'SUMLEV': '040'}
    ...

"""

import struct, io


# fieldspec columns
iName, iDescription, iStart, iWidth, iType = range(5)


def get_struct_unpacker(fieldspecs):
    """
    Build the format string for struct.unpack to use, based on the fieldspecs.
    fieldspecs is a list of [name, description, start, width, type] arrays.
    The format string looks like "6s2s3s7x7s4x9s"; the function returns the
    unpack_from method of the struct compiled from it.
    """
    unpack_len = 0
    unpack_fmt = ""
    for fieldspec in fieldspecs:
        start = fieldspec[iStart] - 1
        end = start + fieldspec[iWidth]
        if start > unpack_len:
            unpack_fmt += str(start - unpack_len) + "x"
        unpack_fmt += str(end - start) + "s"
        unpack_len = end
    struct_unpacker = struct.Struct(unpack_fmt).unpack_from
    return struct_unpacker


class Record(object):
    pass
    # or use named tuples


def reader(f, fieldspecs, fieldtype_fns):
    """
    Wrap a fixedwidth file and return records according to the given fieldspecs.
    fieldspecs is a list of [name, description, start, width, type] arrays.
    fieldtype_fns is a dictionary of functions used to transform the raw string
    values, one for each type.
    """

    # make sure fieldspecs are sorted properly
    fieldspecs.sort(key=lambda fieldspec: fieldspec[iStart])

    struct_unpacker = get_struct_unpacker(fieldspecs)

    field_indices = range(len(fieldspecs))

    for line in f:
        raw_fields = struct_unpacker(line)  # split line into field values
        record = Record()
        for i in field_indices:
            fieldspec = fieldspecs[i]
            fieldname = fieldspec[iName]
            s = raw_fields[i].decode()  # convert raw bytes to a string
            fn = fieldtype_fns[fieldspec[iType]]  # get conversion function
            value = fn(s)  # convert string to value (eg to an int)
            setattr(record, fieldname, value)
        yield record


if __name__ == '__main__':

    # test module

    import pprint, io

    # define the fields we want
    # fieldspecs are [name, description, start, width, type]
    fieldspecs = [
        ["FILEID", "File Identification", 1, 6, "A/N"],
        ["STUSAB", "State/US Abbreviation (USPS)", 7, 2, "A"],
        ["SUMLEV", "Summary Level", 9, 3, "A/N"],
        ["LOGRECNO", "Logical Record Number", 19, 7, "N"],
        ["POP100", "Population Count (100%)", 30, 9, "N"],
    ]

    # define a conversion function for integers
    def to_int(s):
        """
        Convert a numeric string to an integer.
        Allows a leading ! as an indicator of missing or uncertain data.
        Returns None if no data.
        """
        try:
            return int(s)
        except ValueError:
            try:
                return int(s[1:])  # ignore a leading !
            except ValueError:
                return None  # assume has a leading ! and no value

    # define the conversion fns
    fieldtype_fns = {
        'A': str.rstrip,
        'A/N': str.rstrip,
        'N': to_int,
        # 'N': int,
        # 'D': lambda s: datetime.datetime.strptime(s, "%d%m%Y"),  # ddmmyyyy
        # etc
    }

    # define a fixedwidth sample (each record is 38 characters wide;
    # written as quoted literals so the padding spaces stay visible)
    sample = (
        "SF1ST TX04089000  00000023748        1\n"
        "SF1ST TX04090000  00000033748!       2\n"
        "SF1ST TX04091000  00000043748!        \n"
    )
    sample_data = sample.encode()  # convert string to bytes
    file_like = io.BytesIO(sample_data)  # create a file-like wrapper around bytes

    # iterate over record objects in the file
    for record in reader(file_like, fieldspecs, fieldtype_fns):
        # print(record)
        pprint.pprint(record.__dict__)