利用chunksize参数,我们可以为指定的数据集创建分块读取IO流,每次最多读取设定的chunksize行数据,这样我们就可以把针对整个数据集的任务拆分为一个一个小任务最后再汇总结果:
from tqdm.notebook import tqdm # 在降低数据精度及筛选指定列的情况下,以1千万行为块大小 raw = pd.read_csv('train.csv', dtype={ 'ip': 'int32', 'app': 'int16', 'os': 'int16' }, usecols=['ip', 'app', 'os'], chunksize=10000000) # 从raw中循环提取每个块并进行分组聚合,最后再汇总结果 result = \ ( pd .concat([chunk .groupby(['app', 'os'], as_index=False) .agg({'ip': 'count'}) for chunk in tqdm(raw)]) .groupby(['app', 'os']) .agg({'ip': 'sum'}) ) result 图9可以看到,利用分块读取处理的策略,从始至终我们都可以保持较低的内存负载压力,并且一样完成了所需的分析任务,同样的思想,如果你觉得上面分块处理的方式有些费事,那下面我们就来上大招:
利用dask替代pandas进行数据分析
dask相信很多朋友都有听说过,它的思想与上述的分块处理其实很接近,只不过更加简洁,且对系统资源的调度更加智能,从单机到集群,都可以轻松扩展伸缩。
图10推荐使用conda install dask来安装dask相关组件,安装完成后,我们仅仅需要需要将import pandas as pd替换为import dask.dataframe as dd,其他的pandas主流API使用方式则完全兼容,帮助我们无缝地转换代码:
图11可以看到整个读取过程只花费了313毫秒,这当然不是真的读进了内存,而是dask的延时加载技术,这样才有能力处理超过内存范围的数据集。
接下来我们只需要像操纵pandas的数据对象一样正常书写代码,最后加上.compute(),dask便会基于前面搭建好的计算图进行正式的结果运算:
( raw # 按照app和os分组计数 .groupby(['app', 'os']) .agg({'ip': 'count'}) .compute() # 激活计算图 )并且dask会非常智能地调度系统资源,使得我们可以轻松跑满所有CPU:
图12关于dask的更多知识可以移步官网自行学习( https://docs.dask.org/en/latest/ )。