跳转到主要内容
本教程面向财务审计、合规审核等场景,展示如何利用 xParse 作为数据底座,构建能够自动解析财务文档、提取关键信息、进行合规性检查和异常检测的智能Agent。

场景介绍

业务痛点

在企业财务审计和合规审核场景中,审计人员面临以下挑战:
  • 文档量大:需要处理大量财务报表、合同、发票、银行对账单等文档
  • 信息提取繁琐:需要从非结构化文档中提取关键财务指标(金额、日期、合同条款等)
  • 合规性检查复杂:需要对照法规和内部政策,检查合同条款、财务数据是否符合规范
  • 异常检测困难:需要识别金额异常、日期冲突、数据不一致等问题
  • 追溯困难:发现问题后,需要追溯到原始文档的具体位置进行验证

解决方案

通过构建财务审计Agent,我们可以实现:
  • 自动化文档解析:使用 xParse Pipeline 自动解析各类财务文档
  • 智能信息提取:从解析结果中提取关键财务数据和合同条款
  • 合规性自动检查:基于知识库和历史案例,自动检查合规性
  • 异常自动检测:识别金额异常、日期冲突等异常情况
  • 结果可追溯:保留原始元素和坐标信息,便于追溯验证

架构设计

财务文档(PDF/Excel/图片)

[xParse Pipeline]
    ├─ Parse: 解析财务报表、合同、发票
    ├─ Chunk: 按页面分块(保持页面完整性)
    └─ Embed: 向量化

向量数据库(Milvus/Zilliz)

[LangChain Agent]
    ├─ Tool 1: extract_financial_data(提取财务数据)
    ├─ Tool 2: check_compliance(合规性检查)
    ├─ Tool 3: detect_anomalies(异常检测)
    └─ Tool 4: vector_search(检索历史案例)

审计报告(含引用和追溯信息)

环境准备

python -m venv .venv && source .venv/bin/activate
pip install "xparse-client>=0.2.5" langchain langchain-core \
            langchain_milvus langchain-community \
            pymilvus python-dotenv
export XTI_APP_ID=your-app-id # 在 TextIn 官网注册获取
export XTI_SECRET_CODE=your-secret-code # 在 TextIn 官网注册获取
export DASHSCOPE_API_KEY=your-dashscope-api-key # 本教程使用通义千问大模型,也可以替换成其他大模型
提示:X_TI_APP_IDX_TI_SECRET_CODE 参考 API Key,请登录 Textin 工作台 获取。示例中使用 通义千问 的大模型能力,其他模型用法类似。

Step 1:配置 xParse Pipeline

针对财务审计场景,我们使用以下配置:
  • 分块策略by_page - 保持页面完整性,便于追溯
  • 原始元素保留include_orig_elements=True - 保留原始元素信息
  • 表格优化:确保表格结构完整提取
from xparse_client import create_pipeline_from_config
import os
from dotenv import load_dotenv

load_dotenv()

AUDIT_PIPELINE_CONFIG = {
    "source": {
        "type": "local",
        "directory": "./audit_documents",
        "pattern": ["*.pdf", "*.xlsx", "*.xls", "*.png", "*.jpg", "*.txt", "*.docx", "*.doc"]  # 支持多种财务文档格式
    },
    "destination": {
        "type": "milvus",
        "db_path": "./audit_vectors.db",
        "collection_name": "audit_documents",
        "dimension": 1024
    },
    "api_base_url": "https://api.textin.com/api/xparse",
    "api_headers": {
        "x-ti-app-id": os.getenv("XTI_APP_ID"),
        "x-ti-secret-code": os.getenv("XTI_SECRET_CODE")
    },
    "stages": [
        {
            "type": "parse",
            "config": {
                "provider": "textin"  # 使用TextIn解析引擎,对表格识别效果好
            }
        },
        {
            "type": "chunk",
            "config": {
                "strategy": "by_page",  # 按页面分块,保持页面完整性
                "include_orig_elements": True,  # 保留原始元素,便于追溯
                "max_characters": 2048,  # 财务文档页面可能较长
                "overlap": 100  # 页面间重叠,保持上下文
            }
        },
        {
            "type": "embed",
            "config": {
                "provider": "qwen",
                "model_name": "text-embedding-v3"
            }
        }
    ]
}

def run_audit_pipeline() -> None:
    """运行审计文档处理Pipeline"""
    pipeline = create_pipeline_from_config(AUDIT_PIPELINE_CONFIG)
    pipeline.run()

Step 2:构建 LangChain Tools

