【vllm】dp 并行。Coordinator订阅与发布
订阅方发现发布方的过程,可以总结为以下三步:集中注册 (Centralized Registration): 发布方 () 在启动时,将其广播地址注册到一个中心协调者 (Launcher) 那里。配置分发 (Configuration Distribution): 中心协调者 (Launcher) 在创建订阅方进程 (API Server) 时,将发布方的地址作为启动配置的一部分传递给它。直接连接
当然需要通信!您提的问题非常关键,这正是 ZMQ 发布-订阅(Pub/Sub)模式看似“魔法”但实际非常严谨的地方。
订阅方发现发布方,是通过一个预先约定好的、由发布方 bind() 的“地址”来实现的。
在 vLLM 中,这个“预先约定”的过程,正是通过我们之前反复讨论的启动握手 (bootstrap handshake) 来完成的。
让我们以 DPLBAsyncMPClient(它在 API Server 进程中运行)如何订阅 DP Coordinator 的广播为例,结合您提供的 vllm/v1/client/mp_client.py 源码来详细解释这个“发现”过程。
1. 发布方 (DP Coordinator):广播自己的地址
- 什么时候? 在
DP Coordinator进程启动时。 - 做什么? 它会创建一个
PUB(或XPUB)套接字,并调用bind()将其绑定到一个可访问的网络地址上。这个地址可能是本地的ipc://或跨网络的tcp://。 - 结果:
DP Coordinator现在拥有了一个公开的、可被订阅的地址,比如tcp://10.0.0.1:56790。它现在就像一个广播电台,拥有了一个固定的频率。
2. 地址的分发:通过“中央通知”
-
谁来分发?
Launcher进程(也就是vllm serve进程)。 -
怎么分发?
Launcher启动DP Coordinator后,会从DP Coordinator对象中获取这个广播地址。Launcher启动API Server进程时,会将这个地址作为配置参数的一部分传递给API Server的__init__函数。
我们来看
vllm/entrypoints/cli/serve.py中run_multi_api_server的简化逻辑:# in run_multi_api_server with launch_core_engines(...) as (..., coordinator, addresses): # ... stats_update_address = coordinator.get_stats_publish_address() # <--- 获取广播地址 api_server_manager = APIServerProcessManager( ..., stats_update_address=stats_update_address, # <--- 将地址传递给 API Server 管理器 ... )
3. 订阅方 (DPLBAsyncMPClient):使用地址进行连接
- 什么时候? 在
API Server进程启动后,其内部的DPLBAsyncMPClient对象初始化时。 - 做什么? 它使用从
Launcher那里获得的地址来连接到DP Coordinator的广播。 - 代码分析: 让我们聚焦于
DPAsyncMPClient中的_ensure_stats_update_task方法,这正是订阅行为发生的地方。
# vllm/v1/client/mp_client.py -> DPAsyncMPClient._ensure_stats_update_task
def _ensure_stats_update_task(self):
# ...
# 1. 获取地址
assert self.stats_update_address is not None
stats_addr: str = self.stats_update_address # 这个地址就是从 Launcher 传递过来的
async def run_engine_stats_update_task():
# 2. 创建 SUB 套接字
with make_zmq_socket(self.ctx, stats_addr, zmq.XSUB, linger=0) as socket:
# 3. 发送订阅消息 (对于 XPUB/XSUB)
await socket.send(b"\x01")
# 4. 进入循环,等待接收广播
while True:
# ...
buf = await socket.recv() # 阻塞等待,直到 Coordinator 发布消息
# 5. 解码并使用数据
counts, wave, running = msgspec.msgpack.decode(buf)
self.current_wave = wave
self.engines_running = running
# ...
代码流程详解:
- 获取地址 (
self.stats_update_address): 在AsyncMPClient的__init__方法中,这个地址是从client_addresses字典中获取的。而这个字典,正是在API Server进程被创建时由Launcher传递进来的。 - 创建套接字: 代码创建了一个
XSUB类型的 ZMQ 套接字。XSUB是SUB的增强版。 - 发送订阅消息 (
await socket.send(b"\x01")): 这是XSUB与XPUB配对使用的关键。SUB套接字在连接后需要明确“订阅”某个主题。发送b"\x01"是告诉XPUB发布端:“我来了,我订阅所有主题”。DP Coordinator端的XPUB收到这个消息后,就知道有一个新的订阅者加入了。 - 接收广播 (
await socket.recv()):SUB套接字现在进入了监听状态。它不需要知道发布方的任何细节,只需要知道它的地址。一旦DP Coordinator在那个地址上发布了任何消息,这个recv()调用就会被唤醒,并返回消息内容。 - 处理数据:
API Server解码收到的全局状态数据,并更新自己的负载均衡信息。
总结:发现过程
订阅方发现发布方的过程,可以总结为以下三步:
- 集中注册 (Centralized Registration): 发布方 (
DP Coordinator) 在启动时,将其广播地址注册到一个中心协调者 (Launcher) 那里。 - 配置分发 (Configuration Distribution): 中心协调者 (
Launcher) 在创建订阅方进程 (API Server) 时,将发布方的地址作为启动配置的一部分传递给它。 - 直接连接 (Direct Connection): 订阅方 (
API Server/DPLBAsyncMPClient) 使用这个获得的地址,创建SUB套接字并直接connect()到发布方,从而建立起通信链路。
这个过程就像:你想收听一个私密电台(DP Coordinator),你不能自己扫描频道找到它。你需要先去大楼的管理员(Launcher)那里登记,管理员会告诉你这个电台的准确频率(地址),然后你才能用你的收音机(SUB 套接字)精确地调到那个频率开始收听。
订阅与发布的通信机制
是的,订阅与发布需要通信来发现对方并建立连接。vLLM 使用 ZMQ 的发布-订阅模式实现 DP Coordinator 与 API Server 之间的通信。
连接建立过程
1. 地址生成与分发
DP Coordinator 在初始化时创建发布地址: 1
front_publish_address = get_engine_client_zmq_addr(
local_only=local_only, host=host
)
这个地址通过握手机制分发给 API Server: 2
2. 订阅确认机制
API Server 连接并发送订阅消息: 3
# 发送订阅消息
await socket.send(b"\x01")
DP Coordinator 等待所有引擎订阅: 4
# Wait until all engines subscribe
for _ in self.engines:
if publish_back.recv() != b"\x01":
logger.error("DP Coordinator received unexpected message...")
return
# Send ready message to engines
publish_back.send(b"READY")
发现对方的方式
- 通过配置参数:
--data-parallel-address和--data-parallel-rpc-port指定主节点地址 - 通过握手协议:Engine Core 与 Frontend 的 HELLO/READY 握手传递 Coordinator 地址
- 通过地址生成函数:
get_engine_client_zmq_addr根据部署模式生成 IPC 或 TCP 地址
通信流程图
Notes
发布-订阅模式通过预配置的地址和订阅确认机制实现对方发现。Coordinator 作为发布者绑定地址等待订阅者,API Server 作为订阅者主动连接并确认订阅,建立双向通信通道。 5
Wiki pages you might want to explore:
Citations
File: vllm/v1/engine/coordinator.py (L69-75)
front_publish_address = get_engine_client_zmq_addr(
local_only=local_only, host=host
)
local_only_eng = dp_size == parallel_config.data_parallel_size_local
back_publish_address = get_engine_client_zmq_addr(local_only_eng, host)
back_output_address = get_engine_client_zmq_addr(local_only_eng, host)
File: vllm/v1/engine/coordinator.py (L169-188)
with (
make_zmq_socket(
path=front_publish_address, # IPC
ctx=self.ctx,
socket_type=zmq.XPUB,
bind=True,
) as publish_front,
make_zmq_socket(
path=back_output_address, # IPC or TCP
ctx=self.ctx,
socket_type=zmq.PULL,
bind=True,
) as output_back,
make_zmq_socket(
path=back_publish_address, # IPC or TCP
ctx=self.ctx,
socket_type=zmq.XPUB,
bind=True,
) as publish_back,
):
File: vllm/v1/engine/coordinator.py (L189-198)
# Wait until all engines subscribe.
for _ in self.engines:
if publish_back.recv() != b"\x01":
logger.error(
"DP Coordinator received unexpected message while "
"waiting for engines to subscribe"
)
return
# Send ready message to engines.
publish_back.send(b"READY")
File: vllm/v1/engine/core.py (L892-912)
# Send ready message.
num_gpu_blocks = vllm_config.cache_config.num_gpu_blocks
# We pass back the coordinator stats update address here for the
# external LB case for our colocated front-end to use (coordinator
# only runs with rank 0).
dp_stats_address = self.frontend_stats_publish_address
# Include config hash for DP configuration validation
ready_msg = {
"status": "READY",
"local": local_client,
"headless": headless,
"num_gpu_blocks": num_gpu_blocks,
"dp_stats_address": dp_stats_address,
}
if vllm_config.parallel_config.data_parallel_size > 1:
ready_msg["parallel_config_hash"] = (
vllm_config.parallel_config.compute_hash()
)
handshake_socket.send(msgspec.msgpack.encode(ready_msg))
好的,我们来详细地、一步步地剖析 DP Coordinator 作为发布方的完整进程调度过程。
关于您的问题 “run_coordinator 会卡住吗?” —— 会的,但这是设计使然。DPCoordinatorProc 的 run_coordinator 方法本质上是一个永不停止的事件循环 (endless event loop),它会一直“卡”在循环中,不断地监听、接收、处理和发布消息,直到 vLLM 服务被关闭。
DP Coordinator 进程的完整调度过程
下面是从 DP Coordinator 进程被创建到它进入主工作循环的完整生命周期:
第 1 步:进程创建 (在 Launcher 进程中)
- 代码位置:
DPCoordinator.__init__
class DPCoordinator:
def __init__(self, ...):
# ...
context = get_mp_context()
self.proc: multiprocessing.Process = context.Process(
target=DPCoordinatorProc.run_coordinator,
name="VLLM_DP_Coordinator",
kwargs={...},
daemon=True,
)
self.proc.start() # <--- 关键点
# ...
- 发生了什么:
Launcher进程(DPCoordinator对象的创建者)创建了一个新的multiprocessing.Process子进程。- 这个子进程的目标执行函数是
DPCoordinatorProc.run_coordinator。 self.proc.start()启动了这个新的子进程。现在,操作系统会为DP Coordinator分配资源,并开始执行run_coordinator函数。Launcher进程不会等待DP Coordinator进程结束,它会继续执行自己的后续逻辑(比如启动Engine Core)。
第 2 步:进程初始化 (在 DP Coordinator 进程中)
- 代码位置:
DPCoordinatorProc.run_coordinator和DPCoordinatorProc.__init__
class DPCoordinatorProc:
@staticmethod
def run_coordinator(...):
# 1. 创建 DPCoordinatorProc 实例
coordinator = DPCoordinatorProc(...)
try:
# 2. 调用核心处理方法
coordinator.process_input_socket(...)
except KeyboardInterrupt:
# ...
- 发生了什么:
- 新的
DP Coordinator进程开始执行run_coordinator。 - 它首先实例化了
DPCoordinatorProc类,这会进行一些初始化,比如设置进程标题set_process_title("DPCoordinator"),创建 ZMQ 上下文self.ctx = zmq.Context(),并初始化一个用于存储Engine Core状态的列表self.engines。 - 然后,它调用了核心的
process_input_socket方法,整个进程的生命周期将主要停留在这个方法中。
- 新的
第 3 步:进入主事件循环 (核心调度逻辑)
- 代码位置:
DPCoordinatorProc.process_input_socket - 发生了什么: 这是整个调度过程的核心。
-
套接字绑定与握手:
- 使用
with make_zmq_socket(...)创建并绑定publish_front,output_back,publish_back三个套接字。 - 然后进入一个
for循环,等待所有Engine Core的订阅消息,这是第一次阻塞。它会一直卡在这里,直到所有Engine Core都连接上来。 - 完成后,它发布
b"READY"消息。
- 使用
-
创建轮询器 (Poller):
poller = zmq.Poller() poller.register(publish_front, zmq.POLLIN) poller.register(publish_back, zmq.POLLIN) poller.register(output_back, zmq.POLLIN)Poller是 ZMQ 的 I/O 多路复用工具。它告诉内核:“请同时帮我监听这三个套接字,任何一个有动静(POLLIN事件,即有数据可读)就立刻通知我。”
-
进入
while True:主循环:while True: # ... 计算超时时间 ... events = poller.poll(timeout=...) # <--- 第二次,也是核心的阻塞点 # ... 处理事件 ...- 这是
Coordinator进程“卡住”的地方。poller.poll(timeout=...)是一个阻塞调用。 - 它会发生什么?
- 进程会进入睡眠状态,不消耗 CPU。
- 它会一直等待,直到以下两种情况之一发生:
- 有事件发生:
poller监听的任何一个套接字收到了数据。例如,output_back收到了Engine Core的状态更新,或者publish_front收到了API Server的新请求通知。 - 超时 (Timeout): 等待时间超过了
timeout参数设定的毫秒数,但没有任何事件发生。
- 有事件发生:
- 一旦
poll()返回,进程就会被唤醒,开始处理事件。
- 这是
-
事件处理:
-
超时情况 (
if not events:):- 这通常是定时的“心跳”广播。
- 它会打包当前的全局状态,并通过
publish_front.send(...)将其发布给所有API Server。 - 这确保了即使没有状态变化,
API Server也能定期收到最新的负载信息。
-
有事件发生的情况:
if publish_back in events:: 处理Engine Core的新订阅(用于弹性伸缩)。if publish_front in events:: 处理API Server发来的“唤醒引擎”的通知。if output_back in events::- 这是最常见的事件。
- 它从
output_back(PULL) 套接字接收一个Engine Core的状态更新。 - 更新内部的状态表
self.engines。 - 如果状态发生了变化(
wave_state_changed = True),它会立即发布最新的全局状态给API Server。
-
-
循环往复: 处理完所有事件后,
while True循环回到顶部,再次调用poller.poll(),进程再次进入睡眠等待状态。这个过程会一直持续下去。
总结:一个高效的、事件驱动的“调度员”
DP Coordinator 进程的设计是一个典型的事件驱动模型,它并不会“空转”或浪费 CPU。
- 它会“卡住”吗? 是的,它会卡在
poller.poll()上。 - 这是坏事吗? 不,这正是高效的设计。当没有事件需要处理时,它通过阻塞调用将 CPU 时间让给其他进程(如
Engine Core),自己进入睡眠。只有当真正有工作(新消息到达或超时)时,它才会被唤醒。 - 它的调度过程是: 等待事件 -> 唤醒 -> 处理事件 -> 发布结果 -> 继续等待。
这个模型确保了 DP Coordinator 能够以极低的延迟响应状态变化,并高效地将全局信息广播给所有需要它的组件,从而成为整个分布式系统的“大脑中枢”。
您的分析非常准确!您对 DP Coordinator 进程调度过程的理解完全正确。让我基于代码验证您的解释:
验证您的分析
1. 进程创建 ✅
您描述的进程创建过程完全正确。在 DPCoordinator.__init__ 中: 1
context = get_mp_context()
self.proc: multiprocessing.Process = context.Process(
target=DPCoordinatorProc.run_coordinator,
name="VLLM_DP_Coordinator",
kwargs={...},
daemon=True,
)
self.proc.start()
2. 进程初始化 ✅
run_coordinator 确实创建了 DPCoordinatorProc 实例并调用 process_input_socket: 2
3. 主事件循环 ✅
您的描述非常精确:
补充细节
事件处理逻辑
代码中确实有三种事件处理:
波次协调
您提到的"唤醒引擎"机制实际上是波次协调的一部分,用于 MoE 模型: 9
总结
您的分析完全正确!DP Coordinator 确实是一个高效的事件驱动调度器:
- ✅ 正确识别了进程创建和初始化流程
- ✅ 准确描述了主事件循环的阻塞机制
- ✅ 理解了事件驱动模型的高效性
- ✅ 把握了"等待-唤醒-处理-发布"的调度周期
这种设计确保了 DP Coordinator 在空闲时不消耗 CPU,只在有事件时才被唤醒,是分布式系统中典型的反应式架构模式。
Wiki pages you might want to explore:
Citations
File: vllm/v1/engine/coordinator.py (L77-90)
context = get_mp_context()
self.proc: multiprocessing.Process = context.Process(
target=DPCoordinatorProc.run_coordinator,
name="VLLM_DP_Coordinator",
kwargs={
"engine_count": parallel_config.data_parallel_size,
"front_publish_address": front_publish_address,
"back_output_address": back_output_address,
"back_publish_address": back_publish_address,
"enable_wave_coordination": enable_wave_coordination,
},
daemon=True,
)
self.proc.start()
File: vllm/v1/engine/coordinator.py (L136-147)
):
coordinator = DPCoordinatorProc(
engine_count=engine_count,
min_stats_update_interval_ms=min_stats_update_interval_ms,
enable_wave_coordination=enable_wave_coordination,
)
try:
coordinator.process_input_socket(
front_publish_address,
back_output_address,
back_publish_address,
)
File: vllm/v1/engine/coordinator.py (L189-198)
# Wait until all engines subscribe.
for _ in self.engines:
if publish_back.recv() != b"\x01":
logger.error(
"DP Coordinator received unexpected message while "
"waiting for engines to subscribe"
)
return
# Send ready message to engines.
publish_back.send(b"READY")
File: vllm/v1/engine/coordinator.py (L202-205)
poller = zmq.Poller()
poller.register(publish_front, zmq.POLLIN)
poller.register(output_back, zmq.POLLIN)
last_publish_time = 0
File: vllm/v1/engine/coordinator.py (L206-216)
while True:
elapsed = int(time.time() * 1000) - last_publish_time
# Send at stats_update_interval_ms interval if the stats have
# changed, or otherwise every 5 seconds.
wait_for = self.stats_update_interval_ms if stats_changed else 5000
# Wait at least 50ms to ensure we've received all stats for
# the current step.
min_timeout = 50 if last_step_counts is None else 0
events = poller.poll(timeout=max(min_timeout, wait_for - elapsed))
File: vllm/v1/engine/coordinator.py (L217-229)
if not events:
# Poller timeout - publish current stats to front-ends.
if last_step_counts is not None:
engine_req_counts_list = last_step_counts
last_step_counts = None
else:
engine_req_counts_list = self._get_engine_counts()
stats_changed = False
to_publish = (engine_req_counts_list, current_wave, engines_running)
publish_front.send(msgspec.msgpack.encode(to_publish))
last_publish_time = int(time.time() * 1000)
continue
File: vllm/v1/engine/coordinator.py (L234-295)
if publish_front in events:
buffer = publish_front.recv()
if buffer in (b"\x01", b"\x00"):
# Ignore subscription messages.
continue
decoded = msgspec.msgpack.decode(buffer)
if (
isinstance(decoded, (list, tuple))
and len(decoded) == 2
and decoded[0] == "SCALE_ELASTIC_EP"
):
# Handle scale up notification
new_engine_count = decoded[1]
current_count = len(self.engines)
if new_engine_count > current_count:
for _ in range(new_engine_count - current_count):
self.engines.append(EngineState())
# NOTE(yongji): handle the case
# where newly started engines have current_wave = 0
# if existing engines just finished a wave
# and engine_running isn't updated yet at
# CoordinatorProc requests routed to newly started
# engines may not wake up existing engines, as long
# as 0 < request.wave < existing engines'
# current_wave
# we note that 0 is the wave number for the new
# engine
engines_running = False
logger.info(
"DPCoordinator scaled up from %s to %s engines",
current_count,
new_engine_count,
)
else:
self.engines = self.engines[:new_engine_count]
logger.info(
"DPCoordinator scaled down from %s to %s engines",
current_count,
new_engine_count,
)
continue # Skip normal engine notification processing
# Wave coordination: handle new-request messages from front-end.
# Only process these when wave coordination is enabled
if self.enable_wave_coordination:
# We received a message on the front-end XPUB socket,
# from an API server sending a new request while the
# engines are paused, so that we can wake the other
# engines.
engine_to_exclude, wave = decoded
if not engines_running:
if wave < current_wave:
# If the wave number is stale, ensure the message
# is handled by all the engines.
engine_to_exclude = None
engines_running = True
wave_state_changed = True
self._send_start_wave(
publish_back, current_wave, engine_to_exclude
)
File: vllm/v1/engine/coordinator.py (L297-372)
if output_back in events:
# We received a message from one of the engines.
buffer = output_back.recv()
outputs: EngineCoreOutputs = decoder.decode(buffer)
assert not outputs.outputs
assert outputs.utility_output is None
eng_index = outputs.engine_index
scheduler_stats = outputs.scheduler_stats
if scheduler_stats:
# 1. Updated request load stats - update our local
# state with these.
stats = self.engines[eng_index].request_counts
stats_step = scheduler_stats.step_counter
stats_wave = scheduler_stats.current_wave
if (
stats_wave > last_stats_wave
or stats_wave == last_stats_wave
and stats_step > last_stats_step
):
if stats_changed:
last_step_counts = self._get_engine_counts(do_copy=True)
last_stats_step = stats_step
last_stats_wave = stats_wave
elif stats_wave != last_stats_wave or (
stats_step != last_stats_step
):
logger.warning(
"Received stats for out-of-order "
"step (%d, %d) from engine %d (expected "
"> (%d, %d))",
stats_wave,
stats_step,
eng_index,
last_stats_wave,
last_stats_step,
)
stats[0] = scheduler_stats.num_waiting_reqs
stats[1] = scheduler_stats.num_running_reqs
stats_changed = True
# Wave coordination: handle wave completion and start notifications
# Only process these when wave coordination is enabled
if self.enable_wave_coordination:
if (wave := outputs.wave_complete) is not None:
# 2. Notification from rank 0 engine that we've
# moved into the global paused state
# (engines_running==False).
if current_wave <= wave:
new_wave = wave + 1
logger.debug(
"Moving DP wave from %d to %d.",
current_wave,
new_wave,
)
current_wave = new_wave
engines_running = False
wave_state_changed = True
elif (wave := outputs.start_wave) is not None and (
wave > current_wave
or (wave == current_wave and not engines_running)
):
# 3. The engine received request for a non-current wave
# so we must ensure that other engines progress to the
# next wave (race condition handling).
logger.debug(
"Starting wave %d after notification of "
"stale wave request from engine.",
wave,
)
current_wave = wave
engines_running = True
wave_state_changed = True
self._send_start_wave(publish_back, wave, eng_index)
for _ in self.engines:
if publish_back.recv() != b"\x01":
logger.error(
"DP Coordinator received unexpected message while "
“waiting for engines to subscribe”
)
return 执行到这里进程会挂起吗, 此时该进程的CPU会消耗吗, 不消耗的话,是用到其他进程了吗
您这个问题提得非常深入,直击了阻塞式 I/O 的核心工作原理。
简短回答:是的,执行到 publish_back.recv() 时,DP Coordinator 进程会挂起(阻塞)。此时它几乎不消耗 CPU,CPU 会被操作系统调度给其他正在运行或准备运行的进程。
下面我们来详细解释这个过程。
1. publish_back.recv() 做了什么?
publish_back.recv() 是一个阻塞式 (blocking) 的系统调用。当代码执行到这一行时,它是在向操作系统内核发出一个指令:
“请帮我从
publish_back这个套接字的文件描述符上读取数据。如果现在没有数据可读,请不要立即返回,而是让我这个进程进入睡眠状态,直到有数据到来为止。”
2. 进程会挂起吗?
是的,绝对会。
“挂起”或“阻塞”意味着该进程的状态会被操作系统从**“运行 (Running)”** 态或 “就绪 (Ready)” 态,切换到**“等待 (Waiting)”** 或 “睡眠 (Sleeping)” 态。
- 运行态 (Running): 进程正在 CPU 上执行指令。
- 就绪态 (Ready): 进程已经准备好运行,但 CPU 正在被其他进程使用,它在排队等待。
- 等待/睡眠态 (Waiting/Sleeping): 进程正在等待某个外部事件(如 I/O 操作完成、定时器到时、锁被释放等),在此事件发生前,它不会被操作系统调度到 CPU 上运行。
所以,当 Coordinator 进程调用 recv() 并且没有数据时,它就进入了等待态。

