import os
import glob
from dotenv import load_dotenv
from xparse_client import XParseClient
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.tools import Tool
from langchain_core.documents import Document
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
from langchain.agents import initialize_agent, AgentType
from langchain_community.chat_models import ChatTongyi
# Load environment variables; this runs before the os.getenv() lookups below.
load_dotenv()
# ========== Step 1: initialise the xParse SDK client and text splitters ==========
# Folder scanned by build_knowledge_base(); placeholder value, adjust per deployment.
DOCS_DIR = "/your/doc/folder"

# NOTE(review): constructed without arguments -- presumably picks up its
# credentials from the environment loaded above; confirm against the SDK.
client = XParseClient()

# Markdown heading markers paired with the metadata key each heading level
# is recorded under.
headers_to_split_on = [
    ("#", "header1"),
    ("##", "header2"),
    ("###", "header3"),
]
markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
)

# Second pass: cut header-level sections into smaller overlapping chunks.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_overlap=80,
    chunk_size=1500,
)
def process_single_file(file_path: str) -> str:
    """Parse one file with xParse and store its chunks in the knowledge base.

    The file is parsed to Markdown, split by headers and then by size. Every
    chunk is tagged with the source file's basename under the ``filename``
    metadata key and inserted into the ``agent_docs`` Milvus collection.

    Args:
        file_path: Path of the document to parse.

    Returns:
        A human-readable status message. Failures are reported in the
        returned string rather than raised, because this function backs an
        agent tool and its return value is what the agent gets to see.
    """
    try:
        filename = os.path.basename(file_path)
        with open(file_path, "rb") as f:
            result = client.parse.run(file=f, filename=filename)

        # `or ""` lets a missing Markdown payload fall through to the
        # "nothing to store" branch below instead of an opaque AttributeError.
        md_docs = markdown_splitter.split_text(result.markdown or "")
        for doc in md_docs:
            doc.metadata["filename"] = filename
        chunks = text_splitter.split_documents(md_docs)

        # Do not claim success (or touch the vector store) when parsing
        # yielded nothing to index.
        if not chunks:
            return f"⚠️ 文件 {file_path} 未解析出任何内容,未写入知识库。"

        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=chunks,
            embedding=embedding,
            collection_name="agent_docs",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return f"✅ 成功处理文件 {file_path} 并已存入知识库。"
    except Exception as e:
        # Tool boundary: turn any failure into a message for the agent.
        return f"❌ 处理文件 {file_path} 时出错:{e}"
def build_knowledge_base() -> str:
    """Parse every regular file directly under DOCS_DIR and index the chunks.

    Each file is parsed with xParse, split by Markdown headers and then by
    size, tagged with its basename under the ``filename`` metadata key, and
    all resulting chunks are inserted into the ``agent_docs`` Milvus
    collection in a single call.

    A file that fails to parse is skipped and listed in the returned message,
    so one bad document does not prevent the rest from being indexed.

    Returns:
        A human-readable status message. Failures are reported in the
        returned string rather than raised.
    """
    all_chunks = []
    failures = []
    parsed_count = 0

    # sorted() makes the processing (and insertion) order deterministic.
    for file_path in sorted(glob.glob(os.path.join(DOCS_DIR, "*"))):
        if not os.path.isfile(file_path):
            continue
        filename = os.path.basename(file_path)
        try:
            with open(file_path, "rb") as f:
                result = client.parse.run(file=f, filename=filename)
            md_docs = markdown_splitter.split_text(result.markdown or "")
            for doc in md_docs:
                doc.metadata["filename"] = filename
            all_chunks.extend(text_splitter.split_documents(md_docs))
            parsed_count += 1
        except Exception as e:
            # Keep going: remember the failure and report it at the end.
            failures.append(f"{filename}({e})")

    failure_report = ";".join(failures)

    # Nothing to index: either every file failed, or the folder had no
    # usable content. Do not report success in either case.
    if not all_chunks:
        if failures:
            return f"❌ 构建知识库时出错:{failure_report}"
        return f"⚠️ 目录 {DOCS_DIR} 中没有可入库的文档内容,知识库未更新。"

    try:
        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=all_chunks,
            embedding=embedding,
            collection_name="agent_docs",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
    except Exception as e:
        return f"❌ 构建知识库时出错:{e}"

    if failures:
        return (
            f"⚠️ 已将 {parsed_count} 个文件存入知识库,"
            f"但以下文件处理失败:{failure_report}"
        )
    return "✅ 已处理所有文件并已存入知识库。"
