import os
import time
import json
import sys
import numpy as np
from openai import OpenAI
import re
import config.config

# Import the sibling module directly as a module rather than through a package.
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)
import three_variant_retrieval_deepseek_api

multi_variant_search = three_variant_retrieval_deepseek_api.multi_variant_search

# Settings shared by every chat-completion request.
_CHAT_MODEL = "deepseek-chat"
_REQUEST_TIMEOUT = 60

# Contents of a Markdown code fence (optionally tagged "json").
_FENCED_JSON_RE = re.compile(r'```(?:json)?\s*(.*?)\s*```', re.DOTALL)
# Greedy span from the first "{" to the last "}" in a reply.
_JSON_OBJECT_RE = re.compile(r'({[\s\S]*})')

# Label stored when the model omits the "aspect"/"purpose" of a query.
_UNSPECIFIED_LABEL = "未指明"

# "missing_aspects" values produced by this module's own fallbacks. They
# describe a failure, not a legal topic, so they must never become queries.
_DIAGNOSTIC_ASPECTS = frozenset({
    "未找到相关法律条文",
    "API返回解析失败",
    "验证结果不完整",
    "可能需要更多条文",
    "条文数量不足",
    "未指明的缺失",
    "未明确指出的缺失信息",
})

_DECOMPOSE_PROMPT = """
你是一个法律问题分析专家。现在需要你将一个复杂的法律问题分解成三个简单的原子问题。

请遵循以下原则:
1. 识别复杂问题中的多个法律概念或问题点
2. 将每个概念或问题点转化为一个独立的、简洁的原子问题
3. 确保原子问题涵盖原始复杂问题的所有关键方面
4. 每个原子问题应该是明确的、可搜索的

请输出JSON格式的结果:
{
    "atomic_queries": [
        {
            "query": "原子问题1",
            "aspect": "这个问题关注的法律方面"
        },
        {
            "query": "原子问题2",
            "aspect": "这个问题关注的法律方面"
        },
        {
            "query": "原子问题3",
            "aspect": "这个问题关注的法律方面"
        }
    ]
}

只输出JSON格式的结果,不要包含任何额外的解释或文本。
"""

_VALIDATE_PROMPT = """
你是一个法律验证专家。你的任务是分析检索到的法律条文是否足以回答用户的原始问题。

请评估以下几点:
1. 检索到的法律条文是否涵盖了用户问题的所有方面
2. 是否存在问题中提及但法律条文中未涉及的关键概念
3. 是否需要额外的法律信息来完整回答问题

请输出JSON格式的结果:
{
    "is_sufficient": true/false,
    "missing_aspects": ["缺失方面1", "缺失方面2", ...],
    "explanation": "简要解释为什么结果不足以回答问题或为什么足够"
}

如果条文确实完全足够回答问题,请将"is_sufficient"设置为true,否则设为false。
只输出JSON格式的结果,不要包含任何额外的解释或文本。
"""

_SUPPLEMENT_PROMPT = """
你是一个法律问题补充专家。基于用户的原始问题和已检索到的法律条文,你需要生成补充查询来获取缺失的信息。

请考虑以下因素:
1. 已检索到的法律条文中缺少哪些关键信息
2. 哪些额外的法律概念需要被检索
3. 如何构建精确的补充查询以获取这些缺失信息

请输出JSON格式的结果:
{
    "supplementary_queries": [
        {
            "query": "补充查询1",
            "purpose": "这个查询的目的是什么"
        },
        {
            "query": "补充查询2",
            "purpose": "这个查询的目的是什么"
        },
        ...
    ]
}

请确保生成2-3个具体的补充查询,以获取缺失的法律信息。
只输出JSON格式的结果,不要包含任何额外的解释或文本。
"""


def _strip_code_fence(text: str) -> str:
    """Return the contents of the first Markdown code fence, or *text* itself."""
    fenced = _FENCED_JSON_RE.search(text)
    return fenced.group(1) if fenced else text


