# documents_extractor.py

import base64
import os
import tempfile
from typing import List, Optional, Union

import fitz  # PyMuPDF
import pdfplumber
import win32com.client as win32
from docx import Document
from openai import OpenAI

import config.config
  10. class DocumentReader:
  11. def __init__(self, api_key: str = None, pdf_sample_pages: int = 2):
  12. """
  13. :param api_key: Moonshot API Key
  14. :param pdf_sample_pages: 判断PDF是否为扫描件时抽样的页数(默认为2)
  15. """
  16. self.client = OpenAI(
  17. api_key=api_key or config.config.MOONSHOT_API_KEY,
  18. base_url="https://api.moonshot.cn/v1",
  19. )
  20. self.pdf_sample_pages = pdf_sample_pages
  21. # ================= PDF 处理(智能判断类型)=================
  22. def _read_pdf_with_validation(self, pdf_path: str) -> str:
  23. """
  24. 读取PDF文件:
  25. - 抽样前 self.pdf_sample_pages 页,若均无文本则判定为扫描件,全部页面直接OCR;
  26. - 否则采用混合模式:每页优先提取文本,无文本时再OCR。
  27. """
  28. full_text = []
  29. try:
  30. with fitz.open(pdf_path) as doc_fitz, pdfplumber.open(pdf_path) as doc_plumber:
  31. total_pages = len(doc_fitz)
  32. if total_pages == 0:
  33. return "[Error] PDF文件为空"
  34. # 抽样判断PDF类型
  35. sample_limit = min(self.pdf_sample_pages, total_pages)
  36. has_text_page = False
  37. for page_num in range(sample_limit):
  38. # 仅用fitz快速判断是否有文本(pdfplumber较慢,这里只用fitz)
  39. text_fitz = doc_fitz[page_num].get_text().strip()
  40. if text_fitz:
  41. has_text_page = True
  42. break
  43. # 根据类型选择处理模式
  44. if not has_text_page:
  45. # 纯扫描件:所有页直接OCR
  46. print(f"[Info] 检测为扫描型PDF,共{total_pages}页,直接进行OCR...")
  47. for page_num in range(total_pages):
  48. print(f"[Info] 正在OCR第 {page_num+1}/{total_pages} 页...")
  49. pix = doc_fitz[page_num].get_pixmap(dpi=150)
  50. img_bytes = pix.tobytes("png")
  51. ocr_result = self._read_images_via_kimi(
  52. [img_bytes],
  53. prompt="请提取图片中的全部文字内容,保留原始格式。"
  54. )
  55. if ocr_result.startswith("[Error]"):
  56. page_content = f"[OCR失败] {ocr_result}"
  57. else:
  58. page_content = ocr_result
  59. full_text.append(f"--- Page {page_num + 1} ---\n{page_content}")
  60. else:
  61. # 混合型或文本型:逐页先提取文本,空则OCR
  62. print(f"[Info] 检测为文本型或混合型PDF,逐页处理...")
  63. for page_num in range(total_pages):
  64. # 尝试提取文本
  65. text_fitz = doc_fitz[page_num].get_text().strip()
  66. text_plumber = doc_plumber.pages[page_num].extract_text() or ""
  67. page_content = text_plumber if len(text_plumber) > len(text_fitz) else text_fitz
  68. if not page_content.strip():
  69. # 扫描页,进行OCR
  70. print(f"[Info] 第 {page_num+1} 页为扫描页,正在进行OCR...")
  71. pix = doc_fitz[page_num].get_pixmap(dpi=150)
  72. img_bytes = pix.tobytes("png")
  73. ocr_result = self._read_images_via_kimi(
  74. [img_bytes],
  75. prompt="请提取图片中的全部文字内容,保留原始格式。"
  76. )
  77. if not ocr_result.startswith("[Error]"):
  78. page_content = ocr_result
  79. else:
  80. page_content = f"[OCR失败] {ocr_result}"
  81. full_text.append(f"--- Page {page_num + 1} ---\n{page_content}")
  82. except Exception as e:
  83. return f"[Error] PDF读取失败: {str(e)}"
  84. return "\n".join(full_text)
  85. # ================= Word 处理 =================
  86. def _read_word(self, word_path: str) -> str:
  87. abs_path = os.path.abspath(word_path)
  88. ext = os.path.splitext(abs_path)[1].lower()
  89. if ext == ".docx":
  90. return self._extract_docx_structured_text(abs_path)
  91. elif ext == ".doc":
  92. temp_docx = abs_path + "x"
  93. word_app = win32.Dispatch('Word.Application')
  94. word_app.Visible = False
  95. try:
  96. doc = word_app.Documents.Open(abs_path)
  97. doc.SaveAs(temp_docx, FileFormat=16) # 16 = wdFormatXMLDocument
  98. doc.Close()
  99. text = self._extract_docx_structured_text(temp_docx)
  100. if os.path.exists(temp_docx):
  101. os.remove(temp_docx)
  102. return text
  103. except Exception as e:
  104. return f"[Error] .doc转换失败: {str(e)}"
  105. finally:
  106. word_app.Quit()
  107. return "[Error] 不支持的Word格式"
  108. def _extract_docx_structured_text(self, docx_path: str) -> str:
  109. """提取.docx中的段落和表格文本"""
  110. try:
  111. doc = Document(docx_path)
  112. full_content = []
  113. for element in doc.element.body.iterchildren():
  114. # 段落
  115. if element.tag.endswith('p'):
  116. texts = [node.text for node in element.iter() if node.tag.endswith('t') and node.text]
  117. para_text = "".join(texts).strip()
  118. if para_text:
  119. full_content.append(para_text)
  120. # 表格
  121. elif element.tag.endswith('tbl'):
  122. for table in doc.tables:
  123. if table._element == element:
  124. for row in table.rows:
  125. row_data = [cell.text.strip() for cell in row.cells if cell.text.strip()]
  126. if row_data:
  127. full_content.append(" | ".join(row_data))
  128. break
  129. return "\n".join(full_content)
  130. except Exception as e:
  131. return f"[Error] Docx解析异常: {str(e)}"
  132. # ================= 图片 OCR(支持文件路径、base64 data URL、字节数据)=================
  133. def _read_images_via_kimi(self, image_inputs: List[Union[str, bytes]], prompt: str = "请提取图片中的文字内容") -> str:
  134. """
  135. 通用图片OCR接口:
  136. - image_inputs 列表中的每个元素可以是:
  137. * 图片文件路径 (str)
  138. * data URL 字符串 (以 "data:image/" 开头)
  139. * 图片原始字节数据 (bytes)
  140. """
  141. content_list = []
  142. for img_input in image_inputs:
  143. if isinstance(img_input, str) and os.path.exists(img_input):
  144. # 文件路径
  145. with open(img_input, "rb") as f:
  146. image_data = f.read()
  147. ext = os.path.splitext(img_input)[1].replace(".", "").lower()
  148. if ext not in ['png', 'jpg', 'jpeg', 'gif', 'bmp']:
  149. ext = 'png' # 默认
  150. image_url = f"data:image/{ext};base64,{base64.b64encode(image_data).decode('utf-8')}"
  151. elif isinstance(img_input, str) and img_input.startswith("data:image/"):
  152. # 已经是 data URL
  153. image_url = img_input
  154. elif isinstance(img_input, bytes):
  155. # 字节数据,默认使用PNG格式
  156. image_data = img_input
  157. image_url = f"data:image/png;base64,{base64.b64encode(image_data).decode('utf-8')}"
  158. else:
  159. continue # 忽略无效输入
  160. content_list.append({"type": "image_url", "image_url": {"url": image_url}})
  161. if not content_list:
  162. return "[Error] 没有有效的图片输入"
  163. content_list.append({"type": "text", "text": prompt})
  164. try:
  165. completion = self.client.chat.completions.create(
  166. model="moonshot-v1-8k-vision-preview",
  167. messages=[{"role": "user", "content": content_list}],
  168. )
  169. return completion.choices[0].message.content
  170. except Exception as e:
  171. return f"[Error] Kimi API失败: {str(e)}"
  172. # ================= 主接口 =================
  173. def process_input(self, inputs: Union[str, List[str]]) -> str:
  174. """
  175. 统一入口:
  176. - 如果 inputs 是列表,则视为多张图片路径,调用图片OCR。
  177. - 如果是单个字符串,判断路径类型:
  178. * 目录不存在 → 报错
  179. * PDF → 调用PDF处理(自动判断文本/扫描)
  180. * Word文档 → 调用Word处理
  181. * 图片文件 → 调用图片OCR
  182. * 其他 → 报错
  183. """
  184. if isinstance(inputs, list):
  185. return self._read_images_via_kimi(inputs)
  186. if not os.path.exists(inputs):
  187. return f"[Error] 路径不存在: {inputs}"
  188. ext = os.path.splitext(inputs)[1].lower()
  189. if ext in [".pdf",".PDF"]:
  190. return self._read_pdf_with_validation(inputs)
  191. elif ext in [".docx", ".doc"]:
  192. return self._read_word(inputs)
  193. elif ext in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]:
  194. return self._read_images_via_kimi([inputs])
  195. return f"[Error] 未知文件格式: {ext}"
  196. # ================= 测试运行 =================
  197. if __name__ == "__main__":
  198. # 请替换为你的 Moonshot API Key
  199. reader = DocumentReader()
  200. # 示例:处理一个可能为扫描件的PDF
  201. # test_path1 = r"E:\project\Ruling_and_Judgment_Comparative_Analysis_System\demo\(2024-3853)朱焱诉苏州星际浩宸企业管理有限公司(工资差额、经济补偿).doc"
  202. # test_path2 = r"E:\project\Ruling_and_Judgment_Comparative_Analysis_System\demo\苏园劳人仲案字〔2024〕第3853号.pdf"
  203. # test_path3 = r"E:\project\Ruling_and_Judgment_Comparative_Analysis_System\demo\微信图片_20250925131023_134_4.jpg"
  204. test_path4 = r"E:\project\arbitration_system\transcript_extractor\test\庭审笔录.PDF"
  205. print(f"[Log] 开始处理: {test_path4}")
  206. result = reader.process_input(test_path4)
  207. print("\n--- 提取结果 ---\n")
  208. print(result)