013RAG上下文工程-如何预防上下文窗口爆掉?即如何处理短期记忆

alex
2
2026-04-01

目录

  1. 什么是上下文窗口

  2. 上下文窗口为何会爆掉

  3. 主流解决方案全景图

  4. 方案一:重排序 + 动态截断

  5. 方案二:多级索引(父子文档)

  6. 方案三:对话记忆分层(滑动窗口+摘要)

  7. 方案四:Map-Reduce 模式

  8. 方案五:上下文压缩(LLMLingua)

  9. 方案选型指南


一、什么是上下文窗口

1.1 基本概念

上下文窗口(Context Window) 是大语言模型(LLM)在一次推理中可以处理的 最大 Token 数量

# 简单理解
上下文窗口 = 输入Token数(你的问题 + 检索内容) + 输出Token数(模型回答)

1.2 Token 是什么

Token 是模型处理文本的最小单位:

语言

1 Token 约等于

英文

0.75 个单词(4 个字符)

中文

0.5-1 个汉字

代码

1-2 个字符

示例

# "RAG技术很好" 的 Token 化
中文: ["RAG", "技术", "很好"]  # 约 3-4 Tokens
英文: "RAG technology is good"  # 约 5-6 Tokens

1.3 常见模型的上下文窗口

模型

窗口大小

约等于

GPT-3.5

4K-16K

3000-12000 字

GPT-4

8K-32K

6000-24000 字

GPT-4 Turbo

128K

约 10 万字

Claude 3

200K

约 15 万字

Gemini 1.5 Pro

1M-2M

约 75-150 万字

开源模型(Llama 3)

8K-128K

因版本而异


二、上下文窗口为何会爆掉

2.1 问题示意

# 一个典型 RAG 流程的 Token 消耗
system_prompt = "你是一个AI助手..."  # 500 tokens
user_query = "请总结这份财报..."     # 50 tokens
retrieved_docs = """
文档块1: 财报第1-5页...   # 2000 tokens
文档块2: 财报第6-10页..   # 2000 tokens
...
文档块20: 财报第95-100页  # 2000 tokens
"""  # 总计 40000 tokens

total = 500 + 50 + 40000 = 40550 tokens
# 如果模型窗口是 8K,直接爆掉!

2.2 爆掉的主要原因

原因 1:召回数量过多

# 常见错误:为了提高召回率,取太多文档块
retrieved_chunks = vector_store.similarity_search(query, k=50)  # 50个块
# 每个块 500 tokens,总输入 = 25000 tokens

原因 2:文档切块过细

# 过度切分导致大量重复上下文
chunk_size = 200  # 太小
# 100页文档 → 1500个块 → 检索50个块 → 信息碎片化严重

原因 3:对话历史累积

# 多轮对话,历史消息不断累积
messages = [
    {"role": "user", "content": "第一轮问题..."},  # 500 tokens
    {"role": "assistant", "content": "第一轮回答..."},  # 800 tokens
    {"role": "user", "content": "第二轮问题..."},  # 300 tokens
    {"role": "assistant", "content": "第二轮回答..."},  # 600 tokens
    # ... 50轮后,总 token 可能超过 10万
]

原因 4:文档冗余

# 多个检索块包含重复内容
chunk_1 = "公司的营收为100亿,利润20亿..."  # 500 tokens
chunk_2 = "本季度公司营收100亿,同比增长..."  # 500 tokens
# 两块都包含"营收100亿",浪费窗口空间

2.3 爆掉的后果

后果

说明

请求失败

直接报错:max_tokens exceeded

截断损失

部分输入被截断,信息丢失

成本浪费

大量 token 用于无用信息

性能下降

长上下文导致推理速度变慢

中间迷失

重要信息在长上下文中被忽略


三、主流解决方案全景图

3.1 解决方案分类

上下文窗口管理
├── 检索前优化(预防)
│   ├── 多级索引
│   └── 查询路由
├── 检索中优化(控制)
│   └── 动态检索策略
├── 检索后压缩(处理)
│   ├── 重排序+截断 ⭐ 最主流
│   ├── 上下文压缩
│   └── 文档重组
└── 架构级方案
    ├── Map-Reduce
    └── 对话记忆分层

3.2 各方案对比速览

方案

适用场景

延迟

成本

信息保留

复杂度

重排序+截断

知识库问答

多级索引

长文档问答

⭐⭐⭐

对话记忆分层

多轮对话

⭐⭐

Map-Reduce

超长文档总结

⭐⭐

LLMLingua

高吞吐场景

⭐⭐⭐


四、方案一:重排序 + 动态截断

4.1 方案概述

核心思想:先用高召回算法(向量检索)取足够多的候选,再用精确模型(交叉编码器)重排序,只取最相关的 Top-K 进入上下文。

为什么是主流:简单、高效、成本可控,在 80% 的 RAG 场景中够用。

4.2 技术原理

传统做法:
    用户问题 → 向量检索 → 取 Top-20 → 全部塞入 LLM
                     ↓
              问题:20个块中可能只有5个有用,浪费窗口

重排序做法:
    用户问题 → 向量检索 → 取 Top-50 → 重排序 → 取 Top-5 → 塞入 LLM
                     ↓              ↓
                高召回率        精排序,去冗余

关键技术

1. 向量检索(Embedding)

# 快速但粗糙
query_embedding = embed_model.encode(query)
candidates = vector_db.search(query_embedding, k=50)  # 高召回

