013RAG上下文工程-如何预防上下文窗口爆掉?

alex
0
2026-04-01

目录

  1. 什么是上下文窗口

  2. 上下文窗口为何会爆掉

  3. 主流解决方案全景图

  4. 方案一:重排序 + 动态截断

  5. 方案二:多级索引(父子文档)

  6. 方案三:对话记忆分层(滑动窗口+摘要)

  7. 方案四:Map-Reduce 模式

  8. 方案五:上下文压缩(LLMLingua)

  9. 方案选型指南


一、什么是上下文窗口

1.1 基本概念

上下文窗口(Context Window) 是大语言模型(LLM)在一次推理中可以处理的 最大 Token 数量

# 简单理解
上下文窗口 = 输入Token数(你的问题 + 检索内容) + 输出Token数(模型回答)

1.2 Token 是什么

Token 是模型处理文本的最小单位:

语言

1 Token 约等于

英文

0.75 个单词(4 个字符)

中文

0.5-1 个汉字

代码

1-2 个字符

示例

# "RAG技术很好" 的 Token 化
中文: ["RAG", "技术", "很好"]  # 约 3-4 Tokens
英文: "RAG technology is good"  # 约 5-6 Tokens

1.3 常见模型的上下文窗口

模型

窗口大小

约等于

GPT-3.5

4K-16K

3000-12000 字

GPT-4

8K-32K

6000-24000 字

GPT-4 Turbo

128K

约 10 万字

Claude 3

200K

约 15 万字

Gemini 1.5 Pro

1M-2M

约 75-150 万字

开源模型(Llama 3)

8K-128K

因版本而异


二、上下文窗口为何会爆掉

2.1 问题示意

# 一个典型 RAG 流程的 Token 消耗
system_prompt = "你是一个AI助手..."  # 500 tokens
user_query = "请总结这份财报..."     # 50 tokens
retrieved_docs = """
文档块1: 财报第1-5页...   # 2000 tokens
文档块2: 财报第6-10页..   # 2000 tokens
...
文档块20: 财报第95-100页  # 2000 tokens
"""  # 总计 40000 tokens

total = 500 + 50 + 40000 = 40550 tokens
# 如果模型窗口是 8K,直接爆掉!

2.2 爆掉的主要原因

原因 1:召回数量过多

# 常见错误:为了提高召回率,取太多文档块
retrieved_chunks = vector_store.similarity_search(query, k=50)  # 50个块
# 每个块 500 tokens,总输入 = 25000 tokens

原因 2:文档切块过细

# 过度切分导致大量重复上下文
chunk_size = 200  # 太小
# 100页文档 → 1500个块 → 检索50个块 → 信息碎片化严重

原因 3:对话历史累积

# 多轮对话,历史消息不断累积
messages = [
    {"role": "user", "content": "第一轮问题..."},  # 500 tokens
    {"role": "assistant", "content": "第一轮回答..."},  # 800 tokens
    {"role": "user", "content": "第二轮问题..."},  # 300 tokens
    {"role": "assistant", "content": "第二轮回答..."},  # 600 tokens
    # ... 50轮后,总 token 可能超过 10万
]

原因 4:文档冗余

# 多个检索块包含重复内容
chunk_1 = "公司的营收为100亿,利润20亿..."  # 500 tokens
chunk_2 = "本季度公司营收100亿,同比增长..."  # 500 tokens
# 两块都包含"营收100亿",浪费窗口空间

2.3 爆掉的后果

后果

说明

请求失败

直接报错:max_tokens exceeded

截断损失

部分输入被截断,信息丢失

成本浪费

大量 token 用于无用信息

性能下降

长上下文导致推理速度变慢

中间迷失

重要信息在长上下文中被忽略


三、主流解决方案全景图

3.1 解决方案分类

上下文窗口管理
├── 检索前优化(预防)
│   ├── 多级索引
│   └── 查询路由
├── 检索中优化(控制)
│   └── 动态检索策略
├── 检索后压缩(处理)
│   ├── 重排序+截断 ⭐ 最主流
│   ├── 上下文压缩
│   └── 文档重组
└── 架构级方案
    ├── Map-Reduce
    └── 对话记忆分层

