Files
audiobook-maker-pro-v4/pdf_processor.py
Ashim Kumar 8e02b9ad09 first commit
2026-02-20 13:53:36 +06:00

773 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# pdf_processor.py - PDF Processing and Content Extraction
import base64
import re
import fitz # PyMuPDF
class PDFProcessor:
    """Process PDF files and extract structured content."""
    # Absolute font-size cutoffs (points) for heading classification.
    TITLE_SIZE_THRESHOLD = 24
    SUBTITLE_SIZE_THRESHOLD = 18
    HEADING_SIZE_THRESHOLD = 14
    # Minimum ratios of a line's font size to the dominant body size.
    TITLE_RATIO = 1.8
    SUBTITLE_RATIO = 1.4
    HEADING_RATIO = 1.2
    # Regexes marking the start of a list item: bullet glyphs, dashes,
    # and numeric / alphabetic / roman prefixes followed by "." or ")".
    LIST_PATTERNS = [
        r'^\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s*',
        r'^\s*[-–—]\s+',
        r'^\s*\d+[.)]\s+',
        r'^\s*[a-zA-Z][.)]\s+',
        r'^\s*[ivxIVX]+[.)]\s+',
    ]
    # Characters that can appear alone as a standalone bullet-marker line.
    BULLET_CHARS = set('•●○◦‣⁃·∙\u2022\u2023\u25E6\u2043\u2219-–—')
    # Splits several inline bullet items that share a single line.
    INLINE_BULLET_SPLIT = re.compile(
        r'\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s+'
    )
    # A line fully wrapped in straight or curly quote characters.
    QUOTE_PATTERNS = [
        r'^[\"\'\u201C\u201D\u2018\u2019].+[\"\'\u201C\u201D\u2018\u2019]$',
    ]
    # Pattern for TOC-style dot leaders: text followed by dots and a page number
    TOC_LEADER_PATTERN = re.compile(r'[.…·]{3,}\s*\.?\s*\d+\s*$')
def __init__(self, pdf_bytes):
    """Open *pdf_bytes* as an in-memory PyMuPDF document.

    median_size and body_size start at a provisional 12pt and are
    refined by _analyze_font_distribution() during process().
    """
    self.doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    self.elements = []      # accumulated structured elements
    self.font_sizes = []    # every span size seen during analysis
    self.median_size = 12   # provisional until analysis runs
    self.body_size = 12     # provisional dominant body size
def close(self):
    """Release the underlying PyMuPDF document, if one is open."""
    document = self.doc
    if document:
        document.close()
