跳转到主要内容
本教程面向需要使用 API 的场景,展示如何使用 xParse 的 API 接口进行文档处理。

概述

xParse 除了提供 Pipeline API 外,还提供了其他 API 接口,适用于不同的使用场景:
  • Parse 同步 API:同步执行文档解析,立即返回结果
  • Parse 异步 API:异步执行文档解析,通过 job_id 查询状态
  • Extract 同步 API:同步执行文档抽取,立即返回结果

Parse 同步 API

适用场景

  • 只需要文档解析功能
  • 需要同步获取结果
  • 处理单个文档或小文件(<10MB)

使用示例

import requests
import json

url = "https://api.textin.com/api/xparse/parse/sync"

# 准备文件
with open('document.pdf', 'rb') as f:
    file_content = f.read()

# 准备配置(可选)
config = {
    "provider": "textin",
    "parse_mode": "auto"
}

# 发送请求
files = {'file': ('document.pdf', file_content, 'application/pdf')}
data = {
    'config': json.dumps(config) if config else None
}

headers = {
    'x-ti-app-id': 'your-app-id',
    'x-ti-secret-code': 'your-secret-code'
}

response = requests.post(url, files=files, data=data, headers=headers)
result = response.json()

if result['code'] == 200 and result['data']['status'] == 'completed':
    elements = result['data']['result']['elements']
    print(f"解析成功,共 {len(elements)} 个元素")
else:
    print(f"解析失败: {result.get('data', {}).get('message', 'Unknown error')}")

返回结果

返回结果与 Pipeline Parse 节点完全一致,包含 elements 数组。 API文档Parse同步API

Parse 异步 API

适用场景

  • 处理大文件(>10MB)或批量文件
  • 需要异步处理,避免长时间等待
  • 需要 webhook 回调通知

使用示例

1. 创建异步任务

import requests
import json
import time

url = "https://api.textin.com/api/xparse/parse/async"

# 准备文件
with open('large_document.pdf', 'rb') as f:
    file_content = f.read()

# 准备配置(可选)
config = {
    "provider": "textin",
    "parse_mode": "auto"
}

# 可选:设置 webhook
webhook = "https://your-server.com/webhook"

# 发送请求
files = {'file': ('large_document.pdf', file_content, 'application/pdf')}
data = {
    'config': json.dumps(config) if config else None,
    'webhook': webhook if webhook else None
}

headers = {
    'x-ti-app-id': 'your-app-id',
    'x-ti-secret-code': 'your-secret-code'
}

response = requests.post(url, files=files, data=data, headers=headers)
result = response.json()

if result['code'] == 200:
    job_id = result['data']['job_id']
    print(f"任务创建成功,job_id: {job_id}")
else:
    print(f"任务创建失败: {result.get('msg', 'Unknown error')}")

2. 查询任务状态

def check_job_status(job_id):
    url = f"https://api.textin.com/api/xparse/parse/async/{job_id}"
    
    headers = {
        'x-ti-app-id': 'your-app-id',
        'x-ti-secret-code': 'your-secret-code'
    }
    
    response = requests.get(url, headers=headers)
    result = response.json()
    
    if result['code'] == 200:
        status = result['data']['status']
        return status, result['data']
    else:
        return None, None

# 轮询查询状态
job_id = "xxx"  # 从创建任务返回的结果中获取

while True:
    status, data = check_job_status(job_id)
    
    if status == 'completed':
        print(f"任务完成")
        # 查询结果
        elements = get_job_result(job_id)
        break
    elif status == 'failed':
        print(f"任务失败: {data.get('message', 'Unknown error')}")
        break
    elif status in ['pending', 'in_progress']:
        print(f"任务处理中,状态: {status}")
        time.sleep(5)  # 等待5秒后再次查询
    else:
        print(f"未知状态: {status}")
        break

3. 查询任务结果

当任务状态为 completed 时,可以通过结果查询API获取详细的解析结果:
def get_job_result(job_id):
    url = f"https://api.textin.com/api/xparse/parse/async/result/{job_id}"
    
    headers = {
        'x-ti-app-id': 'your-app-id',
        'x-ti-secret-code': 'your-secret-code'
    }
    
    response = requests.get(url, headers=headers)
    result = response.json()
    
    if result['code'] == 200:
        elements = result['data']['elements']
        return elements
    else:
        print(f"查询结果失败: {result.get('msg', 'Unknown error')}")
        return None

# 查询结果
job_id = "xxx"  # 任务ID
elements = get_job_result(job_id)
if elements:
    print(f"解析成功,共 {len(elements)} 个元素")

4. Webhook 回调

