import os
import json
import glob
import base64
import requests
from dotenv import load_dotenv
from xparse_client import XParseClient
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
# Load API keys / paths (DASHSCOPE_API_KEY, MILVUS_DB_PATH, TEXTIN_*) from .env.
load_dotenv()
# ========== Step 1: initialize the xParse SDK used for document parsing ==========
client = XParseClient()
def process_documents() -> str:
    """Parse every supported document in the docs folder and index it in Milvus.

    Scans ``docs_dir`` for PDF/image/Word files, parses each with xParse,
    splits the resulting Markdown by headers and then into size-bounded
    chunks, tags every chunk with its source filename, and stores the
    embedded chunks in the ``medical_documents`` Milvus collection.

    Returns:
        A human-readable status string (success or error); never raises.
    """
    try:
        # TODO(review): make this path configurable (env var or parameter).
        docs_dir = "/your/medical/documents/folder"
        # Collect candidate files up front so we can fail fast when the
        # folder is empty — the original fed an empty list straight into
        # Milvus.from_documents, which errors out unhelpfully.
        patterns = ["*.pdf", "*.png", "*.jpg", "*.jpeg", "*.docx"]
        files = []
        for pattern in patterns:
            files.extend(glob.glob(os.path.join(docs_dir, pattern)))
        if not files:
            return "❌ 未找到医疗文档,请确认文档目录。"
        headers_to_split_on = [("#", "header1"), ("##", "header2"), ("###", "header3")]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1536, chunk_overlap=100)
        all_chunks = []
        for file_path in files:
            with open(file_path, "rb") as f:
                result = client.parse.run(file=f, filename=os.path.basename(file_path))
            # Split by Markdown headers first (preserves section metadata),
            # then re-split oversized sections into fixed-size chunks.
            md_docs = markdown_splitter.split_text(result.markdown)
            for doc in md_docs:
                doc.metadata["filename"] = os.path.basename(file_path)
            chunks = text_splitter.split_documents(md_docs)
            all_chunks.extend(chunks)
        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=all_chunks,
            embedding=embedding,
            collection_name="medical_documents",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return "✅ 已处理所有医疗文档,解析结果已存入向量数据库。"
    except Exception as e:
        return f"❌ 处理文档时出错:{str(e)}"
def process_single_file(file_path: str) -> str:
    """Parse one document with xParse and index its chunks into Milvus.

    Returns a human-readable status string (success or error); never raises.
    """
    try:
        base_name = os.path.basename(file_path)
        with open(file_path, "rb") as fh:
            parsed = client.parse.run(file=fh, filename=base_name)
        # Header-aware split first so section metadata survives, then a
        # size-bounded re-split of long sections.
        header_levels = [("#", "header1"), ("##", "header2"), ("###", "header3")]
        by_header = MarkdownHeaderTextSplitter(headers_to_split_on=header_levels)
        by_size = RecursiveCharacterTextSplitter(chunk_size=1536, chunk_overlap=100)
        sections = by_header.split_text(parsed.markdown)
        for section in sections:
            section.metadata["filename"] = base_name
        pieces = by_size.split_documents(sections)
        Milvus.from_documents(
            documents=pieces,
            embedding=DashScopeEmbeddings(model="text-embedding-v4"),
            collection_name="medical_documents",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return f"✅ 成功处理文件 {file_path},解析结果已存入向量数据库。"
    except Exception as e:
        return f"❌ 处理文件 {file_path} 时出错:{str(e)}"
# ========== Step 2: initialize the vector store ==========
# Shared embeddings + Milvus handle used by every retrieval tool below.
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="medical_documents",
    # MILVUS_DB_PATH — presumably a Milvus URI or milvus-lite .db path; confirm in .env
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
)
# ========== Step 3: initialize the LLM ==========
llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,  # nucleus-sampling cutoff
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
# ========== Step 4: build LangChain tools ==========
EXTRACT_API_URL = "https://api.textin.com/ai/service/v3/entity_extraction"

