HSFS 库中的核心抽象是表示特征组、训练数据集和特征存储中的特征的元数据对象。 我们使用 HSFS 的目标是让开发人员能够使用他们喜欢的语言和框架来设计功能。 当我们在 Dataframe API 上对齐时,Dataframe 中包含的任何内容都可以写入特征存储。 如果您有现有的 ETL 或 ELT 管道,它们生成包含特征的数据帧,您可以通过简单地获取对其特征组对象的引用并使用您的数据帧作为参数调用 .insert() 来将该数据帧写入特征存储 . 这可以从定期安排的作业中调用(使用您选择的任何编排器,或者,如果您想要开箱即用的编排器,则 Hopsworks 附带 Airflow)。 但是也可以通过将批次写入 Spark 结构化流应用程序中的数据帧来连续更新特征组对象。
# populate feature group metadata object store_fg_meta = fs.create_feature_group(name="store_fg", version=1, primary_key=["store"], description="Store related features", online_enabled=True) # create feature group for the first time in feature store fg.save(Dataframe) # replace .save with .insert for scheduled batch job fg.insert(Dataframe) # if required, stream data only to the online feature store in long running Spark # Structured Streaming application fg.insert_stream(streaming_Dataframe)读取
许多现有的特征存储没有模型的表示。 然而Hopsworks 引入了训练数据集抽象来表示用于训练模型的特征集和特征值。 也就是说,不可变的训练数据集和模型之间存在一对一的映射关系,但可变特征组与不可变的训练数据集之间是一对多的关系。 您可以通过从特征组中加入、选择和过滤特征来创建训练数据集。 训练数据集包括特征的元数据,例如它们来自哪个特征组、该特征组的提交 ID 以及训练数据集中特征的顺序。 所有这些信息使 HSFS 能够在稍后的时间点重新创建训练数据集,并在服务时透明地构建特征向量。
# create a query feature_join = rain_fg.select_all() \ .join(temperature_fg.select_all(), on=["location_id"]) \ .join(location_fg.select_all()) td = fs.create_training_dataset("rain_dataset", version=1, label=”weekly_rain”, data_format=”tfrecords”) # materialize query in the specified file format td.save(feature_join) # we can also use the training dataset for serving # this serving code typically runs in a Python environment td = fs.get_training_dataset(“rain_dataset”, version=1) # get serving vector td.get_serving_vector({“location_id”: “honolulu”})在线特征库的使用方要么是使用 ML 模型的应用程序,要么是模型服务基础设施,这些基础设施通过缺失的特征来丰富特征向量。 Hopsworks 为在线库提供了一个基于 JDBC 的 API。 JDBC 具有提供高性能协议、网络加密、客户端身份验证和访问控制的优势。 HSFS 为 Python 和 Scala/Java 提供语言级别的支持。 但是,如果您的服务应用程序在不同的编程语言或框架中运行,您总是可以直接使用 JDBC。
6. BenchmarksMikael Ronstrom(NDB 集群的发明者和逻辑时钟的数据负责人,领导 RonDB 团队)为 RonDB 提供了 sysbench 基准测试,并提供了针对 Redis 的比较性能评估。在本节中我们展示了 OnlineFS 服务的性能,能够处理和维持写入在线特征存储的高吞吐量,以及对 Hopsworks 中典型托管 RonDB 设置的特征向量查找延迟和吞吐量的评估。
在此基准测试中,Hopsworks 设置了 3xAWS m5.2xlarge(8 个 vCPU,32 GB)实例(1 个头,2 个工作器)。 Spark 使用 worker 将数据帧写入在线库。此外相同的工作人员被重新用作客户端,在在线特征存储上执行读取操作以进行读取基准测试。
RonDB 设置了 1x AWS t3.medium(2 vCPU,4 GB)实例作为管理节点,2x r5.2xlarge(8 vCPU,64 GB)实例作为数据节点,3x AWS c5.2xlarge(8 vCPU,16 GB) ) MySQL 服务器的实例。这种设置允许我们在具有 2 倍复制的在线特征存储中存储 64GB 的内存数据。 MySQL 服务器为在线特征存储提供 SQL 接口,在此基准测试中,我们没有使 RonDB 数据节点完全饱和,因此可以潜在地添加更多 MySQL 服务器和客户端,以增加超出此处所示水平的吞吐量。
写吞吐我们对 OnlineFS 服务中写入 RonDB 的吞吐量进行了基准测试。 此外,我们测量了从 Kafka 主题中获取记录到提交到 RonDB 之间处理记录所需的时间。 对于这个基准测试,我们部署了两个 OnlineFS 服务,一个在头节点上,一个在 MySQL 服务器节点之一上。
我们通过将 20M 行从 Spark 应用程序写入在线特征存储来运行实验。 经过短暂的预热期后,两个服务实例的吞吐量稳定在约 126K 行/秒(11 个特征)、约 90K 行/秒(51 个特征)和最大特征向量约 60K 行/秒。 由于其设计,这可以通过添加更多服务实例轻松扩展。