跳转到主要内容
本教程将带您了解如何使用 xParse SDK 和 LangChain 构建完整的 RAG(检索增强生成)应用。我们将通过三个实际场景,展示从文档解析到智能分块、向量化到检索的完整流程。

什么是 RAG?

RAG(Retrieval-Augmented Generation,检索增强生成)是一种结合信息检索和生成式 AI 的技术。通过 RAG,大模型可以基于企业知识库进行回答,而不是仅依赖训练数据,从而提供更准确、更相关的答案。 RAG 的核心流程包括:
  1. 文档解析:使用 xParse SDK 将非结构化文档转换为 Markdown 和结构化元素
  2. 智能分块:使用 LangChain 文本分割器对解析结果进行分块
  3. 向量化存储:使用 LangChain Embeddings 将分块向量化并存入向量数据库
  4. 检索查询:根据用户问题检索相关文档片段
  5. 生成回答:将检索到的内容作为上下文,让大模型生成答案
xParse SDK 负责高质量的文档解析,LangChain 负责后续的分块、向量化和检索,两者结合为您提供灵活的 RAG 构建能力。

环境准备

首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install xparse-client langchain langchain-community langchain-core \
            langchain-text-splitters langchain-milvus python-dotenv
创建 .env 文件存储配置:
# .env
TEXTIN_APP_ID=your-app-id
TEXTIN_SECRET_CODE=your-secret-code
DASHSCOPE_API_KEY=your-dashscope-key
提示:TEXTIN_APP_IDTEXTIN_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。
下面我们将通过三个实际场景,展示如何构建 RAG 应用。

RAG 完整流程

文档文件 (PDF/Word/Excel...)

[xParse SDK 文档解析]
    └─ client.parse.run() → Markdown + 结构化元素

[LangChain 智能分块]
    ├─ MarkdownHeaderTextSplitter(按标题)
    ├─ RecursiveCharacterTextSplitter(按字符)
    └─ 按页面分组(按元素 page_number)

[LangChain 向量化 + 存储]
    ├─ DashScopeEmbeddings 向量化
    └─ Milvus.from_documents() 存入向量数据库

[检索系统]
    ├─ 用户问题 → 向量化
    ├─ 向量相似度检索
    └─ 返回相关文档片段

[大模型生成]
    └─ 基于检索内容生成答案

场景 1:企业知识库构建

需求描述

某企业需要构建内部知识库,包含产品手册、技术文档、培训材料等。员工可以通过自然语言提问,快速找到相关信息。 文档特点
  • 格式多样:PDF、Word、Excel
  • 结构清晰:有明确的章节标题
  • 内容专业:技术术语较多
处理要求
  • 保持章节完整性
  • 支持语义检索
  • 快速响应查询

配置方案

针对结构化文档,我们使用 MarkdownHeaderTextSplitter 按标题分块,保持章节完整性:
from xparse_client import XParseClient
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_milvus import Milvus
import os, glob
from dotenv import load_dotenv

load_dotenv()

client = XParseClient()

# 按标题分块配置
headers_to_split_on = [("#", "header1"), ("##", "header2"), ("###", "header3")]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1536, chunk_overlap=100)

# 解析并分块
all_chunks = []
for file_path in glob.glob("./knowledge_base/*.pdf"):
    with open(file_path, "rb") as f:
        result = client.parse.run(file=f, filename=os.path.basename(file_path))
    md_docs = markdown_splitter.split_text(result.markdown)
    for doc in md_docs:
        doc.metadata["filename"] = os.path.basename(file_path)
    chunks = text_splitter.split_documents(md_docs)
    all_chunks.extend(chunks)

# 向量化并存入 Milvus
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus.from_documents(
    documents=all_chunks,
    embedding=embedding,
    collection_name="knowledge_base",
    connection_args={"uri": "./kb_vectors.db"},
)

