搜索引擎最核心的数据结构——把"文档到词"的关系,翻转为"词到文档"的映射
想象你在图书馆找书。正向索引像是一本本翻开每本书看里面有什么内容; 而倒排索引则像是图书馆后面的卡片目录柜—— 你查一个关键词,它直接告诉你哪些书里包含这个词。
一步步观察倒排索引是如何从原始文本中自动构建出来的
基于倒排索引可以实现多种查询方式,各有适用场景
通过对多个词项的 Posting List 做集合运算来获取结果。简单高效,返回结果无排序。
def search_boolean(self, query: str):
# 解析查询为词项列表
terms = self.tokenize(query)
result_set = None
for term in terms:
entry = self.index.get(term)
doc_ids = {p.doc_id for p in entry.postings} if entry else set()
if result_set is None:
result_set = doc_ids # 第一个词:直接赋值
else:
result_set &= doc_ids # AND: 取交集 ∩
# result_set |= doc_ids # OR: 取并集 ∪
# result_set -= doc_ids # NOT: 取差集 −
return sorted(result_set)不仅要求文档包含所有词,还要求这些词在文档中相邻出现且顺序一致。 这需要利用 Posting List 中记录的位置信息 (positions)。
def search_phrase(self, phrase: str):
tokens = self.tokenize(phrase)
terms = [t[0] for t in tokens]
# 获取第一个词的候选文档及位置
candidates = {}
for posting in self.index[terms[0]].postings:
candidates[posting.doc_id] = [posting.positions]
# 逐个后续词做位置连续性过滤
for i in range(1, len(terms)):
term_pos = {p.doc_id: p.positions for p in self.index[terms[i]].postings}
new_candidates = {}
for doc_id, prev_lists in candidates.items():
if doc_id not in term_pos: continue
for prev_list in prev_lists:
last_pos = prev_list[-1]
for cp in term_pos[doc_id]:
if cp == last_pos + 1: # 关键!严格相邻
new_candidates.setdefault(doc_id, []).append(prev_list+[cp])
candidates = new_candidates
return sorted(candidates.keys())布尔查询只回答"是否匹配",而实际搜索需要按相关程度对结果打分排序。 经典方案是 TF-IDF,工业界标准是 BM25。
信息检索领域最重要的相关性评分算法,被 Elasticsearch、Lucene、SQLite FTS5 等广泛使用
自包含、可直接运行的倒排索引实现,含 BM25 评分、布尔查询、短语查询
@dataclass
class Posting:
"""倒排列表中的一条记录"""
doc_id: int # 文档ID
frequency: int = 0 # 词频 TF(在该文档中出现次数)
positions: List[int] # 出现位置列表(用于短语查询)
@dataclass
class TermEntry:
"""词典中每个词项的条目"""
term: str # 词项本身
df: int = 0 # 文档频率(多少个文档包含此词)
postings: List[Posting] # 倒排列表
@property
def idf(self) -> float:
# IDF = log(N / df) + 1 (加1避免零值)
return math.log(self._total_docs / self.df) + 1
@dataclass
class Document:
"""待索引的文档"""
doc_id: int
title: str
content: str
@property
def full_text(self) -> str:
return f"{self.title} {self.content}"class Tokenizer:
"""分词器:英文按单词,中文按字符,支持停用词过滤"""
STOP_WORDS = {'的', '是', '了', '在', 'the', 'a', 'an', 'is', 'are', ...}
def tokenize(self, text: str) -> List[Tuple[str, int]]:
"""返回 [(token, position), ...] """
text = text.lower()
tokens = []
pos = 0
pattern = re.compile(r'[a-z0-9_]+|[\u4e00-\u9fff]')
for match in pattern.finditer(text):
tok = match.group()
if tok not in self.STOP_WORDS:
tokens.append((tok, pos))
pos += 1
return tokens
class InvertedIndex:
def __init__(self, tokenizer=None):
self.tokenizer = tokenizer or Tokenizer()
self.index: Dict[str, TermEntry] = {} # 词典 → 词项条目
self.documents: Dict[int, Document] = {} # 文档存储
self.doc_count: int = 0
self.avg_doc_length: float = 0.0 # 平均文档长度(BM25用)
self.k1 = 1.5 # BM25参数
self.b = 0.75
def add_document(self, doc: Document):
"""添加文档到索引(核心构建逻辑)"""
self.documents[doc.doc_id] = doc
self.doc_count += 1
tokens = self.tokenizer.tokenize(doc.full_text)
# 统计每个词的出现位置
temp_index = defaultdict(list)
for token, pos in tokens:
temp_index[token].append(pos)
# 更新全局倒排索引
for token, positions in temp_index.items():
if token not in self.index:
self.index[token] = TermEntry(term=token, df=0, postings=[])
entry = self.index[token]
entry.df += 1
entry.postings.append(Posting(
doc_id=doc.doc_id,
frequency=len(positions),
positions=positions,
))
# 更新平均文档长度
total = self.avg_doc_length * (self.doc_count - 1) + len(tokens)
self.avg_doc_length = total / self.doc_countdef search_bm25(self, query: str, top_k: int = 10) -> List[Tuple[int, float]]:
"""
BM25 评分搜索
score(D,Q) = Σ IDF(qi) × [f(qi,D)×(k1+1)] / [f(qi,D) + k1×(1-b+b×|D|/avgdl)]
"""
query_terms = list(set([t[0] for t in self.tokenizer.tokenize(query)]))
scores: Dict[int, float] = defaultdict(float)
for term in query_terms:
if term not in self.index:
continue
entry = self.index[term]
idf = entry.idf # log(N/df) + 1
for posting in entry.postings:
doc_id = posting.doc_id
tf = posting.frequency
# 计算文档长度
doc_len = len(self.tokenizer.tokenize(
self.documents[doc_id].full_text))
# ★ BM25 TF归一化(词频饱和 + 长度归一化)
numerator = tf * (self.k1 + 1)
denominator = tf + self.k1 * (
1 - self.b + self.b * doc_len / max(self.avg_doc_length, 1)
)
tf_norm = numerator / denominator
scores[doc_id] += idf * tf_norm
# 按分数降序排列
return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]# === 1. 准备文档集 ===
documents = [
Document(1, "搜索引擎原理",
"搜索引擎使用倒排索引来加速文本检索..."),
Document(2, "Python编程指南",
"Python是一种流行的编程语言..."),
Document(3, "数据库索引技术",
"数据库使用B+树索引优化查询性能..."),
Document(4, "AI Agent记忆系统",
"AI Agent的记忆系统需要高效检索机制..."),
]
# === 2. 构建索引 ===
indexer = InvertedIndex(Tokenizer(use_stop_words=True))
indexer.build_index(documents)
# === 3. 布尔搜索 ===
results = indexer.search_boolean("倒排 索引")
# → [1, 4]
# === 4. BM25 相关性搜索 ===
results = indexer.search_bm25("索引 查询 性能", top_k=5)
# → [(3, 3.82), (1, 2.15), (5, 1.87), ...]
for rank, (doc_id, score) in enumerate(results, 1):
print(f"#{rank} [{score:.4f}] 《{indexer.documents[doc_id].title}》")
# === 5. 精确短语查询 ===
results = indexer.search_phrase("倒排索引")
# → [1] (只有Doc#1中这两个字是紧挨着的)
# === 6. 查看统计 ===
print(indexer.get_statistics())
# {'总文档数': 4, '词典大小': 67, '总posting数': 142, ...}