目录
什么是上下文窗口
上下文窗口为何会爆掉
主流解决方案全景图
方案一:重排序 + 动态截断
方案二:多级索引(父子文档)
方案三:对话记忆分层(滑动窗口+摘要)
方案四:Map-Reduce 模式
方案五:上下文压缩(LLMLingua)
方案选型指南
一、什么是上下文窗口
1.1 基本概念
上下文窗口(Context Window) 是大语言模型(LLM)在一次推理中可以处理的 最大 Token 数量。
# 简单理解
上下文窗口 = 输入Token数(你的问题 + 检索内容) + 输出Token数(模型回答)1.2 Token 是什么
Token 是模型处理文本的最小单位:
示例:
# "RAG技术很好" 的 Token 化
中文: ["RAG", "技术", "很好"] # 约 3-4 Tokens
英文: "RAG technology is good" # 约 5-6 Tokens1.3 常见模型的上下文窗口
二、上下文窗口为何会爆掉
2.1 问题示意
# 一个典型 RAG 流程的 Token 消耗
system_prompt = "你是一个AI助手..." # 500 tokens
user_query = "请总结这份财报..." # 50 tokens
retrieved_docs = """
文档块1: 财报第1-5页... # 2000 tokens
文档块2: 财报第6-10页.. # 2000 tokens
...
文档块20: 财报第95-100页 # 2000 tokens
""" # 总计 40000 tokens
total = 500 + 50 + 40000 = 40550 tokens
# 如果模型窗口是 8K,直接爆掉!2.2 爆掉的主要原因
原因 1:召回数量过多
# 常见错误:为了提高召回率,取太多文档块
retrieved_chunks = vector_store.similarity_search(query, k=50) # 50个块
# 每个块 500 tokens,总输入 = 25000 tokens原因 2:文档切块过细
# 过度切分导致大量重复上下文
chunk_size = 200 # 太小
# 100页文档 → 1500个块 → 检索50个块 → 信息碎片化严重原因 3:对话历史累积
# 多轮对话,历史消息不断累积
messages = [
{"role": "user", "content": "第一轮问题..."}, # 500 tokens
{"role": "assistant", "content": "第一轮回答..."}, # 800 tokens
{"role": "user", "content": "第二轮问题..."}, # 300 tokens
{"role": "assistant", "content": "第二轮回答..."}, # 600 tokens
# ... 50轮后,总 token 可能超过 10万
]原因 4:文档冗余
# 多个检索块包含重复内容
chunk_1 = "公司的营收为100亿,利润20亿..." # 500 tokens
chunk_2 = "本季度公司营收100亿,同比增长..." # 500 tokens
# 两块都包含"营收100亿",浪费窗口空间2.3 爆掉的后果
三、主流解决方案全景图
3.1 解决方案分类
上下文窗口管理
├── 检索前优化(预防)
│ ├── 多级索引
│ └── 查询路由
├── 检索中优化(控制)
│ └── 动态检索策略
├── 检索后压缩(处理)
│ ├── 重排序+截断 ⭐ 最主流
│ ├── 上下文压缩
│ └── 文档重组
└── 架构级方案
├── Map-Reduce
└── 对话记忆分层3.2 各方案对比速览
四、方案一:重排序 + 动态截断
4.1 方案概述
核心思想:先用高召回算法(向量检索)取足够多的候选,再用精确模型(交叉编码器)重排序,只取最相关的 Top-K 进入上下文。
为什么是主流:简单、高效、成本可控,在 80% 的 RAG 场景中够用。
4.2 技术原理
传统做法:
用户问题 → 向量检索 → 取 Top-20 → 全部塞入 LLM
↓
问题:20个块中可能只有5个有用,浪费窗口
重排序做法:
用户问题 → 向量检索 → 取 Top-50 → 重排序 → 取 Top-5 → 塞入 LLM
↓ ↓
高召回率 精排序,去冗余关键技术:
1. 向量检索(Embedding)
# 快速但粗糙
query_embedding = embed_model.encode(query)
candidates = vector_db.search(query_embedding, k=50) # 高召回2. 交叉编码器(Cross-Encoder)
# 慢速但精确,联合编码 query 和 document
def rerank(query, documents):
scores = []
for doc in documents:
# 将 query 和 document 一起编码
input = f"[CLS] {query} [SEP] {doc} [SEP]"
score = cross_encoder(input) # 输出相关性分数
scores.append(score)
return sorted(documents, key=scores, reverse=True)4.3 实现流程

