text_utils.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import os
  2. import json
  3. import re
  4. from typing import List, Dict, Any, Optional
  5. from fastapi import UploadFile
  6. def list_files_by_ext(root_dir: str, exts: List[str]) -> List[str]:
  7. if not root_dir or not os.path.exists(root_dir):
  8. return []
  9. found = []
  10. for base, _, files in os.walk(root_dir):
  11. for file_name in files:
  12. ext = os.path.splitext(file_name)[1].lower()
  13. if ext in exts:
  14. found.append(os.path.join(base, file_name))
  15. return sorted(found)
  16. def parse_json_from_text(text: str) -> Any:
  17. if not text:
  18. return None
  19. try:
  20. return json.loads(text)
  21. except Exception:
  22. pass
  23. match = re.search(r"\{[\s\S]*\}", text)
  24. if match:
  25. try:
  26. return json.loads(match.group(0))
  27. except Exception:
  28. return None
  29. match = re.search(r"\[[\s\S]*\]", text)
  30. if match:
  31. try:
  32. return json.loads(match.group(0))
  33. except Exception:
  34. return None
  35. return None
  36. def extract_evidence_text_from_ocr(ocr_text: str) -> Dict[str, Any]:
  37. parsed = parse_json_from_text(ocr_text)
  38. lines = []
  39. if isinstance(parsed, list):
  40. for item in parsed:
  41. if not isinstance(item, dict):
  42. continue
  43. label = item.get("block_label")
  44. content = item.get("block_content")
  45. if not content or not isinstance(content, str):
  46. continue
  47. if label in {"text", "paragraph_title", "table", "header"}:
  48. cleaned = content.strip()
  49. if cleaned:
  50. lines.append(cleaned)
  51. text = "\n".join(lines)
  52. return {"text": text, "lines": lines}
  53. def normalize_filename(name: str) -> str:
  54. base = os.path.basename(name)
  55. return base.replace("..", "_")
  56. async def save_uploads(files: Optional[List[UploadFile]], target_dir: str) -> List[str]:
  57. if not files:
  58. return []
  59. os.makedirs(target_dir, exist_ok=True)
  60. saved = []
  61. for file in files:
  62. if not file or not file.filename:
  63. continue
  64. file_path = os.path.join(target_dir, normalize_filename(file.filename))
  65. content = await file.read()
  66. with open(file_path, "wb") as f:
  67. f.write(content)
  68. saved.append(file_path)
  69. return saved