跳转到主要内容
本教程面向医疗场景,展示如何利用 xParse SDK 解析医疗文档,然后通过大模型自动提取医疗信息、检索相似病例和检查药物相互作用。

场景介绍

业务痛点

在医疗场景中,医生和医疗工作者面临以下挑战:
  • 文档类型多样:需要处理病历、检查报告、处方单、医学影像报告等多种格式
  • 信息提取复杂:需要从非结构化文档中提取症状、诊断、用药、检查结果等关键信息
  • 病例检索困难:需要快速检索相似病例和医学文献,辅助诊断决策
  • 药物安全:需要检查药物相互作用、过敏史、用药禁忌等安全问题
  • 隐私保护:医疗数据涉及患者隐私,需要安全处理

解决方案

通过构建医疗文档Agent,我们可以实现:
  • 自动化文档解析:使用 xParse SDK 自动解析各类医疗文档
  • 智能信息提取:调用 xParse Extract API 直接从文档中提取结构化医疗信息(症状、诊断、用药等)
  • 相似病例检索:基于症状和诊断,从历史病例中检索相似案例
  • 药物安全检查:检查药物相互作用、过敏史、用药禁忌等
  • 医学文献检索:检索相关的医学文献和研究资料

架构设计

医疗文档(PDF/图片/Word)

[xParse SDK] 解析 + 分块 + 向量化

向量数据库(Milvus)

[LangChain Agent]
    ├─ Tool 1: extract_medical_info(xParse Extract API)
    ├─ Tool 2: search_similar_cases(向量检索)
    ├─ Tool 3: check_drug_interaction(向量检索 + LLM 分析)
    └─ Tool 4: search_medical_literature(向量检索)

诊断辅助报告
核心思路
  • 信息提取:使用 xParse Extract API 直接从文档中提取结构化医疗信息,无需先检索再提取
  • 相似病例检索:使用向量检索找到语义相似的病例
  • 医学文献检索:使用向量检索找到相关的医学文献
  • 药物安全检查:大模型分析检索到的用药信息

环境准备

首先安装必要的依赖:
python -m venv .venv && source .venv/bin/activate
pip install xparse-client langchain langchain-community langchain-core langchain-text-splitters langchain-milvus \
            pymilvus python-dotenv dashscope requests
创建 .env 文件存储配置:
# .env
TEXTIN_APP_ID=your-app-id
TEXTIN_SECRET_CODE=your-secret-code
MILVUS_DB_PATH=./medical_vectors.db
DASHSCOPE_API_KEY=your-dashscope-key
提示:TEXTIN_APP_IDTEXTIN_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。

完整代码示例

下面是一个完整的、可以直接运行的示例:
import os
import json
import glob
import base64
import requests
from dotenv import load_dotenv
from xparse_client import XParseClient
from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_core.messages import HumanMessage
from langchain_community.chat_models import ChatTongyi
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings

load_dotenv()

# ========== Step 1: 初始化 xParse SDK 并处理文档 ==========

client = XParseClient()

def process_documents() -> str:
    """处理医疗文档"""
    try:
        docs_dir = "/your/medical/documents/folder"
        headers_to_split_on = [("#", "header1"), ("##", "header2"), ("###", "header3")]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1536, chunk_overlap=100)

        all_chunks = []
        patterns = ["*.pdf", "*.png", "*.jpg", "*.jpeg", "*.docx"]
        for pattern in patterns:
            for file_path in glob.glob(os.path.join(docs_dir, pattern)):
                with open(file_path, "rb") as f:
                    result = client.parse.run(file=f, filename=os.path.basename(file_path))
                md_docs = markdown_splitter.split_text(result.markdown)
                for doc in md_docs:
                    doc.metadata["filename"] = os.path.basename(file_path)
                chunks = text_splitter.split_documents(md_docs)
                all_chunks.extend(chunks)

        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=all_chunks,
            embedding=embedding,
            collection_name="medical_documents",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return "✅ 已处理所有医疗文档,解析结果已存入向量数据库。"
    except Exception as e:
        return f"❌ 处理文档时出错:{str(e)}"

