| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
- import os
- import time
- import json
- import sys
- import numpy as np
- from openai import OpenAI
- import re
- import config.config
- # 直接导入模块,而不是通过包导入
- current_dir = os.path.dirname(os.path.abspath(__file__))
- sys.path.append(current_dir)
- import three_variant_retrieval_deepseek_api
- multi_variant_search = three_variant_retrieval_deepseek_api.multi_variant_search
class HighLevelRetriever:
    """Iterative retriever for complex legal questions.

    The workflow implemented by :meth:`search` is:

    1. ask the chat model to split the question into atomic queries,
    2. run every atomic query through ``multi_variant_search``,
    3. ask the chat model whether the collected articles are sufficient,
    4. if not, ask it for supplementary queries, run them, and repeat.

    Every model call has a deterministic fallback, so a network or parsing
    failure degrades the result instead of aborting the search.
    """

    _MODEL = "deepseek-chat"
    _TIMEOUT_SECONDS = 60
    # Stop asking for supplementary articles once this many are collected.
    _MAX_RESULTS = 50

    _DECOMPOSE_PROMPT = """
    你是一个法律问题分析专家。现在需要你将一个复杂的法律问题分解成三个简单的原子问题。

    请遵循以下原则:
    1. 识别复杂问题中的多个法律概念或问题点
    2. 将每个概念或问题点转化为一个独立的、简洁的原子问题
    3. 确保原子问题涵盖原始复杂问题的所有关键方面
    4. 每个原子问题应该是明确的、可搜索的

    请输出JSON格式的结果:
    {
        "atomic_queries": [
            {
                "query": "原子问题1",
                "aspect": "这个问题关注的法律方面"
            },
            {
                "query": "原子问题2",
                "aspect": "这个问题关注的法律方面"
            },
            {
                "query": "原子问题3",
                "aspect": "这个问题关注的法律方面"
            }
        ]
    }

    只输出JSON格式的结果,不要包含任何额外的解释或文本。
    """

    _VALIDATE_PROMPT = """
    你是一个法律验证专家。你的任务是分析检索到的法律条文是否足以回答用户的原始问题。

    请评估以下几点:
    1. 检索到的法律条文是否涵盖了用户问题的所有方面
    2. 是否存在问题中提及但法律条文中未涉及的关键概念
    3. 是否需要额外的法律信息来完整回答问题

    请输出JSON格式的结果:
    {
        "is_sufficient": true/false,
        "missing_aspects": ["缺失方面1", "缺失方面2", ...],
        "explanation": "简要解释为什么结果不足以回答问题或为什么足够"
    }

    如果条文确实完全足够回答问题,请将"is_sufficient"设置为true,否则设为false。
    只输出JSON格式的结果,不要包含任何额外的解释或文本。
    """

    _SUPPLEMENT_PROMPT = """
    你是一个法律问题补充专家。基于用户的原始问题和已检索到的法律条文,你需要生成补充查询来获取缺失的信息。

    请考虑以下因素:
    1. 已检索到的法律条文中缺少哪些关键信息
    2. 哪些额外的法律概念需要被检索
    3. 如何构建精确的补充查询以获取这些缺失信息

    请输出JSON格式的结果:
    {
        "supplementary_queries": [
            {
                "query": "补充查询1",
                "purpose": "这个查询的目的是什么"
            },
            {
                "query": "补充查询2",
                "purpose": "这个查询的目的是什么"
            },
            ...
        ]
    }

    请确保生成2-3个具体的补充查询,以获取缺失的法律信息。
    只输出JSON格式的结果,不要包含任何额外的解释或文本。
    """

    def __init__(self):
        """Create the chat client from the project configuration."""
        self.client = OpenAI(api_key=config.config.DEEPSEEK_API, base_url="https://api.deepseek.com")
        print("高层次检索器初始化完成")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _chat(self, system_prompt, user_content):
        """Send one system+user exchange and return the reply text.

        Returns an empty string when the reply carries no content, so callers
        can slice and parse the result without a ``None`` check.
        """
        response = self.client.chat.completions.create(
            model=self._MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            stream=False,
            timeout=self._TIMEOUT_SECONDS,
        )
        return response.choices[0].message.content or ""

    @staticmethod
    def _extract_json_object(reply_text):
        """Return the JSON object contained in a model reply.

        The reply is first stripped of a Markdown code fence if one is
        present. The remaining text is parsed as-is; if that does not yield a
        JSON object, the widest ``{...}`` span is parsed instead.

        Raises:
            ValueError: if no JSON object can be recovered.
        """
        fenced = re.search(r'```(?:json)?\s*(.*?)\s*```', reply_text, re.DOTALL)
        if fenced:
            reply_text = fenced.group(1)

        try:
            parsed = json.loads(reply_text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        braces = re.search(r'({[\s\S]*})', reply_text)
        if not braces:
            raise ValueError("无法找到JSON结构")
        try:
            return json.loads(braces.group(1))
        except ValueError as exc:
            raise ValueError("无法解析有效的JSON结构") from exc

    @staticmethod
    def _reply_claims_sufficient(reply_text):
        """Guess from free text whether the model judged the results sufficient.

        Negated phrases are checked first because "不足够" and "不充分"
        contain the positive keywords as substrings.
        """
        if any(marker in reply_text for marker in ("不足", "不够", "不充分")):
            return False
        return "足够" in reply_text or "充分" in reply_text

    @staticmethod
    def _as_aspect_list(raw_aspects):
        """Normalise a ``missing_aspects`` value to a list of strings."""
        if not raw_aspects:
            return []
        if isinstance(raw_aspects, str):
            return [raw_aspects]
        return [str(aspect) for aspect in raw_aspects]

    @staticmethod
    def _collect_new(candidates, seen_texts, collected, **tags):
        """Append candidates whose text is unseen to ``collected``.

        ``tags`` are written onto each appended item only, so an item that is
        already collected never has its provenance overwritten by a later
        duplicate. Returns the number of items appended.
        """
        added = 0
        for item in candidates:
            text = item["text"]
            if text in seen_texts:
                continue
            seen_texts.add(text)
            item.update(tags)
            collected.append(item)
            added += 1
        return added

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def decompose_query(self, user_query):
        """Split a complex question into atomic queries.

        Args:
            user_query: the user's original question.

        Returns:
            A dict with key ``"atomic_queries"`` holding a non-empty list of
            dicts, each with at least a ``"query"`` entry (``"aspect"`` is
            expected but not guaranteed when it comes from the model).
            If the model call or parsing fails, a whitespace-based split of
            the question is returned instead; this method does not raise for
            those failures.
        """
        try:
            print("正在分解查询...")
            reply = self._chat(self._DECOMPOSE_PROMPT, user_query)
            print(f"API返回的分解结果: {reply[:200]}...")

            parsed = self._extract_json_object(reply)
            candidates = parsed.get("atomic_queries") or []
            # Keep only well-formed entries; downstream code indexes "query".
            usable = [q for q in candidates if isinstance(q, dict) and q.get("query")]
            if not usable:
                raise ValueError("没有生成有效的原子查询")

            parsed["atomic_queries"] = usable
            return parsed
        except Exception as e:
            print(f"查询分解失败: {e}")
            print("使用备选分解方法...")

            # Fallback: split on whitespace. Text without spaces counts as a
            # single word and is therefore searched as-is.
            words = user_query.split()
            if len(words) <= 5:
                return {
                    "atomic_queries": [
                        {"query": user_query, "aspect": "主要问题"}
                    ]
                }

            half = len(words) // 2
            return {
                "atomic_queries": [
                    {"query": user_query, "aspect": "完整问题"},
                    {"query": " ".join(words[:half]), "aspect": "问题前半部分"},
                    {"query": " ".join(words[half:]), "aspect": "问题后半部分"}
                ]
            }

    def validate_results(self, user_query, all_results):
        """Ask the model whether the retrieved articles answer the question.

        Args:
            user_query: the user's original question.
            all_results: retrieved items; each must have a ``"text"`` entry.
                Only the first 15 texts are sent to the model.

        Returns:
            ``(is_sufficient, details)`` where ``details`` is a dict with
            ``"is_sufficient"``, ``"missing_aspects"`` and ``"explanation"``
            when produced by the fallbacks, or the model's own JSON object
            otherwise. When the reply cannot be used, the verdict is derived
            from the number of results and, for unparseable text, from
            sufficiency keywords in the reply.
        """
        if not all_results:
            print("没有找到任何法律条文,验证失败")
            return False, {"is_sufficient": False, "missing_aspects": ["未找到相关法律条文"], "explanation": "未找到相关法律条文"}

        # Cap the prompt size: only the first 15 articles are shown.
        combined_results = "\n\n".join([item["text"] for item in all_results][:15])

        try:
            print("开始验证检索结果...")
            validation_text = self._chat(
                self._VALIDATE_PROMPT,
                f"用户问题:\n{user_query}\n\n检索到的法律条文:\n{combined_results}",
            )
            print(f"API返回的验证结果: {validation_text[:200]}...")

            try:
                validation = self._extract_json_object(validation_text)
            except ValueError:
                print("无法解析有效的验证结果JSON")
                if len(all_results) >= 15:
                    return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "已检索到足够的法律条文"}
                if self._reply_claims_sufficient(validation_text):
                    return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "API返回值判断为足够"}
                return False, {"is_sufficient": False, "missing_aspects": ["API返回解析失败"], "explanation": "无法解析API返回值"}

            if "is_sufficient" not in validation:
                print("验证结果缺少'is_sufficient'字段,检查其他信息...")
                if len(all_results) >= 15:
                    return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "已检索到大量法律条文"}
                reported = validation.get("missing_aspects")
                if reported:
                    return False, {"is_sufficient": False, "missing_aspects": reported, "explanation": validation.get("explanation", "需要额外信息")}
                return False, {"is_sufficient": False, "missing_aspects": ["验证结果不完整"], "explanation": "需要额外信息"}

            verdict = validation["is_sufficient"]
            if isinstance(verdict, str):
                # A quoted "false" is a truthy string; interpret it explicitly.
                verdict = verdict.strip().lower() in ("true", "yes", "1")
            verdict = bool(verdict)
            validation["is_sufficient"] = verdict
            return verdict, validation
        except Exception as e:
            print(f"验证过程出错: {e}")
            # The model is unavailable: decide from the result count alone.
            if len(all_results) >= 20:
                return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "已检索到大量法律条文,假定足够"}
            if len(all_results) >= 10:
                return False, {"is_sufficient": False, "missing_aspects": ["可能需要更多条文"], "explanation": "验证失败,需要更多信息"}
            return False, {"is_sufficient": False, "missing_aspects": ["条文数量不足"], "explanation": "检索到的条文数量较少,需要补充"}

    def generate_supplementary_queries(self, user_query, all_results, validation_result):
        """Produce follow-up queries for the aspects reported as missing.

        Args:
            user_query: the user's original question.
            all_results: retrieved items; each must have a ``"text"`` entry.
                Only the first 8 texts are sent to the model.
            validation_result: dict from :meth:`validate_results`; its
                ``"missing_aspects"`` entry is used when present.

        Returns:
            A non-empty list of dicts, each with a ``"query"`` entry
            (``"purpose"`` is expected but not guaranteed when it comes from
            the model). If the model call or parsing fails, queries are built
            from up to two reported aspects, or from the question itself when
            no aspect was reported.
        """
        combined_results = "\n\n".join([item["text"] for item in all_results][:8])

        missing_aspects = self._as_aspect_list(validation_result.get("missing_aspects"))
        # The placeholder is for the prompt only; it must not leak into the
        # fallback below, where it would become a meaningless search query.
        prompt_aspects = missing_aspects or ["未明确指出的缺失信息"]
        missing_aspects_text = "\n".join([f"- {aspect}" for aspect in prompt_aspects])

        try:
            print("生成补充查询...")
            supplementary_text = self._chat(
                self._SUPPLEMENT_PROMPT,
                f"用户问题:\n{user_query}\n\n已检索到的法律条文:\n{combined_results}\n\n缺失的方面:\n{missing_aspects_text}",
            )
            print(f"API返回的补充查询结果: {supplementary_text[:200]}...")

            supplementary = self._extract_json_object(supplementary_text)
            candidates = supplementary.get("supplementary_queries") or []
            usable = [q for q in candidates if isinstance(q, dict) and q.get("query")]
            if not usable:
                raise ValueError("没有生成补充查询")
            return usable
        except Exception as e:
            print(f"生成补充查询失败: {e}")

            default_queries = [
                {
                    "query": f"关于{aspect}的法律规定",
                    "purpose": f"查找关于{aspect}的缺失信息"
                }
                for aspect in missing_aspects[:2]
            ]
            if default_queries:
                return default_queries

            # No concrete aspect is known: derive generic queries from the
            # question itself.
            words = user_query.split()
            if len(words) >= 4:
                half = len(words) // 2
                return [
                    {
                        "query": " ".join(words[:half]) + "法律规定",
                        "purpose": "查找与问题前半部分相关的法律条文"
                    },
                    {
                        "query": " ".join(words[half:]) + "法律依据",
                        "purpose": "查找与问题后半部分相关的法律条文"
                    },
                ]
            return [
                {
                    "query": f"{user_query}的法律依据",
                    "purpose": "查找相关法律依据"
                },
                {
                    "query": f"{user_query}的相关规定",
                    "purpose": "查找更多相关法律规定"
                },
            ]

    def search(self, user_query, max_iterations=3):
        """Run the full decompose / retrieve / validate / supplement loop.

        Args:
            user_query: the user's original question.
            max_iterations: maximum number of validate-and-supplement rounds.

        Returns:
            A dict with ``"original_query"``, ``"atomic_queries"``,
            ``"results"`` (deduplicated by text, sorted by descending
            ``"original_score"``), ``"total_results"``, ``"time_taken"``
            (seconds) and ``"search_logs"``.
        """
        print(f"开始处理复杂查询: {user_query}")
        start_time = time.time()

        # Step 1: decompose the question.
        decomposed = self.decompose_query(user_query)
        atomic_queries = decomposed.get("atomic_queries", [])

        print(f"将复杂查询分解为{len(atomic_queries)}个原子查询:")
        for i, query in enumerate(atomic_queries):
            print(f" [{i+1}] {query['query']} (关注点: {query.get('aspect', '未指明')})")

        search_logs = {
            "initial_decomposition": atomic_queries,
            "iterations": []
        }

        # Step 2: low-level retrieval for every atomic query. One failing
        # query must not prevent the others from running.
        initial_hits = []
        for atomic_query in atomic_queries:
            try:
                print(f"\n执行原子查询: {atomic_query['query']}")
                results = multi_variant_search(atomic_query['query'])
                initial_hits.extend(results['all_results'])
            except Exception as e:
                print(f"执行原子查询失败: {e}")

        seen_texts = set()
        all_results = []
        self._collect_new(initial_hits, seen_texts, all_results, source="初始查询")

        if not all_results:
            return {
                "original_query": user_query,
                "atomic_queries": atomic_queries,
                "results": [],
                "total_results": 0,
                "time_taken": time.time() - start_time,
                "search_logs": search_logs
            }

        # Step 3: validate and supplement until sufficient or out of budget.
        for iteration in range(1, max_iterations + 1):
            if len(all_results) >= self._MAX_RESULTS:
                break

            print(f"\n开始第{iteration}轮验证和补充...")
            iteration_log = {
                "iteration": iteration,
                "results_before": len(all_results),
                "validation": {},
                "supplementary_queries": []
            }

            try:
                is_sufficient, validation_result = self.validate_results(user_query, all_results)
                iteration_log["validation"] = validation_result

                if is_sufficient:
                    print("验证通过,检索到的法律条文足以回答用户查询")
                    search_logs["iterations"].append(iteration_log)
                    break

                explanation = validation_result.get("explanation", "未提供原因")
                print(f"验证未通过: {explanation}")

                missing_aspects = self._as_aspect_list(validation_result.get("missing_aspects"))
                if missing_aspects:
                    print(f"缺失方面: {', '.join(missing_aspects)}")

                supplementary_queries = self.generate_supplementary_queries(user_query, all_results, validation_result)
                iteration_log["supplementary_queries"] = supplementary_queries

                if not supplementary_queries:
                    print("未生成补充查询,结束迭代")
                    search_logs["iterations"].append(iteration_log)
                    break

                print(f"生成{len(supplementary_queries)}个补充查询:")
                for i, query in enumerate(supplementary_queries):
                    print(f" [{i+1}] {query['query']} (目的: {query.get('purpose', '未指明')})")

                new_items_count = 0
                for supp_query in supplementary_queries:
                    try:
                        print(f"\n执行补充查询: {supp_query['query']}")
                        results = multi_variant_search(supp_query['query'])
                        new_items_count += self._collect_new(
                            results['all_results'],
                            seen_texts,
                            all_results,
                            source=f"补充查询 (轮次 {iteration})",
                            purpose=supp_query.get('purpose', '未指明'),
                        )
                    except Exception as e:
                        print(f"执行补充查询失败: {e}")

                iteration_log["new_items_added"] = new_items_count
                iteration_log["results_after"] = len(all_results)
                search_logs["iterations"].append(iteration_log)

                print(f"第{iteration}轮补充查询添加了{new_items_count}条新的法律条文")

                # Nothing new was found: try one broader query before the
                # next round, unless this was the last round.
                if new_items_count == 0 and iteration < max_iterations:
                    print("未找到新的法律条文,尝试更广泛的查询...")
                    broad_query = f"{user_query}相关法律"
                    try:
                        print(f"\n执行广泛查询: {broad_query}")
                        results = multi_variant_search(broad_query)
                        broad_added = self._collect_new(
                            results['all_results'],
                            seen_texts,
                            all_results,
                            source=f"广泛查询 (轮次 {iteration})",
                        )
                        # iteration_log is already in search_logs; updating it
                        # in place keeps the logged totals accurate.
                        iteration_log["new_items_added"] = broad_added
                        iteration_log["results_after"] = len(all_results)
                        print(f"广泛查询添加了{broad_added}条新的法律条文")
                    except Exception as e:
                        print(f"执行广泛查询失败: {e}")
            except Exception as e:
                print(f"第{iteration}轮验证和补充失败: {e}")
                search_logs["error"] = str(e)
                break

        final_results = sorted(all_results, key=lambda x: x.get("original_score", 0), reverse=True)

        return {
            "original_query": user_query,
            "atomic_queries": atomic_queries,
            "results": final_results,
            "total_results": len(final_results),
            "time_taken": time.time() - start_time,
            "search_logs": search_logs
        }
