"""File, OCR-text and upload helper utilities."""

import json
import os
import re
from typing import TYPE_CHECKING, Any, Dict, List, Optional

if TYPE_CHECKING:
    # UploadFile is only referenced in an annotation; keeping it out of the
    # runtime import path lets the pure helpers below be imported without
    # FastAPI being installed.
    from fastapi import UploadFile

# OCR block labels whose content is kept as evidence text.
_EVIDENCE_LABELS = frozenset({"text", "paragraph_title", "table", "header"})

# Greedy "first opening bracket .. last closing bracket" candidates, tried in
# this order when the whole text is not valid JSON.
_JSON_CANDIDATE_PATTERNS = (
    re.compile(r"\{[\s\S]*\}"),
    re.compile(r"\[[\s\S]*\]"),
)

# Uploads are copied to disk in pieces of this many bytes.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _normalize_ext(ext: str) -> str:
    """Return *ext* lower-cased and with a leading dot (empty stays empty)."""
    ext = ext.lower()
    if ext and not ext.startswith("."):
        ext = "." + ext
    return ext


def list_files_by_ext(root_dir: str, exts: List[str]) -> List[str]:
    """Recursively collect files under *root_dir* whose extension is in *exts*.

    Matching is case-insensitive on both sides, and an entry in *exts* may be
    given with or without its leading dot (``".pdf"``, ``"pdf"`` and ``".PDF"``
    are equivalent). An empty string in *exts* matches files that have no
    extension.

    Args:
        root_dir: Directory to walk. A falsy or non-existent path yields ``[]``.
        exts: Extensions to accept.

    Returns:
        The matching paths (``root_dir``-prefixed), sorted lexicographically.
    """
    if not root_dir or not os.path.exists(root_dir):
        return []

    # Build the lookup set once instead of scanning the list for every file.
    wanted = {_normalize_ext(ext) for ext in (exts or ())}

    found = []
    for base, _, files in os.walk(root_dir):
        for file_name in files:
            if os.path.splitext(file_name)[1].lower() in wanted:
                found.append(os.path.join(base, file_name))
    return sorted(found)


def parse_json_from_text(text: str) -> Any:
    """Best-effort extraction of a JSON value from *text*.

    The whole text is tried first. If that fails, the span from the first
    ``{`` to the last ``}`` is tried, and then the span from the first ``[``
    to the last ``]``. A candidate that does not parse no longer aborts the
    search, so an array of several objects wrapped in extra text (for example
    a markdown code fence) is still recovered via the ``[...]`` candidate.

    Args:
        text: Raw text that may contain JSON.

    Returns:
        The decoded JSON value, or ``None`` when *text* is empty or nothing
        parseable is found.
    """
    if not text:
        return None

    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        # Not pure JSON; fall back to bracket-delimited candidates.
        pass

    for pattern in _JSON_CANDIDATE_PATTERNS:
        match = pattern.search(text)
        if match is None:
            continue
        try:
            return json.loads(match.group(0))
        except (ValueError, RecursionError):
            # This candidate was not valid JSON; try the next shape.
            continue
    return None


def extract_evidence_text_from_ocr(ocr_text: str) -> Dict[str, Any]:
    """Pull the textual blocks out of an OCR result.

    *ocr_text* is decoded with :func:`parse_json_from_text`. When the result
    is a list, every dict item whose ``block_label`` is one of ``text``,
    ``paragraph_title``, ``table`` or ``header`` contributes its stripped,
    non-empty string ``block_content``. Anything else is ignored.

    Args:
        ocr_text: OCR output, expected to contain a JSON list of block dicts.

    Returns:
        ``{"text": <lines joined by newlines>, "lines": <list of lines>}``;
        both are empty when nothing usable is found.
    """
    parsed = parse_json_from_text(ocr_text)
    lines = []
    if isinstance(parsed, list):
        for item in parsed:
            if not isinstance(item, dict):
                continue
            label = item.get("block_label")
            content = item.get("block_content")
            if not content or not isinstance(content, str):
                continue
            # The isinstance guard keeps an unhashable label (e.g. a list)
            # from raising on the set membership test.
            if isinstance(label, str) and label in _EVIDENCE_LABELS:
                cleaned = content.strip()
                if cleaned:
                    lines.append(cleaned)
    text = "\n".join(lines)
    return {"text": text, "lines": lines}


def normalize_filename(name: str) -> str:
    """Reduce a client-supplied file name to a single safe path component.

    Directory parts are dropped for both ``/`` and ``\\`` separators,
    whatever the host platform, and every ``..`` is replaced by ``_``. A bare
    ``.`` becomes ``_`` so the result never names the directory itself.

    Args:
        name: File name (possibly a full path) as sent by the client.

    Returns:
        The sanitized base name; empty when *name* has no final component.
    """
    # os.path.basename only understands "\\" on Windows; normalise first so a
    # Windows-style path is stripped on POSIX hosts too.
    base = os.path.basename(name.replace("\\", "/"))
    base = base.replace("..", "_")
    if base == ".":
        return "_"
    return base


async def save_uploads(files: Optional[List["UploadFile"]], target_dir: str) -> List[str]:
    """Write uploaded files into *target_dir* and return the saved paths.

    Entries that are falsy, have no filename, or whose filename sanitizes to
    an empty string are skipped. Each upload is copied in fixed-size chunks
    rather than being read into memory in one piece. Uploads that share a
    sanitized name overwrite each other.

    Args:
        files: Uploaded files; ``None`` or an empty list yields ``[]``.
        target_dir: Destination directory, created if missing.

    Returns:
        Paths of the files written, in input order.
    """
    if not files:
        return []

    os.makedirs(target_dir, exist_ok=True)
    saved = []
    for file in files:
        if not file or not file.filename:
            continue
        safe_name = normalize_filename(file.filename)
        if not safe_name:
            # e.g. "dir/" has no final component; joining it would point at
            # target_dir itself and open() would fail.
            continue
        file_path = os.path.join(target_dir, safe_name)
        with open(file_path, "wb") as f:
            while True:
                chunk = await file.read(_UPLOAD_CHUNK_SIZE)
                if not chunk:
                    break
                f.write(chunk)
        saved.append(file_path)
    return saved