Files
wiki_crawler/backend/utils/text_process.py

61 lines
2.1 KiB
Python
Raw Normal View History

2026-01-13 01:37:26 +08:00
import re
from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter
class TextProcessor:
    """Text-processing utility: cleans scraped Markdown and splits it into chunks."""

    def __init__(self):
        # Semantic splitter that cuts on Markdown heading levels and records
        # the heading hierarchy in each chunk's metadata.
        self.md_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "h1"),
                ("##", "h2"),
                ("###", "h3"),
            ],
            strip_headers=False,
        )
        # Fallback character splitter for over-long sections.
        # NOTE(review): the original separator list was
        # ["\n\n", "\n", "", "", "", " ", ""] — an empty-string separator
        # matches everywhere, so everything listed after the first "" was
        # unreachable and text degraded to per-character splitting.  The
        # empties were presumably lost CJK sentence punctuation; restored
        # here, with a single "" kept as the final catch-all (the position
        # RecursiveCharacterTextSplitter expects it in).
        self.char_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=100,
            separators=["\n\n", "\n", "。", "！", "？", " ", ""],
        )

    def clean_markdown(self, text: str) -> str:
        """Strip common web-page noise from scraped Markdown.

        Removes 'Skip to main content' link lines and the trailing
        Previous/Next footer navigation, then trims surrounding
        whitespace.  Returns "" for empty or falsy input.
        """
        if not text:
            return ""
        # Drop 'Skip to main content' accessibility links (rest of the line).
        text = re.sub(r'\[Skip to main content\].*?\n', '', text, flags=re.IGNORECASE)
        # Drop footer navigation: everything from the first [Previous]
        # through [Next] to the end of the document.
        text = re.sub(r'\[Previous\].*?\[Next\].*', '', text, flags=re.DOTALL | re.IGNORECASE)
        return text.strip()

    def split_markdown(self, text: str) -> list:
        """Split Markdown by headings first, then by characters if too long.

        Returns a list of {"content": str, "metadata": dict} dicts, where
        metadata carries the heading hierarchy (h1/h2/h3) of the chunk.
        """
        md_chunks = self.md_splitter.split_text(text)
        final_chunks = []
        for chunk in md_chunks:
            # chunk.page_content is the section text; chunk.metadata holds
            # the heading levels it fell under.
            if len(chunk.page_content) > 1000:
                # Section too long for a single chunk: fall back to the
                # character splitter, propagating the heading metadata.
                for sub in self.char_splitter.split_text(chunk.page_content):
                    final_chunks.append({
                        "content": sub,
                        "metadata": chunk.metadata,
                    })
            else:
                final_chunks.append({
                    "content": chunk.page_content,
                    "metadata": chunk.metadata,
                })
        return final_chunks
# Module-level singleton instance shared by importers of this module.
text_processor = TextProcessor()