跳转到主要内容
本教程将展示如何构建一个智能文档助手,它能够:
  • 自动解析新上传的文档并更新知识库
  • 根据用户问题智能检索相关文档内容
  • 自动决定何时需要解析新文档,何时直接检索回答

什么是智能文档助手?

想象这样一个场景:你的团队每天都会上传新的合同、FAQ、产品手册等文档到云存储。你希望有一个 AI 助手能够:
  1. 自动处理新文档:当有新文档上传时,自动解析并存入知识库
  2. 智能回答问题:当用户提问时,自动从知识库中找到相关内容并回答
  3. 自动判断:如果知识库中没有相关信息,自动触发文档解析;如果有,直接检索回答
这就是我们要构建的智能文档助手。

工作原理

整个系统的工作流程如下:
用户提问:"最新版本的新功能有哪些?"

[LangChain Agent] 分析问题

Agent 判断:需要先检索知识库

[Tool: vector_search] 在向量库中搜索

结果:没找到最新版本的信息

Agent 判断:需要解析新文档

[Tool: run_xparse_client] 调用 xParse SDK 解析文档,LangChain 分块+向量化后更新知识库

再次检索,找到相关内容

Agent 组织回答并返回给用户

环境准备

首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install xparse-client langchain langchain-community langchain-core langchain-text-splitters langchain-milvus \
            python-dotenv dashscope
创建 .env 文件存储配置:
# .env
TEXTIN_APP_ID=your-app-id
TEXTIN_SECRET_CODE=your-secret-code
MILVUS_DB_PATH=./agent_vectors.db
DASHSCOPE_API_KEY=your-dashscope-key
提示:TEXTIN_APP_IDTEXTIN_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。

完整代码示例

import os
import glob
from dotenv import load_dotenv
from xparse_client import XParseClient
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.tools import Tool
from langchain_core.documents import Document
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain.agents import initialize_agent, AgentType
from langchain_community.chat_models import ChatTongyi

# 加载环境变量
load_dotenv()

# ========== Step 1: 初始化 xParse SDK 客户端 ==========

DOCS_DIR = "/your/doc/folder"

client = XParseClient()

headers_to_split_on = [("#", "header1"), ("##", "header2"), ("###", "header3")]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=80)

def process_single_file(file_path: str) -> str:
    """处理单个文件并存入知识库"""
    try:
        with open(file_path, "rb") as f:
            result = client.parse.run(file=f, filename=os.path.basename(file_path))

        md_docs = markdown_splitter.split_text(result.markdown)
        for doc in md_docs:
            doc.metadata["filename"] = os.path.basename(file_path)
        chunks = text_splitter.split_documents(md_docs)

        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=chunks,
            embedding=embedding,
            collection_name="agent_docs",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return f"✅ 成功处理文件 {file_path} 并已存入知识库。"
    except Exception as e:
        return f"❌ 处理文件 {file_path} 时出错:{str(e)}"

def build_knowledge_base() -> str:
    """解析目录中的所有文档并构建知识库"""
    try:
        all_chunks = []
        for file_path in glob.glob(os.path.join(DOCS_DIR, "*")):
            if not os.path.isfile(file_path):
                continue
            with open(file_path, "rb") as f:
                result = client.parse.run(file=f, filename=os.path.basename(file_path))
            md_docs = markdown_splitter.split_text(result.markdown)
            for doc in md_docs:
                doc.metadata["filename"] = os.path.basename(file_path)
            chunks = text_splitter.split_documents(md_docs)
            all_chunks.extend(chunks)

        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=all_chunks,
            embedding=embedding,
            collection_name="agent_docs",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return f"✅ 已处理所有文件并已存入知识库。"
    except Exception as e:
        return f"❌ 构建知识库时出错:{str(e)}"

# ========== Step 2: 初始化向量数据库 ==========

embedding = DashScopeEmbeddings(model="text-embedding-v4")

vector_store = Milvus(
    embedding_function=embedding,
    collection_name="agent_docs",
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
)

# ========== Step 3: 构建 LangChain Tools ==========

