使用NumPy、Numba和Python异步编程的高性能大数据分

几个月前,一位客户问我:“目前大数据分析中较快的Python数据结构对象是什么? ”我总被问到类似的问题。其中有一些问题很难解决,通常需要多花一些时间才能找到合适的优化解决方案。我一般会在周末和晚上做这些事,并以此为乐。

以前关于这个问题,我第一个简单的答案是Python List对象。我曾在许多数据科学项目中使用List对象,包括数据管道和提取-转换-加载(ETL)生产系统等。然后我就想到了以下问题:我可以使用List对象进行数百万或数十亿行数据的操作和分析吗?如果我将数据科学项目切分成许多小任务,然后使用最新的Python asyncio库异步运行它们,会怎么样呢?基于这些问题,我决定抽出一些时间,借助Python数据生态系统库寻找可用于大数据分析的一些实用解决方案。为了便于读者理解和快速验证结果,程序将计算由浮点数组成的一维NumPy数组的算术平均值、中值和样本标准差。为了对比程序的运行时间,我将使用以下库: 

NumPy  - NumPy是用于科学计算的基础Python包。

Numba  -  Numba提供了由Python直接编写的高性能函数来加速应用程序的能力。通过几个注释,面向数组和数学计算较多的Python代码就可以被实时编译为原生机器指令。而且Numba拥有类似于C、C++和FORTRAN的性能,无需切换语言或Python解释器。

asyncio  -  asyncio是Python异步编程库。

为什么要使用NumPy? 

正如NumPy网站所说:“NumPy是用于科学计算的基础Python包。它提供了强大的N维数组对象和复杂的(广播)功能。”导入NumPy库之后,Python程序的性能更好、执行速度更快、更容易保证一致性并能方便地使用大量的数学运算和矩阵功能。也许正因为如此,我们不再需要使用Python List对象了?重要的是,许多Python数据生态系统库都基于NumPy之上,像PandasSciPyMatplotlib等等。

用到的Python算法 

在此我将通过算术平均值、中值和样本标准差的简单计算,来展示不同Python程序的运行时间并进行对比。测试数据来自由64位浮点数构成的一维NumPy数组。我将实现以下三种Python算法并对它们进行分析:

NumPy数组

结合asyncio异步库的NumPy数组

结合Numba库的NumPy数组

NumPy数组程序 

我们来看看每个算法的代码。每个算法都有遵循面向对象编程(OOP)方法的类对象和主调用程序。类对象包含以下五种方法:

calculate_number_observation() - 计算观测数

calculate_arithmetic_mean() - 计算算术平均值

calculate_median() - 计算中值

calculate_sample_standard_deviation() - 计算样本标准差

print_exception_message() - 如果出现异常,打印异常信息

列表1显示了单独使用NumPy数组的汇总统计类对象代码。

import sys import traceback import time from math import sqrt class SummaryStatistics(object): """ 使用标准过程计算观测数、算术平均值、中值和样本标准差 """ def __init__(self): pass def calculate_number_observation(self, one_dimensional_array): """ 计算观测数 :参数 one_dimensional_array: numpy一维数组 :返回值 观测数 """ number_observation = 0 try: number_observation = one_dimensional_array.size except Exception: self.print_exception_message() return number_observation def calculate_arithmetic_mean(self, one_dimensional_array, number_observation): """ 计算算术平均值 :参数 one_dimensional_array: numpy一维数组 :参数 number_observation: 观测数 :返回值 算术平均值 """ arithmetic_mean = 0.0 try: sum_result = 0.0 for i in range(number_observation): sum_result += one_dimensional_array[i] arithmetic_mean = sum_result / number_observation except Exception: self.print_exception_message() return arithmetic_mean def calculate_median(self, one_dimensional_array, number_observation): """ 计算中值 :参数 one_dimensional_array: numpy一维数组 :参数 number_observation: 观测数 :返回值 中值 """ median = 0.0 try: one_dimensional_array.sort() half_position = number_observation // 2 if not number_observation % 2: median = (one_dimensional_array[half_position - 1] + one_dimensional_array[half_position]) / 2.0 else: median = one_dimensional_array[half_position] except Exception: self.print_exception_message() return median def calculate_sample_standard_deviation(self, one_dimensional_array, number_observation, arithmetic_mean): """ 计算样本标准差 :参数 one_dimensional_array: numpy一维数组 :参数 number_observation: 观测数 :参数 arithmetic_mean: 算术平均值 :返回值 样本标准差值 """ sample_standard_deviation = 0.0 try: sum_result = 0.0 for i in range(number_observation): sum_result += pow((one_dimensional_array[i] - arithmetic_mean), 2) sample_variance = sum_result / (number_observation - 1) sample_standard_deviation = sqrt(sample_variance) except Exception: self.print_exception_message() return sample_standard_deviation def print_exception_message(self, message_orientation = "horizontal"): """ 打印完整的异常信息 :参数 message_orientation: 水平或垂直 :返回值 空 """ try: exc_type, exc_value, exc_tb = sys.exc_info() file_name, line_number, procedure_name, line_code = traceback.extract_tb(exc_tb)[-1] time_stamp = " [Time Stamp]: " + str(time.strftime("%Y-%m-%d %I:%M:%S %p")) file_name = " [File Name]: " + str(file_name) procedure_name = " [Procedure Name]: " + str(procedure_name) error_message = " [Error Message]: " + str(exc_value) error_type = " [Error Type]: " + str(exc_type) line_number = " [Line Number]: " + str(line_number) line_code = " [Line Code]: " + str(line_code) if (message_orientation == "horizontal"): print( "An error occurred:{};{};{};{};{};{};{}".format(time_stamp, file_name, procedure_name, error_message, error_type, line_number, line_code)) elif (message_orientation == "vertical"): print( "An error occurred:\n{}\n{}\n{}\n{}\n{}\n{}\n{}".format(time_stamp, file_name, procedure_name, error_message, error_type, line_number, line_code)) else: pass except Exception: pass

列表1.  单独使用NumPy数组的汇总统计类对象代码

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/60e1d2d1a28483f2f3300925b08e2807.html