| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- import os
- import json
- from datetime import datetime
- from typing import List, Dict, Any
# Reason the MySQL driver could not be imported. It is initialised up front so
# the name is always defined, even when the import succeeds; code that reports
# the failure can then reference it without risking a NameError.
_MYSQL_IMPORT_ERROR = ""
try:
    import pymysql
except Exception as exc:
    # Deliberately broad: any failure while importing the driver (not only a
    # missing package) marks it as unavailable instead of breaking the import
    # of this module. The connection helper checks for the None sentinel.
    pymysql = None
    _MYSQL_IMPORT_ERROR = str(exc)
def get_mysql_config() -> Dict[str, Any]:
    """Assemble the MySQL connection settings from environment variables.

    Each setting is read from a ``MYSQL_*`` environment variable and falls
    back to a built-in default when the variable is not set. The charset is
    always ``utf8mb4``.

    Returns:
        A new dict with the keys ``host``, ``port``, ``user``, ``password``,
        ``database`` and ``charset``. ``port`` is converted to ``int``.

    Raises:
        ValueError: if ``MYSQL_PORT`` is set to something ``int()`` rejects.
    """
    env = os.environ
    settings: Dict[str, Any] = {}
    settings["host"] = env.get("MYSQL_HOST", "127.0.0.1")
    settings["port"] = int(env.get("MYSQL_PORT", "3306"))
    settings["user"] = env.get("MYSQL_USER", "root")
    # NOTE(review): the fallback password is hard-coded; presumably meant for
    # local development only - confirm deployments always set MYSQL_PASSWORD.
    settings["password"] = env.get("MYSQL_PASSWORD", "123456")
    settings["database"] = env.get("MYSQL_DATABASE", "arbitration_system")
    settings["charset"] = "utf8mb4"
    return settings
def get_mysql_connection(use_database: bool = True):
    """Open a new PyMySQL connection using the environment-derived settings.

    Args:
        use_database: when false, the ``database`` setting is left out of the
            connect arguments, so the connection is not bound to a schema.

    Returns:
        Whatever ``pymysql.connect`` returns for the assembled settings.

    Raises:
        RuntimeError: if the PyMySQL driver could not be imported.
    """
    if pymysql is None:
        raise RuntimeError(f"MySQL驱动不可用: {_MYSQL_IMPORT_ERROR}")

    connect_kwargs = get_mysql_config()
    if not use_database:
        # The config dict is freshly built on every call, so dropping the key
        # in place does not affect any other caller.
        connect_kwargs.pop("database", None)
    return pymysql.connect(**connect_kwargs)
def index_exists(cursor, database: str, table: str, index_name: str) -> bool:
    """Report whether an index with the given name is listed for a table.

    Runs a parameterized lookup against ``information_schema.statistics``.

    Args:
        cursor: an open DB-API cursor; only ``execute`` and ``fetchone`` are used.
        database: schema name to match against ``table_schema``.
        table: table name to match against ``table_name``.
        index_name: index name to match against ``index_name``.

    Returns:
        True when the lookup yields a row, False otherwise.
    """
    lookup_sql = """
        SELECT 1
        FROM information_schema.statistics
        WHERE table_schema=%s AND table_name=%s AND index_name=%s
        LIMIT 1
        """
    lookup_args = (database, table, index_name)
    cursor.execute(lookup_sql, lookup_args)
    hit = cursor.fetchone()
    return hit is not None
