import logging

from sqlalchemy import select, insert, update, and_

from .database import db_instance
from .utils import normalize_url

logger = logging.getLogger(__name__)


class CrawlerService:
    """Persists crawled page chunks and tracks per-URL crawl-queue state."""

    def __init__(self):
        # Shared module-level database facade (engine + table objects).
        self.db = db_instance

    def save_results(self, task_id: int, results: list):
        """Save the chunks of a single crawled URL (upsert per chunk_index).

        All items in *results* are assumed to share the same ``source_url``;
        only the first item's URL is consulted. Each chunk is upserted on the
        key ``(task_id, source_url, chunk_index)``. Finally the queue entry
        for the URL is marked completed (or failed — see below).

        Args:
            task_id: Crawl task the chunks belong to.
            results: Chunk payloads — dicts or objects exposing
                ``source_url``, ``chunk_index``, ``title``, ``content``
                and ``embedding``.

        Returns:
            dict with ``source_url``, ``is_page_update`` (page already had
            chunks before this call), ``detail`` (per-outcome chunk-index
            lists) and ``counts`` (per-outcome totals), or
            ``{"msg": "No data provided"}`` for empty input.
        """
        if not results:
            return {"msg": "No data provided"}

        # All chunks are assumed to come from one page; take the URL from the
        # first item. NOTE(review): normalize_url receives None if the key is
        # missing — TODO confirm upstream always supplies source_url.
        first_item = results[0] if isinstance(results[0], dict) else results[0].__dict__
        target_url = normalize_url(first_item.get('source_url'))

        inserted_chunks: list = []
        updated_chunks: list = []
        failed_chunks: list = []

        with self.db.engine.begin() as conn:
            # A page counts as an update when any chunk already exists for
            # this (task_id, url) pair.
            check_page_stmt = select(self.db.chunks.c.id).where(
                and_(
                    self.db.chunks.c.task_id == task_id,
                    self.db.chunks.c.source_url == target_url,
                )
            ).limit(1)
            is_page_update = conn.execute(check_page_stmt).fetchone() is not None

            for res in results:
                data = res if isinstance(res, dict) else res.__dict__
                c_idx = data.get('chunk_index')
                try:
                    # Upsert keyed on (task_id, source_url, chunk_index).
                    find_chunk_stmt = select(self.db.chunks.c.id).where(
                        and_(
                            self.db.chunks.c.task_id == task_id,
                            self.db.chunks.c.source_url == target_url,
                            self.db.chunks.c.chunk_index == c_idx,
                        )
                    )
                    existing_chunk = conn.execute(find_chunk_stmt).fetchone()

                    if existing_chunk:
                        # Overwrite the existing chunk in place.
                        upd_stmt = update(self.db.chunks).where(
                            self.db.chunks.c.id == existing_chunk[0]
                        ).values(
                            title=data.get('title'),
                            content=data.get('content'),
                            embedding=data.get('embedding'),
                        )
                        conn.execute(upd_stmt)
                        updated_chunks.append(c_idx)
                    else:
                        # Insert a brand-new chunk.
                        ins_stmt = insert(self.db.chunks).values(
                            task_id=task_id,
                            source_url=target_url,
                            chunk_index=c_idx,
                            title=data.get('title'),
                            content=data.get('content'),
                            embedding=data.get('embedding'),
                        )
                        conn.execute(ins_stmt)
                        inserted_chunks.append(c_idx)
                except Exception:
                    # Best-effort per chunk: record the failure and continue
                    # with the remaining chunks. Was print(); logger.exception
                    # preserves the traceback.
                    # NOTE(review): if the driver aborts the transaction on a
                    # statement error, subsequent statements in this same
                    # engine.begin() block may also fail — consider per-chunk
                    # SAVEPOINTs (conn.begin_nested()).
                    logger.exception("Chunk %s of %s failed", c_idx, target_url)
                    failed_chunks.append(c_idx)

            # BUG FIX: previously the queue row was set to 'completed'
            # unconditionally, even when every chunk failed. Mark it
            # completed only when at least one chunk was persisted (or there
            # were no failures at all). TODO confirm 'failed' is a valid
            # value for queue.status in the schema.
            queue_status = (
                'completed'
                if inserted_chunks or updated_chunks or not failed_chunks
                else 'failed'
            )
            conn.execute(
                update(self.db.queue).where(
                    and_(
                        self.db.queue.c.task_id == task_id,
                        self.db.queue.c.url == target_url,
                    )
                ).values(status=queue_status)
            )

        return {
            "source_url": target_url,
            "is_page_update": is_page_update,
            "detail": {
                "inserted_chunk_indexes": inserted_chunks,
                "updated_chunk_indexes": updated_chunks,
                "failed_chunk_indexes": failed_chunks,
            },
            "counts": {
                "inserted": len(inserted_chunks),
                "updated": len(updated_chunks),
                "failed": len(failed_chunks),
            },
        }


crawler_service = CrawlerService()