import os import time import json import numpy as np import faiss from openai import OpenAI from text2vec import SentenceModel from langchain_ollama import ChatOllama from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnableLambda import config.config # 向量存储目录结构 VECTOR_STORE_BASE = config.config.VECTOR_STORE_BASE MODEL_PATH = config.config.MODEL_PATH # 加载文本向量化模型 def load_embedding_model(device="cuda"): """加载text2vec-base-chinese模型""" print(f"正在加载向量化模型: text2vec-base-chinese") print(f"使用设备: {device}") return SentenceModel(MODEL_PATH, device=device) # 生成三个相似问题变体 def generate_question_variants_deepseek_api(question): """生成三个不同的问题变体""" client = OpenAI(api_key=config.config.DEEPSEEK_API, base_url="https://api.deepseek.com") system_template = """ 你是一个法律问题转译专家。你的任务是将用户的自然语言法律问题转译为三个不同但相关的专业法律表述。 遵循以下原则: 1. 第一个变体:提取核心法律概念,使用最规范的法律术语重新表述 2. 第二个变体:从不同角度或法律分支解读问题 3. 第三个变体:扩展问题范围,考虑相关联的法律情况 所有变体都应该: - 使用专业法律术语 - 保持简洁明了 - 直指法律要点 - 去除无关信息 请按以下JSON格式输出三个变体: { "variant_1": "第一个变体", "variant_2": "第二个变体", "variant_3": "第三个变体" } 只输出JSON格式的结果,不要包含任何额外的解释或文本。 """ response = client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": system_template}, {"role": "user", "content": question}, ], stream=False ) try: # 解析JSON响应 variants_text = response.choices[0].message.content variants = json.loads(variants_text) return variants except json.JSONDecodeError: # 如果返回的不是有效JSON,尝试提取变体 print(f"警告:API返回的不是有效JSON,尝试手动解析:\n{variants_text}") # 简单回退方案 return { "variant_1": question, "variant_2": question, "variant_3": question } # 向量检索相关函数 def load_vector_store(level_dir): """加载指定层级的向量存储""" store_dir = os.path.join(VECTOR_STORE_BASE, level_dir) index_path = os.path.join(store_dir, "index") if not os.path.exists(f"{index_path}_index"): raise FileNotFoundError(f"向量索引文件不存在: {index_path}_index") index = faiss.read_index(f"{index_path}_index") # 加载向量、文本和元数据 vectors = np.load(f"{index_path}_vectors.npy") # texts 现在包含的是原始完整文本(包含"第xx条 "前缀) texts = np.load(f"{index_path}_texts.npy", allow_pickle=True) metadata = np.load(f"{index_path}_metadata.npy", allow_pickle=True) print(f"成功加载{level_dir}向量库: {len(vectors)}条记录") return { "index": index, "vectors": vectors, "texts": texts, "metadata": metadata } def search_in_level(query_embedding, level_dir, top_k=10, store=None): """在指定层级中搜索,返回top_k个结果""" if store is None: store = load_vector_store(level_dir) # 搜索 distances, indices = store["index"].search(query_embedding.reshape(1, -1), top_k) results = [] for i, idx in enumerate(indices[0]): if idx != -1: # Faiss返回-1表示找不到足够的结果 results.append({ "text": store["texts"][idx], "metadata": store["metadata"][idx], "score": float(1.0 - distances[0][i]), # 将距离转换为相似度分数 "index": int(idx), "level": level_dir }) return results def rank_final_results(results, original_query_embedding, top_k=5): """对所有结果重新排序,返回top_k个最相关的结果""" # 提取所有文本并计算与查询的相似度 texts = [item["text"] for item in results] # 加载模型 embedding_model = load_embedding_model() # 计算文本向量 embeddings = embedding_model.encode(texts) # 计算相似度 similarities = np.dot(embeddings, original_query_embedding.T) / ( np.linalg.norm(embeddings, axis=1) * np.linalg.norm(original_query_embedding) ) # 更新相似度分数 for i, item in enumerate(results): item["score"] = float(similarities[i]) # 先按相似度排序,选出前20个结果 sorted_by_similarity = sorted(results, key=lambda x: x["score"], reverse=True)[:20] # 去重 - 基于文本内容 unique_results = [] seen_texts = set() for item in sorted_by_similarity: if item["text"] not in seen_texts: seen_texts.add(item["text"]) unique_results.append(item) if len(unique_results) >= top_k: break # 对筛选后的top_k个不重复结果按层级优先级排序 level_priority = {"level_1": 1, "level_2": 2, "level_3": 3} final_results = sorted(unique_results, key=lambda x: level_priority.get(x["level"], 999)) return final_results def hierarchical_search_for_variant(variant_query, variant_name, embedding_model, stores, l1_results=10, l2_sub_searches=3, final_results=5): """为单个变体执行分层检索""" print(f"处理{variant_name}: {variant_query}") # 对变体进行向量化 query_embedding = embedding_model.encode(variant_query) # 在level_1中搜索 level_1_results = search_in_level(query_embedding, "level_1", l1_results, stores["level_1"]) # 收集所有结果 all_results = [] all_results.extend(level_1_results) # # 对level_1结果在level_2中搜索 # level_2_results = [] # for l1_item in level_1_results: # l1_text_embedding = embedding_model.encode(l1_item["text"]) # l2_items = search_in_level(l1_text_embedding, "level_2", l2_sub_searches, stores["level_2"]) # level_2_results.extend(l2_items) # # all_results.extend(level_2_results) # 对所有结果重新排序,返回最相关的前final_results个 variant_results = rank_final_results(all_results, query_embedding, final_results) # 添加变体信息 for item in variant_results: item["variant"] = variant_name item["variant_query"] = variant_query return variant_results def multi_variant_search(user_query, variants_per_query=3, final_results=15, top_display=5): """执行多变体的向量检索""" print(f"开始处理用户查询: {user_query}") # 1. 生成三个查询变体 variants = generate_question_variants_deepseek_api(user_query) print(f"生成的三个变体:") for k, v in variants.items(): print(f"{k}: {v}") # 2. 加载向量化模型 embedding_model = load_embedding_model() # 3. 原始查询的向量化 (用于最终排序) original_query_embedding = embedding_model.encode(user_query) # 4. 预加载所有向量库 stores = { "level_1": load_vector_store("level_1"), # "level_2": load_vector_store("level_2"), # "level_3": load_vector_store("level_3") } # 5. 对每个变体执行分层检索 all_variant_results = [] for variant_key, variant_query in variants.items(): variant_results = hierarchical_search_for_variant( variant_query, variant_key, embedding_model, stores, l1_results=10, # 每个变体在level_1中搜索的结果数 l2_sub_searches=3, # 每个level_1结果在level_2中搜索的结果数 final_results=5 # 每个变体返回的最终结果数 ) all_variant_results.extend(variant_results) print(f"所有变体检索结果总量: {len(all_variant_results)}条") # 6. 汇总所有变体的结果,并按与原始查询的相似度排序 merged_results = [] seen_texts = set() # 去重 for item in all_variant_results: if item["text"] not in seen_texts: seen_texts.add(item["text"]) merged_results.append(item) # 按原始查询相似度重新排序 for item in merged_results: text_embedding = embedding_model.encode(item["text"]) similarity = np.dot(text_embedding, original_query_embedding) / ( np.linalg.norm(text_embedding) * np.linalg.norm(original_query_embedding) ) item["original_score"] = float(similarity) # 按原始查询相似度排序,选出top结果 final_results_list = sorted(merged_results, key=lambda x: x["original_score"], reverse=True)[:final_results] # 获取前5条结果,但按照层级优先级排序 level_priority = {"level_1": 1, "level_2": 2, "level_3": 3} top_display_results = sorted(final_results_list[:top_display], key=lambda x: level_priority.get(x["level"], 999)) return { "original_query": user_query, "variants": variants, "all_results": final_results_list, "top_results": top_display_results } def main(): while True: user_query = input("\n请输入您的法律问题 (输入'q'退出): ") if user_query.lower() == 'q': break start_time = time.time() results = multi_variant_search(user_query) print("\n检索结果:") print(f"原始问题: {results['original_query']}") print(f"问题变体:") for k, v in results['variants'].items(): print(f" {k}: {v}") print(f"\n总共找到 {len(results['all_results'])} 条相关法律条文") print(f"耗时: {time.time() - start_time:.2f}秒") # 先展示所有15条结果 print("\n所有检索结果:") for i, item in enumerate(results['all_results']): print(f"\n[{i+1}] 相似度: {item['original_score']:.4f} - 层级: {item['level']} - 变体: {item['variant']}") print(f"法律ID: {item['metadata'].get('law_id', '未知')}") print(f"内容: {item['text']}") # 再展示按层级排序的前5条 print("\n\n按层级优先级排序的前5条结果:") for i, item in enumerate(results['top_results']): print(f"\n[{i+1}] 相似度: {item['original_score']:.4f} - 层级: {item['level']} - 变体: {item['variant']}") print(f"法律ID: {item['metadata'].get('law_id', '未知')}") print(f"内容: {item['text']}") if __name__ == "__main__": main()