def extract_from_file(file_path: str, schema: dict, generate_citations: bool = False) -> dict:
    """Call the TextIn entity-extraction API on a local file.

    Args:
        file_path: Path of the document to extract from.
        schema: JSON schema describing the fields to extract.
        generate_citations: Whether the API should return citation info.

    Returns:
        The ``result`` payload of the API response.

    Raises:
        Exception: If the API responds with a non-200 business code.
        requests.RequestException: On network failure or timeout.
    """
    with open(file_path, "rb") as f:
        file_base64 = base64.b64encode(f.read()).decode("utf-8")
    payload = {
        "file": {"file_base64": file_base64, "file_name": os.path.basename(file_path)},
        "schema": schema,
        "extract_options": {"generate_citations": generate_citations}
    }
    headers = {
        "x-ti-app-id": os.getenv("TEXTIN_APP_ID"),
        "x-ti-secret-code": os.getenv("TEXTIN_SECRET_CODE"),
        "Content-Type": "application/json"
    }
    # Bug fix: requests has no default timeout — without one a stalled
    # connection blocks the agent indefinitely.
    response = requests.post(EXTRACT_API_URL, json=payload, headers=headers, timeout=120)
    result = response.json()
    if result.get("code") != 200:
        raise Exception(f"Extract API 错误: {result.get('message', '未知错误')}")
    return result["result"]
# JSON schema sent to the TextIn extraction API. It defines the structured
# fields to pull from a medical record; field names and descriptions are in
# Chinese to match the source documents. ``["string", "null"]`` marks fields
# the extractor may leave empty.
MEDICAL_SCHEMA = {
    "type": "object",
    "properties": {
        # Patient demographics (age / sex / visit date).
        "基本信息": {
            "type": "object",
            "description": "患者基本信息",
            "properties": {
                "年龄": {"type": ["string", "null"], "description": "患者年龄"},
                "性别": {"type": ["string", "null"], "description": "患者性别"},
                "就诊日期": {"type": ["string", "null"], "description": "就诊日期"}
            }
        },
        # Symptom list (chief complaint / history of present illness).
        "症状": {
            "type": "array",
            "description": "患者症状列表(主诉、现病史中的症状描述)",
            "items": {"type": "string"}
        },
        # Diagnosis list (preliminary / final / clinical).
        "诊断": {
            "type": "array",
            "description": "诊断结果列表(初步诊断、最终诊断、临床诊断)",
            "items": {"type": "string"}
        },
        # Medications: name (required), dose, usage.
        "用药": {
            "type": "array",
            "description": "用药信息列表",
            "items": {
                "type": "object",
                "properties": {
                    "药物名称": {"type": ["string", "null"], "description": "药物名称"},
                    "剂量": {"type": ["string", "null"], "description": "剂量"},
                    "用法": {"type": ["string", "null"], "description": "用法用量"}
                },
                "required": ["药物名称"]
            }
        },
        # Lab / exam results: item name (required) and its result.
        "检查结果": {
            "type": "array",
            "description": "检查检验结果列表",
            "items": {
                "type": "object",
                "properties": {
                    "检查项目": {"type": ["string", "null"], "description": "检查项目名称"},
                    "结果": {"type": ["string", "null"], "description": "检查结果"}
                },
                "required": ["检查项目"]
            }
        }
    },
    "required": ["基本信息", "症状", "诊断", "用药", "检查结果"]
}
def extract_medical_info(query: str) -> str:
    """Extract structured medical info (symptoms, diagnoses, drugs, tests) from a document.

    The query may name a specific file via a ``文件:<name>`` suffix; otherwise
    the first supported document found in the docs folder is used.

    Returns:
        A pretty-printed JSON string with the extracted fields, or a
        ``❌``-prefixed error message; never raises.
    """
    docs_dir = "/your/medical/documents/folder"
    if "文件:" in query:
        filename = query.split("文件:")[-1].strip()
        file_path = os.path.join(docs_dir, filename)
    else:
        # No explicit file requested — fall back to the first document found.
        files = []
        for pattern in ["*.pdf", "*.png", "*.jpg", "*.jpeg", "*.docx"]:
            files.extend(glob.glob(os.path.join(docs_dir, pattern)))
        if not files:
            return "❌ 未找到医疗文档,请确认文档目录。"
        file_path = files[0]
    if not os.path.exists(file_path):
        return f"❌ 文件不存在: {file_path}"
    try:
        result = extract_from_file(file_path, MEDICAL_SCHEMA, generate_citations=True)
        extracted = result["extracted_schema"]
        # Bug fix: citations were requested (generate_citations=True) but then
        # assigned to an unused local and discarded — attach them when present
        # so answers can point back to their source passages.
        citations = result.get("citations", {})
        if citations:
            extracted["引用"] = citations
        extracted["来源文件"] = os.path.basename(file_path)
        return json.dumps(extracted, ensure_ascii=False, indent=2)
    except Exception as e:
        return f"❌ 提取医疗信息时出错:{str(e)}"
