| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- import os
- import base64
- import fitz # PyMuPDF
- import pdfplumber
- import win32com.client as win32
- from docx import Document
- from openai import OpenAI
- from typing import List, Union
- import config.config
class DocumentReader:
    """Extract plain text from PDF, Word and image files.

    Text that is embedded in the file is read locally (PyMuPDF, pdfplumber,
    python-docx).  Scanned PDF pages and image files are sent to the Moonshot
    (Kimi) vision model for OCR.  The readers generally report failures by
    returning a string that starts with ``"[Error]"`` rather than raising.
    """

    # Vision model used for every OCR request.
    VISION_MODEL = "moonshot-v1-8k-vision-preview"
    # Resolution used when rasterising a PDF page before OCR.
    OCR_DPI = 150
    # Prompt sent together with each rasterised PDF page.
    PDF_OCR_PROMPT = "请提取图片中的全部文字内容,保留原始格式。"
    # Supported image extensions mapped to their MIME subtype
    # ("jpg" is not a registered subtype, the correct one is "jpeg").
    _IMAGE_MIME_SUBTYPES = {
        "png": "png",
        "jpg": "jpeg",
        "jpeg": "jpeg",
        "gif": "gif",
        "bmp": "bmp",
    }

    def __init__(self, api_key: Union[str, None] = None, pdf_sample_pages: int = 2):
        """Create the API client.

        :param api_key: Moonshot API key; falls back to
            ``config.config.MOONSHOT_API_KEY`` when empty or ``None``.
        :param pdf_sample_pages: number of leading pages sampled to decide
            whether a PDF is a pure scan (default 2).
        """
        self.client = OpenAI(
            api_key=api_key or config.config.MOONSHOT_API_KEY,
            base_url="https://api.moonshot.cn/v1",
        )
        self.pdf_sample_pages = pdf_sample_pages

    # ================= PDF handling (auto-detects text vs. scan) =================
    def _ocr_pdf_page(self, page) -> str:
        """Rasterise one PyMuPDF page and return its OCR text.

        :param page: a page object exposing ``get_pixmap(dpi=...)``.
        :return: the recognised text, or ``"[OCR失败] <error>"`` when the OCR
            call reported an error.
        """
        pix = page.get_pixmap(dpi=self.OCR_DPI)
        ocr_result = self._read_images_via_kimi(
            [pix.tobytes("png")],
            prompt=self.PDF_OCR_PROMPT,
        )
        if ocr_result.startswith("[Error]"):
            return f"[OCR失败] {ocr_result}"
        return ocr_result

    def _read_pdf_with_validation(self, pdf_path: str) -> str:
        """Read a PDF, using OCR only where no embedded text is found.

        The first ``self.pdf_sample_pages`` pages (at least one) are sampled:

        - if none of them contains text the file is treated as a pure scan
          and every page is sent to OCR;
        - otherwise each page is read as text first (the longer of the
          PyMuPDF and pdfplumber results wins) and only empty pages are OCR'd.

        :param pdf_path: path of the PDF file.
        :return: all pages joined by newlines, each prefixed with
            ``--- Page N ---``; or an ``"[Error] ..."`` string on failure.
        """
        full_text = []
        try:
            with fitz.open(pdf_path) as doc_fitz, pdfplumber.open(pdf_path) as doc_plumber:
                total_pages = len(doc_fitz)
                if total_pages == 0:
                    return "[Error] PDF文件为空"

                # Always sample at least one page: with a sample size of 0 (or
                # less) nothing would be inspected and every PDF would be
                # misclassified as a scan and fully OCR'd.
                sample_limit = min(max(self.pdf_sample_pages, 1), total_pages)
                # Only PyMuPDF is used for sampling because it is the faster
                # of the two extractors.
                has_text_page = any(
                    doc_fitz[page_num].get_text().strip()
                    for page_num in range(sample_limit)
                )

                if has_text_page:
                    print("[Info] 检测为文本型或混合型PDF,逐页处理...")
                else:
                    print(f"[Info] 检测为扫描型PDF,共{total_pages}页,直接进行OCR...")

                for page_num in range(total_pages):
                    page = doc_fitz[page_num]
                    if has_text_page:
                        # Text/mixed mode: try both extractors, OCR as fallback.
                        text_fitz = page.get_text().strip()
                        text_plumber = doc_plumber.pages[page_num].extract_text() or ""
                        if len(text_plumber) > len(text_fitz):
                            page_content = text_plumber
                        else:
                            page_content = text_fitz
                        if not page_content.strip():
                            print(f"[Info] 第 {page_num + 1} 页为扫描页,正在进行OCR...")
                            page_content = self._ocr_pdf_page(page)
                    else:
                        # Pure scan: skip text extraction entirely.
                        print(f"[Info] 正在OCR第 {page_num + 1}/{total_pages} 页...")
                        page_content = self._ocr_pdf_page(page)
                    full_text.append(f"--- Page {page_num + 1} ---\n{page_content}")
        except Exception as e:
            return f"[Error] PDF读取失败: {str(e)}"
        return "\n".join(full_text)

    # ================= Word handling =================
    def _read_word(self, word_path: str) -> str:
        """Read a ``.docx`` file directly, or a ``.doc`` file via Word.

        A ``.doc`` file is converted to ``.docx`` through Word COM automation
        and then parsed like any other ``.docx``.

        :param word_path: path of the Word document.
        :return: extracted text, or an ``"[Error] ..."`` string.
        """
        import shutil
        import tempfile

        abs_path = os.path.abspath(word_path)
        ext = os.path.splitext(abs_path)[1].lower()
        if ext == ".docx":
            return self._extract_docx_structured_text(abs_path)
        if ext != ".doc":
            return "[Error] 不支持的Word格式"

        word_app = None
        temp_dir = None
        try:
            word_app = win32.Dispatch('Word.Application')
            word_app.Visible = False
            # Convert into a private temporary directory.  Writing next to the
            # source as "<name>.docx" would overwrite, and afterwards delete,
            # a real .docx that happens to have the same name.
            temp_dir = tempfile.mkdtemp(prefix="doc2docx_")
            temp_docx = os.path.join(temp_dir, "converted.docx")
            doc = word_app.Documents.Open(abs_path)
            try:
                doc.SaveAs(temp_docx, FileFormat=16)  # 16 = wdFormatXMLDocument
            finally:
                # Close even when SaveAs fails so Word releases the file.
                doc.Close()
            return self._extract_docx_structured_text(temp_docx)
        except Exception as e:
            return f"[Error] .doc转换失败: {str(e)}"
        finally:
            if word_app is not None:
                word_app.Quit()
            if temp_dir is not None:
                # Best-effort cleanup; a leftover temp dir must not mask the result.
                shutil.rmtree(temp_dir, ignore_errors=True)

    @staticmethod
    def _local_name(tag) -> str:
        """Return the tag name without its ``{namespace}`` prefix.

        Non-string tags (as used by comment / processing-instruction nodes in
        lxml) yield an empty string so they never match an element name.
        """
        if not isinstance(tag, str):
            return ""
        return tag.rsplit("}", 1)[-1]

    @classmethod
    def _paragraph_text(cls, paragraph_element) -> str:
        """Concatenate the text of every ``t`` element below a paragraph.

        The local tag name is compared exactly.  A suffix test such as
        ``endswith('t')`` would also pick up ``instrText`` (field codes),
        ``delText`` (tracked deletions) and ``posOffset`` (drawing offsets).
        """
        texts = [
            node.text
            for node in paragraph_element.iter()
            if node.text and cls._local_name(node.tag) == "t"
        ]
        return "".join(texts).strip()

    @staticmethod
    def _table_rows(table) -> List[str]:
        """Render each non-empty table row as ``"cell | cell | ..."``."""
        rows = []
        for row in table.rows:
            cell_texts = (cell.text.strip() for cell in row.cells)
            row_data = [text for text in cell_texts if text]
            if row_data:
                rows.append(" | ".join(row_data))
        return rows

    def _extract_docx_structured_text(self, docx_path: str) -> str:
        """Extract paragraph and table text from a ``.docx`` in document order.

        :param docx_path: path of the ``.docx`` file.
        :return: one line per non-empty paragraph / table row, joined by
            newlines; or an ``"[Error] ..."`` string if parsing fails.
        """
        try:
            doc = Document(docx_path)
            # Index the top-level tables once instead of rescanning
            # doc.tables for every table element in the body.
            tables_by_element = {table._element: table for table in doc.tables}
            full_content = []
            for element in doc.element.body.iterchildren():
                kind = self._local_name(element.tag)
                if kind == "p":
                    para_text = self._paragraph_text(element)
                    if para_text:
                        full_content.append(para_text)
                elif kind == "tbl":
                    table = tables_by_element.get(element)
                    if table is not None:
                        full_content.extend(self._table_rows(table))
            return "\n".join(full_content)
        except Exception as e:
            return f"[Error] Docx解析异常: {str(e)}"

    # ================= Image OCR (file path, base64 data URL or raw bytes) =================
    def _to_data_url(self, img_input) -> Union[str, None]:
        """Convert one image input into a ``data:image/...`` URL.

        :param img_input: path of an existing image file, a string already
            starting with ``"data:image/"``, or raw image bytes (labelled PNG).
        :return: the data URL, or ``None`` when the input is none of the above.
        :raises OSError: if an existing file cannot be read.
        """
        if isinstance(img_input, bytes):
            encoded = base64.b64encode(img_input).decode('utf-8')
            return f"data:image/png;base64,{encoded}"
        if not isinstance(img_input, str):
            return None
        # isfile (not exists): a directory path must not reach open().
        if os.path.isfile(img_input):
            with open(img_input, "rb") as f:
                image_data = f.read()
            ext = os.path.splitext(img_input)[1].replace(".", "").lower()
            # Unknown extensions are labelled PNG.
            subtype = self._IMAGE_MIME_SUBTYPES.get(ext, "png")
            encoded = base64.b64encode(image_data).decode('utf-8')
            return f"data:image/{subtype};base64,{encoded}"
        if img_input.startswith("data:image/"):
            return img_input
        return None

    def _read_images_via_kimi(self, image_inputs: List[Union[str, bytes]], prompt: str = "请提取图片中的文字内容") -> str:
        """Send one or more images to the vision model in a single request.

        :param image_inputs: each element is an image file path, a
            ``data:image/...`` URL string, or raw image bytes.  Anything else
            is skipped with a ``[Warn]`` message.
        :param prompt: instruction text appended after the images.
        :return: the model's reply (empty string if the reply has no
            content), or an ``"[Error] ..."`` string.
        """
        content_list = []
        try:
            for img_input in image_inputs:
                image_url = self._to_data_url(img_input)
                if image_url is None:
                    if isinstance(img_input, str):
                        label = img_input
                    else:
                        label = type(img_input).__name__
                    print(f"[Warn] 忽略无效的图片输入: {str(label)[:80]}")
                    continue
                content_list.append({"type": "image_url", "image_url": {"url": image_url}})
        except OSError as e:
            return f"[Error] 图片读取失败: {str(e)}"

        if not content_list:
            return "[Error] 没有有效的图片输入"
        content_list.append({"type": "text", "text": prompt})

        try:
            completion = self.client.chat.completions.create(
                model=self.VISION_MODEL,
                messages=[{"role": "user", "content": content_list}],
            )
            # content may be None; callers rely on getting a str back.
            return completion.choices[0].message.content or ""
        except Exception as e:
            return f"[Error] Kimi API失败: {str(e)}"

    # ================= Public entry point =================
    def process_input(self, inputs: Union[str, List[str]]) -> str:
        """Dispatch the input to the matching reader.

        - A list is treated as a batch of images and sent to image OCR.
        - A single string is treated as a path and dispatched on its
          (case-insensitive) extension:

          * path does not exist -> error string
          * ``.pdf`` -> PDF reader (auto-detects text vs. scan)
          * ``.docx`` / ``.doc`` -> Word reader
          * ``.png`` / ``.jpg`` / ``.jpeg`` / ``.gif`` / ``.bmp`` -> image OCR
          * anything else -> error string

        :param inputs: a path, or a list of image inputs.
        :return: extracted text, or an ``"[Error] ..."`` string.
        """
        if isinstance(inputs, list):
            return self._read_images_via_kimi(inputs)
        if not os.path.exists(inputs):
            return f"[Error] 路径不存在: {inputs}"
        # ext is lowercased, so a single comparison covers ".PDF" as well.
        ext = os.path.splitext(inputs)[1].lower()
        if ext == ".pdf":
            return self._read_pdf_with_validation(inputs)
        if ext in (".docx", ".doc"):
            return self._read_word(inputs)
        if ext[1:] in self._IMAGE_MIME_SUBTYPES:
            return self._read_images_via_kimi([inputs])
        return f"[Error] 未知文件格式: {ext}"
# ================= Manual smoke test =================
if __name__ == "__main__":
    import sys

    # No key is passed here, so DocumentReader falls back to
    # config.config.MOONSHOT_API_KEY.
    reader = DocumentReader()
    # Path to process: the first command-line argument when given (any PDF,
    # .doc/.docx or image path accepted by process_input), otherwise the
    # sample transcript below.
    default_path = r"E:\project\arbitration_system\transcript_extractor\test\庭审笔录.PDF"
    target_path = sys.argv[1] if len(sys.argv) > 1 else default_path
    print(f"[Log] 开始处理: {target_path}")
    result = reader.process_input(target_path)
    print("\n--- 提取结果 ---\n")
    print(result)
|