4.4 代码实现
import tiktoken
from sentence_transformers import CrossEncoder
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
class RAGWithReranking:
def __init__(self, llm, vector_store, max_tokens=8000):
self.llm = llm
self.vector_store = vector_store
self.max_tokens = max_tokens
self.tokenizer = tiktoken.get_encoding("cl100k_base")
# 加载重排序模型(开源)
self.reranker = CrossEncoder('BAAI/bge-reranker-base')
def count_tokens(self, text):
return len(self.tokenizer.encode(text))
def rerank_documents(self, query, documents, top_k=5):
"""重排序"""
pairs = [[query, doc.page_content] for doc in documents]
scores = self.reranker.predict(pairs)
# 按分数排序
sorted_docs = sorted(
zip(documents, scores),
key=lambda x: x[1],
reverse=True
)
return [doc for doc, score in sorted_docs[:top_k]]
def truncate_by_token(self, documents, max_tokens):
"""Token 截断"""
truncated_docs = []
current_tokens = 0
for doc in documents:
doc_tokens = self.count_tokens(doc.page_content)
if current_tokens + doc_tokens <= max_tokens:
truncated_docs.append(doc)
current_tokens += doc_tokens
else:
# 截断最后一个文档
remaining = max_tokens - current_tokens
if remaining > 100: # 至少保留100 tokens
truncated_content = doc.page_content[:remaining * 4]
doc.page_content = truncated_content
truncated_docs.append(doc)
break
return truncated_docs
def query(self, question):
# 1. 高召回检索
candidates = self.vector_store.similarity_search(question, k=50)
# 2. 重排序,取 Top-5
reranked_docs = self.rerank_documents(question, candidates, top_k=5)
# 3. 计算剩余窗口(预留1000 tokens给 system prompt 和回答)
available_tokens = self.max_tokens - 1000
# 4. Token 截断
final_docs = self.truncate_by_token(reranked_docs, available_tokens)
# 5. 构建 Prompt
context = "\n\n".join([doc.page_content for doc in final_docs])
prompt = f"""基于以下参考资料回答问题:
参考资料:
{context}
问题:{question}
请给出准确、简洁的回答。"""
# 6. 调用 LLM
response = self.llm.invoke(prompt)
return response
# 使用示例
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo", max_tokens=1000)
vector_store = Chroma(...) # 你的向量数据库
rag = RAGWithReranking(llm, vector_store, max_tokens=4000)
answer = rag.query("2023年公司营收是多少?")
print(answer)4.5 效果评估
五、方案二:多级索引(父子文档)
5.1 方案概述
核心思想:构建层次化的文档结构,小粒度检索、大粒度提供上下文,避免信息碎片化。
适用场景:长文档问答、技术文档、法律合同等需要完整上下文的场景。
5.2 技术原理
# 传统切块:信息可能被切断
文档原文:
"公司营收达到100亿。主要来源于...(中间内容)...利润20亿。"
切块1: "公司营收达到100亿。" # 只有营收,没有利润
切块2: "利润20亿。" # 只有利润,没有营收
# 父子文档:保留结构关系
父文档(段落级):
"公司营收达到100亿。主要来源于...利润20亿。" # 完整段落
子文档(句子级):
"公司营收达到100亿。" # 指向父文档1
"利润20亿。" # 指向父文档15.3 索引结构设计
python
{
"chunk_id": "chunk_001",
"content": "公司的营收达到100亿...", # 子文档内容
"type": "child",
"parent_id": "parent_001", # 指向父文档
"metadata": {...}
}
{
"chunk_id": "parent_001",
"content": "完整的段落内容...", # 父文档内容
"type": "parent",
"child_ids": ["chunk_001", "chunk_002"],
"metadata": {...}
}5.4 实现流程