3.2 各方案对比速览

方案

适用场景

延迟

成本

信息保留

复杂度

重排序+截断

知识库问答

多级索引

长文档问答

⭐⭐⭐

对话记忆分层

多轮对话

⭐⭐

Map-Reduce

超长文档总结

⭐⭐

LLMLingua

高吞吐场景

⭐⭐⭐


四、方案一:重排序 + 动态截断

4.1 方案概述

核心思想:先用高召回算法(向量检索)取足够多的候选,再用精确模型(交叉编码器)重排序,只取最相关的 Top-K 进入上下文。

为什么是主流:简单、高效、成本可控,在 80% 的 RAG 场景中够用。

4.2 技术原理

传统做法:
    用户问题 → 向量检索 → 取 Top-20 → 全部塞入 LLM
                     ↓
              问题:20个块中可能只有5个有用,浪费窗口

重排序做法:
    用户问题 → 向量检索 → 取 Top-50 → 重排序 → 取 Top-5 → 塞入 LLM
                     ↓              ↓
                高召回率        精排序,去冗余

关键技术

1. 向量检索(Embedding)

# 快速但粗糙
query_embedding = embed_model.encode(query)
candidates = vector_db.search(query_embedding, k=50)  # 高召回

2. 交叉编码器(Cross-Encoder)

# 慢速但精确,联合编码 query 和 document
def rerank(query, documents):
    scores = []
    for doc in documents:
        # 将 query 和 document 一起编码
        input = f"[CLS] {query} [SEP] {doc} [SEP]"
        score = cross_encoder(input)  # 输出相关性分数
        scores.append(score)
    return sorted(documents, key=scores, reverse=True)

4.3 实现流程

4.4 代码实现

import tiktoken
from sentence_transformers import CrossEncoder
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

class RAGWithReranking:
    def __init__(self, llm, vector_store, max_tokens=8000):
        self.llm = llm
        self.vector_store = vector_store
        self.max_tokens = max_tokens
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        # 加载重排序模型(开源)
        self.reranker = CrossEncoder('BAAI/bge-reranker-base')
        
    def count_tokens(self, text):
        return len(self.tokenizer.encode(text))
    
    def rerank_documents(self, query, documents, top_k=5):
        """重排序"""
        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.reranker.predict(pairs)
        
        # 按分数排序
        sorted_docs = sorted(
            zip(documents, scores), 
            key=lambda x: x[1], 
            reverse=True
        )
        return [doc for doc, score in sorted_docs[:top_k]]
    
    def truncate_by_token(self, documents, max_tokens):
        """Token 截断"""
        truncated_docs = []
        current_tokens = 0
        
        for doc in documents:
            doc_tokens = self.count_tokens(doc.page_content)
            if current_tokens + doc_tokens <= max_tokens:
                truncated_docs.append(doc)
                current_tokens += doc_tokens
            else:
                # 截断最后一个文档
                remaining = max_tokens - current_tokens
                if remaining > 100:  # 至少保留100 tokens
                    truncated_content = doc.page_content[:remaining * 4]
                    doc.page_content = truncated_content
                    truncated_docs.append(doc)
                break
        return truncated_docs
    
    def query(self, question):
        # 1. 高召回检索
        candidates = self.vector_store.similarity_search(question, k=50)
        
        # 2. 重排序,取 Top-5
        reranked_docs = self.rerank_documents(question, candidates, top_k=5)
        
        # 3. 计算剩余窗口(预留1000 tokens给 system prompt 和回答)
        available_tokens = self.max_tokens - 1000
        
        # 4. Token 截断
        final_docs = self.truncate_by_token(reranked_docs, available_tokens)
        
        # 5. 构建 Prompt
        context = "\n\n".join([doc.page_content for doc in final_docs])
        prompt = f"""基于以下参考资料回答问题:

参考资料:
{context}

问题:{question}

请给出准确、简洁的回答。"""

        # 6. 调用 LLM
        response = self.llm.invoke(prompt)
        return response

# 使用示例
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo", max_tokens=1000)
vector_store = Chroma(...)  # 你的向量数据库

