通过指定ray.init(地址=<GCS addr>)连接到现有的Ray集群。在后端,Driver将以指定的地址连接到GCS,并查找群集其他组件的地址,例如其本地raylet地址。Driver必须与Ray群集的现有节点之一合部。这是因为Ray的共享内存功能,所以合部是必要的前提。
使用Ray客户端`ray.util.connect()'从远程计算机(例如笔记本电脑)连接。默认情况下,每个Ray群集都会在可以接收远程客户端连接的头节点上启动一个Ray Client Server,用来接收远程client连接。但是由于网络延迟,直接从客户端运行的某些操作可能会更慢。
Runtime所有Ray核心组件都是用C++实现的。Ray通过一个名为“core worker”的通用嵌入式C++库支持Python和Java。此库实现ownership表、进程内存储,并管理与其他工作器和Raylet的gRPC通信。由于库是用C++实现的,所有语言运行时都共享Ray工作协议的通用高性能实现。
Task的lifetimeOwner负责确保提交的Task的执行,并促进将返回的ObjectRef解析为其基础值。如下图,提交Task的进程被视为结果的Owner,并负责从raylet获取资源以执行Task,Driver拥有A的结果,Worker 1拥有B的结果。
提交Task时,Owner会等待所有依赖项就绪,即作为参数传递给Task的ObjectRefs(请参见Object的lifetime)变得可用。依赖项不需要是本地的;Owner一旦认为依赖项在群集中的任何地方可用,就会立即就绪。当依赖关系就绪时,Owner从分布式调度程序请求资源以执行任务,一旦资源可用,调度程序就会授予请求,并使用分配给owner的worker的地址进行响应。
Owner将task spec通过gRPC发送给租用的worker来调度任务。执行任务后,worker必须存储返回值。如果返回值较小,则工作线程将值直接inline返回给Owner,Owner将其复制到其进程中对象存储区。如果返回值很大,则worker将对象存储在其本地共享内存存储中,并向所有者返回分布式内存中的ref。让owner可以引用对象,不必将对象提取到其本地节点。
当Task以ObjectRef作为其参数提交时,必须在worker开始执行之前解析对象值。如果该值较小,则它将直接从所有者的进程中对象存储复制到任务说明中,在任务说明中,执行worker线程可以引用它。如果该值较大,则必须从分布式内存中提取对象,以便worker在其本地共享内存存储中具有副本。scheduler通过查找对象的位置并从其他节点请求副本来协调此对象传输。
容错:任务可能会以错误结束。Ray区分了两种类型的任务错误:
应用程序级。这是工作进程处于活动状态,但任务以错误结束的任何场景。例如,在Python中抛出IndexError的任务。
系统级。这是工作进程意外死亡的任何场景。例如,隔离故障的进程,或者如果工作程序的本地raylet死亡。
由于应用程序级错误而失败的任务永远不会重试。异常被捕获并存储为任务的返回值。由于系统级错误而失败的任务可以自动重试到指定的尝试次数。
代码参考:
src/ray/core_worker/core_worker.cc
src/ray/common/task/task_spec.h
src/ray/core_worker/transport/direct_task_transport.cc
src/ray/core_worker/transport/依赖关系_解析器.cc
src/ray/core_worker/task_manager.cc
src/ray/protobuf/common.proto
Object的lifetime下图Ray中的分布式内存管理。worker可以创建和获取对象。owner负责确定对象何时安全释放。
对象的owner就是通过提交创建task或调用ray.put创建初始ObjectRef的worker。owner管理对象的生存期。Ray保证,如果owner是活的,对象最终可能会被解析为其值(或者在worker失败的情况下引发错误)。如果owner已死亡,则获取对象值的尝试永远不会hang,但可能会引发异常,即使对象仍有物理副本。