最近加入一个Spark项目,作为临时的开发人员协助进行开发工作。该项目中不存在测试的概念,开发人员按需求进行编码工作后,直接向生产系统部署,再由需求的提出者在生产系统检验程序运行结果的正确性。在这种原始的工作方式下,产品经理和开发人员总是在生产系统验证自己的需求、代码。可以想见,各种直接交给用户的错误导致了一系列的事故和不信任。为了处理各类线上问题,大家都疲于奔命。当工作进行到后期,每一个相关人都已经意气消沉,往往对工作避之不及。
为了改善局面,我尝试了重构部分代码,将连篇的SQL分散到不同的方法里,并对单个方法构建单元测试。目的是,在编码完成后,首先在本地执行单元测试,以实现:
部署到生产系统的代码中无SQL语法错误。
将已出现的bug写入测试用例,避免反复出现相同的bug。
提前发现一些错误,减少影响到后续环节的问题。
通过自动化减少开发和处理程序问题的总时间花费。
通过流程和结果的改善,减少开发人员的思维负担,增加与其他相关人的互信。
本文将介绍我的Spark单元测试实践,供大家参考、批评。本文中的Spark API是PySpark,测试框架为pytest。
对于希望将本文当作单元测试教程使用的读者,本文会假定读者已经准备好了开发和测试所需要的环境,如果没有也没有关系,文末的参考部分会包含一些配置环境相关的链接。
本文链接:https://www.cnblogs.com/hhelibeb/p/10534862.html
转载请注明
概念 定义单元测试是一种测试方法,它的对象是单个程序单元/组件,目的是验证软件的每个组件都符合设计要求。
单元是软件中最小的可测试部分。它通常包含一些输入和单一的输出。
本文中的单元就是python函数(function)。
单元测试通常是程序开发人员的工作。
原则为了实现单元测试,函数最好符合一个条件,
对于相同的输入,函数总有相同的输出。
这要求函数内部不能存在“副作用”。
它的输出结果的确定不应该依赖输入参数外的任何内容,例如,不可以因为本地测试环境中没有相应的数据库就产生“连接数据库异常”导致无法返回结果。
它也不应该改变除了返回结果以外的任何内容,例如,不可以改变全局可变状态。
代码实践下面是数据和程序部分。
数据假设我们的服务对象是一家水果运销公司,公司在不同城市设有仓库,现有三张表,其中inventory包含水果的总库存数量信息,inventory_ratio包含水果在不同城市的应有比例,
目标是根据总库存数量和比例算出水果在各地的库存,写入到第三张表inventory_city中。三张表的列如下,
1. inventory. Columns: “item”, “qty”. 2. inventory_ratio. Columns: “item”, “city”, “ratio”. 3. inventory_city. Columns: “item”, “city”, “qty”. 第一版代码用最直接的方式实现这一功能,代码将是,
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate() result = spark.sql('''select t1.item, t2.city, case when t2.ratio is not null then t1.qty * t2.ratio else t1.qty end as qty from v_inventory as t1 left join v_ratio as t2 on t1.item = t2.item ''') result.write.csv(path="somepath/inventory_city", mode="overwrite")