5.5 代码实现
from typing import List, Dict
import hashlib
class HierarchicalIndex:
def __init__(self, vector_store, doc_store):
self.vector_store = vector_store
self.doc_store = doc_store # 存储完整文档
def create_hierarchy(self, document: str, chunk_size: int = 1000):
"""创建父子文档索引"""
# 1. 按段落分父文档
paragraphs = document.split('\n\n')
parent_chunks = []
for para in paragraphs:
if len(para) < 100:
continue
# 父文档ID
parent_id = hashlib.md5(para.encode()).hexdigest()
# 存储父文档
parent_chunk = {
"id": parent_id,
"content": para,
"type": "parent"
}
self.doc_store[parent_id] = parent_chunk
# 2. 将父文档切分成子文档
sentences = self.split_sentences(para)
child_chunks = []
for sent in sentences:
child_id = hashlib.md5(sent.encode()).hexdigest()
child_chunk = {
"id": child_id,
"content": sent,
"type": "child",
"parent_id": parent_id
}
# 子文档存入向量库
self.vector_store.add_texts(
[sent],
metadatas=[{"parent_id": parent_id, "chunk_id": child_id}]
)
child_chunks.append(child_chunk)
parent_chunk["child_ids"] = [c["id"] for c in child_chunks]
return parent_chunks
def split_sentences(self, text: str) -> List[str]:
"""简单分句(生产环境建议用 spacy/nltk)"""
import re
sentences = re.split(r'[。!?.!?]', text)
return [s.strip() for s in sentences if s.strip()]
def retrieve_with_context(self, query: str, strategy: str = "parent"):
"""根据策略检索"""
# 1. 检索子文档
child_results = self.vector_store.similarity_search(query, k=5)
if strategy == "child":
# 直接返回子文档内容
return child_results
elif strategy == "parent":
# 返回对应的父文档(去重)
parent_ids = set()
for doc in child_results:
parent_id = doc.metadata.get("parent_id")
if parent_id:
parent_ids.add(parent_id)
parents = [self.doc_store[pid] for pid in parent_ids]
return parents
elif strategy == "expansion":
# 返回父文档 + 相邻文档
parents = []
for doc in child_results:
parent_id = doc.metadata.get("parent_id")
if parent_id:
parent = self.doc_store[parent_id]
parents.append(parent)
# 获取相邻段落(需要额外实现)
neighbors = self.get_neighbors(parent_id)
parents.extend(neighbors)
# 去重
unique_parents = {p["id"]: p for p in parents}.values()
return list(unique_parents)
def get_neighbors(self, parent_id: str) -> List[Dict]:
"""获取相邻的父文档"""
# 需要维护文档顺序,简化实现略
pass
# 使用示例
class RAGWithHierarchy:
def __init__(self, llm, vector_store, doc_store):
self.llm = llm
self.hierarchy = HierarchicalIndex(vector_store, doc_store)
def query(self, question, strategy="parent"):
# 根据问题类型选择策略
if "是什么" in question or "多少" in question:
strategy = "child" # 事实性问题,用子文档
elif "为什么" in question or "总结" in question:
strategy = "parent" # 理解性问题,用父文档
# 检索
contexts = self.hierarchy.retrieve_with_context(question, strategy)
# 构建 prompt
context_text = "\n\n".join([c["content"] for c in contexts])
prompt = f"""参考以下内容回答问题:
{context_text}
问题:{question}
回答:"""
return self.llm.invoke(prompt)5.6 策略选择指南
六、方案三:对话记忆分层(滑动窗口+摘要)
6.1 方案概述
核心思想:将对话记忆分为三层,保留最近对话的完整细节,将早期对话压缩成摘要,关键信息永久保存。
适用场景:多轮对话、AI助手、客服系统。
6.2 三层记忆架构
┌─────────────────────────────────────────────────┐
│ 当前上下文窗口 │
├─────────────────────────────────────────────────┤
│ 持久层(全局摘要) │
│ "用户是程序员,偏好Python,正在开发RAG项目..." │
├─────────────────────────────────────────────────┤
│ 摘要层(滚动摘要) │
│ "第1-3轮讨论了架构设计,第4-6轮确定了技术选型..." │
├─────────────────────────────────────────────────┤
│ 窗口层(最近N轮完整对话) │
│ User: "那用什么向量数据库?" │
│ Assistant: "建议用Milvus或Chroma..." │
└─────────────────────────────────────────────────┘6.3 实现原理
核心机制:
滑动窗口:保留最近 K 轮完整对话
滚动摘要:当窗口满时,将最早的一批对话压缩成摘要
全局记忆:从摘要中提取关键信息持久化
6.4 代码实现
python
from typing import List, Dict
from datetime import datetime
import tiktoken
class LayeredChatMemory:
def __init__(self, llm, max_window_tokens=3000, max_summary_tokens=500):
self.llm = llm
self.max_window_tokens = max_window_tokens # 窗口层上限
self.max_summary_tokens = max_summary_tokens # 摘要层上限
self.window_messages = [] # 最近N轮完整对话
self.rolling_summary = "" # 滚动摘要
self.global_memory = {
"user_profile": {}, # 用户画像
"key_facts": [], # 关键事实
"preferences": [], # 偏好
"unfinished_tasks": [] # 未完成任务
}
self.tokenizer = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, text):
return len(self.tokenizer.encode(text))
def add_message(self, role: str, content: str):
"""添加新消息"""
message = {
"role": role,
"content": content,
"timestamp": datetime.now()
}
self.window_messages.append(message)
# 检查是否需要压缩
self._check_and_compress()
def _check_and_compress(self):
"""检查窗口是否超限"""
current_tokens = self._count_window_tokens()
if current_tokens > self.max_window_tokens:
self._compress_old_messages()
def _count_window_tokens(self):
"""统计窗口总token"""
total = 0
for msg in self.window_messages:
total += self.count_tokens(msg["content"])
return total
def _compress_old_messages(self):
"""压缩最早的对话"""
# 找出要压缩的消息(窗口的一半)
compress_count = len(self.window_messages) // 3
if compress_count < 2:
compress_count = 2
messages_to_compress = self.window_messages[:compress_count]
self.window_messages = self.window_messages[compress_count:]
# 生成新的摘要
new_summary = self._generate_summary(messages_to_compress)
# 更新滚动摘要
if self.rolling_summary:
self.rolling_summary = self._merge_summaries(
self.rolling_summary, new_summary
)
else:
self.rolling_summary = new_summary
# 提取关键信息到全局记忆
self._extract_to_global_memory(messages_to_compress)
def _generate_summary(self, messages: List[Dict]) -> str:
"""生成对话摘要"""
# 将消息格式化成文本
conversation = "\n".join([
f"{m['role']}: {m['content']}"
for m in messages
])
prompt = f"""请总结以下对话,保留关键信息:
- 用户的身份和偏好
- 重要的事实和决策
- 未完成的任务
对话:
{conversation}
摘要:"""
summary = self.llm.invoke(prompt)
return summary
def _merge_summaries(self, old_summary: str, new_summary: str) -> str:
"""合并新旧摘要"""
prompt = f"""将以下两个摘要合并成一个简洁的摘要:
摘要1(较旧的对话):
{old_summary}
摘要2(较新的对话):
{new_summary}
合并后的摘要:"""
merged = self.llm.invoke(prompt)
# Token限制
if self.count_tokens(merged) > self.max_summary_tokens:
# 进一步压缩
merged = self._compress_summary(merged)
return merged
def _compress_summary(self, summary: str) -> str:
"""压缩摘要到指定token数"""
prompt = f"""将以下摘要压缩成更简洁的形式,保留最关键信息:
{summary}
简洁版本:"""
return self.llm.invoke(prompt)
def _extract_to_global_memory(self, messages: List[Dict]):
"""提取关键信息到全局记忆"""
conversation = "\n".join([
f"{m['role']}: {m['content']}"
for m in messages
])
prompt = f"""从以下对话中提取关键信息,按JSON格式输出:
对话:
{conversation}
输出格式:
{{
"user_profile": {{"职业": "", "技能": "", ...}},
"key_facts": ["事实1", "事实2"],
"preferences": ["偏好1", "偏好2"],
"unfinished_tasks": ["任务1", "任务2"]
}}
只输出JSON:"""
import json
try:
extracted = self.llm.invoke(prompt)
new_info = json.loads(extracted)
# 合并到全局记忆
for key in new_info:
if key in self.global_memory:
if isinstance(self.global_memory[key], list):
self.global_memory[key].extend(new_info[key])
# 去重
self.global_memory[key] = list(set(self.global_memory[key]))
elif isinstance(self.global_memory[key], dict):
self.global_memory[key].update(new_info[key])
except:
pass
def get_context(self) -> str:
"""获取当前完整上下文"""
context_parts = []
# 1. 全局记忆
if self.global_memory["user_profile"] or self.global_memory["key_facts"]:
context_parts.append("【用户档案】")
context_parts.append(str(self.global_memory["user_profile"]))
context_parts.append("【关键事实】")
context_parts.append("\n".join(self.global_memory["key_facts"]))
# 2. 滚动摘要
if self.rolling_summary:
context_parts.append("【历史对话摘要】")
context_parts.append(self.rolling_summary)
# 3. 最近对话窗口
if self.window_messages:
context_parts.append("【最近对话】")
for msg in self.window_messages[-5:]: # 保留最近5轮完整对话
context_parts.append(f"{msg['role']}: {msg['content']}")
return "\n".join(context_parts)
def clear(self):
"""清空记忆"""
self.window_messages = []
self.rolling_summary = ""
self.global_memory = {
"user_profile": {},
"key_facts": [],
"preferences": [],
"unfinished_tasks": []
}
# 使用示例
class ChatBot:
def __init__(self, llm):
self.llm = llm
self.memory = LayeredChatMemory(llm)
def chat(self, user_input):
# 添加用户消息
self.memory.add_message("user", user_input)
# 获取上下文
context = self.memory.get_context()
# 构建 prompt
prompt = f"""你是一个AI助手。以下是对话背景:
{context}
请根据以上信息回答用户的问题。
当前用户:{user_input}
助手:"""
# 生成回答
response = self.llm.invoke(prompt)
# 添加助手消息
self.memory.add_message("assistant", response)
return response
# 运行示例
bot = ChatBot(llm)
# 多轮对话
bot.chat("我是程序员,擅长Python")
bot.chat("我在做一个RAG项目")
bot.chat("用什么向量数据库好?")
bot.chat("Chroma和Milvus哪个更适合生产环境?")
# 第5轮时,早期对话会被自动压缩6.5 压缩时机与策略
class CompressionStrategy:
"""压缩策略配置"""
# 策略1:基于Token数量
token_threshold = 3000 # 超过3000 tokens触发压缩
compress_ratio = 0.3 # 压缩30%的最旧消息
# 策略2:基于消息数量
message_threshold = 15 # 超过15轮触发压缩
keep_recent = 5 # 保留最近5轮
# 策略3:基于时间(适合长对话)
time_threshold = 3600 # 1小时前的对话压缩
keep_recent_time = 300 # 保留最近5分钟
# 策略4:混合策略(推荐)
def should_compress(self, messages, tokens):
if tokens > 3000:
return True
if len(messages) > 15:
return True
return False七、方案四:Map-Reduce 模式
7.1 方案概述
核心思想:将超长文档分批处理,先分别总结(Map),再合并总结(Reduce),避免一次性加载全部内容。
适用场景:超长文档总结、财报分析、长篇小说理解。
7.2 技术原理
传统做法:
100页财报 → 全部加载 → LLM总结 → 可能爆窗
Map-Reduce做法:
100页财报 → 分成10批(每批10页)
↓
Map阶段:并行处理
批次1 → 总结1
批次2 → 总结2
...
批次10 → 总结10
↓
Reduce阶段:
10个总结 → 合并 → 最终总结7.3 代码实现
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
class MapReduceRAG:
def __init__(self, llm, max_chunk_tokens=3000):
self.llm = llm
self.max_chunk_tokens = max_chunk_tokens
def split_document(self, document: str) -> List[str]:
"""切分文档(按token)"""
chunks = []
current_chunk = []
current_tokens = 0
for sentence in self.split_sentences(document):
sentence_tokens = self.count_tokens(sentence)
if current_tokens + sentence_tokens > self.max_chunk_tokens:
chunks.append(" ".join(current_chunk))
current_chunk = [sentence]
current_tokens = sentence_tokens
else:
current_chunk.append(sentence)
current_tokens += sentence_tokens
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def split_sentences(self, text: str) -> List[str]:
"""简单分句"""
import re
sentences = re.split(r'[。!?.!?]', text)
return [s.strip() for s in sentences if s.strip()]
def count_tokens(self, text):
"""简单token计数(生产环境用tiktoken)"""
return len(text) // 4 # 粗略估计
def map_summarize(self, chunk: str, idx: int) -> Dict:
"""Map阶段:对单个chunk进行总结"""
prompt = f"""请总结以下内容,提取关键信息(事实、数据、结论):
第{idx}部分:
{chunk}
总结:"""
summary = self.llm.invoke(prompt)
return {
"index": idx,
"summary": summary,
"length": len(chunk)
}
def reduce_summarize(self, summaries: List[Dict]) -> str:
"""Reduce阶段:合并所有总结"""
# 按顺序排列
summaries_sorted = sorted(summaries, key=lambda x: x["index"])
# 合并总结文本
combined = "\n\n".join([
f"## 第{s['index']}部分\n{s['summary']}"
for s in summaries_sorted
])
prompt = f"""以下是对同一份文档不同部分的总结,请将它们整合成一份完整的、连贯的总结:
{combined}
要求:
1. 去除重复信息
2. 按逻辑顺序组织
3. 保留所有关键事实和数据
4. 给出最终总结
最终总结:"""
final_summary = self.llm.invoke(prompt)
return final_summary
def query(self, document: str, question: str = None) -> str:
"""执行 Map-Reduce"""
# 1. 切分文档
chunks = self.split_document(document)
print(f"文档切分为 {len(chunks)} 个chunks")
# 2. Map阶段:并行处理
summaries = []
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(self.map_summarize, chunk, i+1)
for i, chunk in enumerate(chunks)
]
for future in as_completed(futures):
summaries.append(future.result())
# 3. Reduce阶段:合并总结
final_summary = self.reduce_summarize(summaries)
# 4. 如果有具体问题,基于总结回答
if question:
answer_prompt = f"""基于以下文档总结回答问题:
文档总结:
{final_summary}
问题:{question}
回答:"""
return self.llm.invoke(answer_prompt)
return final_summary
# 优化版:分层 Map-Reduce(适合超长文档)
class HierarchicalMapReduce(MapReduceRAG):
"""多层 Map-Reduce,适合百万级 token 文档"""
def hierarchical_query(self, document: str, levels: int = 2) -> str:
"""多层压缩"""
current_doc = document
for level in range(levels):
print(f"Level {level+1} 压缩...")
chunks = self.split_document(current_doc)
# Map
summaries = []
for i, chunk in enumerate(chunks):
summary = self.map_summarize(chunk, i+1)
summaries.append(summary)
# Reduce(本层输出作为下一层输入)
current_doc = self.reduce_summarize(summaries)
return current_doc
# 使用示例
# 场景1:长文档总结
mapreduce = MapReduceRAG(llm)
document = open("annual_report_100pages.txt").read()
# 总结整份报告
summary = mapreduce.query(document)
print(summary)
# 基于长文档回答问题
answer = mapreduce.query(
document,
question="2023年营收增长多少?"
)
print(answer)
# 场景2:超长文档(百万token)
hierarchical = HierarchicalMapReduce(llm)
final_summary = hierarchical.hierarchical_query(document, levels=3)7.4 Map-Reduce 的变体
八、方案五:上下文压缩(LLMLingua)
8.1 方案概述
核心思想:使用小型模型对 Prompt 进行压缩,删除不重要的 tokens,保留关键信息。
适用场景:高吞吐场景、成本敏感场景、移动端部署。
8.2 技术原理
# 原始 Prompt
prompt = """
请根据以下资料回答问题:
资料1: 公司的营收达到100亿元...
资料2: 公司主要产品包括...
... (共5000 tokens)
问题:公司营收是多少?
"""
# 压缩后
compressed = """
资料: 营收100亿元...
问题:公司营收?
""" # 压缩到 500 tokens8.3 使用 LLMLingua 实现
# 安装
# pip install llmlingua
from llmlingua import PromptCompressor
class LLMLinguaCompressor:
def __init__(self):
# 初始化压缩器(使用小模型)
self.compressor = PromptCompressor(
model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
device_map="cpu"
)
def compress_prompt(self, prompt: str, target_ratio: float = 0.5) -> str:
"""
压缩 Prompt
Args:
prompt: 原始 prompt
target_ratio: 目标压缩率(0.5 = 压缩到50%)
"""
compressed = self.compressor.compress_prompt(
prompt,
rate=target_ratio,
force_tokens=['\n', '?', '。', '问题'] # 强制保留的标记
)
return compressed['compressed_prompt']
def compress_context(self, context: str, question: str) -> str:
"""压缩检索到的上下文"""
# 构建包含问题和上下文的 prompt
full_prompt = f"""问题:{question}
参考资料:
{context}
请基于参考资料回答问题。"""
# 压缩
compressed = self.compress_prompt(full_prompt, target_ratio=0.3)
return compressed
# 集成到 RAG
class RAGWithLLMLingua:
def __init__(self, llm, vector_store):
self.llm = llm
self.vector_store = vector_store
self.compressor = LLMLinguaCompressor()
def query(self, question: str, compress_ratio: float = 0.5):
# 1. 检索
docs = self.vector_store.similarity_search(question, k=20)
context = "\n\n".join([doc.page_content for doc in docs])
# 2. 压缩上下文
compressed_context = self.compressor.compress_context(context, question)
# 3. 构建最终 prompt
prompt = f"""问题:{question}
参考资料:
{compressed_context}
请回答问题:"""
return self.llm.invoke(prompt)8.4 LLMLingua 的压缩效果
8.5 注意事项
# LLMLingua 的局限性
limitations = {
"中文支持": "中文模型不如英文成熟",
"压缩损失": "可能丢失关键事实(尤其是数字)",
"额外开销": "需要加载小模型(~500MB)",
"调试困难": "压缩后的文本不易读"
}
# 使用建议
recommendations = {
"保留关键词": "设置 force_tokens 保留重要标记",
"验证准确率": "压缩后需验证答案质量",
"缓存结果": "相同 context 可缓存压缩结果",
"A/B测试": "小流量测试压缩效果"
}九、方案选型指南
9.1 决策树

