Langchain学习笔记(1)——如何调用Huggingface的模型并实现实时返回生成结果
如何用langchain流式调用huggingface的开源模型做生成任务
Langchain支持很方便的OpenAI模型的调用,可以做到快速开发大模型应用。但是要使用Huggingface上的开源模型就没有那么方便了,本文就详细阐述如何用Langchain开发基于Huggingface上的模型,并实时返回生成结果。
实时返回生成结果是LLM很关键的一环,大模型的响应速度很大程度上会影响用户的使用体验,较长的等待时间会导致用户流失。同时,Langchain可以很方便的调用openAI的接口,但是对于我们这种穷屌丝来说用不起GPT的接口,只能用huggingface上的开源模型。所以本文将详细介绍如何使用Langchain调用Huggingface的模型并做到实时返回生成结果。
本文选用Qwen2.5-0.5B-Instruct作为部署的模型,同时我是下载到了本地,所以代码中的路径是本地路径,如果不想下载到本地的话直接用Huggingface上的路径即可。
1. Quick start
如果使用OpenAI的模型,要实现实施返回结果(即流式调用),只需要以下几行代码就可以快速实现:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model='gpt-4')
chunks = []
for chunk in model.stream('天空是什么颜色?'):
chunks.append(chunk)
print(chunk.content, end='|', flush=True)
但是鉴于我们是穷批,用不起GPT的API,所以我们只能借助transformers
库自己实现上述的功能。
首先加载模型及其 tokenizer:
from transformers import Qwen2Tokenizer, Qwen2ForCausalLM
tokenizer = Qwen2Tokenizer.from_pretrained(r'D:\huggingface\Qwen2.5-0.5B-Instruct')
model = Qwen2ForCausalLM.from_pretrained(r'D:\huggingface\Qwen2.5-0.5B-Instruct')
接着我们采用以下代码即可实现流式迭代返回生成结果:
import threading
from transformers import TextIteratorStreamer
from langchain_core.messages import AIMessageChunk
def hf_stream(prompt: str):
inputs = tokenizer(prompt, return_tensors='pt')
# 创建一个 “流式文本迭代器”, 每当模型生成一个新 token,就会立刻把它变成字符串,通过 streamer 吐出来(yield)
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = dict(
**inputs,
streamer=streamer,
max_new_tokens=100,
do_sample=True,
temperature=0.95,
)
# 启动模型的文本生成过程,但用新线程异步执行,让主线程能实时处理输出
# 因为 model.generate() 是 阻塞函数(会等生成完才返回),我们要“边生成边取结果”,所以不能直接运行它,而是让它在后台线程里跑
thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
# 流式迭代返回的 token 段
for new_text in streamer:
yield AIMessageChunk(content=new_text)
上述代码中需要注意的有几个地方:streamer
是一个关键点,这里是将 tokenizer
放入到流式处理中,这是因为 model.generate
是个同步操作,无法执行异步调用,所以采用多线程的方式将 model.generate
放入到多线程中,这样就可以多线程执行模型的生成。同时,由于 generate 的结果是 token id
而非文字,这里 tokenzier 会将 token id decode 成为文字,并用 yield 流式输出,所以 streamer 是将 tokenizer 放入到的流中。
接着就可以直接查看模型的输出结果:
for chunk in hf_stream('请为我介绍梦幻西游的天宫门派'):
print(chunk.content, end='|')
由于用了end='|'
,所以在每个输出字符后都会看到"|",以下是运行结果:
2. 采用链式调用的方式执行代码
Langchain 一个核心功能就是链式调用,即我们可以用诸如:
chain = prompt | llm | parser
的方式,同时执行提示模板、大模型生成、结构化输出的功能。当然,由于穷批用不起API,所以我们这里依旧自己打造 Langchain 调用 huggingface 的链式调用。
首先我们定义如下的类:
from langchain_core.runnables import Runnable
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
class HuggingFaceStreamWrapper(Runnable):
def __init__(self, model, tokenizer, max_new_tokens=128):
self.model = model
self.tokenizer = tokenizer
self.max_new_tokens = max_new_tokens
def invoke(self, input_text, config=None) -> AIMessage:
# Runnable 类要求强制实现抽象方法, 否则会报错 TypeError: Can't instantiate abstract class with abstract method invoke
human_texts = [msg.content for msg in input_text.messages if isinstance(msg, HumanMessage)]
prompt = "\n".join(human_texts)
inputs = self.tokenizer(prompt, return_tensors='pt').to(self.model.device)
outputs = self.model.generate(
**inputs,
max_new_tokens=self.max_new_tokens,
do_sample=True,
top_p=0.95,
temperature=0.8,
)
decoded = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
return AIMessage(content=decoded)
async def astream(self, input_text, config=None):
human_texts = [msg.content for msg in input_text.messages if isinstance(msg, HumanMessage)]
prompt = "\n".join(human_texts)
# tokenizer encode
inputs = self.tokenizer(prompt, return_tensors='pt').to(self.model.device)
# streamer
streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)
# 生成参数
generation_kwargs = dict(
**inputs,
streamer=streamer,
max_new_tokens=self.max_new_tokens,
do_sample=True,
top_p=0.95,
temperature=0.8,
)
# 用线程调用生成(阻塞生成转异步)
thread = threading.Thread(target=self.model.generate, kwargs=generation_kwargs)
thread.start()
for token in streamer:
# 手动触发事件循环调度,允许其他任务执行。
# 它的作用不是延时,而是调度让步
await asyncio.sleep(0) # 允许事件循环切换
# yield 是 Python 中的一个“进阶但极其实用”的关键字,它的作用是让函数变成一个生成器(generator),
# 实现“边计算边返回”的效果,非常适合处理大数据、流式生成、异步 LLM 响应等场景。
# yield 会暂停函数执行,返回一个值,但不会结束函数;
yield AIMessageChunk(content=token)
这个是继承了 langchain Runable 类,用于我们自定义开发 langchain 的链式调用。在这里有几个要注意的点:Runable 中有个方法,名为 invoke
,这个函数是必须要实现的函数,就算是
def invoke(self):
pass
都没问题,但是如果没有该函数,那么会报错:
TypeError: Can't instantiate abstract class with abstract method invoke
说明 invoke
这个函数是必须实现的。
而调用 invoke 函数也能够返回结果,不过 invoke
是同步执行的,模型会一次性返回所有的结果,而非一个字一个字的蹦出来,比如执行下面代码:
hf_model = HuggingFaceStreamWrapper(model, tokenizer)
# 构建链
prompt = ChatPromptTemplate.from_template('请给我介绍梦幻西游中{topic}门派')
parser = StrOutputParser()
chain = prompt | hf_model | parser
# 如果调用 invoke 方法, 必须实现 invoke 函数, 否则可以 pass
outputs = chain.invoke({'topic': '九黎城'})
print(outputs)
会获得:
就是这样一次性输出出来(前面我删了三百多帧),这种用户体验感就会很差。
在上述代码中,我们可以看到,采用 ChatPromptTemplate.from_template('请给我介绍梦幻西游中{topic}门派')
可以生成一个提示语,之后我们在调用的时候,传入 topic
参数,就能够直接将 九黎城
传入进去,这样在做实际开发的时候,就能够给定一个提示模板,由用户自行填充内容。
那么说完了 invoke
调用,链式调用流式输出其实就很简单了,只需要用如下代码即可实现:
hf_model = HuggingFaceStreamWrapper(model, tokenizer)
# 构建链
prompt = ChatPromptTemplate.from_template('请给我介绍梦幻西游中{topic}门派')
parser = StrOutputParser()
chain = prompt | hf_model | parser
async def async_stream():
# 这里同样, 如果要调用 astream 方法, 必须实现 astream 函数, 否则可以 pass
async for chunk in chain.astream({'topic': '九黎城'}):
print(chunk, end='|', flush=True)
asyncio.run(async_stream())
以下是结果:
更多推荐
所有评论(0)