完整代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
企业知识库构建示例
"""

from xparse_client import XParseClient
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import os, glob
from dotenv import load_dotenv

load_dotenv()

def build_knowledge_base():
    """构建知识库"""
    print("=" * 60)
    print("开始构建企业知识库...")
    print("=" * 60)
    
    client = XParseClient()
    
    headers_to_split_on = [("#", "header1"), ("##", "header2"), ("###", "header3")]
    markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1536, chunk_overlap=100)
    
    all_chunks = []
    for file_path in glob.glob("./knowledge_base/*.pdf"):
        with open(file_path, "rb") as f:
            result = client.parse.run(file=f, filename=os.path.basename(file_path))
        md_docs = markdown_splitter.split_text(result.markdown)
        for doc in md_docs:
            doc.metadata["filename"] = os.path.basename(file_path)
        chunks = text_splitter.split_documents(md_docs)
        all_chunks.extend(chunks)
    
    embedding = DashScopeEmbeddings(model="text-embedding-v4")
    vector_store = Milvus.from_documents(
        documents=all_chunks,
        embedding=embedding,
        collection_name="knowledge_base",
        connection_args={"uri": "./kb_vectors.db"},
    )
    
    print(f"\n共处理 {len(all_chunks)} 个文档片段")
    print("\n" + "=" * 60)
    print("知识库构建完成!")
    print("=" * 60)

def query_knowledge_base(question: str, top_k: int = 5):
    """查询知识库"""
    embedding = DashScopeEmbeddings(model="text-embedding-v4")
    
    vector_store = Milvus(
        embedding_function=embedding,
        collection_name="knowledge_base",
        connection_args={"uri": "./kb_vectors.db"},
    )
    
    docs = vector_store.similarity_search(question, k=top_k)
    
    print(f"\n问题: {question}")
    print(f"\n找到 {len(docs)} 个相关文档片段:\n")
    
    for i, doc in enumerate(docs, 1):
        print(f"{i}. 文档: {doc.metadata.get('filename', 'N/A')}")
        print(f"   内容: {doc.page_content[:200]}...")
        print()

if __name__ == '__main__':
    # 构建知识库
    build_knowledge_base()
    
    # 查询示例
    query_knowledge_base("如何使用产品 API?")

场景 2:法律文档检索系统

需求描述

律师事务所需要构建法律文档检索系统,包含合同、判决书、法律条文等。律师可以通过关键词或自然语言快速检索相关案例和法律依据。 文档特点
  • 主要是 PDF 格式
  • 页面结构清晰
  • 需要保持页面完整性
  • 跨页内容需要关联
处理要求
  • 按页面分块,保持页面完整性
  • 支持精确检索
  • 保留文档元数据(如案件编号、日期等)

配置方案

针对 PDF 文档,我们通过 xParse 解析出的元素按 page_number 分组,保持页面完整性:
from xparse_client import XParseClient
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_milvus import Milvus
from collections import defaultdict
import os, glob
from dotenv import load_dotenv

load_dotenv()

client = XParseClient()

# 按页面分块
all_chunks = []
for file_path in glob.glob("./legal_documents/*.pdf"):
    with open(file_path, "rb") as f:
        result = client.parse.run(file=f, filename=os.path.basename(file_path))
    page_texts = defaultdict(list)
    for el in result.elements:
        page_texts[el.page_number].append(el.text)
    page_docs = [
        Document(
            page_content="\n\n".join(texts),
            metadata={"filename": os.path.basename(file_path), "page_number": pn}
        )
        for pn, texts in sorted(page_texts.items())
    ]
    chunks = RecursiveCharacterTextSplitter(
        chunk_size=2048, chunk_overlap=150
    ).split_documents(page_docs)
    all_chunks.extend(chunks)

# 向量化并存入 Milvus
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus.from_documents(
    documents=all_chunks,
    embedding=embedding,
    collection_name="legal_documents",
    connection_args={"uri": "./legal_vectors.db"},
)

完整代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
法律文档检索系统示例
"""

from xparse_client import XParseClient
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from collections import defaultdict
from typing import List, Dict
import os, glob
from dotenv import load_dotenv

load_dotenv()

