import os
import json
import re
from pathlib import Path
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_core.tools import Tool
from langchain.agents import create_agent
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
# 加载环境变量
load_dotenv()
# ========== Step 1: 初始化 xParse Pipeline ==========
MEDICAL_PIPELINE_CONFIG = {
"source": {
"type": "local",
"directory": "/your/medical/documents/folder", # 医疗文档存放目录
"pattern": ["*.pdf", "*.png", "*.jpg", "*.jpeg", "*.docx"] # 支持病历、检查报告、处方单等
},
"destination": {
"type": "milvus", # 输出到向量数据库
"db_path": os.getenv("MILVUS_DB_PATH"),
"collection_name": "medical_documents",
"dimension": 1024
},
"api_base_url": "https://api.textin.com/api/xparse",
"api_headers": {
"x-ti-app-id": os.getenv("X_TI_APP_ID"),
"x-ti-secret-code": os.getenv("X_TI_SECRET_CODE")
},
"stages": [
{
"type": "parse",
"config": {
"provider": "textin" # 使用TextIn解析引擎,对医疗表格识别效果好
}
},
{
"type": "chunk",
"config": {
"strategy": "by_title", # 按标题分块,保持病历章节结构
"include_orig_elements": True, # 保留原始元素,便于追溯
"new_after_n_chars": 512,
"max_characters": 1536, # 病历章节可能较长
"overlap": 100 # 章节间重叠,保持上下文
}
},
{
"type": "embed",
"config": {
"provider": "qwen",
"model_name": "text-embedding-v4" # 使用高精度向量模型
}
}
]
}
# 初始化 Pipeline(全局复用)
pipeline = create_pipeline_from_config(MEDICAL_PIPELINE_CONFIG)
def process_documents() -> str:
"""处理医疗文档"""
try:
pipeline.run()
return "✅ 已处理所有医疗文档,解析结果已存入向量数据库。"
except Exception as e:
return f"❌ 处理文档时出错:{str(e)}"
def process_single_file(file_path: str) -> str:
"""处理单个文件"""
try:
file_bytes = pipeline.source.read_file(file_path)
success = pipeline.process_file(file_bytes, file_path)
if success:
return f"✅ 成功处理文件 {file_path},解析结果已存入向量数据库。"
else:
return f"❌ 处理文件 {file_path} 失败。"
except Exception as e:
return f"❌ 处理文件 {file_path} 时出错:{str(e)}"
# ========== Step 2: 初始化向量数据库 ==========
# 使用与 Pipeline 相同的 embedding 配置,保证语义空间一致
embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
embedding_function=embedding,
collection_name="medical_documents",
connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
vector_field="embeddings", # 使用 embeddings 字段存储向量(与 xparse_client 保持一致)
primary_field="element_id", # 使用 element_id 作为主键(与 xparse_client 保持一致)
text_field="text", # 使用 text 字段存储文本内容
enable_dynamic_field=True # 启用动态字段支持,这样才能返回所有 metadata 字段
)
# ========== Step 3: 初始化大模型 ==========
llm = ChatTongyi(
model="qwen-max",
top_p=0.8,
dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)
# ========== Step 4: 构建 LangChain Tools ==========
def extract_medical_info(query: str) -> str:
"""
从医疗文档中提取关键医疗信息
支持提取:
- 症状(主诉、现病史)
- 诊断(初步诊断、最终诊断)
- 用药(药物名称、剂量、用法)
- 检查结果(检验值、影像描述)
- 基本信息(年龄、性别、就诊日期)
"""
# 从向量库中检索相关文档片段
docs = vector_store.similarity_search(query, k=5)
if not docs:
return "❌ 未找到相关医疗文档,请先运行文档处理。"
# 合并检索到的文本
texts = []
sources = []
for doc in docs:
texts.append(doc.page_content)
filename = doc.metadata.get('filename', 'unknown')
page_num = doc.metadata.get('page_number', '?')
sources.append(f"{filename} (第{page_num}页)")
combined_text = "\n\n".join(texts)
# 构建提取提示
prompt = f"""请从以下医疗文档文本中提取关键医疗信息,返回JSON格式:
医疗文档文本:
{combined_text}
请提取以下信息并返回JSON格式:
{{
"basic_info": {{
"age": "年龄",
"gender": "性别",
"visit_date": "就诊日期"
}},
"symptoms": [
"主诉",
"现病史中的症状描述"
],
"diagnosis": [
"初步诊断",
"最终诊断",
"临床诊断"
],
"medications": [
{{
"name": "药物名称",
"dose": "剂量",
"unit": "单位",
"frequency": "用法"
}}
],
"test_results": [
{{
"test_name": "检查项目名称",
"result": "检查结果"
}}
],
"sources": {json.dumps(sources, ensure_ascii=False)}
}}
只返回JSON,不要其他文字。"""
try:
response = llm.invoke([HumanMessage(content=prompt)])
result = json.loads(response.content)
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
return f"❌ 提取信息时出错:{str(e)}"
def search_similar_cases(query: str) -> str:
"""
检索相似病例
基于症状、诊断等信息,使用向量检索找到语义相似的病例
"""
# 使用向量检索找到相似病例
docs = vector_store.similarity_search(query, k=5)
if not docs:
return "❌ 未找到相似病例,请先运行文档处理。"
# 构建结果
results = []
for i, doc in enumerate(docs, 1):
text = doc.page_content
metadata = doc.metadata
filename = metadata.get('filename', 'unknown')
page_num = metadata.get('page_number', '?')
# 使用大模型提取关键信息用于展示
prompt = f"""请从以下病例文本中提取关键信息:
病例文本:
{text}
请返回JSON格式:
{{
"diagnosis": "诊断信息",
"symptoms": "症状信息",
"summary": "病例摘要(100字以内)"
}}
只返回JSON,不要其他文字。"""
try:
response = llm.invoke([HumanMessage(content=prompt)])
case_info = json.loads(response.content)
except:
case_info = {"diagnosis": "未提取", "symptoms": "未提取", "summary": text[:100]}
results.append({
f"相似病例 {i}": {
"文件": filename,
"页码": page_num,
"诊断": case_info.get("diagnosis", "未找到"),
"症状": case_info.get("symptoms", "未找到"),
"相似度": "高" if i <= 2 else "中",
"病例摘要": case_info.get("summary", text[:200])
}
})
return json.dumps(results, ensure_ascii=False, indent=2)
def check_drug_interaction(query: str) -> str:
"""
检查药物相互作用
检查多种药物之间是否存在相互作用、过敏史、用药禁忌等
"""
# 从向量库中检索相关文档(包含用药信息)
docs = vector_store.similarity_search(query, k=3)
if not docs:
return "❌ 未找到相关医疗文档,请先运行文档处理。"
# 合并检索到的文本
texts = []
sources = []
for doc in docs:
texts.append(doc.page_content)
filename = doc.metadata.get('filename', 'unknown')
page_num = doc.metadata.get('page_number', '?')
sources.append(f"{filename} (第{page_num}页)")
combined_text = "\n\n".join(texts)
# 构建检查提示
prompt = f"""请检查以下医疗文档中的药物是否存在相互作用、过敏史、用药禁忌等安全问题:
医疗文档文本:
{combined_text}
请返回JSON格式的检查结果:
{{
"medications_found": ["药物1", "药物2", ...],
"interactions": [
{{
"drug1": "药物1",
"drug2": "药物2",
"warning": "相互作用警告信息",
"severity": "严重/中等/轻微"
}}
],
"allergies": ["过敏药物列表"],
"contraindications": ["用药禁忌列表"],
"overall_status": "安全/警告/危险",
"sources": {json.dumps(sources, ensure_ascii=False)}
}}
只返回JSON,不要其他文字。"""
try:
response = llm.invoke([HumanMessage(content=prompt)])
result = json.loads(response.content)
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
return f"❌ 检查药物相互作用时出错:{str(e)}"
def search_medical_literature(query: str) -> str:
"""
检索医学文献
使用向量检索找到相关的医学文献和研究资料
"""
# 使用向量检索找到相关文献
docs = vector_store.similarity_search(query, k=5)
if not docs:
return "❌ 未找到相关医学文献,请先运行文档处理。"
# 构建结果
results = []
for i, doc in enumerate(docs, 1):
text = doc.page_content
metadata = doc.metadata
filename = metadata.get('filename', 'unknown')
page_num = metadata.get('page_number', '?')
# 判断是否为医学文献(简化判断)
is_literature = any(keyword in text for keyword in ["研究", "文献", "期刊", "论文", "参考文献"])
# 使用大模型提取关键信息
prompt = f"""请从以下文档中提取关键信息:
文档文本:
{text}
请返回JSON格式:
{{
"type": "医学文献/病历/报告",
"summary": "内容摘要(100字以内)",
"key_points": ["关键点1", "关键点2"]
}}
只返回JSON,不要其他文字。"""
try:
response = llm.invoke([HumanMessage(content=prompt)])
doc_info = json.loads(response.content)
except:
doc_info = {"type": "医学文献" if is_literature else "病历/报告", "summary": text[:100], "key_points": []}
results.append({
f"文献 {i}": {
"标题": filename,
"页码": page_num,
"类型": doc_info.get("type", "未知"),
"相关性": "高" if i <= 2 else "中",
"内容摘要": doc_info.get("summary", text[:200]),
"关键点": doc_info.get("key_points", [])
}
})
return json.dumps(results, ensure_ascii=False, indent=2)
# 定义工具列表
tools = [
Tool(
name="process_documents",
description="处理医疗文档,将PDF/图片/Word解析成文本。输入可以是'处理所有文档'或文件路径。",
func=lambda q: process_documents() if "所有" in q else process_single_file(q)
),
Tool(
name="extract_medical_info",
description="从医疗文档中提取关键医疗信息,包括症状、诊断、用药、检查结果、基本信息等。输入格式:提取医疗信息 文件:病历.pdf",
func=extract_medical_info
),
Tool(
name="search_similar_cases",
description="检索相似病例,基于症状、诊断等信息查找历史相似病例。输入应为症状或诊断描述,如'发热、咳嗽、胸闷'。",
func=search_similar_cases
),
Tool(
name="check_drug_interaction",
description="检查药物相互作用,检查多种药物之间是否存在相互作用、过敏史、用药禁忌等。输入格式:检查药物相互作用 文件:处方.pdf 或直接提供药物列表。",
func=check_drug_interaction
),
Tool(
name="search_medical_literature",
description="检索医学文献,查找相关的医学研究文献和资料。输入应为要检索的医学主题或关键词,如'高血压治疗'。",
func=search_medical_literature
)
]
# ========== Step 5: 初始化 Agent ==========
agent = create_agent(
model=llm,
tools=tools,
debug=True, # 显示 Agent 的思考过程
system_prompt="""你是一个专业的医疗文档分析助手。你的任务是帮助医生和医疗工作者:
1. 处理医疗文档(解析PDF/图片/Word成文本)
2. 从病历、检查报告中提取关键医疗信息
3. 检索相似病例,辅助诊断决策
4. 检查药物相互作用,确保用药安全
5. 检索医学文献,提供参考资料
在回答时,请:
- 先处理文档(如果还没有解析结果)
- 引用具体的文档名称和页码
- 提供详细的医疗信息提取结果
- 如果发现药物相互作用或安全问题,明确警告
- 使用工具获取准确的信息,不要猜测
- 注意保护患者隐私,不要泄露敏感信息
- 明确说明:Agent的建议仅供参考,最终诊断需由医生确认
"""
)
# ========== Step 6: 使用示例 ==========
if __name__ == "__main__":
# 示例 1: 处理文档
print("=" * 60)
print("示例 1: 处理医疗文档")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="请处理所有医疗文档")]
})
print(response["messages"][-1].content)
print()
# 示例 2: 提取医疗信息
print("=" * 60)
print("示例 2: 提取医疗信息")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="从病历中提取患者的主诉、诊断和用药信息 文件:patient_record.pdf")]
})
print(response["messages"][-1].content)
print()
# 示例 3: 检索相似病例
print("=" * 60)
print("示例 3: 检索相似病例")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="检索与'发热、咳嗽、胸闷'症状相似的病例")]
})
print(response["messages"][-1].content)
print()
# 示例 4: 药物安全检查
print("=" * 60)
print("示例 4: 药物安全检查")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="检查处方中的药物是否存在相互作用 文件:prescription.pdf")]
})
print(response["messages"][-1].content)
print()
# 示例 5: 检索医学文献
print("=" * 60)
print("示例 5: 检索医学文献")
print("=" * 60)
response = agent.invoke({
"messages": [HumanMessage(content="检索关于'高血压治疗'的医学文献")]
})
print(response["messages"][-1].content)