跳转到主要内容
本教程将展示如何构建一个智能文档助手,它能够:
  • 自动解析新上传的文档并更新知识库
  • 根据用户问题智能检索相关文档内容
  • 自动决定何时需要解析新文档,何时直接检索回答

什么是智能文档助手?

想象这样一个场景:你的团队每天都会上传新的合同、FAQ、产品手册等文档到云存储。你希望有一个 AI 助手能够:
  1. 自动处理新文档:当有新文档上传时,自动解析并存入知识库
  2. 智能回答问题:当用户提问时,自动从知识库中找到相关内容并回答
  3. 自动判断:如果知识库中没有相关信息,自动触发文档解析;如果有,直接检索回答
这就是我们要构建的智能文档助手。

工作原理

整个系统的工作流程如下:
用户提问:"最新版本的新功能有哪些?"

[LangChain Agent] 分析问题

Agent 判断:需要先检索知识库

[Tool: vector_search] 在向量库中搜索

结果:没找到最新版本的信息

Agent 判断:需要解析新文档

[Tool: run_xparse_client] 解析文档并更新知识库

再次检索,找到相关内容

Agent 组织回答并返回给用户

环境准备

首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install "xparse-client>=0.2.10" langchain langchain-community langchain-core langchain-milvus \
            pymilvus python-dotenv dashscope
创建 .env 文件存储配置:
# .env
X_TI_APP_ID=your-app-id
X_TI_SECRET_CODE=your-secret-code
MILVUS_DB_PATH=./agent_vectors.db
DASHSCOPE_API_KEY==your-dashscope-key
提示:X_TI_APP_IDX_TI_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。

完整代码示例

import os
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_core.tools import Tool
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi

# 加载环境变量
load_dotenv()

# ========== Step 1: 初始化 xParse Pipeline ==========

PIPELINE_CONFIG = {
    "source": {
        "type": "local",  # 本地文件系统,也可以改为 "s3"
        "directory": "/your/doc/folder",  # 文档存放目录
        "pattern": ["*.pdf", "*.docx"]  # 支持的文件类型
    },
    "destination": {
        "type": "milvus",  # 向量数据库
        "db_path": os.getenv("MILVUS_DB_PATH"),
        "collection_name": "agent_docs",
        "dimension": 1024
    },
    "api_base_url": "https://api.textin.com/api/xparse",
    "api_headers": {
        "x-ti-app-id": os.getenv("X_TI_APP_ID"),
        "x-ti-secret-code": os.getenv("X_TI_SECRET_CODE")
    },
    "stages": [
        {
            "type": "parse",
            "config": {
                "provider": "textin"  # 文档解析引擎
            }
        },
        {
            "type": "chunk",
            "config": {
                "strategy": "by_title",  # 按标题分块,保持章节完整性
                "new_after_n_chars": 480,
                "max_characters": 1500,
                "overlap": 80  # 块之间重叠80字符,避免信息丢失
            }
        },
        {
            "type": "embed",
            "config": {
                "provider": "qwen",
                "model_name": "text-embedding-v4"  # 向量化模型
            }
        }
    ]
}

# 初始化 Pipeline(全局复用,避免重复创建)
pipeline = create_pipeline_from_config(PIPELINE_CONFIG)

def process_single_file(file_path: str) -> str:
    """处理单个文件并返回结果描述"""
    try:
        # 从 source 读取文件
        file_bytes = pipeline.source.read_file(file_path)
        # 处理文件
        success = pipeline.process_file(file_bytes, file_path)
        if success:
            return f"✅ 成功处理文件 {file_path} 并已存入知识库。"
        else:
            return f"❌ 处理文件 {file_path} 失败。"
    except Exception as e:
        return f"❌ 处理文件 {file_path} 时出错:{str(e)}"

def run_full_pipeline() -> str:
    """运行完整 Pipeline,处理 source 目录中的所有文件"""
    try:
        pipeline.run()
        return f"✅ 已处理所有文件并已存入知识库。"
    except Exception as e:
        return f"❌ 运行 Pipeline 时出错:{str(e)}"

# ========== Step 2: 初始化向量数据库 ==========

# 使用与 Pipeline 相同的 embedding 配置,保证语义空间一致
embedding = DashScopeEmbeddings(model="text-embedding-v4")

vector_store = Milvus(
    embedding_function=embedding,
    collection_name="agent_docs",
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
    vector_field="embeddings",  # 使用 embeddings 字段存储向量(与 xparse_client 保持一致)
    primary_field="element_id",  # 使用 element_id 作为主键(与 xparse_client 保持一致)
    text_field="text",  # 使用 text 字段存储文本内容
    enable_dynamic_field=True  # 启用动态字段支持,这样才能返回所有 metadata 字段
)