def search_similar_cases(query: str) -> str:
    """Retrieve up to 5 semantically similar cases from the vector store.

    Each hit is summarized by the LLM (diagnosis / symptoms / abstract); if
    the LLM call fails or returns non-JSON, a best-effort fallback built from
    the raw chunk text is used instead.

    Returns:
        A pretty-printed JSON string listing the cases, or a ``❌`` message
        when the store is empty.
    """
    docs = vector_store.similarity_search(query, k=5)
    if not docs:
        return "❌ 未找到相似病例,请先运行文档处理。"
    results = []
    for i, doc in enumerate(docs, 1):
        text = doc.page_content
        metadata = doc.metadata
        filename = metadata.get('filename', 'unknown')
        page_num = metadata.get('page_number', '?')
        prompt = f"""请从以下病例文本中提取关键信息:
病例文本:
{text}
请返回JSON格式:
{{
"diagnosis": "诊断信息",
"symptoms": "症状信息",
"summary": "病例摘要(100字以内)"
}}
只返回JSON,不要其他文字。"""
        try:
            response = llm.invoke([HumanMessage(content=prompt)])
            case_info = json.loads(response.content)
        # Bug fix: the bare ``except:`` also swallowed SystemExit and
        # KeyboardInterrupt; Exception keeps only the intended best-effort
        # fallback for LLM/JSON failures.
        except Exception:
            case_info = {"diagnosis": "未提取", "symptoms": "未提取", "summary": text[:100]}
        results.append({
            f"相似病例 {i}": {
                "文件": filename,
                "页码": page_num,
                "诊断": case_info.get("diagnosis", "未找到"),
                "症状": case_info.get("symptoms", "未找到"),
                # NOTE(review): rank-based label, not a real similarity score.
                "相似度": "高" if i <= 2 else "中",
                "病例摘要": case_info.get("summary", text[:200])
            }
        })
    return json.dumps(results, ensure_ascii=False, indent=2)
def check_drug_interaction(query: str) -> str:
    """Check retrieved medical documents for drug-safety issues.

    Pulls the top-3 relevant chunks from the vector store and asks the LLM to
    flag drug-drug interactions, allergies and contraindications.

    Returns:
        A pretty-printed JSON string with the safety assessment, or a
        ``❌``-prefixed error message; never raises.
    """
    docs = vector_store.similarity_search(query, k=3)
    if not docs:
        return "❌ 未找到相关医疗文档,请先运行文档处理。"
    texts = []
    sources = []
    for doc in docs:
        texts.append(doc.page_content)
        filename = doc.metadata.get('filename', 'unknown')
        page_num = doc.metadata.get('page_number', '?')
        # Bug fix: the literal text "(unknown)" was emitted here instead of
        # the filename looked up on the line above (which was never used).
        sources.append(f"{filename} (第{page_num}页)")
    combined_text = "\n\n".join(texts)
    prompt = f"""请检查以下医疗文档中的药物是否存在相互作用、过敏史、用药禁忌等安全问题:
医疗文档文本:
{combined_text}
请返回JSON格式的检查结果:
{{
"medications_found": ["药物1", "药物2", ...],
"interactions": [
{{
"drug1": "药物1",
"drug2": "药物2",
"warning": "相互作用警告信息",
"severity": "严重/中等/轻微"
}}
],
"allergies": ["过敏药物列表"],
"contraindications": ["用药禁忌列表"],
"overall_status": "安全/警告/危险",
"sources": {json.dumps(sources, ensure_ascii=False)}
}}
只返回JSON,不要其他文字。"""
    try:
        response = llm.invoke([HumanMessage(content=prompt)])
        result = json.loads(response.content)
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        return f"❌ 检查药物相互作用时出错:{str(e)}"
def search_medical_literature(query: str) -> str:
    """Retrieve up to 5 relevant literature/document chunks from the vector store.

    Each hit is summarized by the LLM (type / summary / key points); if the
    LLM call fails or returns non-JSON, a keyword-based best-effort fallback
    is used instead.

    Returns:
        A pretty-printed JSON string listing the documents, or a ``❌``
        message when the store is empty.
    """
    docs = vector_store.similarity_search(query, k=5)
    if not docs:
        return "❌ 未找到相关医学文献,请先运行文档处理。"
    results = []
    for i, doc in enumerate(docs, 1):
        text = doc.page_content
        metadata = doc.metadata
        filename = metadata.get('filename', 'unknown')
        page_num = metadata.get('page_number', '?')
        # Cheap heuristic used only for the fallback classification below.
        is_literature = any(keyword in text for keyword in ["研究", "文献", "期刊", "论文", "参考文献"])
        prompt = f"""请从以下文档中提取关键信息:
文档文本:
{text}
请返回JSON格式:
{{
"type": "医学文献/病历/报告",
"summary": "内容摘要(100字以内)",
"key_points": ["关键点1", "关键点2"]
}}
只返回JSON,不要其他文字。"""
        try:
            response = llm.invoke([HumanMessage(content=prompt)])
            doc_info = json.loads(response.content)
        # Bug fix: bare ``except:`` replaced by Exception so SystemExit /
        # KeyboardInterrupt are no longer swallowed by the fallback.
        except Exception:
            doc_info = {"type": "医学文献" if is_literature else "病历/报告", "summary": text[:100], "key_points": []}
        results.append({
            f"文献 {i}": {
                "标题": filename,
                "页码": page_num,
                "类型": doc_info.get("type", "未知"),
                # NOTE(review): rank-based label, not a real relevance score.
                "相关性": "高" if i <= 2 else "中",
                "内容摘要": doc_info.get("summary", text[:200]),
                "关键点": doc_info.get("key_points", [])
            }
        })
    return json.dumps(results, ensure_ascii=False, indent=2)