rag = RAGWithReranking(llm, vector_store, max_tokens=4000)
answer = rag.query("2023年公司营收是多少?")
print(answer)

4.5 效果评估

指标

数值

Token 节省

60%-80%

准确率提升

5%-15%(相比直接取Top-K)

延迟增加

0.2-0.5秒(重排序计算)

成本降低

50%-70%(输入token减少)


五、方案二:多级索引(父子文档)

5.1 方案概述

核心思想:构建层次化的文档结构,小粒度检索、大粒度提供上下文,避免信息碎片化。

适用场景:长文档问答、技术文档、法律合同等需要完整上下文的场景。

5.2 技术原理

# 传统切块:信息可能被切断
文档原文:
"公司营收达到100亿。主要来源于...(中间内容)...利润20亿。"

切块1: "公司营收达到100亿。"  # 只有营收,没有利润
切块2: "利润20亿。"          # 只有利润,没有营收

# 父子文档:保留结构关系
父文档(段落级):
"公司营收达到100亿。主要来源于...利润20亿。"  # 完整段落

子文档(句子级):
"公司营收达到100亿。"  # 指向父文档1
"利润20亿。"           # 指向父文档1

5.3 索引结构设计

python

{
  "chunk_id": "chunk_001",
  "content": "公司的营收达到100亿...",  # 子文档内容
  "type": "child",
  "parent_id": "parent_001",           # 指向父文档
  "metadata": {...}
}

{
  "chunk_id": "parent_001", 
  "content": "完整的段落内容...",        # 父文档内容
  "type": "parent",
  "child_ids": ["chunk_001", "chunk_002"],
  "metadata": {...}
}

5.4 实现流程

5.5 代码实现

from typing import List, Dict
import hashlib

