<span class=“js_title_inner“>漫谈 AI Infra|消息队列在 AI 时代能做什么?(下)</span>
每个分区内的顺序是有保障的,消息在消费时不会被移除,这就是kafka专注的stream(流)场景,它提供了高吞吐量和分区的严格排序,这使得它非常适合处理有序的事件流。:是 Pulsar 的一种 Topic 类型,是“不落盘、纯内存” 的消息通道——数据不会写入磁盘、不会做副本复制,Broker 宕机或进程重启即丢失,因此极致轻量、低延迟,适合“可丢、可重试、要快、要大量”的短时消息场景。然后,在越


上篇中,我们梳理了 Pulsar 特性在多模态及模型训练中的应用;下篇将聚焦智能体场景。一起来揭秘 Pulsar 如何为智能体场景赋能!
场景五【智能体】
利用 Pulsar 轻量化主题(non-persistent)
解决 AI 应用的异步通信难题
模型迭代日新月异,企业正在积极把 AI 能力嵌入业务流程。然而,企业应用从调用传统微服务应用 API 接口到 调用大模型“生成式”的 API 接口过程中,一个显著的特征是任务处理时耗变得很长,传统微服务应用通常能实现毫秒级响应,而 AI 应用的处理周期跨度极大——从几分钟到数小时不等;
这就意味着原本微服务间的同步调用就不再适用,可将同步调用改为异步通知来解决长耗时的阻塞;改为异步通知后,那又如何能实现同步调用的即时通信呢?可以采取以下模型:
1
Agent1 在启动时注册一个专属于自己的用于接收回包的非持久化 Topic(non-persistent Topic),非持久化 Topic 非常轻量化,数据不落盘存储,生命周期可由 TTL 自动 或人工回收,Agent1 可使用独占消费模型进行消费该 Topic
2
当 Agent1 有长耗时的调用模型请求时,向正常 Topic 发送请求,并由模型处理模块处理;该 Topic 为常规 Topic,具备消息持久化、消息回放、海量积压等队列特性
3
当 LLM 处理模块完成后,根据请求包中的回包地址进行回包投递

基于此模型,可以利用 Pulsar 的 Persistent-topic,将长时耗任务进行异步化处理,利用 Pulsar 的高可用、低延时的特性来保障请求任务的可靠、解耦和削峰填谷;又可以利用 Pulsar 的 Non-Persistent-topic 的轻量化,实现百万级创建,快速回收等能力

Non-persistent Topic:是 Pulsar 的一种 Topic 类型,是“不落盘、纯内存” 的消息通道——数据不会写入磁盘、不会做副本复制,Broker 宕机或进程重启即丢失,因此极致轻量、低延迟,适合“可丢、可重试、要快、要大量”的短时消息场景。
场景六【智能体】
Pulsar 可为事件驱动的智能体提供“新基建”
AI Agent 的概念正在经历一场深刻的变革,从简单的对话式 AI(Chatbot)向复杂的独立实体转变。AI Agent 就是将一个大模型(大脑)和一系列工具(感官与四肢)组装起来,形成的一个能够感知和改变外部环境的智能程序。
以创建一个营销 Agent 为例,采用 ReAct 的模型,Agent 可能首先从 CRM 中提取客户数据,使用 API 收集市场趋势,并在新信息出现时不断调整策略。通过通过记忆保留上下文并迭代查询,Agent 能够生成更准确、更相关的输出。

当外部接口越来越丰富,Agent 需要不断地扩展收集信息来源,包括其他 Agent、工具和外部系统等等,以便做出更精准的决策。

而这,从系统架构设计的角度上讲,就是一个分布式系统问题。这和微服务时代面临的挑战相似,因为在微服务中,各个组件必须高效地进行通信,而不产生瓶颈或僵化的依赖关系。也和微服务架构系统一样,它们的输出不仅仅应该回流到 AI 应用程序中,它们还应该流入其他关键系统,如数据仓库、CRM、CDP 和客户成功平台。所以完全可以将 Agent 理解为:有“大脑”的微服务;
从微服务的架构演进来看,Agent 的未来是事件驱动的,事件驱动的架构需要一个高效的消息中间件作为“基建”,因为消息中间的特性可以很好的匹配事件驱动需要的横向扩展性、低延迟、松耦合、事件持久化等诉求

Pulsar除了以上在消息中间件的优势外,还提供了 Function Mesh 的能力,利用 Function 的能力可以更近一步简化 AI Agent 的架构:

ReAct模式:ReAct(Reasoning and Action)是目前应用最广泛、最经典的 AI Agent 运行模式之一 。其核心思想是模拟人类解决复杂问题的过程,通过一个 “思考(Thought)→ 行动(Action)→ 观察(Observation)”的循环来逐步推进任务 。
Pulsar Function :Pulsar 提供的轻量级、Serverless 流处理框架,定位是“用写普通函数的代码量,完成 ETL、过滤、聚合、打标签等实时计算”。它把“消息 → 计算 → 消息”的闭环直接跑在 Pulsar 集群内部,简单场景不需要额外部署 Flink、Storm 等重型流处理引擎。

