import os import json from datetime import datetime from typing import List, Dict, Any try: import pymysql except Exception as exc: pymysql = None _MYSQL_IMPORT_ERROR = str(exc) def get_mysql_config() -> Dict[str, Any]: return { "host": os.getenv("MYSQL_HOST", "127.0.0.1"), "port": int(os.getenv("MYSQL_PORT", "3306")), "user": os.getenv("MYSQL_USER", "root"), "password": os.getenv("MYSQL_PASSWORD", "123456"), "database": os.getenv("MYSQL_DATABASE", "arbitration_system"), "charset": "utf8mb4" } def get_mysql_connection(use_database: bool = True): if pymysql is None: raise RuntimeError(f"MySQL驱动不可用: {_MYSQL_IMPORT_ERROR}") cfg = get_mysql_config() if not use_database: cfg = {k: v for k, v in cfg.items() if k != "database"} return pymysql.connect(**cfg) def index_exists(cursor, database: str, table: str, index_name: str) -> bool: cursor.execute( """ SELECT 1 FROM information_schema.statistics WHERE table_schema=%s AND table_name=%s AND index_name=%s LIMIT 1 """, (database, table, index_name) ) return cursor.fetchone() is not None def init_case_db() -> None: cfg = get_mysql_config() conn = get_mysql_connection(use_database=False) try: cursor = conn.cursor() cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{cfg['database']}` CHARACTER SET utf8mb4") conn.commit() finally: conn.close() conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ CREATE TABLE IF NOT EXISTS cases ( id BIGINT PRIMARY KEY AUTO_INCREMENT, case_id VARCHAR(255), summary_text TEXT, case_profile_json LONGTEXT, dispute_points_json LONGTEXT, law_results_json LONGTEXT, evidence_results_json LONGTEXT, final_judgement_json LONGTEXT, embedding_json LONGTEXT, created_at DATETIME ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """ ) if not index_exists(cursor, cfg["database"], "cases", "idx_cases_case_id"): cursor.execute("CREATE INDEX idx_cases_case_id ON cases(case_id)") cursor.execute( """ CREATE TABLE IF NOT EXISTS case_management ( id BIGINT PRIMARY KEY AUTO_INCREMENT, case_id VARCHAR(255) UNIQUE, title VARCHAR(255), description TEXT, status VARCHAR(50), stage VARCHAR(50), updated_at DATETIME ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """ ) if not index_exists(cursor, cfg["database"], "case_management", "idx_case_management_case_id"): cursor.execute("CREATE INDEX idx_case_management_case_id ON case_management(case_id)") conn.commit() finally: conn.close() def store_case_record( case_id: str, summary_text: str, case_profile: Dict[str, Any], dispute_points: List[str], law_results: Dict[str, Any], evidence_results: Dict[str, Any], final_judgement: Dict[str, Any], embedding: List[float] ) -> None: conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ INSERT INTO cases ( case_id, summary_text, case_profile_json, dispute_points_json, law_results_json, evidence_results_json, final_judgement_json, embedding_json, created_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( case_id, summary_text, json.dumps(case_profile, ensure_ascii=False), json.dumps(dispute_points, ensure_ascii=False), json.dumps(law_results, ensure_ascii=False), json.dumps(evidence_results, ensure_ascii=False), json.dumps(final_judgement, ensure_ascii=False), json.dumps(embedding, ensure_ascii=False), datetime.utcnow() ) ) conn.commit() finally: conn.close() def fetch_similar_cases(embedding: List[float], top_k: int = 3) -> List[Dict[str, Any]]: conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ SELECT case_id, summary_text, final_judgement_json, embedding_json FROM cases WHERE embedding_json IS NOT NULL AND embedding_json != '' """ ) rows = cursor.fetchall() finally: conn.close() scored = [] for case_id, summary_text, final_judgement_json, embedding_json in rows: try: vec = json.loads(embedding_json) except Exception: vec = [] scored.append( { "case_id": case_id, "summary_text": summary_text, "final_judgement_json": final_judgement_json, "embedding": vec } ) return scored def parse_case_description(description: str) -> Dict[str, Any]: if not description or not isinstance(description, str): return {} try: data = json.loads(description) return data if isinstance(data, dict) else {} except Exception: return {} def normalize_case_description(case_id: str, description: str) -> str: if not case_id: return description existing = fetch_case_management(case_id) existing_desc = existing.get("description", "") if existing else "" existing_data = parse_case_description(existing_desc) new_data = parse_case_description(description) if existing_data.get("materials") and not new_data.get("materials"): desc_text = description or existing_data.get("description", "") merged = {"description": desc_text, "materials": existing_data.get("materials", {})} return json.dumps(merged, ensure_ascii=False) return description def upsert_case_management(case_id: str, title: str, description: str, status: str, stage: str) -> Dict[str, Any]: conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() description = normalize_case_description(case_id, description) cursor.execute( """ INSERT INTO case_management (case_id, title, description, status, stage, updated_at) VALUES (%s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE title=VALUES(title), description=VALUES(description), status=VALUES(status), stage=VALUES(stage), updated_at=VALUES(updated_at) """, (case_id, title, description, status, stage, datetime.utcnow()) ) conn.commit() finally: conn.close() return {"case_id": case_id, "title": title, "description": description, "status": status, "stage": stage} def update_case_materials(case_id: str, materials: Dict[str, Any]) -> None: if not case_id: return existing = fetch_case_management(case_id) existing_desc = existing.get("description", "") if existing else "" existing_data = parse_case_description(existing_desc) description_text = existing_desc if not existing_data else existing_data.get("description", "") payload = { "description": description_text, "materials": materials or {} } conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ UPDATE case_management SET description=%s, updated_at=%s WHERE case_id=%s """, (json.dumps(payload, ensure_ascii=False), datetime.utcnow(), case_id) ) conn.commit() finally: conn.close() def list_case_management() -> List[Dict[str, Any]]: conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ SELECT case_id, title, description, status, stage, updated_at FROM case_management ORDER BY updated_at DESC """ ) rows = cursor.fetchall() finally: conn.close() results = [] for case_id, title, description, status, stage, updated_at in rows: results.append( { "case_id": case_id, "title": title, "description": description, "status": status, "stage": stage, "updated_at": updated_at.isoformat() if updated_at else "" } ) return results def fetch_case_management(case_id: str) -> Dict[str, Any]: if not case_id: return {} conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ SELECT case_id, title, description, status, stage, updated_at FROM case_management WHERE case_id=%s LIMIT 1 """, (case_id,) ) row = cursor.fetchone() finally: conn.close() if not row: return {} case_id, title, description, status, stage, updated_at = row return { "case_id": case_id, "title": title, "description": description or "", "status": status, "stage": stage, "updated_at": updated_at.isoformat() if updated_at else "" } def fetch_case_record(case_id: str) -> Dict[str, Any]: if not case_id: return {} conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute( """ SELECT summary_text, case_profile_json, dispute_points_json, law_results_json, evidence_results_json, final_judgement_json, embedding_json, created_at FROM cases WHERE case_id=%s ORDER BY created_at DESC LIMIT 1 """, (case_id,) ) row = cursor.fetchone() finally: conn.close() if not row: return {} ( summary_text, case_profile_json, dispute_points_json, law_results_json, evidence_results_json, final_judgement_json, embedding_json, created_at ) = row def parse_json(value: str, fallback): if not value: return fallback try: parsed = json.loads(value) return parsed if parsed is not None else fallback except Exception: return fallback return { "summary_text": summary_text or "", "case_profile": parse_json(case_profile_json, {}), "dispute_points": parse_json(dispute_points_json, []), "law_results": parse_json(law_results_json, {}), "evidence_results": parse_json(evidence_results_json, {}), "final_judgement": parse_json(final_judgement_json, {}), "embedding": parse_json(embedding_json, []), "created_at": created_at.isoformat() if created_at else "" } def delete_case_management(case_id: str) -> None: conn = get_mysql_connection(use_database=True) try: cursor = conn.cursor() cursor.execute("DELETE FROM case_management WHERE case_id=%s", (case_id,)) conn.commit() finally: conn.close()