9.2 场景与方案匹配表
9.3 混合方案(生产环境推荐)
class ProductionRAG:
"""生产级 RAG,组合多种方案"""
def __init__(self):
self.rerank = RAGWithReranking(...)
self.hierarchy = RAGWithHierarchy(...)
self.memory = LayeredChatMemory(...)
self.compressor = LLMLinguaCompressor()
def query(self, question, conversation_history=None):
# 1. 判断场景
if conversation_history:
# 对话场景:使用记忆分层
context = self.memory.get_context()
docs = []
else:
# 知识库问答
docs = self.hierarchy.retrieve_with_context(question)
# 重排序
docs = self.rerank.rerank_documents(question, docs)
# 2. 检查 token
total_tokens = self.count_tokens(docs)
if total_tokens > 5000:
# 超过阈值,压缩
docs = self.compressor.compress_docs(docs)
# 3. 构建最终答案
return self.llm.invoke(self.build_prompt(question, docs))总结
核心要点
上下文窗口是资源:不是越大越好,需要精细管理
预防优于治疗:通过多级索引、路由从源头控制
分层是王道:对话场景用三层记忆,文档场景用父子索引
压缩需谨慎:可能损失信息,需要验证效果
最佳实践
✅ 优先使用重排序+截断:80%场景够用
✅ 对话场景必做记忆分层:不能简单丢弃
✅ 超长文档用 Map-Reduce:避免单次爆窗
✅ 监控 token 消耗:设置告警
✅ A/B测试压缩效果:验证准确率
避坑指南
❌ 不要简单丢弃早期对话
❌ 不要无脑取 Top-20
❌ 不要忽视中文压缩效果
❌ 不要在生产环境用未测试的压缩
本文档持续更新,欢迎提 PR 补充案例和优化建议。