class HierarchicalIndex:
    def __init__(self, vector_store, doc_store):
        self.vector_store = vector_store
        self.doc_store = doc_store  # 存储完整文档
        
    def create_hierarchy(self, document: str, chunk_size: int = 1000):
        """创建父子文档索引"""
        # 1. 按段落分父文档
        paragraphs = document.split('\n\n')
        parent_chunks = []
        
        for para in paragraphs:
            if len(para) < 100:
                continue
                
            # 父文档ID
            parent_id = hashlib.md5(para.encode()).hexdigest()
            
            # 存储父文档
            parent_chunk = {
                "id": parent_id,
                "content": para,
                "type": "parent"
            }
            self.doc_store[parent_id] = parent_chunk
            
            # 2. 将父文档切分成子文档
            sentences = self.split_sentences(para)
            child_chunks = []
            
            for sent in sentences:
                child_id = hashlib.md5(sent.encode()).hexdigest()
                child_chunk = {
                    "id": child_id,
                    "content": sent,
                    "type": "child",
                    "parent_id": parent_id
                }
                # 子文档存入向量库
                self.vector_store.add_texts(
                    [sent], 
                    metadatas=[{"parent_id": parent_id, "chunk_id": child_id}]
                )
                child_chunks.append(child_chunk)
            
            parent_chunk["child_ids"] = [c["id"] for c in child_chunks]
        
        return parent_chunks
    
    def split_sentences(self, text: str) -> List[str]:
        """简单分句(生产环境建议用 spacy/nltk)"""
        import re
        sentences = re.split(r'[。!?.!?]', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def retrieve_with_context(self, query: str, strategy: str = "parent"):
        """根据策略检索"""
        # 1. 检索子文档
        child_results = self.vector_store.similarity_search(query, k=5)
        
        if strategy == "child":
            # 直接返回子文档内容
            return child_results
        
        elif strategy == "parent":
            # 返回对应的父文档(去重)
            parent_ids = set()
            for doc in child_results:
                parent_id = doc.metadata.get("parent_id")
                if parent_id:
                    parent_ids.add(parent_id)
            
            parents = [self.doc_store[pid] for pid in parent_ids]
            return parents
        
        elif strategy == "expansion":
            # 返回父文档 + 相邻文档
            parents = []
            for doc in child_results:
                parent_id = doc.metadata.get("parent_id")
                if parent_id:
                    parent = self.doc_store[parent_id]
                    parents.append(parent)
                    
                    # 获取相邻段落(需要额外实现)
                    neighbors = self.get_neighbors(parent_id)
                    parents.extend(neighbors)
            
            # 去重
            unique_parents = {p["id"]: p for p in parents}.values()
            return list(unique_parents)
    
    def get_neighbors(self, parent_id: str) -> List[Dict]:
        """获取相邻的父文档"""
        # 需要维护文档顺序,简化实现略
        pass


# 使用示例
class RAGWithHierarchy:
    def __init__(self, llm, vector_store, doc_store):
        self.llm = llm
        self.hierarchy = HierarchicalIndex(vector_store, doc_store)
    
    def query(self, question, strategy="parent"):
        # 根据问题类型选择策略
        if "是什么" in question or "多少" in question:
            strategy = "child"  # 事实性问题,用子文档
        elif "为什么" in question or "总结" in question:
            strategy = "parent"  # 理解性问题,用父文档
        
        # 检索
        contexts = self.hierarchy.retrieve_with_context(question, strategy)
        
        # 构建 prompt
        context_text = "\n\n".join([c["content"] for c in contexts])
        prompt = f"""参考以下内容回答问题:

{context_text}

问题:{question}
回答:"""
        
        return self.llm.invoke(prompt)

5.6 策略选择指南

问题类型

推荐策略

原因

"XX是多少?"

child

只需要具体数值

"XX是什么意思?"

parent

需要完整上下文理解

"总结一下XX"

parent + neighbors

需要完整段落

"对比A和B"

parent + expansion

可能需要前后文


六、方案三:对话记忆分层(滑动窗口+摘要)

6.1 方案概述

核心思想:将对话记忆分为三层,保留最近对话的完整细节,将早期对话压缩成摘要,关键信息永久保存。

适用场景:多轮对话、AI助手、客服系统。

6.2 三层记忆架构

┌─────────────────────────────────────────────────┐
│                 当前上下文窗口                    │
├─────────────────────────────────────────────────┤
│  持久层(全局摘要)                              │
│  "用户是程序员,偏好Python,正在开发RAG项目..."    │
├─────────────────────────────────────────────────┤
│  摘要层(滚动摘要)                              │
│  "第1-3轮讨论了架构设计,第4-6轮确定了技术选型..." │
├─────────────────────────────────────────────────┤
│  窗口层(最近N轮完整对话)                        │
│  User: "那用什么向量数据库?"                    │
│  Assistant: "建议用Milvus或Chroma..."          │
└─────────────────────────────────────────────────┘

6.3 实现原理

核心机制

  1. 滑动窗口:保留最近 K 轮完整对话

  2. 滚动摘要:当窗口满时,将最早的一批对话压缩成摘要

  3. 全局记忆:从摘要中提取关键信息持久化

6.4 代码实现

python

from typing import List, Dict
from datetime import datetime
import tiktoken

class LayeredChatMemory:
    def __init__(self, llm, max_window_tokens=3000, max_summary_tokens=500):
        self.llm = llm
        self.max_window_tokens = max_window_tokens  # 窗口层上限
        self.max_summary_tokens = max_summary_tokens  # 摘要层上限
        
        self.window_messages = []  # 最近N轮完整对话
        self.rolling_summary = ""  # 滚动摘要
        self.global_memory = {
            "user_profile": {},      # 用户画像
            "key_facts": [],         # 关键事实
            "preferences": [],       # 偏好
            "unfinished_tasks": []   # 未完成任务
        }
        
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
    
    def count_tokens(self, text):
        return len(self.tokenizer.encode(text))
    
    def add_message(self, role: str, content: str):
        """添加新消息"""
        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now()
        }
        self.window_messages.append(message)
        
        # 检查是否需要压缩
        self._check_and_compress()
    
    def _check_and_compress(self):
        """检查窗口是否超限"""
        current_tokens = self._count_window_tokens()
        
        if current_tokens > self.max_window_tokens:
            self._compress_old_messages()
    
    def _count_window_tokens(self):
        """统计窗口总token"""
        total = 0
        for msg in self.window_messages:
            total += self.count_tokens(msg["content"])
        return total
    
    def _compress_old_messages(self):
        """压缩最早的对话"""
        # 找出要压缩的消息(窗口的一半)
        compress_count = len(self.window_messages) // 3
        if compress_count < 2:
            compress_count = 2
        
        messages_to_compress = self.window_messages[:compress_count]
        self.window_messages = self.window_messages[compress_count:]
        
        # 生成新的摘要
        new_summary = self._generate_summary(messages_to_compress)
        
        # 更新滚动摘要
        if self.rolling_summary:
            self.rolling_summary = self._merge_summaries(
                self.rolling_summary, new_summary
            )
        else:
            self.rolling_summary = new_summary
        
        # 提取关键信息到全局记忆
        self._extract_to_global_memory(messages_to_compress)
    
    def _generate_summary(self, messages: List[Dict]) -> str:
        """生成对话摘要"""
        # 将消息格式化成文本
        conversation = "\n".join([
            f"{m['role']}: {m['content']}" 
            for m in messages
        ])
        
        prompt = f"""请总结以下对话,保留关键信息:
- 用户的身份和偏好
- 重要的事实和决策
- 未完成的任务

对话:
{conversation}

摘要:"""
        
        summary = self.llm.invoke(prompt)
        return summary
    
    def _merge_summaries(self, old_summary: str, new_summary: str) -> str:
        """合并新旧摘要"""
        prompt = f"""将以下两个摘要合并成一个简洁的摘要:

摘要1(较旧的对话):
{old_summary}

摘要2(较新的对话):
{new_summary}

合并后的摘要:"""
        
        merged = self.llm.invoke(prompt)
        
        # Token限制
        if self.count_tokens(merged) > self.max_summary_tokens:
            # 进一步压缩
            merged = self._compress_summary(merged)
        
        return merged
    
    def _compress_summary(self, summary: str) -> str:
        """压缩摘要到指定token数"""
        prompt = f"""将以下摘要压缩成更简洁的形式,保留最关键信息:

{summary}

简洁版本:"""
        
        return self.llm.invoke(prompt)
    
    def _extract_to_global_memory(self, messages: List[Dict]):
        """提取关键信息到全局记忆"""
        conversation = "\n".join([
            f"{m['role']}: {m['content']}" 
            for m in messages
        ])
        
        prompt = f"""从以下对话中提取关键信息,按JSON格式输出:

对话:
{conversation}

输出格式:
{{
    "user_profile": {{"职业": "", "技能": "", ...}},
    "key_facts": ["事实1", "事实2"],
    "preferences": ["偏好1", "偏好2"],
    "unfinished_tasks": ["任务1", "任务2"]
}}

只输出JSON:"""
        
        import json
        try:
            extracted = self.llm.invoke(prompt)
            new_info = json.loads(extracted)
            
            # 合并到全局记忆
            for key in new_info:
                if key in self.global_memory:
                    if isinstance(self.global_memory[key], list):
                        self.global_memory[key].extend(new_info[key])
                        # 去重
                        self.global_memory[key] = list(set(self.global_memory[key]))
                    elif isinstance(self.global_memory[key], dict):
                        self.global_memory[key].update(new_info[key])
        except:
            pass
    
    def get_context(self) -> str:
        """获取当前完整上下文"""
        context_parts = []
        
        # 1. 全局记忆
        if self.global_memory["user_profile"] or self.global_memory["key_facts"]:
            context_parts.append("【用户档案】")
            context_parts.append(str(self.global_memory["user_profile"]))
            context_parts.append("【关键事实】")
            context_parts.append("\n".join(self.global_memory["key_facts"]))
        
        # 2. 滚动摘要
        if self.rolling_summary:
            context_parts.append("【历史对话摘要】")
            context_parts.append(self.rolling_summary)
        
        # 3. 最近对话窗口
        if self.window_messages:
            context_parts.append("【最近对话】")
            for msg in self.window_messages[-5:]:  # 保留最近5轮完整对话
                context_parts.append(f"{msg['role']}: {msg['content']}")
        
        return "\n".join(context_parts)
    
    def clear(self):
        """清空记忆"""
        self.window_messages = []
        self.rolling_summary = ""
        self.global_memory = {
            "user_profile": {},
            "key_facts": [],
            "preferences": [],
            "unfinished_tasks": []
        }