def pipeline_tool_fn(doc_hint: str) -> str:
    """
    文档处理工具:根据输入决定处理单个文件还是整个目录
    
    输入示例:
    - "处理 contracts/2025Q1/contract.pdf" -> 处理单个文件
    - "更新所有文档" 或 "同步文档库" -> 处理整个目录
    """
    if doc_hint and ("/" in doc_hint or "\\" in doc_hint):
        file_path = doc_hint.strip()
        return process_single_file(file_path)
    else:
        return build_knowledge_base()

def search_tool_fn(query: str) -> str:
    """
    向量检索工具:在知识库中搜索相关内容
    
    返回格式化的检索结果,包含文档来源和内容
    """
    docs = vector_store.similarity_search(query, k=4)
    if not docs:
        return "❌ 在知识库中未找到相关内容。建议先运行文档解析工具更新知识库。"
    
    results = []
    for i, doc in enumerate(docs, 1):
        filename = doc.metadata.get('filename', '未知文件')
        header1 = doc.metadata.get('header1', '')
        header2 = doc.metadata.get('header2', '')
        section = f" > {header1}" if header1 else ""
        section += f" > {header2}" if header2 else ""
        content = doc.page_content[:500]
        results.append(f"[{i}] 来源:{filename}{section}\n内容:{content}...")
    
    return "\n\n".join(results)

# 定义工具列表
tools = [
    Tool(
        name="run_xparse_client",
        description="当需要解析新文档或更新知识库时使用此工具。输入可以是文件路径(如 'contracts/doc.pdf')或更新指令(如 '更新所有文档')。",
        func=pipeline_tool_fn
    ),
    Tool(
        name="vector_search",
        description="当需要基于知识库内容回答问题时使用此工具。输入是用户的自然语言问题,工具会在知识库中搜索相关内容。",
        func=search_tool_fn
    )
]

# ========== Step 4: 初始化 Agent ==========

llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)

agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,  # 显示 Agent 的思考过程
)

# ========== Step 5: 使用示例 ==========

if __name__ == "__main__":
    # 示例 1: 用户提问,Agent 会自动检索知识库
    print("=" * 60)
    print("示例 1: 用户提问")
    print("=" * 60)
    response = agent.invoke({
        "input": "如何安装milvus?"
    })
    print(response["output"])
    print()
    
    # 示例 2: 用户要求更新文档,Agent 会调用解析工具
    print("=" * 60)
    print("示例 2: 更新文档库")
    print("=" * 60)
    response = agent.invoke({
        "input": "请更新所有文档到知识库"
    })
    print(response["output"])
    print()
    
    # 示例 3: 用户提问但知识库中没有,Agent 会先解析再检索
    print("=" * 60)
    print("示例 3: 智能判断")
    print("=" * 60)
    response = agent.invoke({
        "input": "最新版本的新功能有哪些?如果没有相关信息,请先解析 Milvus_DEVELOPMENT.pdf"
    })
    print(response["output"])

代码说明

Step 1: xParse SDK 客户端初始化

XParseClient 用于初始化 xParse 客户端,它会从环境变量中读取 TEXTIN_APP_IDTEXTIN_SECRET_CODE 进行认证。调用 client.parse.run() 即可将文档解析为 Markdown,然后使用 LangChain 的 MarkdownHeaderTextSplitter 按标题层级分块,再用 RecursiveCharacterTextSplitter 控制块的大小,最后通过 DashScopeEmbeddings 向量化后存入 Milvus。 重要XParseClient 只需要初始化一次,可以在全局复用。

Step 2: 向量数据库

向量数据库用于存储文档的向量表示,支持语义搜索。关键点:检索时必须使用与构建知识库时相同的 embedding 模型,否则语义空间不一致,检索效果会变差。

Step 3: LangChain Tools

Tools 是 Agent 可以调用的函数。我们定义了两个工具:
  1. run_xparse_client:处理文档的工具
    • 如果输入是文件路径,处理单个文件
    • 如果输入是更新指令,处理整个目录
  2. vector_search:检索知识库的工具
    • 根据用户问题在向量库中搜索相关内容
    • 返回格式化的结果,包含文档来源

Step 4: Agent 初始化

Agent 是”大脑”,它会:
  • 理解用户的问题
  • 决定调用哪个工具
  • 根据工具返回结果组织最终回答

Step 5: 使用

Agent 会自动判断:
  • 用户提问 → 先检索知识库
  • 知识库没有答案 → 调用解析工具更新知识库,再检索
  • 用户要求更新 → 直接调用解析工具