Tool 1: 提取财务数据

from langchain_core.tools import Tool
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings
import re
import json
import os

embedding = DashScopeEmbeddings(
    model="text-embedding-v3",
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"),
)
vector_store = Milvus(
    embedding_function=embedding,
    collection_name="audit_documents",
    connection_args={"uri": "./audit_vectors.db"},
    primary_field="element_id",
    text_field="text",
    vector_field="embeddings",
    enable_dynamic_field=True,
)

def extract_financial_data(query: str) -> str:
    """
    从文档中提取财务数据
    
    支持提取:
    - 金额(人民币、美元等)
    - 日期(合同签署日期、付款日期等)
    - 合同条款(签约方、违约责任等)
    - 发票信息(发票号、税号等)
    """
    # 检索相关文档片段
    docs = vector_store.similarity_search(query, k=5)
    
    # 提取关键信息
    results = []
    for doc in docs:
        text = doc.page_content
        metadata = doc.metadata
        
        # 提取金额
        amounts = re.findall(r'[¥$€]\s*[\d,]+\.?\d*|[\d,]+\.?\d*\s*[元美元欧元]', text)
        
        # 提取日期
        dates = re.findall(r'\d{4}[-年]\d{1,2}[-月]\d{1,2}[]?|\d{4}/\d{1,2}/\d{1,2}', text)
        
        # 提取合同相关信息
        contract_info = {}
        if "合同" in text or "协议" in text:
            parties = re.findall(r'(甲方|乙方|签约方)[::]\s*([^\n]+)', text)
            contract_info["parties"] = dict(parties)
        
        results.append({
            "file": metadata.get("filename", "unknown"),
            "page": metadata.get("page_number", "unknown"),
            "amounts": amounts[:5],  # 限制数量
            "dates": dates[:5],
            "contract_info": contract_info,
            "snippet": text[:200]
        })
    
    return json.dumps(results, ensure_ascii=False, indent=2)

Tool 2: 合规性检查

def check_compliance(query: str) -> str:
    """
    检查文档是否符合合规要求
    
    检查项包括:
    - 合同条款是否符合法规要求
    - 财务数据是否符合会计准则
    - 发票信息是否完整
    - 审批流程是否合规
    """
    # 检索相关文档和历史合规案例
    docs = vector_store.similarity_search(query, k=3)
    
    # 合规性检查规则(示例)
    compliance_checks = []
    
    for doc in docs:
        text = doc.page_content
        metadata = doc.metadata
        
        checks = {
            "file": metadata.get("filename", "unknown"),
            "page": metadata.get("page_number", "unknown"),
            "issues": []
        }
        
        # 检查1: 合同金额是否过大(示例规则)
        amounts = re.findall(r'[\d,]+\.?\d*', text)
        for amount_str in amounts:
            try:
                amount = float(amount_str.replace(',', ''))
                if amount > 1000000:  # 超过100万需要特殊审批
                    checks["issues"].append(f"金额 {amount_str} 超过100万,需要特殊审批")
            except:
                pass
        
        # 检查2: 日期是否合理
        dates = re.findall(r'\d{4}[-年]\d{1,2}[-月]\d{1,2}[]?', text)
        for date_str in dates:
            # 简单检查:日期不能是未来日期(需要更复杂的逻辑)
            pass
        
        # 检查3: 合同关键条款是否存在
        required_terms = ["违约责任", "争议解决", "合同期限"]
        missing_terms = [term for term in required_terms if term not in text]
        if missing_terms:
            checks["issues"].append(f"缺少关键条款: {', '.join(missing_terms)}")
        
        if checks["issues"]:
            compliance_checks.append(checks)
    
    if not compliance_checks:
        return "✅ 未发现合规性问题"
    
    return json.dumps(compliance_checks, ensure_ascii=False, indent=2)

Tool 3: 异常检测

