| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- import os
- import time
- import json
- import shutil
- import re
- from text2vec import SentenceModel
- from langchain_core.documents import Document
- import numpy as np
- import torch
- import faiss
- import config.config
# Base locations of the vector stores and the source files, plus the path of
# the embedding model; all three are read from the project configuration.
VECTOR_STORE_BASE = config.config.VECTOR_STORE_BASE
FILE_STORAGE_BASE = config.config.FILE_STORAGE_BASE
MODEL_PATH = config.config.MODEL_PATH

# Names of the per-level sub-directories handled by this script.
LEVEL_DIRS = [
    "level_1",
    "level_2",
    "level_3",
]
def extract_content_from_law(text):
    """Strip a leading article marker ("第xx条 ") from a legal provision.

    Example: "第三十二条 劳动者拒绝用人单位管理人员违章指挥的,不视为违反劳动合同。"
    becomes "劳动者拒绝用人单位管理人员违章指挥的,不视为违反劳动合同。".

    Args:
        text: The provision text.

    Returns:
        ``text`` without the leading "第<Chinese numerals>条" marker and the
        whitespace following it, or ``text`` unchanged when the string does
        not start with such a marker.
    """
    # The marker only counts at the very start of the string and must be
    # followed by at least one whitespace character.
    heading = re.match(r'^第[一二三四五六七八九十百千万零]+条\s+', text)
    if heading is None:
        return text
    return text[heading.end():]
def load_json_documents(json_file_path: str):
    """Load a JSON file of legal provisions as a list of ``Document`` objects.

    The file is expected to contain a list of objects shaped like
    ``{"law_id": "...", "text": "..."}``. For every record the article marker
    is stripped from ``text`` and the remainder becomes the document's
    ``page_content``; all original fields are kept in the metadata together
    with ``source`` (the file path) and ``original_text`` (the unmodified
    text).

    Records that are not JSON objects, or whose ``text`` is not a string, are
    skipped one by one so that a single malformed entry no longer discards
    the whole file.

    Args:
        json_file_path: Path of the JSON file to read (UTF-8).

    Returns:
        The list of documents; an empty list when the file cannot be read or
        parsed, or when its top-level value is not a list.
    """
    print(f"正在加载JSON文件: {json_file_path}")
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if not isinstance(data, list):
            print("JSON文件顶层结构不是列表,无法处理")
            return []

        documents = []
        for idx, item in enumerate(data, start=1):
            if not isinstance(item, dict):
                print(f"跳过第 {idx} 条记录: 不是JSON对象")
                continue

            text = item.get("text", "")
            if not isinstance(text, str):
                # e.g. "text": null - the prefix regex only accepts strings.
                print(f"跳过第 {idx} 条记录: text 字段不是字符串")
                continue

            # Keep every original field and add provenance information.
            metadata = dict(item)
            metadata["source"] = json_file_path
            metadata["original_text"] = text

            documents.append(
                Document(
                    # Only the text without its article marker is embedded.
                    page_content=extract_content_from_law(text),
                    metadata=metadata,
                )
            )

        print(f"成功从JSON文件加载 {len(documents)} 条记录")
        return documents
    except Exception as e:
        # Best effort: a file that cannot be loaded yields no documents.
        print(f"加载JSON文件失败: {e}")
        return []