# ========== Step 3: 构建 LangChain Tools ==========

def pipeline_tool_fn(doc_hint: str) -> str:
    """
    文档处理工具:根据输入决定处理单个文件还是整个目录
    
    输入示例:
    - "处理 contracts/2025Q1/contract.pdf" -> 处理单个文件
    - "更新所有文档" 或 "同步文档库" -> 处理整个目录
    """
    # 如果输入包含路径分隔符,认为是文件路径
    if doc_hint and ("/" in doc_hint or "\\" in doc_hint):
        # 提取文件路径(简单处理,实际可以更智能)
        file_path = doc_hint.strip()
        return process_single_file(file_path)
    else:
        # 否则处理整个目录
        return run_full_pipeline()

def search_tool_fn(query: str) -> str:
    """
    向量检索工具:在知识库中搜索相关内容
    
    返回格式化的检索结果,包含文档来源和内容
    """
    docs = vector_store.similarity_search(query, k=4)
    if not docs:
        return "❌ 在知识库中未找到相关内容。建议先运行文档解析工具更新知识库。"
    
    results = []
    for i, doc in enumerate(docs, 1):
        filename = doc.metadata.get('filename', '未知文件')
        page_num = doc.metadata.get('page_number', '?')
        content = doc.page_content[:500]  # 限制长度
        results.append(f"[{i}] 来源:{filename} (第{page_num}页)\n内容:{content}...")
    
    return "\n\n".join(results)

# 定义工具列表
tools = [
    Tool(
        name="run_xparse_client",
        description="当需要解析新文档或更新知识库时使用此工具。输入可以是文件路径(如 'contracts/doc.pdf')或更新指令(如 '更新所有文档')。",
        func=pipeline_tool_fn
    ),
    Tool(
        name="vector_search",
        description="当需要基于知识库内容回答问题时使用此工具。输入是用户的自然语言问题,工具会在知识库中搜索相关内容。",
        func=search_tool_fn
    )
]

# ========== Step 4: 初始化 Agent ==========

llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)

agent = create_agent(
    model=llm,
    tools=tools,
    debug=True  # 显示 Agent 的思考过程
)

# ========== Step 5: 使用示例 ==========

if __name__ == "__main__":
    # 示例 1: 用户提问,Agent 会自动检索知识库
    print("=" * 60)
    print("示例 1: 用户提问")
    print("=" * 60)
    response = agent.invoke({
        "messages": [HumanMessage(content="如何安装milvus?")]
    })
    print(response["messages"][-1].content)
    print()
    
    # 示例 2: 用户要求更新文档,Agent 会调用解析工具
    print("=" * 60)
    print("示例 2: 更新文档库")
    print("=" * 60)
    response = agent.invoke({
        "messages": [HumanMessage(content="请更新所有文档到知识库")]
    })
    print(response["messages"][-1].content)
    print()
    
    # 示例 3: 用户提问但知识库中没有,Agent 会先解析再检索
    print("=" * 60)
    print("示例 3: 智能判断")
    print("=" * 60)
    response = agent.invoke({
        "messages": [HumanMessage(content="最新版本的新功能有哪些?如果没有相关信息,请先解析 Milvus_DEVELOPMENT.pdf")]
    })
    print(response["messages"][-1].content)

代码说明

Step 1: Pipeline 初始化

create_pipeline_from_config 用于初始化 Pipeline,它会:
  • 配置数据源(从哪里读取文档)
  • 配置目标存储(解析后的数据存到哪里)
  • 配置处理流程(解析 → 分块 → 向量化)
重要:Pipeline 只需要初始化一次,可以在全局复用。

Step 2: 向量数据库

向量数据库用于存储文档的向量表示,支持语义搜索。关键点:必须使用与 Pipeline 相同的 embedding 模型,否则语义空间不一致,检索效果会变差。

Step 3: LangChain Tools

Tools 是 Agent 可以调用的函数。我们定义了两个工具:
  1. run_xparse_client:处理文档的工具
    • 如果输入是文件路径,处理单个文件
    • 如果输入是更新指令,处理整个目录
  2. vector_search:检索知识库的工具
    • 根据用户问题在向量库中搜索相关内容
    • 返回格式化的结果,包含文档来源

Step 4: Agent 初始化

Agent 是”大脑”,它会:
  • 理解用户的问题
  • 决定调用哪个工具
  • 根据工具返回结果组织最终回答

Step 5: 使用

Agent 会自动判断:
  • 用户提问 → 先检索知识库
  • 知识库没有答案 → 调用解析工具更新知识库,再检索
  • 用户要求更新 → 直接调用解析工具