def process_single_file(file_path: str) -> str:
    """处理单个文件"""
    try:
        with open(file_path, "rb") as f:
            result = client.parse.run(file=f, filename=os.path.basename(file_path))

        headers_to_split_on = [("#", "header1"), ("##", "header2"), ("###", "header3")]
        markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1536, chunk_overlap=100)

        md_docs = markdown_splitter.split_text(result.markdown)
        for doc in md_docs:
            doc.metadata["filename"] = os.path.basename(file_path)
        chunks = text_splitter.split_documents(md_docs)

        embedding = DashScopeEmbeddings(model="text-embedding-v4")
        Milvus.from_documents(
            documents=chunks,
            embedding=embedding,
            collection_name="medical_documents",
            connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
        )
        return f"✅ 成功处理文件 {file_path},解析结果已存入向量数据库。"
    except Exception as e:
        return f"❌ 处理文件 {file_path} 时出错:{str(e)}"

# ========== Step 2: 初始化向量数据库 ==========

embedding = DashScopeEmbeddings(model="text-embedding-v4")
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="medical_documents",
    connection_args={"uri": os.getenv("MILVUS_DB_PATH")},
)

# ========== Step 3: 初始化大模型 ==========

llm = ChatTongyi(
    model="qwen-max",
    top_p=0.8,
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY")
)

# ========== Step 4: 构建 LangChain Tools ==========

EXTRACT_API_URL = "https://api.textin.com/ai/service/v3/entity_extraction"

def extract_from_file(file_path: str, schema: dict, generate_citations: bool = False) -> dict:
    with open(file_path, "rb") as f:
        file_base64 = base64.b64encode(f.read()).decode("utf-8")
    payload = {
        "file": {"file_base64": file_base64, "file_name": os.path.basename(file_path)},
        "schema": schema,
        "extract_options": {"generate_citations": generate_citations}
    }
    headers = {
        "x-ti-app-id": os.getenv("TEXTIN_APP_ID"),
        "x-ti-secret-code": os.getenv("TEXTIN_SECRET_CODE"),
        "Content-Type": "application/json"
    }
    response = requests.post(EXTRACT_API_URL, json=payload, headers=headers)
    result = response.json()
    if result.get("code") != 200:
        raise Exception(f"Extract API 错误: {result.get('message', '未知错误')}")
    return result["result"]

MEDICAL_SCHEMA = {
    "type": "object",
    "properties": {
        "基本信息": {
            "type": "object",
            "description": "患者基本信息",
            "properties": {
                "年龄": {"type": ["string", "null"], "description": "患者年龄"},
                "性别": {"type": ["string", "null"], "description": "患者性别"},
                "就诊日期": {"type": ["string", "null"], "description": "就诊日期"}
            }
        },
        "症状": {
            "type": "array",
            "description": "患者症状列表(主诉、现病史中的症状描述)",
            "items": {"type": "string"}
        },
        "诊断": {
            "type": "array",
            "description": "诊断结果列表(初步诊断、最终诊断、临床诊断)",
            "items": {"type": "string"}
        },
        "用药": {
            "type": "array",
            "description": "用药信息列表",
            "items": {
                "type": "object",
                "properties": {
                    "药物名称": {"type": ["string", "null"], "description": "药物名称"},
                    "剂量": {"type": ["string", "null"], "description": "剂量"},
                    "用法": {"type": ["string", "null"], "description": "用法用量"}
                },
                "required": ["药物名称"]
            }
        },
        "检查结果": {
            "type": "array",
            "description": "检查检验结果列表",
            "items": {
                "type": "object",
                "properties": {
                    "检查项目": {"type": ["string", "null"], "description": "检查项目名称"},
                    "结果": {"type": ["string", "null"], "description": "检查结果"}
                },
                "required": ["检查项目"]
            }
        }
    },
    "required": ["基本信息", "症状", "诊断", "用药", "检查结果"]
}

def extract_medical_info(query: str) -> str:
    """从医疗文档中提取关键医疗信息"""
    docs_dir = "/your/medical/documents/folder"

    if "文件:" in query:
        filename = query.split("文件:")[-1].strip()
        file_path = os.path.join(docs_dir, filename)
    else:
        files = []
        for pattern in ["*.pdf", "*.png", "*.jpg", "*.jpeg", "*.docx"]:
            files.extend(glob.glob(os.path.join(docs_dir, pattern)))
        if not files:
            return "❌ 未找到医疗文档,请确认文档目录。"
        file_path = files[0]

    if not os.path.exists(file_path):
        return f"❌ 文件不存在: {file_path}"

    try:
        result = extract_from_file(file_path, MEDICAL_SCHEMA, generate_citations=True)
        extracted = result["extracted_schema"]
        citations = result.get("citations", {})

        extracted["来源文件"] = os.path.basename(file_path)

        return json.dumps(extracted, ensure_ascii=False, indent=2)
    except Exception as e:
        return f"❌ 提取医疗信息时出错:{str(e)}"

