106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
|
|
import requests
|
|||
|
|
import json
|
|||
|
|
import math
|
|||
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|||
|
|
|
|||
|
|
def parse_urls(map_json: list[dict]):
    """Extract the link list from a Firecrawl map-node response.

    Returns an empty list when the payload is missing, is marked
    unsuccessful, or carries no "links" field.
    """
    if not map_json:
        return []

    payload = map_json[0]
    # Tolerate unexpected payload shapes: a missing or false "success"
    # flag means there is nothing usable, so log and bail out.
    if not payload.get("success", False):
        print(f"Firecrawl Map节点返回失败或无数据:{payload}")
        return []

    return payload.get("links", [])
|
|||
|
|
|
|||
|
|
def send_batch_request(urls_batch: list[str], task_id: int, BASE_URL: str):
    """POST one batch of URLs to the backend's /add_urls endpoint.

    Fire-and-forget style: returns True on HTTP 200, False on any
    non-200 response or request failure (logged, never raised).
    """
    try:
        # Short timeout keeps a slow backend from stalling the caller;
        # we only care that the batch was accepted, not about the body.
        response = requests.post(
            f"{BASE_URL}/add_urls",
            json={"task_id": task_id, "urls": urls_batch},
            timeout=10,
        )
        if response.status_code != 200:
            print(f"Batch failed with status {response.status_code}: {response.text[:100]}")
            return False
        return True
    except Exception as exc:
        print(f"Batch request error: {exc}")
        return False
|
|||
|
|
|
|||
|
|
def main(map_json: list[dict], BASE_URL: str, task_id: float):
    """Parse Firecrawl output and fan the URLs out to the backend in batches.

    Splits the parsed URLs into fixed-size slices and posts them
    concurrently through a thread pool, returning a summary dict with
    the total URL count and how many batches were accepted.
    """
    # 1. Pull the URL list out of the Firecrawl payload.
    all_urls = parse_urls(map_json)
    total_count = len(all_urls)
    if not all_urls:
        return {"msg": "没有解析到URL"}

    # ---- tuning knobs -------------------------------------------------
    BATCH_SIZE = 50    # URLs per request; adjust to backend capacity
    MAX_WORKERS = 10   # concurrent sender threads
    # -------------------------------------------------------------------

    # 2. Slice into batches, e.g. 1000 URLs -> 20 batches of 50.
    batches = [
        all_urls[start:start + BATCH_SIZE]
        for start in range(0, total_count, BATCH_SIZE)
    ]
    print(f"总共 {total_count} 个URL,分为 {len(batches)} 批发送,并发数: {MAX_WORKERS}")

    # 3. Send concurrently. In Dify/Lambda-style runtimes we must block
    # until the pool drains, or the process may be killed before the
    # requests go out; the `with` block guarantees that wait, and the
    # fan-out keeps total time short.
    success_batches = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [
            executor.submit(send_batch_request, batch, int(task_id), BASE_URL)
            for batch in batches
        ]
        for done in as_completed(futures):
            if done.result():
                success_batches += 1

    return {
        "status": "done",
        "total_urls": total_count,
        "batches_sent": len(batches),
        "success_batches": success_batches,
        "msg": "已使用多线程并发发送数据,忽略详细返回值",
    }
|
|||
|
|
|
|||
|
|
def test():
    """Ad-hoc smoke test: load a captured Firecrawl payload and time main().

    Reads a saved node response from disk, pushes its URLs to a live
    backend, then prints the elapsed time and the returned summary.
    """
    import json
    from time import time

    # Bug fix: the original path used a raw backslash
    # ("nodes\parse_and_add_urls.json"); "\p" is an invalid escape
    # sequence (SyntaxWarning on Python 3.12+, slated to become an
    # error) and the path only worked on Windows. A forward slash is
    # portable across all platforms.
    with open("nodes/parse_and_add_urls.json", "r", encoding="utf-8") as f:
        data = json.load(f)

    map_json = data["json"]
    BASE_URL = "http://47.122.127.178"
    task_id = 6

    start_time = time()
    res = main(map_json, BASE_URL, task_id)
    end_time = time()

    print(f"添加URL耗时:{end_time - start_time}秒")
    print(res)
|
|||
|
|
|
|||
|
|
# Guard the smoke test behind the standard entry-point check so that
# importing this module does not fire network requests as a side effect.
if __name__ == "__main__":
    test()
|