import json
import aiohttp
import asyncio
import base64
import os
from typing import List, Dict, Any
from pathlib import Path
import glob
class AsyncExtractClient:
def __init__(self, app_id: str, secret_code: str):
self.app_id = app_id
self.secret_code = secret_code
async def extract(self, file_content: bytes, options: dict, request_body: dict) -> str:
# 将文件内容转换为base64
file_base64 = base64.b64encode(file_content).decode('utf-8')
# 构建请求参数
params = {}
for key, value in options.items():
params[key] = str(value)
# 设置请求头
headers = {
"x-ti-app-id": self.app_id,
"x-ti-secret-code": self.secret_code,
"Content-Type": "application/json"
}
# 构建完整的请求体,包含base64文件数据
full_request_body = {
**request_body,
"file": file_base64 # 添加base64编码的文件数据
}
# 发送异步请求
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.textin.com/ai/service/v2/entity_extraction",
params=params,
headers=headers,
json=full_request_body
) as response:
# 检查响应状态
response.raise_for_status()
return await response.text()
async def extract_single_file(self, file_path: str, output_dir: str, options: dict, request_body: dict) -> Dict[
str, Any]:
"""处理单个文件的提取任务"""
try:
# 读取文件
with open(file_path, "rb") as f:
file_content = f.read()
# 执行提取
response = await self.extract(file_content, options, request_body)
# 生成输出文件名
file_name = Path(file_path).stem
output_file = os.path.join(output_dir, f"{file_name}_result.json")
# 保存结果到输出文件夹
with open(output_file, "w", encoding="utf-8") as f:
f.write(response)
return {
"file_path": file_path,
"output_file": output_file,
"status": "success",
"response": response
}
except Exception as e:
return {
"file_path": file_path,
"status": "error",
"error": str(e)
}
async def extract_folder(self, input_folder: str, output_folder: str, options: dict, request_body: dict,
file_extensions: List[str] = None, max_concurrent: int = 5) -> List[Dict[str, Any]]:
"""处理整个文件夹的文件"""
# 确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)
# 如果没有指定文件扩展名,默认处理API支持的文档格式
if file_extensions is None:
file_extensions = [
'*.png', '*.jpg', '*.jpeg', '*.pdf', '*.bmp', '*.tiff', '*.webp',
'*.doc', '*.docx', '*.html', '*.mhtml', '*.xls', '*.xlsx', '*.csv',
'*.ppt', '*.pptx', '*.txt', '*.ofd', '*.rtf'
]
# 获取所有匹配的文件
file_paths = []
for ext in file_extensions:
pattern = os.path.join(input_folder, ext)
file_paths.extend(glob.glob(pattern))
# 也搜索子文件夹
file_paths.extend(glob.glob(os.path.join(input_folder, "**", ext), recursive=True))
# 去重并排序
file_paths = sorted(list(set(file_paths)))
if not file_paths:
print(f"在文件夹 {input_folder} 中没有找到匹配的文件")
return []
print(f"找到 {len(file_paths)} 个文件待处理")
# 创建信号量限制并发数
semaphore = asyncio.Semaphore(max_concurrent)
async def extract_with_semaphore(file_path: str) -> Dict[str, Any]:
async with semaphore:
return await self.extract_single_file(file_path, output_folder, options, request_body)
# 创建所有任务
tasks = [extract_with_semaphore(file_path) for file_path in file_paths]
# 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
"file_path": file_paths[i],
"status": "error",
"error": str(result)
})
else:
processed_results.append(result)
return processed_results
async def main():
# 创建客户端实例,需替换你的API Key
client = AsyncExtractClient("你的x-ti-app-id", "你的x-ti-secret-code")
# 指定输入和输出文件夹
input_folder = "input_files" # 输入文件夹路径
output_folder = "output_results" # 输出文件夹路径
# 设置URL参数
options = dict(
parse_mode="scan"
)
# 设置请求体参数
request_body = dict(
prompt="请抽取这张小票中的实付金额、消费日期、店铺名称、订单号并以字段格式返回,请抽取货号、商品名称、数量、单价,并以表格格式返回"
)
# 可选:指定要处理的文件扩展名
file_extensions = ['*.pdf', '*.jpg', '*.jpeg', '*.png'] # 可以根据需要调整
try:
# 处理整个文件夹
results = await client.extract_folder(
input_folder=input_folder,
output_folder=output_folder,
options=options,
request_body=request_body,
file_extensions=file_extensions,
max_concurrent=5 # 可以根据需要调整并发数
)
# 统计处理结果
success_count = sum(1 for r in results if r["status"] == "success")
error_count = len(results) - success_count
print(f"\n处理完成!")
print(f"总文件数: {len(results)}")
print(f"成功处理: {success_count}")
print(f"处理失败: {error_count}")
# 显示失败的文件
if error_count > 0:
print("\n失败的文件:")
for result in results:
if result["status"] == "error":
print(f" - {result['file_path']}: {result['error']}")
# 保存处理汇总信息
summary_file = os.path.join(output_folder, "processing_summary.json")
summary = {
"input_folder": input_folder,
"output_folder": output_folder,
"total_files": len(results),
"success_count": success_count,
"error_count": error_count,
"results": results
}
with open(summary_file, "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
print(f"\n处理汇总已保存到: {summary_file}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
# 运行异步主函数
asyncio.run(main())
此页面对您有帮助吗?