import os import time import json import shutil import re from text2vec import SentenceModel from langchain_core.documents import Document import numpy as np import torch import faiss import config.config # 向量存储目录结构 VECTOR_STORE_BASE = config.config.VECTOR_STORE_BASE FILE_STORAGE_BASE = config.config.FILE_STORAGE_BASE MODEL_PATH = config.config.MODEL_PATH # 层级目录列表 LEVEL_DIRS = ["level_1", "level_2", "level_3"] def extract_content_from_law(text): """ 从法律条文中提取具体内容 例如:"第三十二条 劳动者拒绝用人单位管理人员违章指挥、强令冒险作业的,不视为违反劳动合同。" 将提取为:"劳动者拒绝用人单位管理人员违章指挥、强令冒险作业的,不视为违反劳动合同。" """ # 使用正则表达式匹配"第xx条 "模式,其中xx可以是各种中文数字 pattern = r'^第[一二三四五六七八九十百千万零]+条\s+' # 替换匹配的内容为空字符串 content = re.sub(pattern, '', text) return content def load_json_documents(json_file_path: str): """加载并处理JSON文件,将其转换为Document对象列表""" print(f"正在加载JSON文件: {json_file_path}") try: with open(json_file_path, 'r', encoding='utf-8') as f: data = json.load(f) documents = [] for item in data: # JSON结构: {"law_id": "xxx", "text": "xxx"} text = item.get("text", "") # 提取具体内容,去掉"第xx条 "前缀 content = extract_content_from_law(text) # 确保所有元数据都被保存,包括原始text和law_id metadata = {k: v for k, v in item.items()} metadata["source"] = json_file_path # 添加源文件信息 metadata["original_text"] = text # 保存原始文本 doc = Document( page_content=content, # 只用提取后的具体内容进行向量化 metadata=metadata # 保存完整的元数据,包括完整的text和law_id ) documents.append(doc) print(f"成功从JSON文件加载 {len(documents)} 条记录") return documents except Exception as e: print(f"加载JSON文件失败: {str(e)}") return [] def load_py_documents(py_file_path: str): """ 加载并处理 law_list.py 文件,将其中的 docs 列表转换为 Document 对象列表。 假设文件内容格式为:docs = ["条文1", "条文2", ...] """ print(f"正在加载Python文件: {py_file_path}") try: # 读取文件内容并执行,提取 docs 变量 with open(py_file_path, 'r', encoding='utf-8') as f: file_content = f.read() # 创建一个安全的命名空间,通过 exec 提取 docs namespace = {} exec(file_content, namespace) docs = namespace.get('docs', []) if not isinstance(docs, list): print(f"文件中未找到列表变量 'docs' 或类型错误") return [] documents = [] base_name = os.path.splitext(os.path.basename(py_file_path))[0] # 文件名(不含扩展名) for idx, text in enumerate(docs): if not isinstance(text, str): continue # 生成 law_id,例如 law_list_0 law_id = f"{base_name}第{idx+1}条" # 提取具体内容,去掉"第xx条 "前缀 content = extract_content_from_law(text) metadata = { "law_id": law_id, "source": py_file_path, "original_text": text # 保存原始文本 } doc = Document( page_content=content, metadata=metadata ) documents.append(doc) print(f"成功从Python文件加载 {len(documents)} 条记录") return documents except Exception as e: print(f"加载Python文件失败: {str(e)}") return [] def clear_directory(directory): """清空目录中的所有文件,但保留目录结构""" if os.path.exists(directory): print(f"清空目录: {directory}") for filename in os.listdir(directory): file_path = os.path.join(directory, filename) try: if os.path.isfile(file_path): os.unlink(file_path) print(f" 已删除文件: {filename}") elif os.path.isdir(file_path): shutil.rmtree(file_path) print(f" 已删除子目录: {filename}") except Exception as e: print(f" 删除 {file_path} 失败: {e}") else: os.makedirs(directory, exist_ok=True) print(f"创建目录: {directory}") def create_vector_store(documents, save_dir): """创建向量数据库并保存到指定目录""" # 清空并重新创建目录 clear_directory(save_dir) save_file = os.path.join(save_dir, "index") print(f"开始创建向量库: {save_file}") print(f"使用模型: text2vec-base-chinese") print(f"文档数量: {len(documents)}") # 检查GPU是否可用 device = "cuda" if torch.cuda.is_available() else "cpu" print(f"使用设备: {device}") try: start_time = time.time() # 加载模型 model = SentenceModel(MODEL_PATH, device=device) # 提取文本内容 - 仅使用page_content(处理后的具体内容)进行向量化 texts = [doc.page_content for doc in documents] # 保存完整元数据,以便检索时能够还原所有字段 metadata = [doc.metadata for doc in documents] np.save(f"{save_file}_metadata.npy", metadata) # 同时保存原始的完整文本,用于最终展示 original_texts = [doc.metadata.get("original_text", doc.page_content) for doc in documents] np.save(f"{save_file}_texts.npy", original_texts) # 生成向量 embeddings = model.encode(texts) # 保存向量 np.save(f"{save_file}_vectors.npy", embeddings) # 创建Faiss索引 dimension = embeddings.shape[1] index = faiss.IndexFlatL2(dimension) index.add(embeddings) # 保存Faiss索引 faiss.write_index(index, f"{save_file}_index") print(f"\n向量化完成!耗时 {time.time() - start_time:.2f} 秒") print(f"向量维度:{dimension}") print(f"向量数量:{len(embeddings)}") print(f"数据保存位置:{save_file}") return True except Exception as e: print(f"向量化失败:{str(e)}") import traceback traceback.print_exc() return False def process_level_dir(level_dir): """处理指定层级目录中的JSON或Python文件""" source_dir = os.path.join(FILE_STORAGE_BASE, level_dir) target_dir = os.path.join(VECTOR_STORE_BASE, level_dir) print("=" * 50) print(f"开始处理 {level_dir} 目录下的文件") print(f"源目录: {source_dir}") print(f"目标目录: {target_dir}") # 确保源目录存在 if not os.path.exists(source_dir): print(f"源目录 {source_dir} 不存在,跳过处理") return False all_documents = [] # 1. 优先处理JSON文件(原有逻辑) json_files = [f for f in os.listdir(source_dir) if f.lower().endswith('.json')] for json_file in json_files: json_path = os.path.join(source_dir, json_file) docs = load_json_documents(json_path) all_documents.extend(docs) # 2. 如果没有JSON文件,尝试处理Python文件(如 law_list.py) if not json_files: py_files = [f for f in os.listdir(source_dir) if f.lower().endswith('.py')] for py_file in py_files: py_path = os.path.join(source_dir, py_file) docs = load_py_documents(py_path) all_documents.extend(docs) if not all_documents: print(f"目录 {source_dir} 中未找到有效文档,跳过处理") return False # 创建向量存储 success = create_vector_store(all_documents, target_dir) print(f"{level_dir} 处理完成!共处理 {len(all_documents)} 条记录") print("=" * 50) return success def main(): print("=" * 50) print("开始分层向量化处理...") print(f"文件存储基础目录: {FILE_STORAGE_BASE}") print(f"向量存储基础目录: {VECTOR_STORE_BASE}") print("=" * 50) # 确保向量存储基础目录存在 os.makedirs(VECTOR_STORE_BASE, exist_ok=True) success_count = 0 # 依次处理每个层级目录 for level_dir in LEVEL_DIRS: if process_level_dir(level_dir): success_count += 1 print("=" * 50) if success_count > 0: print(f"成功处理了 {success_count}/{len(LEVEL_DIRS)} 个层级目录") else: print("所有层级目录处理失败") print("=" * 50) if __name__ == "__main__": main()