def search_similar_cases(query: str) -> str:
    """
    检索相似病例
    
    基于症状、诊断等信息,使用向量检索找到语义相似的病例
    """
    docs = vector_store.similarity_search(query, k=5)
    
    if not docs:
        return "❌ 未找到相似病例,请先运行文档处理。"
    
    results = []
    for i, doc in enumerate(docs, 1):
        text = doc.page_content
        metadata = doc.metadata
        filename = metadata.get('filename', 'unknown')
        page_num = metadata.get('page_number', '?')
        
        prompt = f"""请从以下病例文本中提取关键信息:

病例文本:
{text}

请返回JSON格式:
{{
    "diagnosis": "诊断信息",
    "symptoms": "症状信息",
    "summary": "病例摘要(100字以内)"
}}

只返回JSON,不要其他文字。"""
        
        try:
            response = llm.invoke([HumanMessage(content=prompt)])
            case_info = json.loads(response.content)
        except:
            case_info = {"diagnosis": "未提取", "symptoms": "未提取", "summary": text[:100]}
        
        results.append({
            f"相似病例 {i}": {
                "文件": filename,
                "页码": page_num,
                "诊断": case_info.get("diagnosis", "未找到"),
                "症状": case_info.get("symptoms", "未找到"),
                "相似度": "高" if i <= 2 else "中",
                "病例摘要": case_info.get("summary", text[:200])
            }
        })
    
    return json.dumps(results, ensure_ascii=False, indent=2)

def check_drug_interaction(query: str) -> str:
    """
    检查药物相互作用
    
    检查多种药物之间是否存在相互作用、过敏史、用药禁忌等
    """
    docs = vector_store.similarity_search(query, k=3)
    
    if not docs:
        return "❌ 未找到相关医疗文档,请先运行文档处理。"
    
    texts = []
    sources = []
    for doc in docs:
        texts.append(doc.page_content)
        filename = doc.metadata.get('filename', 'unknown')
        page_num = doc.metadata.get('page_number', '?')
        sources.append(f"{filename} (第{page_num}页)")
    
    combined_text = "\n\n".join(texts)
    
    prompt = f"""请检查以下医疗文档中的药物是否存在相互作用、过敏史、用药禁忌等安全问题:

医疗文档文本:
{combined_text}

请返回JSON格式的检查结果:
{{
    "medications_found": ["药物1", "药物2", ...],
    "interactions": [
        {{
            "drug1": "药物1",
            "drug2": "药物2",
            "warning": "相互作用警告信息",
            "severity": "严重/中等/轻微"
        }}
    ],
    "allergies": ["过敏药物列表"],
    "contraindications": ["用药禁忌列表"],
    "overall_status": "安全/警告/危险",
    "sources": {json.dumps(sources, ensure_ascii=False)}
}}

只返回JSON,不要其他文字。"""
    
    try:
        response = llm.invoke([HumanMessage(content=prompt)])
        result = json.loads(response.content)
        return json.dumps(result, ensure_ascii=False, indent=2)
    except Exception as e:
        return f"❌ 检查药物相互作用时出错:{str(e)}"

