| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- import os
- import time
- import json
- import numpy as np
- import faiss
- from text2vec import SentenceModel
- from openai import OpenAI
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from langchain_core.runnables import RunnableLambda
- from langchain_openai import ChatOpenAI
- import config.config
# DeepSeek API settings: the endpoint is fixed here, while the key is read
# from the project configuration module.
DEEPSEEK_BASE_URL = "https://api.deepseek.com"
DEEPSEEK_API_KEY = config.config.DEEPSEEK_API

# Filesystem locations, also taken from the project configuration: the root
# directory under which the per-level vector stores are looked up, and the
# path handed to the embedding-model loader.
MODEL_PATH = config.config.MODEL_PATH
VECTOR_STORE_BASE = config.config.VECTOR_STORE_BASE
# Embedding-model loading
# Models already constructed in this process, keyed by device string.
_EMBEDDING_MODEL_CACHE = {}


def load_embedding_model(device="cuda"):
    """Return the text2vec sentence-embedding model for *device*.

    The model is built from ``MODEL_PATH`` the first time a device is
    requested; later calls for the same device get the same instance back.
    Callers in this module ask for the model once per query and again while
    re-ranking, so rebuilding it on every call repeated the load each time.

    NOTE(review): callers now share one instance per device; this assumes
    ``SentenceModel.encode`` can be called repeatedly on a shared model --
    confirm against the text2vec documentation.

    Args:
        device: Device string forwarded to ``SentenceModel`` (default
            ``"cuda"``).

    Returns:
        The ``SentenceModel`` instance associated with *device*.
    """
    model = _EMBEDDING_MODEL_CACHE.get(device)
    if model is None:
        # Progress messages are only printed when a real load happens.
        print("正在加载向量化模型: text2vec-base-chinese")
        print(f"使用设备: {device}")
        model = SentenceModel(MODEL_PATH, device=device)
        _EMBEDDING_MODEL_CACHE[device] = model
    return model
# Question-translation chain (LangChain variant)
def build_question_translation_chain():
    """Build a LangChain pipeline that rewrites a user's legal question.

    The pipeline is ``prompt | llm | StrOutputParser()``: a two-message chat
    prompt (a fixed system instruction plus the ``{question}`` placeholder as
    the human message), the ``deepseek-chat`` model reached through the
    OpenAI-compatible endpoint at ``DEEPSEEK_BASE_URL``, and a parser that
    yields the reply as a plain string.

    Returns:
        The composed runnable. Because the human message is the
        ``{question}`` placeholder, it is invoked with a mapping that
        provides ``question``.
    """
    # The prompt text is runtime data sent to the model; it is kept verbatim.
    system_template = """
你是一个法律问题转译专家。你的任务是将用户的自然语言法律问题转译为更加专业、简洁、直接的形式,以便于向量检索系统能够找到最相关的法律条文。

遵循以下原则:
1. 提取问题中的关键法律概念和术语
2. 使用法律专业术语重新表述问题
3. 去除无关信息,保留核心法律问题
4. 确保转译后的问题简洁明了,直指法律要点

请直接输出转译后的问题,不要包含额外解释。
"""

    # from_messages is the documented constructor for (role, template) pairs;
    # passing the list positionally to ChatPromptTemplate(...) is only
    # accepted by newer langchain_core releases.
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_template),
            ("human", "{question}"),
        ]
    )

    llm = ChatOpenAI(
        api_key=DEEPSEEK_API_KEY,
        base_url=DEEPSEEK_BASE_URL,
        model="deepseek-chat",
        temperature=0.3,
    )

    return prompt | llm | StrOutputParser()
# Question translation through a direct DeepSeek API call
def translate_question_with_deepseek(question):
    """Rewrite *question* into concise legal terminology via DeepSeek.

    Sends a fixed system instruction plus the user's question to the
    ``deepseek-chat`` model through the OpenAI-compatible client and returns
    the model's reply.

    Args:
        question: The user's question in natural language.

    Returns:
        The rewritten question with surrounding whitespace removed. If the
        response carries no choices, or its content is missing or blank, the
        original *question* is returned so that callers always have a usable
        string to embed.
    """
    # The prompt text is runtime data sent to the model; it is kept verbatim.
    system_template = """
你是一个法律问题转译专家。你的任务是将用户的自然语言法律问题转译为更加专业、简洁、直接的形式,以便于向量检索系统能够找到最相关的法律条文。

遵循以下原则:
1. 提取问题中的关键法律概念和术语
2. 使用法律专业术语重新表述问题
3. 去除无关信息,保留核心法律问题
4. 确保转译后的问题简洁明了,直指法律要点

请直接输出转译后的问题,不要包含额外解释。
"""

    client = OpenAI(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)
    response = client.chat.completions.create(
        model="deepseek-chat",
        messages=[
            {"role": "system", "content": system_template},
            {"role": "user", "content": question},
        ],
        temperature=0.3,
        stream=False,
    )

    # The message content is optional in the response, so guard against an
    # empty choice list and a None/blank body instead of passing them on.
    content = response.choices[0].message.content if response.choices else None
    translated = (content or "").strip()
    return translated or question
