Langchain支持很方便的OpenAI模型的调用,可以做到快速开发大模型应用。但是要使用Huggingface上的开源模型就没有那么方便了,本文就详细阐述如何用Langchain开发基于Huggingface上的模型,并实时返回生成结果。

实时返回生成结果是LLM很关键的一环,大模型的响应速度很大程度上会影响用户的使用体验,较长的等待时间会导致用户流失。同时,Langchain可以很方便的调用openAI的接口,但是对于我们这种穷屌丝来说用不起GPT的接口,只能用huggingface上的开源模型。所以本文将详细介绍如何使用Langchain调用Huggingface的模型并做到实时返回生成结果。

本文选用Qwen2.5-0.5B-Instruct作为部署的模型,同时我是下载到了本地,所以代码中的路径是本地路径,如果不想下载到本地的话直接用Huggingface上的路径即可。

1. Quick start

如果使用OpenAI的模型,要实现实施返回结果(即流式调用),只需要以下几行代码就可以快速实现:

from langchain_openai import ChatOpenAI

model = ChatOpenAI(model='gpt-4')
chunks = []
for chunk in model.stream('天空是什么颜色?'):
    chunks.append(chunk)
    print(chunk.content, end='|', flush=True)

但是鉴于我们是穷批,用不起GPT的API,所以我们只能借助transformers库自己实现上述的功能。

首先加载模型及其 tokenizer:

from transformers import Qwen2Tokenizer, Qwen2ForCausalLM

tokenizer = Qwen2Tokenizer.from_pretrained(r'D:\huggingface\Qwen2.5-0.5B-Instruct')
model = Qwen2ForCausalLM.from_pretrained(r'D:\huggingface\Qwen2.5-0.5B-Instruct')

接着我们采用以下代码即可实现流式迭代返回生成结果:

import threading
from transformers import TextIteratorStreamer
from langchain_core.messages import AIMessageChunk


def hf_stream(prompt: str):
    inputs = tokenizer(prompt, return_tensors='pt')

    # 创建一个 “流式文本迭代器”, 每当模型生成一个新 token,就会立刻把它变成字符串,通过 streamer 吐出来(yield)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    generation_kwargs = dict(
        **inputs,
        streamer=streamer,
        max_new_tokens=100,
        do_sample=True,
        temperature=0.95,
    )

    # 启动模型的文本生成过程,但用新线程异步执行,让主线程能实时处理输出
    # 因为 model.generate() 是 阻塞函数(会等生成完才返回),我们要“边生成边取结果”,所以不能直接运行它,而是让它在后台线程里跑
    thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    # 流式迭代返回的 token 段
    for new_text in streamer:
        yield AIMessageChunk(content=new_text)

上述代码中需要注意的有几个地方:streamer 是一个关键点,这里是将 tokenizer 放入到流式处理中,这是因为 model.generate 是个同步操作,无法执行异步调用,所以采用多线程的方式将 model.generate 放入到多线程中,这样就可以多线程执行模型的生成。同时,由于 generate 的结果是 token id 而非文字,这里 tokenzier 会将 token id decode 成为文字,并用 yield 流式输出,所以 streamer 是将 tokenizer 放入到的流中。

接着就可以直接查看模型的输出结果:

for chunk in hf_stream('请为我介绍梦幻西游的天宫门派'):
    print(chunk.content, end='|')

由于用了end='|',所以在每个输出字符后都会看到"|",以下是运行结果:
生成1

2. 采用链式调用的方式执行代码

Langchain 一个核心功能就是链式调用,即我们可以用诸如:

chain = prompt | llm | parser

的方式,同时执行提示模板、大模型生成、结构化输出的功能。当然,由于穷批用不起API,所以我们这里依旧自己打造 Langchain 调用 huggingface 的链式调用。

首先我们定义如下的类:

from langchain_core.runnables import Runnable
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


