Documentation Index
Fetch the complete documentation index at: https://docs.textin.com/llms.txt
Use this file to discover all available pages before exploring further.
本教程将展示如何构建一个智能文档助手,它能够:
- 自动解析新上传的文档并更新知识库
- 根据用户问题智能检索相关文档内容
- 自动决定何时需要解析新文档,何时直接检索回答
什么是智能文档助手?
想象这样一个场景:你的团队每天都会上传新的合同、FAQ、产品手册等文档到云存储。你希望有一个 AI 助手能够:
- 自动处理新文档:当有新文档上传时,自动解析并存入知识库
- 智能回答问题:当用户提问时,自动从知识库中找到相关内容并回答
- 自动判断:如果知识库中没有相关信息,自动触发文档解析;如果有,直接检索回答
这就是我们要构建的智能文档助手。
工作原理
整个系统的工作流程如下:
用户提问:"最新版本的新功能有哪些?"
↓
[LangChain Agent] 分析问题
↓
Agent 判断:需要先检索知识库
↓
[Tool: vector_search] 在向量库中搜索
↓
结果:没找到最新版本的信息
↓
Agent 判断:需要解析新文档
↓
[Tool: run_xparse_client] 解析文档并更新知识库
↓
再次检索,找到相关内容
↓
Agent 组织回答并返回给用户
环境准备
首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install "xparse-client>=0.2.10" langchain langchain-community langchain-core langchain-milvus \
pymilvus python-dotenv dashscope
创建 .env 文件存储配置:
# .env
X_TI_APP_ID=your-app-id
X_TI_SECRET_CODE=your-secret-code
MILVUS_DB_PATH=./agent_vectors.db
DASHSCOPE_API_KEY==your-dashscope-key
提示:X_TI_APP_ID 与 X_TI_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。
完整代码示例
import os
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_core.tools import Tool
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi
# 加载环境变量
load_dotenv()
# ========== Step 1: 初始化 xParse Pipeline ==========
PIPELINE_CONFIG = {
"source": {
"type": "local", # 本地文件系统,也可以改为 "s3"
"directory": "/your/doc/folder", # 文档存放目录
"pattern": ["*.pdf", "*.docx"] # 支持的文件类型
},
"destination": {
"type": "milvus", # 向量数据库
"db_path": os.getenv("MILVUS_DB_PATH"),
"collection_name": "agent_docs",
"dimension": 1024
},
"api_base_url": "https://api.textin.com/api/xparse",
"api_headers": {
"x-ti-app-id": os.getenv("X_TI_APP_ID"),
"x-ti-secret-code": os.getenv("X_TI_SECRET_CODE")
},
"stages": [
{
"type": "parse",
"config": {
"provider": "textin" # 文档解析引擎
}
},
{
"type": "chunk",
"config": {
"strategy": "by_title", # 按标题分块,保持章节完整性
"new_after_n_chars": 480,
"max_characters": 1500,
"overlap": 80 # 块之间重叠80字符,避免信息丢失
}
},
{
"type": "embed",
"config": {
"provider": "qwen",
"model_name": "text-embedding-v4" # 向量化模型
}
}
]
}
# 初始化 Pipeline(全局复用,避免重复创建)
pipeline = create_pipeline_from_config(PIPELINE_CONFIG)
def process_single_file(file_path: str) -> str:
"""处理单个文件并返回结果描述"""
try:
# 从 source 读取文件
file_bytes = pipeline.source.read_file(file_path)
# 处理文件
success = pipeline.process_file(file_bytes, file_path)
if success:
return f"✅ 成功处理文件 {file_path} 并已存入知识库。"
else:
return f"❌ 处理文件 {file_path} 失败。"
except Exception as e:
return f"❌ 处理文件 {file_path} 时出错:{str(e)}"
def run_full_pipeline() -> str:
"""运行完整 Pipeline,处理 source 目录中的所有文件"""
try:
pipeline.run()
return f"✅ 已处理所有文件并已存入知识库。"
except Exception as e:
return f"❌ 运行 Pipeline 时出错:{str(e)}"
# ========== Step 2: 初始化向量数据库 ==========
# 使用与 Pipeline 相同的 embedding 配置,保证语义空间一致
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
embedding_function=embedding,
collection_name="agent_docs",
connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
vector_field="embeddings", # 使用 embeddings 字段存储向量(与 xparse_client 保持一致)
primary_field="element_id", # 使用 element_id 作为主键(与 xparse_client 保持一致)
text_field="text", # 使用 text 字段存储文本内容
enable_dynamic_field=True # 启用动态字段支持,这样才能返回所有 metadata 字段
)
# ========== Step 3: 构建 LangChain Tools ==========
def pipeline_tool_fn(doc_hint: str) -> str:
"""
文档处理工具:根据输入决定处理单个文件还是整个目录
输入示例:
- "处理 contracts/2025Q1/contract.pdf" -> 处理单个文件
- "更新所有文档" 或 "同步文档库" -> 处理整个目录
"""
# 如果输入包含路径分隔符,认为是文件路径
if doc_hint and ("/" in doc_hint or "\\" in doc_hint):
# 提取文件路径(简单处理,实际可以更智能)
file_path = doc_hint.strip()
return process_single_file(file_path)
else:
# 否则处理整个目录
return run_full_pipeline()
def search_tool_fn(query: str) -> str:
"""
向量检索工具:在知识库中搜索相关内容
返回格式化的检索结果,包含文档来源和内容
"""
docs = vector_store.similarity_search(query, k=4)
if not docs:
return "❌ 在知识库中未找到相关内容。建议先运行文档解析工具更新知识库。"
results = []
for i, doc in enumerate(docs, 1):
filename = doc.metadata.get('filename', '未知文件')
page_num = doc.metadata.get('page_number', '?')
content = doc.page_content[:500] # 限制长度
results.append(f"[{i}] 来源:{filename} (第{page_num}页)\n内容:{content}...")
return "\n\n".join(results)
# 定义工具列表
tools = [
Tool(
name="run_xparse_client",
description="当需要解析新文档或更新知识库时使用此工具。输入可以是文件路径(如 'contracts/doc.pdf')或更新指令(如 '更新所有文档')。",
func=pipeline_tool_fn
),
Tool(
name="vector_search",
description="当需要基于知识库内容回答问题时使用此工具。输入是用户的自然语言问题,工具会在知识库中搜索相关内容。",
func=search_tool_fn
)
]
# ========== Step 4: 初始化 Agent ==========
llm = ChatTongyi(
model="qwen-max",
top_p=0.8,
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
agent = create_agent(
model=llm,
tools=tools,
debug=True # 显示 Agent 的思考过程
)
# ========== Step 5: 使用示例 ==========
if __name__ == "__main__":
# 示例 1: 用户提问,Agent 会自动检索知识库
print("=" * 60)
print("示例 1: 用户提问")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="如何安装milvus?")]
})
print(response["messages"][-1].content)
print()
# 示例 2: 用户要求更新文档,Agent 会调用解析工具
print("=" * 60)
print("示例 2: 更新文档库")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="请更新所有文档到知识库")]
})
print(response["messages"][-1].content)
print()
# 示例 3: 用户提问但知识库中没有,Agent 会先解析再检索
print("=" * 60)
print("示例 3: 智能判断")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="最新版本的新功能有哪些?如果没有相关信息,请先解析 Milvus_DEVELOPMENT.pdf")]
})
print(response["messages"][-1].content)
代码说明
Step 1: Pipeline 初始化
create_pipeline_from_config 用于初始化 Pipeline,它会:
- 配置数据源(从哪里读取文档)
- 配置目标存储(解析后的数据存到哪里)
- 配置处理流程(解析 → 分块 → 向量化)
重要:Pipeline 只需要初始化一次,可以在全局复用。
Step 2: 向量数据库
向量数据库用于存储文档的向量表示,支持语义搜索。关键点:必须使用与 Pipeline 相同的 embedding 模型,否则语义空间不一致,检索效果会变差。
Tools 是 Agent 可以调用的函数。我们定义了两个工具:
-
run_xparse_client:处理文档的工具
- 如果输入是文件路径,处理单个文件
- 如果输入是更新指令,处理整个目录
-
vector_search:检索知识库的工具
- 根据用户问题在向量库中搜索相关内容
- 返回格式化的结果,包含文档来源
Step 4: Agent 初始化
Agent 是”大脑”,它会:
- 理解用户的问题
- 决定调用哪个工具
- 根据工具返回结果组织最终回答
Step 5: 使用
Agent 会自动判断:
- 用户提问 → 先检索知识库
- 知识库没有答案 → 调用解析工具更新知识库,再检索
- 用户要求更新 → 直接调用解析工具
实际应用场景
场景 1: 客服助手
需求:客服团队经常收到产品相关问题,需要快速从 FAQ 和产品手册中找到答案。
实现:
- 将 FAQ 和产品手册放在
./documents/faqs/ 目录
- 用户提问时,Agent 自动检索并回答
- 有新版本文档时,Agent 自动更新知识库
场景 2: 合同管理
需求:法务团队需要快速查找合同中的特定条款。
实现:
- 将合同文档放在
./documents/contracts/ 目录
- 使用
by_title 分块策略,保持章节完整性
- 用户提问”违约条款”,Agent 自动检索相关章节
场景 3: 知识库维护
需求:定期更新知识库,确保信息是最新的。
实现:
- 设置定时任务,定期调用
run_full_pipeline()
- 或者通过 Agent 接口,用户说”更新文档库”,Agent 自动处理
进阶优化
1. 添加对话历史
让 Agent 记住之前的对话:
from collections import defaultdict
from langchain_core.messages import HumanMessage, AIMessage
# 存储每个会话的历史
chat_history = defaultdict(list)
def chat_with_agent(query: str, session_id: str = "default"):
# 构建消息列表:先添加历史消息,再添加当前用户消息
messages = chat_history[session_id].copy()
messages.append(HumanMessage(content=query))
response = agent.invoke({
"messages": messages
})
# 保存对话历史:添加用户消息和 AI 回复
chat_history[session_id].append(HumanMessage(content=query))
# 从响应中获取最后一条消息(AI 的回复)
ai_response = response["messages"][-1].content
chat_history[session_id].append(AIMessage(content=ai_response))
return ai_response
2. 添加限流保护
避免频繁调用 Pipeline:
import time
from datetime import datetime, timedelta
last_run_time = None
MIN_INTERVAL = timedelta(minutes=10) # 最小间隔10分钟
def pipeline_tool_fn(doc_hint: str) -> str:
global last_run_time
# 检查是否在冷却期
if last_run_time and datetime.now() - last_run_time < MIN_INTERVAL:
return "⚠️ Pipeline 刚刚运行过,请稍后再试(建议间隔10分钟以上)。"
# 执行处理
result = process_single_file(doc_hint) if "/" in doc_hint else run_full_pipeline()
last_run_time = datetime.now()
return result
3. 添加引用来源
在回答中标注信息来源,务必在 Milvus 连接中开启 enable_dynamic_field 属性:
def search_tool_fn(query: str) -> str:
docs = vector_store.similarity_search(query, k=4)
if not docs:
return "❌ 在知识库中未找到相关内容。"
results = []
sources = [] # 收集来源信息
for i, doc in enumerate(docs, 1):
filename = doc.metadata.get('filename', '未知文件')
page_num = doc.metadata.get('page_number', '?')
sources.append(f"{filename}#page{page_num}")
results.append(f"[{i}] {filename} (第{page_num}页)\n{doc.page_content[:500]}...")
# 在结果末尾添加来源列表
results.append(f"\n📚 参考来源:{', '.join(sources)}")
return "\n\n".join(results)
常见问题
Q: Pipeline 处理很慢,会影响用户体验吗?
A: 是的,如果文档很大,Pipeline 可能需要几分钟。建议:
- 对于大文档,使用异步处理,先返回”任务已提交”
- 或者限制单次处理的文件数量
Q: 如何让 Agent 只检索,不自动触发解析?
A: 修改 Tool 的 description,明确说明使用场景,或者添加一个开关参数。
Q: 向量数据库中的数据会过期吗?
A: 不会自动过期。如果需要更新,需要重新运行 Pipeline,新数据会覆盖旧数据(取决于你的 destination 配置)。
Q: 可以使用其他 LLM 吗?
A: 可以。LangChain 支持多种 LLM,只需替换 ChatTongyi(通义千问) 为对应的类,如 ChatOpenAI(OpenAI)、ChatZhipuAI(智谱AI)等。
Q: 如何实现增量处理?
A: 当前 xparse-client 仅支持全量处理或单文档处理,需要自行实现增量逻辑。
通过本教程,你已经学会了如何构建一个智能文档助手。核心思路是:
- Pipeline 负责解析:将文档解析、分块、向量化并存入数据库
- Agent 负责决策:根据用户问题,决定调用哪个工具
- Tools 负责执行:具体的文档处理和检索操作
这样,你就有了一个”能自己跑文档”的 AI 助手!