def _print_search_summary(results):
    """Print the totals and the per-round validation log of one search."""
    print(f"\n总共找到 {results['total_results']} 条相关法律条文")
    print(f"耗时: {results['time_taken']:.2f}秒")

    search_logs = results.get("search_logs", {})
    iterations = search_logs.get("iterations", [])
    if not iterations:
        return

    print("\n搜索过程详情:")
    print(f"初始分解: {len(search_logs.get('initial_decomposition', []))}个原子查询")
    for entry in iterations:
        passed = entry.get('validation', {}).get('is_sufficient', False)
        print(f"第{entry.get('iteration')}轮: 验证{'通过' if passed else '未通过'}")
        if 'new_items_added' in entry:
            print(f" 添加了{entry.get('new_items_added')}条新法律条文")


def _print_articles(articles):
    """Print the retrieved articles, pausing for confirmation every 10."""
    print("\n检索到的法律条文:")
    for i, item in enumerate(articles):
        print(f"\n[{i+1}] 相似度: {item.get('original_score', 0):.4f}")
        print(f"来源: {item.get('source', '未知')}")
        if "purpose" in item:
            print(f"目的: {item['purpose']}")
        # An item without metadata should not abort the whole listing.
        metadata = item.get('metadata') or {}
        print(f"法律ID: {metadata.get('law_id', '未知')}")
        print(f"内容: {item['text']}")

        if (i + 1) % 10 == 0 and i + 1 < len(articles):
            try:
                continue_viewing = input("\n继续查看更多条文? (y/n): ")
            except KeyboardInterrupt:
                print("\n显示中断")
                break
            if continue_viewing.lower() != 'y':
                break


def main():
    """Interactive console loop: read a question, search, print the articles.

    The loop ends when the user enters ``q`` or when standard input is
    closed. Ctrl-C during a query cancels that query only. Errors while
    handling one query are printed and the loop continues.
    """
    try:
        retriever = HighLevelRetriever()

        while True:
            try:
                user_query = input("\n请输入您的复杂法律问题 (输入'q'退出): ").strip()
                if user_query.lower() == 'q':
                    break
                if not user_query:
                    # Nothing to search for; prompt again.
                    continue

                results = retriever.search(user_query)
                _print_search_summary(results)

                if results['total_results'] == 0:
                    print("未找到相关法律条文。请尝试使用其他关键词或更详细的问题描述。")
                    continue

                _print_articles(results["results"])
            except KeyboardInterrupt:
                print("\n操作已中断。您可以继续输入新的查询,或输入'q'退出。")
                continue
            except EOFError:
                # stdin is closed: input() would raise again on every
                # iteration, so leave the loop instead of retrying forever.
                print()
                break
            except Exception as e:
                print(f"\n处理查询时出错: {e}")
                continue
    except Exception as e:
        print(f"程序出错: {e}")


if __name__ == "__main__":
    main()
|