def _parse_json_object(text: str) -> dict:
    """Parse a model reply into a JSON object.

    The whole text is tried first; if that fails (or yields something other
    than an object) the span from the first ``{`` to the last ``}`` is tried.

    Raises:
        ValueError: if no JSON object can be recovered from *text*.
    """
    try:
        parsed = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    candidate = _JSON_OBJECT_RE.search(text)
    if candidate is None:
        raise ValueError("无法找到JSON结构")
    try:
        return json.loads(candidate.group(1))
    except ValueError as err:
        raise ValueError("无法解析有效的JSON结构") from err


def _as_text_list(value) -> list:
    """Coerce a model-supplied value into a list of non-blank strings."""
    if value is None:
        return []
    if isinstance(value, str):
        return [value] if value.strip() else []
    if isinstance(value, (list, tuple)):
        return [
            str(entry) for entry in value
            if entry is not None and str(entry).strip()
        ]
    return [str(value)]


def _as_bool(value) -> bool:
    """Interpret a model-supplied flag; the string "false" must not be truthy."""
    if isinstance(value, str):
        return value.strip().lower() in ("true", "yes", "1", "是")
    return bool(value)


def _normalize_query_items(items, label_key: str) -> list:
    """Keep only usable query entries and guarantee their keys exist.

    Bare strings are wrapped into ``{"query": ...}``; entries without a
    non-blank string ``"query"`` are dropped; *label_key* ("aspect" or
    "purpose") is filled with a default when absent.
    """
    normalized = []
    if not isinstance(items, (list, tuple)):
        return normalized
    for item in items:
        if isinstance(item, str):
            item = {"query": item}
        if not isinstance(item, dict):
            continue
        query_text = item.get("query")
        if not isinstance(query_text, str) or not query_text.strip():
            continue
        entry = dict(item)
        entry.setdefault(label_key, _UNSPECIFIED_LABEL)
        normalized.append(entry)
    return normalized


def _reads_as_sufficient(text: str) -> bool:
    """Keyword heuristic for an unparsable validation reply.

    Negated forms are checked first because "不足够" and "不充分" contain the
    positive keywords "足够" and "充分".
    """
    if any(marker in text for marker in ("不足", "不够", "不充分")):
        return False
    return "足够" in text or "充分" in text