def init_case_db() -> None:
    """Create the database, the two tables and their indexes if they are missing.

    Works in two phases, each on its own connection:

    1. Connect without selecting a schema and issue
       ``CREATE DATABASE IF NOT EXISTS`` for the configured database name.
    2. Connect to that database and create the ``cases`` and
       ``case_management`` tables (``IF NOT EXISTS``), adding the ``case_id``
       index on each table only when ``index_exists`` reports it missing.

    Each connection is closed in a ``finally`` block, so a failing statement
    does not leak it.

    Raises:
        RuntimeError: propagated from ``get_mysql_connection`` when the driver
            is unavailable. Driver errors from the statements also propagate.
    """
    cfg = get_mysql_config()
    database_name = cfg["database"]
    # The database name cannot be bound as a query parameter, so it is embedded
    # as a backtick-quoted identifier. Doubling any embedded backtick keeps the
    # name from terminating the quoting early.
    quoted_database = str(database_name).replace("`", "``")

    conn = get_mysql_connection(use_database=False)
    try:
        cursor = conn.cursor()
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{quoted_database}` CHARACTER SET utf8mb4")
        conn.commit()
    finally:
        conn.close()

    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS cases (
            id BIGINT PRIMARY KEY AUTO_INCREMENT,
            case_id VARCHAR(255),
            summary_text TEXT,
            case_profile_json LONGTEXT,
            dispute_points_json LONGTEXT,
            law_results_json LONGTEXT,
            evidence_results_json LONGTEXT,
            final_judgement_json LONGTEXT,
            embedding_json LONGTEXT,
            created_at DATETIME
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
        )
        # MySQL's CREATE INDEX has no IF NOT EXISTS form, hence the explicit
        # lookup before creating each index.
        if not index_exists(cursor, database_name, "cases", "idx_cases_case_id"):
            cursor.execute("CREATE INDEX idx_cases_case_id ON cases(case_id)")
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS case_management (
            id BIGINT PRIMARY KEY AUTO_INCREMENT,
            case_id VARCHAR(255) UNIQUE,
            title VARCHAR(255),
            description TEXT,
            status VARCHAR(50),
            stage VARCHAR(50),
            updated_at DATETIME
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
        )
        # NOTE(review): the UNIQUE constraint on case_id already gives MySQL an
        # index on that column, so this extra index looks redundant. It is kept
        # so existing deployments see an unchanged schema.
        if not index_exists(cursor, database_name, "case_management", "idx_case_management_case_id"):
            cursor.execute("CREATE INDEX idx_case_management_case_id ON case_management(case_id)")
        conn.commit()
    finally:
        conn.close()
def store_case_record(
    case_id: str,
    summary_text: str,
    case_profile: Dict[str, Any],
    dispute_points: List[str],
    law_results: Dict[str, Any],
    evidence_results: Dict[str, Any],
    final_judgement: Dict[str, Any],
    embedding: List[float]
) -> None:
    """Insert one row into the ``cases`` table.

    The structured arguments are serialized with ``json.dumps`` (non-ASCII
    characters kept as-is) into their ``*_json`` columns, and ``created_at``
    is set to the current naive UTC time. Every call inserts a new row; no
    existing row for the same ``case_id`` is updated or removed.

    Args:
        case_id: identifier stored in the ``case_id`` column.
        summary_text: stored verbatim in ``summary_text``.
        case_profile: serialized into ``case_profile_json``.
        dispute_points: serialized into ``dispute_points_json``.
        law_results: serialized into ``law_results_json``.
        evidence_results: serialized into ``evidence_results_json``.
        final_judgement: serialized into ``final_judgement_json``.
        embedding: serialized into ``embedding_json``.
    """
    insert_sql = """
        INSERT INTO cases (
        case_id, summary_text, case_profile_json, dispute_points_json,
        law_results_json, evidence_results_json, final_judgement_json,
        embedding_json, created_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        # Serialize in column order; the list must line up with the *_json
        # columns named in the INSERT statement.
        json_columns = [
            json.dumps(value, ensure_ascii=False)
            for value in (
                case_profile,
                dispute_points,
                law_results,
                evidence_results,
                final_judgement,
                embedding,
            )
        ]
        row_values = (case_id, summary_text, *json_columns, datetime.utcnow())
        cursor.execute(insert_sql, row_values)
        conn.commit()
    finally:
        conn.close()
def fetch_similar_cases(embedding: List[float], top_k: int = 3) -> List[Dict[str, Any]]:
    """Load every stored case that has a non-empty embedding.

    Selects all rows of ``cases`` whose ``embedding_json`` is neither NULL nor
    the empty string and decodes each embedding from JSON. An embedding that
    fails to decode is replaced by an empty list rather than dropping the row.

    NOTE(review): ``embedding`` and ``top_k`` are not used here. No similarity
    is computed and the result is neither ranked nor truncated, so presumably
    the caller does the scoring - confirm.

    Args:
        embedding: query vector (currently unused by this function).
        top_k: requested number of results (currently unused by this function).

    Returns:
        One dict per row with the keys ``case_id``, ``summary_text``,
        ``final_judgement_json`` (still the raw stored string) and
        ``embedding`` (the decoded value).
    """
    def _decode_vector(raw):
        # Best effort: a corrupt embedding must not abort the whole listing.
        try:
            return json.loads(raw)
        except Exception:
            return []

    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT case_id, summary_text, final_judgement_json, embedding_json
            FROM cases
            WHERE embedding_json IS NOT NULL AND embedding_json != ''
            """
        )
        rows = cursor.fetchall()
    finally:
        conn.close()

    return [
        {
            "case_id": row_case_id,
            "summary_text": row_summary,
            "final_judgement_json": row_judgement,
            "embedding": _decode_vector(row_embedding),
        }
        for row_case_id, row_summary, row_judgement, row_embedding in rows
    ]
def parse_case_description(description: str) -> Dict[str, Any]:
    """Decode a case description that may hold a JSON object.

    Args:
        description: the stored description text.

    Returns:
        The decoded dict when ``description`` is a non-empty string containing
        a JSON object. An empty dict in every other case: a falsy value, a
        non-string, text that is not JSON, or JSON that is not an object.
    """
    if not isinstance(description, str) or not description:
        return {}
    try:
        decoded = json.loads(description)
    except Exception:
        # Plain-text descriptions are expected here, so a decode failure is
        # simply reported as "no structured data".
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def normalize_case_description(case_id: str, description: str) -> str:
    """Carry previously stored materials over into a new description.

    The stored description may be a JSON object holding ``description`` and
    ``materials``. When the stored value has materials and the incoming
    ``description`` brings none, the incoming text is merged with the stored
    materials so an update does not silently discard them.

    If the incoming ``description`` is itself a JSON object without materials,
    its fields are merged directly. Previously the raw JSON string was nested
    inside a new ``"description"`` field, which double-encoded the payload.

    Args:
        case_id: identifier of the case whose stored description is consulted.
            A falsy value skips the lookup.
        description: the new description, plain text or a JSON object string.

    Returns:
        ``description`` unchanged when there is no ``case_id``, no stored
        materials, or the new description already has materials. Otherwise a
        JSON string combining the new description with the stored materials.
    """
    if not case_id:
        return description

    existing = fetch_case_management(case_id)
    existing_desc = existing.get("description", "") if existing else ""
    existing_data = parse_case_description(existing_desc)
    new_data = parse_case_description(description)

    stored_materials = existing_data.get("materials")
    if not stored_materials or new_data.get("materials"):
        # Nothing to preserve, or the caller supplied materials of its own.
        return description

    if new_data:
        # Incoming value is already structured: keep its fields and only fill
        # in the materials instead of wrapping the JSON text a second time.
        merged = {**new_data, "materials": stored_materials}
        if not merged.get("description"):
            merged["description"] = existing_data.get("description", "")
    else:
        # Plain text (or empty): fall back to the stored text when empty.
        desc_text = description or existing_data.get("description", "")
        merged = {"description": desc_text, "materials": stored_materials}
    return json.dumps(merged, ensure_ascii=False)
def upsert_case_management(case_id: str, title: str, description: str, status: str, stage: str) -> Dict[str, Any]:
    """Insert a ``case_management`` row, or update it when the key already exists.

    The description is first passed through ``normalize_case_description``.
    The row is then written with ``INSERT ... ON DUPLICATE KEY UPDATE``, which
    overwrites title, description, status, stage and ``updated_at`` on a key
    collision. ``updated_at`` is the current naive UTC time.

    Args:
        case_id: case identifier (the table declares this column UNIQUE).
        title: case title.
        description: new description, plain text or a JSON object string.
        status: case status value.
        stage: case stage value.

    Returns:
        A dict echoing ``case_id``, ``title``, ``status``, ``stage`` and the
        normalized ``description`` that was actually written.
    """
    upsert_sql = """
        INSERT INTO case_management (case_id, title, description, status, stage, updated_at)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        title=VALUES(title),
        description=VALUES(description),
        status=VALUES(status),
        stage=VALUES(stage),
        updated_at=VALUES(updated_at)
        """
    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        description = normalize_case_description(case_id, description)
        row_values = (case_id, title, description, status, stage, datetime.utcnow())
        cursor.execute(upsert_sql, row_values)
        conn.commit()
    finally:
        conn.close()

    saved: Dict[str, Any] = {}
    saved["case_id"] = case_id
    saved["title"] = title
    saved["description"] = description
    saved["status"] = status
    saved["stage"] = stage
    return saved
def update_case_materials(case_id: str, materials: Dict[str, Any]) -> None:
    """Replace the materials stored inside a case's description payload.

    Reads the current description for ``case_id``, keeps its text part, and
    writes back a JSON object with ``description`` and ``materials`` together
    with a fresh ``updated_at`` (naive UTC). Only an ``UPDATE`` is issued, so
    nothing is written when no row matches ``case_id``.

    Args:
        case_id: identifier of the case to update. A falsy value is a no-op.
        materials: new materials mapping; a falsy value is stored as ``{}``.
    """
    if not case_id:
        return

    current = fetch_case_management(case_id)
    stored_desc = current.get("description", "") if current else ""
    stored_data = parse_case_description(stored_desc)
    if stored_data:
        # Stored value is already structured: reuse only its text field.
        text_part = stored_data.get("description", "")
    else:
        # Stored value is plain text (or empty): keep it as the text field.
        text_part = stored_desc
    encoded_payload = json.dumps(
        {"description": text_part, "materials": materials or {}},
        ensure_ascii=False,
    )

    update_sql = """
        UPDATE case_management
        SET description=%s, updated_at=%s
        WHERE case_id=%s
        """
    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(update_sql, (encoded_payload, datetime.utcnow(), case_id))
        conn.commit()
    finally:
        conn.close()
def list_case_management() -> List[Dict[str, Any]]:
    """Return every ``case_management`` row, most recently updated first.

    Returns:
        One dict per row with the keys ``case_id``, ``title``,
        ``description``, ``status``, ``stage`` and ``updated_at``. The
        timestamp is rendered with ``isoformat()``, or as an empty string
        when the stored value is falsy. ``description`` is returned exactly as
        stored, without JSON decoding.
    """
    listing_sql = """
        SELECT case_id, title, description, status, stage, updated_at
        FROM case_management
        ORDER BY updated_at DESC
        """
    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(listing_sql)
        rows = cursor.fetchall()
    finally:
        conn.close()

    return [
        {
            "case_id": row_case_id,
            "title": row_title,
            "description": row_description,
            "status": row_status,
            "stage": row_stage,
            "updated_at": row_updated.isoformat() if row_updated else "",
        }
        for row_case_id, row_title, row_description, row_status, row_stage, row_updated in rows
    ]
def fetch_case_management(case_id: str) -> Dict[str, Any]:
    """Look up a single ``case_management`` row by its case id.

    Args:
        case_id: identifier to search for. A falsy value returns ``{}``
            without touching the database.

    Returns:
        An empty dict when no row matches. Otherwise a dict with ``case_id``,
        ``title``, ``description`` (empty string instead of a falsy stored
        value), ``status``, ``stage`` and ``updated_at`` (``isoformat()``
        text, or an empty string when the stored value is falsy).
    """
    if not case_id:
        return {}

    lookup_sql = """
        SELECT case_id, title, description, status, stage, updated_at
        FROM case_management
        WHERE case_id=%s
        LIMIT 1
        """
    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(lookup_sql, (case_id,))
        record = cursor.fetchone()
    finally:
        conn.close()

    if not record:
        return {}

    found_id, found_title, found_desc, found_status, found_stage, found_updated = record
    if found_updated:
        updated_text = found_updated.isoformat()
    else:
        updated_text = ""
    return {
        "case_id": found_id,
        "title": found_title,
        "description": found_desc or "",
        "status": found_status,
        "stage": found_stage,
        "updated_at": updated_text,
    }
def fetch_case_record(case_id: str) -> Dict[str, Any]:
    """Return the most recent analysis record stored for a case.

    ``cases`` can hold several rows per ``case_id`` because every store
    inserts a new row, so the newest one is selected. ``created_at`` is a
    ``DATETIME`` column with second precision, which means rows written
    within the same second tie on it. The auto-increment ``id`` is therefore
    used as a tiebreak so the last inserted row wins deterministically.

    Args:
        case_id: identifier to search for. A falsy value returns ``{}``
            without touching the database.

    Returns:
        An empty dict when no row matches. Otherwise a dict with
        ``summary_text``, the decoded ``case_profile``, ``dispute_points``,
        ``law_results``, ``evidence_results``, ``final_judgement`` and
        ``embedding``, plus ``created_at`` as ``isoformat()`` text (empty
        string when the stored value is falsy). A JSON column that is empty,
        invalid or decodes to ``null`` yields its type's empty default
        (``{}`` or ``[]``).
    """
    if not case_id:
        return {}

    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT summary_text, case_profile_json, dispute_points_json, law_results_json,
            evidence_results_json, final_judgement_json, embedding_json, created_at
            FROM cases
            WHERE case_id=%s
            ORDER BY created_at DESC, id DESC
            LIMIT 1
            """,
            (case_id,)
        )
        row = cursor.fetchone()
    finally:
        conn.close()

    if not row:
        return {}

    (
        summary_text,
        case_profile_json,
        dispute_points_json,
        law_results_json,
        evidence_results_json,
        final_judgement_json,
        embedding_json,
        created_at
    ) = row

    def parse_json(value: str, fallback):
        # Best-effort decode: one corrupt column must not make the whole
        # record unreadable, so any failure degrades to the fallback value.
        if not value:
            return fallback
        try:
            parsed = json.loads(value)
        except Exception:
            return fallback
        return parsed if parsed is not None else fallback

    return {
        "summary_text": summary_text or "",
        "case_profile": parse_json(case_profile_json, {}),
        "dispute_points": parse_json(dispute_points_json, []),
        "law_results": parse_json(law_results_json, {}),
        "evidence_results": parse_json(evidence_results_json, {}),
        "final_judgement": parse_json(final_judgement_json, {}),
        "embedding": parse_json(embedding_json, []),
        "created_at": created_at.isoformat() if created_at else ""
    }
def delete_case_management(case_id: str) -> None:
    """Delete the ``case_management`` row that matches ``case_id``.

    Only the ``case_management`` table is touched; rows in ``cases`` are left
    in place. The statement is parameterized, and the connection is closed
    whether or not the delete succeeds.

    Args:
        case_id: identifier of the row to remove.
    """
    delete_sql = "DELETE FROM case_management WHERE case_id=%s"
    conn = get_mysql_connection(use_database=True)
    try:
        cursor = conn.cursor()
        cursor.execute(delete_sql, (case_id,))
        conn.commit()
    finally:
        conn.close()
|