高效地将一个函数应用到分组的pandasDataFrame上
我经常需要将一个函数应用于一个非常大的DataFrame
(混合数据types)的组,并希望利用多个核心。
我可以从组中创build一个迭代器并使用多处理模块,但是效率不高,因为每个组和结果都必须在进程之间进行消息传递。
有没有办法避免酸洗,甚至完全避免DataFrame
的复制? 看起来多处理模块的共享内存function仅限于numpy
数组。 还有其他的select吗?
从上面的评论来看,似乎这是为pandas
计划的一段时间(也有一个我刚注意到的有趣的rosetta
项目 )。
然而,直到每一个并行function被并入pandas
,我注意到直接使用cython
+ OpenMP和C ++编写高效和非内存复制的并行增强是非常容易的。
下面是一个简单的例子,写一个并行的小组总和,其用法是这样的:
import pandas as pd import para_group_demo df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) print para_group_demo.sum(df.a, df.b)
输出是:
sum key 0 6 1 11 2 4
注意无疑,这个简单的例子的function最终将成为pandas
一部分。 但是,有一些事情在C ++中并行化一段时间会更加自然,重要的是要意识到将其结合到pandas
是多么容易。
为此,我编写了一个简单的单源文件扩展,其代码如下。
它从一些导入和types定义开始
from libc.stdint cimport int64_t, uint64_t from libcpp.vector cimport vector from libcpp.unordered_map cimport unordered_map cimport cython from cython.operator cimport dereference as deref, preincrement as inc from cython.parallel import prange import pandas as pd ctypedef unordered_map[int64_t, uint64_t] counts_t ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t ctypedef vector[counts_t] counts_vec_t
C ++ unordered_map
types用于由单个线程进行求和,并且该vector
用于由所有线程进行求和。
现在到函数sum
。 它以快速访问的键入内存视图开始:
def sum(crit, vals): cdef int64_t[:] crit_view = crit.values cdef int64_t[:] vals_view = vals.values
该函数继续将线程的半等分(这里硬编码为4),并且让每个线程在其范围内求和:
cdef uint64_t num_threads = 4 cdef uint64_t l = len(crit) cdef uint64_t s = l / num_threads + 1 cdef uint64_t i, j, e cdef counts_vec_t counts counts = counts_vec_t(num_threads) counts.resize(num_threads) with cython.boundscheck(False): for i in prange(num_threads, nogil=True): j = i * s e = j + s if e > l: e = l while j < e: counts[i][crit_view[j]] += vals_view[j] inc(j)
线程完成后,函数将所有结果(来自不同的范围)合并成一个unordered_map
:
cdef counts_t total cdef counts_it_t it, e_it for i in range(num_threads): it = counts[i].begin() e_it = counts[i].end() while it != e_it: total[deref(it).first] += deref(it).second inc(it)
剩下的就是创build一个DataFrame
并返回结果:
key, sum_ = [], [] it = total.begin() e_it = total.end() while it != e_it: key.append(deref(it).first) sum_.append(deref(it).second) inc(it) df = pd.DataFrame({'key': key, 'sum': sum_}) df.set_index('key', inplace=True) return df