如果设置了 webhook,当任务完成或失败时,系统会调用该 URL:
from flask import Flask, request

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def webhook():
    data = request.json
    job_id = data.get('job_id')
    status = data.get('status')
    
    if status == 'completed':
        result_url = data.get('result_url')
        # 处理完成的任务
        print(f"任务 {job_id} 完成,结果URL: {result_url}")
    elif status == 'failed':
        message = data.get('message')
        # 处理失败的任务
        print(f"任务 {job_id} 失败: {message}")
    
    return {'status': 'ok'}

if __name__ == '__main__':
    app.run(port=5000)

Extract 同步 API

适用场景

  • 只需要抽取功能
  • 不需要保留解析结果
  • 处理单个文档

使用示例

import requests
import json

url = "https://api.textin.com/api/xparse/extract/sync"

# 准备文件
with open('invoice.pdf', 'rb') as f:
    file_content = f.read()

# 准备抽取配置
extract_config = {
    "schema": {
        "type": "object",
        "properties": {
            "发票号码": {
                "type": ["string", "null"],
                "description": "发票号码"
            },
            "开票日期": {
                "type": ["string", "null"],
                "description": "开票日期"
            },
            "金额": {
                "type": ["number", "null"],
                "description": "发票金额"
            }
        },
        "required": ["发票号码", "开票日期", "金额"]
    },
    "generate_citations": False,
    "stamp": False
}

# 可选:Parse配置
parse_config = {
    "provider": "textin",
    "parse_mode": "auto"
}

# 发送请求
files = {'file': ('invoice.pdf', file_content, 'application/pdf')}
data = {
    'extract_config': json.dumps(extract_config),
    'parse_config': json.dumps(parse_config) if parse_config else None
}

headers = {
    'x-ti-app-id': 'your-app-id',
    'x-ti-secret-code': 'your-secret-code'
}

response = requests.post(url, files=files, data=data, headers=headers)
result = response.json()

if result['code'] == 200 and result['data']['status'] == 'completed':
    extracted_schema = result['data']['result']['extracted_schema']
    print(f"抽取成功: {extracted_schema}")
else:
    print(f"抽取失败: {result.get('data', {}).get('message', 'Unknown error')}")
API文档Extract同步API

Parse 同步 vs 异步 API 详细对比

特性同步API异步API
响应方式立即返回结果返回job_id,需查询状态
等待时间需要等待处理完成立即返回,后台处理
适用场景小文件、单个文档大文件、批量处理
超时限制受HTTP超时限制无超时限制
Webhook支持不支持支持
使用复杂度简单需要轮询或webhook
性能适合小文件适合大文件
错误处理直接返回错误需要查询状态获取错误

选择建议

  • 小文件(<10MB):使用同步API,简单直接
  • 大文件(>10MB)或批量处理:使用异步API,避免超时
  • 需要实时响应:使用同步API
  • 需要后台处理:使用异步API + Webhook
  • 需要组合处理:使用Pipeline API

Pipeline API vs 其他API

Pipeline API

优点
  • 支持组合多个处理阶段
  • 统一的接口和配置
  • 支持批量处理
  • 返回统一的统计信息
缺点
  • 需要配置多个stage
  • 不能单独使用某个功能
适用场景
  • 需要组合多个处理阶段
  • 需要批量处理文档
  • 需要将结果存储到向量数据库

其他API

优点
  • 使用简单,只需配置单个功能
  • 适合单一功能场景
  • 异步API支持大文件处理
缺点
  • 不能组合多个处理阶段
  • 需要自行实现批量处理
适用场景
  • 只需要单个功能
  • 处理单个文档
  • 需要异步处理大文件

最佳实践

  1. API选择
    • 根据文件大小选择同步或异步API
    • 根据需求选择Pipeline或其他API
    • 考虑错误处理和重试机制
  2. 性能优化
    • 小文件使用同步API
    • 大文件使用异步API
    • 批量处理使用Pipeline API
  3. 错误处理
    • 检查返回的code和status
    • 处理网络错误和超时
    • 实现重试机制
  4. Webhook使用
    • 设置可靠的webhook URL
    • 处理webhook回调
    • 实现幂等性处理

常见问题

Q: 什么时候使用同步API,什么时候使用异步API?

A:
  • 文件小于10MB:使用同步API
  • 文件大于10MB或批量处理:使用异步API
  • 需要实时响应:使用同步API
  • 需要后台处理:使用异步API

Q: Pipeline API和其他API有什么区别?

A:
  • Pipeline API支持组合多个处理阶段,其他API只支持单个功能
  • Pipeline API适合批量处理,其他API适合单个文档
  • Pipeline API返回统一的统计信息,其他API返回特定功能的结果

Q: 异步API的webhook是必须的吗?

A: 不是必须的。如果不设置webhook,可以通过轮询查询任务状态。

相关文档