class HighLevelRetriever:
    """Answer complex legal queries by decomposition and iterative retrieval.

    A query is split into atomic queries by the chat model, each atomic query
    is run through ``multi_variant_search``, and the merged results are then
    validated by the model; supplementary queries are issued until the model
    judges the results sufficient or a limit is reached.
    """

    def __init__(self):
        """Create the DeepSeek chat client used by every step."""
        self.client = OpenAI(api_key=config.config.DEEPSEEK_API, base_url="https://api.deepseek.com")
        print("高层次检索器初始化完成")

    def _chat(self, system_prompt: str, user_content: str) -> str:
        """Send one system+user exchange and return the reply text ("" if none)."""
        response = self.client.chat.completions.create(
            model=_CHAT_MODEL,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            stream=False,
            timeout=_REQUEST_TIMEOUT,
        )
        return response.choices[0].message.content or ""

    def decompose_query(self, user_query: str) -> dict:
        """Split a complex query into atomic queries.

        Args:
            user_query: the user's original question.

        Returns:
            A dict whose ``"atomic_queries"`` value is a non-empty list of
            dicts, each with ``"query"`` and ``"aspect"`` keys. When the model
            call or its parsing fails, a whitespace-based fallback split of
            *user_query* is returned instead; this method does not raise for
            those failures.
        """
        try:
            print("正在分解查询...")
            atomic_queries_text = self._chat(_DECOMPOSE_PROMPT, user_query)
            print(f"API返回的分解结果: {atomic_queries_text[:200]}...")

            atomic_queries = _parse_json_object(_strip_code_fence(atomic_queries_text))
            usable = _normalize_query_items(atomic_queries.get("atomic_queries"), "aspect")
            # At least one usable query is required.
            if not usable:
                raise ValueError("没有生成有效的原子查询")
            atomic_queries["atomic_queries"] = usable
            return atomic_queries
        except Exception as e:
            print(f"查询分解失败: {e}")
            print("使用备选分解方法...")

            # Fallback: split the original query on whitespace.
            words = user_query.split()
            # Short queries are searched as they are.
            if len(words) <= 5:
                return {
                    "atomic_queries": [
                        {"query": user_query, "aspect": "主要问题"}
                    ]
                }

            # Otherwise search the full query plus each half of it.
            middle = len(words) // 2
            return {
                "atomic_queries": [
                    {"query": user_query, "aspect": "完整问题"},
                    {"query": " ".join(words[:middle]), "aspect": "问题前半部分"},
                    {"query": " ".join(words[middle:]), "aspect": "问题后半部分"}
                ]
            }

    def validate_results(self, user_query: str, all_results: list) -> tuple:
        """Ask the model whether the retrieved articles answer the query.

        Args:
            user_query: the user's original question.
            all_results: retrieved items; each must have a ``"text"`` key.

        Returns:
            ``(is_sufficient, details)`` where *details* is a dict with
            ``"is_sufficient"``, ``"missing_aspects"`` and ``"explanation"``.
            When the reply cannot be used, the verdict falls back to
            heuristics based on the number of results.
        """
        if not all_results:
            print("没有找到任何法律条文,验证失败")
            return False, {"is_sufficient": False, "missing_aspects": ["未找到相关法律条文"], "explanation": "未找到相关法律条文"}

        # Only the first 15 articles are sent, to bound the prompt length.
        result_texts = [item["text"] for item in all_results]
        combined_results = "\n\n".join(result_texts[:15])
        result_count = len(all_results)

        try:
            print("开始验证检索结果...")
            validation_text = self._chat(
                _VALIDATE_PROMPT,
                f"用户问题:\n{user_query}\n\n检索到的法律条文:\n{combined_results}",
            )
            print(f"API返回的验证结果: {validation_text[:200]}...")

            validation_text = _strip_code_fence(validation_text)
            try:
                validation = _parse_json_object(validation_text)
            except ValueError:
                has_candidate = _JSON_OBJECT_RE.search(validation_text) is not None
                if has_candidate:
                    print("无法解析有效的验证结果JSON")
                # No usable JSON: judge by result count, then by keywords.
                if result_count >= 15:
                    return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "已检索到足够的法律条文"}
                if has_candidate and _reads_as_sufficient(validation_text):
                    return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "API返回值判断为足够"}
                return False, {"is_sufficient": False, "missing_aspects": ["API返回解析失败"], "explanation": "无法解析API返回值"}

            reported_missing = _as_text_list(validation.get("missing_aspects"))

            if "is_sufficient" not in validation:
                print("验证结果缺少'is_sufficient'字段,检查其他信息...")
                # The key field is absent: decide heuristically.
                if result_count >= 15:
                    return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "已检索到大量法律条文"}
                if reported_missing:
                    return False, {"is_sufficient": False, "missing_aspects": reported_missing, "explanation": validation.get("explanation", "需要额外信息")}
                return False, {"is_sufficient": False, "missing_aspects": ["验证结果不完整"], "explanation": "需要额外信息"}

            # Normalise loosely-typed fields so callers can rely on them.
            validation["is_sufficient"] = _as_bool(validation["is_sufficient"])
            validation["missing_aspects"] = reported_missing
            return validation["is_sufficient"], validation

        except Exception as e:
            print(f"验证过程出错: {e}")
            # The request itself failed: judge by result count alone.
            if result_count >= 20:
                return True, {"is_sufficient": True, "missing_aspects": [], "explanation": "已检索到大量法律条文,假定足够"}
            if result_count >= 10:
                return False, {"is_sufficient": False, "missing_aspects": ["可能需要更多条文"], "explanation": "验证失败,需要更多信息"}
            return False, {"is_sufficient": False, "missing_aspects": ["条文数量不足"], "explanation": "检索到的条文数量较少,需要补充"}

    def generate_supplementary_queries(self, user_query: str, all_results: list, validation_result: dict) -> list:
        """Produce follow-up queries for the aspects validation found missing.

        Args:
            user_query: the user's original question.
            all_results: items retrieved so far; each must have ``"text"``.
            validation_result: the details dict from ``validate_results``.

        Returns:
            A non-empty list of dicts with ``"query"`` and ``"purpose"`` keys.
            When the model call or its parsing fails, default queries are
            built from the missing aspects or from *user_query* itself.
        """
        # Only the first 8 articles are sent, to bound the prompt length.
        result_texts = [item["text"] for item in all_results]
        combined_results = "\n\n".join(result_texts[:8])

        missing_aspects = _as_text_list(validation_result.get("missing_aspects"))
        # The placeholder is for the prompt only; it is not a search topic.
        prompt_aspects = missing_aspects or ["未明确指出的缺失信息"]
        missing_aspects_text = "\n".join(f"- {aspect}" for aspect in prompt_aspects)

        try:
            print("生成补充查询...")
            supplementary_text = self._chat(
                _SUPPLEMENT_PROMPT,
                f"用户问题:\n{user_query}\n\n已检索到的法律条文:\n{combined_results}\n\n缺失的方面:\n{missing_aspects_text}",
            )
            print(f"API返回的补充查询结果: {supplementary_text[:200]}...")

            try:
                supplementary = _parse_json_object(_strip_code_fence(supplementary_text))
            except ValueError as err:
                print("无法解析有效的补充查询JSON")
                raise ValueError("无法解析补充查询JSON") from err

            supplementary_queries = _normalize_query_items(
                supplementary.get("supplementary_queries"), "purpose"
            )
            # At least one supplementary query is required.
            if not supplementary_queries:
                raise ValueError("没有生成补充查询")
            return supplementary_queries

        except Exception as e:
            print(f"生成补充查询失败: {e}")

            # Build default queries from at most two genuine missing aspects.
            searchable_aspects = [
                aspect for aspect in missing_aspects
                if aspect not in _DIAGNOSTIC_ASPECTS
            ]
            default_queries = [
                {
                    "query": f"关于{aspect}的法律规定",
                    "purpose": f"查找关于{aspect}的缺失信息"
                }
                for aspect in searchable_aspects[:2]
            ]

            # Without a usable aspect, derive generic queries from the question.
            if not default_queries:
                words = user_query.split()
                if len(words) >= 4:
                    half = len(words) // 2
                    default_queries.append({
                        "query": " ".join(words[:half]) + "法律规定",
                        "purpose": "查找与问题前半部分相关的法律条文"
                    })
                    default_queries.append({
                        "query": " ".join(words[half:]) + "法律依据",
                        "purpose": "查找与问题后半部分相关的法律条文"
                    })
                else:
                    default_queries.append({
                        "query": f"{user_query}的法律依据",
                        "purpose": "查找相关法律依据"
                    })
                    default_queries.append({
                        "query": f"{user_query}的相关规定",
                        "purpose": "查找更多相关法律规定"
                    })

            return default_queries

    def search(self, user_query: str, max_iterations: int = 3, max_results: int = 50) -> dict:
        """Run the full decompose / retrieve / validate / supplement pipeline.

        Args:
            user_query: the user's original question.
            max_iterations: maximum number of validate-and-supplement rounds.
            max_results: no further round starts once this many unique
                results have been collected.

        Returns:
            A dict with ``"original_query"``, ``"atomic_queries"``,
            ``"results"`` (sorted by ``"original_score"``, highest first),
            ``"total_results"``, ``"time_taken"`` in seconds and
            ``"search_logs"``.
        """
        print(f"开始处理复杂查询: {user_query}")
        start_time = time.time()

        # Step 1: decompose the query.
        decomposed = self.decompose_query(user_query)
        atomic_queries = _normalize_query_items(decomposed.get("atomic_queries", []), "aspect")

        print(f"将复杂查询分解为{len(atomic_queries)}个原子查询:")
        for number, query in enumerate(atomic_queries, start=1):
            print(f"  [{number}] {query['query']} (关注点: {query['aspect']})")

        search_logs = {
            "initial_decomposition": atomic_queries,
            "iterations": []
        }

        # Step 2: run the low-level retrieval once per atomic query.
        retrieved = []
        for atomic_query in atomic_queries:
            try:
                print(f"\n执行原子查询: {atomic_query['query']}")
                results = multi_variant_search(atomic_query['query'])
                retrieved.extend(results['all_results'])
            except Exception as e:
                print(f"执行原子查询失败: {e}")
                continue

        # De-duplicate by article text; the first occurrence wins.
        all_results = []
        seen_texts = set()
        for item in retrieved:
            if item["text"] not in seen_texts:
                seen_texts.add(item["text"])
                item["source"] = "初始查询"
                all_results.append(item)

        # Nothing retrieved: there is nothing to validate or supplement.
        if not all_results:
            return {
                "original_query": user_query,
                "atomic_queries": atomic_queries,
                "results": [],
                "total_results": 0,
                "time_taken": time.time() - start_time,
                "search_logs": search_logs
            }

        # Step 3: validate, and supplement while the results fall short.
        for iteration in range(1, max_iterations + 1):
            if len(all_results) >= max_results:
                break

            print(f"\n开始第{iteration}轮验证和补充...")
            iteration_log = {
                "iteration": iteration,
                "results_before": len(all_results),
                "validation": {},
                "supplementary_queries": []
            }

            try:
                is_sufficient, validation_result = self.validate_results(user_query, all_results)
                iteration_log["validation"] = validation_result

                if is_sufficient:
                    print("验证通过,检索到的法律条文足以回答用户查询")
                    search_logs["iterations"].append(iteration_log)
                    break

                explanation = validation_result.get("explanation", "未提供原因")
                print(f"验证未通过: {explanation}")

                missing_aspects = _as_text_list(validation_result.get("missing_aspects"))
                if missing_aspects:
                    print(f"缺失方面: {', '.join(missing_aspects)}")

                supplementary_queries = _normalize_query_items(
                    self.generate_supplementary_queries(user_query, all_results, validation_result),
                    "purpose",
                )
                iteration_log["supplementary_queries"] = supplementary_queries

                # Without supplementary queries another round cannot help.
                if not supplementary_queries:
                    print("未生成补充查询,结束迭代")
                    search_logs["iterations"].append(iteration_log)
                    break

                print(f"生成{len(supplementary_queries)}个补充查询:")
                for number, query in enumerate(supplementary_queries, start=1):
                    print(f"  [{number}] {query['query']} (目的: {query['purpose']})")

                # Run each supplementary query and tag where its hits came from.
                supplementary_results = []
                for supp_query in supplementary_queries:
                    try:
                        print(f"\n执行补充查询: {supp_query['query']}")
                        results = multi_variant_search(supp_query['query'])
                        for item in results['all_results']:
                            item["source"] = f"补充查询 (轮次 {iteration})"
                            item["purpose"] = supp_query['purpose']
                        supplementary_results.extend(results['all_results'])
                    except Exception as e:
                        print(f"执行补充查询失败: {e}")
                        continue

                # Merge, skipping texts that were already collected.
                new_items_count = 0
                for item in supplementary_results:
                    if item["text"] not in seen_texts:
                        seen_texts.add(item["text"])
                        all_results.append(item)
                        new_items_count += 1

                iteration_log["new_items_added"] = new_items_count
                iteration_log["results_after"] = len(all_results)
                search_logs["iterations"].append(iteration_log)

                print(f"第{iteration}轮补充查询添加了{new_items_count}条新的法律条文")

                # A round that found nothing new gets one broader query.
                if new_items_count == 0 and iteration < max_iterations:
                    print("未找到新的法律条文,尝试更广泛的查询...")
                    broad_query = f"{user_query}相关法律"
                    try:
                        print(f"\n执行广泛查询: {broad_query}")
                        results = multi_variant_search(broad_query)
                        for item in results['all_results']:
                            item["source"] = f"广泛查询 (轮次 {iteration})"
                            if item["text"] not in seen_texts:
                                seen_texts.add(item["text"])
                                all_results.append(item)
                                new_items_count += 1
                        print(f"广泛查询添加了{new_items_count}条新的法律条文")
                    except Exception as e:
                        print(f"执行广泛查询失败: {e}")
                    # iteration_log is already in search_logs; keep it accurate.
                    iteration_log["new_items_added"] = new_items_count
                    iteration_log["results_after"] = len(all_results)

            except Exception as e:
                print(f"第{iteration}轮验证和补充失败: {e}")
                search_logs["error"] = str(e)
                break

        # Most relevant first.
        final_results = sorted(all_results, key=lambda x: x.get("original_score", 0), reverse=True)

        end_time = time.time()
        return {
            "original_query": user_query,
            "atomic_queries": atomic_queries,
            "results": final_results,
            "total_results": len(final_results),
            "time_taken": end_time - start_time,
            "search_logs": search_logs
        }


