import os
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_core.tools import Tool
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi
# 加载环境变量
load_dotenv()
# ========== Step 1: 初始化 xParse Pipeline ==========
PIPELINE_CONFIG = {
"source": {
"type": "local", # 本地文件系统,也可以改为 "s3"
"directory": "/your/doc/folder", # 文档存放目录
"pattern": ["*.pdf", "*.docx"] # 支持的文件类型
},
"destination": {
"type": "milvus", # 向量数据库
"db_path": os.getenv("MILVUS_DB_PATH"),
"collection_name": "agent_docs",
"dimension": 1024
},
"api_base_url": "https://api.textin.com/api/xparse",
"api_headers": {
"x-ti-app-id": os.getenv("X_TI_APP_ID"),
"x-ti-secret-code": os.getenv("X_TI_SECRET_CODE")
},
"stages": [
{
"type": "parse",
"config": {
"provider": "textin" # 文档解析引擎
}
},
{
"type": "chunk",
"config": {
"strategy": "by_title", # 按标题分块,保持章节完整性
"new_after_n_chars": 480,
"max_characters": 1500,
"overlap": 80 # 块之间重叠80字符,避免信息丢失
}
},
{
"type": "embed",
"config": {
"provider": "qwen",
"model_name": "text-embedding-v4" # 向量化模型
}
}
]
}
# 初始化 Pipeline(全局复用,避免重复创建)
pipeline = create_pipeline_from_config(PIPELINE_CONFIG)
def process_single_file(file_path: str) -> str:
"""处理单个文件并返回结果描述"""
try:
# 从 source 读取文件
file_bytes = pipeline.source.read_file(file_path)
# 处理文件
success = pipeline.process_file(file_bytes, file_path)
if success:
return f"✅ 成功处理文件 {file_path} 并已存入知识库。"
else:
return f"❌ 处理文件 {file_path} 失败。"
except Exception as e:
return f"❌ 处理文件 {file_path} 时出错:{str(e)}"
def run_full_pipeline() -> str:
"""运行完整 Pipeline,处理 source 目录中的所有文件"""
try:
pipeline.run()
return f"✅ 已处理所有文件并已存入知识库。"
except Exception as e:
return f"❌ 运行 Pipeline 时出错:{str(e)}"
# ========== Step 2: 初始化向量数据库 ==========
# 使用与 Pipeline 相同的 embedding 配置,保证语义空间一致
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
embedding_function=embedding,
collection_name="agent_docs",
connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
vector_field="embeddings", # 使用 embeddings 字段存储向量(与 xparse_client 保持一致)
primary_field="element_id", # 使用 element_id 作为主键(与 xparse_client 保持一致)
text_field="text", # 使用 text 字段存储文本内容
enable_dynamic_field=True # 启用动态字段支持,这样才能返回所有 metadata 字段
)
# ========== Step 3: 构建 LangChain Tools ==========
def pipeline_tool_fn(doc_hint: str) -> str:
"""
文档处理工具:根据输入决定处理单个文件还是整个目录
输入示例:
- "处理 contracts/2025Q1/contract.pdf" -> 处理单个文件
- "更新所有文档" 或 "同步文档库" -> 处理整个目录
"""
# 如果输入包含路径分隔符,认为是文件路径
if doc_hint and ("/" in doc_hint or "\\" in doc_hint):
# 提取文件路径(简单处理,实际可以更智能)
file_path = doc_hint.strip()
return process_single_file(file_path)
else:
# 否则处理整个目录
return run_full_pipeline()
def search_tool_fn(query: str) -> str:
"""
向量检索工具:在知识库中搜索相关内容
返回格式化的检索结果,包含文档来源和内容
"""
docs = vector_store.similarity_search(query, k=4)
if not docs:
return "❌ 在知识库中未找到相关内容。建议先运行文档解析工具更新知识库。"
results = []
for i, doc in enumerate(docs, 1):
filename = doc.metadata.get('filename', '未知文件')
page_num = doc.metadata.get('page_number', '?')
content = doc.page_content[:500] # 限制长度
results.append(f"[{i}] 来源:{filename} (第{page_num}页)\n内容:{content}...")
return "\n\n".join(results)
# 定义工具列表
tools = [
Tool(
name="run_xparse_client",
description="当需要解析新文档或更新知识库时使用此工具。输入可以是文件路径(如 'contracts/doc.pdf')或更新指令(如 '更新所有文档')。",
func=pipeline_tool_fn
),
Tool(
name="vector_search",
description="当需要基于知识库内容回答问题时使用此工具。输入是用户的自然语言问题,工具会在知识库中搜索相关内容。",
func=search_tool_fn
)
]
# ========== Step 4: 初始化 Agent ==========
llm = ChatTongyi(
model="qwen-max",
top_p=0.8,
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
agent = create_agent(
model=llm,
tools=tools,
debug=True # 显示 Agent 的思考过程
)
# ========== Step 5: 使用示例 ==========
if __name__ == "__main__":
# 示例 1: 用户提问,Agent 会自动检索知识库
print("=" * 60)
print("示例 1: 用户提问")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="如何安装milvus?")]
})
print(response["messages"][-1].content)
print()
# 示例 2: 用户要求更新文档,Agent 会调用解析工具
print("=" * 60)
print("示例 2: 更新文档库")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="请更新所有文档到知识库")]
})
print(response["messages"][-1].content)
print()
# 示例 3: 用户提问但知识库中没有,Agent 会先解析再检索
print("=" * 60)
print("示例 3: 智能判断")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="最新版本的新功能有哪些?如果没有相关信息,请先解析 Milvus_DEVELOPMENT.pdf")]
})
print(response["messages"][-1].content)