import requests
import json
import math
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse_urls(map_json: list[dict]) -> list[str]:
    """Parse the payload returned by the Firecrawl Map node.

    Args:
        map_json: Firecrawl response payload; expected to be a list whose
            first element is a dict carrying ``success`` and ``links`` keys.

    Returns:
        The list of URLs, or an empty list when the payload is missing,
        reports failure, or contains no links.
    """
    if not map_json:
        return []
    map_obj = map_json[0]
    # Be tolerant of unexpected payload shapes: when the node reports
    # failure (or the key is absent), log and return empty instead of
    # raising, so the caller can decide what to do.
    if not map_obj.get("success", False):
        print(f"Firecrawl Map节点返回失败或无数据:{map_obj}")
        return []
    return map_obj.get("links", [])


def send_batch_request(urls_batch: list[str], task_id: int, BASE_URL: str) -> bool:
    """Send one batch of URLs to the backend ``/add_urls`` endpoint.

    Args:
        urls_batch: URLs contained in this batch.
        task_id: Backend task identifier the URLs belong to.
        BASE_URL: Base URL of the backend service (no trailing slash).

    Returns:
        True when the backend answered HTTP 200, False on any failure
        (non-200 status, timeout, or network error).
    """
    try:
        # Always set a timeout so a stuck connection cannot hang the
        # worker thread. We only care that the request goes out, so a
        # short timeout is acceptable.
        res = requests.post(
            f"{BASE_URL}/add_urls",
            json={"task_id": task_id, "urls": urls_batch},
            timeout=10,
        )
        if res.status_code == 200:
            return True
        print(f"Batch failed with status {res.status_code}: {res.text[:100]}")
        return False
    except Exception as e:
        # Best-effort delivery: log and report failure, never raise,
        # so one bad batch does not abort the whole fan-out.
        print(f"Batch request error: {e}")
        return False


def main(map_json: list[dict], BASE_URL: str, task_id: float) -> dict:
    """Parse URLs from a Firecrawl Map payload and push them to the
    backend in concurrent batches.

    Args:
        map_json: Firecrawl Map response payload (see ``parse_urls``).
        BASE_URL: Base URL of the backend service.
        task_id: Task identifier; coerced to ``int`` before sending.

    Returns:
        A summary dict with the total URL count, number of batches, and
        how many batches were delivered successfully; or a message dict
        when no URLs were parsed.
    """
    # 1. Extract the URLs.
    all_urls = parse_urls(map_json)
    total_count = len(all_urls)
    if total_count == 0:
        return {"msg": "没有解析到URL"}

    # ================= Tuning knobs =================
    BATCH_SIZE = 50    # URLs per request; adjust to backend capacity.
    MAX_WORKERS = 10   # Concurrent sender threads.
    # ================================================

    # 2. Slice the URL list into batches
    #    (e.g. 1000 URLs -> 20 batches of 50).
    batches = [
        all_urls[i:i + BATCH_SIZE]
        for i in range(0, total_count, BATCH_SIZE)
    ]
    print(f"总共 {total_count} 个URL,分为 {len(batches)} 批发送,并发数: {MAX_WORKERS}")

    # 3. Fan the batches out across a thread pool.
    #    In Dify/Lambda-style environments we must wait for the pool to
    #    drain before returning from main(), otherwise the process may be
    #    killed before the requests are actually sent. The `with` block
    #    joins all workers on exit; since sends run concurrently this is
    #    still fast.
    success_batches = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(send_batch_request, batch, int(task_id), BASE_URL)
            for batch in batches
        ]
        for future in as_completed(futures):
            if future.result():
                success_batches += 1

    return {
        "status": "done",
        "total_urls": total_count,
        "batches_sent": len(batches),
        "success_batches": success_batches,
        "msg": "已使用多线程并发发送数据,忽略详细返回值",
    }


def test():
    """Ad-hoc local smoke test: load a saved payload from disk and time main()."""
    from time import time

    # Raw string so the Windows-style backslash is never mistaken for an
    # escape sequence (content is byte-identical to the original literal).
    with open(r"nodes\parse_and_add_urls.json", "r", encoding="utf-8") as f:
        data = json.load(f)
    map_json = data["json"]
    BASE_URL = "http://47.122.127.178"
    task_id = 6

    start_time = time()
    res = main(map_json, BASE_URL, task_id)
    end_time = time()
    print(f"添加URL耗时:{end_time - start_time}秒")
    print(res)


if __name__ == "__main__":
    # Guard so that importing this module does not trigger the
    # network-hitting smoke test as a side effect.
    test()