# 使用示例
class ChatBot:
    def __init__(self, llm):
        self.llm = llm
        self.memory = LayeredChatMemory(llm)
    
    def chat(self, user_input):
        # 添加用户消息
        self.memory.add_message("user", user_input)
        
        # 获取上下文
        context = self.memory.get_context()
        
        # 构建 prompt
        prompt = f"""你是一个AI助手。以下是对话背景:

{context}

请根据以上信息回答用户的问题。
当前用户:{user_input}
助手:"""
        
        # 生成回答
        response = self.llm.invoke(prompt)
        
        # 添加助手消息
        self.memory.add_message("assistant", response)
        
        return response


# 运行示例
bot = ChatBot(llm)

# 多轮对话
bot.chat("我是程序员,擅长Python")
bot.chat("我在做一个RAG项目")
bot.chat("用什么向量数据库好?")
bot.chat("Chroma和Milvus哪个更适合生产环境?")
# 第5轮时,早期对话会被自动压缩

6.5 压缩时机与策略

class CompressionStrategy:
    """压缩策略配置"""
    
    # 策略1:基于Token数量
    token_threshold = 3000  # 超过3000 tokens触发压缩
    compress_ratio = 0.3    # 压缩30%的最旧消息
    
    # 策略2:基于消息数量
    message_threshold = 15   # 超过15轮触发压缩
    keep_recent = 5          # 保留最近5轮
    
    # 策略3:基于时间(适合长对话)
    time_threshold = 3600    # 1小时前的对话压缩
    keep_recent_time = 300   # 保留最近5分钟
    
    # 策略4:混合策略(推荐)
    def should_compress(self, messages, tokens):
        if tokens > 3000:
            return True
        if len(messages) > 15:
            return True
        return False