class HuggingFaceStreamWrapper(Runnable):
    def __init__(self, model, tokenizer, max_new_tokens=128):
        self.model = model
        self.tokenizer = tokenizer
        self.max_new_tokens = max_new_tokens

    def invoke(self, input_text, config=None) -> AIMessage:
        # Runnable 类要求强制实现抽象方法, 否则会报错 TypeError: Can't instantiate abstract class with abstract method invoke
        human_texts = [msg.content for msg in input_text.messages if isinstance(msg, HumanMessage)]
        prompt = "\n".join(human_texts)

        inputs = self.tokenizer(prompt, return_tensors='pt').to(self.model.device)
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=self.max_new_tokens,
            do_sample=True,
            top_p=0.95,
            temperature=0.8,
        )
        decoded = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return AIMessage(content=decoded)

    async def astream(self, input_text, config=None):
        human_texts = [msg.content for msg in input_text.messages if isinstance(msg, HumanMessage)]
        prompt = "\n".join(human_texts)

        # tokenizer encode
        inputs = self.tokenizer(prompt, return_tensors='pt').to(self.model.device)

        # streamer
        streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)

        # 生成参数
        generation_kwargs = dict(
            **inputs,
            streamer=streamer,
            max_new_tokens=self.max_new_tokens,
            do_sample=True,
            top_p=0.95,
            temperature=0.8,
        )

        # 用线程调用生成(阻塞生成转异步)
        thread = threading.Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        for token in streamer:
            # 手动触发事件循环调度,允许其他任务执行。
            # 它的作用不是延时,而是调度让步
            await asyncio.sleep(0)  # 允许事件循环切换
            # yield 是 Python 中的一个“进阶但极其实用”的关键字,它的作用是让函数变成一个生成器(generator),
            # 实现“边计算边返回”的效果,非常适合处理大数据、流式生成、异步 LLM 响应等场景。
            # yield 会暂停函数执行,返回一个值,但不会结束函数;
            yield AIMessageChunk(content=token)

这个是继承了 langchain Runable 类,用于我们自定义开发 langchain 的链式调用。在这里有几个要注意的点:Runable 中有个方法,名为 invoke,这个函数是必须要实现的函数,就算是

def invoke(self):
	pass

都没问题,但是如果没有该函数,那么会报错:

TypeError: Can't instantiate abstract class with abstract method invoke

说明 invoke 这个函数是必须实现的。

而调用 invoke 函数也能够返回结果,不过 invoke 是同步执行的,模型会一次性返回所有的结果,而非一个字一个字的蹦出来,比如执行下面代码:

hf_model = HuggingFaceStreamWrapper(model, tokenizer)

# 构建链
prompt = ChatPromptTemplate.from_template('请给我介绍梦幻西游中{topic}门派')
parser = StrOutputParser()
chain = prompt | hf_model | parser

# 如果调用 invoke 方法, 必须实现 invoke 函数, 否则可以 pass
outputs = chain.invoke({'topic': '九黎城'})

print(outputs)

会获得:
invoke
就是这样一次性输出出来(前面我删了三百多帧),这种用户体验感就会很差。

在上述代码中,我们可以看到,采用 ChatPromptTemplate.from_template('请给我介绍梦幻西游中{topic}门派') 可以生成一个提示语,之后我们在调用的时候,传入 topic 参数,就能够直接将 九黎城 传入进去,这样在做实际开发的时候,就能够给定一个提示模板,由用户自行填充内容。

那么说完了 invoke 调用,链式调用流式输出其实就很简单了,只需要用如下代码即可实现:

hf_model = HuggingFaceStreamWrapper(model, tokenizer)

# 构建链
prompt = ChatPromptTemplate.from_template('请给我介绍梦幻西游中{topic}门派')
parser = StrOutputParser()
chain = prompt | hf_model | parser


async def async_stream():
    # 这里同样, 如果要调用 astream 方法, 必须实现 astream 函数, 否则可以 pass
    async for chunk in chain.astream({'topic': '九黎城'}):
        print(chunk, end='|', flush=True)

asyncio.run(async_stream())

以下是结果:
生成2

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