Compare commits

...

5 Commits

Author SHA1 Message Date
b9dbf1e8f7 Update docker 2025-12-30 17:34:55 +08:00
d191b13455 Tweak configuration and response details 2025-12-30 16:57:31 +08:00
8972246445 Add test script 2025-12-30 16:19:58 +08:00
4d35626b90 Built a new backend and finished all the logic 2025-12-30 10:17:02 +08:00
8c4491b383 aaaa 2025-12-29 14:42:33 +08:00
22 changed files with 2898 additions and 2212 deletions

.vscode/launch.json vendored

@@ -1,15 +0,0 @@
{
"configurations": [
{
"name": "Python Debugger: FastAPI",
"type": "debugpy",
"request": "launch",
"module": "uvicorn",
"args": [
"backend.main:app",
"--reload"
],
"jinja": true
}
]
}


@@ -1,49 +1,21 @@
# wiki_crawler
This repository mainly stores and updates the code for the wiki_crawler code nodes in dify
This repository mainly stores the wiki_crawler code
`git config --local user.email "qinggang.deng@dxc.com"`
Core dependencies: the `firecrawl` API and Alibaba Bailian (DashScope)
## Node response format convention
Implements wiki page crawling, vectorization, and knowledge-base search
Node responses use a unified JSON format with the following fields (see the example below):
- code: status code (0 = failure, 1 = success, 2 = warning)
- msg: status description, used to inform the caller
- data: returned data as JSON; fields vary by node; null on failure
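For example, a successful response from the register node would look like this (illustrative values only):

```python
# Illustrative response following the convention above
response = {
    "code": 1,                                    # 0 failure, 1 success, 2 warning
    "msg": "New task created successfully",       # status description for the caller
    "data": {"task_id": 1, "is_new_task": True},  # node-specific payload; None on failure
}
```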
## Current status
## Node input/output design
1. Currently this runs locally on my machine and is not deployed; will see whether chenwei has time to show me how to put it on our server (I bought a student server of my own but haven't had time to upload it; it expires in March)
2. This demo backend only implements the functionality; there is no auth. It can later be migrated into chenwei's gtco_ai as a new module
3. My free trial firecrawl API key is almost used up; need to prepare for deployment and look into a paid plan
4. Demoable, but not yet polished enough to present to customers. Later, consider wrapping it as a dify tool integrated into Done's bot, or wrapping it as a chatflow whose nodes query the deployed backend's knowledge base
- start: start node
  - input
    - input
    - type: ['url', 'task_id']  # determines how the input is processed
  - output
- register: task registration node (uses SQL)
  - input
    - url: task url
  - output
    - task_id: task id, used later to query task status
    - is_new_task: whether this is a new task; 1 = yes, 0 = no
- pending_urls: remaining URLs to process (uses SQL)
  - input
    - task_id: task id
    - limit: maximum number of URLs to return, default 10
  - output
    - urls: list of remaining pending URLs
- save_results: save processing results (uses SQL)
  - input
    - task_id: task id
    - results: list of task results to store in the database
  - output
    - completed: list of URLs stored successfully
    - failed: list of URLs that failed to store
    - warnings: list of storage warnings
- message: message node, preceded by a variable aggregator (no SQL)
  - input
    - msgs: the msg from each node, merged by the upstream aggregator into one output
  - output
    - output: the merged message passed to end
- end: end node
  - input
    - message: the message merged by the message node
To do: chunking logic, accuracy definition, collect reference material, test design, mcp service invocation, search logic, question classification, flow/architecture design, scenario assumptions
Sort out dify errors
Wrap as an mcp server


@@ -8,9 +8,19 @@ class Settings:
DB_PORT: str = "25432"
DB_NAME: str = "wiki_crawler"
@property
DASHSCOPE_API_KEY: str = "sk-8b091493de594c5e9eb42f12f1cc5805"
FIRECRAWL_API_KEY: str = "fc-8a2af3fb6a014a27a57dfbc728cb7365"
@property # converts the method into an attribute, so it can be accessed without parentheses
def DATABASE_URL(self) -> str:
url = f"postgresql+psycopg2://{self.DB_USER}:{self.DB_PASS}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
return url
def API_KEY(self, type: str) -> str:
if type == "dashscope":
return self.DASHSCOPE_API_KEY
elif type == "firecrawl":
return self.FIRECRAWL_API_KEY
else:
raise ValueError(f"Unknown API type: {type}")
settings = Settings()
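A minimal usage sketch for the settings above (the import path is an assumption based on the repo layout):

```python
from backend.config import settings  # import path assumed

db_url = settings.DATABASE_URL       # property access: no parentheses
key = settings.API_KEY("dashscope")  # plain method: dispatches on the type string
# settings.API_KEY("other") would raise ValueError("Unknown API type: other")
```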


@@ -1,76 +1,130 @@
from fastapi import FastAPI
from .service import crawler_service
from .schemas import RegisterRequest, PendingRequest, SaveResultsRequest, AddUrlsRequest, SearchRequest
# backend/main.py
from fastapi import FastAPI, APIRouter, BackgroundTasks
# Make sure this import path matches your file name; if the file is workflow.py, import from workflow
from .services.crawler_sql_service import crawler_sql_service
from .services.automated_crawler import workflow
from .schemas import (
RegisterRequest, PendingRequest, SaveResultsRequest, AddUrlsRequest, SearchRequest,
AutoMapRequest, AutoProcessRequest, TextSearchRequest
)
from .utils import make_response
app = FastAPI(title="Wiki Crawler API")
@app.post("/register")
# ==========================================
# Utility functions
# ==========================================
# ==========================================
# V1 Router: original low-level endpoints (Manual Control)
# ==========================================
router_v1 = APIRouter()
@router_v1.post("/register")
async def register(req: RegisterRequest):
try:
data = crawler_service.register_task(req.url)
return make_response(1, "Success", data)
# Service returns: {'task_id': 1, 'is_new_task': True, 'msg': '...'}
res = crawler_sql_service.register_task(req.url)
# pop msg out to use as the response msg; the remainder becomes data
return make_response(1, res.pop("msg", "Success"), res)
except Exception as e:
return make_response(0, str(e))
@app.post("/add_urls")
@router_v1.post("/add_urls")
async def add_urls(req: AddUrlsRequest):
try:
data = crawler_service.add_urls(req.task_id, req.urls_obj)
return make_response(1, "Success", data)
urls = req.urls_obj["urls"]
res = crawler_sql_service.add_urls(req.task_id, urls=urls)
return make_response(1, res.pop("msg", "Success"), res)
except Exception as e:
return make_response(0, str(e))
@app.post("/pending_urls")
@router_v1.post("/pending_urls")
async def pending_urls(req: PendingRequest):
try:
data = crawler_service.get_pending_urls(req.task_id, req.limit)
msg = "Success" if data["urls"] else "Queue Empty"
return make_response(1, msg, data)
res = crawler_sql_service.get_pending_urls(req.task_id, req.limit)
# Even when the queue is empty, the service still returns msg="Queue is empty"
return make_response(1, res.pop("msg", "Success"), res)
except Exception as e:
return make_response(0, str(e))
@app.post("/save_results")
@router_v1.post("/save_results")
async def save_results(req: SaveResultsRequest):
try:
data = crawler_service.save_results(req.task_id, req.results)
return make_response(1, "Success", data)
res = crawler_sql_service.save_results(req.task_id, req.results)
return make_response(1, res.pop("msg", "Success"), res)
except Exception as e:
return make_response(0, str(e))
@app.post("/search")
async def search(req: SearchRequest):
"""
General search endpoint:
supports task-scoped search via task_id, or whole-database search when task_id is omitted.
"""
@router_v1.post("/search")
async def search_v1(req: SearchRequest):
"""V1 搜索:客户端手动传向量"""
try:
# 1. Basic validation: vector must be non-empty with the right dimension (Alibaba's v4 models typically use 1536)
if not req.query_embedding or len(req.query_embedding) != 1536:
return make_response(
code=2,
msg=f"Wrong vector dimension. Expected 1536, got {len(req.query_embedding) if req.query_embedding else 0}",
data=None
)
vector = req.query_embedding['vector']
if not vector:
return make_response(2, "Vector is empty", None)
# 2. Run the search through the service class
data = crawler_service.search_knowledge(
query_embedding=req.query_embedding,
# Service now returns {'results': [...], 'msg': 'Found ...'}
res = crawler_sql_service.search_knowledge(
query_embedding=vector,
task_id=req.task_id,
limit=req.limit
)
# 3. Uniform response
return make_response(
code=1,
msg="Search complete",
data=data
)
return make_response(1, res.pop("msg", "Search Done"), res)
except Exception as e:
# Log and return the failure
print(f"Search endpoint exception: {str(e)}")
return make_response(code=0, msg=f"Search failed: {str(e)}")
return make_response(0, str(e))
# ==========================================
# V2 Router: Automated Workflow
# ==========================================
router_v2 = APIRouter()
@router_v2.post("/crawler/map")
async def auto_map(req: AutoMapRequest):
"""
[Sync] Given a start URL, call Firecrawl Map automatically and ingest the links
"""
try:
# Workflow returns: {'task_id':..., 'msg': 'Task mapped...', ...}
res = workflow.map_and_ingest(req.url)
return make_response(1, res.pop("msg", "Mapping Started"), res)
except Exception as e:
return make_response(0, str(e))
@router_v2.post("/crawler/process")
async def auto_process(req: AutoProcessRequest, background_tasks: BackgroundTasks):
"""
[Async] Trigger a background job: consume queue -> scrape -> embed -> store
"""
try:
# Push the slow work into a background task
background_tasks.add_task(workflow.process_task_queue, req.task_id, req.batch_size)
# Because it runs in the background, the service's return msg is not available immediately; return a generic message
return make_response(1, "Background processing started", {"task_id": req.task_id})
except Exception as e:
return make_response(0, str(e))
@router_v2.post("/search")
async def search_v2(req: TextSearchRequest):
"""
[Smart] Natural-language text in -> backend converts it to a vector -> search
"""
try:
# Workflow returns {'results': [...], 'msg': '...'}
res = workflow.search_with_embedding(req.query, req.task_id, req.limit)
return make_response(1, res.pop("msg", "Search Success"), res)
except Exception as e:
return make_response(0, f"Search Failed: {str(e)}")
# ==========================================
# Mount routers
# ==========================================
app.include_router(router_v1, prefix="/api/v1", tags=["V1 Manual API"])
app.include_router(router_v2, prefix="/api/v2", tags=["V2 Automated Workflow"])
if __name__ == "__main__":
import uvicorn
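A client-side sketch of the new endpoints (assumes the server runs locally on port 8000; paths taken from the routers above, values illustrative):

```python
import requests

BASE = "http://127.0.0.1:8000"  # assumed local dev address

# V2: map a site and ingest the discovered links
r = requests.post(f"{BASE}/api/v2/crawler/map", json={"url": "https://docs.dify.ai"})
task_id = r.json()["data"]["task_id"]

# V2: kick off background processing of the queue
requests.post(f"{BASE}/api/v2/crawler/process", json={"task_id": task_id, "batch_size": 5})

# V2: natural-language search; the backend generates the embedding
hits = requests.post(f"{BASE}/api/v2/search", json={"query": "How do I import text data?", "limit": 3})
print(hits.json()["data"]["results"])
```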


@@ -1,5 +1,6 @@
from pydantic import BaseModel
from typing import List, Optional
from typing import Optional, List, Any
class RegisterRequest(BaseModel):
url: str
@@ -27,5 +28,23 @@ class SaveResultsRequest(BaseModel):
class SearchRequest(BaseModel):
# If task_id is omitted, search the whole database
task_id: Optional[int] = None
query_embedding: List[float]
query_embedding: dict
limit: Optional[int] = 5
# ... (original schemas kept: RegisterRequest, AddUrlsRequest, etc.) ...
# === V2 New Schemas ===
class AutoMapRequest(BaseModel):
url: str
class AutoProcessRequest(BaseModel):
task_id: int
batch_size: Optional[int] = 5
class TextSearchRequest(BaseModel):
query: str # the user sends plain text; no vector needed anymore
task_id: Optional[int] = None
limit: Optional[int] = 5
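A quick validation sketch for the new models (field names from the definitions above; import path assumed):

```python
from backend.schemas import SearchRequest, TextSearchRequest  # import path assumed

req = TextSearchRequest(query="how to import text data")  # task_id and limit fall back to defaults
assert req.task_id is None and req.limit == 5

# V1 now expects the embedding wrapped in a dict (see the embedding node change further down)
v1 = SearchRequest(query_embedding={"vector": [0.1] * 1536})
```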


@@ -0,0 +1,201 @@
import dashscope
from http import HTTPStatus
from firecrawl import FirecrawlApp
from langchain_text_splitters import RecursiveCharacterTextSplitter
from ..config import settings
from .crawler_sql_service import crawler_sql_service
# Initialize configuration
dashscope.api_key = settings.DASHSCOPE_API_KEY
class AutomatedCrawler:
def __init__(self):
self.firecrawl = FirecrawlApp(api_key=settings.FIRECRAWL_API_KEY)
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100,
separators=["\n\n", "\n", "。", "!", "?", " ", ""]
)
def _get_embedding(self, text: str):
"""内部方法:调用 Dashscope 生成向量"""
# 注意:此方法是内部辅助,出错返回 None由调用方处理状态
embedding = None
try:
resp = dashscope.TextEmbedding.call(
model=dashscope.TextEmbedding.Models.text_embedding_v3, # confirm your model version
input=text,
dimension=1536
)
if resp.status_code == HTTPStatus.OK:
embedding = resp.output['embeddings'][0]['embedding']
else:
print(f"Embedding API Error: {resp}")
except Exception as e:
print(f"Embedding Exception: {e}")
return embedding
def map_and_ingest(self, start_url: str):
"""
V2 step 1: map-style site scan and ingest
"""
print(f"[WorkFlow] Start mapping: {start_url}")
result = {}
try:
# 1. Register the task in the database
task_info = crawler_sql_service.register_task(start_url)
task_id = task_info['task_id']
is_new_task = task_info['is_new_task']
# 2. Call Firecrawl Map
if is_new_task:
map_result = self.firecrawl.map(start_url)
urls = []
# Handle the return shapes of different firecrawl SDK versions
# If map_result is an object with a links attribute
if hasattr(map_result, 'links'):
for link in map_result.links:
# link may be an object or a dict, depending on the SDK version
# if link is a string, append it directly
if isinstance(link, str):
urls.append(link)
else:
urls.append(getattr(link, 'url', str(link)))
# If it is a dict
elif isinstance(map_result, dict):
urls = map_result.get('links', [])
print(f"[WorkFlow] Found {len(urls)} links")
# 3. Bulk insert
res = {"msg": "No urls found to add"}
if urls:
res = crawler_sql_service.add_urls(task_id, urls)
result = {
"msg": "Task successfully mapped and URLs added",
"task_id": task_id,
"is_new_task": is_new_task,
"url_count": len(urls),
"map_detail": res
}
else:
result = {
"msg": "Task already exists, skipped mapping",
"task_id": task_id,
"is_new_task": False,
"url_count": 0,
"map_detail": {}
}
except Exception as e:
print(f"[WorkFlow] Map Error: {e}")
# Re-raise; main.py catches it and returns an error response
raise e
return result
def process_task_queue(self, task_id: int, limit: int = 10):
"""
V2 step 2: consume the queue -> scrape -> chunk -> embed -> store
"""
processed_count = 0
total_chunks_saved = 0
result = {}
# 1. Fetch pending URLs
pending = crawler_sql_service.get_pending_urls(task_id, limit)
urls = pending['urls']
if not urls:
result = {"msg": "Queue is empty, no processing needed", "processed_count": 0}
else:
for url in urls:
try:
print(f"[WorkFlow] Processing: {url}")
# 2. Scrape a single page
scrape_res = self.firecrawl.scrape(
url,
params={'formats': ['markdown'], 'onlyMainContent': True}
)
# Handle both SDK return types (object or dict)
content = ""
metadata = {}
if isinstance(scrape_res, dict):
content = scrape_res.get('markdown', '')
metadata = scrape_res.get('metadata', {})
else:
content = getattr(scrape_res, 'markdown', '')
metadata = getattr(scrape_res, 'metadata', {})
if not metadata and hasattr(scrape_res, 'metadata_dict'):
metadata = scrape_res.metadata_dict
title = metadata.get('title', url)
if not content:
print(f"[WorkFlow] Skip empty content: {url}")
continue
# 3. Chunk
chunks = self.splitter.split_text(content)
results_to_save = []
# 4. Embed
for idx, chunk_text in enumerate(chunks):
vector = self._get_embedding(chunk_text)
if vector:
results_to_save.append({
"source_url": url,
"chunk_index": idx,
"title": title,
"content": chunk_text,
"embedding": vector
})
# 5. Save
if results_to_save:
save_res = crawler_sql_service.save_results(task_id, results_to_save)
processed_count += 1
total_chunks_saved += save_res['counts']['inserted'] + save_res['counts']['updated']
except Exception as e:
print(f"[WorkFlow] Error processing {url}: {e}")
# Do not raise here, so one failure does not abort the whole batch
# In production, call the service here to mark the url as failed
result = {
"msg": f"Batch processing complete. URLs processed: {processed_count}",
"processed_urls": processed_count,
"total_chunks_saved": total_chunks_saved
}
return result
def search_with_embedding(self, query_text: str, task_id: int = None, limit: int = 5):
"""
V2 search: text in -> auto-embed -> search the database
"""
result = {}
# 1. Get the embedding
vector = self._get_embedding(query_text)
if not vector:
result = {
"msg": "Failed to generate embedding for query",
"results": []
}
else:
# 2. Run the search
# search_knowledge already returns a dict that includes msg
result = crawler_sql_service.search_knowledge(vector, task_id, limit)
return result
# Singleton
workflow = AutomatedCrawler()
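How the pieces above chain together, run outside FastAPI (a sketch; assumes a reachable database, valid API keys, and the import path below):

```python
from backend.services.automated_crawler import workflow  # import path assumed

# Step 1: register the task and ingest the mapped links
info = workflow.map_and_ingest("https://docs.dify.ai")

# Step 2: consume up to 5 queued URLs: scrape -> chunk -> embed -> store
summary = workflow.process_task_queue(info["task_id"], limit=5)
print(summary["msg"])

# Step 3: search with plain text; the embedding happens server-side
hits = workflow.search_with_embedding("import text data", task_id=info["task_id"], limit=3)
for h in hits["results"]:
    print(h["title"], h["source_url"])
```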


@@ -1,39 +1,48 @@
# service.py
from sqlalchemy import select, insert, update, delete, and_
from .database import db_instance
from .utils import normalize_url
from sqlalchemy import select, insert, update, and_
from ..database import db_instance
from ..utils import normalize_url
class CrawlerService:
class CrawlerSqlService:
def __init__(self):
self.db = db_instance
def register_task(self, url: str):
"""完全使用库 API 实现的注册"""
clean_url = normalize_url(url)
result = {}
with self.db.engine.begin() as conn:
# Using the select() API
query = select(self.db.tasks.c.id).where(self.db.tasks.c.root_url == clean_url)
existing = conn.execute(query).fetchone()
if existing:
return {"task_id": existing[0], "is_new_task": False}
result = {
"task_id": existing[0],
"is_new_task": False,
"msg": "Task already exists"
}
else:
# Using the insert() API
stmt = insert(self.db.tasks).values(root_url=clean_url).returning(self.db.tasks.c.id)
new_task = conn.execute(stmt).fetchone()
result = {
"task_id": new_task[0],
"is_new_task": True,
"msg": "New task created successfully"
}
# Using the insert() API
stmt = insert(self.db.tasks).values(root_url=clean_url).returning(self.db.tasks.c.id)
new_task = conn.execute(stmt).fetchone()
return {"task_id": new_task[0], "is_new_task": True}
return result
def add_urls(self, task_id: int, urls_obj: dict):
def add_urls(self, task_id: int, urls: list[str]):
"""通用 API 实现的批量添加(含详细返回)"""
success_urls, skipped_urls, failed_urls = [], [], []
# Extract the urls list from urls_obj
urls = urls_obj.get("urls", [])
with self.db.engine.begin() as conn:
for url in urls:
clean_url = normalize_url(url)
try:
# Check whether the URL already exists in the queue (generic style)
# Check whether the URL already exists in the queue
check_q = select(self.db.queue).where(
and_(self.db.queue.c.task_id == task_id, self.db.queue.c.url == clean_url)
)
@@ -49,10 +58,20 @@ class CrawlerService:
except Exception:
failed_urls.append(clean_url)
return {"success_urls": success_urls, "skipped_urls": skipped_urls, "failed_urls": failed_urls}
# Build the return message
msg = f"Added {len(success_urls)} urls, skipped {len(skipped_urls)}, failed {len(failed_urls)}"
return {
"success_urls": success_urls,
"skipped_urls": skipped_urls,
"failed_urls": failed_urls,
"msg": msg
}
def get_pending_urls(self, task_id: int, limit: int):
"""原子锁定 API 实现"""
result = {}
with self.db.engine.begin() as conn:
query = select(self.db.queue.c.url).where(
and_(self.db.queue.c.task_id == task_id, self.db.queue.c.status == 'pending')
@@ -65,17 +84,20 @@ class CrawlerService:
and_(self.db.queue.c.task_id == task_id, self.db.queue.c.url.in_(urls))
).values(status='processing')
conn.execute(upd)
return {"urls": urls}
result = {"urls": urls, "msg": f"Fetched {len(urls)} pending urls"}
else:
result = {"urls": [], "msg": "Queue is empty"}
return result
def save_results(self, task_id: int, results: list):
"""
Save multiple chunks belonging to the same URL.
Returns detailed per-chunk statistics and whether the page counts as an update.
"""
if not results:
return {"msg": "No data provided"}
return {"msg": "No data provided to save", "counts": {"inserted": 0, "updated": 0, "failed": 0}}
# 1. Extract basic info (assumes every item in results shares the same source_url)
# 1. Extract basic info
first_item = results[0] if isinstance(results[0], dict) else results[0].__dict__
target_url = normalize_url(first_item.get('source_url'))
@@ -86,7 +108,7 @@ class CrawlerService:
is_page_update = False
with self.db.engine.begin() as conn:
# 2. Check whether chunks already exist for this URL (this is what marks a "page update")
# 2. Check whether chunks already exist for this URL
check_page_stmt = select(self.db.chunks.c.id).where(
and_(self.db.chunks.c.task_id == task_id, self.db.chunks.c.source_url == target_url)
).limit(1)
@@ -99,7 +121,7 @@ class CrawlerService:
c_idx = data.get('chunk_index')
try:
# Check whether the chunk at this specific index exists
# Check whether the chunk exists
find_chunk_stmt = select(self.db.chunks.c.id).where(
and_(
self.db.chunks.c.task_id == task_id,
@@ -110,7 +132,7 @@ class CrawlerService:
existing_chunk = conn.execute(find_chunk_stmt).fetchone()
if existing_chunk:
# Overwrite the existing chunk
# Overwrite
upd_stmt = update(self.db.chunks).where(
self.db.chunks.c.id == existing_chunk[0]
).values(
@@ -137,16 +159,19 @@ class CrawlerService:
print(f"Chunk {c_idx} failed: {e}")
failed_chunks.append(c_idx)
# 4. Finally, update the queue status
# 4. Update queue status
conn.execute(
update(self.db.queue).where(
and_(self.db.queue.c.task_id == task_id, self.db.queue.c.url == target_url)
).values(status='completed')
)
# Build the response
msg = f"Saved results for {target_url}. Inserted: {len(inserted_chunks)}, Updated: {len(updated_chunks)}"
return {
"source_url": target_url,
"is_page_update": is_page_update, # 标志:此页面此前是否有过内容
"is_page_update": is_page_update,
"detail": {
"inserted_chunk_indexes": inserted_chunks,
"updated_chunk_indexes": updated_chunks,
@@ -156,20 +181,18 @@ class CrawlerService:
"inserted": len(inserted_chunks),
"updated": len(updated_chunks),
"failed": len(failed_chunks)
}
},
"msg": msg
}
def search_knowledge(self, query_embedding: list, task_id: int = None, limit: int = 5):
"""
High-performance vector search.
:param query_embedding: embedding of the question
:param task_id: optional task id; searches the whole table when omitted
:param limit: number of results to return
"""
results = []
msg = ""
with self.db.engine.connect() as conn:
# 1. Select the fields we need
# task_id is returned too, so whole-database searches can tell which task a hit came from
stmt = select(
self.db.chunks.c.task_id,
self.db.chunks.c.source_url,
@@ -178,20 +201,15 @@ class CrawlerService:
self.db.chunks.c.chunk_index
)
# 2. Add the filter dynamically
if task_id is not None:
stmt = stmt.where(self.db.chunks.c.task_id == task_id)
# 3. Order by cosine distance (1 - cosine similarity);
# the smaller the distance, the higher the similarity
stmt = stmt.order_by(
self.db.chunks.c.embedding.cosine_distance(query_embedding)
).limit(limit)
# 4. Execute and parse the results
rows = conn.execute(stmt).fetchall()
results = []
for r in rows:
results.append({
"task_id": r[0],
@@ -201,7 +219,12 @@ class CrawlerService:
"chunk_index": r[4]
})
return results
if results:
msg = f"Found {len(results)} matches"
else:
msg = "No matching content found"
return {"results": results, "msg": msg}
crawler_service = CrawlerService()
crawler_sql_service = CrawlerSqlService()
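A direct call against the service singleton (a sketch; assumes pgvector is set up, the chunks table is populated, and the import path below):

```python
from backend.services.crawler_sql_service import crawler_sql_service  # import path assumed

vec = [0.0] * 1536  # stand-in embedding; in practice it comes from Dashscope
out = crawler_sql_service.search_knowledge(vec, task_id=None, limit=3)  # None = whole database
print(out["msg"])  # e.g. "Found 3 matches" or "No matching content found"
for hit in out["results"]:
    print(hit["task_id"], hit["source_url"], hit["chunk_index"])
```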


@@ -1,6 +1,24 @@
from urllib.parse import urlparse, urlunparse
from sqlalchemy import create_engine, MetaData, Table, select, update, and_
# backend/llm_service.py
import dashscope
from http import HTTPStatus
from .config import settings
# Initialize Dashscope
dashscope.api_key = settings.DASHSCOPE_API_KEY
def get_embeddings(texts: list[str]):
"""调用通义千问 embedding 模型"""
resp = dashscope.TextEmbedding.call(
model=dashscope.TextEmbedding.Models.text_embedding_v3, # or another model
input=texts
)
if resp.status_code == HTTPStatus.OK:
return [item['embedding'] for item in resp.output['embeddings']]
else:
print(f"Embedding Error: {resp}")
return []
def normalize_url(url: str) -> str:
if not url: return ""
url = url.strip()
@@ -11,5 +29,11 @@ def normalize_url(url: str) -> str:
if not path: path = ""
return urlunparse((scheme, netloc, path, parsed.params, parsed.query, ""))
def make_response(code: int, msg: str, data: any = None):
def make_response(code: int, msg: str = "Success", data: any = None):
"""
Unified response format
:param code: 1 success, 0 failure, others custom
:param msg: status message
:param data: payload
"""
return {"code": code, "msg": msg, "data": data}

docs/docker.md Normal file

@@ -0,0 +1,140 @@
# Wiki Crawler Backend Deployment Manual
## Core configuration (the only things to change each release)
Before running any commands, decide on this release's **version number**.
| Field | Current value (example) | Description | Change every time? |
| -------------------- | ---------------------------------- | ------------------------------ | ---------------------- |
| Version | v1.0.3 | Image version tag | Yes (must change) |
| Image Name | wiki-crawl-backend | Image/container name | No (fixed) |
| Namespace | qg-demo | Aliyun namespace | No (fixed) |
| Registry | crpi-1rwd6fvain6t49g2... | Aliyun registry address | No (fixed) |
---
## Stage 1: Local machine (Windows) - build and upload
Open PowerShell or CMD and change into the project root.
### 1. Build the image (Build)
Change the version number at the end of the command (**v1.0.3**):
```powershell
docker build -t crpi-1rwd6fvain6t49g2.cn-hangzhou.personal.cr.aliyuncs.com/qg-demo/wiki-crawl-backend:v1.0.3 .
```
### 2. Push the image (Push)
Change the version number at the end of the command (**v1.0.3**):
```powershell
docker push crpi-1rwd6fvain6t49g2.cn-hangzhou.personal.cr.aliyuncs.com/qg-demo/wiki-crawl-backend:v1.0.3
```
> **Success criterion:** the progress bars finish and the last line shows **Pushed**.
---
## Stage 2: Cloud server (Linux) - deploy the update
SSH into the Aliyun server and run the steps in order.
### 1. Pull the new image (Pull)
Change the version number at the end of the command (**v1.0.3**):
```bash
docker pull crpi-1rwd6fvain6t49g2.cn-hangzhou.personal.cr.aliyuncs.com/qg-demo/wiki-crawl-backend:v1.0.3
```
### 2. Stop and remove the old container
This frees the port; it does not delete any image files.
```bash
docker stop wiki-crawl-backend
docker rm wiki-crawl-backend
```
### 3. Start the new container (Run) - the key step
Change the version number at the end of the command (**v1.0.3**):
```bash
docker run -d --name wiki-crawl-backend \
-e PYTHONUNBUFFERED=1 \
-p 80:8000 \
crpi-1rwd6fvain6t49g2.cn-hangzhou.personal.cr.aliyuncs.com/qg-demo/wiki-crawl-backend:v1.0.3
```
### 4. Verify and check the logs
```bash
# Check the container status (STATUS should be Up)
docker ps
# Tail live logs (press Ctrl+C to exit)
docker logs -f wiki-crawl-backend
```
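Once `docker ps` shows the container as Up, you can smoke-test it from any machine (a sketch; assumes port 80 is open in the security group and that FastAPI's auto-generated /docs page is enabled):

```python
import requests

SERVER = "http://YOUR_SERVER_IP"  # placeholder: substitute the server's public IP

r = requests.get(f"{SERVER}/docs", timeout=5)  # FastAPI serves Swagger UI here by default
print(r.status_code)  # 200 means traffic flows through the 80 -> 8000 mapping
```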
---
## Stage 3: Cleanup (optional)
To keep old image versions from filling up the server's disk, run a periodic cleanup.
```bash
# Remove all images that are no longer in use
docker image prune -a -f
```
---
## Appendix: docker run parameters explained (beginner-friendly)
The parameters in the **docker run** command mean the following:
### 1. `-d` (Detached)
* Meaning: run in the background.
* Effect: the container keeps running quietly without occupying your terminal. Without it, the service stops as soon as you close the SSH session.
### 2. `--name wiki-crawl-backend`
* Meaning: give the container a name.
* Effect: with a name you can stop it, restart it, or read its logs directly (e.g. `docker stop wiki-crawl-backend`) instead of looking up the random container ID.
### 3. `-e PYTHONUNBUFFERED=1`
* Meaning: set an environment variable.
* Effect: a Python-specific setting that disables output buffering.
* Without it, Python holds log output back and flushes it in bursts, so `docker logs` can lag minutes behind.
* With it, logs print in real time and errors are visible immediately.
### 4. `-p 80:8000` (Port Mapping)
* Meaning: port mapping, in the format `host_port:container_port`.
* Breakdown:
  * 80 (left): the server's public-facing port. Visiting http://1.2.3.4 hits port 80 by default.
  * 8000 (right): the port your Python app (FastAPI/Uvicorn) actually listens on.
* Effect: visitors arriving at the server's front door (80) are led into the Python room (8000).
### 5. The long URL
`crpi-1rwd6fvain6t49g2.cn-hangzhou.personal.cr.aliyuncs.com/qg-demo/wiki-crawl-backend:v1.0.3`
* Registry: `crpi-1rwd...aliyuncs.com` -> your dedicated Aliyun registry server.
* Namespace: `qg-demo` -> your own area within the registry.
* Image Name: `wiki-crawl-backend` -> this project's name.
* Tag: `v1.0.3` -> the version number. Without a tag it defaults to `latest`. **In production, always set an explicit version tag** so you can roll back (if 1.0.3 breaks, you can immediately start 1.0.2).

docs/t.md

@@ -1,149 +0,0 @@
To support a general search that can cover either the whole database or a specific task, I redesigned and reimplemented the search API.
This version continues to use the **SQLAlchemy Core API** and separates storage from retrieval: Dify only sends a vector, and the backend decides how to search.
### 1. Modify `schemas.py`
Add a search request model with `task_id` made `Optional`, which enables global search.
```python
from pydantic import BaseModel
from typing import List, Optional

class SearchRequest(BaseModel):
    # If task_id is omitted, search the whole database
    task_id: Optional[int] = None
    query_embedding: List[float]
    limit: Optional[int] = 5
```
### 2. Reimplement the search method in `service.py`
This uses `pgvector`'s `cosine_distance` operator. Note the dynamic query-building technique here.
```python
from sqlalchemy import select, and_
from .database import db_instance

class CrawlerService:
    def __init__(self):
        self.db = db_instance

    def search_knowledge(self, query_embedding: list, task_id: int = None, limit: int = 5):
        """
        High-performance vector search.
        :param query_embedding: embedding of the question
        :param task_id: optional task id; searches the whole table when omitted
        :param limit: number of results to return
        """
        with self.db.engine.connect() as conn:
            # 1. Select the fields we need
            # task_id is returned too, so whole-database searches can tell which task a hit came from
            stmt = select(
                self.db.chunks.c.task_id,
                self.db.chunks.c.source_url,
                self.db.chunks.c.title,
                self.db.chunks.c.content,
                self.db.chunks.c.chunk_index
            )
            # 2. Add the filter dynamically
            if task_id is not None:
                stmt = stmt.where(self.db.chunks.c.task_id == task_id)
            # 3. Order by cosine distance (1 - cosine similarity);
            # the smaller the distance, the higher the similarity
            stmt = stmt.order_by(
                self.db.chunks.c.embedding.cosine_distance(query_embedding)
            ).limit(limit)
            # 4. Execute and parse the results
            rows = conn.execute(stmt).fetchall()
            results = []
            for r in rows:
                results.append({
                    "task_id": r[0],
                    "source_url": r[1],
                    "title": r[2],
                    "content": r[3],
                    "chunk_index": r[4]
                })
            return results
```
### 3. Reimplement the endpoint in `main.py`
Adds basic validation of the vector dimension to keep the system robust.
```python
@app.post("/search")
async def search(req: SearchRequest):
"""
通用搜索接口:
支持基于 task_id 的局部搜索,也支持不传 task_id 的全库搜索。
"""
try:
# 1. 基础校验:确保向量不为空且维度正确(阿里 v4 模型通常为 1536
if not req.query_embedding or len(req.query_embedding) != 1536:
return make_response(
code=2,
msg=f"向量维度错误。期望 1536, 实际收到 {len(req.query_embedding) if req.query_embedding else 0}",
data=None
)
# 2. 调用业务类执行搜索
data = crawler_service.search_knowledge(
query_embedding=req.query_embedding,
task_id=req.task_id,
limit=req.limit
)
# 3. 统一返回
return make_response(
code=1,
msg="搜索完成",
data=data
)
except Exception as e:
# 记录日志并返回失败信息
print(f"搜索接口异常: {str(e)}")
return make_response(code=0, msg=f"搜索失败: {str(e)}")
```
### 4. Suggested SQL for performance
If searches slow down after running in Dify for a while (tens of thousands of rows or more), run the following statement to create an **HNSW index**. It currently offers the best balance of vector-search speed and memory usage.
```sql
-- Optimize search speed for cosine distance
-- m: maximum connections per node; ef_construction: size of the dynamic candidate list during build
-- These two parameters trade index accuracy against build speed
CREATE INDEX idx_chunks_embedding_hnsw
ON knowledge_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
```
### 5. How do I use this node's output in Dify?
Since `/search` returns a list, the easiest approach in Dify is to follow it with a **Code node** that turns the list into text:
```python
def main(search_data: list) -> dict:
    # search_data is the data array returned by the HTTP node
    context_list = []
    for i, item in enumerate(search_data):
        # Format each item with its source and content
        block = f"[Reference {i+1}] Source: {item['source_url']}\nContent: {item['content']}"
        context_list.append(block)
    # Join all references with blank lines
    return {
        "final_context": "\n\n".join(context_list)
    }
```
Finally, insert `final_context` into the LLM node's prompt. This keeps the Dify flow clean: **input -> embed -> search backend -> answer**.


@@ -1,6 +0,0 @@
def main():
print("Hello from wiki-crawler!")
if __name__ == "__main__":
main()


@@ -48,4 +48,64 @@
"success": true
}
]
}
},
{
"DASHSCOPE_API_KEY": "sk-8b091493de594c5e9eb42f12f1cc5805",
"scrape_json": [
{
"error": "Insufficient credits to perform this request. For more credits, you can upgrade your plan at https://firecrawl.dev/pricing or try changing the request limit to a lower value.",
"success": false
}
]
},
[
{
"data": {
"markdown": "[Skip to main content](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme#content-area)\n\n[Dify Docs home page![light logo](https://assets-docs.dify.ai/2025/05/d05cfc6ebe48f725d171dc71c64a5d16.svg)![dark logo](https://assets-docs.dify.ai/2025/05/c51f1cda47c1d9a4a162d7736f6e4c53.svg)](https://docs.dify.ai/)\n\nLatest\n![US](https://d3gk2c5xim1je2.cloudfront.net/flags/US.svg)\n\nEnglish\n\nSearch...\n\nCtrl K\n\nSearch...\n\nNavigation\n\n1\\. Import Text Data\n\n1\\. Import Text Data\n\nClick on Knowledge in the main navigation bar of Dify. On this page, you can see your existing knowledge bases. Click **Create Knowledge** to enter the setup wizard. The Knowledge supports the import of the following two online data:Click **Knowledge** in the top navigation bar of the Dify, then select **Create Knowledge**. You can upload documents to the knowledge or importing online data to it.\n\n## [](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme\\#upload-local-files) Upload Local Files\n\nDrag and drop or select files to upload. The number of files allowed for **batch upload** depends on your [subscription plan](https://dify.ai/pricing).**Limitations for uploading documents:**\n\n- The upload size limit for a single document is 15MB;\n- Different [subscription plans](https://dify.ai/pricing) for the SaaS version limit **batch upload numbers, total document uploads, and vector storage**\n\n![Create knowledge](https://assets-docs.dify.ai/2025/01/22064cb61356e4c005c4072d5d066cf6.png)\n\n## [](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme\\#import-from-online-data-source) Import From Online Data Source\n\nWhen creating a **Knowledge**, you can import data from online sources. The knowledge supports the following two types of online data: [**1.1 Import Data from Notion** \\\\\n\\\\\nLearn how to import data from Notion](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/sync-from-notion) [**1.2 Sync from Website** \\\\\n\\\\\nLearn how to sync data from websites](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/sync-from-website)\n\nIf a knowledge base is set up to use online data, you wont be able to add local documents later or switch it to a local file-based mode. This prevents a single knowledge base from mixing multiple data sources, avoiding management complications.\n\n## [](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme\\#adding-data-later) Adding Data Later\n\nIf you havent prepared your documents or other content yet, simply create an empty knowledge first. 
You can then upload local files or import online data whenever youre ready.\n\nWas this page helpful?\n\nYesNo\n\n[Previous](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/introduction) [1.1 Sync Data from Notion\\\\\n\\\\\nNext](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/sync-from-notion)\n\nCtrl+I\n\nOn this page\n\n- [Upload Local Files](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme#upload-local-files)\n- [Import From Online Data Source](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme#import-from-online-data-source)\n- [Adding Data Later](https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme#adding-data-later)\n\nAssistant\n\nResponses are generated using AI and may contain mistakes.\n\n![Create knowledge](https://assets-docs.dify.ai/2025/01/22064cb61356e4c005c4072d5d066cf6.png)",
"metadata": {
"apple-mobile-web-app-title": "Dify Docs",
"application-name": "Dify Docs",
"cacheState": "hit",
"cachedAt": "2025-12-09T08:12:32.803Z",
"canonical": "https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme",
"charset": "utf-8",
"concurrencyLimited": true,
"concurrencyQueueDurationMs": 371,
"contentType": "text/html; charset=utf-8",
"creditsUsed": 1,
"favicon": "https://docs.dify.ai/mintlify-assets/_mintlify/favicons/dify-6c0370d8/tWYYD8GkT0MUJV0z/_generated/favicon/favicon-16x16.png",
"generator": "Mintlify",
"language": "en",
"msapplication-TileColor": "#0060FF",
"msapplication-config": "/mintlify-assets/_mintlify/favicons/dify-6c0370d8/tWYYD8GkT0MUJV0z/_generated/favicon/browserconfig.xml",
"next-size-adjust": "",
"og:image": "https://dify-6c0370d8.mintlify.app/mintlify-assets/_next/image?url=%2F_mintlify%2Fapi%2Fog%3Fdivision%3D1.%2BImport%2BText%2BData%26title%3D1.%2BImport%2BText%2BData%26logoLight%3Dhttps%253A%252F%252Fassets-docs.dify.ai%252F2025%252F05%252Fd05cfc6ebe48f725d171dc71c64a5d16.svg%26logoDark%3Dhttps%253A%252F%252Fassets-docs.dify.ai%252F2025%252F05%252Fc51f1cda47c1d9a4a162d7736f6e4c53.svg%26primaryColor%3D%25230060FF%26lightColor%3D%2523688FE8%26darkColor%3D%25230034FF%26backgroundLight%3D%2523ffffff%26backgroundDark%3D%25230b0c0f&w=1200&q=100",
"og:image:height": "630",
"og:image:width": "1200",
"og:site_name": "Dify Docs",
"og:title": "1. Import Text Data - Dify Docs",
"og:type": "website",
"og:url": "https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme",
"ogImage": "https://dify-6c0370d8.mintlify.app/mintlify-assets/_next/image?url=%2F_mintlify%2Fapi%2Fog%3Fdivision%3D1.%2BImport%2BText%2BData%26title%3D1.%2BImport%2BText%2BData%26logoLight%3Dhttps%253A%252F%252Fassets-docs.dify.ai%252F2025%252F05%252Fd05cfc6ebe48f725d171dc71c64a5d16.svg%26logoDark%3Dhttps%253A%252F%252Fassets-docs.dify.ai%252F2025%252F05%252Fc51f1cda47c1d9a4a162d7736f6e4c53.svg%26primaryColor%3D%25230060FF%26lightColor%3D%2523688FE8%26darkColor%3D%25230034FF%26backgroundLight%3D%2523ffffff%26backgroundDark%3D%25230b0c0f&w=1200&q=100",
"ogTitle": "1. Import Text Data - Dify Docs",
"ogUrl": "https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme",
"proxyUsed": "basic",
"scrapeId": "019b024f-f76e-746b-b13c-6ca4884fdd64",
"sourceURL": "https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme",
"statusCode": 200,
"title": "1. Import Text Data - Dify Docs",
"twitter:card": "summary_large_image",
"twitter:image": "https://dify-6c0370d8.mintlify.app/mintlify-assets/_next/image?url=%2F_mintlify%2Fapi%2Fog%3Fdivision%3D1.%2BImport%2BText%2BData%26title%3D1.%2BImport%2BText%2BData%26logoLight%3Dhttps%253A%252F%252Fassets-docs.dify.ai%252F2025%252F05%252Fd05cfc6ebe48f725d171dc71c64a5d16.svg%26logoDark%3Dhttps%253A%252F%252Fassets-docs.dify.ai%252F2025%252F05%252Fc51f1cda47c1d9a4a162d7736f6e4c53.svg%26primaryColor%3D%25230060FF%26lightColor%3D%2523688FE8%26darkColor%3D%25230034FF%26backgroundLight%3D%2523ffffff%26backgroundDark%3D%25230b0c0f&w=1200&q=100",
"twitter:image:height": "630",
"twitter:image:width": "1200",
"twitter:title": "1. Import Text Data - Dify Docs",
"url": "https://docs.dify.ai/en/use-dify/knowledge/create-knowledge/import-text-data/readme",
"viewport": "width=device-width, initial-scale=1"
},
"warning": "This scrape job was throttled at your current concurrency limit. If you'd like to scrape faster, you can upgrade your plan."
},
"success": true
}
]


@@ -41,6 +41,9 @@ def chunks_embedding(texts: list[str], api_key: str) -> list[list[float]]:
def main(text: str, api_key: str):
vector = chunks_embedding([text], api_key)[0]
return {
'vector': vector
'vector': {
'vector': vector
}
}
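This wraps the vector one level deeper to match the new `SearchRequest.query_embedding: dict` field, which `search_v1` unwraps via `req.query_embedding['vector']`. The node's return value now has this shape (illustrative):

```python
# Shape of the embedding node's output after this change
output = {
    "vector": {
        "vector": [0.013, -0.087]  # ...first entries of the 1536 floats from chunks_embedding
    }
}
```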

nodes/llm.md Normal file

@@ -0,0 +1 @@


@@ -1,19 +1,27 @@
def check_status(status_code: float, body: str):
import json
def parse_response(status_code: float, body: str):
'''
Check the status code and the agreed response format,
and return the parsed body
'''
if status_code != 200:
raise Exception(f"注册任务失败,状态码:{status_code}")
if "code" not in body or body["code"] != 1:
data = json.loads(body)
if "code" not in data or data["code"] != 1:
raise Exception(f"注册任务失败,返回值:{body}")
return data["data"]
def main(status_code: float, body: str):
try:
check_status(status_code, body)
data = parse_response(status_code, body)
except Exception as e:
raise e
urls = body["data"]["urls"]
urls = data["urls"]
return {
"urls": urls,


@@ -27,14 +27,3 @@ def main(status_code: float, body: str):
"task_id": task_id,
"is_new_task": is_new_task
}
def test():
import json
with open("nodes\parse_register.json", "r") as f:
data = json.load(f)
status_code = data["status_code"]
body = data["body"]
res = main(status_code, body)
print(res)
test()


@@ -1,20 +1,27 @@
def check_status(status_code: float, body: str):
import json
def parse_response(status_code: float, body: str):
'''
Check the status code and the agreed response format,
and return the parsed body
'''
if status_code != 200:
raise Exception(f"注册任务失败,状态码:{status_code}")
if "code" not in body or body["code"] != 1:
data = json.loads(body)
if "code" not in data or data["code"] != 1:
raise Exception(f"注册任务失败,返回值:{body}")
return data["data"]
def main(status_code: float, body: str):
try:
check_status(status_code, body)
data = parse_response(status_code, body)
except Exception as e:
raise e
urls_result = body["data"]
return {
"add_urls_result": urls_result
"add_urls_result": data
}


@@ -1,12 +1,20 @@
def check_status(status_code: float, body: str):
import json
def parse_response(status_code: float, body: str):
'''
Check the status code and the agreed response format,
and return the parsed body
'''
if status_code != 200:
raise Exception(f"注册任务失败,状态码:{status_code}")
if "code" not in body or body["code"] != 1:
data = json.loads(body)
if "code" not in data or data["code"] != 1:
raise Exception(f"注册任务失败,返回值:{body}")
return data["data"]
def format_rag_context(data: list) -> str:
'''
Format the retrieved data list as Markdown text
@@ -37,11 +45,10 @@ def format_rag_context(data: list) -> str:
def main(status_code: float, body: str):
try:
check_status(status_code, body)
data = parse_response(status_code, body)
except Exception as e:
raise e
data = body["data"]
rag_context = format_rag_context(data)
return {


@@ -5,14 +5,17 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"dashscope>=1.25.5",
"fastapi>=0.125.0",
"firecrawl>=4.10.2",
"firecrawl-py>=4.12.0",
"langchain>=1.2.0",
"langchain-community>=0.4.1",
"langchain-core>=1.2.2",
"numpy>=2.2.6",
"pgvector>=0.4.2",
"pinecone>=8.0.0",
"pip>=25.3",
"psycopg2-binary>=2.9.11",
"pymilvus>=2.6.5",
"qdrant-client==1.10.1",


@@ -1,8 +0,0 @@
import random
# Generate 1536-dimensional random vectors with 8 decimal places
def generate_random_vector(dim=1536):
return [round(random.uniform(-1, 1), 8) for _ in range(dim)]
data = [generate_random_vector() for _ in range(1000)]
print(data[0])

scripts/rob.py Normal file

@@ -0,0 +1,130 @@
import requests
import json
import dashscope
from http import HTTPStatus
from typing import List, Dict
# ================= Configuration =================
# 1. DashScope API key (your key goes here)
dashscope.api_key = "sk-8b091493de594c5e9eb42f12f1cc5805"
# 2. Local backend address (the FastAPI app above)
BACKEND_SEARCH_URL = "http://127.0.0.1:8000/api/v2/search"
# 3. Pick a model (qwen-turbo, qwen-plus, qwen-max)
MODEL_NAME = dashscope.Generation.Models.qwen_plus
# ===========================================
class WikiBot:
def __init__(self):
self.history = [] # keeps multi-turn conversation context (optional)
def search_knowledge_base(self, query: str, top_k: int = 5) -> List[Dict]:
"""调用本地后端接口检索相关知识"""
try:
payload = {
"query": query,
"limit": top_k
}
# Call /api/v2/search; the backend does the embedding automatically
resp = requests.post(BACKEND_SEARCH_URL, json=payload)
if resp.status_code == 200:
data = resp.json()
if data.get("code") == 1:
# The backend nests the hit list under data["results"] (see search_v2 above)
return data.get("data", {}).get("results", [])
print(f"[Warning] Retrieval failed: {resp.text}")
return []
except Exception as e:
print(f"[Error] 连接后端失败: {e}")
return []
def build_prompt(self, query: str, context_chunks: List[Dict]) -> str:
"""构建 RAG 提示词"""
if not context_chunks:
return f"用户问题:{query}\n\n当前知识库中没有找到相关信息,请直接告知用户无法回答。"
# 拼接参考资料
context_str = ""
for idx, item in enumerate(context_chunks):
# 这里把 source_url 也带上,方便 AI 引用来源
source = item.get('source_url', '未知来源')
content = item.get('content', '').strip()
context_str += f"【参考资料 {idx+1}】(来源: {source}):\n{content}\n\n"
# 系统提示词 (System Prompt)
prompt = f"""你是一个专业的 Wiki 知识库助手。
请严格根据下方的【参考上下文】来回答用户的【问题】。
要求:
1. 回答要准确、简洁,并整合不同参考资料中的信息。
2. 如果【参考上下文】中包含答案,请用自己的话回答,并在句尾标注来源,例如 [参考资料 1]。
3. 如果【参考上下文】与问题无关或不包含答案,请直接回答:“知识库中暂未收录相关信息”,不要编造答案。
4. 保持回答格式清晰(可以使用 Markdown
====== 参考上下文 开始 ======
{context_str}
====== 参考上下文 结束 ======
用户问题:{query}
"""
return prompt
def chat(self, query: str):
"""主对话逻辑"""
print(f"\n🔍 正在检索知识库...")
# 1. 检索
chunks = self.search_knowledge_base(query)
print(f"✅ 找到 {len(chunks)} 条相关资料")
# 2. 构建 Prompt
prompt = self.build_prompt(query, chunks)
# (可选) 调试时打印 prompt 看看给 AI 喂了什么
# print(f"DEBUG PROMPT:\n{prompt}\n")
print("🤖 Wiki助手正在思考...\n" + "-"*30)
# 3. 调用 DashScope 生成 (流式输出)
responses = dashscope.Generation.call(
model=MODEL_NAME,
messages=[
{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': prompt}
],
result_format='message', # message-style output
stream=True, # enable streaming output
incremental_output=True # incremental chunks
)
full_content = ""
for response in responses:
if response.status_code == HTTPStatus.OK:
text = response.output.choices[0]['message']['content']
full_content += text
print(text, end='', flush=True)
else:
print(f"\nRequest id: {response.request_id}, Status code: {response.status_code}, error code: {response.code}, error message: {response.message}")
print("\n" + "-"*30 + "\n")
# ================= Entry point =================
if __name__ == "__main__":
bot = WikiBot()
print("✨ Wiki 知识库助手已启动 (输入 'q''exit' 退出)")
print("⚠️ 请确保后端服务 (main.py) 正在 localhost:8000 运行")
while True:
user_input = input("\n🙋 Enter a question: ").strip()
if user_input.lower() in ['q', 'exit', 'quit']:
print("再见!")
break
if not user_input:
continue
bot.chat(user_input)

uv.lock generated

File diff suppressed because it is too large