"""Text-processing utilities: Markdown cleaning and semantic chunking."""

import re

from langchain_text_splitters import (
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)


class TextProcessor:
    """Cleans web-scraped Markdown noise and splits text into chunks.

    Splitting strategy: first split semantically on Markdown headers
    (h1-h3); any resulting chunk longer than ``max_chunk_len`` characters
    is re-split with a recursive character splitter.
    """

    def __init__(self, max_chunk_len: int = 1000):
        """
        Args:
            max_chunk_len: Header-based chunks longer than this (in
                characters) are further split by the character splitter.
                Defaults to 1000, matching the original hard-coded limit.
        """
        # Threshold above which a header-based chunk is re-split.
        self.max_chunk_len = max_chunk_len
        # Semantic splitter keyed on Markdown heading levels h1-h3;
        # strip_headers=False keeps the heading text inside the chunk.
        self.md_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "h1"),
                ("##", "h2"),
                ("###", "h3"),
            ],
            strip_headers=False,
        )
        # Fallback character splitter for oversized sections; separators
        # include CJK sentence punctuation so Chinese text splits cleanly.
        self.char_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=100,
            separators=["\n\n", "\n", "。", "!", "?", " ", ""],
        )

    def clean_markdown(self, text: str) -> str:
        """Strip common web-page noise from scraped Markdown.

        Removes 'Skip to main content' accessibility links and the
        Previous/Next footer navigation (everything from the first
        ``[Previous]`` onward). Returns ``""`` for empty/None input.

        Args:
            text: Raw Markdown text, possibly containing page chrome.

        Returns:
            The cleaned, whitespace-stripped Markdown.
        """
        if not text:
            return ""
        # Drop 'Skip to main content' link lines (case-insensitive).
        text = re.sub(r'\[Skip to main content\].*?\n', '', text, flags=re.IGNORECASE)
        # Drop footer navigation: DOTALL removes from [Previous] through
        # [Next] and everything after it.
        text = re.sub(r'\[Previous\].*?\[Next\].*', '', text, flags=re.DOTALL | re.IGNORECASE)
        return text.strip()

    def split_markdown(self, text: str) -> list[dict]:
        """Split Markdown by headers; re-split oversized chunks by characters.

        Args:
            text: Cleaned Markdown text.

        Returns:
            A list of ``{"content": str, "metadata": dict}`` dicts, where
            ``metadata`` carries the h1/h2/h3 header hierarchy produced by
            the header splitter.
        """
        final_chunks: list[dict] = []
        for chunk in self.md_splitter.split_text(text):
            # chunk.page_content is the chunk text; chunk.metadata holds
            # the header hierarchy — propagated to every sub-chunk below.
            if len(chunk.page_content) > self.max_chunk_len:
                for sub in self.char_splitter.split_text(chunk.page_content):
                    final_chunks.append({"content": sub, "metadata": chunk.metadata})
            else:
                final_chunks.append(
                    {"content": chunk.page_content, "metadata": chunk.metadata}
                )
        return final_chunks


# Module-level singleton for convenient shared use.
text_processor = TextProcessor()