def search_medical_literature(query: str) -> str:
    """
    检索医学文献
    
    使用向量检索找到相关的医学文献和研究资料
    """
    docs = vector_store.similarity_search(query, k=5)
    
    if not docs:
        return "❌ 未找到相关医学文献,请先运行文档处理。"
    
    results = []
    for i, doc in enumerate(docs, 1):
        text = doc.page_content
        metadata = doc.metadata
        filename = metadata.get('filename', 'unknown')
        page_num = metadata.get('page_number', '?')
        
        is_literature = any(keyword in text for keyword in ["研究", "文献", "期刊", "论文", "参考文献"])
        
        prompt = f"""请从以下文档中提取关键信息:

文档文本:
{text}

请返回JSON格式:
{{
    "type": "医学文献/病历/报告",
    "summary": "内容摘要(100字以内)",
    "key_points": ["关键点1", "关键点2"]
}}

只返回JSON,不要其他文字。"""
        
        try:
            response = llm.invoke([HumanMessage(content=prompt)])
            doc_info = json.loads(response.content)
        except:
            doc_info = {"type": "医学文献" if is_literature else "病历/报告", "summary": text[:100], "key_points": []}
        
        results.append({
            f"文献 {i}": {
                "标题": filename,
                "页码": page_num,
                "类型": doc_info.get("type", "未知"),
                "相关性": "高" if i <= 2 else "中",
                "内容摘要": doc_info.get("summary", text[:200]),
                "关键点": doc_info.get("key_points", [])
            }
        })
    
    return json.dumps(results, ensure_ascii=False, indent=2)

# 定义工具列表
tools = [
    Tool(
        name="process_documents",
        description="处理医疗文档,将PDF/图片/Word解析成文本。输入可以是'处理所有文档'或文件路径。",
        func=lambda q: process_documents() if "所有" in q else process_single_file(q)
    ),
    Tool(
        name="extract_medical_info",
        description="从医疗文档中提取关键医疗信息(症状、诊断、用药、检查结果、基本信息等)。输入格式:提取医疗信息 文件:病历.pdf",
        func=extract_medical_info
    ),
    Tool(
        name="search_similar_cases",
        description="检索相似病例,基于症状、诊断等信息查找历史相似病例。输入应为症状或诊断描述,如'发热、咳嗽、胸闷'。",
        func=search_similar_cases
    ),
    Tool(
        name="check_drug_interaction",
        description="检查药物相互作用,检查多种药物之间是否存在相互作用、过敏史、用药禁忌等。输入格式:检查药物相互作用 文件:处方.pdf 或直接提供药物列表。",
        func=check_drug_interaction
    ),
    Tool(
        name="search_medical_literature",
        description="检索医学文献,查找相关的医学研究文献和资料。输入应为要检索的医学主题或关键词,如'高血压治疗'。",
        func=search_medical_literature
    )
]

# ========== Step 5: 初始化 Agent ==========

agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
)

# ========== Step 6: 使用示例 ==========

if __name__ == "__main__":
    print("=" * 60)
    print("示例 1: 处理医疗文档")
    print("=" * 60)
    response = agent.invoke({
        "input": "请处理所有医疗文档"
    })
    print(response["output"])
    print()
    
    print("=" * 60)
    print("示例 2: 提取医疗信息")
    print("=" * 60)
    response = agent.invoke({
        "input": "从病历中提取患者的主诉、诊断和用药信息 文件:patient_record.pdf"
    })
    print(response["output"])
    print()
    
    print("=" * 60)
    print("示例 3: 检索相似病例")
    print("=" * 60)
    response = agent.invoke({
        "input": "检索与'发热、咳嗽、胸闷'症状相似的病例"
    })
    print(response["output"])
    print()
    
    print("=" * 60)
    print("示例 4: 药物安全检查")
    print("=" * 60)
    response = agent.invoke({
        "input": "检查处方中的药物是否存在相互作用 文件:prescription.pdf"
    })
    print(response["output"])
    print()
    
    print("=" * 60)
    print("示例 5: 检索医学文献")
    print("=" * 60)
    response = agent.invoke({
        "input": "检索关于'高血压治疗'的医学文献"
    })
    print(response["output"])

代码说明

Step 1: xParse SDK 解析与文本处理

文档处理分为三个阶段:
  • 解析:使用 XParseClientclient.parse.run() 解析医疗文档,返回 Markdown 格式的结构化文本
  • 分块:使用 LangChain 的 MarkdownHeaderTextSplitter 按标题分块保持病历章节结构,再用 RecursiveCharacterTextSplitter 进行二次分块控制块大小
  • 向量化:使用 DashScopeEmbeddings 生成向量,通过 Milvus.from_documents() 存入向量数据库
为什么使用 LangChain 分块和向量化
  • MarkdownHeaderTextSplitter 能识别 Markdown 标题层级,按病历章节自然分块
  • RecursiveCharacterTextSplitter 控制块大小(1536字符)并保留上下文重叠(100字符)
  • 向量检索比文本匹配更准确,能理解医学概念和术语的语义关系