七、方案四:Map-Reduce 模式

7.1 方案概述

核心思想:将超长文档分批处理,先分别总结(Map),再合并总结(Reduce),避免一次性加载全部内容。

适用场景:超长文档总结、财报分析、长篇小说理解。

7.2 技术原理

传统做法:
    100页财报 → 全部加载 → LLM总结 → 可能爆窗

Map-Reduce做法:
    100页财报 → 分成10批(每批10页)
                    ↓
    Map阶段:并行处理
        批次1 → 总结1
        批次2 → 总结2
        ...
        批次10 → 总结10
                    ↓
    Reduce阶段:
        10个总结 → 合并 → 最终总结

7.3 代码实现

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

class MapReduceRAG:
    def __init__(self, llm, max_chunk_tokens=3000):
        self.llm = llm
        self.max_chunk_tokens = max_chunk_tokens
    
    def split_document(self, document: str) -> List[str]:
        """切分文档(按token)"""
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for sentence in self.split_sentences(document):
            sentence_tokens = self.count_tokens(sentence)
            
            if current_tokens + sentence_tokens > self.max_chunk_tokens:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_tokens = sentence_tokens
            else:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens
        
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        
        return chunks
    
    def split_sentences(self, text: str) -> List[str]:
        """简单分句"""
        import re
        sentences = re.split(r'[。!?.!?]', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def count_tokens(self, text):
        """简单token计数(生产环境用tiktoken)"""
        return len(text) // 4  # 粗略估计
    
    def map_summarize(self, chunk: str, idx: int) -> Dict:
        """Map阶段:对单个chunk进行总结"""
        prompt = f"""请总结以下内容,提取关键信息(事实、数据、结论):

第{idx}部分:
{chunk}

总结:"""
        
        summary = self.llm.invoke(prompt)
        return {
            "index": idx,
            "summary": summary,
            "length": len(chunk)
        }
    
    def reduce_summarize(self, summaries: List[Dict]) -> str:
        """Reduce阶段:合并所有总结"""
        # 按顺序排列
        summaries_sorted = sorted(summaries, key=lambda x: x["index"])
        
        # 合并总结文本
        combined = "\n\n".join([
            f"## 第{s['index']}部分\n{s['summary']}" 
            for s in summaries_sorted
        ])
        
        prompt = f"""以下是对同一份文档不同部分的总结,请将它们整合成一份完整的、连贯的总结:

{combined}

要求:
1. 去除重复信息
2. 按逻辑顺序组织
3. 保留所有关键事实和数据
4. 给出最终总结

最终总结:"""
        
        final_summary = self.llm.invoke(prompt)
        return final_summary
    
    def query(self, document: str, question: str = None) -> str:
        """执行 Map-Reduce"""
        # 1. 切分文档
        chunks = self.split_document(document)
        print(f"文档切分为 {len(chunks)} 个chunks")
        
        # 2. Map阶段:并行处理
        summaries = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(self.map_summarize, chunk, i+1) 
                for i, chunk in enumerate(chunks)
            ]
            
            for future in as_completed(futures):
                summaries.append(future.result())
        
        # 3. Reduce阶段:合并总结
        final_summary = self.reduce_summarize(summaries)
        
        # 4. 如果有具体问题,基于总结回答
        if question:
            answer_prompt = f"""基于以下文档总结回答问题:

文档总结:
{final_summary}

问题:{question}

回答:"""
            return self.llm.invoke(answer_prompt)
        
        return final_summary


