使用Pandas_UDF快速改造Pandas代码

PySpark和panda之间改进性能和互操作性的其核心思想是将Apache Arrow作为序列化格式,以减少PySpark和pandas之间的开销。

Pandas_UDF是在PySpark2.3中新引入的api。Pandas_UDF是用户定义函数,由Spark使用Arrow传输数据,使用Pandas处理数据。Pandas_UDF是使用关键字Pandas_UDF作为装饰器或包装函数来定义的,不需要额外的配置。目前,有两种类型的Pandas_UDF是,分别是Scalar(标量映射)和Grouped Map(分组映射)。

1.1 Scalar

Scalar Pandas UDF用于向量化标量操作。常常与select和withColumn等函数一起使用。其中调用的Python函数需要使用pandas.Series作为输入并返回一个具有相同长度的pandas.Series。具体执行流程是,Spark将列分成批,并将每个批作为数据的子集进行函数的调用,进而执行panda UDF,最后将结果连接在一起。

下面的示例展示如何创建一个scalar panda UDF,计算两列的乘积:

import pandas as pd from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import LongType # 声明函数并创建UDF
def multiply_func(a, b): return a * b multiply = pandas_udf(multiply_func, returnType=LongType()) x = pd.Series([1, 2, 3]) df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) # Execute function as a Spark vectorized UDF df.select(multiply(col("x"), col("x"))).show() # +-------------------+ # |multiply_func(x, x)| # +-------------------+ # | 1| # | 4| # | 9| # +-------------------+

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxsgp.html