def build_legal_document_index():
    """构建法律文档索引"""
    print("=" * 60)
    print("开始构建法律文档检索系统...")
    print("=" * 60)
    
    client = XParseClient()
    
    all_chunks = []
    for file_path in glob.glob("./legal_documents/*.pdf"):
        with open(file_path, "rb") as f:
            result = client.parse.run(file=f, filename=os.path.basename(file_path))
        page_texts = defaultdict(list)
        for el in result.elements:
            page_texts[el.page_number].append(el.text)
        page_docs = [
            Document(
                page_content="\n\n".join(texts),
                metadata={"filename": os.path.basename(file_path), "page_number": pn}
            )
            for pn, texts in sorted(page_texts.items())
        ]
        chunks = RecursiveCharacterTextSplitter(
            chunk_size=2048, chunk_overlap=150
        ).split_documents(page_docs)
        all_chunks.extend(chunks)
    
    embedding = DashScopeEmbeddings(model="text-embedding-v4")
    vector_store = Milvus.from_documents(
        documents=all_chunks,
        embedding=embedding,
        collection_name="legal_documents",
        connection_args={"uri": "./legal_vectors.db"},
    )
    
    print(f"\n共处理 {len(all_chunks)} 个文档片段")
    print("\n" + "=" * 60)
    print("法律文档索引构建完成!")
    print("=" * 60)

def search_legal_documents(query: str, case_type: str = None, top_k: int = 10) -> List[Dict]:
    """检索法律文档"""
    embedding = DashScopeEmbeddings(model="text-embedding-v4")
    
    vector_store = Milvus(
        embedding_function=embedding,
        collection_name="legal_documents",
        connection_args={"uri": "./legal_vectors.db"},
    )
    
    docs = vector_store.similarity_search(query, k=top_k)
    
    results = []
    for doc in docs:
        results.append({
            'content': doc.page_content,
            'metadata': doc.metadata
        })
    
    return results

def format_search_results(results: List[Dict]) -> str:
    """格式化检索结果"""
    output = []
    for i, result in enumerate(results, 1):
        metadata = result.get('metadata', {})
        output.append(f"{i}. 文档: {metadata.get('filename', 'N/A')}")
        output.append(f"   页码: {metadata.get('page_number', 'N/A')}")
        output.append(f"   内容: {result.get('content', '')[:300]}...")
        output.append("")
    return "\n".join(output)

if __name__ == '__main__':
    # 构建索引
    build_legal_document_index()
    
    # 检索示例
    query = "合同违约责任"
    results = search_legal_documents(query, top_k=5)
    print(f"\n查询: {query}")
    print("\n检索结果:")
    print(format_search_results(results))

场景 3:技术文档问答系统

需求描述

技术团队需要构建 API 文档问答系统,开发者可以通过自然语言提问,快速找到 API 使用方法、参数说明等。 文档特点
  • 主要是 Markdown 和 PDF 格式
  • 代码示例较多
  • 结构相对简单
  • 需要精确匹配 API 名称
处理要求
  • 基础分块即可
  • 支持代码块识别
  • 快速检索响应

配置方案

针对技术文档,我们使用 RecursiveCharacterTextSplitter 进行基础分块,简单高效:
from xparse_client import XParseClient
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_milvus import Milvus
import os, glob
from dotenv import load_dotenv

load_dotenv()

client = XParseClient()

# 基础分块
all_chunks = []
for file_path in glob.glob("./api_docs/*.pdf"):
    with open(file_path, "rb") as f:
        result = client.parse.run(file=f, filename=os.path.basename(file_path))
    doc = Document(
        page_content=result.markdown,
        metadata={"filename": os.path.basename(file_path)}
    )
    chunks = RecursiveCharacterTextSplitter(
        chunk_size=1024, chunk_overlap=50
    ).split_documents([doc])
    all_chunks.extend(chunks)

# 向量化并存入 Milvus
embedding = DashScopeEmbeddings(model="text-embedding-v3")
vector_store = Milvus.from_documents(
    documents=all_chunks,
    embedding=embedding,
    collection_name="api_documentation",
    connection_args={"uri": "./api_docs.db"},
)

