前言

上篇文章我们大致演示了一下ai_agent的食用方法。这里我们做一下核心模块runtime的设计和实现。

一个agent也好,workflow也好,他们单个实现起来并不复杂,困难的是如何将他们有机的组合起来,能够按照一定的逻辑流转起来。并且能够层层嵌套,能力无限。

理解起来比较抽象,我们先看几个重点方向,和现实中的例子:

1. 简单的workflow场景

我们可以使用一个简单的workflow来处理一些场景,比如奶茶店的智能推荐,那么它的一个流程大致如下:

graph LR
User --用户画像+query+上下文+其他--> pormpt
pormpt --> llm 
llm --> answer
answer --> 奶茶卡片

  • 这个流程就能看到,我们将llm,memory,prompt都当做一个节点,按照图中的顺序执行最终会得到一个推荐结果。
  • 可以说这是一个workflow的基础功能。

2. smartflow场景

对于有些问题,我们是不能够将完全预测出执行流程,比如Least-to-Mostsmartflow的场景,这些节点往往是在执行过程中慢慢生成出来的。

我们还是举个例子, 假设有这样一个agent:评阅大师,它的目标是评阅优化输入文案。那么它的工作流程可能是这样的。

graph TD
user -->|文案| pf[评分llm]
pf -->|评分>80| 输出
pf -->|评分<80 \n 待优化方向| yhp[追加上下文]
yhp --> yh[优化llm]
yh --> pf

  • 工作方式一目了然,可以理解为一个评分llm,一个优化llm。对于一个文案,评分llm不断给出需要优化的点,并追加到上下文中,优化llm则根据要求不断优化,直到评分>80。
  • 我们的流程设计肯定不能是DAG,因为出现环了。

3. multi-agent场景

multi-agent常用来解决复杂问题,对流程设计的要求也更高。比较典型的场景是狼人杀。它的流程可能长这样。

graph TD
xdyt[入夜] --> zcr
zcr[主持人] -->|第一轮| lrfy[狼人开刀]
lrfy --> zcr
zcr[主持人agent] -->|第二轮| nwfy[女巫药]
nwfy --> zcr
zcr[主持人agent] -->|第三轮| yyjfy[预言家查人]
yyjfy --> zcr
zcr[主持人agent] -->|第四轮| syrfy[所有人发言]
syrfy --> gp[归票]
gp --> zcr
zcr -->|某个阵容胜利| 结束

  • 上图画的游戏规则并不严谨,但大体能感受到一个multi-agent的工作方式。这里面的每个角色都可以理解为一个agent,主持人可以是一个固定的脚本。
  • 这个场景要求每个agent都有自己的prompt,有独立的memory等,将一个agent放大,那么这个agent也应该是一个worlflow,由无数的节点拼接而来。也就要求我们流程设计能够层层嵌套,能力无限。

4. NL2code场景

保持开放是一个非常重要的能力,一来是能够让程序员直接写脚本。二是大模型有时候也会自己写脚本,也就是NL2Code场景。

我之前参与过一个专项,其中的一个能力是用户自由操作文档,比如文档归类,概要摘取等,结果通过text2sql存到数据库中。

这种场景下,仅仅提供有限功能的节点是不足够的。必须具备无限扩展的能力。

5. 开放域

在开放域中,agent将具备更自由的意志。举几个典型的场景:聊天室 游戏NPC 虚拟宠物

在这种应用中,agent更应该是一个完备的ai,具备更长的生命周期,更强的主观能动性。

agent从运行到结束,要像人一样,生下来就有意识,直到死亡。

简单的做法是先构建一个single agent,并让它至少有个memory+tool+设定这几个模块,最好是采用多模态大模型做基座模型。然后不断循环调用这个agent,从而达到和我一样的牛马状态。当然成本肯定极高。

另一种方法是做双循环,还是这个agent,一个循环是外界有输入后再调用agent,另一个循环是定时唤起agent的主观意识。从而节能减排。

不管是哪种方式,我更倾向于从agent外部解决。而非agent本身。

总结

现实中应用肯定不局限于这几种情况,但通过一定的流程编辑基本都可以解决,只是复杂性会比较高。

runtime 设计

一个agent的runtime大概可以看成是由这几部分构成:调度+节点服务+执行计划+上下文。

节点 service

代码仓库

一个service可以理解成一个原子能力,比如llm,比如tool,也可能是function。

我们这里做一个抽象:

pub trait Service: Send + Sync {
    async fn call(&self, flow: Flow) -> anyhow::Result<Output>;
}

执行计划

这个也很好理解,我们需要按照这个计划不断执行service。

pub trait Plan: Send + Sync {
    fn next(&self, ctx: Arc<Context>, node_id: &str) -> NextNodeResult;
    fn set(&self, nodes: Vec<PlanNode>);
    fn update(
        &self,
        node_code: &str,
        update: Box<dyn FnOnce(Option<&mut PlanNode>) -> anyhow::Result<()>>,
    ) -> anyhow::Result<()>;
}

上下文Context

