Audiobook Maker Pro v4.2 — production ready

This commit is contained in:
Ashim Kumar
2026-05-22 18:28:47 +06:00
commit 0617a374dd
41 changed files with 15262 additions and 0 deletions

643
pdf_processor.py Normal file
View File

@@ -0,0 +1,643 @@
# pdf_processor.py - PDF Processing and Content Extraction
# UPDATED: Blocks now include font_size and is_bold metadata for AI analysis
import base64
import re
import fitz # PyMuPDF
# ================================================================
# LIST GROUPING HELPER
# ================================================================
def _group_lists(blocks):
"""Groups consecutive list items into a cohesive Markdown list block."""
final_blocks = []
list_buffer = []
list_type = None
def flush():
if not list_buffer: return
lines = []
for i, text in enumerate(list_buffer):
clean_text = re.sub(r'^\s*[-*+]\s+', '', text)
clean_text = re.sub(r'^\s*\d+\.\s+', '', clean_text)
if list_type == 'numbered_list':
lines.append(f"{i+1}. {clean_text}")
else:
lines.append(f"- {clean_text}")
final_blocks.append({
'type': 'paragraph',
'content': '\n'.join(lines)
})
list_buffer.clear()
for b in blocks:
if b.get('type') in ['image', 'table']:
flush()
list_type = None
final_blocks.append(b)
continue
content = b.get('content', '')
bt = b.get('type', '')
is_bullet = bt == 'list_item' or content.startswith('- ') or content.startswith('* ')
is_number = bt == 'numbered_list' or re.match(r'^\s*\d+\.\s+', content)
if bt.startswith('heading'):
is_bullet = False
is_number = False
if is_bullet or is_number:
current_type = 'numbered_list' if is_number else 'bullet_list'
if list_type and list_type != current_type:
flush()
list_type = current_type
list_buffer.append(content)
else:
flush()
list_type = None
final_blocks.append(b)
flush()
return final_blocks
class PDFProcessor:
"""Process PDF files and extract structured content."""
TITLE_SIZE_THRESHOLD = 24
SUBTITLE_SIZE_THRESHOLD = 18
HEADING_SIZE_THRESHOLD = 14
TITLE_RATIO = 1.8
SUBTITLE_RATIO = 1.4
HEADING_RATIO = 1.2
LIST_PATTERNS = [
r'^\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s*',
r'^\s*[-–—]\s+',
r'^\s*\d+[.)]\s+',
r'^\s*[a-zA-Z][.)]\s+',
r'^\s*[ivxIVX]+[.)]\s+',
]
BULLET_CHARS = set('•●○◦‣⁃·∙\u2022\u2023\u25E6\u2043\u2219-–—')
INLINE_BULLET_SPLIT = re.compile(
r'\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s+'
)
QUOTE_PATTERNS = [
r'^[\"\'\u201C\u201D\u2018\u2019].+[\"\'\u201C\u201D\u2018\u2019]$',
]
TOC_LEADER_PATTERN = re.compile(r'[.…·]{3,}\s*\.?\s*\d+\s*$')
def __init__(self, pdf_bytes):
self.doc = fitz.open(stream=pdf_bytes, filetype="pdf")
self.elements = []
self.font_sizes = []
self.median_size = 12
self.body_size = 12
def close(self):
if self.doc:
self.doc.close()
def _analyze_font_distribution(self):
font_size_counts = {}
for page in self.doc:
blocks = page.get_text("dict", flags=11)["blocks"]
for block in blocks:
if block.get("type") == 0:
for line in block.get("lines", []):
for span in line.get("spans", []):
size = round(span.get("size", 12), 1)
text = span.get("text", "").strip()
if text:
self.font_sizes.append(size)
font_size_counts[size] = font_size_counts.get(size, 0) + len(text)
if self.font_sizes:
self.font_sizes.sort()
n = len(self.font_sizes)
self.median_size = self.font_sizes[n // 2]
if font_size_counts:
self.body_size = max(font_size_counts.keys(), key=lambda x: font_size_counts[x])
else:
self.body_size = self.median_size
def _is_likely_heading(self, text, font_size, flags):
text_stripped = text.strip()
if not text_stripped:
return False, None
is_bold = bool(flags & 2 ** 4)
is_all_caps = text_stripped.isupper() and len(text_stripped) > 3
size_ratio = font_size / self.body_size if self.body_size > 0 else 1
if size_ratio >= self.TITLE_RATIO or font_size >= self.TITLE_SIZE_THRESHOLD:
if len(text_stripped) < 200:
return True, "title"
if size_ratio >= self.SUBTITLE_RATIO or font_size >= self.SUBTITLE_SIZE_THRESHOLD:
if len(text_stripped) < 150:
return True, "subtitle"
if size_ratio >= self.HEADING_RATIO and is_bold:
if len(text_stripped) < 100:
return True, "heading"
if is_all_caps and is_bold and len(text_stripped) < 80:
return True, "heading"
if is_bold and len(text_stripped) < 60:
return True, "heading"
return False, None
def _classify_element(self, text, font_size, flags, is_italic=False, bbox=None):
text_stripped = text.strip()
if not text_stripped:
return None
is_bold = bool(flags & 2 ** 4)
is_heading, heading_type = self._is_likely_heading(text_stripped, font_size, flags)
if is_heading:
return heading_type
for pattern in self.LIST_PATTERNS:
if re.match(pattern, text_stripped):
if re.match(r'^\s*\d+[.)]\s+', text_stripped) or \
re.match(r'^\s*[a-zA-Z][.)]\s+', text_stripped) or \
re.match(r'^\s*[ivxIVX]+[.)]\s+', text_stripped):
return "numbered_list"
return "list_item"
if is_italic and len(text_stripped) > 50:
return "quote"
for pattern in self.QUOTE_PATTERNS:
if re.match(pattern, text_stripped):
return "quote"
return "paragraph"
def _extract_images(self, page, page_num):
images = []
image_list = page.get_images(full=True)
for img_index, img in enumerate(image_list):
try:
xref = img[0]
base_image = self.doc.extract_image(xref)
if base_image:
image_bytes = base_image["image"]
image_ext = base_image["ext"]
img_rects = page.get_image_rects(img)
bbox = None
if img_rects:
rect = img_rects[0]
bbox = [rect.x0, rect.y0, rect.x1, rect.y1]
images.append({
"type": "image",
"data": base64.b64encode(image_bytes).decode('utf-8'),
"format": image_ext,
"bbox": bbox,
"width": base_image.get("width", 0),
"height": base_image.get("height", 0),
})
except Exception: pass
return images
def _extract_tables(self, page, page_num):
tables = []
try:
table_finder = page.find_tables()
for table_index, table in enumerate(table_finder):
try:
table_data = table.extract()
bbox = list(table.bbox)
markdown_table = self._table_to_markdown(table_data)
tables.append({
"type": "table",
"data": table_data,
"markdown": markdown_table,
"bbox": bbox,
})
except Exception: pass
except Exception: pass
return tables
def _table_to_markdown(self, table_data):
if not table_data: return ""
lines = []
for row_idx, row in enumerate(table_data):
cells = [str(cell).replace('|', '\\|').replace('\n', ' ') if cell else '' for cell in row]
lines.append('| ' + ' | '.join(cells) + ' |')
if row_idx == 0:
lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
return '\n'.join(lines)
def _get_reading_order(self, elements, page_width):
if not elements: return elements
mid_x = page_width / 2
left_col, right_col, full_width = [], [], []
for elem in elements:
bbox = elem.get("bbox")
if not bbox:
full_width.append(elem)
continue
x0, y0, x1, y1 = bbox
width = x1 - x0
if width > page_width * 0.6:
full_width.append(elem)
elif x1 < mid_x:
left_col.append(elem)
elif x0 > mid_x:
right_col.append(elem)
else:
full_width.append(elem)
sort_by_y = lambda e: (e.get("bbox") or [0, 0, 0, 0])[1]
left_col.sort(key=sort_by_y)
right_col.sort(key=sort_by_y)
full_width.sort(key=sort_by_y)
all_elements = [(e, "full") for e in full_width]
all_elements += [(e, "left") for e in left_col]
all_elements += [(e, "right") for e in right_col]
all_elements.sort(key=lambda x: (x[0].get("bbox") or [0, 0, 0, 0])[1])
result = [e[0] for e in all_elements]
for idx, elem in enumerate(result):
elem["reading_order"] = idx
return result
def _bboxes_overlap(self, bbox1, bbox2, threshold=0.5):
if not bbox1 or not bbox2: return False
x1_min, y1_min, x1_max, y1_max = bbox1
x2_min, y2_min, x2_max, y2_max = bbox2
x_overlap = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
y_overlap = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
intersection = x_overlap * y_overlap
area1 = (x1_max - x1_min) * (y1_max - y1_min)
if area1 == 0: return False
return intersection / area1 > threshold
def _extract_line_info(self, line):
text = ""
total_chars = 0
weighted_size = 0.0
combined_flags = 0
for span in line.get("spans", []):
span_text = span.get("text", "")
span_size = span.get("size", 12)
span_flags = span.get("flags", 0)
if span_text.strip():
char_count = len(span_text)
text += span_text
weighted_size = ((weighted_size * total_chars + span_size * char_count) /
(total_chars + char_count)) if (total_chars + char_count) > 0 else span_size
total_chars += char_count
combined_flags |= span_flags
stripped = text.strip()
return {
"text": text,
"stripped": stripped,
"bbox": list(line.get("bbox", [0, 0, 0, 0])),
"font_size": round(weighted_size, 1),
"flags": combined_flags,
"is_bold": bool(combined_flags & (2 ** 4)),
"is_italic": bool(combined_flags & (2 ** 1)),
"char_count": total_chars,
"is_bullet": len(stripped) <= 2 and bool(stripped) and all(c in self.BULLET_CHARS for c in stripped),
"is_single_line_entry": False,
}
def _is_single_line_entry(self, info, page_width):
text = info["stripped"]
if not text: return False
if self.TOC_LEADER_PATTERN.search(text): return True
if re.search(r'\d+\s*$', text) and '' in text: return True
return False
def _should_break_between(self, prev_info, curr_info, median_gap, avg_line_height, page_width):
if prev_info["is_bullet"]: return False
prev_bbox = prev_info["bbox"]
curr_bbox = curr_info["bbox"]
gap = curr_bbox[1] - prev_bbox[3]
size_diff = abs(curr_info["font_size"] - prev_info["font_size"])
if size_diff > 1.5: return True
if prev_info["is_bold"] != curr_info["is_bold"]: return True
if median_gap > 0:
gap_ratio = gap / median_gap if median_gap > 0 else 1
if gap_ratio >= 2.0: return True
if gap_ratio >= 1.5:
if prev_info["stripped"] and prev_info["stripped"][-1] in '.!?:"\u201D\u2019':
return True
if gap > avg_line_height * 1.0: return True
x_diff = abs(curr_bbox[0] - prev_bbox[0])
if x_diff > 25: return True
if prev_info.get("is_single_line_entry"): return True
if prev_info["is_bold"] and curr_info["is_bold"]:
prev_line_width = prev_bbox[2] - prev_bbox[0]
if page_width > 0 and prev_line_width < page_width * 0.75:
return True
return False
def _merge_bullet_lines(self, line_infos):
if not line_infos: return line_infos
merged = []
i = 0
while i < len(line_infos):
info = line_infos[i]
if info["is_bullet"] and i + 1 < len(line_infos):
next_info = line_infos[i + 1]
bullet_char = info["stripped"]
merged.append({
"text": bullet_char + " " + next_info["text"],
"stripped": bullet_char + " " + next_info["stripped"],
"bbox": [
min(info["bbox"][0], next_info["bbox"][0]),
min(info["bbox"][1], next_info["bbox"][1]),
max(info["bbox"][2], next_info["bbox"][2]),
max(info["bbox"][3], next_info["bbox"][3]),
],
"font_size": next_info["font_size"],
"flags": next_info["flags"],
"is_bold": next_info["is_bold"],
"is_italic": next_info["is_italic"],
"char_count": info["char_count"] + next_info["char_count"],
"is_bullet": False,
"is_single_line_entry": False,
})
i += 2
else:
merged.append(info)
i += 1
return merged
def _split_block_into_paragraphs(self, block, page_width):
lines = block.get("lines", [])
if not lines: return []
line_infos = []
for line in lines:
info = self._extract_line_info(line)
if info["stripped"]: line_infos.append(info)
if not line_infos: return []
line_infos = self._merge_bullet_lines(line_infos)
for info in line_infos:
info["is_single_line_entry"] = self._is_single_line_entry(info, page_width)
if len(line_infos) == 1: return [line_infos]
gaps = []
line_heights = []
for i in range(len(line_infos)):
h = line_infos[i]["bbox"][3] - line_infos[i]["bbox"][1]
line_heights.append(h)
if i > 0:
gap = line_infos[i]["bbox"][1] - line_infos[i - 1]["bbox"][3]
gaps.append(gap)
avg_line_height = sum(line_heights) / len(line_heights) if line_heights else 12
median_gap = sorted(gaps)[len(gaps) // 2] if gaps else avg_line_height * 0.3
paragraphs = []
current_group = [line_infos[0]]
for i in range(1, len(line_infos)):
if self._should_break_between(line_infos[i - 1], line_infos[i], median_gap, avg_line_height, page_width):
paragraphs.append(current_group)
current_group = [line_infos[i]]
else:
current_group.append(line_infos[i])
if current_group: paragraphs.append(current_group)
return paragraphs
def _group_to_element(self, line_group):
text = " ".join(info["stripped"] for info in line_group if info["stripped"])
if not text.strip(): return None
total_chars = sum(info["char_count"] for info in line_group)
font_size = sum(info["font_size"] * info["char_count"] for info in line_group) / total_chars if total_chars > 0 else self.body_size
flags = 0
for info in line_group: flags |= info["flags"]
x0 = min(info["bbox"][0] for info in line_group)
y0 = min(info["bbox"][1] for info in line_group)
x1 = max(info["bbox"][2] for info in line_group)
y1 = max(info["bbox"][3] for info in line_group)
elem_type = self._classify_element(text, font_size, flags, bool(flags & (2 ** 1)), [x0, y0, x1, y1])
if elem_type:
return {
"type": elem_type,
"text": text.strip(),
"bbox": [x0, y0, x1, y1],
"font_size": round(font_size, 1),
"flags": flags,
}
return None
def _should_merge_elements(self, prev_elem, curr_elem):
if prev_elem["type"] != "paragraph" or curr_elem["type"] != "paragraph": return False
if abs(prev_elem["font_size"] - curr_elem["font_size"]) > 1.5: return False
prev_bold = bool(prev_elem.get("flags", 0) & (2 ** 4))
curr_bold = bool(curr_elem.get("flags", 0) & (2 ** 4))
if prev_bold != curr_bold: return False
prev_text = prev_elem["text"].strip()
curr_text = curr_elem["text"].strip()
if not prev_text or not curr_text: return False
if self.TOC_LEADER_PATTERN.search(prev_text): return False
last_char = prev_text[-1]
if last_char in '.!?':
if curr_text and curr_text[0].islower(): return True
return False
if last_char in '"\u201D\u2019':
if len(prev_text) >= 2 and prev_text[-2] in '.!?':
if curr_text and curr_text[0].islower(): return True
return False
return True
def _merge_continuation_paragraphs(self, elements):
if len(elements) <= 1: return elements
merged = [elements[0]]
for i in range(1, len(elements)):
prev = merged[-1]
curr = elements[i]
if self._should_merge_elements(prev, curr):
prev_bbox = prev["bbox"]
curr_bbox = curr["bbox"]
merged[-1] = {
"type": "paragraph",
"text": prev["text"].rstrip() + " " + curr["text"].lstrip(),
"bbox": [
min(prev_bbox[0], curr_bbox[0]),
min(prev_bbox[1], curr_bbox[1]),
max(prev_bbox[2], curr_bbox[2]),
max(prev_bbox[3], curr_bbox[3]),
],
"font_size": prev["font_size"],
"flags": prev.get("flags", 0),
}
else:
merged.append(curr)
return merged
def _split_combined_list_items(self, elements):
result = []
for elem in elements:
if elem["type"] != "list_item":
result.append(elem)
continue
text = elem["text"].strip()
cleaned = text
for pattern in self.LIST_PATTERNS:
cleaned = re.sub(pattern, '', cleaned, count=1).strip()
parts = self.INLINE_BULLET_SPLIT.split(cleaned)
parts = [p.strip() for p in parts if p.strip()]
if len(parts) <= 1:
result.append(elem)
else:
bbox = elem["bbox"]
item_height = (bbox[3] - bbox[1]) / len(parts) if len(parts) > 0 else 0
for idx, part in enumerate(parts):
result.append({
"type": "list_item",
"text": part.strip(),
"bbox": [bbox[0], bbox[1] + idx * item_height, bbox[2], bbox[1] + (idx + 1) * item_height],
"font_size": elem["font_size"],
"flags": elem.get("flags", 0),
})
return result
def process(self):
self._analyze_font_distribution()
all_pages = []
for page_num, page in enumerate(self.doc):
page_elements = []
page_rect = page.rect
dict_blocks = page.get_text("dict", flags=11)["blocks"]
tables = self._extract_tables(page, page_num)
table_bboxes = [t["bbox"] for t in tables if t.get("bbox")]
images = self._extract_images(page, page_num)
for block in dict_blocks:
if block.get("type") != 0: continue
block_bbox = block.get("bbox", [0, 0, 0, 0])
skip = False
for t_bbox in table_bboxes:
if self._bboxes_overlap(block_bbox, t_bbox): skip = True; break
if skip: continue
for group in self._split_block_into_paragraphs(block, page_rect.width):
element = self._group_to_element(group)
if element: page_elements.append(element)
page_elements = [e for e in page_elements if e["text"].strip()]
page_elements = self._merge_continuation_paragraphs(page_elements)
page_elements = self._split_combined_list_items(page_elements)
page_elements.extend(tables)
page_elements.extend(images)
page_elements = self._get_reading_order(page_elements, page_rect.width)
all_pages.append({
"page_number": page_num,
"width": page_rect.width,
"height": page_rect.height,
"elements": page_elements
})
return {
"page_count": len(self.doc),
"metadata": {
"title": self.doc.metadata.get("title", ""),
"author": self.doc.metadata.get("author", ""),
"subject": self.doc.metadata.get("subject", ""),
},
"pages": all_pages
}
def to_markdown(self, processed_data):
"""Convert processed data to markdown blocks WITH typography metadata."""
blocks = []
for page in processed_data.get("pages", []):
for elem in page.get("elements", []):
elem_type = elem.get("type")
# Base block data
block = None
if elem_type == "title":
block = {"type": "heading1", "content": f"# {elem.get('text', '')}"}
elif elem_type == "subtitle":
block = {"type": "heading2", "content": f"## {elem.get('text', '')}"}
elif elem_type == "heading":
block = {"type": "heading3", "content": f"### {elem.get('text', '')}"}
elif elem_type == "paragraph":
block = {"type": "paragraph", "content": elem.get('text', '')}
elif elem_type == "list_item":
text = elem.get('text', '')
for pattern in self.LIST_PATTERNS: text = re.sub(pattern, '', text, count=1)
block = {"type": "list_item", "content": f"- {text.strip()}"}
elif elem_type == "numbered_list":
text = elem.get('text', '')
for pattern in self.LIST_PATTERNS: text = re.sub(pattern, '', text, count=1)
block = {"type": "numbered_list", "content": f"1. {text.strip()}"}
elif elem_type == "quote":
block = {"type": "quote", "content": f"> {elem.get('text', '')}"}
elif elem_type == "table":
block = {"type": "table", "content": elem.get('markdown', '')}
elif elem_type == "image":
if elem.get("data"):
block = {
"type": "image",
"content": f"![PDF Image](embedded-image.{elem.get('format', 'png')})",
"data": elem.get("data"), "format": elem.get("format", "png")
}
if block:
# ADD typography metadata for AI analysis
if elem.get("font_size"):
block["font_size"] = elem["font_size"]
if elem.get("flags") is not None:
block["is_bold"] = bool(elem["flags"] & (2 ** 4))
blocks.append(block)
# Apply the list grouping logic
return _group_lists(blocks)
def process_pdf_to_markdown(pdf_bytes):
processor = PDFProcessor(pdf_bytes)
try:
processed_data = processor.process()
markdown_blocks = processor.to_markdown(processed_data)
return {
"page_count": processed_data["page_count"],
"metadata": processed_data["metadata"],
"markdown_blocks": markdown_blocks
}
finally:
processor.close()