完整代码示例(集成大模型)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
技术文档问答系统示例(集成大模型)
"""

from xparse_client import XParseClient
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import HumanMessage
from typing import List, Dict
import os, glob
from dotenv import load_dotenv

load_dotenv()

class APIDocQASystem:
    """API 文档问答系统"""
    
    def __init__(self, milvus_path: str, collection_name: str):
        self.milvus_path = milvus_path
        self.collection_name = collection_name
        
        self.embedding = DashScopeEmbeddings(model="text-embedding-v3")
        
        self.vector_store = Milvus(
            embedding_function=self.embedding,
            collection_name=collection_name,
            connection_args={"uri": milvus_path},
        )
        
        self.llm = ChatTongyi(
            model="qwen-max",
            dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
        )
    
    def build_index(self):
        """构建文档索引"""
        client = XParseClient()
        
        all_chunks = []
        for file_path in glob.glob("./api_docs/*.pdf"):
            with open(file_path, "rb") as f:
                result = client.parse.run(file=f, filename=os.path.basename(file_path))
            doc = Document(
                page_content=result.markdown,
                metadata={"filename": os.path.basename(file_path)}
            )
            chunks = RecursiveCharacterTextSplitter(
                chunk_size=1024, chunk_overlap=50
            ).split_documents([doc])
            all_chunks.extend(chunks)
        
        embedding = DashScopeEmbeddings(model="text-embedding-v3")
        self.vector_store = Milvus.from_documents(
            documents=all_chunks,
            embedding=embedding,
            collection_name=self.collection_name,
            connection_args={"uri": self.milvus_path},
        )
        print("文档索引构建完成!")
    
    def retrieve(self, query: str, top_k: int = 3) -> List[Dict]:
        """检索相关文档"""
        docs = self.vector_store.similarity_search(query, k=top_k)
        
        results = []
        for doc in docs:
            results.append({
                'content': doc.page_content,
                'metadata': doc.metadata
            })
        
        return results
    
    def answer(self, question: str) -> str:
        """基于检索结果生成答案"""
        results = self.retrieve(question, top_k=3)
        
        context = "\n\n".join([
            f"文档片段 {i+1}:\n{result['content']}"
            for i, result in enumerate(results)
        ])
        
        prompt = f"""基于以下文档内容回答用户问题。

                文档内容:
                {context}

                用户问题:{question}

                请基于文档内容回答问题,如果文档中没有相关信息,请说明。"""
        
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return response.content

def main():
    qa_system = APIDocQASystem(
        milvus_path='./api_docs.db',
        collection_name='api_documentation'
    )
    
    # 构建索引(首次运行)
    # qa_system.build_index()
    
    # 问答示例
    questions = [
        "如何使用用户认证 API?",
        "API 的 rate limit 是多少?",
        "如何上传文件?"
    ]
    
    for question in questions:
        print(f"\n问题: {question}")
        answer = qa_system.answer(question)
        print(f"回答: {answer}")

if __name__ == '__main__':
    main()

与向量数据库集成

Milvus 使用说明

Milvus 是一个开源的向量数据库,通过 LangChain 可以方便地将 xParse 解析的文档存入 Milvus 并进行向量检索:
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import os
from dotenv import load_dotenv

load_dotenv()

embedding = DashScopeEmbeddings(model="text-embedding-v4")

# 连接 Milvus 向量存储
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="documents",
    connection_args={"uri": "./vectors.db"},
)

# 语义检索
docs = vector_store.similarity_search("用户认证", k=5)

for doc in docs:
    print(f"文档: {doc.metadata.get('filename', 'N/A')}")
    print(f"内容: {doc.page_content[:100]}...")
    print()

Zilliz 使用说明

Zilliz 是 Milvus 的云端托管版本,使用 LangChain 集成方式类似:
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings

embedding = DashScopeEmbeddings(model="text-embedding-v4")

# 连接 Zilliz 向量存储
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="documents",
    connection_args={
        "uri": "https://xxxxxxx.serverless.xxxxxxx.cloud.zilliz.com.cn",
        "token": "your-api-key"
    },
)

# 语义检索
docs = vector_store.similarity_search("用户认证", k=5)

检索和查询最佳实践

1. 使用 LangChain 进行检索

使用 LangChain 的向量存储可以自动处理查询向量化,无需手动调用 embed API:
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import os
from dotenv import load_dotenv

load_dotenv()

embedding = DashScopeEmbeddings(model="text-embedding-v4")

# 连接向量存储
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="documents",
    connection_args={"uri": "./vectors.db"},
)

# 直接使用自然语言查询,无需手动向量化
docs = vector_store.similarity_search("用户认证", k=5)

2. 相似度阈值

使用 LangChain 的 similarity_search_with_score 可以获取相似度分数:
def search_with_threshold(query: str, threshold: float = 0.7, top_k: int = 10):
    """带阈值的检索"""
    # 获取带分数的检索结果
    docs_with_scores = vector_store.similarity_search_with_score(query, k=top_k)
    
    # 过滤低相似度结果
    # 注意:LangChain 返回的分数是距离(越小越相似),需要转换为相似度
    filtered = [
        (doc, score) for doc, score in docs_with_scores
        if (1 - score) >= threshold  # COSINE 距离转换为相似度
    ]
    
    return filtered

3. 混合检索

结合向量检索和关键词检索:
def hybrid_search(query: str, vector_store, milvus_path, top_k: int = 5):
    """混合检索:向量 + 关键词"""
    from pymilvus import MilvusClient
    from langchain_core.documents import Document
    
    # 向量检索
    vector_docs = vector_store.similarity_search(query, k=top_k)
    
    collection_name = vector_store.collection_name
    
    # 关键词检索(使用 Milvus 的查询功能)
    keyword_results = []
    try:
        client = MilvusClient(uri=milvus_path)
        
        collections = client.list_collections()
        if collection_name not in collections:
            print(f"Collection '{collection_name}' 不存在,跳过关键词检索")
            return vector_docs
        
        keywords = query.split()
        
        if not keywords:
            return vector_docs
        
        expr_parts = []
        for keyword in keywords:
            escaped_keyword = keyword.replace("'", "''").replace("%", "\\%").replace("_", "\\_")
            expr_parts.append(f"text like '%{escaped_keyword}%'")
        
        expr = " or ".join(expr_parts)
        
        keyword_data = client.query(
            collection_name=collection_name,
            filter=expr,
            limit=top_k,
            output_fields=["text", "metadata"]
        )
        
        for item in keyword_data:
            doc = Document(
                page_content=item.get("text", ""),
                metadata=item.get("metadata", {}) if isinstance(item.get("metadata"), dict) else {}
            )
            keyword_results.append(doc)
    except Exception as e:
        print(f"关键词检索失败: {e}")
        keyword_results = []
    
    # 合并结果(去重)
    all_docs = {}
    for doc in vector_docs:
        doc_id = doc.page_content[:100]
        all_docs[doc_id] = doc
    
    for doc in keyword_results:
        doc_id = doc.page_content[:100]
        if doc_id not in all_docs:
            all_docs[doc_id] = doc
    
    return list(all_docs.values())

性能优化建议

  1. 批量处理:使用 xParse SDK 批量解析多个文档,配合 LangChain 批量分块和向量化
  2. 分块策略优化:根据文档类型选择合适的 LangChain 分割器,减少不必要的分块
  3. 向量模型选择:平衡精度和速度,生产环境可考虑使用 text-embedding-v3
  4. 索引优化:在 Milvus 中创建合适的索引,提升检索速度

下一步

  • 查看快速启动指南:了解 xParse SDK 的基本使用方法
  • 阅读API 文档:了解详细的接口参数和配置选项
  • 探索更多场景:根据您的业务需求,灵活组合 xParse SDK 和 LangChain 能力
如果您在构建 RAG 应用时遇到问题,可以参考这些示例代码,或联系技术支持获取帮助。