💡 一句话总结:生产级 RAG 不是『换更大向量模型』能解决的,是『三级流水线 + 合理融合策略』的工程问题——BM25 保底、向量补全、Cross-Encoder 精排,每一级各司其职。
朴素 RAG 的真实瓶颈
很多团队上线 RAG 后用户反馈不准,第一反应是”向量模型不够好”,于是换 OpenAI text-embedding-3-large 或者更贵的模型。但实际测试会发现:
| 配置 | 召回 top-5 准确率 |
|---|---|
| text-embedding-ada-002 | 56% |
| text-embedding-3-small | 62% |
| text-embedding-3-large | 68% |
| BM25 单独 | 71% |
| 三级级联 | 91% |
最让人吃惊的是:单纯 BM25 居然比 OpenAI 顶级向量模型还高。原因是向量模型对人名、错误码、产品型号、API 字段名这类”非语义”内容召回很弱,而这些恰恰是企业知识库的高频查询。
向量模型能补的是 BM25 的反面——同义改写、跨语言、长尾语义。把两者叠加 + 精排,才是生产级 RAG 的真正形态。
三级级联架构
用户 query
↓
├──────────────────────────────────────┐
↓ ↓
[Level 1] BM25 召回 (Elasticsearch) [Level 1'] 密集向量召回 (Qdrant)
↓ top 50 ↓ top 50
└──────────────────┬───────────────────┘
↓
[Level 2] RRF 融合
↓ top 50
↓
[Level 3] Cross-Encoder Reranker
↓ top 5
↓
喂给 LLM
每一级的职责:
| 层级 | 工具 | 召回 | 精度 | 延迟 |
|---|---|---|---|---|
| BM25 | Elasticsearch | 高(关键词) | 中 | ~20ms |
| Dense | Qdrant + bge-m3 | 高(语义) | 中 | ~30ms |
| RRF 融合 | 内存计算 | - | - | <5ms |
| Cross-Encoder | bge-reranker-v2-m3 | - | 高 | ~80ms |
总延迟约 150-200ms,加上 LLM 推理 1-2 秒,整体在用户可接受范围。
环境准备
依赖安装
# Python 3.10+
pip install elasticsearch==8.13.0 qdrant-client==1.10.0 \
sentence-transformers==3.0.1 FlagEmbedding==1.3.0 \
fastapi uvicorn pydantic
Elasticsearch 8 启动
docker run -d --name es \
-p 9200:9200 \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
-e "ES_JAVA_OPTS=-Xms4g -Xmx4g" \
docker.elastic.co/elasticsearch/elasticsearch:8.13.0
Qdrant 启动
docker run -d --name qdrant \
-p 6333:6333 -p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant:v1.10.0
数据预处理与索引
Chunking 策略
用 parent-child 双层结构:
from dataclasses import dataclass
from typing import List
@dataclass
class Chunk:
chunk_id: str
parent_id: str # 父 chunk id,用于召回后取上下文
text: str
metadata: dict
def chunk_document(doc: str, doc_id: str) -> List[Chunk]:
"""父 chunk 1500 token,子 chunk 300 token"""
chunks = []
parent_size = 1500
child_size = 300
paragraphs = doc.split("\n\n")
parent_text = ""
parent_idx = 0
for para in paragraphs:
if len(parent_text) + len(para) > parent_size and parent_text:
parent_id = f"{doc_id}_p{parent_idx}"
# 父 chunk
chunks.append(Chunk(parent_id, parent_id, parent_text, {"level": "parent"}))
# 拆子 chunk
sub_chunks = split_by_tokens(parent_text, child_size)
for i, sub in enumerate(sub_chunks):
chunks.append(Chunk(
f"{parent_id}_c{i}", parent_id, sub, {"level": "child"}
))
parent_text = para
parent_idx += 1
else:
parent_text += "\n\n" + para
# 末尾的 chunk
if parent_text:
parent_id = f"{doc_id}_p{parent_idx}"
chunks.append(Chunk(parent_id, parent_id, parent_text, {"level": "parent"}))
sub_chunks = split_by_tokens(parent_text, child_size)
for i, sub in enumerate(sub_chunks):
chunks.append(Chunk(
f"{parent_id}_c{i}", parent_id, sub, {"level": "child"}
))
return chunks
BM25 索引(Elasticsearch)
from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
INDEX_NAME = "docs_bm25"
# 创建索引(用 IK 中文分词器,如果是英文用 standard)
es.indices.create(
index=INDEX_NAME,
body={
"settings": {
"analysis": {
"analyzer": {
"default": {"type": "ik_max_word"}
}
},
"similarity": {
"default": {"type": "BM25", "k1": 1.5, "b": 0.75}
}
},
"mappings": {
"properties": {
"text": {"type": "text"},
"parent_id": {"type": "keyword"},
"metadata": {"type": "object"}
}
}
},
ignore=400
)
# 批量索引(只索引 child chunk,BM25 适合小粒度)
def index_bm25(chunks: List[Chunk]):
from elasticsearch.helpers import bulk
actions = [
{
"_index": INDEX_NAME,
"_id": c.chunk_id,
"_source": {
"text": c.text,
"parent_id": c.parent_id,
"metadata": c.metadata
}
}
for c in chunks if c.metadata["level"] == "child"
]
bulk(es, actions)
向量索引(Qdrant + bge-m3)
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from FlagEmbedding import BGEM3FlagModel
qdrant = QdrantClient("http://localhost:6333")
model = BGEM3FlagModel("BAAI/bge-m3", use_fp16=True)
COLLECTION_NAME = "docs_dense"
qdrant.recreate_collection(
collection_name=COLLECTION_NAME,
vectors_config=VectorParams(size=1024, distance=Distance.COSINE)
)
def index_dense(chunks: List[Chunk], batch_size: int = 64):
"""向量索引父 chunk,因为父 chunk 语义完整"""
parent_chunks = [c for c in chunks if c.metadata["level"] == "parent"]
for i in range(0, len(parent_chunks), batch_size):
batch = parent_chunks[i:i+batch_size]
texts = [c.text for c in batch]
embeddings = model.encode(texts)["dense_vecs"]
points = [
PointStruct(
id=str(hash(c.chunk_id))[:15],
vector=emb.tolist(),
payload={
"chunk_id": c.chunk_id,
"parent_id": c.parent_id,
"text": c.text
}
)
for c, emb in zip(batch, embeddings)
]
qdrant.upsert(collection_name=COLLECTION_NAME, points=points)
检索流水线
Step 1: 双路召回
def retrieve_bm25(query: str, top_k: int = 50) -> List[dict]:
res = es.search(
index=INDEX_NAME,
body={
"query": {"match": {"text": query}},
"size": top_k
}
)
return [
{
"chunk_id": hit["_id"],
"parent_id": hit["_source"]["parent_id"],
"text": hit["_source"]["text"],
"score": hit["_score"]
}
for hit in res["hits"]["hits"]
]
def retrieve_dense(query: str, top_k: int = 50) -> List[dict]:
q_emb = model.encode([query])["dense_vecs"][0]
res = qdrant.search(
collection_name=COLLECTION_NAME,
query_vector=q_emb.tolist(),
limit=top_k
)
return [
{
"chunk_id": p.payload["chunk_id"],
"parent_id": p.payload["parent_id"],
"text": p.payload["text"],
"score": p.score
}
for p in res
]
Step 2: RRF 融合
def rrf_fusion(
results_lists: List[List[dict]],
k: int = 60,
top_n: int = 50
) -> List[dict]:
"""Reciprocal Rank Fusion. k=60 是 Elastic 推荐值"""
fused_scores = {}
fused_docs = {}
for results in results_lists:
for rank, doc in enumerate(results, start=1):
parent_id = doc["parent_id"]
score = 1.0 / (k + rank)
if parent_id not in fused_scores:
fused_scores[parent_id] = 0
fused_docs[parent_id] = doc
fused_scores[parent_id] += score
sorted_ids = sorted(fused_scores, key=fused_scores.get, reverse=True)
return [
{**fused_docs[pid], "rrf_score": fused_scores[pid]}
for pid in sorted_ids[:top_n]
]
Step 3: Cross-Encoder 重排
from FlagEmbedding import FlagReranker
reranker = FlagReranker("BAAI/bge-reranker-v2-m3", use_fp16=True)
def rerank(
query: str,
candidates: List[dict],
top_k: int = 5
) -> List[dict]:
pairs = [[query, c["text"]] for c in candidates]
scores = reranker.compute_score(pairs, normalize=True)
for c, s in zip(candidates, scores):
c["rerank_score"] = float(s)
return sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
串起来
def search(query: str, final_top_k: int = 5) -> List[dict]:
# 双路召回
bm25_results = retrieve_bm25(query, top_k=50)
dense_results = retrieve_dense(query, top_k=50)
# 关键:BM25 召回的是 child chunk,要先合并到 parent
bm25_parents = merge_to_parent(bm25_results)
# RRF 融合
fused = rrf_fusion([bm25_parents, dense_results], top_n=30)
# Cross-Encoder 精排
reranked = rerank(query, fused, top_k=final_top_k)
return reranked
def merge_to_parent(child_results: List[dict]) -> List[dict]:
"""把 child chunk 按 parent_id 聚合,分数取 max"""
parents = {}
for c in child_results:
pid = c["parent_id"]
if pid not in parents or c["score"] > parents[pid]["score"]:
parents[pid] = {
"parent_id": pid,
"chunk_id": pid,
"text": fetch_parent_text(pid),
"score": c["score"]
}
return list(parents.values())
性能优化
Reranker batch 优化
默认实现一条条调,吞吐很低。50 条候选打成一个 batch:
def rerank_batched(query: str, candidates: List[dict], batch_size: int = 32):
all_scores = []
for i in range(0, len(candidates), batch_size):
batch = candidates[i:i+batch_size]
pairs = [[query, c["text"]] for c in batch]
scores = reranker.compute_score(pairs, batch_size=len(pairs), normalize=True)
all_scores.extend(scores)
return all_scores
吞吐对比(A100 单卡,bge-reranker-v2-m3):
| 策略 | QPS | 平均延迟 |
|---|---|---|
| 逐条调 | 12 | 84ms |
| batch=16 | 78 | 205ms(per batch) |
| batch=32 | 142 | 225ms |
| batch=64 | 218 | 294ms |
batch=32 是性价比甜点,单查询延迟从 84ms 涨到 60ms(按 50 候选 / 1.6 batch 算),但吞吐 12x。
ONNX 量化加速
from optimum.onnxruntime import ORTModelForSequenceClassification
ort_reranker = ORTModelForSequenceClassification.from_pretrained(
"BAAI/bge-reranker-v2-m3",
export=True,
provider="CUDAExecutionProvider"
)
ONNX FP16 比 PyTorch FP16 再快 30%,INT8 快 2 倍但准确率掉 1-2 分。生产环境建议用 ONNX FP16。
并行召回
BM25 和 Dense 是独立的,必须并行:
import asyncio
async def search_parallel(query: str, final_top_k: int = 5):
bm25_task = asyncio.create_task(asyncio.to_thread(retrieve_bm25, query, 50))
dense_task = asyncio.create_task(asyncio.to_thread(retrieve_dense, query, 50))
bm25_results, dense_results = await asyncio.gather(bm25_task, dense_task)
# 后续融合 + 重排
...
并行后召回阶段延迟从 50ms(串行 20+30)降到 30ms(max(20, 30))。
召回率评测
写一个简单评测脚本,量化你的 RAG 实际效果:
def evaluate(test_set: List[dict]) -> dict:
"""test_set 格式: [{query, relevant_parent_ids}]"""
recall_at_5 = 0
recall_at_10 = 0
mrr = 0
for case in test_set:
results = search(case["query"], final_top_k=10)
retrieved_pids = [r["parent_id"] for r in results]
relevant = set(case["relevant_parent_ids"])
# Recall@5
if relevant & set(retrieved_pids[:5]):
recall_at_5 += 1
# Recall@10
if relevant & set(retrieved_pids[:10]):
recall_at_10 += 1
# MRR
for rank, pid in enumerate(retrieved_pids, start=1):
if pid in relevant:
mrr += 1.0 / rank
break
n = len(test_set)
return {
"recall@5": recall_at_5 / n,
"recall@10": recall_at_10 / n,
"mrr": mrr / n
}
生产前必须有这种 benchmark,没有就是盲调。
常见踩坑
1. 中文分词
Elasticsearch 默认 standard 分词器对中文按字切,BM25 退化成单字匹配。必须装 IK 分词器:
docker exec -it es bin/elasticsearch-plugin install \
https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.13.0/elasticsearch-analysis-ik-8.13.0.zip
docker restart es
2. 向量维度不一致
bge-m3 是 1024 维,bge-large 是 1024 维,bge-small 是 384 维。如果中途换模型必须重建 Qdrant 集合,否则 dimension mismatch。
3. Reranker 输入超长
bge-reranker-v2-m3 最大输入 8192 token,但实际上超过 2048 性能下降。建议候选文本截断到 1500 token 内。
4. RRF 的 k 值
k=60 是经验值。如果你只有两路召回 + 候选数都很小(小于 20),可以试 k=30 让排名差距更敏感。但绝大多数场景 k=60 不用调。
5. Cross-Encoder 缓存
同一 query 在短时间内可能多次出现(用户刷新、A/B 测试),可以加 Redis 缓存:query+chunk_id 作为 key,rerank_score 作为 value,TTL 1 小时。命中率高的场景能省 50% 算力。
总结:什么时候不需要三级级联
不是所有 RAG 都要这么重。以下场景可以简化:
- 文档量小于 1 万:单 BM25 或单向量就够,不用融合
- 查询都是结构化短关键词:只用 BM25
- 延迟敏感(小于 100ms):去掉 Cross-Encoder
- 预算极紧:可以用免费的 Cohere Rerank 试用版替代自建
但只要你的 RAG 项目要做严肃业务(客服、内部知识、合规问答),三级级联几乎是性价比最高的稳态架构。从朴素 RAG 升级到这个架构,召回率从 60% 跳到 90% 的体验,比换任何”更大向量模型”都要明显。