Files
audiobook-maker-pro-v4.2/pdf_processor.py
2026-05-22 18:28:47 +06:00

644 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# pdf_processor.py - PDF Processing and Content Extraction
# UPDATED: Blocks now include font_size and is_bold metadata for AI analysis
import base64
import logging
import re

import fitz  # PyMuPDF
# ================================================================
# LIST GROUPING HELPER
# ================================================================
def _group_lists(blocks):
"""Groups consecutive list items into a cohesive Markdown list block."""
final_blocks = []
list_buffer = []
list_type = None
def flush():
if not list_buffer: return
lines = []
for i, text in enumerate(list_buffer):
clean_text = re.sub(r'^\s*[-*+]\s+', '', text)
clean_text = re.sub(r'^\s*\d+\.\s+', '', clean_text)
if list_type == 'numbered_list':
lines.append(f"{i+1}. {clean_text}")
else:
lines.append(f"- {clean_text}")
final_blocks.append({
'type': 'paragraph',
'content': '\n'.join(lines)
})
list_buffer.clear()
for b in blocks:
if b.get('type') in ['image', 'table']:
flush()
list_type = None
final_blocks.append(b)
continue
content = b.get('content', '')
bt = b.get('type', '')
is_bullet = bt == 'list_item' or content.startswith('- ') or content.startswith('* ')
is_number = bt == 'numbered_list' or re.match(r'^\s*\d+\.\s+', content)
if bt.startswith('heading'):
is_bullet = False
is_number = False
if is_bullet or is_number:
current_type = 'numbered_list' if is_number else 'bullet_list'
if list_type and list_type != current_type:
flush()
list_type = current_type
list_buffer.append(content)
else:
flush()
list_type = None
final_blocks.append(b)
flush()
return final_blocks
class PDFProcessor:
    """Extract structured, classified content from a PDF document.

    The document is opened from raw bytes with PyMuPDF. ``process`` walks
    every page and produces typed elements (title, subtitle, heading,
    paragraph, list_item, numbered_list, quote, table, image), and
    ``to_markdown`` turns that result into Markdown blocks that carry
    ``font_size`` / ``is_bold`` metadata.
    """

    # Absolute font sizes at or above which text is treated as a heading level.
    TITLE_SIZE_THRESHOLD = 24
    SUBTITLE_SIZE_THRESHOLD = 18
    HEADING_SIZE_THRESHOLD = 14
    # Font size relative to the detected body size.
    TITLE_RATIO = 1.8
    SUBTITLE_RATIO = 1.4
    HEADING_RATIO = 1.2

    # Leading list markers: two bullet-style patterns followed by three
    # ordered-style patterns (digits, single letter, roman numerals).
    LIST_PATTERNS = [
        r'^\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s*',
        r'^\s*[-–—]\s+',
        r'^\s*\d+[.)]\s+',
        r'^\s*[a-zA-Z][.)]\s+',
        r'^\s*[ivxIVX]+[.)]\s+',
    ]
    # The ordered-style subset of LIST_PATTERNS (its last three entries).
    _ORDERED_LIST_PATTERNS = LIST_PATTERNS[2:]
    BULLET_CHARS = set('•●○◦‣⁃·∙\u2022\u2023\u25E6\u2043\u2219-–—')
    INLINE_BULLET_SPLIT = re.compile(
        r'\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s+'
    )
    QUOTE_PATTERNS = [
        r'^[\"\'\u201C\u201D\u2018\u2019].+[\"\'\u201C\u201D\u2018\u2019]$',
    ]
    # Dot/ellipsis leaders followed by a trailing number ("Intro ..... 12").
    TOC_LEADER_PATTERN = re.compile(r'[.…·]{3,}\s*\.?\s*\d+\s*$')

    # Span flag bits this class tests for italic and bold text.
    FLAG_ITALIC = 2 ** 1
    FLAG_BOLD = 2 ** 4

    _log = logging.getLogger(__name__)

    def __init__(self, pdf_bytes):
        """Open the PDF contained in *pdf_bytes* and reset analysis state."""
        self.doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        self.elements = []
        self.font_sizes = []
        # Defaults used until _analyze_font_distribution() has run.
        self.median_size = 12
        self.body_size = 12

    def close(self):
        """Close the underlying document; safe to call more than once."""
        # Compare against None rather than relying on truthiness, and drop
        # the handle so a repeated close() is a no-op.
        if self.doc is not None:
            self.doc.close()
            self.doc = None

    def _analyze_font_distribution(self):
        """Compute ``median_size`` and ``body_size`` from all text spans.

        ``median_size`` is the middle entry of the sorted per-span sizes;
        ``body_size`` is the (rounded) size that covers the most characters.
        """
        # Reset so that calling process() twice does not accumulate samples.
        self.font_sizes = []
        chars_per_size = {}
        for page in self.doc:
            for block in page.get_text("dict", flags=11)["blocks"]:
                if block.get("type") != 0:
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span.get("text", "").strip()
                        if not text:
                            continue
                        size = round(span.get("size", 12), 1)
                        self.font_sizes.append(size)
                        chars_per_size[size] = chars_per_size.get(size, 0) + len(text)

        if self.font_sizes:
            self.font_sizes.sort()
            self.median_size = self.font_sizes[len(self.font_sizes) // 2]
        if chars_per_size:
            self.body_size = max(chars_per_size, key=chars_per_size.get)
        else:
            self.body_size = self.median_size

    def _is_likely_heading(self, text, font_size, flags):
        """Decide whether *text* looks like a heading.

        Returns:
            ``(True, kind)`` with kind in ``"title"``, ``"subtitle"``,
            ``"heading"``, or ``(False, None)``.
        """
        text_stripped = text.strip()
        if not text_stripped:
            return False, None

        length = len(text_stripped)
        is_bold = bool(flags & self.FLAG_BOLD)
        is_all_caps = text_stripped.isupper() and length > 3
        size_ratio = font_size / self.body_size if self.body_size > 0 else 1

        if size_ratio >= self.TITLE_RATIO or font_size >= self.TITLE_SIZE_THRESHOLD:
            if length < 200:
                return True, "title"
        if size_ratio >= self.SUBTITLE_RATIO or font_size >= self.SUBTITLE_SIZE_THRESHOLD:
            if length < 150:
                return True, "subtitle"
        if size_ratio >= self.HEADING_RATIO and is_bold and length < 100:
            return True, "heading"
        if is_all_caps and is_bold and length < 80:
            return True, "heading"
        if is_bold and length < 60:
            return True, "heading"
        return False, None

    def _classify_element(self, text, font_size, flags, is_italic=False, bbox=None):
        """Return the element type for *text*, or ``None`` for blank text.

        Headings take precedence, then list markers, then quotes; anything
        else is a ``"paragraph"``. *bbox* is accepted but not used.
        """
        text_stripped = text.strip()
        if not text_stripped:
            return None

        is_heading, heading_type = self._is_likely_heading(text_stripped, font_size, flags)
        if is_heading:
            return heading_type

        if any(re.match(p, text_stripped) for p in self.LIST_PATTERNS):
            if any(re.match(p, text_stripped) for p in self._ORDERED_LIST_PATTERNS):
                return "numbered_list"
            return "list_item"

        if is_italic and len(text_stripped) > 50:
            return "quote"
        if any(re.match(p, text_stripped) for p in self.QUOTE_PATTERNS):
            return "quote"
        return "paragraph"

    def _strip_list_marker(self, text):
        """Remove one leading list marker from *text* and trim whitespace.

        Only the first matching entry of ``LIST_PATTERNS`` is applied, so
        content that merely resembles a marker after the real one
        (``"• 2024. Sales"``, ``"1. A. Smith"``) is preserved.
        """
        for pattern in self.LIST_PATTERNS:
            stripped, count = re.subn(pattern, '', text, count=1)
            if count:
                return stripped.strip()
        return text.strip()

    def _extract_images(self, page, page_num):
        """Return image elements (base64 data, format, bbox, size) for *page*.

        Extraction is best-effort: an image that cannot be read is logged and
        skipped rather than aborting the page.
        """
        images = []
        for img in page.get_images(full=True):
            try:
                base_image = self.doc.extract_image(img[0])
                if not base_image:
                    continue
                rects = page.get_image_rects(img)
                bbox = None
                if rects:
                    # Only the first placement of the image is recorded.
                    rect = rects[0]
                    bbox = [rect.x0, rect.y0, rect.x1, rect.y1]
                images.append({
                    "type": "image",
                    "data": base64.b64encode(base_image["image"]).decode('utf-8'),
                    "format": base_image["ext"],
                    "bbox": bbox,
                    "width": base_image.get("width", 0),
                    "height": base_image.get("height", 0),
                })
            except Exception as exc:
                self._log.warning(
                    "Skipping image %r on page %s: %s", img, page_num, exc
                )
        return images

    def _extract_tables(self, page, page_num):
        """Return table elements (raw rows, Markdown, bbox) found on *page*.

        Extraction is best-effort: failures are logged and the affected table
        (or the whole page's table search) is skipped.
        """
        tables = []
        try:
            table_finder = page.find_tables()
            for table in table_finder:
                try:
                    table_data = table.extract()
                    tables.append({
                        "type": "table",
                        "data": table_data,
                        "markdown": self._table_to_markdown(table_data),
                        "bbox": list(table.bbox),
                    })
                except Exception as exc:
                    self._log.warning(
                        "Skipping table on page %s: %s", page_num, exc
                    )
        except Exception as exc:
            self._log.warning(
                "Table detection failed on page %s: %s", page_num, exc
            )
        return tables

    def _table_to_markdown(self, table_data):
        """Render rows of cells as a Markdown table; first row is the header."""
        if not table_data:
            return ""
        lines = []
        for row_idx, row in enumerate(table_data):
            # Escape pipes and flatten newlines so a cell cannot break the row.
            cells = [
                str(cell).replace('|', '\\|').replace('\n', ' ') if cell else ''
                for cell in row
            ]
            lines.append('| ' + ' | '.join(cells) + ' |')
            if row_idx == 0:
                lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
        return '\n'.join(lines)

    def _get_reading_order(self, elements, page_width):
        """Order *elements* for reading and tag each with ``reading_order``.

        Elements wider than 60% of the page, straddling the page centre, or
        lacking a bbox are "full width"; the rest belong to the left or right
        column. Full-width elements act as separators: within each vertical
        band between them, the left column is emitted top-to-bottom before
        the right column, so two-column text is not interleaved line by line.
        """
        if not elements:
            return elements

        def top(elem):
            return (elem.get("bbox") or [0, 0, 0, 0])[1]

        mid_x = page_width / 2
        left_col, right_col, full_width = [], [], []
        for elem in elements:
            bbox = elem.get("bbox")
            if not bbox:
                full_width.append(elem)
                continue
            x0, _, x1, _ = bbox
            if x1 - x0 > page_width * 0.6:
                full_width.append(elem)
            elif x1 < mid_x:
                left_col.append(elem)
            elif x0 > mid_x:
                right_col.append(elem)
            else:
                full_width.append(elem)

        for column in (left_col, right_col, full_width):
            column.sort(key=top)

        result = []
        left_idx = right_idx = 0
        # A trailing None sentinel drains whatever follows the last separator.
        for separator in full_width + [None]:
            limit = float("inf") if separator is None else top(separator)
            while left_idx < len(left_col) and top(left_col[left_idx]) < limit:
                result.append(left_col[left_idx])
                left_idx += 1
            while right_idx < len(right_col) and top(right_col[right_idx]) < limit:
                result.append(right_col[right_idx])
                right_idx += 1
            if separator is not None:
                result.append(separator)

        for idx, elem in enumerate(result):
            elem["reading_order"] = idx
        return result

    def _bboxes_overlap(self, bbox1, bbox2, threshold=0.5):
        """Return True when more than *threshold* of bbox1's area lies in bbox2."""
        if not bbox1 or not bbox2:
            return False
        x1_min, y1_min, x1_max, y1_max = bbox1
        x2_min, y2_min, x2_max, y2_max = bbox2
        x_overlap = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
        y_overlap = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
        area1 = (x1_max - x1_min) * (y1_max - y1_min)
        if area1 == 0:
            return False
        return (x_overlap * y_overlap) / area1 > threshold

    def _extract_line_info(self, line):
        """Summarise one text line: text, bbox, weighted font size and flags.

        Whitespace-only spans are kept in the text (they may be the only
        separator between two words) but do not contribute to the font size,
        the flags, or ``char_count``.
        """
        pieces = []
        total_chars = 0
        size_sum = 0.0
        combined_flags = 0
        for span in line.get("spans", []):
            span_text = span.get("text", "")
            pieces.append(span_text)
            if not span_text.strip():
                continue
            char_count = len(span_text)
            size_sum += span.get("size", 12) * char_count
            total_chars += char_count
            combined_flags |= span.get("flags", 0)

        text = "".join(pieces)
        stripped = text.strip()
        weighted_size = size_sum / total_chars if total_chars else 0.0
        return {
            "text": text,
            "stripped": stripped,
            "bbox": list(line.get("bbox", [0, 0, 0, 0])),
            "font_size": round(weighted_size, 1),
            "flags": combined_flags,
            "is_bold": bool(combined_flags & self.FLAG_BOLD),
            "is_italic": bool(combined_flags & self.FLAG_ITALIC),
            "char_count": total_chars,
            # A line holding nothing but one or two bullet glyphs.
            "is_bullet": len(stripped) <= 2 and bool(stripped)
                         and all(c in self.BULLET_CHARS for c in stripped),
            "is_single_line_entry": False,
        }

    def _is_single_line_entry(self, info, page_width):
        """Return True for lines that stand alone (e.g. table-of-contents rows)."""
        text = info["stripped"]
        if not text:
            return False
        if self.TOC_LEADER_PATTERN.search(text):
            return True
        # NOTE(review): `'' in text` is always true, so every line that ends
        # in digits is treated as a stand-alone entry. The literal may have
        # lost a character (possibly a tab) — confirm the intended separator
        # before tightening this check.
        if re.search(r'\d+\s*$', text) and '' in text:
            return True
        return False

    def _should_break_between(self, prev_info, curr_info, median_gap, avg_line_height, page_width):
        """Return True when a paragraph boundary falls between two lines."""
        # A lone bullet glyph always belongs with the line that follows it.
        if prev_info["is_bullet"]:
            return False

        prev_bbox = prev_info["bbox"]
        curr_bbox = curr_info["bbox"]
        gap = curr_bbox[1] - prev_bbox[3]

        if abs(curr_info["font_size"] - prev_info["font_size"]) > 1.5:
            return True
        if prev_info["is_bold"] != curr_info["is_bold"]:
            return True

        if median_gap > 0:
            gap_ratio = gap / median_gap
            if gap_ratio >= 2.0:
                return True
            # A moderately larger gap only counts after sentence-final text.
            if gap_ratio >= 1.5:
                prev_text = prev_info["stripped"]
                if prev_text and prev_text[-1] in '.!?:"\u201D\u2019':
                    return True

        if gap > avg_line_height * 1.0:
            return True
        # A horizontal shift of the line start signals indentation/new block.
        if abs(curr_bbox[0] - prev_bbox[0]) > 25:
            return True
        if prev_info.get("is_single_line_entry"):
            return True
        if prev_info["is_bold"] and curr_info["is_bold"]:
            # Short consecutive bold lines are treated as separate headings.
            prev_line_width = prev_bbox[2] - prev_bbox[0]
            if page_width > 0 and prev_line_width < page_width * 0.75:
                return True
        return False

    def _merge_bullet_lines(self, line_infos):
        """Join each bullet-only line with the text line that follows it."""
        if not line_infos:
            return line_infos
        merged = []
        i = 0
        while i < len(line_infos):
            info = line_infos[i]
            if info["is_bullet"] and i + 1 < len(line_infos):
                next_info = line_infos[i + 1]
                bullet_char = info["stripped"]
                merged.append({
                    "text": bullet_char + " " + next_info["text"],
                    "stripped": bullet_char + " " + next_info["stripped"],
                    "bbox": [
                        min(info["bbox"][0], next_info["bbox"][0]),
                        min(info["bbox"][1], next_info["bbox"][1]),
                        max(info["bbox"][2], next_info["bbox"][2]),
                        max(info["bbox"][3], next_info["bbox"][3]),
                    ],
                    # Typography comes from the text line, not the glyph.
                    "font_size": next_info["font_size"],
                    "flags": next_info["flags"],
                    "is_bold": next_info["is_bold"],
                    "is_italic": next_info["is_italic"],
                    "char_count": info["char_count"] + next_info["char_count"],
                    "is_bullet": False,
                    "is_single_line_entry": False,
                })
                i += 2
            else:
                merged.append(info)
                i += 1
        return merged

    def _split_block_into_paragraphs(self, block, page_width):
        """Split a text block into groups of line infos, one per paragraph."""
        lines = block.get("lines", [])
        if not lines:
            return []

        line_infos = [self._extract_line_info(line) for line in lines]
        line_infos = [info for info in line_infos if info["stripped"]]
        if not line_infos:
            return []

        line_infos = self._merge_bullet_lines(line_infos)
        for info in line_infos:
            info["is_single_line_entry"] = self._is_single_line_entry(info, page_width)
        if len(line_infos) == 1:
            return [line_infos]

        line_heights = [info["bbox"][3] - info["bbox"][1] for info in line_infos]
        gaps = [
            curr["bbox"][1] - prev["bbox"][3]
            for prev, curr in zip(line_infos, line_infos[1:])
        ]
        avg_line_height = sum(line_heights) / len(line_heights) if line_heights else 12
        median_gap = sorted(gaps)[len(gaps) // 2] if gaps else avg_line_height * 0.3

        paragraphs = []
        current_group = [line_infos[0]]
        for prev, curr in zip(line_infos, line_infos[1:]):
            if self._should_break_between(prev, curr, median_gap, avg_line_height, page_width):
                paragraphs.append(current_group)
                current_group = [curr]
            else:
                current_group.append(curr)
        paragraphs.append(current_group)
        return paragraphs

    def _group_to_element(self, line_group):
        """Build one classified element from a group of line infos, or None."""
        text = " ".join(info["stripped"] for info in line_group if info["stripped"])
        if not text.strip():
            return None

        total_chars = sum(info["char_count"] for info in line_group)
        if total_chars > 0:
            font_size = sum(
                info["font_size"] * info["char_count"] for info in line_group
            ) / total_chars
        else:
            font_size = self.body_size

        flags = 0
        for info in line_group:
            flags |= info["flags"]

        bbox = [
            min(info["bbox"][0] for info in line_group),
            min(info["bbox"][1] for info in line_group),
            max(info["bbox"][2] for info in line_group),
            max(info["bbox"][3] for info in line_group),
        ]
        elem_type = self._classify_element(
            text, font_size, flags, bool(flags & self.FLAG_ITALIC), bbox
        )
        if not elem_type:
            return None
        return {
            "type": elem_type,
            "text": text.strip(),
            "bbox": bbox,
            "font_size": round(font_size, 1),
            "flags": flags,
        }

    def _should_merge_elements(self, prev_elem, curr_elem):
        """Return True when *curr_elem* continues the paragraph in *prev_elem*."""
        if prev_elem["type"] != "paragraph" or curr_elem["type"] != "paragraph":
            return False
        if abs(prev_elem["font_size"] - curr_elem["font_size"]) > 1.5:
            return False
        prev_bold = bool(prev_elem.get("flags", 0) & self.FLAG_BOLD)
        curr_bold = bool(curr_elem.get("flags", 0) & self.FLAG_BOLD)
        if prev_bold != curr_bold:
            return False

        prev_text = prev_elem["text"].strip()
        curr_text = curr_elem["text"].strip()
        if not prev_text or not curr_text:
            return False
        if self.TOC_LEADER_PATTERN.search(prev_text):
            return False

        continues_lowercase = curr_text[0].islower()
        last_char = prev_text[-1]
        if last_char in '.!?':
            # After sentence-final punctuation only a lowercase start merges.
            return continues_lowercase
        if last_char in '"\u201D\u2019':
            # A closing quote that itself follows sentence-final punctuation.
            if len(prev_text) >= 2 and prev_text[-2] in '.!?':
                return continues_lowercase
        return True

    def _merge_continuation_paragraphs(self, elements):
        """Merge adjacent paragraph elements that belong to one paragraph."""
        if len(elements) <= 1:
            return elements
        merged = [elements[0]]
        for curr in elements[1:]:
            prev = merged[-1]
            if not self._should_merge_elements(prev, curr):
                merged.append(curr)
                continue
            prev_bbox = prev["bbox"]
            curr_bbox = curr["bbox"]
            merged[-1] = {
                "type": "paragraph",
                "text": prev["text"].rstrip() + " " + curr["text"].lstrip(),
                "bbox": [
                    min(prev_bbox[0], curr_bbox[0]),
                    min(prev_bbox[1], curr_bbox[1]),
                    max(prev_bbox[2], curr_bbox[2]),
                    max(prev_bbox[3], curr_bbox[3]),
                ],
                "font_size": prev["font_size"],
                "flags": prev.get("flags", 0),
            }
        return merged

    def _split_combined_list_items(self, elements):
        """Split list items that hold several inline-bulleted entries.

        An element whose text contains more than one entry separated by
        bullet glyphs is replaced by one ``list_item`` per entry, with the
        original bbox divided evenly between them. Split entries are tagged
        ``"marker_stripped": True`` because their marker is already removed.
        """
        result = []
        for elem in elements:
            if elem["type"] != "list_item":
                result.append(elem)
                continue

            cleaned = self._strip_list_marker(elem["text"])
            parts = [p.strip() for p in self.INLINE_BULLET_SPLIT.split(cleaned)]
            parts = [p for p in parts if p]
            if len(parts) <= 1:
                result.append(elem)
                continue

            bbox = elem["bbox"]
            item_height = (bbox[3] - bbox[1]) / len(parts)
            for idx, part in enumerate(parts):
                result.append({
                    "type": "list_item",
                    "text": part,
                    "bbox": [
                        bbox[0],
                        bbox[1] + idx * item_height,
                        bbox[2],
                        bbox[1] + (idx + 1) * item_height,
                    ],
                    "font_size": elem["font_size"],
                    "flags": elem.get("flags", 0),
                    "marker_stripped": True,
                })
        return result

    def process(self):
        """Extract every page into classified elements.

        Returns:
            A dict with ``page_count``, ``metadata`` (title/author/subject,
            empty strings when absent) and ``pages``; each page has
            ``page_number`` (0-based), ``width``, ``height`` and ``elements``.
        """
        self._analyze_font_distribution()
        all_pages = []
        for page_num, page in enumerate(self.doc):
            page_rect = page.rect
            dict_blocks = page.get_text("dict", flags=11)["blocks"]
            tables = self._extract_tables(page, page_num)
            table_bboxes = [t["bbox"] for t in tables if t.get("bbox")]
            images = self._extract_images(page, page_num)

            page_elements = []
            for block in dict_blocks:
                if block.get("type") != 0:
                    continue
                block_bbox = block.get("bbox", [0, 0, 0, 0])
                # Text already captured as part of a table is not repeated.
                if any(self._bboxes_overlap(block_bbox, t_bbox) for t_bbox in table_bboxes):
                    continue
                for group in self._split_block_into_paragraphs(block, page_rect.width):
                    element = self._group_to_element(group)
                    if element:
                        page_elements.append(element)

            page_elements = [e for e in page_elements if e["text"].strip()]
            page_elements = self._merge_continuation_paragraphs(page_elements)
            page_elements = self._split_combined_list_items(page_elements)
            page_elements.extend(tables)
            page_elements.extend(images)
            page_elements = self._get_reading_order(page_elements, page_rect.width)
            all_pages.append({
                "page_number": page_num,
                "width": page_rect.width,
                "height": page_rect.height,
                "elements": page_elements,
            })

        # Guard against a missing metadata dict or None-valued entries.
        metadata = self.doc.metadata or {}
        return {
            "page_count": len(self.doc),
            "metadata": {
                "title": metadata.get("title") or "",
                "author": metadata.get("author") or "",
                "subject": metadata.get("subject") or "",
            },
            "pages": all_pages,
        }

    def to_markdown(self, processed_data):
        """Convert processed data to markdown blocks WITH typography metadata.

        Each element becomes a ``{"type", "content"}`` block; ``font_size``
        and ``is_bold`` are attached when the element carries them. The
        result is passed through ``_group_lists`` so consecutive list items
        become single list blocks.
        """
        heading_styles = {
            "title": ("heading1", "#"),
            "subtitle": ("heading2", "##"),
            "heading": ("heading3", "###"),
        }
        blocks = []
        for page in processed_data.get("pages", []):
            for elem in page.get("elements", []):
                elem_type = elem.get("type")
                block = None
                if elem_type in heading_styles:
                    block_type, hashes = heading_styles[elem_type]
                    block = {"type": block_type, "content": f"{hashes} {elem.get('text', '')}"}
                elif elem_type == "paragraph":
                    block = {"type": "paragraph", "content": elem.get('text', '')}
                elif elem_type in ("list_item", "numbered_list"):
                    text = elem.get('text', '')
                    if elem.get("marker_stripped"):
                        text = text.strip()
                    else:
                        text = self._strip_list_marker(text)
                    prefix = "-" if elem_type == "list_item" else "1."
                    block = {"type": elem_type, "content": f"{prefix} {text}"}
                elif elem_type == "quote":
                    block = {"type": "quote", "content": f"> {elem.get('text', '')}"}
                elif elem_type == "table":
                    block = {"type": "table", "content": elem.get('markdown', '')}
                elif elem_type == "image":
                    if elem.get("data"):
                        block = {
                            "type": "image",
                            "content": f"![PDF Image](embedded-image.{elem.get('format', 'png')})",
                            "data": elem.get("data"),
                            "format": elem.get("format", "png"),
                        }

                if block:
                    # ADD typography metadata for AI analysis
                    if elem.get("font_size"):
                        block["font_size"] = elem["font_size"]
                    if elem.get("flags") is not None:
                        block["is_bold"] = bool(elem["flags"] & self.FLAG_BOLD)
                    blocks.append(block)

        # Apply the list grouping logic
        return _group_lists(blocks)
def process_pdf_to_markdown(pdf_bytes):
    """Run the full PDF -> Markdown pipeline on raw PDF bytes.

    Returns a dict with ``page_count``, ``metadata`` and ``markdown_blocks``.
    The processor is closed whether or not processing succeeds.
    """
    pdf = PDFProcessor(pdf_bytes)
    try:
        extracted = pdf.process()
        md_blocks = pdf.to_markdown(extracted)
        result = {}
        result["page_count"] = extracted["page_count"]
        result["metadata"] = extracted["metadata"]
        result["markdown_blocks"] = md_blocks
        return result
    finally:
        pdf.close()