def detect_anomalies(query: str) -> str:
    """
    检测财务数据中的异常
    
    检测项包括:
    - 金额异常(过大、过小、负数等)
    - 日期冲突(付款日期早于合同日期等)
    - 数据不一致(同一合同在不同文档中金额不同)
    """
    docs = vector_store.similarity_search(query, k=5)
    
    anomalies = []
    
    # 收集所有金额和日期
    all_amounts = []
    all_dates = []
    
    for doc in docs:
        text = doc.page_content
        metadata = doc.metadata
        
        # 提取金额
        amounts = re.findall(r'[\d,]+\.?\d*', text)
        for amount_str in amounts:
            try:
                amount = float(amount_str.replace(',', ''))
                all_amounts.append({
                    "value": amount,
                    "source": metadata.get("filename", "unknown"),
                    "page": metadata.get("page_number", "unknown")
                })
            except:
                pass
        
        # 提取日期
        dates = re.findall(r'\d{4}[-年]\d{1,2}[-月]\d{1,2}[]?', text)
        all_dates.extend([{
            "value": date,
            "source": metadata.get("filename", "unknown"),
            "page": metadata.get("page_number", "unknown")
        } for date in dates])
    
    # 检测异常
    # 1. 金额异常
    if all_amounts:
        # 过滤掉0值和负数,只保留正数金额
        positive_amounts = [a for a in all_amounts if a["value"] > 0]
        
        if len(positive_amounts) >= 2:
            amounts_values = [a["value"] for a in positive_amounts]
            max_amount = max(amounts_values)
            min_amount = min(amounts_values)
            
            # 确保最小值不为0,避免除以零
            if min_amount > 0:
                ratio = max_amount / min_amount
                if ratio > 1000:  # 金额差异过大
                    anomalies.append({
                        "type": "金额差异异常",
                        "description": f"金额差异异常:最大金额 {max_amount:,.2f} 元与最小金额 {min_amount:,.2f} 元的比例达到 {ratio:.2f},超过1000倍",
                        "details": [a for a in positive_amounts if a["value"] in [max_amount, min_amount]]
                    })
    
    # 2. 负数金额(可能是错误)
    negative_amounts = [a for a in all_amounts if a["value"] < 0]
    if negative_amounts:
        anomalies.append({
            "type": "负数金额异常",
            "description": "发现负数金额,可能是录入错误",
            "details": negative_amounts
        })
    
    if not anomalies:
        return "✅ 未发现异常"
    
    return json.dumps(anomalies, ensure_ascii=False, indent=2)

Tool 4: 检索历史案例

def search_historical_cases(query: str) -> str:
    """检索历史审计案例"""
    docs = vector_store.similarity_search(query, k=5)
    
    results = []
    for i, doc in enumerate(docs, 1):
        results.append({
            f"案例 {i}": {
                "文件": doc.metadata.get("filename", "unknown"),
                "页码": doc.metadata.get("page_number", "unknown"),
                "内容": doc.page_content[:300] + "...",
                "相似度": "高" if i <= 2 else "中"
            }
        })
    
    return json.dumps(results, ensure_ascii=False, indent=2)

组装所有Tools

tools = [
    Tool(
        name="extract_financial_data",
        description="从财务文档中提取关键财务数据,包括金额、日期、合同条款、发票信息等。输入应为要提取的数据类型描述,如'提取合同金额和签署日期'。",
        func=extract_financial_data
    ),
    Tool(
        name="check_compliance",
        description="检查文档是否符合合规要求,包括合同条款合规性、财务数据合规性、发票信息完整性等。输入应为要检查的合规项描述。",
        func=check_compliance
    ),
    Tool(
        name="detect_anomalies",
        description="检测财务数据中的异常,包括金额异常、日期冲突、数据不一致等。输入应为要检测的异常类型描述。",
        func=detect_anomalies
    ),
    Tool(
        name="search_historical_cases",
        description="检索历史审计案例,用于参考和对比。输入应为要检索的案例类型或关键词。",
        func=search_historical_cases
    ),
    Tool(
        name="vector_search",
        description="基于语义检索相关文档片段。输入应为自然语言查询。",
        func=lambda q: "\n\n".join([
            f"[{i+1}] {doc.metadata.get('filename', 'unknown')}\n{doc.page_content[:300]}..."
            for i, doc in enumerate(vector_store.similarity_search(q, k=3))
        ])
    )
]

Step 3:配置 LangChain Agent

from langchain.agents import create_agent
from langchain_community.chat_models import ChatTongyi
import os

llm = ChatTongyi(
    model="qwen-max",
    dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"),
    temperature=0,  # 使用较低温度以获得更确定性的输出
)

agent = create_agent(
    model=llm,
    tools=tools,
    debug=True,
    system_prompt="""你是一个专业的财务审计助手。你的任务是帮助审计人员:
1. 从财务文档中提取关键信息
2. 检查文档的合规性
3. 检测数据异常
4. 检索历史审计案例

在回答时,请:
- 引用具体的文档名称和页码
- 提供详细的数据和检查结果
- 如果发现问题,说明问题的严重程度和建议的处理方式
- 使用工具获取准确的信息,不要猜测
"""
)