# 优化版:分层 Map-Reduce(适合超长文档)
class HierarchicalMapReduce(MapReduceRAG):
    """多层 Map-Reduce,适合百万级 token 文档"""
    
    def hierarchical_query(self, document: str, levels: int = 2) -> str:
        """多层压缩"""
        current_doc = document
        
        for level in range(levels):
            print(f"Level {level+1} 压缩...")
            chunks = self.split_document(current_doc)
            
            # Map
            summaries = []
            for i, chunk in enumerate(chunks):
                summary = self.map_summarize(chunk, i+1)
                summaries.append(summary)
            
            # Reduce(本层输出作为下一层输入)
            current_doc = self.reduce_summarize(summaries)
        
        return current_doc


# 使用示例
# 场景1:长文档总结
mapreduce = MapReduceRAG(llm)
document = open("annual_report_100pages.txt").read()

# 总结整份报告
summary = mapreduce.query(document)
print(summary)

# 基于长文档回答问题
answer = mapreduce.query(
    document, 
    question="2023年营收增长多少?"
)
print(answer)

# 场景2:超长文档(百万token)
hierarchical = HierarchicalMapReduce(llm)
final_summary = hierarchical.hierarchical_query(document, levels=3)

7.4 Map-Reduce 的变体

变体

说明

适用场景

标准 Map-Reduce

独立总结每个块,再合并

总结任务

逐级压缩

多次 Map-Reduce,逐步压缩

超长文档

带问题的 Map-Reduce

每个块只提取与问题相关的内容

问答任务

增量 Map-Reduce

流式处理,边读边压缩

实时处理


八、方案五:上下文压缩(LLMLingua)

8.1 方案概述

核心思想:使用小型模型对 Prompt 进行压缩,删除不重要的 tokens,保留关键信息。

适用场景:高吞吐场景、成本敏感场景、移动端部署。

8.2 技术原理

# 原始 Prompt
prompt = """
请根据以下资料回答问题:
资料1: 公司的营收达到100亿元...
资料2: 公司主要产品包括...
... (共5000 tokens)
问题:公司营收是多少?
"""

# 压缩后
compressed = """
资料: 营收100亿元...
问题:公司营收?
"""  # 压缩到 500 tokens

8.3 使用 LLMLingua 实现

# 安装
# pip install llmlingua

from llmlingua import PromptCompressor

class LLMLinguaCompressor:
    def __init__(self):
        # 初始化压缩器(使用小模型)
        self.compressor = PromptCompressor(
            model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
            device_map="cpu"
        )
    
    def compress_prompt(self, prompt: str, target_ratio: float = 0.5) -> str:
        """
        压缩 Prompt
        
        Args:
            prompt: 原始 prompt
            target_ratio: 目标压缩率(0.5 = 压缩到50%)
        """
        compressed = self.compressor.compress_prompt(
            prompt,
            rate=target_ratio,
            force_tokens=['\n', '?', '。', '问题']  # 强制保留的标记
        )
        return compressed['compressed_prompt']
    
    def compress_context(self, context: str, question: str) -> str:
        """压缩检索到的上下文"""
        # 构建包含问题和上下文的 prompt
        full_prompt = f"""问题:{question}

参考资料:
{context}

请基于参考资料回答问题。"""
        
        # 压缩
        compressed = self.compress_prompt(full_prompt, target_ratio=0.3)
        return compressed


