跳转到主要内容
本教程将带您了解如何使用 xParse 构建完整的 RAG(检索增强生成)应用。我们将通过三个实际场景,展示从文档处理到向量检索的完整流程。

什么是 RAG?

RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合信息检索和生成式 AI 的技术。通过 RAG,大模型可以基于企业知识库进行回答,而不是仅依赖训练数据,从而提供更准确、更相关的答案。 RAG 的核心流程包括:
  1. 文档处理:将非结构化文档转换为向量表示
  2. 向量存储:将向量数据存储到向量数据库
  3. 检索查询:根据用户问题检索相关文档片段
  4. 生成回答:将检索到的内容作为上下文,让大模型生成答案
xParse 专注于解决第一步和第二步,为您提供端到端的文档处理能力。

环境准备

首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install xparse-client langchain langchain-community langchain-core \
            langchain-milvus python-dotenv
创建 .env 文件存储配置:
# .env
XTI_APP_ID=your-app-id
XTI_SECRET_CODE=your-secret-code
DASHSCOPE_API_KEY=your-dashscope-key
提示:XTI_APP_IDXTI_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。
下面我们将通过三个实际场景,展示如何构建 RAG 应用。

RAG 完整流程

文档文件 (PDF/Word/Excel...)

[xParse]
    ├─ Parse: 文档解析
    ├─ Chunk: 智能分块
    └─ Embed: 向量化

向量数据库 (Milvus/Zilliz)

[检索系统]
    ├─ 用户问题 → 向量化
    ├─ 向量相似度检索
    └─ 返回相关文档片段

[大模型生成]
    └─ 基于检索内容生成答案

场景 1:企业知识库构建

需求描述

某企业需要构建内部知识库,包含产品手册、技术文档、培训材料等。员工可以通过自然语言提问,快速找到相关信息。 文档特点
  • 格式多样:PDF、Word、Excel
  • 结构清晰:有明确的章节标题
  • 内容专业:技术术语较多
处理要求
  • 保持章节完整性
  • 支持语义检索
  • 快速响应查询

配置方案

针对结构化文档,我们使用 by_title 分块策略,保持章节完整性:
from xparse_client import create_pipeline_from_config
import os
from dotenv import load_dotenv

load_dotenv()

config = {
    # 数据源:本地文档目录
    'source': {
        'type': 'local',
        'directory': './knowledge_base',
        'pattern': ['*.pdf']  # 只匹配 PDF 文件
    },
    
    # 目标存储:Milvus 向量数据库
    'destination': {
        'type': 'milvus',
        'db_path': './kb_vectors.db',
        'collection_name': 'knowledge_base',
        'dimension': 1024
    },
    
    # API 配置
    'api_base_url': 'https://api.textin.com/api/xparse',
    'api_headers': {
        'x-ti-app-id': os.getenv('XTI_APP_ID'),
        'x-ti-secret-code': os.getenv('XTI_SECRET_CODE')
    },
    
    # Stages 配置:定义处理流程
    'stages': [
        {
            'type': 'parse',
            'config': {
                'provider': 'textin'
            }
        },
        {
            'type': 'chunk',
            'config': {
                'strategy': 'by_title',           # 按标题分块
                'include_orig_elements': False,
                'new_after_n_chars': 512,         # 512 字符后考虑新块
                'max_characters': 1536,           # 最大 1536 字符
                'overlap': 100                    # 章节间重叠 100 字符
            }
        },
        {
            'type': 'embed',
            'config': {
                'provider': 'qwen',
                'model_name': 'text-embedding-v4'  # 使用 v4 模型提高精度
            }
        }
    ]
}

# 运行 Pipeline
pipeline = create_pipeline_from_config(config)
pipeline.run()