这个所谓的上下文Context可以理解为 执行一个任务的具体消息,比如任务编码,堆栈信息,状态等。

  • 这里有一个子上下文的概念,也就是说在一个service里面可以执行一个子计划,也就是能力无限的实现。并且父子上下文会共享堆栈。todo:也会共享状态。
  • 这里没有做共享设计,每个Context会记录自己归属的运行时。也就是每个运行时都是独立的,为了多租户设计的。
pub struct Context {
    pub parent_code: Option<String>,
    //任务流名称
    pub code: String,
    //状态
    pub status: AtomicU8, //0:init 1:running, 2:success, 3:error
    //堆栈信息
    pub stack: Arc<Mutex<ContextStack>>,
    //执行计划
    pub plan: Arc<dyn Plan>,
    //全局扩展字段
    pub extend: Mutex<HashMap<String, Box<dyn Any + Send + Sync + 'static>>>,
    //结束时回调
    pub over_callback: Option<Mutex<Vec<Box<dyn FnOnce(Arc<Context>) + Send + Sync + 'static>>>>,
    pub(crate) runtime: Arc<Runtime>,
}

调度

调度也很简单,就是按照plan不断执行service

  • 这里做了中间层设计,也就是每个service都是一个‘洋葱’,一层层进,一层层出。
  • 每个service都是异步执行的,就是说如果你的plan存在并行结构,那么两个并行的分支会一起执行。
fn exec_next_node(ctx: Arc<Context>, node_code: &str) {
    let nodes = ctx.plan.next(ctx.clone(), node_code);
    ...
    for i in nodes {
        match ctx.runtime.nodes.get(i.node_type_id.as_str()) {
        ...
        };
        //将service和middle封装成一个flow执行
        let flow = Flow::new(i, ctx.clone(), middle);
        tokio::spawn(async move {
            let ctx = flow.ctx.clone();
            let result = tokio::spawn(async move {
            ... //处理一些堆栈信息
                //执行flow
                if let Err(e) = flow.call().await {
                    //错误处理
                } else {
                    //执行���功则继续执行下一个
                    Runtime::exec_next_node(ctx, code.as_str());
                }
            })
            .await;
            ... //检查错误,和故障恢复
        });
    }
}

Service layer

为了方便service的实现,我们将service再包装一层。如下:

pub  trait  ServiceLayer: Sync + Send {
    //service的配置类型
    type Config;
    //输出结果
    type Output;
    //执行过程
    async  fn call(
        &self,
        code: String,
        ctx: Arc<Context>,
        cfg: Self::Config,
    ) -> anyhow::Result<Self::Output>;
}

再为这个包装层加一个自动json解析的实现。这样入参和出参可以直接绑定到struct。

impl<T, In, Out> Service for LayerJson<T, In, Out>
where
T: ServiceLayer<Config = In, Output = Out>,
    In: for<'a> Deserialize<'a> + Send + Sync,
    Out: Serialize + Send + Sync,
{
    async  fn call(&self, flow: Flow) -> anyhow::Result<Output> {
        ... //类型解析和参数的初步处理
        //解析入参
        let cfg = match serde_json::from_str::<In>(node_config.as_str()) {
            ... //错误处理
        };
        //调用执行过程
        let output = self.handle.call(code, ctx, cfg).await ? ;
        //解析出参
        let raw = match serde_json::to_value(&output) {
            Ok(o) => o,
            Err(e) => return anyhow::anyhow!("code[{}],output json error:{}", node_info, e).err(),
        };
        Output::new(raw).raw_to_ctx().ok()
    }
}

测试

测试用例

尾语

整体runtime模块还是比较简单,比较薄的

如何学习大模型

现在社会上大模型越来越普及了,已经有很多人都想往这里面扎,但是却找不到适合的方法去学习。

作为一名资深码农,初入大模型时也吃了很多亏,踩了无数坑。现在我想把我的经验和知识分享给你们,帮助你们学习AI大模型,能够解决你们学习中的困难。

我已将重要的AI大模型资料包括市面上AI大模型各大白皮书、AGI大模型系统学习路线、AI大模型视频教程、实战学习,等录播视频免费分享出来,需要的小伙伴可以扫取。

一、AGI大模型系统学习路线

很多人学习大模型的时候没有方向,东学一点西学一点,像只无头苍蝇乱撞,我下面分享的这个学习路线希望能够帮助到你们学习AI大模型。

在这里插入图片描述

二、AI大模型视频教程

在这里插入图片描述

三、AI大模型各大学习书籍

在这里插入图片描述

四、AI大模型各大场景实战案例

在这里插入图片描述

五、结束语

学习AI大模型是当前科技发展的趋势,它不仅能够为我们提供更多的机会和挑战,还能够让我们更好地理解和应用人工智能技术。通过学习AI大模型,我们可以深入了解深度学习、神经网络等核心概念,并将其应用于自然语言处理、计算机视觉、语音识别等领域。同时,掌握AI大模型还能够为我们的职业发展增添竞争力,成为未来技术领域的领导者。

再者,学习AI大模型也能为我们自己创造更多的价值,提供更多的岗位以及副业创收,让自己的生活更上一层楼。

因此,学习AI大模型是一项有前景且值得投入的时间和精力的重要选择。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