实际应用场景

场景 1: 客服助手

需求:客服团队经常收到产品相关问题,需要快速从 FAQ 和产品手册中找到答案。 实现
  • 将 FAQ 和产品手册放在 ./documents/faqs/ 目录
  • 用户提问时,Agent 自动检索并回答
  • 有新版本文档时,Agent 自动更新知识库

场景 2: 合同管理

需求:法务团队需要快速查找合同中的特定条款。 实现
  • 将合同文档放在 ./documents/contracts/ 目录
  • 使用 by_title 分块策略,保持章节完整性
  • 用户提问”违约条款”,Agent 自动检索相关章节

场景 3: 知识库维护

需求:定期更新知识库,确保信息是最新的。 实现
  • 设置定时任务,定期调用 run_full_pipeline()
  • 或者通过 Agent 接口,用户说”更新文档库”,Agent 自动处理

进阶优化

1. 添加对话历史

让 Agent 记住之前的对话:
from collections import defaultdict
from langchain_core.messages import HumanMessage, AIMessage

# 存储每个会话的历史
chat_history = defaultdict(list)

def chat_with_agent(query: str, session_id: str = "default"):
    # 构建消息列表:先添加历史消息,再添加当前用户消息
    messages = chat_history[session_id].copy()
    messages.append(HumanMessage(content=query))
    
    response = agent.invoke({
        "messages": messages
    })
    
    # 保存对话历史:添加用户消息和 AI 回复
    chat_history[session_id].append(HumanMessage(content=query))
    # 从响应中获取最后一条消息(AI 的回复)
    ai_response = response["messages"][-1].content
    chat_history[session_id].append(AIMessage(content=ai_response))
    
    return ai_response

2. 添加限流保护

避免频繁调用 Pipeline:
import time
from datetime import datetime, timedelta

last_run_time = None
MIN_INTERVAL = timedelta(minutes=10)  # 最小间隔10分钟

def pipeline_tool_fn(doc_hint: str) -> str:
    global last_run_time
    
    # 检查是否在冷却期
    if last_run_time and datetime.now() - last_run_time < MIN_INTERVAL:
        return "⚠️ Pipeline 刚刚运行过,请稍后再试(建议间隔10分钟以上)。"
    
    # 执行处理
    result = process_single_file(doc_hint) if "/" in doc_hint else run_full_pipeline()
    last_run_time = datetime.now()
    return result

3. 添加引用来源

在回答中标注信息来源,务必在 Milvus 连接中开启 enable_dynamic_field 属性:
def search_tool_fn(query: str) -> str:
    docs = vector_store.similarity_search(query, k=4)
    if not docs:
        return "❌ 在知识库中未找到相关内容。"
    
    results = []
    sources = []  # 收集来源信息
    for i, doc in enumerate(docs, 1):
        filename = doc.metadata.get('filename', '未知文件')
        page_num = doc.metadata.get('page_number', '?')
        sources.append(f"{filename}#page{page_num}")
        results.append(f"[{i}] {filename} (第{page_num}页)\n{doc.page_content[:500]}...")
    
    # 在结果末尾添加来源列表
    results.append(f"\n📚 参考来源:{', '.join(sources)}")
    return "\n\n".join(results)

常见问题

Q: Pipeline 处理很慢,会影响用户体验吗? A: 是的,如果文档很大,Pipeline 可能需要几分钟。建议:
  • 对于大文档,使用异步处理,先返回”任务已提交”
  • 或者限制单次处理的文件数量
Q: 如何让 Agent 只检索,不自动触发解析? A: 修改 Tool 的 description,明确说明使用场景,或者添加一个开关参数。 Q: 向量数据库中的数据会过期吗? A: 不会自动过期。如果需要更新,需要重新运行 Pipeline,新数据会覆盖旧数据(取决于你的 destination 配置)。 Q: 可以使用其他 LLM 吗? A: 可以。LangChain 支持多种 LLM,只需替换 ChatTongyi(通义千问) 为对应的类,如 ChatOpenAI(OpenAI)、ChatZhipuAI(智谱AI)等。 Q: 如何实现增量处理? A: 当前 xparse-client 仅支持全量处理或单文档处理,需要自行实现增量逻辑。

总结

通过本教程,你已经学会了如何构建一个智能文档助手。核心思路是:
  1. Pipeline 负责解析:将文档解析、分块、向量化并存入数据库
  2. Agent 负责决策:根据用户问题,决定调用哪个工具
  3. Tools 负责执行:具体的文档处理和检索操作
这样,你就有了一个”能自己跑文档”的 AI 助手!