当您想要进行文档抽取API的多并发请求时,以下是一份完整的示例代码供您参考,您也可以根据实际使用需要进行修改调整。Documentation Index
Fetch the complete documentation index at: https://docs.textin.com/llms.txt
Use this file to discover all available pages before exploring further.
import json
import aiohttp
import asyncio
import base64
import os
from typing import List, Dict, Any
from pathlib import Path
import glob
class AsyncExtractClient:
def __init__(self, app_id: str, secret_code: str):
self.app_id = app_id
self.secret_code = secret_code
async def extract(self, file_content: bytes, options: dict, request_body: dict) -> str:
# 将文件内容转换为base64
file_base64 = base64.b64encode(file_content).decode('utf-8')
# 构建请求参数
params = {}
for key, value in options.items():
params[key] = str(value)
# 设置请求头
headers = {
"x-ti-app-id": self.app_id,
"x-ti-secret-code": self.secret_code,
"Content-Type": "application/json"
}
# 构建完整的请求体,包含base64文件数据
full_request_body = {
**request_body,
"file": file_base64 # 添加base64编码的文件数据
}
# 发送异步请求
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.textin.com/ai/service/v2/entity_extraction",
params=params,
headers=headers,
json=full_request_body
) as response:
# 检查响应状态
response.raise_for_status()
return await response.text()
async def extract_single_file(self, file_path: str, output_dir: str, options: dict, request_body: dict) -> Dict[
str, Any]:
"""处理单个文件的提取任务"""
try:
# 读取文件
with open(file_path, "rb") as f:
file_content = f.read()
# 执行提取
response = await self.extract(file_content, options, request_body)
# 生成输出文件名
file_name = Path(file_path).stem
output_file = os.path.join(output_dir, f"{file_name}_result.json")
# 保存结果到输出文件夹
with open(output_file, "w", encoding="utf-8") as f:
f.write(response)
return {
"file_path": file_path,
"output_file": output_file,
"status": "success",
"response": response
}
except Exception as e:
return {
"file_path": file_path,
"status": "error",
"error": str(e)
}
async def extract_folder(self, input_folder: str, output_folder: str, options: dict, request_body: dict,
file_extensions: List[str] = None, max_concurrent: int = 5) -> List[Dict[str, Any]]:
"""处理整个文件夹的文件"""
# 确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)
# 如果没有指定文件扩展名,默认处理API支持的文档格式
if file_extensions is None:
file_extensions = [
'*.png', '*.jpg', '*.jpeg', '*.pdf', '*.bmp', '*.tiff', '*.webp',
'*.doc', '*.docx', '*.html', '*.mhtml', '*.xls', '*.xlsx', '*.csv',
'*.ppt', '*.pptx', '*.txt', '*.ofd', '*.rtf'
]
# 获取所有匹配的文件
file_paths = []
for ext in file_extensions:
pattern = os.path.join(input_folder, ext)
file_paths.extend(glob.glob(pattern))
# 也搜索子文件夹
file_paths.extend(glob.glob(os.path.join(input_folder, "**", ext), recursive=True))
# 去重并排序
file_paths = sorted(list(set(file_paths)))
if not file_paths:
print(f"在文件夹 {input_folder} 中没有找到匹配的文件")
return []
print(f"找到 {len(file_paths)} 个文件待处理")
# 创建信号量限制并发数
semaphore = asyncio.Semaphore(max_concurrent)
async def extract_with_semaphore(file_path: str) -> Dict[str, Any]:
async with semaphore:
return await self.extract_single_file(file_path, output_folder, options, request_body)
# 创建所有任务
tasks = [extract_with_semaphore(file_path) for file_path in file_paths]
# 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
"file_path": file_paths[i],
"status": "error",
"error": str(result)
})
else:
processed_results.append(result)
return processed_results
async def main():
# 创建客户端实例,需替换你的API Key
client = AsyncExtractClient("你的x-ti-app-id", "你的x-ti-secret-code")
# 指定输入和输出文件夹
input_folder = "input_files" # 输入文件夹路径
output_folder = "output_results" # 输出文件夹路径
# 设置URL参数
options = dict(
parse_mode="scan"
)
# 设置请求体参数
request_body = dict(
prompt="请抽取这张小票中的实付金额、消费日期、店铺名称、订单号并以字段格式返回,请抽取货号、商品名称、数量、单价,并以表格格式返回"
)
# 可选:指定要处理的文件扩展名
file_extensions = ['*.pdf', '*.jpg', '*.jpeg', '*.png'] # 可以根据需要调整
try:
# 处理整个文件夹
results = await client.extract_folder(
input_folder=input_folder,
output_folder=output_folder,
options=options,
request_body=request_body,
file_extensions=file_extensions,
max_concurrent=5 # 可以根据需要调整并发数
)
# 统计处理结果
success_count = sum(1 for r in results if r["status"] == "success")
error_count = len(results) - success_count
print(f"\n处理完成!")
print(f"总文件数: {len(results)}")
print(f"成功处理: {success_count}")
print(f"处理失败: {error_count}")
# 显示失败的文件
if error_count > 0:
print("\n失败的文件:")
for result in results:
if result["status"] == "error":
print(f" - {result['file_path']}: {result['error']}")
# 保存处理汇总信息
summary_file = os.path.join(output_folder, "processing_summary.json")
summary = {
"input_folder": input_folder,
"output_folder": output_folder,
"total_files": len(results),
"success_count": success_count,
"error_count": error_count,
"results": results
}
with open(summary_file, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
print(f"\n处理汇总已保存到: {summary_file}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())