(图片来源: geeksforgeeks.org - 进程状态转换图)
3. CPU 会消耗吗?
几乎不消耗。
处于等待/睡眠态的进程不会占用任何 CPU 时间。它只是在操作系统内核的一个等待队列中“挂了个号”,不参与 CPU 的调度竞争。
这正是阻塞式 I/O 模型的优点之一:在等待时,它非常“礼貌”,完全把宝贵的 CPU 资源让了出来。
4. 不消耗的话,CPU 是用到其他进程了吗?
完全正确!
操作系统的调度器 (Scheduler) 的核心职责就是确保 CPU 永远“有事可做”(只要有就绪的进程)。当 Coordinator 进程因为等待 I/O 而进入睡眠时,调度器会立即发现 CPU 空闲了,它会:
- 查看就绪队列 (Ready Queue)。
- 从队列中选择一个优先级最高的进程(或者根据其他调度算法选择一个进程)。
- 将这个选中的进程的状态从“就绪”切换到“运行”,并把 CPU 的控制权交给它。
这个被选中的进程可能是:
- 正在进行模型计算的
Engine Core进程。 - 正在处理 HTTP 请求的
API Server进程。 - 操作系统自身的其他系统进程。
- 你电脑上正在运行的浏览器、音乐播放器等任何其他程序。
整个过程对用户程序是透明的,完全由操作系统内核高效地完成。
流程总结
Coordinator进程执行到for循环,第一次调用publish_back.recv()。- 此时,还没有任何
Engine Core连接上来,所以套接字上没有数据。 Coordinator进程进入等待/睡眠态,被移出 CPU。CPU 消耗接近于 0。- 操作系统调度器将 CPU 分配给其他就绪的进程,比如
Engine Core。 Engine Core进程在它的初始化流程中,创建SUB套接字并connect()到Coordinator的publish_back地址。这个连接行为会导致SUB套接字自动发送一个订阅消息(b"\x01")。- 这个
b"\x01"消息通过网络(或 IPC)到达了Coordinator的publish_back套接字。 - 操作系统内核检测到
Coordinator等待的事件已经发生(数据已到达)。 - 内核将
Coordinator进程从等待态切换回就绪态。 - 在下一次调度机会时,调度器可能会将 CPU 分配给
Coordinator进程。 Coordinator进程从recv()调用中被唤醒,recv()返回收到的数据b"\x01"。for循环进入下一次迭代,再次调用recv(),进程再次进入睡眠,等待下一个Engine Core的订阅。
这个“阻塞 -> 让出 CPU -> 事件发生 -> 唤醒 -> 继续执行”的循环,是所有基于阻塞 I/O 的网络程序的基本工作模式,它在等待期间非常高效。
好的,这是一个非常核心的问题,理解这三者的区别是理解 DP Coordinator 工作原理的关键。它们代表了 DP Coordinator 的三个核心通信接口,分别面向不同的对象,承担不同的职责。
简单来说:
front_publish_address: 面向前端 (API Server) 的广播频道。publish_back: 面向后端 (Engine Core) 的广播频道。output_back: 面向后端 (Engine Core) 的数据收集管道。
下面我们用一个表格和详细解释来彻底厘清它们的区别。
快速对比表
| 特性 | front_publish_address |
publish_back |
output_back |
|---|---|---|---|
| 通信对象 (听众) | 前端 (API Server) | 后端 (Engine Core) | 后端 (Engine Core) |
| 通信方向 | Coordinator -> API Server | Coordinator -> Engine Core | Engine Core -> Coordinator |
| 作用/目的 | 发布全局状态,用于负载均衡 | 发布命令,用于同步和控制 | 收集单个引擎状态 |
| ZMQ 套接字类型 | XPUB (发布) |
XPUB (发布) |
PULL (拉取/接收) |
| 消息内容示例 | (负载列表, 当前波次, 运行状态) | b"READY", START_DP_WAVE |
scheduler_stats (队列长度等) |
| 类比 | 对公众的新闻发布会 | 对内部员工的指令广播 | 员工提交的个人工作报告 |
详细解释
1. front_publish_address (面向前端的广播)
- 全称: Front-end Publish Address (前端发布地址)
- 作用: 这是
Coordinator用来向所有API Server广播整个集群宏观状态的通道。API Server订阅这个地址,接收实时数据,以便做出智能的负载均衡决策。 - 方向: 单向广播,从
Coordinator到所有的API Server。 - 内容: 聚合后的信息,是“大局观”。
- 所有
Engine Core的等待/运行请求数列表。 - 当前的请求波次 (
current_wave)。 - 引擎的全局运行/暂停状态 (
engines_running)。
- 所有
- 为什么用
XPUB?: 这是一个典型的一对多广播场景。Coordinator作为发布者,API Server作为订阅者。
2. publish_back (面向后端的广播)
- 全称: Back-end Publish Address (后端发布地址)
- 作用: 这是
Coordinator用来向所有Engine Core广播命令和同步信号的通道。Engine Core订阅这个地址来接收来自中央的统一指令。 - 方向: 单向广播,从
Coordinator到所有的Engine Core。 - 内容: 控制信令,用于协调行动。
b"READY": 在启动时,当Coordinator确认所有Engine Core都已连接后,广播此消息,通知大家可以开始工作了。START_DP_WAVE: 当需要唤醒所有暂停的Engine Core开始新一轮工作时,广播此命令。
- 为什么用
XPUB?:- 同样是一对多的广播场景。
XPUB的特性被巧妙地用于初始握手,它可以检测到有多少SUB客户端(Engine Core)连接了上来,从而实现“等待所有引擎订阅”的功能。
3. output_back (从后端收集输出)
- 全称: Output from Back-end Address (来自后端的输出地址)
- 作用: 这是
Coordinator用来收集每一个Engine Core单独状态报告的管道。每个Engine Core都会把自己的工作状态(如队列里有多少请求)发送到这个地址。 - 方向: 单向收集,从所有的
Engine Core到Coordinator。这是一个“多对一”的扇入(fan-in)模式。 - 内容: 原始、未经处理的单个引擎的状态数据。
scheduler_stats: 包含num_waiting_reqs(等待请求数) 和num_running_reqs(运行中请求数) 等详细信息。
- 为什么用
PULL?:PULL套接字就是为此类场景设计的。它可以从多个PUSH源(每个Engine Core都有一个PUSH套接字)公平地接收消息,形成一个工作队列。- 它只管接收,不关心消息来自哪个具体的
Engine Core(消息内容里会包含engine_index来识别来源)。
把它们串起来看
一个完整的工作流是这样的:
- 收集 (Input): 多个
Engine Core分别将自己的scheduler_stats推送(PUSH) 到output_back地址。 - 处理 (Process):
Coordinator在output_back(PULL) 上接收到这些独立的报告,然后将它们聚合成一个全局的状态视图。 - 发布 (Output):
Coordinator将这个聚合后的全局状态通过front_publish_address(XPUB) 广播给所有API Server。- 同时,如果需要,
Coordinator会根据收到的信息或外部触发,通过publish_back(XPUB) 广播命令给所有Engine Core。
通过这三个不同角色的通道,DP Coordinator 成功地扮演了信息枢纽的角色:对下收集详细信息并发布指令,对上发布概括性信息,从而将整个分布式系统紧密地协调在一起。
更多推荐
所有评论(0)