def load_py_documents(py_file_path: str):
    """Load the ``docs`` list defined in a Python file as ``Document`` objects.

    The file (for example ``law_list.py``) is expected to look like
    ``docs = ["provision 1", "provision 2", ...]``. Its source is executed
    and the resulting ``docs`` variable is read back. Non-string entries are
    ignored. Each document gets a ``law_id`` of the form
    ``<file stem>第<n>条``, where ``n`` is the 1-based position of the entry
    in ``docs`` (skipped entries still consume a number).

    Args:
        py_file_path: Path of the Python file to read (UTF-8).

    Returns:
        The list of documents; an empty list when ``docs`` is not a list or
        when reading/executing the file fails.
    """
    print(f"正在加载Python文件: {py_file_path}")
    try:
        with open(py_file_path, 'r', encoding='utf-8') as handle:
            source_code = handle.read()

        # NOTE(review): exec() runs whatever code the file contains; the
        # fresh dict only isolates the resulting names, it is not a sandbox.
        # Only point this at trusted files.
        scope = {}
        exec(source_code, scope)

        docs = scope.get('docs', [])
        if not isinstance(docs, list):
            print("文件中未找到列表变量 'docs' 或类型错误")
            return []

        # File name without directory and extension, used to build law_id.
        stem, _extension = os.path.splitext(os.path.basename(py_file_path))
        documents = [
            Document(
                # Only the text without its article marker is embedded.
                page_content=extract_content_from_law(entry),
                metadata={
                    "law_id": f"{stem}第{position}条",
                    "source": py_file_path,
                    "original_text": entry,
                },
            )
            for position, entry in enumerate(docs, start=1)
            if isinstance(entry, str)
        ]

        print(f"成功从Python文件加载 {len(documents)} 条记录")
        return documents
    except Exception as e:
        print(f"加载Python文件失败: {e}")
        return []
def clear_directory(directory):
    """Remove everything inside ``directory`` but keep the directory itself.

    When ``directory`` does not exist it is created instead. Real
    sub-directories are removed recursively; every other entry (regular
    files, symbolic links including dangling ones and links to directories)
    is unlinked without being followed. A failure on one entry is reported
    and does not stop the remaining entries from being processed.

    Args:
        directory: Path of the directory to empty or create.
    """
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
        print(f"创建目录: {directory}")
        return

    print(f"清空目录: {directory}")
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        try:
            if os.path.isdir(file_path) and not os.path.islink(file_path):
                shutil.rmtree(file_path)
                print(f" 已删除子目录: {filename}")
            else:
                # Symlinks must be unlinked rather than followed:
                # shutil.rmtree refuses them, and a dangling link is neither
                # a file nor a directory, so it would otherwise be left over.
                os.unlink(file_path)
                print(f" 已删除文件: {filename}")
        except Exception as e:
            print(f" 删除 {file_path} 失败: {e}")
def create_vector_store(documents, save_dir):
    """Embed ``documents`` and persist the resulting store under ``save_dir``.

    ``save_dir`` is emptied (or created) first. Then, using the common file
    prefix ``<save_dir>/index``, the following files are written:

    * ``index_metadata.npy`` - the metadata dict of every document
    * ``index_texts.npy``    - the original texts, used for display
    * ``index_vectors.npy``  - the embedding vectors
    * ``index_index``        - a Faiss ``IndexFlatL2`` over those vectors

    Only ``page_content`` (the text without its article marker) is embedded.

    Args:
        documents: Objects exposing ``page_content`` and ``metadata``.
        save_dir: Directory that receives the store files.

    Returns:
        ``True`` on success; ``False`` when any step after the directory
        clean-up raises (the error and its traceback are printed).
    """
    # Start from an empty target directory.
    clear_directory(save_dir)
    index_prefix = os.path.join(save_dir, "index")

    print(f"开始创建向量库: {index_prefix}")
    print("使用模型: text2vec-base-chinese")
    print(f"文档数量: {len(documents)}")

    # Prefer the GPU when one is available.
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"
    print(f"使用设备: {device}")

    try:
        started_at = time.time()
        encoder = SentenceModel(MODEL_PATH, device=device)

        contents = [doc.page_content for doc in documents]

        # NOTE(review): this is a list of dicts, so NumPy presumably stores
        # it as a pickled object array - confirm how readers load it.
        np.save(index_prefix + "_metadata.npy",
                [doc.metadata for doc in documents])

        # Fall back to page_content when no original text was recorded.
        display_texts = [
            doc.metadata.get("original_text", doc.page_content)
            for doc in documents
        ]
        np.save(index_prefix + "_texts.npy", display_texts)

        vectors = encoder.encode(contents)
        np.save(index_prefix + "_vectors.npy", vectors)

        # Exact L2 index sized to the embedding dimension.
        dimension = vectors.shape[1]
        flat_index = faiss.IndexFlatL2(dimension)
        flat_index.add(vectors)
        faiss.write_index(flat_index, index_prefix + "_index")

        elapsed = time.time() - started_at
        print(f"\n向量化完成!耗时 {elapsed:.2f} 秒")
        print(f"向量维度:{dimension}")
        print(f"向量数量:{len(vectors)}")
        print(f"数据保存位置:{index_prefix}")
    except Exception as e:
        print(f"向量化失败:{e}")
        import traceback
        traceback.print_exc()
        return False
    return True