# Vector-retrieval helpers
def load_vector_store(level_dir):
    """Load the FAISS index and its companion arrays for one level.

    All files live in ``<VECTOR_STORE_BASE>/<level_dir>/`` and share the
    ``index`` prefix: ``index_index`` (the FAISS index),
    ``index_vectors.npy``, ``index_texts.npy`` and ``index_metadata.npy``.

    Args:
        level_dir: Sub-directory name of the level to load.

    Returns:
        A dict with the keys ``"index"``, ``"vectors"``, ``"texts"`` and
        ``"metadata"``.

    Raises:
        FileNotFoundError: If the FAISS index file does not exist.
    """
    prefix = os.path.join(VECTOR_STORE_BASE, level_dir, "index")
    index_file = f"{prefix}_index"

    # Only the index file is checked up front; the .npy loads below raise on
    # their own if a companion file is missing.
    if not os.path.exists(index_file):
        raise FileNotFoundError(f"向量索引文件不存在: {index_file}")

    store = {"index": faiss.read_index(index_file)}
    store["vectors"] = np.load(f"{prefix}_vectors.npy")

    # Per the original author's note, the texts array holds the original full
    # text of each entry, including its "第xx条 " article prefix.
    # NOTE(review): allow_pickle=True unpickles arbitrary objects; only load
    # stores produced by a trusted indexing job.
    store["texts"] = np.load(f"{prefix}_texts.npy", allow_pickle=True)
    store["metadata"] = np.load(f"{prefix}_metadata.npy", allow_pickle=True)

    print(f"成功加载{level_dir}向量库: {len(store['vectors'])}条记录")
    return store
def search_in_level(query_embedding, level_dir, top_k=10, store=None):
    """Query one level's index and describe each hit as a dict.

    Args:
        query_embedding: Query vector; it is reshaped to a single row before
            being handed to the index.
        level_dir: Level name. Used to load the store when *store* is not
            given, and recorded on every hit under ``"level"``.
        top_k: Number of neighbours requested from the index.
        store: Optional pre-loaded store (as returned by
            ``load_vector_store``); avoids reloading from disk.

    Returns:
        A list of dicts with the keys ``"text"``, ``"metadata"``, ``"score"``,
        ``"index"`` and ``"level"``, in the order the index returned them.
        Slots the index reports as ``-1`` (not enough results) are skipped.
    """
    active_store = load_vector_store(level_dir) if store is None else store

    query_row = query_embedding.reshape(1, -1)
    distances, indices = active_store["index"].search(query_row, top_k)

    # NOTE(review): the score is computed as ``1 - distance``; whether that is
    # a meaningful similarity depends on the metric the index was built with
    # -- confirm against the indexing code.
    return [
        {
            "text": active_store["texts"][hit],
            "metadata": active_store["metadata"][hit],
            "score": float(1.0 - distance),
            "index": int(hit),
            "level": level_dir,
        }
        for distance, hit in zip(distances[0], indices[0])
        if hit != -1
    ]
def rank_final_results(results, query_embedding, top_k=5, embedding_model=None):
    """Re-score candidates against the query and keep the best unique ones.

    Every candidate text is embedded, its cosine similarity to
    *query_embedding* is written to the candidate's ``"score"`` (the dicts in
    *results* are updated in place), and the candidates are ranked by that
    score. Duplicates (same ``"text"``) are dropped, the first *top_k* unique
    candidates are kept, and those are finally ordered by level
    (``level_1`` < ``level_2`` < ``level_3`` < anything else), preserving the
    similarity order inside each level.

    De-duplication walks the whole ranked list rather than a fixed-size
    prefix, so a run of duplicates at the top cannot starve the result of
    unique entries and ``top_k`` is not silently capped.

    Args:
        results: Candidate dicts carrying at least ``"text"`` and ``"level"``.
        query_embedding: Query vector; any shape that flattens to one vector.
        top_k: Maximum number of unique candidates to return.
        embedding_model: Optional object exposing ``encode(list_of_texts)``.
            When omitted, the model is obtained from
            ``load_embedding_model()``.

    Returns:
        Up to *top_k* candidate dicts; an empty list when *results* is empty
        or *top_k* is not positive.
    """
    if not results or top_k <= 0:
        return []

    if embedding_model is None:
        embedding_model = load_embedding_model()

    texts = [item["text"] for item in results]
    embeddings = np.atleast_2d(np.asarray(embedding_model.encode(texts)))
    # Flatten so a (1, d) query cannot broadcast into an (n, n) matrix.
    query = np.asarray(query_embedding).reshape(-1)

    # Cosine similarity; a zero-norm vector scores 0.0 instead of NaN.
    norms = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(query)
    similarities = (embeddings @ query) / np.where(norms > 0, norms, 1.0)

    for item, similarity in zip(results, similarities):
        item["score"] = float(similarity)

    ranked = sorted(results, key=lambda item: item["score"], reverse=True)

    unique_results = []
    seen_texts = set()
    for item in ranked:
        if item["text"] in seen_texts:
            continue
        seen_texts.add(item["text"])
        unique_results.append(item)
        if len(unique_results) >= top_k:
            break

    # sorted() is stable, so similarity order survives within each level.
    level_priority = {"level_1": 1, "level_2": 2, "level_3": 3}
    return sorted(
        unique_results,
        key=lambda item: level_priority.get(item["level"], 999),
    )
