aaaa
This commit is contained in:
106
nodes/parse_and_add_urls.py
Normal file
106
nodes/parse_and_add_urls.py
Normal file
@@ -0,0 +1,106 @@
|
||||
import requests
|
||||
import json
|
||||
import math
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
def parse_urls(map_json: list[dict]):
    """Extract the list of URLs from a Firecrawl map response.

    Expects ``map_json`` to be a list whose first element is the map
    result object. Returns an empty list when the input is empty or the
    response is not marked successful.
    """
    if not map_json:
        return []

    map_obj = map_json[0]
    # Be tolerant of occasional structural differences in the response:
    # rather than raising, log the odd payload and return nothing.
    if not map_obj.get("success", False):
        print(f"Firecrawl Map节点返回失败或无数据:{map_obj}")
        return []

    return map_obj.get("links", [])
|
||||
|
||||
def send_batch_request(urls_batch: list[str], task_id: int, BASE_URL: str):
    """Send one batch of URLs to the backend's ``/add_urls`` endpoint.

    Best-effort fire-and-forget: returns True on HTTP 200, False on any
    non-200 status or exception (errors are printed, never raised).
    """
    try:
        # A timeout guards against hanging forever; we don't care about
        # the response payload, only that the batch was delivered, so a
        # short timeout is fine.
        res = requests.post(
            f"{BASE_URL}/add_urls",
            json={"task_id": task_id, "urls": urls_batch},
            timeout=10,  # give up after 10s so we never stall the caller
        )
        if res.status_code == 200:
            return True
        print(f"Batch failed with status {res.status_code}: {res.text[:100]}")
        return False
    except Exception as e:
        # Deliberately broad: any network/HTTP failure is non-fatal here.
        print(f"Batch request error: {e}")
        return False
|
||||
|
||||
def main(map_json: list[dict], BASE_URL: str, task_id: float):
    """Parse URLs out of a Firecrawl map result and push them to the
    backend in concurrent batches.

    Returns a summary dict: total URL count, number of batches sent,
    and how many batches reported success.
    """
    # 1. Extract the URL list.
    all_urls = parse_urls(map_json)
    total_count = len(all_urls)
    if total_count == 0:
        return {"msg": "没有解析到URL"}

    # ---------------- tuning knobs ----------------
    BATCH_SIZE = 50   # URLs per request; adjust to backend capacity
    MAX_WORKERS = 10  # concurrent sender threads
    # ----------------------------------------------

    # 2. Slice the URL list into fixed-size batches
    #    (e.g. 1000 URLs -> 20 batches of 50).
    batches = [
        all_urls[start:start + BATCH_SIZE]
        for start in range(0, total_count, BATCH_SIZE)
    ]
    print(f"总共 {total_count} 个URL,分为 {len(batches)} 批发送,并发数: {MAX_WORKERS}")

    # 3. Fan out over a thread pool. In Dify/Lambda-style runtimes we
    #    must block until the pool drains; otherwise the process can be
    #    killed before the requests ever leave. Concurrency keeps the
    #    wait short.
    success_batches = 0
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        pending = [
            executor.submit(send_batch_request, batch, int(task_id), BASE_URL)
            for batch in batches
        ]
        for fut in as_completed(pending):
            if fut.result():
                success_batches += 1

    return {
        "status": "done",
        "total_urls": total_count,
        "batches_sent": len(batches),
        "success_batches": success_batches,
        "msg": "已使用多线程并发发送数据,忽略详细返回值",
    }
|
||||
|
||||
def test():
    """Smoke test: load a saved Firecrawl map response from disk and
    push its URLs through ``main()``, printing the elapsed time.

    Requires ``nodes/parse_and_add_urls.json`` on disk and a reachable
    backend at BASE_URL — this is a manual integration check, not a
    unit test.
    """
    import json
    from time import time

    # Fix: the original used "nodes\parse_and_add_urls.json" — a
    # backslash path that only works on Windows, and "\p" is an invalid
    # escape sequence (DeprecationWarning now, an error in future
    # Python). Forward slashes work on every OS.
    with open("nodes/parse_and_add_urls.json", "r", encoding="utf-8") as f:
        data = json.load(f)

    map_json = data["json"]
    BASE_URL = "http://47.122.127.178"
    task_id = 6

    start_time = time()
    res = main(map_json, BASE_URL, task_id)
    end_time = time()
    print(f"添加URL耗时:{end_time - start_time}秒")
    print(res)
|
||||
|
||||
# Fix: the bare `test()` call ran the network smoke test on every
# import of this module. Guard it so it only runs when the file is
# executed directly.
if __name__ == "__main__":
    test()
|
||||
Reference in New Issue
Block a user