# Tool definitions exposed to the ReAct agent; the descriptions are what the
# LLM reads when deciding which tool to invoke.
tools = [
    Tool(
        name="process_documents",
        description="处理医疗文档,将PDF/图片/Word解析成文本。输入可以是'处理所有文档'或文件路径。",
        # Routing: a query containing "所有" ("all") processes the whole
        # folder; otherwise the query is treated as a single file path.
        func=lambda q: process_documents() if "所有" in q else process_single_file(q)
    ),
    Tool(
        name="extract_medical_info",
        description="从医疗文档中提取关键医疗信息(症状、诊断、用药、检查结果、基本信息等)。输入格式:提取医疗信息 文件:病历.pdf",
        func=extract_medical_info
    ),
    Tool(
        name="search_similar_cases",
        description="检索相似病例,基于症状、诊断等信息查找历史相似病例。输入应为症状或诊断描述,如'发热、咳嗽、胸闷'。",
        func=search_similar_cases
    ),
    Tool(
        name="check_drug_interaction",
        description="检查药物相互作用,检查多种药物之间是否存在相互作用、过敏史、用药禁忌等。输入格式:检查药物相互作用 文件:处方.pdf 或直接提供药物列表。",
        func=check_drug_interaction
    ),
    Tool(
        name="search_medical_literature",
        description="检索医学文献,查找相关的医学研究文献和资料。输入应为要检索的医学主题或关键词,如'高血压治疗'。",
        func=search_medical_literature
    )
]
# ========== Step 5: initialize the agent ==========
# NOTE(review): initialize_agent is deprecated in recent LangChain releases
# (superseded by create_react_agent / LangGraph) — still works here, but
# worth migrating; confirm against the pinned langchain version.
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,  # classic ReAct loop driven by tool descriptions
    verbose=True,  # print the thought/action/observation trace
)
# ========== Step 6: usage examples ==========
if __name__ == "__main__":
    # (title, agent input) pairs — one demo per capability.
    demos = [
        ("示例 1: 处理医疗文档", "请处理所有医疗文档"),
        ("示例 2: 提取医疗信息", "从病历中提取患者的主诉、诊断和用药信息 文件:patient_record.pdf"),
        ("示例 3: 检索相似病例", "检索与'发热、咳嗽、胸闷'症状相似的病例"),
        ("示例 4: 药物安全检查", "检查处方中的药物是否存在相互作用 文件:prescription.pdf"),
        ("示例 5: 检索医学文献", "检索关于'高血压治疗'的医学文献"),
    ]
    divider = "=" * 60
    for index, (title, question) in enumerate(demos):
        if index:
            print()  # blank line between examples, matching the original output
        print(divider)
        print(title)
        print(divider)
        answer = agent.invoke({"input": question})
        print(answer["output"])