1712.05889_Ray: A Distributed Framework for Emerging AI Applications #################################################################### * https://arxiv.org/abs/1712.05889 * 组织: UC Berkeley * scholar: 1619 Abstract ======== * The next generation of AI applications will continuously interact with the environment and learn from these interactions. * These applications impose new and demanding systems requirements, both in terms of performance and flexibility. * In this paper, we consider these requirements and present Ray—a distributed system to address them. * Ray implements a unified interface that can express both task-parallel and actor-based computations, supported by a single dynamic execution engine. * To meet the performance requirements, Ray employs a distributed scheduler and a distributed and fault-tolerant store to manage the system’s control state. * In our experiments, we demonstrate scaling beyond 1.8 million tasks per second and better performance than existing specialized systems for several challenging reinforcement learning applications. 1. Introduction =============== * 这段内容整体讲了什么? 1. 过去20年数据分析的发展 → 出现了很多大数据处理框架。 2. 现在 AI/ML 特别是强化学习 (RL) 的兴起 → 带来了新的系统需求,这些需求是传统框架无法满足的。 3. 为什么需要一个新的系统 → 针对 RL 训练、仿真、部署的特点,现有的大数据或深度学习框架不够用。 4. Ray 框架的提出 → 一个统一的、灵活的集群计算框架,专门解决 RL 和类似工作负载的问题。 * 大数据框架的起点 - 背景:过去20年,企业/组织收集了大量数据。 - 催生了很多分布式计算框架: - 批处理框架 (MapReduce, Spark) - 流处理框架 (Naiad, Storm) - 图处理框架 (GraphX, Pregel) - 这些框架帮助企业/科研机构 —— 让“大数据分析”成为日常业务战略的一部分 → 进入“Big Data”时代。 * AI/ML 尤其是深度学习的兴起 - 数据分析不仅是简单查询,还扩展到更复杂的 AI / ML,特别是 监督学习。 - 典型代表:深度神经网络,用大量带标签的数据训练模型。 - 为此,又出现了 专门的深度学习框架,比如 TensorFlow, MXNet, PyTorch。 - 这些框架通常依赖 GPU/TPU 硬件,以提高批量训练效率。 * 强化学习 (RL) 的不同之处 - RL 应用越来越多,特点是: - 动态环境:环境会变化。 - 连续动作:需要不断做出决策,长期优化目标。 - 反馈滞后:不像监督学习那样直接给标签,要等到一系列动作完成才知道效果。 - 典型案例: - AlphaGo - 无人机控制、对话系统、机器人操作 * RL 应用的核心需求 - 一个 RL 系统要具备 三个能力: 1. 仿真:用模拟环境测试不同策略,学到长期效果。 2. 分布式训练:和深度学习一样,需要多机分布式训练。 3. 策略部署:要把学到的策略实时部署,用于控制实际系统(如无人机、游戏 AI)。 - 这些需求对系统的新要求 - 细粒度计算:比如与现实环境交互时毫秒级响应。 - 异质性: - 时间上 → 有的仿真几毫秒,有的可能几小时。 - 资源上 → 训练用 GPU,仿真用 CPU。 - 动态性:仿真结果、环境反馈会影响后续决策。 * 现有框架的不足 - 传统系统(大数据框架、深度学习框架)不适用: - 批处理 (MapReduce, Spark) → 粗粒度,不适合实时、细粒度仿真。 - 任务并行 (Dask, CIEL) → 不支持分布式训练和部署。 - 流处理 (Storm, Naiad) → 同样欠缺训练能力。 - 深度学习框架 (TF, MXNet) → 不擅长仿真与实时部署。 - 模型服务系统 (Clipper, TF Serving) → 只负责部署,不负责训练和仿真。 - 组合多个系统 理论可行,但实践中: - 开发成本高,需要自行解决调度、容错、数据流等问题。 - 很多 RL 应用目前是 定制开发,重复造轮子,效率低下。 * Ray 框架 - Ray目标: - 一个通用分布式框架,统一支持: - 仿真 (Simulation) - 训练 (Training) - 部署 (Serving) - 设计关键点: 1. 统一接口 → 结合任务并行 (task) 和 actor 模型。 2. 任务并行 → 高效动态调度仿真、故障恢复。 3. actor → 保持状态,比如模型训练过程中的参数。 4. 底层动态调度引擎,支持百万级任务每秒调度。 - 性能优化: - 去中心化: 1. 调度器是分布式的。 2. 元数据存储是分片存储,高效容错。 - 无状态设计,除了元数据,其他组件都是无状态,易扩展。 - 定位 - Ray 并不是想替代 Spark 或 TF Serving,而是专门解决 RL 及类似应用的 端到端统一性问题。 +--------------+--------------------------------------------------------------------------------------------------+---+ | 问题 | 现状 | | +==============+==================================================================================================+===+ | --- | --- | | +--------------+--------------------------------------------------------------------------------------------------+---+ | 过去的系统 | 适用于大数据批处理、深度学习训练或模型部署,各自擅长某一方面,但 缺乏统一性 | | +--------------+--------------------------------------------------------------------------------------------------+---+ | RL 应用特点 | 需要 仿真 + 训练 + 部署 的组合,且要求 细粒度调度、异质性支持、动态性 | | +--------------+--------------------------------------------------------------------------------------------------+---+ | 现有系统痛点 | 不能满足 RL 需求,现有组合方案开发成本高,工程负担重 | ˇ | +--------------+--------------------------------------------------------------------------------------------------+---+ | Ray 方案 | 提供一个统一接口,结合 task-parallel 和 actor 模型,支持仿真、训练、部署,具备高可扩展性和容错性 | | +--------------+--------------------------------------------------------------------------------------------------+---+ .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/q0ESaE.png Figure 1: Example of an RL system. 2. Motivation and Requirements ============================== .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/bgzC2R.png Figure 2:Typical RL pseudocode for learning a policy. * 大背景:为什么要设计 Ray 这样一个系统,以及 RL(强化学习)系统到底有哪些需求,现有的框架为什么不合适。 * 核心概念: - 强化学习 (RL) 的三个关键步骤: - Simulation 模拟 —— 环境交互,生成数据。 - Training 训练 —— 通过模拟数据,用梯度下降等方法更新策略。 - Serving 部署 —— 把训练好的策略拿来做实际决策(实时推断)。 * RL系统的核心过程 - 强化学习就是一个“Agent(智能体)”和“环境”反复交互的过程,目的是学会一个“策略”来最大化奖励。 - “策略”本质就是 根据当前环境状态,选择一个动作的函数。 * 典型RL学习流程 - Policy Evaluation 策略评估 —— 看看当前策略表现如何。 - 评估时,Agent 跟环境交互生成一串 “轨迹”,每条轨迹记录了:1.当前状态2.采取动作后得到的奖励。这些轨迹就是我们训练的依据。 - Policy Improvement 策略改进 —— 基于评估结果,调整策略。 - 用这些轨迹,计算梯度,更新策略,使得以后获得的奖励更大。 * RL的三大工作负载 +-----------------+--------------------------------------------------+--------------------------------------+ | 任务 | 作用 | 特点 | +=================+==================================================+======================================+ | Simulation 模拟 | 产生数据,环境交互 | 粒度细,有时几毫秒,有时几分钟 | +-----------------+--------------------------------------------------+--------------------------------------+ | Training 训练 | 用数据更新策略 | 分布式训练,依赖 CPU/GPU/TPU | +-----------------+--------------------------------------------------+--------------------------------------+ | Serving 部署 | 拿训练好的策略实时做决策 | 低延迟,负载均衡,多节点服务 | +-----------------+--------------------------------------------------+--------------------------------------+ * RL不同于监督学习的地方 - 监督学习中:训练模型 & 部署模型 可以用不同系统,没有强耦合。 - RL中:训练、模拟、部署三者相互依赖,必须协同工作。 - 现有框架都是只管其中一环,没法整体高效处理 RL。 * RL框架的需求总结 * Ray 设计时需要满足的三个关键需求: 1) Fine-grained, heterogeneous computations 细粒度 & 异构计算 - 计算时间跨度大:有的任务几毫秒(比如棋盘动作模拟),有的几个小时(比如复杂训练)。 - 硬件异构:训练用 GPU/TPU,模拟用 CPU。 - 现有系统很多只能处理“粗粒度、大批量、均匀任务”,不适合 RL。 2) Flexible computation model 灵活的计算模型 - Stateless 无状态:模拟、数据预处理 → 可以随便调度,方便负载均衡。 - Stateful 有状态:比如 Parameter Server、第三方仿真器 → 需要长期保存状态,通常运行在特定硬件上(如GPU)。 - 需要既支持无状态也支持有状态的任务。 3) Dynamic execution 动态执行 - 任务完成的顺序 无法预知(模拟过程谁先结束不一定)。 - 任务结果 影响未来调度(比如模拟失败,可能要补跑)。 * 额外要求 * Ray 还要能 每秒处理百万级任务,才能满足大规模 RL 应用。 .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/GoJ8Eq.png Table 1:Ray API Table 2:Tasks vs. actors tradeoffs:: +---------------------------------+--------------------------------+ | Tasks (stateless) | Actors (stateful) | +=================================+================================+ | Fine-grained load balancing | Coarse-grained load balancing | +---------------------------------+--------------------------------+ | Support for object locality | Poor locality support | +---------------------------------+--------------------------------+ | High overhead for small updates | Low overhead for small updates | +---------------------------------+--------------------------------+ | Efficient failure handling | Overhead from checkpointing | +---------------------------------+--------------------------------+ .. note:: 一句话:Ray 的设计动机来源于 RL 的独特需求:训练、模拟、部署三者 高度耦合,有 异构、细粒度、动态、状态管理 等复杂需求,现有的大数据和深度学习框架无法满足,所以要设计一个 灵活、动态、高性能 的分布式系统来统一支撑它们。 3. Programming and Computation Model ==================================== * 本章节主要讲:Ray 的核心编程模型 以及它在 动态任务图 计算模型下如何支持分布式计算的,内容偏技术实现层面。 * 背景概念 - Ray 设计的出发点是:既支持无状态的分布式任务计算,又支持有状态的、带内部状态更新的计算,这是它相比其他框架(CIEL、Orleans、Akka)的一大优势。 - 它提供两种核心抽象: - Task (任务) → 适合无状态计算,易并行。 - Actor (演员/角色) → 适合有状态计算,连续维护内部状态。 .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/I1o0WI.png Figure 3: Python code implementing the example in Figure 2 in Ray. Note that @ray.remote indicates remote functions and actors. Invocations of remote functions and actor methods return futures, which can be passed to subsequent remote functions or actor methods to encode task dependencies. Each actor has an environment object self.env shared between all of its methods. .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/xgqxFQ.png Figure 4: The task graph corresponding to an invocation of train ``policy.remote()`` in Figure 3. Remote function calls and the actor method calls correspond to tasks in the task graph. The figure shows two actors. The method invocations for each actor (the tasks labeled A1i and A2i ) have stateful edges between them indicating that they share the mutable actor state. There are control edges from train policy to the tasks that it invokes. To train multiple policies in parallel, we could call train ``policy.remote()`` multiple times. 3.1 Programming Model --------------------- * Task - 定义:Task 是一个远程函数的调用,运行在无状态 worker 上。 - 特点: - 无状态 & 无副作用 → 只依赖输入,输出确定。 - 可并行调度,粒度细。 - 返回值是 Future:调用 Task 后立即返回 Future 对象,你可以马上开始其他计算,等它完成后再取结果。 - 支持 Futures 作为输入传递,可以自然表达依赖关系和并行性。 - 易于失败恢复 → 因为 Task 没有内部状态,只需重新执行即可。 * Actor - 定义:Actor 是一个有状态的远程对象,它的方法可以远程调用。 - 特点: - 有状态,每个方法调用会修改内部状态。 - 方法串行执行,保证一致性。 - 方法执行也返回 Future,和 Task 类似,但它背后有状态。 - 可以 把 Actor 句柄传递给其他 Task 或 Actor,实现跨节点协作。 - 适合实现: - 参数服务器(Parameter Server) - GPU 上的反复迭代训练 - 包装第三方仿真器(如游戏环境、仿真引擎),这些通常很难序列化。 * 增强 API 的三个关键点(解决 RL 的需求): 1. ray.wait(): - 等待前 **k** 个任务完成,而不是所有任务 → 应对任务执行时间差异大、需要先用已有结果继续计算的情况。 2. 资源需求声明: - 开发者可以指定每个任务需要多少 CPU/GPU/内存 → 方便调度器合理分配资源,支持异构硬件。 3. 嵌套远程函数: - 远程函数里可以调用其他远程函数 → 支持高度可扩展和分布式执行,提升吞吐量。 3.2 Computation Model --------------------- * Ray 把整个应用的执行流程建模成一个 动态演化的有向图 (Task Graph),这个图会随着程序运行不断扩展。 * 图的元素: - 节点: 1. 数据对象(如张量、模型参数等) 2. Task 调用(远程函数) 3. Actor 方法调用(也是一种特殊的 Task) - 边: 1. 数据边 (Data Edge): - 表示任务的输入/输出依赖。 - 如果 Task 产生数据 → 从 Task 指向数据。 - 如果数据是 Task 的输入 → 从数据指向 Task。 2. 控制边 (Control Edge): - 表示嵌套 Task 调用之间的调度顺序依赖。 3. 状态边 (Stateful Edge): - 核心 → 用于描述 Actor 内部连续方法调用的顺序依赖。 - 如果同一个 Actor 上调用方法 M1 → M2 → M3,它们会连成一条链。 - 体现出 Actor 内部状态从一次方法调用传递到下一次调用的过程。 * 为什么要引入状态边? - Task 图本质是无状态的,但 RL 任务和很多分布式应用都有 “状态”。 - 状态边能: - 让 Actor 与无状态的 Task 共同存在于一个统一的调度和恢复系统中。 - 方便故障恢复:和 Spark 一样,Ray 会跟踪 数据血缘 (Lineage),丢失数据或状态时可以根据任务图和状态边重建。 总结 ---- * Ray 用一个 动态演化的任务图 来表达 RL 训练中涉及的所有计算,不仅支持无状态的并行计算 (Tasks),还能优雅处理有状态的计算 (Actors)。 * 它通过数据边、控制边和状态边将它们有机整合起来,同时为分布式调度、资源管理、故障恢复提供了坚实基础。 4. Architecture =============== .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/jCt1Ol.png Figure 5: Ray’s architecture consists of two parts: an application layer and a system layer. The application layer implements the API and the computation model described in Section 3, the system layer implements task scheduling and data management to satisfy the performance and fault-tolerance requirements 4.1 Application Layer --------------------- * 有三种类型的进程: - Driver:用户程序运行的进程,负责发起任务。 - Worker:无状态的工作进程,执行 driver 或其他 worker 调用的任务。它执行一个任务就结束,没有本地状态。 - Actor:有状态的进程,专门执行它暴露的方法,状态会保留,适合需要状态维护的任务(比如强化学习中的环境)。 4.2 System Layer ---------------- * 系统层由三个主要组件组成:全局控制存储、分布式调度器和分布式对象存储。所有组件都具有水平可扩展性和容错性。 4.2.1 Global Control Store (GCS) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * 核心作用:存储整个系统的控制状态,设计上是一个高可用、可扩展的 key-value 存储,带有 pub-sub 功能。 * 为什么需要它? - 系统要支持 百万级别的动态任务,还要容错,不能依赖单节点(如 master 节点)。 - GCS 负责存储 任务 lineage 信息(即谁生成了什么对象、对象依赖什么任务),保证失败后可以重算。 - 去中心化:不像 Spark 等系统把所有调度、对象元数据塞进一个 scheduler,GCS 单独负责这些信息,让其他组件(比如调度器和对象存储)保持无状态,容易扩展。 4.2.2 Bottom-Up Distributed Scheduler ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/vUUUSi.png Figure 6: Bottom-up distributed scheduler. Tasks are submitted bottom-up, from drivers and workers to a local scheduler and forwarded to the global scheduler only if needed (Section 4.2.2). The thickness of each arrow is proportional to its request rate. * 核心作用:实现高效的分布式任务调度。 * 设计特点: - 两级结构:本地调度器 + 全局调度器 - 本地调度器:每个节点一个,先尝试本地调度,避免全局调度器成为瓶颈。 - 全局调度器:只有在本地排不动了或资源不够时才会转发到全局调度器。 * 调度考虑因素: - 节点的当前负载 - 任务资源需求 - 数据局部性 - 估算等待时间(排队时间 + 数据传输时间) * 高扩展性:全局调度器可以有多个副本,状态共享通过 GCS 实现。 4.2.3 In-Memory Distributed Object Store ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * 核心作用:存储任务的输入输出数据,全部放在内存中,保证超低延迟。 * 特点: - 每个节点上用 共享内存 实现,任务间 零拷贝 传输。 - 数据格式用 Apache Arrow,高效。 - 输入如果不在本地,执行前先复制到本地。 - 输出直接写到本地 Object Store。 - 数据是 不可变 的,简化容错处理(失败了,按 lineage 重新执行生成数据)。 - 数据淘汰策略:LRU(放不下时丢到磁盘)。 4.3 Putting Everything Together ------------------------------- * 看一个完整例子,解释数据和控制流: .. figure:: https://img.zhaoweiguo.com/uPic/2025/03/uiIdW3.png Figure 7: An end-to-end example that adds a and b and returns c. Solid lines are data plane operations and dotted lines are control plane operations. (a) The function ``add()`` is registered with the GCS by node 1 (N1), invoked on N1, and executed on N2. (b) N1 gets add()’s result using ``ray.get()`` . The Object Table entry for c is created in step 4 and updated in step 6 after c is copied to N1. * N1 上的 Driver 想执行 add(a, b) 并拿到结果 c * Step 0:add 函数在 GCS 注册,所有 Worker 知道这个函数。 * a)Executing a task remotely:: (a) 调用 add(a, b) 的流程: 1. Driver 调用 `add.remote(a, b)`,发给 N1 本地调度器。 2. 本地调度器看本地是否可以执行,发现不合适,发到全局调度器。 3. 全局调度器从 GCS 查询 a, b 的位置,发现 b 在 N2。 4. 决定在 N2 执行(因为数据局部性好)。 5. N2 的本地调度器检查本地是否有 a 和 b。 6. N2 没有 a,去 GCS 查位置。 7. 知道 a 在 N1,于是从 N1 复制到 N2。 8. 数据准备好了,N2 调度 worker 执行 add(a, b)。 9. worker 直接从共享内存读 a, b,执行任务。 * b)Returning the result of a remote task:: (b) Driver 获取 c 的流程: 1. Driver 调用 `ray.get(c)`,先查自己本地的 Object Store。 2. 本地没有 c,于是去 GCS 查。 3. 此时 c 还没产生,GCS 让 Driver 注册一个回调。 4. N2 上 add 任务完成,把 c 存到本地 Object Store,并更新 GCS。 5. GCS 触发回调,通知 N1,c 位置已更新。 6. N1 从 N2 拉取 c。 7. Driver 成功拿到 c,`ray.get()` 返回。 5. Evaluation ============= * 核心是 Ray 系统的性能评估 * 这段内容的重点在于验证 Ray 是否具备低延迟、高扩展性和容错性,尤其是在 RL(Reinforcement Learning,强化学习)这类对计算需求高、任务复杂的应用场景中的表现。 * 总体评估目标:用四个核心问题来评估 Ray: 1. Ray 是否满足低延迟、可扩展性和容错的需求? 2. 用 Ray API 写分布式原语(如 allreduce)时,额外的开销如何? 3. 在 RL 任务中,Ray 相比于专用系统(比如专用的训练、推理、仿真系统)表现如何? 4. Ray 在 RL 应用中相对于定制系统的优势是什么? * 结论:Ray 在低延迟、扩展性、容错、灵活性上都有优秀表现,尤其适合强化学习等复杂分布式应用。它既能和专用系统性能持平甚至超越,同时提供更高的开发效率和灵活性。 5.1 Microbenchmarks ------------------- * Locality-aware Task Placement(数据本地性调度) - 背景:在分布式系统中,数据本地性影响性能。如果任务和其依赖的数据不在同一节点,数据传输延迟会增加。 - 结果: - Ray 的任务调度是数据本地性感知的,所以即使任务有随机依赖数据,它的延迟不会随着数据输入的大小而大幅增加(不会多出 1-2 个数量级的延迟)。 - 相比之下,actor 一旦部署就固定在某节点,没法灵活移动。 - 开发者可以用任务(task)对 actor 的输出做后处理,同时享受数据本地性带来的优势。 * End-to-end Scalability(整体扩展性) - 他们用空任务(几乎不消耗 CPU/GPU)测试了系统的极限吞吐量。 - 结果: - 60 个节点时,每秒可处理 100 万个任务,100 节点可达到 180 万/秒。 - 右侧极限数据点展示了:100 万个任务在不到 1 分钟完成(54 秒)。 - 说明 Ray 的调度器和架构在大规模并发负载下仍保持线性扩展。 * Object Store 性能 - 主要看两方面指标: - IOPS(小对象,操作次数/秒) - 吞吐量(大对象,GB/s) - 单客户端写入大对象时,超过 15GB/s 吞吐,小对象时 IOPS 达 18K+。 - 内部优化: - 大于 0.5MB 的对象会用 8 个线程拷贝。 - 小对象则用单线程,瓶颈更多来自序列化和进程间通信。 * GCS(Global Control Store)容错和内存管理 - GCS 采用轻量级链式复制实现一致性和容错。 - 即使节点失效,重组期间客户端延迟不会超过 30ms。 - GCS flushing: - 会定期把状态写入磁盘,防止内存膨胀。 - 如果不 flush,GCS 的内存使用会线性增长,导致系统崩溃。 * 任务和 Actor 容错 - 任务容错:当节点被移除,Ray 会自动重建丢失的任务依赖,恢复后吞吐量稳定。 - Actor 容错: - Actor 失败后可以从上一次 checkpoint 继续恢复。 - 通过 checkpoint,减少需要重算的步骤数(例子里从 1 万降到 500)。 * Allreduce 性能 - Allreduce 是分布式训练常用的通信原语。 - Ray 实现的 ring allreduce 在 16 个节点上比 OpenMPI 更快: - 100MB 数据约 200ms,1GB 数据约 1200ms,比 OpenMPI 快 1.5~2 倍。 - 原因是 Ray 多线程并行网络传输,OpenMPI 单线程瓶颈明显。 - 调度延迟非常敏感,几毫秒的额外延迟会使性能降低一半。 5.2 Building blocks ------------------- * Ray 的 API 灵活到可以实现 RL 应用里训练、推理、仿真的所有核心环节,对比一些专门系统的性能。 * 1.分布式训练 - 实现了 data-parallel SGD,用 Ray 的 actor 表示模型副本。 - 结果: - 性能与 Horovod、TensorFlow 分布式模式相当,性能差距 <10%。 - 关键优化: - 梯度计算、传输、聚合流水线化。 - 通过自定义 TF operator,直接写入 Ray object store,减少内存拷贝。 * 2.模型推理(Serving) - 对比 Ray actor 和 Clipper(专门的模型 serving 系统)。 - 环境:客户端和服务器共用一台机器(RL 常见)。 - 结果: - 小输入:Ray 每秒 6200 次预测 vs Clipper 4400。 - 大输入:Ray 6900 次/sec,Clipper 290/sec。 - Ray 更快的原因:共享内存 + 低开销序列化。 * 3.仿真 - RL 中,仿真任务计算量和时间跨度不均,需要异步处理。 - 实验对比: - MPI 实现用 BSP(Bulk Synchronous Parallel)模型,分批同步,存在 barrier。 - Ray 使用异步任务模型,边提交边收集结果。 - 结果: - Ray 的吞吐量比 MPI 高 1.8 倍,尤其在 CPU 核多、任务多时优势明显。 5.3 RL 应用 ----------- * 本章展示 Ray 框架 在强化学习 (RL) 应用中的优势,尤其是在和传统、定制化实现的对比中,Ray如何提供更高的性能、更好的可扩展性和更低的开发成本。 * 传统 RL 系统的问题 - 问题:传统 RL 系统通常是一锤子买卖(one-off solutions),都是针对具体算法定制开发,导致: - 很难灵活地加入新的优化(比如不同的计算架构、不同的并行方式)。 - 修改或扩展都需要大量工程投入。 - Ray的优势: - Ray 的 动态任务图执行引擎 + 编程模型,可以很方便地实现这些优化。 - 你只需要在应用层调整,很少改代码,甚至支持不同计算架构(比如 CPU、GPU 混合)。 - 训练、仿真、服务 这三步可以无缝整合。 5.3.1 Evolution Strategies ^^^^^^^^^^^^^^^^^^^^^^^^^^ * 背景: - ES(Evolution Strategies) 是一种强化学习方法,简单来说,就是不断生成新策略、让多个 worker 并行仿真,然后聚合结果。 - 参考实现用了 Redis 做通信、底层多进程库做数据共享。 * Ray实现的亮点: - 扩展性极强,实验能跑到 8192个核,而传统实现到 2048核 就挂了(驱动程序处理不过来)。 - Ray 通过 actor聚合树(aggregation tree)设计,大量 worker 的数据分层汇总,极大降低了瓶颈。 - 成绩:平均完成时间 3.7分钟,比之前最好的 10分钟快两倍。 - 代码对比: - 传统实现几百行代码处理通信协议、数据共享。 - Ray 只改了 7行代码,就能并行化,并轻松实现聚合优化。 5.3.2 Proximal Policy Optimization ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * 背景: - PPO 是目前流行的 RL 算法。 - 参考实现用了 OpenMPI(专门的高效通信库)。 * Ray实现的亮点: - 异步 scatter-gather 模式:worker 完成 rollout 就上报,动态分配新任务,直到收集到32万步仿真数据。 - 更新:每次用 32768 batch size 做 20 次 SGD。 * 性能对比: - Ray 在所有测试中都超过了 MPI 实现,且 用更少 GPU。 - 原因: 1. Ray支持异构资源:可以明确告诉 Ray 哪个任务/actor 需要 GPU,哪个只用 CPU,而 MPI 通常是对称架构(所有进程一样,资源要求一样)。 2. TensorFlow 多 GPU 单进程优化能直接用,Ray 还能把数据固定在 GPU 内存。 3. MPI 版本为了兼容多 GPU 和大规模集群,搞了两套实现(单机优化 vs 多机 MPI),Ray 一套代码就能搞定。 * 成本优势: - Ray 允许 CPU-only 任务跑在便宜的 CPU 实例上,PPO 训练成本降低了 4.5 倍。 - MPI 因为架构对称,CPU-only 机器无法利用。 - Ray 支持 容错(失败自动重试),可以大胆用便宜的 spot instance(便宜4倍),整体成本降低18倍。 6 Related Work ============== Dynamic task graphs ------------------- * Ray vs CIEL 和 Dask * 共同点: - 都支持 动态任务图(也就是任务可以在运行时动态生成新的子任务)。 - 都实现了 futures 抽象(即异步计算,任务完成后返回 future,便于依赖管理)。 - CIEL 还提供 基于数据血缘的容错机制,Dask 和 Ray 都与 Python 紧密集成。 * Ray 的两个优势: 1. Actor 模型扩展: - Ray 提供了 Actor 抽象,这是支持 有状态计算(stateful computation)非常重要的,尤其适用于 分布式训练和服务场景, - 因为它能保证计算和模型数据位置的局部性(避免频繁的数据搬移)。 2. 完全分布式的调度器和控制平面: - 不依赖单个中心节点(master)去存储所有元数据。 - 这点对高效支持 如 allreduce 这类通信密集型操作 至关重要,因为没有调度瓶颈。 - 举例:在 16 台节点,每个节点 100MB 数据,Ray 的 allreduce 能在 200ms 内提交 32 轮、每轮 16 个任务。而 Dask 因为调度器集中,每轮至少要 5ms 调度延迟,整体时间可能会 慢 2 倍。 Dataflow systems ---------------- * 如 MapReduce、Spark、Dryad: - 适合批处理、大规模分析、机器学习工作负载。 - 但是: - 模型过于静态和粗粒度,不适合细粒度、动态的仿真任务。 - Spark/MapReduce 使用 BSP 模型(批处理阶段同步执行,每个任务计算相同且耗时类似),灵活性差。 - Dryad 比 BSP 更灵活,但仍不支持动态任务图。 - 都 不支持 Actor 模型,且控制平面通常是集中式,扩展性差。 - Naiad 尽管扩展性提升,但只支持 静态任务图,不适合需要根据运行时状态调整任务的场景。 Machine learning frameworks --------------------------- - 如 TensorFlow、MXNet: - 专注于深度学习,擅长处理 静态 DAG 的线性代数运算。 - 不适合需要紧密结合 训练 + 仿真 + 在线推理 的通用计算。 - TensorFlow Fold 和 MXNet 内部 C++ API 支持部分动态任务图,但: - 不支持 DAG 在执行中根据任务进度、故障动态修改。 - 编程模型接近 MPI,用户需自己处理消息传递、同步、容错,编程体验差。 - OpenMPI - 性能虽高,但编程难度高,容错全靠开发者手动实现。 Actor systems ------------- * 如 Orleans、Akka: - 擅长开发高可用、并发分布式系统。 - 但 容错性不如 Ray: - Orleans: - 有状态 Actor 需要开发者手动做 checkpoint,恢复麻烦。 - 无状态 Actor 虽可扩展,但无 lineage,任务数据血缘跟踪弱。 - Akka: - 虽支持状态持久化,但对无状态计算容错不够高效。 - 消息传递语义: - Orleans 是 at-least-once,Akka 是 at-most-once,存在重复或丢失问题。 - Ray 提供 exactly-once 语义,每次方法调用都被 GCS 记录,参数与结果不可变。 - 其他如 Erlang、C++ Actor Framework 也有类似容错上的限制。 Global control store and scheduling ----------------------------------- - 传统中心化控制平面: - 如 SDN(软件定义网络)、GFS、MapReduce 等,控制平面逻辑和存储通常耦合,扩展性受限。 - Ray 的改进: 1. 存储和调度逻辑解耦: - 控制信息存在 GCS(全局控制存储)中。 - 调度器可独立扩展,控制存储与计算层独立扩展,有助于大规模部署。 2. 改进 Omega 架构: - Omega 各调度器协调依赖全局共享状态。 - Ray 在此基础上加了 全球调度器(global scheduler),负责均衡本地调度器负载。 - Ray 追求的是 毫秒级调度延迟,而传统系统是秒级别。 3. 分布式、可水平扩展的调度器: - 能处理动态构建的任务图,解决了如 CIEL、Spark 等系统在调度上存在的瓶颈。 - Cilk: - 使用 work-stealing 策略在动态任务图负载均衡方面表现优异。 - 但没有 全局协调器,不易扩展到支持数据局部性和资源异构性的分布式环境。 总结 ---- * Ray 相比其他系统的优势是: - 同时支持 动态任务图 和 有状态 Actor 模型。 - 具备 去中心化、可扩展、解耦的控制平面和调度器,保证性能和容错性。 - 能在 高吞吐、低延迟 和 容错性 之间取得优良平衡,尤其适合训练+仿真+推理的复杂工作负载。 7 Discussion and Experiences ============================ 本章主要是Ray 团队在设计和开发 Ray 系统过程中的一些经验、挑战和思考,核心围绕 API设计、系统局限性、容错机制、GCS(Global Control Store)及可扩展性 进行展开。 * API 设计 - Minimalism(极简主义) - 最开始:API 设计得很基础,只有最核心的 task abstraction(任务抽象)。 - 后来增加: - ``wait()``:为了解决 任务执行时间长短不一(如仿真 rollouts) 的情况,允许等待部分任务完成。 - Actor abstraction(Actor模型):为了支持第三方仿真器,以及避免昂贵初始化的重复开销,比如训练一个模型时需要反复加载环境或权重。 - 设计理念: - API 是 低层的 (low-level),但同时简单且强大。 - 团队用这个 API 实现了很多主流强化学习算法,比如 A3C, PPO, DQN 等,迁移这些算法到 Ray 上通常只需 几十行代码。 - 未来方向: - 根据用户反馈,计划在 API 上增加更高层的封装,比如高层 primitive 或者库,甚至可能让调度器更智能。 * 系统局限性(Limitations) - 针对泛化工作负载,Ray 选择了灵活性,但也带来一些局限: - 调度器做决策时,无法预知完整计算图(不像 Spark 这类静态 DAG 系统),所以没法做特别激进的优化。 - 可能需要更复杂的 运行时 profiling(动态运行数据分析)来辅助调度优化。 - 因为 Ray 存储了每个 task 的 lineage(血统信息,用于容错回溯),需要额外实现 垃圾回收(GC)机制,防止元数据无限增长。 * 容错机制 - Fault Tolerance - AI 任务是否真的需要容错? - 很多人认为 AI 训练 本来就有随机性,丢几个 rollouts 可能没事。 - 但他们认为 “容错是必须的”,原因有三: 1. 易于开发:让开发者可以完全忽略故障,简化了思维模型,写代码轻松。 2. 调试方便:Ray 使用 确定性重放 (deterministic replay) 实现容错,能精准复现故障,调试 stochastic AI 算法时特别有帮助。 3. 节省成本:容错机制让他们可以放心使用 便宜的、不稳定的云资源(如 AWS Spot 实例),即便中断也不怕数据丢失。 - 容错带来一点额外开销,但他们发现 对目标工作负载影响很小。 * GCS(Global Control Store) 和水平扩展性 - GCS的作用: - 类似一个全局状态存储中心,存储控制信息、调度信息、任务血统。 - 极大简化了 Ray 的开发和调试:开发者可以实时查看整个系统状态,不用手动检查各个节点状态。 - GCS 也支持了一个 timeline 可视化工具,帮助开发者调试应用层任务。 - GCS 对扩展性的贡献: - 当 GCS 成为瓶颈时,他们可以 简单地通过增加 shards(分片)进行扩展。 - 全局调度器(Global Scheduler)也是靠 添加副本 来扩展,受益于 GCS 中心化设计。 - 总结观点: - 他们认为 “中心化控制状态 + 分布式实现” 是未来分布式系统设计的重要趋势,因为这样可以既简化系统,又保证水平扩展性。 * 整体理解 - Ray 其实是为了解决 Spark 这类系统 在 AI 和动态任务图场景下不灵活 的问题,从零开始重新设计的分布式系统。 - 核心设计亮点: - 简单强大的 API,灵活支持动态任务图。 - Actor 模型适合 stateful 任务。 - Fault tolerance 默认开启,让开发者无需手动关心。 - 中心化的 GCS,让系统状态可见,调度器和存储层能独立扩展。 - 他们在实际使用中确实发现: - API 简单,扩展容易。 - 容错极大提升开发效率。 - 中心化 GCS + 分布式实现很好地兼顾了 可扩展性 + 易维护性。 .. note:: Ray 从 Spark 走来,针对 AI 场景重新设计,核心在于:灵活的动态任务支持、强大的容错机制、简化开发体验和可扩展的全局控制存储。 8. Conclusion ============= * 目前还没有哪个通用系统能高效支持 训练 (training)、服务 (serving) 和 仿真 (simulation) 这三步紧密循环的过程。 * 为了能够表达这些核心构建模块,并满足新兴 AI 应用的需求,Ray 将任务并行编程模型和 actor 编程模型统一在一个动态任务图中,并采用了由全局控制存储(Global Control Store)和自底向上的分布式调度器(bottom-up distributed scheduler)所支持的可扩展架构。 * 这一架构所实现的编程灵活性、高吞吐量以及低延迟,对于新兴人工智能工作负载尤为重要,因为这些工作负载中的任务在资源需求、执行时长和功能上都表现出高度多样性。 * 我们的评估结果显示,Ray 具备良好的线性扩展性,最多可实现每秒 180 万个任务的吞吐量,同时具备透明的容错机制,并在多个现代强化学习任务上实现了显著的性能提升。 * 因此,Ray 在灵活性、性能和易用性之间提供了强有力的平衡,成为未来 AI 应用开发的有力工具。