# ========== Step 2: initialise the vector store ==========
# Module-level handle on the "agent_docs" collection; search_tool_fn() queries it.
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
    collection_name="agent_docs",
    # NOTE(review): os.getenv() yields None when MILVUS_DB_PATH is unset --
    # confirm how the Milvus wrapper treats a None uri.
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
    embedding_function=embedding,
)
# ========== Step 3: 构建 LangChain Tools ==========
def _resolve_doc_path(doc_hint, docs_dir):
    """Return the file path named in *doc_hint*, or None for a bulk request."""
    quotes = "'\"`"
    hint = (doc_hint or "").strip().strip(quotes)
    if not hint:
        return None

    # Individual words with surrounding quotes/punctuation removed, so that
    # "处理 contracts/a.pdf" or "'contracts/a.pdf'" yield a clean path.
    words = [w.strip(quotes + ",,。;;") for w in hint.split()]
    words = [w for w in words if w]

    # Prefer something that really exists on disk. The whole hint is tried
    # first so that paths containing spaces keep working; each candidate is
    # checked as given and relative to the documents folder.
    for candidate in [hint] + words:
        for path in (candidate, os.path.join(docs_dir, candidate)):
            if os.path.isfile(path):
                return path

    # Nothing exists: fall back to the separator heuristic so the caller
    # still gets a per-file error message for a mistyped path.
    for word in words:
        if "/" in word or "\\" in word:
            return word
    return None


def pipeline_tool_fn(doc_hint: str) -> str:
    """Document-processing tool: ingest one file or the whole folder.

    The hint is free text coming from the agent. If it names a file (either
    an existing file, as given or relative to DOCS_DIR, or any word that
    contains a path separator), only that file is processed. Otherwise the
    whole DOCS_DIR folder is processed.

    Example inputs:
        - "处理 contracts/2025Q1/contract.pdf" -> processes that single file
        - "更新所有文档" or "同步文档库"        -> processes the whole folder

    Args:
        doc_hint: File path or free-form update instruction; may be empty.

    Returns:
        The status message produced by process_single_file() or
        build_knowledge_base().
    """
    file_path = _resolve_doc_path(doc_hint, DOCS_DIR)
    if file_path is None:
        return build_knowledge_base()
    return process_single_file(file_path)
def search_tool_fn(query: str) -> str:
    """Vector-search tool: look up *query* in the knowledge base.

    Args:
        query: Natural-language question to search for.

    Returns:
        A formatted, numbered list of hits (source file, header breadcrumb
        and a content preview), or a message starting with "❌" when nothing
        was found or the search itself failed. Errors are returned as text
        rather than raised, matching the document-processing tool.
    """
    max_hits = 4
    preview_chars = 500
    header_keys = ("header1", "header2", "header3")

    try:
        docs = vector_store.similarity_search(query, k=max_hits)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of crashing.
        return f"❌ 检索知识库时出错:{e}"

    if not docs:
        return "❌ 在知识库中未找到相关内容。建议先运行文档解析工具更新知识库。"

    results = []
    for i, doc in enumerate(docs, 1):
        filename = doc.metadata.get("filename", "未知文件")
        # Breadcrumb of whichever header levels are present on the chunk.
        section = "".join(
            f" > {doc.metadata[key]}"
            for key in header_keys
            if doc.metadata.get(key)
        )
        text = doc.page_content
        # Only mark the preview as elided when it really was cut short.
        ellipsis = "..." if len(text) > preview_chars else ""
        content = text[:preview_chars] + ellipsis
        results.append(f"[{i}] 来源:{filename}{section}\n内容:{content}")
    return "\n\n".join(results)
# Tools exposed to the agent: one for ingesting documents, one for searching them.
tools = [
    # Document parsing / knowledge-base refresh.
    Tool(
        name="run_xparse_client",
        func=pipeline_tool_fn,
        description="当需要解析新文档或更新知识库时使用此工具。输入可以是文件路径(如 'contracts/doc.pdf')或更新指令(如 '更新所有文档')。",
    ),
    # Similarity search over the indexed chunks.
    Tool(
        name="vector_search",
        func=search_tool_fn,
        description="当需要基于知识库内容回答问题时使用此工具。输入是用户的自然语言问题,工具会在知识库中搜索相关内容。",
    ),
]
# ========== Step 4: initialise the agent ==========
llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"),
)
# NOTE(review): initialize_agent is reported as deprecated in recent LangChain
# releases -- confirm against the pinned version before upgrading.
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,  # print the agent's intermediate reasoning steps
    # Feed a malformed LLM reply back to the model as an observation instead
    # of aborting the whole run with an output-parsing error.
    handle_parsing_errors=True,
)
# ========== Step 5: usage examples ==========
if __name__ == "__main__":
    banner = "=" * 60
    demos = [
        # Example 1: a plain question; the agent is expected to search the knowledge base.
        ("示例 1: 用户提问", "如何安装milvus?"),
        # Example 2: an update request; the agent is expected to call the parsing tool.
        ("示例 2: 更新文档库", "请更新所有文档到知识库"),
        # Example 3: a question that may need a document parsed first, then searched.
        (
            "示例 3: 智能判断",
            "最新版本的新功能有哪些?如果没有相关信息,请先解析 Milvus_DEVELOPMENT.pdf",
        ),
    ]
    for position, (title, question) in enumerate(demos):
        # Blank separator line between consecutive examples (none after the last).
        if position:
            print()
        print(banner)
        print(title)
        print(banner)
        response = agent.invoke({"input": question})
        print(response["output"])