2. 交叉编码器(Cross-Encoder)

# 慢速但精确,联合编码 query 和 document
def rerank(query, documents):
    scores = []
    for doc in documents:
        # 将 query 和 document 一起编码
        input = f"[CLS] {query} [SEP] {doc} [SEP]"
        score = cross_encoder(input)  # 输出相关性分数
        scores.append(score)
    return sorted(documents, key=scores, reverse=True)

4.3 实现流程

4.4 代码实现

import tiktoken
from sentence_transformers import CrossEncoder
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

class RAGWithReranking:
    def __init__(self, llm, vector_store, max_tokens=8000):
        self.llm = llm
        self.vector_store = vector_store
        self.max_tokens = max_tokens
        self.tokenizer = tiktoken.get_encoding("cl100k_base")
        # 加载重排序模型(开源)
        self.reranker = CrossEncoder('BAAI/bge-reranker-base')
        
    def count_tokens(self, text):
        return len(self.tokenizer.encode(text))
    
    def rerank_documents(self, query, documents, top_k=5):
        """重排序"""
        pairs = [[query, doc.page_content] for doc in documents]
        scores = self.reranker.predict(pairs)
        
        # 按分数排序
        sorted_docs = sorted(
            zip(documents, scores), 
            key=lambda x: x[1], 
            reverse=True
        )
        return [doc for doc, score in sorted_docs[:top_k]]
    
    def truncate_by_token(self, documents, max_tokens):
        """Token 截断"""
        truncated_docs = []
        current_tokens = 0
        
        for doc in documents:
            doc_tokens = self.count_tokens(doc.page_content)
            if current_tokens + doc_tokens <= max_tokens:
                truncated_docs.append(doc)
                current_tokens += doc_tokens
            else:
                # 截断最后一个文档
                remaining = max_tokens - current_tokens
                if remaining > 100:  # 至少保留100 tokens
                    truncated_content = doc.page_content[:remaining * 4]
                    doc.page_content = truncated_content
                    truncated_docs.append(doc)
                break
        return truncated_docs
    
    def query(self, question):
        # 1. 高召回检索
        candidates = self.vector_store.similarity_search(question, k=50)
        
        # 2. 重排序,取 Top-5
        reranked_docs = self.rerank_documents(question, candidates, top_k=5)
        
        # 3. 计算剩余窗口(预留1000 tokens给 system prompt 和回答)
        available_tokens = self.max_tokens - 1000
        
        # 4. Token 截断
        final_docs = self.truncate_by_token(reranked_docs, available_tokens)
        
        # 5. 构建 Prompt
        context = "\n\n".join([doc.page_content for doc in final_docs])
        prompt = f"""基于以下参考资料回答问题:

参考资料:
{context}

问题:{question}

请给出准确、简洁的回答。"""

        # 6. 调用 LLM
        response = self.llm.invoke(prompt)
        return response

# 使用示例
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-3.5-turbo", max_tokens=1000)
vector_store = Chroma(...)  # 你的向量数据库

rag = RAGWithReranking(llm, vector_store, max_tokens=4000)
answer = rag.query("2023年公司营收是多少?")
print(answer)

4.5 效果评估

指标

数值

Token 节省

60%-80%

准确率提升

5%-15%(相比直接取Top-K)

延迟增加

0.2-0.5秒(重排序计算)

成本降低

50%-70%(输入token减少)


五、方案二:多级索引(父子文档)

5.1 方案概述

核心思想:构建层次化的文档结构,小粒度检索、大粒度提供上下文,避免信息碎片化。

适用场景:长文档问答、技术文档、法律合同等需要完整上下文的场景。

5.2 技术原理

# 传统切块:信息可能被切断
文档原文:
"公司营收达到100亿。主要来源于...(中间内容)...利润20亿。"

切块1: "公司营收达到100亿。"  # 只有营收,没有利润
切块2: "利润20亿。"          # 只有利润,没有营收

# 父子文档:保留结构关系
父文档(段落级):
"公司营收达到100亿。主要来源于...利润20亿。"  # 完整段落

子文档(句子级):
"公司营收达到100亿。"  # 指向父文档1
"利润20亿。"           # 指向父文档1

5.3 索引结构设计

python

{
  "chunk_id": "chunk_001",
  "content": "公司的营收达到100亿...",  # 子文档内容
  "type": "child",
  "parent_id": "parent_001",           # 指向父文档
  "metadata": {...}
}

{
  "chunk_id": "parent_001", 
  "content": "完整的段落内容...",        # 父文档内容
  "type": "parent",
  "child_ids": ["chunk_001", "chunk_002"],
  "metadata": {...}
}

5.4 实现流程

5.5 代码实现

from typing import List, Dict
import hashlib