def hierarchical_search(user_query, l1_results=10, l2_sub_searches=3, l3_sub_searches=1, final_results=5):
    """Run the hierarchical vector search for one user question.

    Steps:
      1. Rewrite the question with ``translate_question_with_deepseek``.
      2. Embed the rewritten question.
      3. Search ``level_1`` for *l1_results* hits.
      4. For every level-1 hit, embed its text and search ``level_2`` for
         *l2_sub_searches* hits.
      5. Re-rank the union of both levels with ``rank_final_results`` and
         keep *final_results* entries.

    The level-3 expansion is currently disabled: *l3_sub_searches* is
    accepted to keep the signature stable but is not used, and the
    ``level_3`` store is therefore not loaded.

    Args:
        user_query: The question as typed by the user.
        l1_results: Number of hits requested from ``level_1``.
        l2_sub_searches: Number of ``level_2`` hits requested per level-1 hit.
        l3_sub_searches: Unused (level-3 expansion is disabled).
        final_results: Number of entries kept after re-ranking.

    Returns:
        A dict with ``"original_query"``, ``"translated_query"`` and
        ``"results"`` (the re-ranked list; empty when no level produced a
        hit).
    """
    print(f"开始处理用户查询: {user_query}")

    # 1. Rewrite the question through the DeepSeek API.
    translated_query = translate_question_with_deepseek(user_query)
    print(f"转译后的查询: {translated_query}")

    # 2. Embed the rewritten question.
    embedding_model = load_embedding_model()
    query_embedding = embedding_model.encode(translated_query)

    # 3. Load only the stores that are actually searched.
    level_1_store = load_vector_store("level_1")
    level_2_store = load_vector_store("level_2")

    # 4. Level-1 search with the query itself.
    level_1_hits = search_in_level(query_embedding, "level_1", l1_results, level_1_store)
    print(f"一级检索结果: {len(level_1_hits)}条")

    # 5. Level-2 search, seeded by the text of each level-1 hit.
    level_2_hits = []
    for l1_item in level_1_hits:
        l1_text_embedding = embedding_model.encode(l1_item["text"])
        level_2_hits.extend(
            search_in_level(l1_text_embedding, "level_2", l2_sub_searches, level_2_store)
        )
    print(f"二级检索结果: {len(level_2_hits)}条")

    all_results = level_1_hits + level_2_hits
    print(f"所有层级检索结果总量: {len(all_results)}条")

    # 6. Re-rank everything against the query; skip the call when there is
    #    nothing to rank so an empty search cannot fail inside the ranker.
    ranked = []
    if all_results:
        ranked = rank_final_results(all_results, query_embedding, final_results)
    print(f"最终筛选结果: {len(ranked)}条")

    return {
        "original_query": user_query,
        "translated_query": translated_query,
        "results": ranked,
    }
def main():
    """Run the interactive console loop.

    Repeatedly prompts for a legal question, runs ``hierarchical_search`` on
    it and prints the original question, the rewritten question, the elapsed
    search time and every returned entry. The loop ends when the user enters
    ``q`` (case-insensitive) or closes the input stream / presses Ctrl-C at
    the prompt. Blank input is ignored rather than sent to the search.
    """
    while True:
        try:
            user_query = input("\n请输入您的法律问题 (输入'q'退出): ").strip()
        except (EOFError, KeyboardInterrupt):
            # Leave quietly instead of dumping a traceback at the prompt.
            print()
            break

        if user_query.lower() == 'q':
            break
        if not user_query:
            continue

        # Measure only the search itself, not the printing that follows.
        start_time = time.perf_counter()
        results = hierarchical_search(user_query)
        elapsed = time.perf_counter() - start_time

        print("\n检索结果:")
        print(f"原始问题: {results['original_query']}")
        print(f"转译问题: {results['translated_query']}")
        print(f"耗时: {elapsed:.2f}秒")

        for rank, item in enumerate(results['results'], start=1):
            print(f"\n[{rank}] 相似度: {item['score']:.4f} - 层级: {item['level']}")
            print(f"法律ID: {item['metadata'].get('law_id', '未知')}")
            # The text printed here is the original full text of the entry.
            print(f"内容: {item['text']}")


if __name__ == "__main__":
    main()
|