当您想要进行文档抽取API的多并发请求时,以下是一份完整的示例代码供您参考,您也可以根据实际使用需要进行修改调整。
import json
import aiohttp
import asyncio
import base64
import os
from typing import List, Dict, Any
from pathlib import Path
import glob


class AsyncExtractClient:
    def __init__(self, app_id: str, secret_code: str):
        self.app_id = app_id
        self.secret_code = secret_code

    async def extract(self, file_content: bytes, options: dict, request_body: dict) -> str:
        # 将文件内容转换为base64
        file_base64 = base64.b64encode(file_content).decode('utf-8')

        # 构建请求参数
        params = {}
        for key, value in options.items():
            params[key] = str(value)

        # 设置请求头
        headers = {
            "x-ti-app-id": self.app_id,
            "x-ti-secret-code": self.secret_code,
            "Content-Type": "application/json"
        }

        # 构建完整的请求体,包含base64文件数据
        full_request_body = {
            **request_body,
            "file": file_base64  # 添加base64编码的文件数据
        }

        # 发送异步请求
        async with aiohttp.ClientSession() as session:
            async with session.post(
                    "https://api.textin.com/ai/service/v2/entity_extraction",
                    params=params,
                    headers=headers,
                    json=full_request_body
            ) as response:
                # 检查响应状态
                response.raise_for_status()
                return await response.text()

    async def extract_single_file(self, file_path: str, output_dir: str, options: dict, request_body: dict) -> Dict[
        str, Any]:
        """处理单个文件的提取任务"""
        try:
            # 读取文件
            with open(file_path, "rb") as f:
                file_content = f.read()

            # 执行提取
            response = await self.extract(file_content, options, request_body)

            # 生成输出文件名
            file_name = Path(file_path).stem
            output_file = os.path.join(output_dir, f"{file_name}_result.json")

            # 保存结果到输出文件夹
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(response)

            return {
                "file_path": file_path,
                "output_file": output_file,
                "status": "success",
                "response": response
            }
        except Exception as e:
            return {
                "file_path": file_path,
                "status": "error",
                "error": str(e)
            }

    async def extract_folder(self, input_folder: str, output_folder: str, options: dict, request_body: dict,
                             file_extensions: List[str] = None, max_concurrent: int = 5) -> List[Dict[str, Any]]:
        """处理整个文件夹的文件"""

        # 确保输出文件夹存在
        os.makedirs(output_folder, exist_ok=True)

        # 如果没有指定文件扩展名,默认处理API支持的文档格式
        if file_extensions is None:
            file_extensions = [
                '*.png', '*.jpg', '*.jpeg', '*.pdf', '*.bmp', '*.tiff', '*.webp',
                '*.doc', '*.docx', '*.html', '*.mhtml', '*.xls', '*.xlsx', '*.csv',
                '*.ppt', '*.pptx', '*.txt', '*.ofd', '*.rtf'
            ]

        # 获取所有匹配的文件
        file_paths = []
        for ext in file_extensions:
            pattern = os.path.join(input_folder, ext)
            file_paths.extend(glob.glob(pattern))
            # 也搜索子文件夹
            file_paths.extend(glob.glob(os.path.join(input_folder, "**", ext), recursive=True))

        # 去重并排序
        file_paths = sorted(list(set(file_paths)))

        if not file_paths:
            print(f"在文件夹 {input_folder} 中没有找到匹配的文件")
            return []

        print(f"找到 {len(file_paths)} 个文件待处理")

        # 创建信号量限制并发数
        semaphore = asyncio.Semaphore(max_concurrent)

        async def extract_with_semaphore(file_path: str) -> Dict[str, Any]:
            async with semaphore:
                return await self.extract_single_file(file_path, output_folder, options, request_body)

        # 创建所有任务
        tasks = [extract_with_semaphore(file_path) for file_path in file_paths]

        # 并发执行所有任务
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 处理异常结果
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "file_path": file_paths[i],
                    "status": "error",
                    "error": str(result)
                })
            else:
                processed_results.append(result)

        return processed_results


async def main():
    # 创建客户端实例,需替换你的API Key
    client = AsyncExtractClient("你的x-ti-app-id", "你的x-ti-secret-code")

    # 指定输入和输出文件夹
    input_folder = "input_files"  # 输入文件夹路径
    output_folder = "output_results"  # 输出文件夹路径

    # 设置URL参数
    options = dict(
        parse_mode="scan"
    )

    # 设置请求体参数
    request_body = dict(
        prompt="请抽取这张小票中的实付金额、消费日期、店铺名称、订单号并以字段格式返回,请抽取货号、商品名称、数量、单价,并以表格格式返回"
    )

    # 可选:指定要处理的文件扩展名
    file_extensions = ['*.pdf', '*.jpg', '*.jpeg', '*.png']  # 可以根据需要调整

    try:
        # 处理整个文件夹
        results = await client.extract_folder(
            input_folder=input_folder,
            output_folder=output_folder,
            options=options,
            request_body=request_body,
            file_extensions=file_extensions,
            max_concurrent=5  # 可以根据需要调整并发数
        )

        # 统计处理结果
        success_count = sum(1 for r in results if r["status"] == "success")
        error_count = len(results) - success_count

        print(f"\n处理完成!")
        print(f"总文件数: {len(results)}")
        print(f"成功处理: {success_count}")
        print(f"处理失败: {error_count}")

        # 显示失败的文件
        if error_count > 0:
            print("\n失败的文件:")
            for result in results:
                if result["status"] == "error":
                    print(f"  - {result['file_path']}: {result['error']}")

        # 保存处理汇总信息
        summary_file = os.path.join(output_folder, "processing_summary.json")
        summary = {
            "input_folder": input_folder,
            "output_folder": output_folder,
            "total_files": len(results),
            "success_count": success_count,
            "error_count": error_count,
            "results": results
        }

        with open(summary_file, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)

        print(f"\n处理汇总已保存到: {summary_file}")

    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    # 运行异步主函数
    asyncio.run(main())
您也可以参考 文档解析:多并发请求 中的内容,或许也会对您有所帮助。