完整代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
企业知识库构建示例
"""

from xparse_client import create_pipeline_from_config
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import os
from dotenv import load_dotenv

load_dotenv()

def build_knowledge_base():
    """构建知识库"""
    print("=" * 60)
    print("开始构建企业知识库...")
    print("=" * 60)
    
    # Pipeline 配置
    config = {
        'source': {
            'type': 'local',
            'directory': './knowledge_base',
            'pattern': ['*.pdf']
        },
        'destination': {
            'type': 'milvus',
            'db_path': './kb_vectors.db',
            'collection_name': 'knowledge_base',
            'dimension': 1024
        },
        'api_base_url': 'https://api.textin.com/api/xparse',
        'api_headers': {
            'x-ti-app-id': os.getenv('XTI_APP_ID'),
            'x-ti-secret-code': os.getenv('XTI_SECRET_CODE')
        },
        'stages': [
            {
                'type': 'parse',
                'config': {'provider': 'textin'}
            },
            {
                'type': 'chunk',
                'config': {
                    'strategy': 'by_title',
                    'max_characters': 1536,
                    'overlap': 100
                }
            },
            {
                'type': 'embed',
                'config': {
                    'provider': 'qwen',
                    'model_name': 'text-embedding-v4'
                }
            }
        ]
    }
    
    # 运行 Pipeline
    pipeline = create_pipeline_from_config(config)
    pipeline.run()
    
    print("\n" + "=" * 60)
    print("知识库构建完成!")
    print("=" * 60)

def query_knowledge_base(question: str, top_k: int = 5):
    """查询知识库"""
    # 使用与 Pipeline 相同的 embedding 配置
    embedding = DashScopeEmbeddings(model="text-embedding-v4")
    
    # 连接 Milvus 向量存储
    vector_store = Milvus(
        embedding_function=embedding,
        collection_name="knowledge_base",
        connection_args={"uri": "./kb_vectors.db"},
        vector_field="embeddings",
        primary_field="element_id",
        text_field="text",
        enable_dynamic_field=True
    )
    
    # 检索相似文档
    docs = vector_store.similarity_search(question, k=top_k)
    
    print(f"\n问题: {question}")
    print(f"\n找到 {len(docs)} 个相关文档片段:\n")
    
    for i, doc in enumerate(docs, 1):
        print(f"{i}. 文档: {doc.metadata.get('filename', 'N/A')}")
        print(f"   内容: {doc.page_content[:200]}...")
        print()

if __name__ == '__main__':
    # 构建知识库
    build_knowledge_base()
    
    # 查询示例
    query_knowledge_base("如何使用产品 API?")

场景 2:法律文档检索系统

需求描述

律师事务所需要构建法律文档检索系统,包含合同、判决书、法律条文等。律师可以通过关键词或自然语言快速检索相关案例和法律依据。 文档特点
  • 主要是 PDF 格式
  • 页面结构清晰
  • 需要保持页面完整性
  • 跨页内容需要关联
处理要求
  • 按页面分块,保持页面完整性
  • 支持精确检索
  • 保留文档元数据(如案件编号、日期等)

配置方案

针对 PDF 文档,我们使用 by_page 分块策略,保持页面完整性:
from xparse_client import create_pipeline_from_config
import os
from dotenv import load_dotenv

load_dotenv()

config = {
    # 数据源:本地法律文档
    'source': {
        'type': 'local',
        'directory': './legal_documents',
        'pattern': ['*.pdf']
    },
    
    # 目标存储:本地 Milvus 向量数据库
    'destination': {
        'type': 'milvus',
        'db_path': './legal_vectors.db',
        'collection_name': 'legal_documents',
        'dimension': 1024
    },
    
    # API 配置
    'api_base_url': 'https://api.textin.com/api/xparse',
    'api_headers': {
        'x-ti-app-id': os.getenv('XTI_APP_ID'),
        'x-ti-secret-code': os.getenv('XTI_SECRET_CODE')
    },
    
    # Stages 配置:定义处理流程
    'stages': [
        {
            'type': 'parse',
            'config': {
                'provider': 'textin'
            }
        },
        {
            'type': 'chunk',
            'config': {
                'strategy': 'by_page',           # 按页面分块
                'include_orig_elements': True,    # 保留原始元素,便于追溯
                'max_characters': 2048,          # PDF 页面可能较长
                'overlap': 150                   # 页面间重叠,保持上下文
            }
        },
        {
            'type': 'embed',
            'config': {
                'provider': 'qwen',
                'model_name': 'text-embedding-v4'
            }
        }
    ]
}

# 运行 Pipeline
pipeline = create_pipeline_from_config(config)
pipeline.run()

完整代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
法律文档检索系统示例
"""

from xparse_client import create_pipeline_from_config
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from typing import List, Dict
import os
from dotenv import load_dotenv

load_dotenv()

def build_legal_document_index():
    """构建法律文档索引"""
    print("=" * 60)
    print("开始构建法律文档检索系统...")
    print("=" * 60)
    
    config = {
        'source': {
            'type': 'local',
            'directory': './legal_documents',
            'pattern': ['*.pdf']
        },
        'destination': {
            'type': 'milvus',
            'db_path': './legal_vectors.db',
            'collection_name': 'legal_documents',
            'dimension': 1024
        },
        'api_base_url': 'https://api.textin.com/api/xparse',
        'api_headers': {
            'x-ti-app-id': os.getenv('XTI_APP_ID'),
            'x-ti-secret-code': os.getenv('XTI_SECRET_CODE')
        },
        'stages': [
            {
                'type': 'parse',
                'config': {'provider': 'textin'}
            },
            {
                'type': 'chunk',
                'config': {
                    'strategy': 'by_page',
                    'include_orig_elements': True,
                    'max_characters': 2048,
                    'overlap': 150
                }
            },
            {
                'type': 'embed',
                'config': {
                    'provider': 'qwen',
                    'model_name': 'text-embedding-v4'
                }
            }
        ]
    }
    
    pipeline = create_pipeline_from_config(config)
    pipeline.run()
    
    print("\n" + "=" * 60)
    print("法律文档索引构建完成!")
    print("=" * 60)

