自第一次发布以来,Hopsworks 一直使用 NDB Cluster(RonDB 的前身)作为在线特征存储。2020 年我们创建了 RonDB 作为 NDB Cluster 的托管版本,并针对用作在线特征存储进行了优化。
但是在 Hopsworks 中我们将 RonDB 用于不仅仅是在线特征存储。RonDB 还存储整个特征存储库的元数据,包括模式、统计信息和提交。 RonDB 还存储了文件系统 HopsFS 的元数据,其中存储了离线 Hudi 表。使用 RonDB 作为单个元数据数据库,我们使用事务和外键来保持 Feature Store 和 Hudi 元数据与目标文件和目录(inode)一致。Hopsworks 可通过 REST API 或直观的 UI(包括特征目录)访问或通过 Hopsworks 特征存储 API (HSFS) 以编程方式访问。
4. OnlineFS:可扩展的在线特征物化引擎有了底层的 RonDB 和所需的元数据,我们就能够构建一个横向扩展、高吞吐量的物化服务,以在在线特征存储上执行更新、删除和写入——我们简单地将其命名为 OnlineFS。
OnlineFS 是一种使用 ClusterJ 直接访问 RonDB 数据节点的无状态服务。 ClusterJ 被实现为原生 C++ NDB API 之上的高性能 JNI 层,提供低延迟和高吞吐量。由于 RonDB 中元数据的可用性,例如 avro 模式和特征类型,我们能够使 OnlineFS 无状态。 使服务无状态允许我们通过简单地添加或删除服务的实例来向上和向下扩展对在线特征存储的写入,从而随着实例的数量线性地增加或减少吞吐量。
让我们完成将数据写入在线特征存储所需的步骤,这些步骤在下图中编号。
特征作为 Pandas 或 Spark DataFrame写入特征存储
每个 Dataframe 更新一个称为特征组的表(离线存储中有一个类似的表)。一个特征组中的特征共享同一个主键,可以是复合主键。 主键与元数据的其余部分一起被跟踪。 因此Hopsworks 特征存储库有一个 Dataframe API,这意味着特征工程的结果应该是将写入到特征存储的常规 Spark、Spark Structured Streaming 或 Pandas Dataframe。对于所有三种类型的DataFrame,用于写入特征存储的 API 几乎相同。 通过对特征组对象的引用可以插入DataFrame。 特征组在创建时已配置为将 Dataframe 存储到在线和离线库或仅存储到其中之一。
编码和产生
Dataframe 的行使用 avro 进行编码并写入在 Hopsworks 上运行的 Kafka中。每个特性组都有自己的 Kafka 主题,具有可配置的分区数量,并按主键进行分区,这是保证写入顺序所必需的。
消费和解码
我们使用 Kafka 来缓冲来自 Spark 特征工程作业的写入,因为直接写入 RonDB 的大型 Spark 集群可能会使 RonDB 过载,因为现有 Spark JDBC 驱动程序中缺乏背压。OnlineFS 从 Kafka 读取缓冲的消息并对其进行解码。 重要的是OnlineFS 仅解码原始特征类型,而嵌入等复杂特征以二进制格式存储在在线特征存储中。
基于主键的Upsert
OnlineFS 可以使用 ClusterJ API 将行实际更新插入到 RonDB。Upsert 分批执行(具有可配置的批量大小)以提高吞吐量。
由于管道步骤中的所有服务都可以访问相同的元数据,因此我们能够向用户隐藏与编码和模式相关的所有复杂性。 此外所有涉及的服务都是水平可扩展的(Spark、Kafka、OnlineFS),并且由于我们类似于流的设置,该过程不会创建不必要的数据副本,即没有写放大。 由于模式注册表、X.509 证书管理器和 Hopsworks 中的 Kafka 等服务的可用性,这种高度可扩展的设置成为可能。 在任何时候X.509 证书都用于双向身份验证,而 TLS 用于加密网络流量。
5. 可访问性意味着透明的 API在分布式系统中,我们经常谈论透明度。 如果分布式系统对开发人员隐藏网络访问和实现特定知识,则它是透明的。 在 Hopsworks 特征存储库中,写入是通过相同的 API 透明地完成的,如前所述(1)无论是常规的 Spark、Spark Streaming 还是 Pandas 以及(2)系统负责一致地更新在线和离线存储
插入