Step 2: 向量数据库初始化

使用与文档处理相同的 embedding 模型初始化向量数据库连接,保证语义空间一致。

Step 3: 大模型初始化

使用通义千问(qwen-max)作为大模型,用于信息提取和结果分析。

Step 4: Tools 实现

  • extract_medical_info:使用 xParse Extract API 直接从文档中提取结构化医疗信息,无需先检索再提取
  • search_similar_cases:使用向量检索找到语义相似的病例,再用大模型提取关键信息
  • check_drug_interaction:使用向量检索找到相关用药信息,再用大模型检查相互作用
  • search_medical_literature:使用向量检索找到相关文献,再用大模型提取关键信息
关键点extract_medical_info 通过 Extract API 直接对源文件进行结构化提取,避免了向量检索 + LLM 提取的两步流程,提取结果更完整准确。其他工具结合向量检索(语义相似度)和大模型(信息提取和分析),既快速又准确。

Step 5: Agent 配置

Agent 会自动:
  • 判断是否需要先处理文档
  • 选择合适的 Tool 提取信息或检索
  • 组织最终的回答

使用示例

示例 1:处理文档

response = agent.invoke({
    "messages": [HumanMessage(content="请处理所有医疗文档")]
})
print(response["messages"][-1].content)

示例 2:提取医疗信息

response = agent.invoke({
    "messages": [HumanMessage(content="从病历中提取患者的主诉、诊断和用药信息 文件:patient_record.pdf")]
})
print(response["messages"][-1].content)

示例 3:检索相似病例

response = agent.invoke({
    "messages": [HumanMessage(content="检索与'发热、咳嗽、胸闷'症状相似的病例")]
})
print(response["messages"][-1].content)

示例 4:药物安全检查

response = agent.invoke({
    "messages": [HumanMessage(content="检查处方中的'阿司匹林'和'华法林'是否存在相互作用 文件:prescription.pdf")]
})
print(response["messages"][-1].content)

示例 5:检索医学文献

response = agent.invoke({
    "messages": [HumanMessage(content="检索关于'高血压治疗'的医学文献")]
})
print(response["messages"][-1].content)

最佳实践

  1. 隐私保护:医疗数据涉及患者隐私,确保数据加密存储和传输
  2. 分块策略:使用 MarkdownHeaderTextSplitter 按标题分块保持病历章节结构,便于理解上下文
  3. 块大小控制:通过 chunk_size=1536chunk_overlap=100 平衡语义完整性和检索精度
  4. 药物数据库:在实际应用中,建议集成专业的药物相互作用数据库,提高检查准确性
  5. 多语言支持:医疗术语可能涉及多语言,确保解析引擎支持
  6. 结果验证:Agent 的建议仅供参考,最终诊断需由医生确认
  7. 提示工程:优化大模型的提示词,提高提取和检索准确率
  8. 错误处理:对于识别失败或提取错误的情况,记录错误信息,便于人工处理

常见问题

Q: 如何处理手写病历?
A: 使用支持 OCR 的解析引擎(TextIn xParse),可以识别手写内容,但准确率可能低于打印文档。建议预处理图片(开启 xParse 切边增强)提高识别率。
Q: 如何提高诊断准确性?
A: 1) 优化提示词,明确要求提取的字段;2) 使用更强的模型(如 qwen-max);3) 结合多个检查结果综合判断;4) 参考最新的医学文献。
Q: 如何保护患者隐私?
A: 1) 数据加密存储;2) 访问权限控制;3) 日志脱敏处理;4) 符合HIPAA等医疗数据保护法规;5) 不在提示词中包含患者姓名等敏感信息。
Q: 可以使用其他 LLM 吗?
A: 可以。LangChain 支持多种 LLM,只需替换 ChatTongyi(通义千问)为对应的类,如 ChatOpenAI(OpenAI)、ChatZhipuAI(智谱AI)等。
Q: 如何提高相似病例检索的准确性?
A: 1) 在提示词中明确相似度判断标准;2) 要求大模型提取关键特征(症状、诊断、检查结果)进行匹配;3) 可以结合多个文档综合判断。

相关文档