# 集成到 RAG
class RAGWithLLMLingua:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
        self.compressor = LLMLinguaCompressor()
    
    def query(self, question: str, compress_ratio: float = 0.5):
        # 1. 检索
        docs = self.vector_store.similarity_search(question, k=20)
        context = "\n\n".join([doc.page_content for doc in docs])
        
        # 2. 压缩上下文
        compressed_context = self.compressor.compress_context(context, question)
        
        # 3. 构建最终 prompt
        prompt = f"""问题:{question}

参考资料:
{compressed_context}

请回答问题:"""
        
        return self.llm.invoke(prompt)

8.4 LLMLingua 的压缩效果

原始 Token

压缩后

压缩率

信息保留率

4000

800

20%

85-90%

8000

1600

20%

80-85%

20000

4000

20%

75-80%

8.5 注意事项

# LLMLingua 的局限性
limitations = {
    "中文支持": "中文模型不如英文成熟",
    "压缩损失": "可能丢失关键事实(尤其是数字)",
    "额外开销": "需要加载小模型(~500MB)",
    "调试困难": "压缩后的文本不易读"
}

# 使用建议
recommendations = {
    "保留关键词": "设置 force_tokens 保留重要标记",
    "验证准确率": "压缩后需验证答案质量",
    "缓存结果": "相同 context 可缓存压缩结果",
    "A/B测试": "小流量测试压缩效果"
}

九、方案选型指南

9.1 决策树

9.2 场景与方案匹配表

场景

推荐方案

理由

配置示例

客服问答

重排序+截断

延迟敏感,问题简单

Top-5,窗口4K

法律合同分析

多级索引

需要完整上下文

父文档,窗口32K

智能助手对话

对话记忆分层

多轮,需要记忆

窗口3K,摘要500

财报分析

Map-Reduce

超长文档,分析性强

每块3K,3层压缩

移动端 RAG

LLMLingua

成本敏感,带宽有限

压缩率30%

9.3 混合方案(生产环境推荐)

class ProductionRAG:
    """生产级 RAG,组合多种方案"""
    
    def __init__(self):
        self.rerank = RAGWithReranking(...)
        self.hierarchy = RAGWithHierarchy(...)
        self.memory = LayeredChatMemory(...)
        self.compressor = LLMLinguaCompressor()
    
    def query(self, question, conversation_history=None):
        # 1. 判断场景
        if conversation_history:
            # 对话场景:使用记忆分层
            context = self.memory.get_context()
            docs = []
        else:
            # 知识库问答
            docs = self.hierarchy.retrieve_with_context(question)
            # 重排序
            docs = self.rerank.rerank_documents(question, docs)
        
        # 2. 检查 token
        total_tokens = self.count_tokens(docs)
        
        if total_tokens > 5000:
            # 超过阈值,压缩
            docs = self.compressor.compress_docs(docs)
        
        # 3. 构建最终答案
        return self.llm.invoke(self.build_prompt(question, docs))

总结

核心要点

  1. 上下文窗口是资源:不是越大越好,需要精细管理

  2. 预防优于治疗:通过多级索引、路由从源头控制

  3. 分层是王道:对话场景用三层记忆,文档场景用父子索引

  4. 压缩需谨慎:可能损失信息,需要验证效果

最佳实践

  • 优先使用重排序+截断:80%场景够用

  • 对话场景必做记忆分层:不能简单丢弃

  • 超长文档用 Map-Reduce:避免单次爆窗

  • 监控 token 消耗:设置告警

  • A/B测试压缩效果:验证准确率

避坑指南

  • ❌ 不要简单丢弃早期对话

  • ❌ 不要无脑取 Top-20

  • ❌ 不要忽视中文压缩效果

  • ❌ 不要在生产环境用未测试的压缩


本文档持续更新,欢迎提 PR 补充案例和优化建议。

动物装饰