def search_legal_documents(query: str, case_type: str = None, top_k: int = 10) -> List[Dict]:
    """检索法律文档"""
    # 使用与 Pipeline 相同的 embedding 配置
    embedding = DashScopeEmbeddings(model="text-embedding-v4")
    
    # 连接本地 Milvus 向量存储
    vector_store = Milvus(
        embedding_function=embedding,
        collection_name="legal_documents",
        connection_args={"uri": "./legal_vectors.db"},
        vector_field="embeddings",
        primary_field="element_id",
        text_field="text",
        enable_dynamic_field=True
    )
    
    # 检索相似文档
    docs = vector_store.similarity_search(query, k=top_k)
    
    # 格式化结果
    results = []
    for doc in docs:
        results.append({
            'content': doc.page_content,
            'metadata': doc.metadata
        })
    
    return results

def format_search_results(results: List[Dict]) -> str:
    """格式化检索结果"""
    output = []
    for i, result in enumerate(results, 1):
        metadata = result.get('metadata', {})
        output.append(f"{i}. 文档: {metadata.get('filename', 'N/A')}")
        output.append(f"   页码: {metadata.get('page_number', 'N/A')}")
        output.append(f"   内容: {result.get('content', '')[:300]}...")
        output.append("")
    return "\n".join(output)

if __name__ == '__main__':
    # 构建索引
    build_legal_document_index()
    
    # 检索示例
    query = "合同违约责任"
    results = search_legal_documents(query, top_k=5)
    print(f"\n查询: {query}")
    print("\n检索结果:")
    print(format_search_results(results))

场景 3:技术文档问答系统

需求描述

技术团队需要构建 API 文档问答系统,开发者可以通过自然语言提问,快速找到 API 使用方法、参数说明等。 文档特点
  • 主要是 Markdown 和 PDF 格式
  • 代码示例较多
  • 结构相对简单
  • 需要精确匹配 API 名称
处理要求
  • 基础分块即可
  • 支持代码块识别
  • 快速检索响应

配置方案

针对技术文档,我们使用 basic 分块策略,简单高效:
from xparse_client import create_pipeline_from_config
import os
from dotenv import load_dotenv

load_dotenv()

config = {
    # 数据源:本地技术文档
    'source': {
        'type': 'local',
        'directory': './api_docs',
        'pattern': ['*.pdf']
    },
    
    # 目标存储:本地 Milvus
    'destination': {
        'type': 'milvus',
        'db_path': './api_docs.db',
        'collection_name': 'api_documentation',
        'dimension': 1024
    },
    
    # API 配置
    'api_base_url': 'https://api.textin.com/api/xparse',
    'api_headers': {
        'x-ti-app-id': os.getenv('XTI_APP_ID'),
        'x-ti-secret-code': os.getenv('XTI_SECRET_CODE')
    },
    
    # Stages 配置:定义处理流程
    'stages': [
        {
            'type': 'parse',
            'config': {
                'provider': 'textin'
            }
        },
        {
            'type': 'chunk',
            'config': {
                'strategy': 'basic',              # 基础分块
                'include_orig_elements': False,
                'new_after_n_chars': 512,
                'max_characters': 1024,
                'overlap': 50                    # 适度的重叠
            }
        },
        {
            'type': 'embed',
            'config': {
                'provider': 'qwen',
                'model_name': 'text-embedding-v3'  # 标准模型,速度快
            }
        }
    ]
}

# 运行 Pipeline
pipeline = create_pipeline_from_config(config)
pipeline.run()