def _analyze_font_distribution(self):
font_size_counts = {}
for page in self.doc:
blocks = page.get_text("dict", flags=11)["blocks"]
for block in blocks:
if block.get("type") == 0:
for line in block.get("lines", []):
for span in line.get("spans", []):
size = round(span.get("size", 12), 1)
text = span.get("text", "").strip()
if text:
self.font_sizes.append(size)
font_size_counts[size] = font_size_counts.get(size, 0) + len(text)
if self.font_sizes:
self.font_sizes.sort()
n = len(self.font_sizes)
self.median_size = self.font_sizes[n // 2]
if font_size_counts:
self.body_size = max(font_size_counts.keys(), key=lambda x: font_size_counts[x])
else:
self.body_size = self.median_size
def _is_likely_heading(self, text, font_size, flags):
text_stripped = text.strip()
if not text_stripped:
return False, None
is_bold = bool(flags & 2 ** 4)
is_all_caps = text_stripped.isupper() and len(text_stripped) > 3
size_ratio = font_size / self.body_size if self.body_size > 0 else 1
if size_ratio >= self.TITLE_RATIO or font_size >= self.TITLE_SIZE_THRESHOLD:
if len(text_stripped) < 200:
return True, "title"
if size_ratio >= self.SUBTITLE_RATIO or font_size >= self.SUBTITLE_SIZE_THRESHOLD:
if len(text_stripped) < 150:
return True, "subtitle"
if size_ratio >= self.HEADING_RATIO and is_bold:
if len(text_stripped) < 100:
return True, "heading"
if is_all_caps and is_bold and len(text_stripped) < 80:
return True, "heading"
# Short bold text is likely a heading even at body font size
if is_bold and len(text_stripped) < 60:
return True, "heading"
return False, None
def _classify_element(self, text, font_size, flags, is_italic=False, bbox=None):
text_stripped = text.strip()
if not text_stripped:
return None
is_bold = bool(flags & 2 ** 4)
# Check headings FIRST (before list patterns)
is_heading, heading_type = self._is_likely_heading(text_stripped, font_size, flags)
if is_heading:
return heading_type
# Then check list patterns
for pattern in self.LIST_PATTERNS:
if re.match(pattern, text_stripped):
return "list_item"
# Then check quotes
if is_italic and len(text_stripped) > 50:
return "quote"
for pattern in self.QUOTE_PATTERNS:
if re.match(pattern, text_stripped):
return "quote"
return "paragraph"
def _extract_images(self, page, page_num):
images = []
image_list = page.get_images(full=True)
for img_index, img in enumerate(image_list):
try:
xref = img[0]
base_image = self.doc.extract_image(xref)
if base_image:
image_bytes = base_image["image"]
image_ext = base_image["ext"]
img_rects = page.get_image_rects(img)
bbox = None
if img_rects:
rect = img_rects[0]
bbox = [rect.x0, rect.y0, rect.x1, rect.y1]
images.append({
"type": "image",
"data": base64.b64encode(image_bytes).decode('utf-8'),
"format": image_ext,
"bbox": bbox,
"width": base_image.get("width", 0),
"height": base_image.get("height", 0),
})
except Exception as e:
print(f"Error extracting image {img_index} from page {page_num}: {e}")
return images
def _extract_tables(self, page, page_num):
tables = []
try:
table_finder = page.find_tables()
for table_index, table in enumerate(table_finder):
try:
table_data = table.extract()
bbox = list(table.bbox)
markdown_table = self._table_to_markdown(table_data)
tables.append({
"type": "table",
"data": table_data,
"markdown": markdown_table,
"bbox": bbox,
})
except Exception as e:
print(f"Error extracting table {table_index} from page {page_num}: {e}")
except Exception as e:
print(f"Error finding tables on page {page_num}: {e}")
return tables
def _table_to_markdown(self, table_data):
if not table_data:
return ""
lines = []
for row_idx, row in enumerate(table_data):
cells = [str(cell).replace('|', '\\|').replace('\n', ' ') if cell else '' for cell in row]
lines.append('| ' + ' | '.join(cells) + ' |')
if row_idx == 0:
lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
return '\n'.join(lines)
def _get_reading_order(self, elements, page_width):
if not elements:
return elements
mid_x = page_width / 2
left_col, right_col, full_width = [], [], []
for elem in elements:
bbox = elem.get("bbox")
if not bbox:
full_width.append(elem)
continue
x0, y0, x1, y1 = bbox
width = x1 - x0
if width > page_width * 0.6:
full_width.append(elem)
elif x1 < mid_x:
left_col.append(elem)
elif x0 > mid_x:
right_col.append(elem)
else:
full_width.append(elem)
sort_by_y = lambda e: e.get("bbox", [0, 0, 0, 0])[1]
left_col.sort(key=sort_by_y)
right_col.sort(key=sort_by_y)
full_width.sort(key=sort_by_y)
all_elements = [(e, "full") for e in full_width]
all_elements += [(e, "left") for e in left_col]
all_elements += [(e, "right") for e in right_col]
all_elements.sort(key=lambda x: x[0].get("bbox", [0, 0, 0, 0])[1])
result = [e[0] for e in all_elements]
for idx, elem in enumerate(result):
elem["reading_order"] = idx
return result
def _bboxes_overlap(self, bbox1, bbox2, threshold=0.5):
if not bbox1 or not bbox2:
return False
x1_min, y1_min, x1_max, y1_max = bbox1
x2_min, y2_min, x2_max, y2_max = bbox2
x_overlap = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
y_overlap = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
intersection = x_overlap * y_overlap
area1 = (x1_max - x1_min) * (y1_max - y1_min)
if area1 == 0:
return False
return intersection / area1 > threshold
# ================================================================
# LINE-LEVEL ANALYSIS
# ================================================================
def _extract_line_info(self, line):
"""Extract enriched info from a single dict-mode line."""
text = ""
total_chars = 0
weighted_size = 0.0
combined_flags = 0
for span in line.get("spans", []):
span_text = span.get("text", "")
span_size = span.get("size", 12)
span_flags = span.get("flags", 0)
if span_text.strip():
char_count = len(span_text)
text += span_text
weighted_size = (
(weighted_size * total_chars + span_size * char_count) /
(total_chars + char_count)
) if (total_chars + char_count) > 0 else span_size
total_chars += char_count
combined_flags |= span_flags
stripped = text.strip()
return {
"text": text,
"stripped": stripped,
"bbox": list(line.get("bbox", [0, 0, 0, 0])),
"font_size": round(weighted_size, 1),
"flags": combined_flags,
"is_bold": bool(combined_flags & (2 ** 4)),
"is_italic": bool(combined_flags & (2 ** 1)),
"char_count": total_chars,
"is_bullet": len(stripped) <= 2 and bool(stripped) and all(c in self.BULLET_CHARS for c in stripped),
"is_single_line_entry": False, # Set during analysis
}
def _is_single_line_entry(self, info, page_width):
"""
Determine if a line is a self-contained single-line entry
(like a TOC entry, a short heading, etc.) rather than a wrapped
continuation of a multi-line paragraph.
Signals:
- Contains dot leaders (TOC pattern)
- Is a single line that doesn't reach near the right margin (not a wrapped line)
- Ends with a number (page reference)
"""
text = info["stripped"]
if not text:
return False
# TOC dot leader pattern: "Chapter 1 - Something.............3"
if self.TOC_LEADER_PATTERN.search(text):
return True
# Ends with a digit (possible page number) and has dots
if re.search(r'\d+\s*$', text) and '' in text:
return True
return False
# ================================================================
# MULTI-SIGNAL PARAGRAPH SPLITTING
# ================================================================
def _should_break_between(self, prev_info, curr_info, median_gap, avg_line_height, page_width):
"""Decide whether a paragraph break should be inserted between two lines."""
if prev_info["is_bullet"]:
return False
prev_bbox = prev_info["bbox"]
curr_bbox = curr_info["bbox"]
gap = curr_bbox[1] - prev_bbox[3]
# --- Signal 1: FONT SIZE CHANGE ---
size_diff = abs(curr_info["font_size"] - prev_info["font_size"])
if size_diff > 1.5:
return True
# --- Signal 2: STYLE CHANGE (bold boundary) ---
if prev_info["is_bold"] != curr_info["is_bold"]:
# Any bold/non-bold transition = structural break
return True
# --- Signal 3: VERTICAL GAP (relative) ---
if median_gap > 0:
gap_ratio = gap / median_gap if median_gap > 0 else 1
if gap_ratio >= 2.0:
return True
if gap_ratio >= 1.5:
prev_text = prev_info["stripped"]
if prev_text and prev_text[-1] in '.!?:"\u201D\u2019':
return True
# --- Signal 4: ABSOLUTE GAP ---
if gap > avg_line_height * 1.0:
return True
# --- Signal 5: INDENTATION CHANGE ---
x_diff = abs(curr_bbox[0] - prev_bbox[0])
if x_diff > 25:
return True
# --- Signal 6: SINGLE-LINE ENTRIES ---
# If previous line is a self-contained entry (e.g. TOC line with dot leaders),
# break even if font/style/gap are the same
if prev_info.get("is_single_line_entry"):
return True
# --- Signal 7: BOTH BOLD + PREVIOUS IS SHORT ---
# When both lines are bold and the previous line is relatively short
# (not reaching near the right margin), each is likely a separate heading/entry.
# This handles TOC entries, section headings, etc. that are all bold+same size.
if prev_info["is_bold"] and curr_info["is_bold"]:
prev_line_width = prev_bbox[2] - prev_bbox[0]
# Compare against page content width (approximate)
if page_width > 0 and prev_line_width < page_width * 0.75:
return True
return False
def _merge_bullet_lines(self, line_infos):
"""Merge bullet character lines with their following text lines."""
if not line_infos:
return line_infos
merged = []
i = 0
while i < len(line_infos):
info = line_infos[i]
if info["is_bullet"] and i + 1 < len(line_infos):
next_info = line_infos[i + 1]
bullet_char = info["stripped"]
merged_text = bullet_char + " " + next_info["text"]
merged_stripped = bullet_char + " " + next_info["stripped"]
merged_info = {
"text": merged_text,
"stripped": merged_stripped,
"bbox": [
min(info["bbox"][0], next_info["bbox"][0]),
min(info["bbox"][1], next_info["bbox"][1]),
max(info["bbox"][2], next_info["bbox"][2]),
max(info["bbox"][3], next_info["bbox"][3]),
],
"font_size": next_info["font_size"],
"flags": next_info["flags"],
"is_bold": next_info["is_bold"],
"is_italic": next_info["is_italic"],
"char_count": info["char_count"] + next_info["char_count"],
"is_bullet": False,
"is_single_line_entry": False,
}
merged.append(merged_info)
i += 2
else:
merged.append(info)
i += 1
return merged
def _split_block_into_paragraphs(self, block, page_width):
    """Split a single dict-mode text block into paragraph groups.

    Returns a list of groups, each a list of line-info dicts (see
    _extract_line_info). Lines are enriched, standalone bullet markers
    merged with their text, single-line entries flagged, then break
    decisions between consecutive lines use the block's own gap and
    line-height statistics via _should_break_between.
    """
    lines = block.get("lines", [])
    if not lines:
        return []
    # Enrich every line; drop whitespace-only ones.
    line_infos = []
    for line in lines:
        info = self._extract_line_info(line)
        if info["stripped"]:
            line_infos.append(info)
    if not line_infos:
        return []
    line_infos = self._merge_bullet_lines(line_infos)
    # Mark single-line entries (TOC lines, etc.)
    for info in line_infos:
        info["is_single_line_entry"] = self._is_single_line_entry(info, page_width)
    if len(line_infos) == 1:
        return [line_infos]
    # Gather inter-line gaps and line heights to drive relative thresholds.
    gaps = []
    line_heights = []
    for i in range(len(line_infos)):
        h = line_infos[i]["bbox"][3] - line_infos[i]["bbox"][1]
        line_heights.append(h)
        if i > 0:
            gap = line_infos[i]["bbox"][1] - line_infos[i - 1]["bbox"][3]
            gaps.append(gap)
    avg_line_height = sum(line_heights) / len(line_heights) if line_heights else 12
    if gaps:
        sorted_gaps = sorted(gaps)
        median_gap = sorted_gaps[len(sorted_gaps) // 2]
    else:
        # No measurable gaps: assume tight single spacing.
        median_gap = avg_line_height * 0.3
    # Walk consecutive line pairs, starting a new group at every break.
    paragraphs = []
    current_group = [line_infos[0]]
    for i in range(1, len(line_infos)):
        if self._should_break_between(
            line_infos[i - 1], line_infos[i],
            median_gap, avg_line_height, page_width
        ):
            paragraphs.append(current_group)
            current_group = [line_infos[i]]
        else:
            current_group.append(line_infos[i])
    if current_group:
        paragraphs.append(current_group)
    return paragraphs
def _group_to_element(self, line_group):
"""Convert a group of line-infos into a single page element dict."""
text = " ".join(info["stripped"] for info in line_group if info["stripped"])
if not text.strip():
return None
total_chars = sum(info["char_count"] for info in line_group)
if total_chars > 0:
font_size = sum(
info["font_size"] * info["char_count"] for info in line_group
) / total_chars
else:
font_size = self.body_size
flags = 0
for info in line_group:
flags |= info["flags"]
x0 = min(info["bbox"][0] for info in line_group)
y0 = min(info["bbox"][1] for info in line_group)
x1 = max(info["bbox"][2] for info in line_group)
y1 = max(info["bbox"][3] for info in line_group)
is_italic = bool(flags & (2 ** 1))
elem_type = self._classify_element(text, font_size, flags, is_italic, [x0, y0, x1, y1])
if elem_type:
return {
"type": elem_type,
"text": text.strip(),
"bbox": [x0, y0, x1, y1],
"font_size": font_size,
"flags": flags,
}
return None
# ================================================================
# POST-PROCESSING
# ================================================================
def _should_merge_elements(self, prev_elem, curr_elem):
"""
Determine if two consecutive elements should be merged because
they are continuations of the same paragraph split across PyMuPDF blocks.
"""
# Only merge paragraph + paragraph
if prev_elem["type"] != "paragraph" or curr_elem["type"] != "paragraph":
return False
# Font size must be similar
if abs(prev_elem["font_size"] - curr_elem["font_size"]) > 1.5:
return False
# Don't merge if styles differ
prev_bold = bool(prev_elem.get("flags", 0) & (2 ** 4))
curr_bold = bool(curr_elem.get("flags", 0) & (2 ** 4))
if prev_bold != curr_bold:
return False
prev_text = prev_elem["text"].strip()
curr_text = curr_elem["text"].strip()
if not prev_text or not curr_text:
return False
# Don't merge if prev contains dot leaders (TOC entry)
if self.TOC_LEADER_PATTERN.search(prev_text):
return False
last_char = prev_text[-1]
if last_char in '.!?':
if curr_text and curr_text[0].islower():
return True
return False
if last_char in '"\u201D\u2019':
if len(prev_text) >= 2 and prev_text[-2] in '.!?':
if curr_text and curr_text[0].islower():
return True
return False
# Doesn't end with sentence-ending punctuation — likely mid-paragraph
return True
def _merge_continuation_paragraphs(self, elements):
"""Merge consecutive paragraph elements that are continuations."""
if len(elements) <= 1:
return elements
merged = [elements[0]]
for i in range(1, len(elements)):
prev = merged[-1]
curr = elements[i]
if self._should_merge_elements(prev, curr):
combined_text = prev["text"].rstrip() + " " + curr["text"].lstrip()
prev_bbox = prev["bbox"]
curr_bbox = curr["bbox"]
combined_bbox = [
min(prev_bbox[0], curr_bbox[0]),
min(prev_bbox[1], curr_bbox[1]),
max(prev_bbox[2], curr_bbox[2]),
max(prev_bbox[3], curr_bbox[3]),
]
merged[-1] = {
"type": "paragraph",
"text": combined_text,
"bbox": combined_bbox,
"font_size": prev["font_size"],
"flags": prev.get("flags", 0),
}
else:
merged.append(curr)
return merged
def _split_combined_list_items(self, elements):
"""Split list_item elements that contain multiple inline bullet items."""
result = []
for elem in elements:
if elem["type"] != "list_item":
result.append(elem)
continue
text = elem["text"].strip()
cleaned = text
for pattern in self.LIST_PATTERNS:
cleaned = re.sub(pattern, '', cleaned, count=1).strip()
parts = self.INLINE_BULLET_SPLIT.split(cleaned)
parts = [p.strip() for p in parts if p.strip()]
if len(parts) <= 1:
result.append(elem)
else:
bbox = elem["bbox"]
total_height = bbox[3] - bbox[1]
item_height = total_height / len(parts) if len(parts) > 0 else total_height
for idx, part in enumerate(parts):
item_bbox = [
bbox[0],
bbox[1] + idx * item_height,
bbox[2],
bbox[1] + (idx + 1) * item_height,
]
result.append({
"type": "list_item",
"text": part.strip(),
"bbox": item_bbox,
"font_size": elem["font_size"],
"flags": elem.get("flags", 0),
})
return result
def process(self):
    """Process the entire PDF and return structured page data.

    Pipeline per page: extract tables and images, split text blocks
    into paragraph groups (skipping blocks that overlap a detected
    table region), classify each group, merge cross-block paragraph
    continuations, split combined inline list items, then assign
    reading order.

    Returns a dict with "page_count", "metadata" (title/author/subject)
    and "pages" (each with page_number, width, height, elements).
    """
    self._analyze_font_distribution()
    all_pages = []
    total_images = 0
    for page_num, page in enumerate(self.doc):
        page_elements = []
        page_rect = page.rect
        # Same flags=11 as the font-distribution pass for consistency.
        dict_blocks = page.get_text("dict", flags=11)["blocks"]
        tables = self._extract_tables(page, page_num)
        table_bboxes = [t["bbox"] for t in tables if t.get("bbox")]
        images = self._extract_images(page, page_num)
        total_images += len(images)
        for block in dict_blocks:
            if block.get("type") != 0:  # text blocks only
                continue
            block_bbox = block.get("bbox", [0, 0, 0, 0])
            # Skip text that is really table content (already extracted
            # separately by _extract_tables).
            skip_block = False
            for table_bbox in table_bboxes:
                if self._bboxes_overlap(block_bbox, table_bbox):
                    skip_block = True
                    break
            if skip_block:
                continue
            para_groups = self._split_block_into_paragraphs(block, page_rect.width)
            for group in para_groups:
                element = self._group_to_element(group)
                if element:
                    page_elements.append(element)
        page_elements = [e for e in page_elements if e["text"].strip()]
        page_elements = self._merge_continuation_paragraphs(page_elements)
        page_elements = self._split_combined_list_items(page_elements)
        page_elements.extend(tables)
        page_elements.extend(images)
        page_elements = self._get_reading_order(page_elements, page_rect.width)
        all_pages.append({
            "page_number": page_num,
            "width": page_rect.width,
            "height": page_rect.height,
            "elements": page_elements
        })
    print(f"📄 PDF processed: {len(self.doc)} pages, {total_images} images extracted")
    return {
        "page_count": len(self.doc),
        "metadata": {
            "title": self.doc.metadata.get("title", ""),
            "author": self.doc.metadata.get("author", ""),
            "subject": self.doc.metadata.get("subject", ""),
        },
        "pages": all_pages
    }
def to_markdown(self, processed_data):
    """Convert processed page data into a flat list of Markdown blocks.

    Each block is a dict with at least "type" and "content"; image
    blocks additionally carry base64 "data" and "format". Images with
    no data are dropped; unknown element types are ignored.
    """
    heading_levels = {
        "title": ("heading1", "# "),
        "subtitle": ("heading2", "## "),
        "heading": ("heading3", "### "),
    }
    blocks = []
    for page in processed_data.get("pages", []):
        for elem in page.get("elements", []):
            kind = elem.get("type")
            if kind in heading_levels:
                block_type, prefix = heading_levels[kind]
                blocks.append({
                    "type": block_type,
                    "content": f"{prefix}{elem.get('text', '')}"
                })
            elif kind == "paragraph":
                blocks.append({
                    "type": "paragraph",
                    "content": elem.get('text', '')
                })
            elif kind == "list_item":
                # Drop any leading list marker before re-bulleting.
                body = elem.get('text', '')
                for pattern in self.LIST_PATTERNS:
                    body = re.sub(pattern, '', body)
                blocks.append({
                    "type": "list_item",
                    "content": f"- {body.strip()}"
                })
            elif kind == "quote":
                blocks.append({
                    "type": "quote",
                    "content": f"> {elem.get('text', '')}"
                })
            elif kind == "table":
                blocks.append({
                    "type": "table",
                    "content": elem.get('markdown', '')
                })
            elif kind == "image":
                payload = elem.get("data", "")
                img_format = elem.get("format", "png")
                if payload:
                    blocks.append({
                        "type": "image",
                        "content": f"![PDF Image](embedded-image.{img_format})",
                        "data": payload,
                        "format": img_format
                    })
    return blocks
def process_pdf_to_markdown(pdf_bytes):
    """Process raw PDF bytes into markdown blocks plus basic metadata.

    Returns a dict with "page_count", "metadata" and "markdown_blocks".
    The underlying PyMuPDF document is always closed, even on failure.
    """
    processor = PDFProcessor(pdf_bytes)
    try:
        processed = processor.process()
        return {
            "page_count": processed["page_count"],
            "metadata": processed["metadata"],
            "markdown_blocks": processor.to_markdown(processed),
        }
    finally:
        processor.close()