def main():
    """Interactive loop: read a query, run the search, print the results."""
    try:
        retriever = HighLevelRetriever()

        while True:
            try:
                user_query = input("\n请输入您的复杂法律问题 (输入'q'退出): ")
                if user_query.strip().lower() == 'q':
                    break
                if not user_query.strip():
                    # A blank line is not a query; prompt again.
                    continue

                results = retriever.search(user_query)

                print(f"\n总共找到 {results['total_results']} 条相关法律条文")
                print(f"耗时: {results['time_taken']:.2f}秒")

                # Summarise the validate-and-supplement rounds.
                search_logs = results.get("search_logs", {})
                iterations = search_logs.get("iterations", [])
                if iterations:
                    print("\n搜索过程详情:")
                    print(f"初始分解: {len(search_logs.get('initial_decomposition', []))}个原子查询")
                    for iteration in iterations:
                        passed = iteration.get('validation', {}).get('is_sufficient', False)
                        verdict = '通过' if passed else '未通过'
                        print(f"第{iteration.get('iteration')}轮: 验证{verdict}")
                        if 'new_items_added' in iteration:
                            print(f"  添加了{iteration.get('new_items_added')}条新法律条文")

                if results['total_results'] == 0:
                    print("未找到相关法律条文。请尝试使用其他关键词或更详细的问题描述。")
                    continue

                # Print every retrieved article, pausing after each 10.
                print("\n检索到的法律条文:")
                found = results["results"]
                for position, item in enumerate(found, start=1):
                    metadata = item.get('metadata') or {}
                    print(f"\n[{position}] 相似度: {item.get('original_score', 0):.4f}")
                    print(f"来源: {item.get('source', '未知')}")
                    if "purpose" in item:
                        print(f"目的: {item['purpose']}")
                    print(f"法律ID: {metadata.get('law_id', '未知')}")
                    print(f"内容: {item['text']}")

                    if position % 10 == 0 and position < len(found):
                        try:
                            continue_viewing = input("\n继续查看更多条文? (y/n): ")
                            if continue_viewing.lower() != 'y':
                                break
                        except KeyboardInterrupt:
                            print("\n显示中断")
                            break

            except KeyboardInterrupt:
                print("\n操作已中断。您可以继续输入新的查询,或输入'q'退出。")
                continue
            except EOFError:
                # stdin is closed: every later input() would fail the same
                # way, so leave the loop instead of retrying forever.
                break
            except Exception as e:
                print(f"\n处理查询时出错: {e}")
                continue

    except Exception as e:
        print(f"程序出错: {e}")


if __name__ == "__main__":
    main()