05 Mainstream RAG Chunking Strategies

alex
2025-12-05

I. Strategy Landscape

1. The taxonomy of mainstream RAG document chunking strategies

├── Basic chunking strategies
│   ├── 1. Fixed-size Chunking
│   ├── 2. Overlapping Chunking
│   └── 3. Sliding Window Chunking
├── Content-aware strategies
│   ├── 4. Sentence Window Chunking
│   ├── 5. Document Structure Chunking
│   └── 8. LLM-aided Chunking
├── Smart recursive strategies
│   └── 6. Recursive Chunking
└── Advanced semantic strategies
    └── 7. Semantic-aware Chunking

II. Detailed Strategy Descriptions

1. Fixed-size Chunking

Core idea: split the document into equal-length chunks by a fixed number of characters or tokens.

How it works:

Document: [=============full document=============]
Chunks:   [chunk 1: 500 chars][chunk 2: 500 chars][chunk 3: 500 chars]
from typing import List

def fixed_size_chunking(text: str, chunk_size: int = 512) -> List[str]:
    """
    The most basic fixed-size splitter.
    
    Args:
        text: input text
        chunk_size: characters per chunk
    
    Returns:
        List of equal-length text chunks
    """
    return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]

# Improved version: count tokens instead of characters (more accurate)
import tiktoken

class TokenBasedChunker:
    """Fixed-size chunking by token count"""
    def __init__(self, model_name: str = "gpt-4", chunk_size: int = 512):
        self.encoder = tiktoken.encoding_for_model(model_name)
        self.chunk_size = chunk_size
    
    def chunk(self, text: str) -> List[str]:
        """Split by token count"""
        tokens = self.encoder.encode(text)
        chunks = []
        
        for i in range(0, len(tokens), self.chunk_size):
            chunk_tokens = tokens[i:i + self.chunk_size]
            chunk_text = self.encoder.decode(chunk_tokens)
            chunks.append(chunk_text)
        
        return chunks
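A quick standalone sanity check of the character-based splitter (the one-liner is restated inline so the snippet runs on its own): the chunks tile the input exactly, and only the last chunk can be short.

```python
def fixed_size_chunking(text, chunk_size=512):
    """Split text into consecutive chunks of at most chunk_size characters."""
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

doc = "abcdefghij" * 10          # 100 characters
chunks = fixed_size_chunking(doc, chunk_size=30)
print([len(c) for c in chunks])  # → [30, 30, 30, 10]
assert "".join(chunks) == doc    # no characters lost or duplicated
```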

Typical use cases:

  • Code file processing

  • Log file analysis

  • Batch ETL pipelines

  • Scenarios where semantic integrity matters little

Pros: ⚡ Extremely fast | 📏 Uniform sizes | 🛠️ Trivial to implement
Cons: 💔 Breaks semantics | 🔪 Splits words mid-way | 🎯 Low retrieval quality


2. Overlapping Chunking

Core idea: add overlap between adjacent fixed-size chunks so that information at chunk boundaries is not lost.

How it works:

Document: [=============full document=============]
Chunks: [chunk 1: 500 chars]
          [chunk 2: 500 chars] ← 200-char overlap
                  [chunk 3: 500 chars] ← 200-char overlap
class OverlappingChunker:
    """Overlapping chunking strategy"""
    def __init__(self, chunk_size: int = 500, overlap_size: int = 100):
        self.chunk_size = chunk_size
        self.overlap_size = overlap_size
    
    def chunk(self, text: str) -> List[str]:
        """Generate overlapping chunks"""
        chunks = []
        step_size = self.chunk_size - self.overlap_size
        text_length = len(text)
        
        for start in range(0, text_length, step_size):
            end = min(start + self.chunk_size, text_length)
            chunk = text[start:end]
            
            # Refinement: avoid cutting in the middle of a word
            if end < text_length:
                chunk = self._adjust_to_word_boundary(text, start, end)
            
            chunks.append(chunk)
            
            if end >= text_length:
                break
        
        return chunks
    
    def _adjust_to_word_boundary(self, full_text: str, start: int, end_pos: int) -> str:
        """Shrink the chunk so it ends on a word boundary"""
        # If the cut lands inside a run of letters/digits, back up to the
        # nearest non-alphanumeric character
        if end_pos < len(full_text) and full_text[end_pos].isalnum():
            adjust_pos = end_pos - 1
            while adjust_pos > start and full_text[adjust_pos].isalnum():
                adjust_pos -= 1
            
            if adjust_pos > start and not full_text[adjust_pos].isalnum():
                return full_text[start:adjust_pos + 1]
        
        return full_text[start:end_pos]

# Usage example (long_document is any long input string)
chunker = OverlappingChunker(chunk_size=500, overlap_size=100)
chunks = chunker.chunk(long_document)
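The overlap arithmetic can be verified with a minimal standalone sketch (plain character slicing, no word-boundary adjustment): with step = chunk_size - overlap, each full chunk repeats the last `overlap` characters of its predecessor.

```python
def overlapping_chunks(text, chunk_size=10, overlap=4):
    """Fixed-size chunks where consecutive chunks share `overlap` characters."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "0123456789" * 3  # 30 characters
chunks = overlapping_chunks(text, chunk_size=10, overlap=4)
print([len(c) for c in chunks])  # → [10, 10, 10, 10, 6]
# The tail of each chunk equals the head of the next one
assert all(a[-4:] == b[:4] for a, b in zip(chunks, chunks[1:]))
```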

Typical use cases:

  • Technical documentation retrieval

  • Paragraph-level question answering

  • Scenarios that require contextual continuity

Pros: 🔗 Preserves context | 🛡️ Prevents boundary information loss | ⚡ Relatively efficient
Cons: 📦 Storage redundancy | 🔄 Duplicate computation | 🎯 Still needs manual tuning


3. Sliding Window Chunking

Core idea: slide a fixed-size window across the document at a fixed step, producing densely overlapping chunks.

How it works:

Document: [=============full document=============]
Windows: [window 1] → [window 2] → [window 3] → [window 4] → ...
Step:        ↓           ↓           ↓
         move a fixed distance each time
from typing import List, Dict

class SlidingWindowChunker:
    """Sliding window chunking strategy"""
    def __init__(self, window_size: int = 300, step_size: int = 100):
        self.window_size = window_size
        self.step_size = step_size
    
    def chunk(self, text: str) -> List[Dict]:
        """Generate sliding-window chunks with position information"""
        chunks = []
        text_length = len(text)
        
        # Edge case: text shorter than one window yields a single chunk
        last_start = max(text_length - self.window_size, 0)
        
        for i in range(0, last_start + 1, self.step_size):
            window_text = text[i:i + self.window_size]
            
            chunk_info = {
                "text": window_text,
                "start": i,
                "end": i + len(window_text),
                "window_id": len(chunks),
                "metadata": {
                    "char_count": len(window_text),
                    "word_count": len(window_text.split())
                }
            }
            chunks.append(chunk_info)
        
        return chunks
    
    def chunk_with_context(self, text: str, context_before: int = 50, context_after: int = 50) -> List[Dict]:
        """Generate chunks with extra surrounding context"""
        base_chunks = self.chunk(text)
        
        for chunk in base_chunks:
            start = max(0, chunk["start"] - context_before)
            end = min(len(text), chunk["end"] + context_after)
            
            chunk["full_context"] = text[start:end]
            chunk["context_before"] = text[start:chunk["start"]]
            chunk["context_after"] = text[chunk["end"]:end]
        
        return base_chunks
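With window size w and step s, each interior character is covered by roughly w/s windows, which is where the storage blow-up comes from. A standalone sketch of the window generation:

```python
def sliding_windows(text, window=8, step=4):
    """All fixed-size windows over the text, taken every `step` characters."""
    last_start = max(len(text) - window, 0)
    return [text[i:i + window] for i in range(0, last_start + 1, step)]

text = "abcdefghijklmnopqrst"  # 20 characters
wins = sliding_windows(text, window=8, step=4)
print(len(wins))  # → 4  (window starts: 0, 4, 8, 12)
assert all(len(w) == 8 for w in wins)  # every window is full length
```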

Typical use cases:

  • Dense retrieval tasks

  • Named entity recognition

  • Keyword matching

  • Pattern discovery

Pros: 🔍 Dense coverage | 📍 Precise positions | 🎯 Good for local matching
Cons: 💾 Storage blow-up | ⏱️ Slow processing | 🔄 Highly redundant


4. Sentence Window Chunking

Core idea: build windows out of whole sentences, preserving sentence integrity and contextual relationships.

How it works:

Sentences: [S1][S2][S3][S4][S5][S6][S7][S8]
Windows:   [S1 S2 S3 S4 S5] ← window size = 5
               [S3 S4 S5 S6 S7] ← slide, keeping sentences intact

import spacy
from typing import List, Dict

class SentenceWindowChunker:
    """Sentence window chunking strategy"""
    def __init__(self, window_sentences: int = 5, overlap_sentences: int = 2):
        self.window_sentences = window_sentences
        self.overlap_sentences = overlap_sentences
        self.nlp = spacy.load("zh_core_web_sm")  # Chinese-language model
    
    def chunk(self, text: str) -> List[Dict]:
        """Sentence-based window chunking"""
        # Split into sentences
        doc = self.nlp(text)
        sentences = [sent.text.strip() for sent in doc.sents]
        
        chunks = []
        step = self.window_sentences - self.overlap_sentences
        
        for i in range(0, len(sentences), step):
            end_idx = min(i + self.window_sentences, len(sentences))
            window_sentences = sentences[i:end_idx]
            
            if not window_sentences:
                continue
            
            chunk_text = " ".join(window_sentences)
            
            chunk_info = {
                "text": chunk_text,
                "sentences": window_sentences,
                "sentence_range": (i, end_idx - 1),
                "sentence_count": len(window_sentences),
                "metadata": {
                    "avg_sentence_length": sum(len(s) for s in window_sentences) / len(window_sentences)
                }
            }
            chunks.append(chunk_info)
        
        return chunks
    
    def chunk_with_centered_sentence(self, text: str) -> List[Dict]:
        """Build a window centered on each sentence"""
        doc = self.nlp(text)
        sentences = [sent.text.strip() for sent in doc.sents]
        
        chunks = []
        half_window = self.window_sentences // 2
        
        for center_idx in range(len(sentences)):
            start_idx = max(0, center_idx - half_window)
            end_idx = min(len(sentences), center_idx + half_window + 1)
            
            window_sentences = sentences[start_idx:end_idx]
            chunk_text = " ".join(window_sentences)
            
            chunks.append({
                "text": chunk_text,
                "center_sentence": sentences[center_idx],
                "center_index": center_idx,
                "window_sentences": window_sentences
            })
        
        return chunks
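The windowing arithmetic can be checked without spaCy; this standalone sketch substitutes a simple regex sentence splitter (an assumption for illustration, not the spaCy pipeline above):

```python
import re

def sentence_windows(text, window=3, overlap=1):
    """Overlapping windows of whole sentences, split on ./!/? punctuation."""
    sents = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]
    step = window - overlap
    return [" ".join(sents[i:i + window]) for i in range(0, len(sents), step)
            if sents[i:i + window]]

text = "One fish. Two fish. Red fish. Blue fish. Old fish."
wins = sentence_windows(text, window=3, overlap=1)
print(wins[0])  # → "One fish. Two fish. Red fish."
```

Each window keeps sentences whole, and with overlap=1 the last sentence of a window reappears as the first sentence of the next.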

Typical use cases:

  • Dialogue systems

  • Question-answering bots

  • Sentiment analysis

  • Text summarization

Pros: 🗣️ Whole sentences | 🤝 Coherent context | 🎯 High semantic quality
Cons: 📏 Uneven lengths | 🐌 Depends on an NLP model | ⚙️ Complex configuration


5. Document Structure Chunking

Core idea: split intelligently along the document's hierarchical structure (headings, sections, paragraphs).

How it works:

Document structure:
  ├── Chapter 1 Introduction
  │    ├── 1.1 Background
  │    └── 1.2 Goals
  ├── Chapter 2 Methods
  │    ├── 2.1 Experimental design
  │    └── 2.2 Data analysis
  └── Chapter 3 Conclusions
import re
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class DocumentNode:
    """A node in the document tree"""
    level: int
    title: str
    content: str
    children: List['DocumentNode']
    start_pos: int
    end_pos: int

class DocumentStructureChunker:
    """Chunking strategy based on document structure"""
    
    def __init__(self):
        # Regular expressions for Markdown headings
        self.markdown_patterns = [
            (r'^# (.+)$', 1),      # H1
            (r'^## (.+)$', 2),     # H2
            (r'^### (.+)$', 3),    # H3
            (r'^#### (.+)$', 4),   # H4
        ]
    
    def parse_markdown(self, text: str) -> DocumentNode:
        """Parse a Markdown document into a structure tree"""
        lines = text.split('\n')
        root = DocumentNode(level=0, title="Root", content="", children=[], start_pos=0, end_pos=len(text))
        stack = [root]
        
        current_content = []
        current_start = 0
        
        for i, line in enumerate(lines):
            line_start = text.find(line, current_start)
            
            # Check whether this line is a heading
            is_heading = False
            heading_level = 0
            heading_text = ""
            
            for pattern, level in self.markdown_patterns:
                match = re.match(pattern, line)
                if match:
                    is_heading = True
                    heading_level = level
                    heading_text = match.group(1)
                    break
            
            if is_heading:
                # Save accumulated content into the current node
                if stack and current_content:
                    stack[-1].content = '\n'.join(current_content)
                    stack[-1].end_pos = line_start
                
                # Create a new node
                new_node = DocumentNode(
                    level=heading_level,
                    title=heading_text,
                    content="",
                    children=[],
                    start_pos=line_start,
                    end_pos=line_start + len(line)
                )
                
                # Find the parent node
                while stack and stack[-1].level >= heading_level:
                    stack.pop()
                
                if stack:
                    stack[-1].children.append(new_node)
                
                stack.append(new_node)
                current_content = []
            else:
                current_content.append(line)
            
            current_start = line_start + len(line) + 1
        
        # Flush the remaining content
        if stack and current_content:
            stack[-1].content = '\n'.join(current_content)
        
        return root
    
    def chunk_by_level(self, root: DocumentNode, target_level: int) -> List[str]:
        """按指定层级切分"""
        chunks = []
        
        def collect_nodes(node: DocumentNode):
            if node.level == target_level:
                chunk = f"{node.title}\n\n{node.content}".strip()
                if chunk:
                    chunks.append(chunk)
            
            for child in node.children:
                collect_nodes(child)
        
        collect_nodes(root)
        return chunks
    
    def chunk_hierarchical(self, root: DocumentNode) -> List[Dict]:
        """生成层次化分块"""
        chunks = []
        
        def traverse(node: DocumentNode, parent_titles: List[str]):
            # Full title path for the current node
            current_titles = parent_titles + [node.title]
            full_title = " > ".join(filter(None, current_titles))
            
            # Emit a chunk for the current node
            if node.content.strip():
                chunks.append({
                    "text": node.content,
                    "title": full_title,
                    "level": node.level,
                    "path": current_titles,
                    "position": (node.start_pos, node.end_pos)
                })
            
            # Recurse into children
            for child in node.children:
                traverse(child, current_titles)
        
        traverse(root, [])
        return chunks

# Support for structured documents such as PDF/Word
class AdvancedDocumentChunker(DocumentStructureChunker):
    """Structure-aware chunking for multiple document formats"""
    
    def chunk_pdf(self, pdf_content: str) -> List[Dict]:
        """Handle PDF document structure"""
        # Detect PDF headings and sections.
        # A real implementation would use a PDF parsing library
        # such as PyPDF2 or pdfplumber.
        pass
    
    def chunk_word(self, doc_content: str) -> List[Dict]:
        """Handle Word document structure"""
        # Detect Word styles and formatting.
        # A real implementation would use python-docx.
        pass
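The core idea (sectioning a document at its Markdown headings) can be checked with a minimal standalone sketch that returns flat (title, body) pairs instead of a tree:

```python
import re

def chunk_by_headings(md_text):
    """Split a Markdown string into (title, body) pairs at #-style headings."""
    parts = re.split(r'^(#{1,4} .+)$', md_text, flags=re.MULTILINE)
    chunks, title = [], None
    for part in parts:
        if re.match(r'#{1,4} ', part):
            title = part.lstrip('#').strip()   # heading line: remember the title
        elif part.strip():
            chunks.append((title, part.strip()))  # body text under that title
    return chunks

md = "# Intro\nBackground text.\n## Methods\nDesign details.\n## Results\nFindings."
print(chunk_by_headings(md))
# → [('Intro', 'Background text.'), ('Methods', 'Design details.'), ('Results', 'Findings.')]
```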

Typical use cases:

  • Academic paper retrieval

  • Technical manuals

  • Legal documents

  • Structured reports

Pros: 🏗️ Preserves structure | 📚 Clear hierarchy | 🎯 Precise retrieval
Cons: 🐌 Complex parsing | 📄 Format-dependent | ⚙️ Hard to implement


6. Recursive Chunking

Core idea: split recursively using a prioritized list of separators, adapting intelligently to different document structures.

How it works:

Level 1: split on "\n\n" → [big chunk 1][big chunk 2][big chunk 3]
Level 2: split chunks larger than chunk_size on "\n"
Level 3: split chunks that are still too large on "。"
...
until every chunk is ≤ chunk_size
from typing import List, Dict, Optional

class RecursiveChunker:
    """Recursive chunking strategy (LangChain-style implementation)"""
    
    def __init__(self, 
                 chunk_size: int = 1000,
                 chunk_overlap: int = 200,
                 separators: Optional[List[str]] = None):
        
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # Separator priority list (coarsest to finest)
        self.separators = separators or [
            "\n\n",        # double newline (paragraph)
            "\n",          # single newline
            "。", ".",     # sentence endings
            ";", ";",     # semicolons
            ",", ",",     # commas
            " ", " ",      # spaces
            ""             # character level (last resort)
        ]
    
    def chunk(self, text: str) -> List[str]:
        """Recursively split the document"""
        return self._split_text_recursive(text, self.separators)
    
    def _split_text_recursive(self, text: str, separators: List[str]) -> List[str]:
        """Recursive splitting implementation"""
        # Base case: the text is already small enough
        if len(text) <= self.chunk_size:
            return [text]
        
        # Try the current separator
        separator = separators[0] if separators else ""
        
        if separator:
            splits = text.split(separator)
        else:
            # Character-level split
            splits = [text[i:i+self.chunk_size] for i in range(0, len(text), self.chunk_size)]
        
        # Merge small fragments back together
        merged_splits = self._merge_splits(splits, separator)
        
        # Validate the merged chunks, recursing into any that are still too large
        good_splits = []
        
        for split in merged_splits:
            if len(split) <= self.chunk_size:
                good_splits.append(split)
            else:
                # Recursively split this oversized chunk with finer separators
                remaining_separators = separators[1:] if len(separators) > 1 else [""]
                sub_splits = self._split_text_recursive(split, remaining_separators)
                good_splits.extend(sub_splits)
        
        return good_splits
    
    def _merge_splits(self, splits: List[str], separator: str) -> List[str]:
        """Merge small fragments, adding overlap between chunks"""
        merged = []
        current_chunk = ""
        
        for split in splits:
            # Merge if adding this fragment stays within chunk_size
            if len(current_chunk) + len(split) + len(separator) <= self.chunk_size:
                if current_chunk:
                    current_chunk += separator + split
                else:
                    current_chunk = split
            else:
                # Save the current chunk
                if current_chunk:
                    merged.append(current_chunk)
                
                # Start a new chunk, possibly carrying overlap
                if self.chunk_overlap > 0 and current_chunk:
                    # Take the overlap from the tail of the current chunk
                    overlap_text = current_chunk[-self.chunk_overlap:]
                    current_chunk = overlap_text + separator + split
                else:
                    current_chunk = split
        
        # Append the final chunk
        if current_chunk:
            merged.append(current_chunk)
        
        return merged
    
    def chunk_with_metadata(self, text: str) -> List[Dict]:
        """Chunking with metadata attached"""
        chunks = self.chunk(text)
        
        enhanced_chunks = []
        for i, chunk_text in enumerate(chunks):
            # Locate the chunk in the original text
            start_pos = text.find(chunk_text)
            end_pos = start_pos + len(chunk_text)
            
            enhanced_chunks.append({
                "text": chunk_text,
                "chunk_id": i,
                "position": (start_pos, end_pos),
                "length": len(chunk_text),
                "token_count": len(chunk_text.split()),  # simplified token count
                "metadata": {
                    "chunk_size": self.chunk_size,
                    "chunk_overlap": self.chunk_overlap
                }
            })
        
        return enhanced_chunks
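A condensed standalone sketch of the recursion (no merging or overlap, just the separator ladder) shows the key property: coarse separators are tried first, and finer ones only on pieces that are still too large.

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ", "")):
    """Split on the first separator; recurse with finer ones on oversized pieces."""
    if len(text) <= chunk_size:
        return [text]
    sep, rest = separators[0], separators[1:] or ("",)
    if sep:
        pieces = text.split(sep)
    else:
        # Character-level split as the last resort
        pieces = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    out = []
    for p in pieces:
        out.extend(recursive_split(p, chunk_size, rest) if len(p) > chunk_size else [p])
    return [p for p in out if p]

doc = "aaaa bbbb\n\ncccc dddd eeee"
print(recursive_split(doc, chunk_size=10))  # → ['aaaa bbbb', 'cccc', 'dddd', 'eeee']
```

Note how "aaaa bbbb" survives intact after the paragraph split, while the oversized second paragraph falls through to the space separator.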

Typical use cases:

  • General-purpose document processing

  • Mixed-format content

  • Internal enterprise knowledge bases

  • Content management systems

Pros: 🧠 Adapts intelligently | 🎯 Balanced quality | ⚙️ Flexible configuration
Cons: 🔄 Recursion overhead | 📏 Uneven sizes | 🎛️ Parameter-sensitive


7. Semantic-aware Chunking

Core idea: use a text embedding model to detect semantic boundaries and split based on content similarity.

How it works:

Document:   [para 1][para 2][para 3][para 4][para 5]
Embeddings: [vec 1][vec 2][vec 3][vec 4][vec 5]
Similarity:   high    high    low    high    high
Cut point:                    ↑ (similarity below threshold)
Result: [chunk 1: paras 1-3][chunk 2: paras 4-5]
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Dict

class SemanticAwareChunker:
    """Semantic-aware chunking strategy"""
    
    def __init__(self, 
                 model_name: str = "paraphrase-multilingual-MiniLM-L12-v2",
                 similarity_threshold: float = 0.75,
                 min_chunk_size: int = 100,
                 max_chunk_size: int = 800):
        
        self.model = SentenceTransformer(model_name)
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    def chunk(self, text: str) -> List[Dict]:
        """Semantically informed splitting"""
        # 1. Preprocess: split into basic units
        basic_units = self._split_to_units(text)
        
        # 2. Compute an embedding for each unit
        embeddings = self.model.encode(basic_units)
        
        # 3. Merge units by semantic similarity
        chunks = self._merge_by_semantic(basic_units, embeddings)
        
        # 4. Post-process: enforce size limits
        final_chunks = self._post_process(chunks)
        
        return final_chunks
    
    def _split_to_units(self, text: str) -> List[str]:
        """Split the text into basic semantic units"""
        # Method 1: split on paragraphs
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        
        # If a paragraph is too large, split it further
        units = []
        for para in paragraphs:
            if len(para) > self.max_chunk_size:
                # Split into sentences
                sentences = self._split_sentences(para)
                units.extend(sentences)
            else:
                units.append(para)
        
        return units
    
    def _split_sentences(self, text: str) -> List[str]:
        """Split into sentences (simplified)"""
        import re
        # Chinese and English sentence delimiters
        delimiters = r'[。!?;.!?;]'
        sentences = re.split(delimiters, text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _merge_by_semantic(self, units: List[str], embeddings: np.ndarray) -> List[Dict]:
        """Merge units by semantic similarity"""
        if len(units) <= 1:
            return [{"text": units[0] if units else "", "units": units}]
        
        chunks = []
        current_chunk = [units[0]]
        current_embedding = embeddings[0].reshape(1, -1)
        
        for i in range(1, len(units)):
            # Similarity between the current chunk and the new unit
            new_embedding = embeddings[i].reshape(1, -1)
            
            # Average embedding of the chunk plus the candidate unit
            combined_embeddings = np.vstack([current_embedding, new_embedding])
            avg_embedding = np.mean(combined_embeddings, axis=0, keepdims=True)
            
            # Similarity with the existing chunk
            similarity = self._cosine_similarity(current_embedding.flatten(), avg_embedding.flatten())
            
            # Check the merge conditions
            should_merge = (
                similarity > self.similarity_threshold and
                sum(len(u) for u in current_chunk) + len(units[i]) <= self.max_chunk_size
            )
            
            if should_merge:
                current_chunk.append(units[i])
                current_embedding = avg_embedding
            else:
                # Save the current chunk
                chunk_text = " ".join(current_chunk)
                chunks.append({
                    "text": chunk_text,
                    "units": current_chunk.copy(),
                    "embedding": current_embedding.flatten(),
                    "size": len(chunk_text)
                })
                
                # Start a new chunk
                current_chunk = [units[i]]
                current_embedding = embeddings[i].reshape(1, -1)
        
        # Append the final chunk
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            chunks.append({
                "text": chunk_text,
                "units": current_chunk,
                "embedding": current_embedding.flatten(),
                "size": len(chunk_text)
            })
        
        return chunks
    
    def _post_process(self, chunks: List[Dict]) -> List[Dict]:
        """Post-process: adjust chunk sizes"""
        final_chunks = []
        
        for chunk in chunks:
            if chunk["size"] < self.min_chunk_size and final_chunks:
                # Merge into the previous chunk
                final_chunks[-1]["text"] += " " + chunk["text"]
                final_chunks[-1]["units"].extend(chunk["units"])
                final_chunks[-1]["size"] += chunk["size"]
            elif chunk["size"] > self.max_chunk_size:
                # Re-split oversized chunks
                sub_chunks = self._split_large_chunk(chunk)
                final_chunks.extend(sub_chunks)
            else:
                final_chunks.append(chunk)
        
        return final_chunks
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Cosine similarity"""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _split_large_chunk(self, chunk: Dict) -> List[Dict]:
        """Split an oversized chunk"""
        # Fall back to a more aggressive splitting strategy
        text = chunk["text"]
        sentences = self._split_sentences(text)
        
        sub_chunks = []
        current = []
        
        for sentence in sentences:
            if sum(len(s) for s in current) + len(sentence) > self.max_chunk_size:
                if current:
                    sub_chunks.append({
                        "text": " ".join(current),
                        "units": current.copy(),
                        "size": sum(len(s) for s in current)
                    })
                current = [sentence]
            else:
                current.append(sentence)
        
        if current:
            sub_chunks.append({
                "text": " ".join(current),
                "units": current,
                "size": sum(len(s) for s in current)
            })
        
        return sub_chunks
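The boundary-detection idea can be illustrated without loading an embedding model; this standalone sketch uses hand-made toy vectors and cuts wherever the cosine similarity of adjacent units drops below the threshold:

```python
import numpy as np

def semantic_boundaries(embeddings, threshold=0.75):
    """Indices where cosine similarity between adjacent units falls below threshold."""
    cuts = []
    for i in range(len(embeddings) - 1):
        a, b = embeddings[i], embeddings[i + 1]
        sim = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
        if sim < threshold:
            cuts.append(i + 1)  # a new chunk starts at unit i + 1
    return cuts

# Toy vectors: units 0-2 point one way, units 3-4 another
vecs = [np.array([1.0, 0.0]), np.array([0.9, 0.1]), np.array([1.0, 0.1]),
        np.array([0.0, 1.0]), np.array([0.1, 1.0])]
print(semantic_boundaries(vecs))  # → [3]: one cut, before unit 3
```

The low-similarity transition between units 2 and 3 is the semantic boundary, so the units would be grouped as [0, 1, 2] and [3, 4], mirroring the diagram above.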