Step 4:完整示例代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
财务审计Agent完整示例
"""

import os
import re
import json
from dotenv import load_dotenv
from xparse_client import create_pipeline_from_config
from langchain_core.tools import Tool
from langchain.agents import create_agent
from langchain_community.chat_models import ChatTongyi
from langchain_milvus import Milvus
from langchain_community.embeddings import DashScopeEmbeddings

load_dotenv()

class AuditAgent:
    """财务审计Agent"""
    
    def __init__(self):
        self.setup_pipeline()
        self.setup_vector_store()
        self.setup_agent()
    
    def setup_pipeline(self):
        """配置Pipeline"""
        self.pipeline_config = {
            "source": {
                "type": "local",
                "directory": "./audit_documents",
                "pattern": ["*.pdf", "*.xlsx", "*.xls", "*.png", "*.jpg", "*.txt", "*.docx", "*.doc"]
            },
            "destination": {
                "type": "milvus",
                "db_path": "./audit_vectors.db",
                "collection_name": "audit_documents",
                "dimension": 1024
            },
            "api_base_url": "https://api.textin.com/api/xparse",
            "api_headers": {
                "x-ti-app-id": os.getenv("XTI_APP_ID"),
                "x-ti-secret-code": os.getenv("XTI_SECRET_CODE")
            },
            "stages": [
                {
                    "type": "parse",
                    "config": {"provider": "textin"}
                },
                {
                    "type": "chunk",
                    "config": {
                        "strategy": "by_page",
                        "include_orig_elements": True,
                        "max_characters": 2048,
                        "overlap": 100
                    }
                },
                {
                    "type": "embed",
                    "config": {
                        "provider": "qwen",
                        "model_name": "text-embedding-v3"
                    }
                }
            ]
        }
    
    def setup_vector_store(self):
        """配置向量数据库"""
        self.embedding = DashScopeEmbeddings(
            model="text-embedding-v3",
            dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"),
        )
        self.vector_store = Milvus(
            embedding_function=self.embedding,
            collection_name="audit_documents",
            connection_args={"uri": self.pipeline_config["destination"]["db_path"]},
            primary_field="element_id",
            text_field="text",
            vector_field="embeddings",
            enable_dynamic_field=True,
        )
    
    def setup_agent(self):
        """配置Agent和Tools"""
        # 定义Tools(简化版,完整版见上文)
        tools = [
            Tool(
                name="extract_financial_data",
                description="提取财务数据:金额、日期、合同条款等",
                func=self.extract_financial_data
            ),
            Tool(
                name="check_compliance",
                description="检查合规性:合同条款、财务数据合规性等",
                func=self.check_compliance
            ),
            Tool(
                name="detect_anomalies",
                description="检测异常:金额异常、日期冲突等",
                func=self.detect_anomalies
            ),
            Tool(
                name="vector_search",
                description="语义检索相关文档",
                func=lambda q: "\n\n".join([
                    f"[{i+1}] {doc.metadata.get('filename', 'unknown')}\n{doc.page_content[:300]}..."
                    for i, doc in enumerate(self.vector_store.similarity_search(q, k=3))
                ])
            )
        ]
        
        llm = ChatTongyi(
            model="qwen-max",
            dashscope_api_key=os.getenv("DASHSCOPE_API_KEY"),
            temperature=0,  # 使用较低温度以获得更确定性的输出
        )
        
        self.agent = create_agent(
            model=llm,
            tools=tools,
            debug=True
        )
    
    def extract_financial_data(self, query: str) -> str:
        """提取财务数据"""
        docs = self.vector_store.similarity_search(query, k=5)
        results = []
        for doc in docs:
            text = doc.page_content
            amounts = re.findall(r'[¥$€]\s*[\d,]+\.?\d*|[\d,]+\.?\d*\s*[元美元欧元]', text)
            dates = re.findall(r'\d{4}[-年]\d{1,2}[-月]\d{1,2}[]?', text)
            results.append({
                "file": doc.metadata.get("filename", "unknown"),
                "page": doc.metadata.get("page_number", "unknown"),
                "amounts": amounts[:5],
                "dates": dates[:5],
                "snippet": text[:200]
            })
        return json.dumps(results, ensure_ascii=False, indent=2)
    
    def check_compliance(self, query: str) -> str:
        """合规性检查"""
        docs = self.vector_store.similarity_search(query, k=3)
        issues = []
        for doc in docs:
            text = doc.page_content
            amounts = re.findall(r'[\d,]+\.?\d*', text)
            for amount_str in amounts:
                try:
                    amount = float(amount_str.replace(',', ''))
                    if amount > 1000000:
                        issues.append({
                            "file": doc.metadata.get("filename", "unknown"),
                            "page": doc.metadata.get("page_number", "unknown"),
                            "issue": f"金额 {amount_str} 超过100万,需要特殊审批"
                        })
                except:
                    pass
        return json.dumps(issues, ensure_ascii=False, indent=2) if issues else "✅ 未发现合规性问题"
    
    def detect_anomalies(self, query: str) -> str:
        """异常检测"""
        docs = self.vector_store.similarity_search(query, k=5)
        anomalies = []
        all_amounts = []
        for doc in docs:
            amounts = re.findall(r'[\d,]+\.?\d*', doc.page_content)
            for amount_str in amounts:
                try:
                    amount = float(amount_str.replace(',', ''))
                    # 过滤掉0值和负数,只保留正数金额
                    if amount > 0:
                        all_amounts.append(amount)
                except:
                    pass
        
        # 检查是否有足够的金额数据进行比较,并避免除以零
        if len(all_amounts) >= 2:
            min_amount = min(all_amounts)
            max_amount = max(all_amounts)
            # 确保最小值不为0,避免除以零
            if min_amount > 0:
                ratio = max_amount / min_amount
                if ratio > 1000:
                    anomalies.append(f"金额差异异常:最大金额 {max_amount:,.2f} 元与最小金额 {min_amount:,.2f} 元的比例达到 {ratio:.2f},超过1000倍")
            elif max_amount > 0:
                # 如果最小值为0但最大值不为0,这也是一种异常
                anomalies.append(f"发现零金额异常:存在金额为0的记录,同时存在金额为 {max_amount:,.2f} 元的记录")
        
        return json.dumps(anomalies, ensure_ascii=False, indent=2) if anomalies else "✅ 未发现异常"
    
    def process_documents(self):
        """处理文档"""
        print("=" * 60)
        print("开始处理审计文档...")
        print("=" * 60)
        
        pipeline = create_pipeline_from_config(self.pipeline_config)
        pipeline.run()
        
        print("\n文档处理完成!")
    
    def query(self, question: str) -> str:
        """查询Agent"""
        from langchain_core.messages import HumanMessage
        response = self.agent.invoke({
            "messages": [HumanMessage(content=question)]
        })
        return response["messages"][-1].content

def main():
    """主函数"""
    agent = AuditAgent()
    
    # 1. 处理文档(首次运行)
    agent.process_documents()
    
    # 2. 查询示例
    questions = [
        "提取所有合同中的金额和签署日期",
        "检查这些合同是否符合合规要求",
        "检测是否有金额异常的情况",
        "检索类似的历史审计案例"
    ]
    
    for question in questions:
        print(f"\n{'='*60}")
        print(f"问题: {question}")
        print(f"{'='*60}")
        answer = agent.query(question)
        print(f"\n回答:\n{answer}")

if __name__ == "__main__":
    main()

使用示例

示例1:提取财务数据

agent = AuditAgent()
response = agent.query("从财务报表中提取所有超过10万的金额和对应的日期")
print(response)

示例2:合规性检查

response = agent.query("检查所有合同是否符合以下要求:1) 金额超过100万需要特殊审批 2) 必须包含违约责任条款")
print(response)

示例3:异常检测

response = agent.query("检测财务报表中是否有异常:金额为负数、日期不合理、同一合同金额不一致等")
print(response)

最佳实践

  1. 文档预处理:确保文档格式统一,命名规范(如:合同_2024Q1_供应商A.pdf
  2. 分块策略:使用 by_page 保持页面完整性,便于追溯问题
  3. 原始元素保留:开启 include_orig_elements,便于验证和可视化
  4. 合规规则配置:将合规规则存储在配置文件中,便于更新和维护
  5. 异常阈值设置:根据业务需求设置合理的异常检测阈值
  6. 结果追溯:在Agent回答中包含文档名称和页码,便于人工验证

常见问题

Q: 如何处理加密的PDF文档?
A: 在Parse配置中添加 pdf_pwd 参数,或在Pipeline运行前先解密文档。
Q: 如何提高提取准确率?
A: 1) 使用 text-embedding-v3 模型;2) 优化分块策略;3) 增加检索的文档数量(k值)。
Q: 如何集成到现有审计系统?
A: 可以将Agent封装为REST API,通过HTTP接口调用,或集成到现有的审计工作流中。

相关文档