def process_level_dir(level_dir):
    """Build the vector store for one level directory.

    Documents are read from ``FILE_STORAGE_BASE/<level_dir>`` and the store
    is written to ``VECTOR_STORE_BASE/<level_dir>``. JSON files take
    priority; Python files (such as ``law_list.py``) are only considered
    when the directory contains no JSON file at all. Files are processed in
    sorted name order so that repeated runs produce the same index ordering.

    Args:
        level_dir: Name of the level sub-directory, e.g. ``"level_1"``.

    Returns:
        ``True`` when the vector store was created; ``False`` when the source
        directory is missing, holds no usable documents, or the store could
        not be built.
    """
    source_dir = os.path.join(FILE_STORAGE_BASE, level_dir)
    target_dir = os.path.join(VECTOR_STORE_BASE, level_dir)

    print("=" * 50)
    print(f"开始处理 {level_dir} 目录下的文件")
    print(f"源目录: {source_dir}")
    print(f"目标目录: {target_dir}")

    # A path that exists but is not a directory cannot be listed either.
    if not os.path.isdir(source_dir):
        print(f"源目录 {source_dir} 不存在,跳过处理")
        return False

    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    entries = sorted(os.listdir(source_dir))

    all_documents = []

    # 1. JSON files take priority.
    json_files = [name for name in entries if name.lower().endswith('.json')]
    for json_file in json_files:
        all_documents.extend(
            load_json_documents(os.path.join(source_dir, json_file))
        )

    # 2. Without any JSON file, fall back to Python files.
    if not json_files:
        py_files = [name for name in entries if name.lower().endswith('.py')]
        for py_file in py_files:
            all_documents.extend(
                load_py_documents(os.path.join(source_dir, py_file))
            )

    if not all_documents:
        print(f"目录 {source_dir} 中未找到有效文档,跳过处理")
        return False

    success = create_vector_store(all_documents, target_dir)
    # Report completion only when the store was actually built.
    if success:
        print(f"{level_dir} 处理完成!共处理 {len(all_documents)} 条记录")
    else:
        print(f"{level_dir} 处理失败")
    print("=" * 50)
    return success
def main():
    """Build the vector store of every level directory and print a summary.

    Ensures ``VECTOR_STORE_BASE`` exists, processes each entry of
    ``LEVEL_DIRS`` in order and reports how many of them succeeded.
    """
    separator = "=" * 50

    print(separator)
    print("开始分层向量化处理...")
    print(f"文件存储基础目录: {FILE_STORAGE_BASE}")
    print(f"向量存储基础目录: {VECTOR_STORE_BASE}")
    print(separator)

    # The per-level target directories are created below this one.
    os.makedirs(VECTOR_STORE_BASE, exist_ok=True)

    # Process the levels one after another, counting the successful ones.
    success_count = sum(
        1 for level_dir in LEVEL_DIRS if process_level_dir(level_dir)
    )

    print(separator)
    if success_count > 0:
        print(f"成功处理了 {success_count}/{len(LEVEL_DIRS)} 个层级目录")
    else:
        print("所有层级目录处理失败")
    print(separator)


if __name__ == "__main__":
    main()
|