实际应用场景

场景 1: 客服助手

需求:客服团队经常收到产品相关问题,需要快速从 FAQ 和产品手册中找到答案。 实现
  • 将 FAQ 和产品手册放在 ./documents/faqs/ 目录
  • 用户提问时,Agent 自动检索并回答
  • 有新版本文档时,Agent 自动更新知识库

场景 2: 合同管理

需求:法务团队需要快速查找合同中的特定条款。 实现
  • 将合同文档放在 ./documents/contracts/ 目录
  • xParse 解析后按 Markdown 标题分块,保持章节完整性
  • 用户提问”违约条款”,Agent 自动检索相关章节

场景 3: 知识库维护

需求:定期更新知识库,确保信息是最新的。 实现
  • 设置定时任务,定期调用 build_knowledge_base()
  • 或者通过 Agent 接口,用户说”更新文档库”,Agent 自动处理

进阶优化

1. 添加对话历史

让 Agent 记住之前的对话:
from langchain.memory import ConversationBufferMemory

# 创建记忆组件
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

# 在初始化 Agent 时添加记忆
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    memory=memory  # 添加记忆组件
)

def chat_with_agent(query: str):
    """Agent 会自动使用 memory 记住对话历史"""
    response = agent.invoke({
        "input": query
    })
    return response["output"]

2. 添加限流保护

避免频繁调用文档解析:
import time
from datetime import datetime, timedelta

last_run_time = None
MIN_INTERVAL = timedelta(minutes=10)  # 最小间隔10分钟

def pipeline_tool_fn(doc_hint: str) -> str:
    global last_run_time
    
    # 检查是否在冷却期
    if last_run_time and datetime.now() - last_run_time < MIN_INTERVAL:
        return "⚠️ 文档解析刚刚运行过,请稍后再试(建议间隔10分钟以上)。"
    
    # 执行处理
    result = process_single_file(doc_hint) if "/" in doc_hint else build_knowledge_base()
    last_run_time = datetime.now()
    return result

3. 添加引用来源

在回答中标注信息来源:
def search_tool_fn(query: str) -> str:
    docs = vector_store.similarity_search(query, k=4)
    if not docs:
        return "❌ 在知识库中未找到相关内容。"
    
    results = []
    sources = []  # 收集来源信息
    for i, doc in enumerate(docs, 1):
        filename = doc.metadata.get('filename', '未知文件')
        header1 = doc.metadata.get('header1', '')
        sources.append(f"{filename}#{header1}")
        results.append(f"[{i}] {filename} ({header1})\n{doc.page_content[:500]}...")
    
    # 在结果末尾添加来源列表
    results.append(f"\n📚 参考来源:{', '.join(sources)}")
    return "\n\n".join(results)

常见问题

Q: 长文档解析时间长,会影响用户体验吗? A: 是的,如果文档很大,解析可能需要一些时间。建议:
  • 对于大文档,使用异步处理,先返回”任务已提交”
  • 或者限制单次处理的文件数量
Q: 如何让 Agent 只检索,不自动触发解析? A: 修改 Tool 的 description,明确说明使用场景,或者添加一个开关参数。 Q: 向量数据库中的数据会过期吗? A: 不会自动过期。如果需要更新,需要重新调用 build_knowledge_base() 构建知识库,新数据会追加到向量库中。 Q: 可以使用其他 LLM 吗? A: 可以。LangChain 支持多种 LLM,只需替换 ChatTongyi(通义千问) 为对应的类,如 ChatOpenAI(OpenAI)、ChatZhipuAI(智谱AI)等。 Q: 如何实现增量处理? A: 可以通过 process_single_file() 逐个处理新增文档,也可以自行维护已处理文件列表来实现增量逻辑。

总结

通过本教程,你已经学会了如何构建一个智能文档助手。核心思路是:
  1. xParse SDK 负责解析:调用 client.parse.run() 将文档解析为 Markdown,再通过 LangChain 进行分块、向量化并存入数据库
  2. Agent 负责决策:根据用户问题,决定调用哪个工具
  3. Tools 负责执行:具体的文档处理和检索操作
这样,你就有了一个”能自己跑文档”的 AI 助手!