场景七【智能体】
具身智能需要“传感器流 + 任务队列”
在具身智能的场景中,既需要处理传感器读数流(连续、有序的数据),也需要处理独立的指令或任务(这些任务需要独立处理);
例如:一个机器人 Agent 在处理任务时,首先机器人的视觉或遥测传感器持续发布事件流,这些事件需要按顺序来处理或者来理解当前所处环境的变化;然后当机器人的 AI 决定采取行动,例如“拾取物体”或“导航到位置”时,这些任务会被添加到工作队列中。这些行动消息可能需要多个执行器模块(消费者)会分担这些任务。每个任务消息会被分配给一个执行器,执行器在完成任务后会进行确认。如果任务失败,执行器可以发送负向确认(表示失败),然后另一个实例可以重试。
我们回顾上述的过程,虽然都是利用消息管道进行消息传递,但是这是两种不同的数据类型:
-
类似传感器流,生产者将数据追加到一个无界、有序的日志(即流)中。消费者随后按顺序从这个日志中读取数据,并维护流中的偏移量(offset)。每个分区内的顺序是有保障的,消息在消费时不会被移除,这就是kafka专注的stream(流)场景,它提供了高吞吐量和分区的严格排序,这使得它非常适合处理有序的事件流。
-
类似任务消息,生产者将消息发送到队列,每条消息只由一个消费者处理(即使有多个消费者在监听)。消费者从队列中拉取消息,并在处理完成后确认每条消息,消息随后会从队列中移除。队列擅长分发可以并行处理且无需全局排序要求的任务或工作。这就是RabbitMQ、RocketMQ专注的Queues(消息)场景,专注于每个消费者只处理一条消息,并具备消息重试和死信队列等能力。
然后,在越来越多的 AI 场景,需要两者兼具,因为 AI 代理在实时环境中观测,同时必须执行可靠的操作。这正是 Pulsar 持续专注的方向,将流 + 消息进行融合,Pulsar 原生支持多种消息语义。其灵活的订阅模式(独占、共享、故障转移、键共享)让你能在同一平台上为不同任务选择合适的工具。这意味着系统扩展更少,组件间集成更简单——这对复杂的 AI 代理架构来说是一个很大的优势。
Pulsar 的流(Streaming)和消息(Messaging)场景结合,通过 Key-shard(键共享)、Failover(故障转移)、Exclusive(独占)、Shared(共享)四种订阅模式来实现

参考:
-
https://huggingface.co/spaces/nanotron/ultrascale-playbook
-
https://seanfalconer.medium.com/the-future-of-ai-agents-is-event-driven-9e25124060d6
-
https://www.linkedin.com/pulse/kafkas-role-powering-next-wave-event-driven-agentic-ai-jeyaraman-xq0kc
-
https://mp.weixin.qq.com/s/4pIAZqH01Ib_OGGGD9OWQg
-
https://streamnative.io/blog/streams-vs-queues-why-your-agents-need-both--and-why-pulsar-protocol-delivers
-
https://dzone.com/articles/agentic-ai-using-apache-kafka-as-event-broker-with-agent2agent-protocol
-
https://mp.weixin.qq.com/s?__biz=MjM5MDE0Mjc4MA==&mid=2651248787&idx=2&sn=b2bc09cebce5296ba7d7b5cab1b4c76a&poc_token=HMD7OmmjPWSU8S4Wv17TfFVZvOZepoGlcSeCHT0I

Apache Pulsar 作为一个高性能、分布式的发布-订阅消息系统,正在全球范围内获得越来越多的关注和应用。如果你对分布式系统、消息队列或流处理感兴趣,欢迎加入我们!
Github:
https://github.com/apache/pulsar

扫码加入 Pulsar 社区交流群
最佳实践
互联网
腾讯BiFang | 腾讯云 | 微信 | 腾讯 | BIGO | 360 | 滴滴 | 腾讯互娱 | 腾讯游戏 | vivo | 科大讯飞 | 新浪微博 | 金山云 | STICORP | 雅虎日本 | Nutanix Beam | 智联招聘 | 达达 | 小红书
金融/计费
腾讯计费 | 平安证券 | 拉卡拉 | Qraft | 甜橙金融
电商
Flipkart | 谊品生鲜 | Narvar | Iterable
机器学习
物联网/芯片制造
应用材料|云兴科技智慧城市 | 科拓停车 | 华为云 | 清华大学能源互联网创新研究院 | 涂鸦智能
通信
教育
推荐阅读
免费可视化集群管控 | 资料合集 | 实现原理 | BookKeeper储存架构解析 | Pulsar运维 | MQ设计精要 | Pulsar vs Kafka | 从RabbitMQ 到 Pulsar | 内存使用原理 | 从Kafka到Pulsar | 跨地域复制 | Spring + Pulsar | Doris + Pulsar | SpringBoot + Pulsar

更多推荐
所有评论(0)