修改save_results

This commit is contained in:
2025-12-22 22:33:12 +08:00
parent 75b32b4bcd
commit 307876ff5e

View File

@@ -64,64 +64,104 @@ class CrawlerService:
).values(status='processing')
conn.execute(upd)
return {"urls": urls}
from sqlalchemy import select, insert, update, and_
from .database import db_instance
from .utils import normalize_url
class CrawlerService:
def __init__(self):
self.db = db_instance
def save_results(self, task_id: int, results: list):
"""通用 API 实现的 UPSERT 逻辑:区分插入、更新、失败"""
inserted_urls, updated_urls, failed_urls = [], [], []
"""
保存同一 URL 的多个切片。
返回:该 URL 下切片的详细处理统计及页面更新状态。
"""
if not results:
return {"msg": "No data provided"}
# 1. 基础信息提取 (假设 results 里的 source_url 都是一致的)
first_item = results[0] if isinstance(results[0], dict) else results[0].__dict__
target_url = normalize_url(first_item.get('source_url'))
# 结果统计容器
inserted_chunks = []
updated_chunks = []
failed_chunks = []
is_page_update = False
with self.db.engine.begin() as conn:
# 2. 判断该 URL 是否已经有切片存在 (以此判定是否为“页面更新”)
check_page_stmt = select(self.db.chunks.c.id).where(
and_(self.db.chunks.c.task_id == task_id, self.db.chunks.c.source_url == target_url)
).limit(1)
if conn.execute(check_page_stmt).fetchone():
is_page_update = True
# 3. 逐个处理切片
for res in results:
# 适配 Dify 传来的字典或对象
data = res if isinstance(res, dict) else res.__dict__
clean_url = normalize_url(data.get('source_url'))
c_idx = data.get('chunk_index')
try:
# 1. 检查是否存在该切片
find_q = select(self.db.chunks).where(
# 检查具体某个 index 的切片是否存在
find_chunk_stmt = select(self.db.chunks.c.id).where(
and_(
self.db.chunks.c.task_id == task_id,
self.db.chunks.c.source_url == clean_url,
self.db.chunks.c.source_url == target_url,
self.db.chunks.c.chunk_index == c_idx
)
)
existing = conn.execute(find_q).fetchone()
existing_chunk = conn.execute(find_chunk_stmt).fetchone()
if existing:
# 2. 执行更新 API
upd = update(self.db.chunks).where(self.db.chunks.c.id == existing[0]).values(
if existing_chunk:
# 覆盖更新现有切片
upd_stmt = update(self.db.chunks).where(
self.db.chunks.c.id == existing_chunk[0]
).values(
title=data.get('title'),
content=data.get('content'),
embedding=data.get('embedding')
)
conn.execute(upd)
updated_urls.append(clean_url)
conn.execute(upd_stmt)
updated_chunks.append(c_idx)
else:
# 3. 执行插入 API
ins = insert(self.db.chunks).values(
# 插入新切片
ins_stmt = insert(self.db.chunks).values(
task_id=task_id,
source_url=clean_url,
source_url=target_url,
chunk_index=c_idx,
title=data.get('title'),
content=data.get('content'),
embedding=data.get('embedding')
)
conn.execute(ins)
inserted_urls.append(clean_url)
# 4. 更新队列状态
conn.execute(update(self.db.queue).where(
and_(self.db.queue.c.task_id == task_id, self.db.queue.c.url == clean_url)
).values(status='completed'))
conn.execute(ins_stmt)
inserted_chunks.append(c_idx)
except Exception as e:
print(f"Error: {e}")
failed_urls.append(clean_url)
print(f"Chunk {c_idx} failed: {e}")
failed_chunks.append(c_idx)
# 4. 最终更新队列状态
conn.execute(
update(self.db.queue).where(
and_(self.db.queue.c.task_id == task_id, self.db.queue.c.url == target_url)
).values(status='completed')
)
return {
"inserted_urls": list(set(inserted_urls)),
"updated_urls": list(set(updated_urls)),
"failed_urls": failed_urls
"source_url": target_url,
"is_page_update": is_page_update, # 标志:此页面此前是否有过内容
"detail": {
"inserted_chunk_indexes": inserted_chunks,
"updated_chunk_indexes": updated_chunks,
"failed_chunk_indexes": failed_chunks
},
"counts": {
"inserted": len(inserted_chunks),
"updated": len(updated_chunks),
"failed": len(failed_chunks)
}
}
crawler_service = CrawlerService()