完整代码示例(集成大模型)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
技术文档问答系统示例(集成大模型)
"""

from xparse_client import create_pipeline_from_config
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import HumanMessage
from typing import List, Dict
import os
from dotenv import load_dotenv

load_dotenv()

class APIDocQASystem:
    """API 文档问答系统"""
    
    def __init__(self, milvus_path: str, collection_name: str):
        self.milvus_path = milvus_path
        self.collection_name = collection_name
        
        # 使用与 Pipeline 相同的 embedding 配置
        self.embedding = DashScopeEmbeddings(model="text-embedding-v3")
        
        # 初始化向量存储
        self.vector_store = Milvus(
            embedding_function=self.embedding,
            collection_name=collection_name,
            connection_args={"uri": milvus_path},
            vector_field="embeddings",
            primary_field="element_id",
            text_field="text",
            enable_dynamic_field=True
        )
        
        # 初始化大模型
        self.llm = ChatTongyi(
            model="qwen-max",
            dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
        )
    
    def build_index(self):
        """构建文档索引"""
        config = {
            'source': {
                'type': 'local',
                'directory': './api_docs',
                'pattern': ['*.pdf']
            },
            'destination': {
                'type': 'milvus',
                'db_path': './api_docs.db',
                'collection_name': 'api_documentation',
                'dimension': 1024
            },
            'api_base_url': 'https://api.textin.com/api/xparse',
            'api_headers': {
                'x-ti-app-id': os.getenv('XTI_APP_ID'),
                'x-ti-secret-code': os.getenv('XTI_SECRET_CODE')
            },
            'stages': [
                {
                    'type': 'parse',
                    'config': {'provider': 'textin'}
                },
                {
                    'type': 'chunk',
                    'config': {
                        'strategy': 'basic',
                        'max_characters': 1024,
                        'overlap': 50
                    }
                },
                {
                    'type': 'embed',
                    'config': {
                        'provider': 'qwen',
                        'model_name': 'text-embedding-v3'
                    }
                }
            ]
        }
        
        pipeline = create_pipeline_from_config(config)
        pipeline.run()
        print("文档索引构建完成!")
    
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """检索相关文档"""
        # 使用向量存储进行检索
        docs = self.vector_store.similarity_search(query, k=top_k)
        
        # 格式化结果
        results = []
        for doc in docs:
            results.append({
                'content': doc.page_content,
                'metadata': doc.metadata
            })
        
        return results
    
    def answer(self, question: str) -> str:
        """基于检索结果生成答案"""
        # 检索相关文档
        results = self.retrieve(question, top_k=3)
        
        # 构建上下文
        context = "\n\n".join([
            f"文档片段 {i+1}:\n{result['content']}"
            for i, result in enumerate(results)
        ])
        
        # 调用大模型生成答案
        prompt = f"""基于以下文档内容回答用户问题。

                文档内容:
                {context}

                用户问题:{question}

                请基于文档内容回答问题,如果文档中没有相关信息,请说明。"""
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return response.content

def main():
    # 创建问答系统
    qa_system = APIDocQASystem(
        milvus_path='./api_docs.db',
        collection_name='api_documentation'
    )
    
    # 构建索引(首次运行)
    # qa_system.build_index()
    
    # 问答示例
    questions = [
        "如何使用用户认证 API?",
        "API 的 rate limit 是多少?",
        "如何上传文件?"
    ]
    
    for question in questions:
        print(f"\n问题: {question}")
        answer = qa_system.answer(question)
        print(f"回答: {answer}")

if __name__ == '__main__':
    main()

与向量数据库集成

Milvus 使用说明

Milvus 是一个开源的向量数据库,xParse 会自动创建集合和索引。使用 LangChain 可以更方便地进行向量检索:
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import os
from dotenv import load_dotenv

load_dotenv()

# 使用与 Pipeline 相同的 embedding 配置
embedding = DashScopeEmbeddings(model="text-embedding-v4")

# 连接 Milvus 向量存储
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="documents",
    connection_args={"uri": "./vectors.db"},
    vector_field="embeddings",
    primary_field="element_id",
    text_field="text",
    enable_dynamic_field=True
)

# 语义检索
docs = vector_store.similarity_search("用户认证", k=5)

for doc in docs:
    print(f"文档: {doc.metadata.get('filename', 'N/A')}")
    print(f"内容: {doc.page_content[:100]}...")
    print()

Zilliz 使用说明

Zilliz 是 Milvus 的云端托管版本,使用 LangChain 集成方式类似:
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings

# 使用与 Pipeline 相同的 embedding 配置
embedding = DashScopeEmbeddings(model="text-embedding-v4")

# 连接 Zilliz 向量存储
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="documents",
    connection_args={
        "uri": "https://xxxxxxx.serverless.xxxxxxx.cloud.zilliz.com.cn",
        "token": "your-api-key"
    },
    vector_field="embeddings",
    primary_field="element_id",
    text_field="text",
    enable_dynamic_field=True
)

# 语义检索
docs = vector_store.similarity_search("用户认证", k=5)

检索和查询最佳实践

1. 使用 LangChain 进行检索

使用 LangChain 的向量存储可以自动处理查询向量化,无需手动调用 embed API:
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import os
from dotenv import load_dotenv

load_dotenv()

# 使用与 Pipeline 相同的 embedding 配置
embedding = DashScopeEmbeddings(model="text-embedding-v4")

# 连接向量存储
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="documents",
    connection_args={"uri": "./vectors.db"},
    vector_field="embeddings",
    primary_field="element_id",
    text_field="text",
    enable_dynamic_field=True
)

# 直接使用自然语言查询,无需手动向量化
docs = vector_store.similarity_search("用户认证", k=5)

2. 相似度阈值

使用 LangChain 的 similarity_search_with_score 可以获取相似度分数:
def search_with_threshold(query: str, threshold: float = 0.7, top_k: int = 10):
    """带阈值的检索"""
    # 获取带分数的检索结果
    docs_with_scores = vector_store.similarity_search_with_score(query, k=top_k)
    
    # 过滤低相似度结果
    # 注意:LangChain 返回的分数是距离(越小越相似),需要转换为相似度
    filtered = [
        (doc, score) for doc, score in docs_with_scores
        if (1 - score) >= threshold  # COSINE 距离转换为相似度
    ]
    
    return filtered

3. 混合检索

结合向量检索和关键词检索:
def hybrid_search(query: str, vector_store, milvus_path, top_k: int = 5):
    """混合检索:向量 + 关键词"""
    from pymilvus import MilvusClient
    from langchain_core.documents import Document
    
    # 向量检索
    vector_docs = vector_store.similarity_search(query, k=top_k)
    
    # 从 vector_store 获取 collection 信息
    collection_name = vector_store.collection_name
    
    # 关键词检索(使用 Milvus 的查询功能)
    keyword_results = []
    try:
        client = MilvusClient(uri=milvus_path)
        
        # 检查 collection 是否存在
        collections = client.list_collections()
        if collection_name not in collections:
            print(f"Collection '{collection_name}' 不存在,跳过关键词检索")
            return vector_docs
        
        # 构建关键词查询表达式(使用 LIKE 进行模糊匹配)
        # 将查询字符串拆分为多个关键词
        keywords = query.split()
        
        if not keywords:
            return vector_docs
        
        # 构建查询表达式:匹配包含任一关键词的文档
        expr_parts = []
        for keyword in keywords:
            # 转义特殊字符,避免 SQL 注入风险
            escaped_keyword = keyword.replace("'", "''").replace("%", "\\%").replace("_", "\\_")
            expr_parts.append(f"text like '%{escaped_keyword}%'")
        
        # 使用 OR 连接多个关键词条件
        expr = " or ".join(expr_parts)
        
        # 执行查询
        keyword_data = client.query(
            collection_name=collection_name,
            filter=expr,
            limit=top_k,
            output_fields=["text", "metadata", "element_id"]
        )
        
        # 将查询结果转换为 Document 格式
        for item in keyword_data:
            doc = Document(
                page_content=item.get("text", ""),
                metadata=item.get("metadata", {}) if isinstance(item.get("metadata"), dict) else {}
            )
            # 确保 element_id 在 metadata 中
            if "element_id" in item:
                doc.metadata["element_id"] = item["element_id"]
            keyword_results.append(doc)
    except Exception as e:
        # 如果查询失败,记录错误但不中断程序
        print(f"关键词检索失败: {e}")
        keyword_results = []
    
    # 合并结果(去重)
    all_docs = {}
    for doc in vector_docs:
        doc_id = doc.metadata.get('element_id')
        if doc_id:
            all_docs[doc_id] = doc
    
    for doc in keyword_results:
        doc_id = doc.metadata.get('element_id')
        if doc_id and doc_id not in all_docs:
            all_docs[doc_id] = doc
    
    return list(all_docs.values())

性能优化建议

  1. 批量处理:使用 Pipeline 的批量处理能力,一次性处理多个文档
  2. 分块策略优化:根据文档类型选择合适的分块策略,减少不必要的分块
  3. 向量模型选择:平衡精度和速度,生产环境可考虑使用 text-embedding-v3
  4. 索引优化:在 Milvus 中创建合适的索引,提升检索速度

下一步

  • 查看快速启动指南:了解 Pipeline 的基本使用方法
  • 阅读API 文档:了解详细的接口参数和配置选项
  • 探索更多场景:根据您的业务需求,定制 Pipeline 配置
如果您在构建 RAG 应用时遇到问题,可以参考这些示例代码,或联系技术支持获取帮助。