“大数据”使用大pandas的工作stream程
在学习pandas的同时,我试图在这个问题上解答好几个月。 我使用SAS进行日常工作,这对我们来说是非常棒的支持。 然而,由于其他原因,SAS作为一款软件是非常糟糕的。
有一天,我希望用python和pandas来替代我的SAS,但是我目前缺乏大型数据集的核心外工作stream程。 我不是在谈论需要分布式networking的“大数据”,而是文件太大而不适合内存,但又足够小以适应硬盘驱动器。
我的第一个想法是使用HDFStore
在磁盘上保存大型数据集,并只将需要的部分HDFStore
到数据HDFStore
进行分析。 其他人则提到MongoDB是一个更易于使用的替代scheme。 我的问题是这样的:
什么是一些最佳实践工作stream程来完成以下工作:
- 将平面文件加载到永久性的磁盘数据库结构中
- 查询该数据库以检索要送入pandas数据结构的数据
- 在pandas中操作片断之后更新数据库
真实世界的例子会受到大家的赞赏,尤其是那些使用“大数据”的pandas的人。
编辑 – 我如何工作的例子:
- 迭代地导入一个大的平面文件并将其存储在一个永久性的磁盘数据库结构中。 这些文件通常太大而不适合内存。
- 为了使用pandas,我想读取这些数据的子集(通常只有几列),可以适应内存。
- 我会通过对所选列进行各种操作来创build新列。
- 然后我必须将这些新列添加到数据库结构中。
我正在试图find执行这些步骤的最佳实践方式。 阅读有关pandas和pytables的链接似乎是追加一个新的列可能是一个问题。
编辑 – 特别回应杰夫的问题:
- 我正在build立消费者信用风险模型。 数据种类包括电话,SSN和地址特征; 财产价值; 像犯罪logging,破产等贬义的信息…我每天使用的数据集平均有近1000到2000个混合数据types的字段:数字和字符数据的连续variables,名义variables和有序variables。 我很less追加行,但是我执行许多操作来创build新的列。
- 典型的操作涉及将使用条件逻辑的多个列组合成新的复合列。 例如,
if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'
。 这些操作的结果是我的数据集中每个logging的新列。 - 最后,我想将这些新列添加到磁盘数据结构中。 我会重复第2步,用交叉表和描述性统计数据来探索数据,试图find有趣的,直观的关系模型。
- 一个典型的项目文件通常是大约1GB。 文件被组织成一行,其中包含消费者数据的logging。 每行logging的每一行都有相同的列数。 这将永远是这样的。
- 创build新列时,按行sorting是非常罕见的。 但是,在创build报告或生成描述性统计信息时,对行进行子集的处理是非常常见的。 例如,我可能想为特定的业务线创build一个简单的频率,比如零售信用卡。 要做到这一点,我只会select那些业务线=零售的logging,除了我想报告的任何一列。 然而,在创build新的列时,我会提取所有的数据行,并只提取操作所需的列。
- build模过程要求我分析每一列,寻找与某个结果variables有趣的关系,并创build描述这些关系的新复合列。 我探索的专栏通常以小集合完成。 例如,我将重点讨论一组刚刚处理财产价值的20列,并观察它们与贷款违约的关系。 一旦探索了这些内容,并创build了新的专栏,那么我就转到另一组专栏,说大学教育,重复这个过程。 我正在做的是创build候选variables来解释我的数据和一些结果之间的关系。 在这个过程的最后,我运用了一些学习技巧,从这些复合列中创build一个等式。
我很less会将行添加到数据集中。 我将几乎总是创build新的列(统计/机器学习说法中的variables或function)。
我经常以这种方式使用数十GB的数据,例如,我通过查询读取磁盘上的表,创build数据并追加回来。
这是值得阅读的文档,并在这个线程晚了几个build议如何存储您的数据。
这将影响你如何存储你的数据的细节,如:
尽可能多地提供细节; 我可以帮助你发展一个结构。
- 数据大小,行数,列数,列types; 你是追加行,还是只是列?
- 典型的操作是什么样的。 例如,对列进行查询以select一堆行和特定列,然后执行操作(内存中),创build新列,保存这些列。
(给出一个玩具的例子可以使我们提供更具体的build议。) - 处理完之后,你会怎么做? 步骤2是特设的还是可重复的?
- input平面文件:在Gb中有多less,粗略的总大小。 这些如何组织如logging? 每个文件是否包含不同的字段,还是每个文件都有一些logging,包括每个文件中的所有字段?
- 你是否曾经根据标准select行(logging)的子集(例如,select字段A> 5的行)? 然后做一些事情,或者你只是select字段A,B,C与所有的logging(然后做一些事情)?
- 你是否在“所有专栏(分组)”上工作,或者是否只有很好的比例可以用于报告(例如,你想保留数据,但是不需要在列中显示最终结果时间)?
解
确保你的pandas至less安装了0.10.1
。
读块逐块和多表查询 迭代文件 。
由于pytables经过了优化,可以按行进行操作(这是您查询的内容),我们将为每组字段创build一个表。 这样就很容易select一小部分字段(这可以在一个大桌子上工作,但是这样做更有效率……我想我可能可以在将来解决这个限制…这是无论如何更直观):
(以下是伪代码。)
import numpy as np import pandas as pd # create a store store = pd.HDFStore('mystore.h5') # this is the key to your storage: # this maps your fields to a specific group, and defines # what you want to have as data_columns. # you might want to create a nice class wrapping this # (as you will want to have this map and its inversion) group_map = dict( A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']), B = dict(fields = ['field_10',...... ], dc = ['field_10']), ..... REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []), ) group_map_inverted = dict() for g, v in group_map.items(): group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))
读取文件并创build存储(主要是做append_to_multiple
操作):
for f in files: # read in the file, additional options hmay be necessary here # the chunksize is not strictly necessary, you may be able to slurp each # file into memory in which case just eliminate this part of the loop # (you can also change chunksize if necessary) for chunk in pd.read_table(f, chunksize=50000): # we are going to append to each table by group # we are not going to create indexes at this time # but we *ARE* going to create (some) data_columns # figure out the field groupings for g, v in group_map.items(): # create the frame for this group frame = chunk.reindex(columns = v['fields'], copy = False) # append it store.append(g, frame, index=False, data_columns = v['dc'])
现在你已经拥有了文件中的所有表格(实际上,如果你愿意的话,你可以将它们存储在单独的文件中,你可能需要将文件名添加到group_map中,但这可能不是必须的)。
这是你如何获得列和创build新的:
frame = store.select(group_that_I_want) # you can optionally specify: # columns = a list of the columns IN THAT GROUP (if you wanted to # select only say 3 out of the 20 columns in this sub-table) # and a where clause if you want a subset of the rows # do calculations on this frame new_frame = cool_function_on_frame(frame) # to 'add columns', create a new group (you probably want to # limit the columns in this new_group to be only NEW ones # (eg so you don't overlap from the other tables) # add this info to the group_map store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)
当你准备好post_processing时:
# This may be a bit tricky; and depends what you are actually doing. # I may need to modify this function to be a bit more general: report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)
关于data_columns,你实际上并不需要定义任何的 data_columns; 它们允许您基于列来select行。 例如:
store.select(group, where = ['field_1000=foo', 'field_1001>0'])
在最后的报告生成阶段,他们可能会对您最感兴趣(实质上数据列与其他列分隔,如果您定义了很多,这可能会影响效率)。
你也可能想要:
- 创build一个接收字段列表的函数,在groups_map中查找这些组,然后select这些并连接结果,以便得到结果帧(这实际上是select_as_multiple所做的)。 这样的结构将是相当透明的你。
- 某些数据列上的索引(使行子集更快)。
- 启用压缩。
当你有问题时让我知道!
我想上面的答案缺less一个我发现非常有用的简单方法。
当我有一个文件太大,无法加载到内存中时,我将文件拆分成多个较小的文件(按行或列)
例如:30天大小的30天交易数据的情况下,我把它分解成大约1GB大小的文件。 我随后分别处理每个文件并汇总结果
其中一个最大的优点是可以并行处理文件(multithreading或进程)
另一个好处是可以通过常规shell命令来完成文件操作(例如在示例中添加/删除date),这在更高级/复杂的文件格式中是不可能的
这种方法并不能涵盖所有情况,但是在很多方面都非常有用
如果你的数据集在1到20GB之间,你应该得到一个48GB RAM的工作站。 然后Pandas可以将整个数据集保存在RAM中。 我知道这不是你在这里寻找的答案,但是在有4GB内存的笔记本上做科学计算是不合理的。
我知道这是一个古老的线程,但我认为Blaze库值得一试。 它是为这些types的情况而build立的。
从文档:
Blaze将NumPy和Pandas的可用性扩展到分布式和非核心计算。 Blaze提供了一个类似于NumPy ND-Array或Pandas DataFrame的界面,但将这些熟悉的界面映射到各种其他计算引擎,如Postgres或Spark。
编辑:顺便说一下,它是由ContinuumIO和NumPy的作者Travis Oliphant支持的。
pymongo就是这种情况。 我也已经在python中使用sql server,sqlite,HDF,ORM(SQLAlchemy)进行了原型开发。 首先,pymongo是一个基于文档的数据库,所以每个人都将是一个文档(属性的dict
)。 许多人形成一个集合,你可以有很多collections(人,股市,收入)。
pd.dateframe – > pymongo注意:我使用read_csv
的chunksize
来保持它5到10Klogging(pymongo如果更大,则删除套接字)
aCollection.insert((a[1].to_dict() for a in df.iterrows()))
查询:GT =大于…
pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))
.find()
返回一个迭代器,所以我通常使用ichunked
来切成更小的迭代器。
如何join,因为我通常得到10个数据源粘贴在一起:
aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))
那么(在我的情况下,有时候我必须在它的“可合并”之前先aJoinDF
一个aJoinDF
)。
df = pandas.merge(df, aJoinDF, on=aKey, how='left')
然后,您可以通过下面的更新方法将新的信息写入您的主要collections。 (逻辑收集与物理数据源)。
collection.update({primarykey:foo},{key:change})
在较小的查找,只是denormalize。 例如,您在文档中有代码,只需添加字段代码文本,并在创build文档时进行dict
查找。
现在你有一个很好的基于人的数据集,你可以释放你的逻辑在每个案件,并提供更多的属性。 最后,你可以读入pandas你的3记忆最大关键指标,并做pivot / agg /数据探索。 这对我工作的300万logging与数字/大文本/类别/代码/浮动/ …
你也可以使用MongoDB中内置的两个方法(MapReduce和聚合框架)。 在这里可以看到关于聚合框架的更多信息 ,因为它似乎比MapReduce更容易,并且看起来很方便快速的聚合工作。 注意我不需要定义我的字段或关系,我可以将项目添加到文档中。 在当前快速变化的numpy状态下,python工具集,MongoDB帮助我开始工作:)
在这个问题的两年之后,现在已经有了一个“核心外”的大pandas: dask 。 这是优秀的! 虽然它不支持所有的pandasfunction,但是可以使用它。
我发现这有点晚了,但我工作与类似的问题(抵押贷款预付款模式)。 我的解决scheme是跳过pandasHDFStore层,并使用直接的pytables。 我将每个列保存为最终文件中的单个HDF5arrays。
我的基本工作stream程是首先从数据库中获取一个CSV文件。 我gzip它,所以它不是很大。 然后,我将它转换为面向行的HDF5文件,通过在python中迭代它,将每一行转换为真正的数据types,并将其写入HDF5文件。 这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。 然后,我将面向行的HDF5文件“转置”为一个面向列的HDF5文件。
表转置看起来像:
def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"): # Get a reference to the input data. tb = h_in.getNode(table_path) # Create the output group to hold the columns. grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1)) for col_name in tb.colnames: logger.debug("Processing %s", col_name) # Get the data. col_data = tb.col(col_name) # Create the output array. arr = h_out.createCArray(grp, col_name, tables.Atom.from_dtype(col_data.dtype), col_data.shape) # Store the data. arr[:] = col_data h_out.flush()
再读回来看起来像:
def read_hdf5(hdf5_path, group_path="/data", columns=None): """Read a transposed data set from a HDF5 file.""" if isinstance(hdf5_path, tables.file.File): hf = hdf5_path else: hf = tables.openFile(hdf5_path) grp = hf.getNode(group_path) if columns is None: data = [(child.name, child[:]) for child in grp] else: data = [(child.name, child[:]) for child in grp if child.name in columns] # Convert any float32 columns to float64 for processing. for i in range(len(data)): name, vec = data[i] if vec.dtype == np.float32: data[i] = (name, vec.astype(np.float64)) if not isinstance(hdf5_path, tables.file.File): hf.close() return pd.DataFrame.from_items(data)
现在,我通常在一台拥有大量内存的机器上运行这个程序,所以我对内存使用情况可能不够谨慎。 例如,默认情况下,加载操作读取整个数据集。
这通常适用于我,但它有点笨重,我不能使用花式的pytables魔术。
编辑:这种方法的真正优点,比数组loggingpytables默认,是我可以然后加载数据到R使用h5r,它不能处理表。 或者,至less,我一直无法得到它加载异构表。
还有一个变化
在pandas中完成的许多操作也可以通过db查询(sql,mongo)
使用RDBMS或mongodb允许您在DB查询中执行一些聚合(针对大数据进行优化,并高效地使用caching和索引)
稍后,您可以使用“pandas”进行后期处理。
这种方法的优点是,您可以获得数据库优化,以处理大数据,同时仍以高级声明式语法定义逻辑 – 而不必处理决定在内存中执行什么操作的细节以及要执行的操作的核心。
虽然查询语言和pandas是不同的,但是将逻辑的一部分从一个翻译到另一个通常并不复杂。
考虑一下Ruffus,如果你走的是创build一个数据pipe道的简单path,该pipe道被分解成多个较小的文件。
我发现对“大数据”用例有帮助的一个技巧是通过将浮点精度降低到32位来减less数据量。 这在所有情况下都不适用,但是在许多应用中,64位精度是过度的,2倍的内存节省是值得的。 更明显的一点是:
>>> df = pd.DataFrame(np.random.randn(int(1e8), 5)) >>> df.info() <class 'pandas.core.frame.DataFrame'> RangeIndex: 100000000 entries, 0 to 99999999 Data columns (total 5 columns): ... dtypes: float64(5) memory usage: 3.7 GB >>> df.astype(np.float32).info() <class 'pandas.core.frame.DataFrame'> RangeIndex: 100000000 entries, 0 to 99999999 Data columns (total 5 columns): ... dtypes: float32(5) memory usage: 1.9 GB
我最近遇到了类似的问题。 我发现简单地读取块中的数据,并附加它,因为我把它写入同一个csv的块中效果很好。 我的问题是基于另一个表中的信息添加一个date列,使用某些列的值如下。 这可能有助于那些被dask和hdf5搞糊涂的人,但是更熟悉像我这样的pandas。
def addDateColumn(): """Adds time to the daily rainfall data. Reads the csv as chunks of 100k rows at a time and outputs them, appending as needed, to a single csv. Uses the column of the raster names to get the date. """ df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, chunksize=100000) #read csv file as 100k chunks '''Do some stuff''' count = 1 #for indexing item in time list for chunk in df: #for each 100k rows newtime = [] #empty list to append repeating times for different rows toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time while count <= toiterate.max(): for i in toiterate: if i ==count: newtime.append(newyears[count]) count+=1 print "Finished", str(chunknum), "chunks" chunk["time"] = newtime #create new column in dataframe based on time outname = "CHIRPS_tanz_time2.csv" #append each output to same csv, using no header chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)