""" 因为map_json和history_json的传入格式不同,在main里改太麻烦 为了解耦,所以写两个函数将map和history转换为标准结构 urls = [ "https://www.baidu.com", "https://www.taobao.com", "https://www.jd.com", "https://www.1688.com", "https://www.taobao.com", "https://www.jd.com", "https://www.1688.com", ] """ def map_json_transform(map_json: list[dict]) -> list[dict]: """ 将Map节点的输出转为干净的输出,避免杂七杂八的数据干扰 输入: Map节点的输出 输出: 转换后的 JSON 数组,每个元素包含 title, url, markdown 字段 """ try: # 如果 Dify 传入的是字符串,先转为字典 map_obj = json.loads(map_json) if isinstance(map_json, str) else map_json # map_obj首先被列表包裹 map_obj = map_obj[0] # 直接从json里提取 links 数组 if isinstance(map_obj, dict) and "links" in map_obj: links = map_obj["links"] except Exception as e: return {"targets": [], "msg": f"Map数据解析失败: {str(e)}"} return {"targets": links, "msg": "Map数据解析成功"} def history_json_transform(history_json: list[dict]) -> list[dict]: """ 将History节点的输出转为干净的输出,避免杂七杂八的数据干扰 输入: History节点的输出 输出: 转换后的 JSON 数组,每个元素包含 url 字段 """ try: # 如果 Dify 传入的是字符串,先转为字典 hist_obj = json.loads(history_json) if isinstance(history_json, str) else history_json # hist_obj首先被列表包裹 hist_obj = hist_obj[0] # 将data解析出来 hist_obj = hist_obj['data'][0] # 得到result # 直接从json里提取 hist_data 数组 if isinstance(hist_obj, dict) and "data" in hist_obj: hist_data = hist_obj["data"] except Exception as e: return {"targets": [], "msg": f"History数据解析失败: {str(e)}"} return {"targets": hist_data, "msg": "History数据解析成功"} def main(map_json: list[dict], history_json: list[dict], batch_size: float): """ map_json: Firecrawl Map 节点的输出 history_json: 数据库查询节点的输出 (包含 hist_data 数组) batch_size: 每次处理的数量 """ # 1. 解析 Map 数据 (全量链接) all_links = [] try: # 如果 Dify 传入的是字符串,先转为字典 map_obj = json.loads(map_json) if isinstance(map_json, str) else map_json # map_obj首先被列表包裹 map_obj = map_obj[0] # 直接从json里提取 links 数组 if isinstance(map_obj, dict) and "links" in map_obj: all_links = map_obj["links"] except Exception as e: return {"targets": [], "msg": f"Map数据解析失败: {str(e)}"} # 2. 解析 History 数据 (已完成链接) completed_set = set() try: hist_obj = json.loads(history_json) if isinstance(history_json, str) else history_json # hist_obj首先被列表包裹 hist_obj = hist_obj[0] # 直接从json里提取 hist_data 数组 if isinstance(hist_obj, dict) and "data" in hist_obj: hist_data = hist_obj["data"] # 将 hist_data 里的 url 提取出来放入集合 (Set) 做 O(1) 查找 for item in hist_data: if isinstance(item, dict): url_val = item.get("url") if url_val: completed_set.add(url_val) except Exception as e: return {"targets": [], "msg": f"History数据解析失败: {str(e)}"} # 3. 核心逻辑: 过滤 (Diff) targets = [] for link in all_links: # A. 基础校验: 必须是字符串且以 http 开头 if not isinstance(link, str) or not link.startswith("http"): continue # B. 过滤 sitemap.xml 自身 if link.endswith(".xml") or "sitemap" in link.split("/")[-1]: continue # C. 过滤已爬取的 (关键步骤) if link in completed_set: continue targets.append(link) # 4. 分批 (Batch) try: limit = int(batch_size) except: limit = 10 # 默认值 current_batch = targets[:limit] remaining_count = len(targets) - len(current_batch) # 5. 返回结果 return { "targets": current_batch, "count_urls": len(all_links), "count_completed": len(completed_set), "count_remaining": remaining_count, "count_error": 0 # "msg": f"全站发现: {len(all_links)} | 已入库: {len(completed_set)} | 本次执行: {len(current_batch)} | 剩余待爬: {remaining_count}" } if __name__ == "__main__": import json with open("anyscript\wiki_crawler\diff.json", "r", encoding="utf-8") as f: data = json.load(f) map_json = data["map_json"] history_json = data["history_json"] batch_size = data["batch_size"] result = main(map_json, history_json, batch_size) print(json.dumps(result, ensure_ascii=False, indent=2))