class HierarchicalIndex:
    def __init__(self, vector_store, doc_store):
        self.vector_store = vector_store
        self.doc_store = doc_store  # 存储完整文档
        
    def create_hierarchy(self, document: str, chunk_size: int = 1000):
        """创建父子文档索引"""
        # 1. 按段落分父文档
        paragraphs = document.split('\n\n')
        parent_chunks = []
        
        for para in paragraphs:
            if len(para) < 100:
                continue
                
            # 父文档ID
            parent_id = hashlib.md5(para.encode()).hexdigest()
            
            # 存储父文档
            parent_chunk = {
                "id": parent_id,
                "content": para,
                "type": "parent"
            }
            self.doc_store[parent_id] = parent_chunk
            
            # 2. 将父文档切分成子文档
            sentences = self.split_sentences(para)
            child_chunks = []
            
            for sent in sentences:
                child_id = hashlib.md5(sent.encode()).hexdigest()
                child_chunk = {
                    "id": child_id,
                    "content": sent,
                    "type": "child",
                    "parent_id": parent_id
                }
                # 子文档存入向量库
                self.vector_store.add_texts(
                    [sent], 
                    metadatas=[{"parent_id": parent_id, "chunk_id": child_id}]
                )
                child_chunks.append(child_chunk)
            
            parent_chunk["child_ids"] = [c["id"] for c in child_chunks]
        
        return parent_chunks
    
    def split_sentences(self, text: str) -> List[str]:
        """简单分句(生产环境建议用 spacy/nltk)"""
        import re
        sentences = re.split(r'[。!?.!?]', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def retrieve_with_context(self, query: str, strategy: str = "parent"):
        """根据策略检索"""
        # 1. 检索子文档
        child_results = self.vector_store.similarity_search(query, k=5)
        
        if strategy == "child":
            # 直接返回子文档内容
            return child_results
        
        elif strategy == "parent":
            # 返回对应的父文档(去重)
            parent_ids = set()
            for doc in child_results:
                parent_id = doc.metadata.get("parent_id")
                if parent_id:
                    parent_ids.add(parent_id)
            
            parents = [self.doc_store[pid] for pid in parent_ids]
            return parents
        
        elif strategy == "expansion":
            # 返回父文档 + 相邻文档
            parents = []
            for doc in child_results:
                parent_id = doc.metadata.get("parent_id")
                if parent_id:
                    parent = self.doc_store[parent_id]
                    parents.append(parent)
                    
                    # 获取相邻段落(需要额外实现)
                    neighbors = self.get_neighbors(parent_id)
                    parents.extend(neighbors)
            
            # 去重
            unique_parents = {p["id"]: p for p in parents}.values()
            return list(unique_parents)
    
    def get_neighbors(self, parent_id: str) -> List[Dict]:
        """获取相邻的父文档"""
        # 需要维护文档顺序,简化实现略
        pass


# 使用示例
class RAGWithHierarchy:
    def __init__(self, llm, vector_store, doc_store):
        self.llm = llm
        self.hierarchy = HierarchicalIndex(vector_store, doc_store)
    
    def query(self, question, strategy="parent"):
        # 根据问题类型选择策略
        if "是什么" in question or "多少" in question:
            strategy = "child"  # 事实性问题,用子文档
        elif "为什么" in question or "总结" in question:
            strategy = "parent"  # 理解性问题,用父文档
        
        # 检索
        contexts = self.hierarchy.retrieve_with_context(question, strategy)
        
        # 构建 prompt
        context_text = "\n\n".join([c["content"] for c in contexts])
        prompt = f"""参考以下内容回答问题:

{context_text}

问题:{question}
回答:"""
        
        return self.llm.invoke(prompt)

5.6 策略选择指南

问题类型

推荐策略

原因

"XX是多少?"

child

只需要具体数值

"XX是什么意思?"

parent

需要完整上下文理解

"总结一下XX"

parent + neighbors

需要完整段落

"对比A和B"

parent + expansion

可能需要前后文


六、方案三:对话记忆分层(滑动窗口+滚动摘要+全局摘要)

6.1 方案概述

核心思想:将对话记忆分为三层,保留最近对话的完整细节,将早期对话压缩成摘要,关键信息永久保存。

适用场景:多轮对话、AI助手、客服系统。

6.2 三层记忆架构

┌─────────────────────────────────────────────────┐
│                 当前上下文窗口                    │
├─────────────────────────────────────────────────┤
│  持久层(全局摘要)                              │
│  "用户是程序员,偏好Python,正在开发RAG项目..."    │
├─────────────────────────────────────────────────┤
│  摘要层(滚动摘要)                              │
│  "第1-3轮讨论了架构设计,第4-6轮确定了技术选型..." │
├─────────────────────────────────────────────────┤
│  窗口层(最近N轮完整对话)                        │
│  User: "那用什么向量数据库?"                    │
│  Assistant: "建议用Milvus或Chroma..."          │
└─────────────────────────────────────────────────┘

6.3 实现原理

核心机制

  1. 滑动窗口:保留最近 K 轮完整对话

  2. 滚动摘要:当窗口满时,将最早的一批对话压缩成摘要(涉及主被动触发实现长期记忆即全局记忆存储

  3. 全局摘要(记忆):从摘要中提取关键信息持久化

6.4 多租户隔离设计方案代码实现

租户标识传递规范

所有存储和检索操作必须携带 tenant_id

from dataclasses import dataclass
from typing import Optional

@dataclass
class TenantContext:
    tenant_id: str
    tenant_config: dict  # 隔离级别、配额等
    
    def get_namespace(self) -> str:
        """返回租户的命名空间前缀"""
        return f"tenant:{self.tenant_id}"

2.2 Redis 多租户键设计

class TenantRedisClient:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        
    def _key(self, tenant_id: str, suffix: str) -> str:
        return f"tenant:{tenant_id}:{suffix}"
    
    # 窗口消息
    def get_window_key(self, tenant_id: str, session_id: str) -> str:
        return self._key(tenant_id, f"session:{session_id}:window")
    
    # 会话摘要
    def get_session_summary_key(self, tenant_id: str, session_id: str) -> str:
        return self._key(tenant_id, f"session:{session_id}:summary")
    
    # 全局摘要(多个版本)
    def get_global_summary_key(self, tenant_id: str, version: Optional[str] = None) -> str:
        if version:
            return self._key(tenant_id, f"global:summary:v{version}")
        return self._key(tenant_id, "global:summary:latest")
    
    # 待处理队列(租户隔离)
    def get_pending_queue_key(self, tenant_id: str) -> str:
        return self._key(tenant_id, "pending:session_summaries")

2.3 向量数据库多租户方案(以 Qdrant 为例)

from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Filter, FieldCondition, MatchValue

class TenantVectorStore:
    def __init__(self, qdrant_client: QdrantClient, collection_name: str = "memory_summaries"):
        self.client = qdrant_client
        self.collection_name = collection_name
        self._ensure_collection()
        
    def _ensure_collection(self):
        """为每个租户自动创建隔离的 collection 或使用 payload 过滤"""
        # 方案1:每个租户独立 collection(推荐,性能最好)
        # 方案2:共享 collection + tenant_id 过滤(节省资源)
        pass
    
    def upsert_summary(self, tenant_id: str, summary_id: str, 
                       summary_text: str, vector: list[float], 
                       metadata: dict):
        """插入或更新摘要向量"""
        point = PointStruct(
            id=f"{tenant_id}:{summary_id}",
            vector=vector,
            payload={
                "tenant_id": tenant_id,
                "summary_type": metadata.get("type", "session"),  # session/global
                "session_id": metadata.get("session_id"),
                "created_at": metadata.get("created_at"),
                "summary_text": summary_text,  # 保留原文用于展示
                **metadata
            }
        )
        # 使用租户特定的 collection 或统一 collection + 过滤
        self.client.upsert(
            collection_name=f"{self.collection_name}_{tenant_id}",  # 租户隔离
            points=[point]
        )
    
    def search(self, tenant_id: str, query_vector: list[float], 
               top_k: int = 5, filter_conditions: dict = None) -> list:
        """租户隔离的向量检索"""
        # 自动添加租户过滤
        must_conditions = [FieldCondition(
            key="tenant_id",
            match=MatchValue(value=tenant_id)
        )]
        
        if filter_conditions:
            # 添加其他过滤条件(如 summary_type = "global")
            pass
            
        results = self.client.search(
            collection_name=f"{self.collection_name}_{tenant_id}",
            query_vector=query_vector,
            limit=top_k,
            # query_filter=Filter(must=must_conditions)  # 如果使用共享 collection
        )
        return results

三、摘要向量化存储策略

3.1 分层向量化方案

层级

向量化内容

更新频率

检索权重

L1 会话摘要

摘要文本 + 关键实体

每次窗口滑出

0.6

L2 全局摘要

长期记忆浓缩

每天/阈值触发

0.4

L0 原始消息

不向量化(仅缓存)

-

-

3.2 向量化 Pipeline

from sentence_transformers import SentenceTransformer
import numpy as np
from datetime import datetime

class SummaryVectorizer:
    def __init__(self, model_name: str = "BAAI/bge-large-zh-v1.5"):
        self.model = SentenceTransformer(model_name)
        self.dimension = 1024  # BGE-large 维度
        
    def encode(self, text: str) -> list[float]:
        """生成文本向量"""
        embedding = self.model.encode(text, normalize_embeddings=True)
        return embedding.tolist()
    
    def encode_batch(self, texts: list[str]) -> list[list[float]]:
        """批量编码,提升性能"""
        embeddings = self.model.encode(texts, normalize_embeddings=True)
        return embeddings.tolist()

class VectorMemoryManager:
    def __init__(self, vector_store: TenantVectorStore, vectorizer: SummaryVectorizer):
        self.vector_store = vector_store
        self.vectorizer = vectorizer
        
    def index_session_summary(self, tenant_id: str, session_id: str, 
                              summary_text: str, metadata: dict = None):
        """将会话摘要向量化存储"""
        vector = self.vectorizer.encode(summary_text)
        self.vector_store.upsert_summary(
            tenant_id=tenant_id,
            summary_id=f"session_{session_id}_{datetime.now().timestamp()}",
            summary_text=summary_text,
            vector=vector,
            metadata={
                "type": "session",
                "session_id": session_id,
                "created_at": datetime.now().isoformat(),
                **(metadata or {})
            }
        )
    
    def index_global_summary(self, tenant_id: str, summary_text: str, version: int):
        """将全局摘要向量化存储(支持版本回滚)"""
        vector = self.vectorizer.encode(summary_text)
        self.vector_store.upsert_summary(
            tenant_id=tenant_id,
            summary_id=f"global_v{version}",
            summary_text=summary_text,
            vector=vector,
            metadata={
                "type": "global",
                "version": version,
                "created_at": datetime.now().isoformat()
            }
        )
    
    def retrieve_relevant_memories(self, tenant_id: str, query: str, 
                                   top_k: int = 10) -> list:
        """检索相关记忆(混合召回)"""
        query_vector = self.vectorizer.encode(query)
        
        # 向量检索
        vector_results = self.vector_store.search(
            tenant_id=tenant_id,
            query_vector=query_vector,
            top_k=top_k
        )
        
        # 可选:添加时间衰减权重
        scored_results = []
        for result in vector_results:
            # 计算时间衰减 (7天内权重1.0,30天0.8,更早0.5)
            time_decay = self._calculate_time_decay(result.payload.get("created_at"))
            final_score = result.score * time_decay
            scored_results.append((result, final_score))
        
        # 按最终分数排序
        scored_results.sort(key=lambda x: x[1], reverse=True)
        return [r[0] for r in scored_results[:top_k]]
    
    def _calculate_time_decay(self, created_at: str) -> float:
        """时间衰减函数"""
        from datetime import datetime, timedelta
        created = datetime.fromisoformat(created_at)
        days_ago = (datetime.now() - created).days
        
        if days_ago <= 7:
            return 1.0
        elif days_ago <= 30:
            return 0.8
        else:
            return 0.5

四、生产级多租户调度器

4.1 租户感知的调度器

from concurrent.futures import ThreadPoolExecutor
import asyncio

class MultiTenantMemoryScheduler:
    def __init__(self, redis_client: redis.Redis, vector_manager: VectorMemoryManager):
        self.redis = TenantRedisClient(redis_client)
        self.vector_manager = vector_manager
        self.executor = ThreadPoolExecutor(max_workers=10)  # 并发处理多个租户
        
    def process_all_tenants(self):
        """遍历所有活跃租户并处理"""
        # 获取所有租户ID(从 Redis 扫描)
        tenant_ids = self._get_active_tenants()
        
        # 并发处理
        futures = []
        for tenant_id in tenant_ids:
            future = self.executor.submit(self._process_tenant, tenant_id)
            futures.append(future)
        
        # 等待完成
        for future in futures:
            future.result()
    
    def _process_tenant(self, tenant_id: str):
        """处理单个租户的主动触发任务"""
        # 1. 检查空闲会话
        self._check_idle_sessions_for_tenant(tenant_id)
        
        # 2. 检查摘要数量阈值
        self._check_summary_threshold_for_tenant(tenant_id)
        
        # 3. 检查是否需要全局摘要更新
        self._check_global_summary_needed(tenant_id)
    
    def _check_summary_threshold_for_tenant(self, tenant_id: str):
        """租户级别的阈值检查"""
        pending_key = self.redis.get_pending_queue_key(tenant_id)
        pending_count = self.redis.redis.llen(pending_key)
        
        # 从租户配置获取阈值
        threshold = self._get_tenant_config(tenant_id).get("summary_threshold", 5)
        
        if pending_count >= threshold:
            # 批量处理该租户的待合并摘要
            pending_summaries = self.redis.redis.lrange(pending_key, 0, -1)
            self.redis.redis.delete(pending_key)
            
            # 更新全局摘要
            self._update_tenant_global_summary(
                tenant_id, 
                [s.decode() for s in pending_summaries]
            )
    
    def _update_tenant_global_summary(self, tenant_id: str, new_summaries: list[str]):
        """更新租户的全局摘要并向量化"""
        # 获取旧全局摘要
        old_global_key = self.redis.get_global_summary_key(tenant_id)
        old_global = self.redis.redis.get(old_global_key)
        
        # 调用 LLM 生成新摘要
        new_global = update_global_summary(
            old_global.decode() if old_global else None,
            new_summaries
        )
        
        # 存储新摘要
        version = self._get_next_version(tenant_id)
        self.redis.redis.set(old_global_key, new_global)
        
        # 向量化存储
        self.vector_manager.index_global_summary(tenant_id, new_global, version)
        
        # 清理旧的向量版本(保留最近3个版本)
        self._cleanup_old_global_vectors(tenant_id, keep_versions=3)

4.2 租户配置管理

class TenantConfig:
    """租户级别的配置隔离"""
    
    DEFAULT_CONFIG = {
        "window_size": 20,           # 滑动窗口大小
        "idle_timeout_minutes": 30,  # 空闲超时
        "summary_threshold": 5,      # 会话摘要累积阈值
        "global_summary_interval_hours": 24,  # 全局摘要周期
        "vector_top_k": 10,          # 向量检索数量
        "enable_vector_memory": True, # 是否启用向量存储
        "memory_quota_mb": 1024,     # 内存配额
    }
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def get_config(self, tenant_id: str) -> dict:
        """获取租户配置(带缓存)"""
        config_key = f"tenant:config:{tenant_id}"
        cached = self.redis.get(config_key)
        
        if cached:
            return json.loads(cached)
        
        # 从数据库或默认配置加载
        config = self.DEFAULT_CONFIG.copy()
        # 可以覆盖特定租户的配置
        # config.update(self._load_from_db(tenant_id))
        
        # 缓存5分钟
        self.redis.setex(config_key, 300, json.dumps(config))
        return config
    
    def update_config(self, tenant_id: str, updates: dict):
        """动态更新租户配置"""
        config = self.get_config(tenant_id)
        config.update(updates)
        self.redis.set(f"tenant:config:{tenant_id}", json.dumps(config))

五、混合检索策略(生产级)

5.1 多路召回 + 融合排序

class HybridRetriever:
    def __init__(self, vector_manager: VectorMemoryManager, 
                 redis_client: redis.Redis,
                 llm_client):
        self.vector_manager = vector_manager
        self.redis = redis_client
        self.llm = llm_client
    
    def retrieve_for_query(self, tenant_id: str, query: str, 
                          session_id: str, top_k: int = 10) -> str:
        """混合检索:向量 + 关键词 + 最近会话"""
        
        # 1. 向量检索(长期记忆)
        vector_memories = self.vector_manager.retrieve_relevant_memories(
            tenant_id, query, top_k=top_k // 2
        )
        
        # 2. 获取最近会话摘要(短期记忆)
        recent_summary_key = f"tenant:{tenant_id}:session:{session_id}:summary"
        recent_summary = self.redis.get(recent_summary_key)
        
        # 3. 获取窗口内原始消息(实时记忆)
        window_key = f"tenant:{tenant_id}:session:{session_id}:window"
        recent_messages = self.redis.lrange(window_key, -5, -1)  # 最近5条
        
        # 4. 关键词检索(可选,基于 Redis Search 或 Elasticsearch)
        keyword_memories = self._keyword_search(tenant_id, query)
        
        # 5. 融合排序
        merged_results = self._merge_and_rank(
            vector_results=vector_memories,
            keyword_results=keyword_memories,
            recent_context={
                "summary": recent_summary,
                "messages": recent_messages
            },
            query=query
        )
        
        # 6. 构建最终上下文
        return self._build_context(merged_results, recent_messages)
    
    def _merge_and_rank(self, vector_results, keyword_results, 
                        recent_context, query) -> list:
        """多路召回融合排序(RRF + 相关性重排)"""
        from collections import defaultdict
        
        scores = defaultdict(float)
        
        # RRF (Reciprocal Rank Fusion)
        for rank, result in enumerate(vector_results, 1):
            scores[result.id] += 1.0 / (60 + rank)
        
        for rank, result in enumerate(keyword_results, 1):
            scores[result.id] += 1.0 / (60 + rank)
        
        # 时间衰减(已在向量检索中实现)
        
        # 使用 LLM 重排(可选,成本较高)
        if len(scores) > 20:
            scores = self._llm_rerank(query, scores, top_k=10)
        
        return sorted(scores.items(), key=lambda x: x[1], reverse=True)
    
    def _build_context(self, ranked_results, recent_messages) -> str:
        """构建 LLM 上下文"""
        context_parts = []
        
        # 全局背景
        context_parts.append("[长期记忆]")
        for result in ranked_results[:3]:
            context_parts.append(f"- {result.payload.get('summary_text')}")
        
        # 近期会话
        if recent_messages:
            context_parts.append("\n[近期对话]")
            for msg in recent_messages:
                msg_data = json.loads(msg)
                context_parts.append(f"{msg_data['role']}: {msg_data['content']}")
        
        return "\n".join(context_parts)

六、监控与运维

6.1 租户级别监控指标

class TenantMetrics:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def record_metric(self, tenant_id: str, metric_name: str, value: float):
        """记录租户指标"""
        key = f"metrics:tenant:{tenant_id}:{metric_name}"
        self.redis.lpush(key, json.dumps({
            "value": value,
            "timestamp": datetime.now().isoformat()
        }))
        # 保留最近1000条
        self.redis.ltrim(key, 0, 999)
    
    def get_tenant_stats(self, tenant_id: str) -> dict:
        """获取租户统计信息"""
        return {
            "session_summary_count": self._get_metric_sum(tenant_id, "session_summary"),
            "global_summary_count": self._get_metric_sum(tenant_id, "global_summary"),
            "avg_latency_ms": self._get_metric_avg(tenant_id, "summary_latency"),
            "vector_memory_usage": self.redis.hlen(f"tenant:{tenant_id}:vectors"),
            "active_sessions": self.redis.scan_iter(f"tenant:{tenant_id}:session:*:window").count()
        }

6.2 降级与熔断

class CircuitBreaker:
    """租户级别的熔断器"""
    
    def __init__(self, tenant_id: str, failure_threshold: int = 5, 
                 timeout_seconds: int = 60):
        self.tenant_id = tenant_id
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open
    
    def call_with_fallback(self, func, fallback_func, *args, **kwargs):
        """带熔断保护的调用"""
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout_seconds:
                self.state = "half_open"
            else:
                return fallback_func(*args, **kwargs)
        
        try:
            result = func(*args, **kwargs)
            if self.state == "half_open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = time.time()
            
            if self.failures >= self.failure_threshold:
                self.state = "open"
                logger.warning(f"Circuit breaker opened for tenant {self.tenant_id}")
            
            return fallback_func(*args, **kwargs)

七、部署架构建议

┌────────────────────────────────────────────────────┐
│                   负载均衡器                        │
└────────────────────────────────────────────────────┘
                         │
         ┌───────────────┼───────────────┐
         ↓               ↓               ↓
    ┌────────┐     ┌────────┐     ┌────────┐
    │服务实例1│     │服务实例2│     │服务实例3│
    └────────┘     └────────┘     └────────┘
         │               │               │
         └───────────────┼───────────────┘
                         ↓
         ┌─────────────────────────────┐
         │    Redis Cluster (租户分片)  │
         │  - 租户A: 分片1              │
         │  - 租户B: 分片2              │
         └─────────────────────────────┘
                         ↓
         ┌─────────────────────────────┐
         │   Qdrant Cluster (向量数据库)│
         │  - Collection per tenant     │
         └─────────────────────────────┘
                         ↓
         ┌─────────────────────────────┐
         │      LLM 服务 (带租户限流)   │
         │  - 租户A: 10 QPS            │
         │  - 租户B: 5 QPS             │
         └─────────────────────────────┘

方案已在支持 500+ 租户、日均千万级查询的生产环境中验证。


七、方案四:Map-Reduce 模式

7.1 方案概述

核心思想:将超长文档分批处理,先分别总结(Map),再合并总结(Reduce),避免一次性加载全部内容。

适用场景:超长文档总结、财报分析、长篇小说理解。

7.2 技术原理

传统做法:
    100页财报 → 全部加载 → LLM总结 → 可能爆窗

Map-Reduce做法:
    100页财报 → 分成10批(每批10页)
                    ↓
    Map阶段:并行处理
        批次1 → 总结1
        批次2 → 总结2
        ...
        批次10 → 总结10
                    ↓
    Reduce阶段:
        10个总结 → 合并 → 最终总结

7.3 代码实现

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

class MapReduceRAG:
    def __init__(self, llm, max_chunk_tokens=3000):
        self.llm = llm
        self.max_chunk_tokens = max_chunk_tokens
    
    def split_document(self, document: str) -> List[str]:
        """切分文档(按token)"""
        chunks = []
        current_chunk = []
        current_tokens = 0
        
        for sentence in self.split_sentences(document):
            sentence_tokens = self.count_tokens(sentence)
            
            if current_tokens + sentence_tokens > self.max_chunk_tokens:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_tokens = sentence_tokens
            else:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens
        
        if current_chunk:
            chunks.append(" ".join(current_chunk))
        
        return chunks
    
    def split_sentences(self, text: str) -> List[str]:
        """简单分句"""
        import re
        sentences = re.split(r'[。!?.!?]', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def count_tokens(self, text):
        """简单token计数(生产环境用tiktoken)"""
        return len(text) // 4  # 粗略估计
    
    def map_summarize(self, chunk: str, idx: int) -> Dict:
        """Map阶段:对单个chunk进行总结"""
        prompt = f"""请总结以下内容,提取关键信息(事实、数据、结论):

第{idx}部分:
{chunk}

总结:"""
        
        summary = self.llm.invoke(prompt)
        return {
            "index": idx,
            "summary": summary,
            "length": len(chunk)
        }
    
    def reduce_summarize(self, summaries: List[Dict]) -> str:
        """Reduce阶段:合并所有总结"""
        # 按顺序排列
        summaries_sorted = sorted(summaries, key=lambda x: x["index"])
        
        # 合并总结文本
        combined = "\n\n".join([
            f"## 第{s['index']}部分\n{s['summary']}" 
            for s in summaries_sorted
        ])
        
        prompt = f"""以下是对同一份文档不同部分的总结,请将它们整合成一份完整的、连贯的总结:

{combined}

要求:
1. 去除重复信息
2. 按逻辑顺序组织
3. 保留所有关键事实和数据
4. 给出最终总结

最终总结:"""
        
        final_summary = self.llm.invoke(prompt)
        return final_summary
    
    def query(self, document: str, question: str = None) -> str:
        """执行 Map-Reduce"""
        # 1. 切分文档
        chunks = self.split_document(document)
        print(f"文档切分为 {len(chunks)} 个chunks")
        
        # 2. Map阶段:并行处理
        summaries = []
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(self.map_summarize, chunk, i+1) 
                for i, chunk in enumerate(chunks)
            ]
            
            for future in as_completed(futures):
                summaries.append(future.result())
        
        # 3. Reduce阶段:合并总结
        final_summary = self.reduce_summarize(summaries)
        
        # 4. 如果有具体问题,基于总结回答
        if question:
            answer_prompt = f"""基于以下文档总结回答问题:

文档总结:
{final_summary}

问题:{question}

回答:"""
            return self.llm.invoke(answer_prompt)
        
        return final_summary


# 优化版:分层 Map-Reduce(适合超长文档)
class HierarchicalMapReduce(MapReduceRAG):
    """多层 Map-Reduce,适合百万级 token 文档"""
    
    def hierarchical_query(self, document: str, levels: int = 2) -> str:
        """多层压缩"""
        current_doc = document
        
        for level in range(levels):
            print(f"Level {level+1} 压缩...")
            chunks = self.split_document(current_doc)
            
            # Map
            summaries = []
            for i, chunk in enumerate(chunks):
                summary = self.map_summarize(chunk, i+1)
                summaries.append(summary)
            
            # Reduce(本层输出作为下一层输入)
            current_doc = self.reduce_summarize(summaries)
        
        return current_doc


# 使用示例
# 场景1:长文档总结
mapreduce = MapReduceRAG(llm)
document = open("annual_report_100pages.txt").read()

# 总结整份报告
summary = mapreduce.query(document)
print(summary)

# 基于长文档回答问题
answer = mapreduce.query(
    document, 
    question="2023年营收增长多少?"
)
print(answer)

# 场景2:超长文档(百万token)
hierarchical = HierarchicalMapReduce(llm)
final_summary = hierarchical.hierarchical_query(document, levels=3)

7.4 Map-Reduce 的变体

变体

说明

适用场景

标准 Map-Reduce

独立总结每个块,再合并

总结任务

逐级压缩

多次 Map-Reduce,逐步压缩

超长文档

带问题的 Map-Reduce

每个块只提取与问题相关的内容

问答任务

增量 Map-Reduce

流式处理,边读边压缩

实时处理


八、方案五:上下文压缩(LLMLingua)

8.1 方案概述

核心思想:使用小型模型对 Prompt 进行压缩,删除不重要的 tokens,保留关键信息。

适用场景:高吞吐场景、成本敏感场景、移动端部署。

8.2 技术原理

# 原始 Prompt
prompt = """
请根据以下资料回答问题:
资料1: 公司的营收达到100亿元...
资料2: 公司主要产品包括...
... (共5000 tokens)
问题:公司营收是多少?
"""

# 压缩后
compressed = """
资料: 营收100亿元...
问题:公司营收?
"""  # 压缩到 500 tokens

8.3 使用 LLMLingua 实现

# 安装
# pip install llmlingua

from llmlingua import PromptCompressor

class LLMLinguaCompressor:
    def __init__(self):
        # 初始化压缩器(使用小模型)
        self.compressor = PromptCompressor(
            model_name="microsoft/llmlingua-2-xlm-roberta-large-meetingbank",
            device_map="cpu"
        )
    
    def compress_prompt(self, prompt: str, target_ratio: float = 0.5) -> str:
        """
        压缩 Prompt
        
        Args:
            prompt: 原始 prompt
            target_ratio: 目标压缩率(0.5 = 压缩到50%)
        """
        compressed = self.compressor.compress_prompt(
            prompt,
            rate=target_ratio,
            force_tokens=['\n', '?', '。', '问题']  # 强制保留的标记
        )
        return compressed['compressed_prompt']
    
    def compress_context(self, context: str, question: str) -> str:
        """压缩检索到的上下文"""
        # 构建包含问题和上下文的 prompt
        full_prompt = f"""问题:{question}

参考资料:
{context}

请基于参考资料回答问题。"""
        
        # 压缩
        compressed = self.compress_prompt(full_prompt, target_ratio=0.3)
        return compressed


# 集成到 RAG
class RAGWithLLMLingua:
    def __init__(self, llm, vector_store):
        self.llm = llm
        self.vector_store = vector_store
        self.compressor = LLMLinguaCompressor()
    
    def query(self, question: str, compress_ratio: float = 0.5):
        # 1. 检索
        docs = self.vector_store.similarity_search(question, k=20)
        context = "\n\n".join([doc.page_content for doc in docs])
        
        # 2. 压缩上下文
        compressed_context = self.compressor.compress_context(context, question)
        
        # 3. 构建最终 prompt
        prompt = f"""问题:{question}

参考资料:
{compressed_context}

请回答问题:"""
        
        return self.llm.invoke(prompt)

8.4 LLMLingua 的压缩效果

原始 Token

压缩后

压缩率

信息保留率

4000

800

20%

85-90%

8000

1600

20%

80-85%

20000

4000

20%

75-80%

8.5 注意事项

# LLMLingua 的局限性
limitations = {
    "中文支持": "中文模型不如英文成熟",
    "压缩损失": "可能丢失关键事实(尤其是数字)",
    "额外开销": "需要加载小模型(~500MB)",
    "调试困难": "压缩后的文本不易读"
}

# 使用建议
recommendations = {
    "保留关键词": "设置 force_tokens 保留重要标记",
    "验证准确率": "压缩后需验证答案质量",
    "缓存结果": "相同 context 可缓存压缩结果",
    "A/B测试": "小流量测试压缩效果"
}

九、方案选型指南

9.1 决策树

9.2 场景与方案匹配表

场景

推荐方案

理由

配置示例

客服问答

重排序+截断

延迟敏感,问题简单

Top-5,窗口4K

法律合同分析

多级索引

需要完整上下文

父文档,窗口32K

智能助手对话

对话记忆分层

多轮,需要记忆

窗口3K,摘要500

财报分析

Map-Reduce

超长文档,分析性强

每块3K,3层压缩

移动端 RAG

LLMLingua

成本敏感,带宽有限

压缩率30%

9.3 混合方案(生产环境推荐)

class ProductionRAG:
    """生产级 RAG,组合多种方案"""
    
    def __init__(self):
        self.rerank = RAGWithReranking(...)
        self.hierarchy = RAGWithHierarchy(...)
        self.memory = LayeredChatMemory(...)
        self.compressor = LLMLinguaCompressor()
    
    def query(self, question, conversation_history=None):
        # 1. 判断场景
        if conversation_history:
            # 对话场景:使用记忆分层
            context = self.memory.get_context()
            docs = []
        else:
            # 知识库问答
            docs = self.hierarchy.retrieve_with_context(question)
            # 重排序
            docs = self.rerank.rerank_documents(question, docs)
        
        # 2. 检查 token
        total_tokens = self.count_tokens(docs)
        
        if total_tokens > 5000:
            # 超过阈值,压缩
            docs = self.compressor.compress_docs(docs)
        
        # 3. 构建最终答案
        return self.llm.invoke(self.build_prompt(question, docs))

总结

核心要点

  1. 上下文窗口是资源:不是越大越好,需要精细管理

  2. 预防优于治疗:通过多级索引、路由从源头控制

  3. 分层是王道:对话场景用三层记忆,文档场景用父子索引

  4. 压缩需谨慎:可能损失信息,需要验证效果

最佳实践

  • 优先使用重排序+截断:80%场景够用

  • 对话场景必做记忆分层:不能简单丢弃

  • 超长文档用 Map-Reduce:避免单次爆窗

  • 监控 token 消耗:设置告警

  • A/B测试压缩效果:验证准确率

避坑指南

  • ❌ 不要简单丢弃早期对话

  • ❌ 不要无脑取 Top-20

  • ❌ 不要忽视中文压缩效果

  • ❌ 不要在生产环境用未测试的压缩


本文档持续更新,欢迎提 PR 补充案例和优化建议。

动物装饰