first commit

This commit is contained in:
Ashim Kumar
2026-02-20 13:53:36 +06:00
commit 8e02b9ad09
35 changed files with 11059 additions and 0 deletions

772
pdf_processor.py Normal file
View File

@@ -0,0 +1,772 @@
# pdf_processor.py - PDF Processing and Content Extraction
import base64
import re
import fitz # PyMuPDF
class PDFProcessor:
"""Process PDF files and extract structured content."""
TITLE_SIZE_THRESHOLD = 24
SUBTITLE_SIZE_THRESHOLD = 18
HEADING_SIZE_THRESHOLD = 14
TITLE_RATIO = 1.8
SUBTITLE_RATIO = 1.4
HEADING_RATIO = 1.2
LIST_PATTERNS = [
r'^\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s*',
r'^\s*[-–—]\s+',
r'^\s*\d+[.)]\s+',
r'^\s*[a-zA-Z][.)]\s+',
r'^\s*[ivxIVX]+[.)]\s+',
]
BULLET_CHARS = set('•●○◦‣⁃·∙\u2022\u2023\u25E6\u2043\u2219-–—')
INLINE_BULLET_SPLIT = re.compile(
r'\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣·∙]\s+'
)
QUOTE_PATTERNS = [
r'^[\"\'\u201C\u201D\u2018\u2019].+[\"\'\u201C\u201D\u2018\u2019]$',
]
# Pattern for TOC-style dot leaders: text followed by dots and a page number
TOC_LEADER_PATTERN = re.compile(r'[.…·]{3,}\s*\.?\s*\d+\s*$')
def __init__(self, pdf_bytes):
self.doc = fitz.open(stream=pdf_bytes, filetype="pdf")
self.elements = []
self.font_sizes = []
self.median_size = 12
self.body_size = 12
def close(self):
if self.doc:
self.doc.close()
def _analyze_font_distribution(self):
font_size_counts = {}
for page in self.doc:
blocks = page.get_text("dict", flags=11)["blocks"]
for block in blocks:
if block.get("type") == 0:
for line in block.get("lines", []):
for span in line.get("spans", []):
size = round(span.get("size", 12), 1)
text = span.get("text", "").strip()
if text:
self.font_sizes.append(size)
font_size_counts[size] = font_size_counts.get(size, 0) + len(text)
if self.font_sizes:
self.font_sizes.sort()
n = len(self.font_sizes)
self.median_size = self.font_sizes[n // 2]
if font_size_counts:
self.body_size = max(font_size_counts.keys(), key=lambda x: font_size_counts[x])
else:
self.body_size = self.median_size
def _is_likely_heading(self, text, font_size, flags):
text_stripped = text.strip()
if not text_stripped:
return False, None
is_bold = bool(flags & 2 ** 4)
is_all_caps = text_stripped.isupper() and len(text_stripped) > 3
size_ratio = font_size / self.body_size if self.body_size > 0 else 1
if size_ratio >= self.TITLE_RATIO or font_size >= self.TITLE_SIZE_THRESHOLD:
if len(text_stripped) < 200:
return True, "title"
if size_ratio >= self.SUBTITLE_RATIO or font_size >= self.SUBTITLE_SIZE_THRESHOLD:
if len(text_stripped) < 150:
return True, "subtitle"
if size_ratio >= self.HEADING_RATIO and is_bold:
if len(text_stripped) < 100:
return True, "heading"
if is_all_caps and is_bold and len(text_stripped) < 80:
return True, "heading"
# Short bold text is likely a heading even at body font size
if is_bold and len(text_stripped) < 60:
return True, "heading"
return False, None
def _classify_element(self, text, font_size, flags, is_italic=False, bbox=None):
text_stripped = text.strip()
if not text_stripped:
return None
is_bold = bool(flags & 2 ** 4)
# Check headings FIRST (before list patterns)
is_heading, heading_type = self._is_likely_heading(text_stripped, font_size, flags)
if is_heading:
return heading_type
# Then check list patterns
for pattern in self.LIST_PATTERNS:
if re.match(pattern, text_stripped):
return "list_item"
# Then check quotes
if is_italic and len(text_stripped) > 50:
return "quote"
for pattern in self.QUOTE_PATTERNS:
if re.match(pattern, text_stripped):
return "quote"
return "paragraph"
def _extract_images(self, page, page_num):
images = []
image_list = page.get_images(full=True)
for img_index, img in enumerate(image_list):
try:
xref = img[0]
base_image = self.doc.extract_image(xref)
if base_image:
image_bytes = base_image["image"]
image_ext = base_image["ext"]
img_rects = page.get_image_rects(img)
bbox = None
if img_rects:
rect = img_rects[0]
bbox = [rect.x0, rect.y0, rect.x1, rect.y1]
images.append({
"type": "image",
"data": base64.b64encode(image_bytes).decode('utf-8'),
"format": image_ext,
"bbox": bbox,
"width": base_image.get("width", 0),
"height": base_image.get("height", 0),
})
except Exception as e:
print(f"Error extracting image {img_index} from page {page_num}: {e}")
return images
def _extract_tables(self, page, page_num):
tables = []
try:
table_finder = page.find_tables()
for table_index, table in enumerate(table_finder):
try:
table_data = table.extract()
bbox = list(table.bbox)
markdown_table = self._table_to_markdown(table_data)
tables.append({
"type": "table",
"data": table_data,
"markdown": markdown_table,
"bbox": bbox,
})
except Exception as e:
print(f"Error extracting table {table_index} from page {page_num}: {e}")
except Exception as e:
print(f"Error finding tables on page {page_num}: {e}")
return tables
def _table_to_markdown(self, table_data):
if not table_data:
return ""
lines = []
for row_idx, row in enumerate(table_data):
cells = [str(cell).replace('|', '\\|').replace('\n', ' ') if cell else '' for cell in row]
lines.append('| ' + ' | '.join(cells) + ' |')
if row_idx == 0:
lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
return '\n'.join(lines)
def _get_reading_order(self, elements, page_width):
if not elements:
return elements
mid_x = page_width / 2
left_col, right_col, full_width = [], [], []
for elem in elements:
bbox = elem.get("bbox")
if not bbox:
full_width.append(elem)
continue
x0, y0, x1, y1 = bbox
width = x1 - x0
if width > page_width * 0.6:
full_width.append(elem)
elif x1 < mid_x:
left_col.append(elem)
elif x0 > mid_x:
right_col.append(elem)
else:
full_width.append(elem)
sort_by_y = lambda e: e.get("bbox", [0, 0, 0, 0])[1]
left_col.sort(key=sort_by_y)
right_col.sort(key=sort_by_y)
full_width.sort(key=sort_by_y)
all_elements = [(e, "full") for e in full_width]
all_elements += [(e, "left") for e in left_col]
all_elements += [(e, "right") for e in right_col]
all_elements.sort(key=lambda x: x[0].get("bbox", [0, 0, 0, 0])[1])
result = [e[0] for e in all_elements]
for idx, elem in enumerate(result):
elem["reading_order"] = idx
return result
def _bboxes_overlap(self, bbox1, bbox2, threshold=0.5):
if not bbox1 or not bbox2:
return False
x1_min, y1_min, x1_max, y1_max = bbox1
x2_min, y2_min, x2_max, y2_max = bbox2
x_overlap = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
y_overlap = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
intersection = x_overlap * y_overlap
area1 = (x1_max - x1_min) * (y1_max - y1_min)
if area1 == 0:
return False
return intersection / area1 > threshold
# ================================================================
# LINE-LEVEL ANALYSIS
# ================================================================
def _extract_line_info(self, line):
"""Extract enriched info from a single dict-mode line."""
text = ""
total_chars = 0
weighted_size = 0.0
combined_flags = 0
for span in line.get("spans", []):
span_text = span.get("text", "")
span_size = span.get("size", 12)
span_flags = span.get("flags", 0)
if span_text.strip():
char_count = len(span_text)
text += span_text
weighted_size = (
(weighted_size * total_chars + span_size * char_count) /
(total_chars + char_count)
) if (total_chars + char_count) > 0 else span_size
total_chars += char_count
combined_flags |= span_flags
stripped = text.strip()
return {
"text": text,
"stripped": stripped,
"bbox": list(line.get("bbox", [0, 0, 0, 0])),
"font_size": round(weighted_size, 1),
"flags": combined_flags,
"is_bold": bool(combined_flags & (2 ** 4)),
"is_italic": bool(combined_flags & (2 ** 1)),
"char_count": total_chars,
"is_bullet": len(stripped) <= 2 and bool(stripped) and all(c in self.BULLET_CHARS for c in stripped),
"is_single_line_entry": False, # Set during analysis
}
def _is_single_line_entry(self, info, page_width):
"""
Determine if a line is a self-contained single-line entry
(like a TOC entry, a short heading, etc.) rather than a wrapped
continuation of a multi-line paragraph.
Signals:
- Contains dot leaders (TOC pattern)
- Is a single line that doesn't reach near the right margin (not a wrapped line)
- Ends with a number (page reference)
"""
text = info["stripped"]
if not text:
return False
# TOC dot leader pattern: "Chapter 1 - Something.............3"
if self.TOC_LEADER_PATTERN.search(text):
return True
# Ends with a digit (possible page number) and has dots
if re.search(r'\d+\s*$', text) and '' in text:
return True
return False
# ================================================================
# MULTI-SIGNAL PARAGRAPH SPLITTING
# ================================================================
def _should_break_between(self, prev_info, curr_info, median_gap, avg_line_height, page_width):
"""Decide whether a paragraph break should be inserted between two lines."""
if prev_info["is_bullet"]:
return False
prev_bbox = prev_info["bbox"]
curr_bbox = curr_info["bbox"]
gap = curr_bbox[1] - prev_bbox[3]
# --- Signal 1: FONT SIZE CHANGE ---
size_diff = abs(curr_info["font_size"] - prev_info["font_size"])
if size_diff > 1.5:
return True
# --- Signal 2: STYLE CHANGE (bold boundary) ---
if prev_info["is_bold"] != curr_info["is_bold"]:
# Any bold/non-bold transition = structural break
return True
# --- Signal 3: VERTICAL GAP (relative) ---
if median_gap > 0:
gap_ratio = gap / median_gap if median_gap > 0 else 1
if gap_ratio >= 2.0:
return True
if gap_ratio >= 1.5:
prev_text = prev_info["stripped"]
if prev_text and prev_text[-1] in '.!?:"\u201D\u2019':
return True
# --- Signal 4: ABSOLUTE GAP ---
if gap > avg_line_height * 1.0:
return True
# --- Signal 5: INDENTATION CHANGE ---
x_diff = abs(curr_bbox[0] - prev_bbox[0])
if x_diff > 25:
return True
# --- Signal 6: SINGLE-LINE ENTRIES ---
# If previous line is a self-contained entry (e.g. TOC line with dot leaders),
# break even if font/style/gap are the same
if prev_info.get("is_single_line_entry"):
return True
# --- Signal 7: BOTH BOLD + PREVIOUS IS SHORT ---
# When both lines are bold and the previous line is relatively short
# (not reaching near the right margin), each is likely a separate heading/entry.
# This handles TOC entries, section headings, etc. that are all bold+same size.
if prev_info["is_bold"] and curr_info["is_bold"]:
prev_line_width = prev_bbox[2] - prev_bbox[0]
# Compare against page content width (approximate)
if page_width > 0 and prev_line_width < page_width * 0.75:
return True
return False
def _merge_bullet_lines(self, line_infos):
"""Merge bullet character lines with their following text lines."""
if not line_infos:
return line_infos
merged = []
i = 0
while i < len(line_infos):
info = line_infos[i]
if info["is_bullet"] and i + 1 < len(line_infos):
next_info = line_infos[i + 1]
bullet_char = info["stripped"]
merged_text = bullet_char + " " + next_info["text"]
merged_stripped = bullet_char + " " + next_info["stripped"]
merged_info = {
"text": merged_text,
"stripped": merged_stripped,
"bbox": [
min(info["bbox"][0], next_info["bbox"][0]),
min(info["bbox"][1], next_info["bbox"][1]),
max(info["bbox"][2], next_info["bbox"][2]),
max(info["bbox"][3], next_info["bbox"][3]),
],
"font_size": next_info["font_size"],
"flags": next_info["flags"],
"is_bold": next_info["is_bold"],
"is_italic": next_info["is_italic"],
"char_count": info["char_count"] + next_info["char_count"],
"is_bullet": False,
"is_single_line_entry": False,
}
merged.append(merged_info)
i += 2
else:
merged.append(info)
i += 1
return merged
def _split_block_into_paragraphs(self, block, page_width):
"""Split a single dict-mode block into paragraph groups."""
lines = block.get("lines", [])
if not lines:
return []
line_infos = []
for line in lines:
info = self._extract_line_info(line)
if info["stripped"]:
line_infos.append(info)
if not line_infos:
return []
line_infos = self._merge_bullet_lines(line_infos)
# Mark single-line entries (TOC lines, etc.)
for info in line_infos:
info["is_single_line_entry"] = self._is_single_line_entry(info, page_width)
if len(line_infos) == 1:
return [line_infos]
gaps = []
line_heights = []
for i in range(len(line_infos)):
h = line_infos[i]["bbox"][3] - line_infos[i]["bbox"][1]
line_heights.append(h)
if i > 0:
gap = line_infos[i]["bbox"][1] - line_infos[i - 1]["bbox"][3]
gaps.append(gap)
avg_line_height = sum(line_heights) / len(line_heights) if line_heights else 12
if gaps:
sorted_gaps = sorted(gaps)
median_gap = sorted_gaps[len(sorted_gaps) // 2]
else:
median_gap = avg_line_height * 0.3
paragraphs = []
current_group = [line_infos[0]]
for i in range(1, len(line_infos)):
if self._should_break_between(
line_infos[i - 1], line_infos[i],
median_gap, avg_line_height, page_width
):
paragraphs.append(current_group)
current_group = [line_infos[i]]
else:
current_group.append(line_infos[i])
if current_group:
paragraphs.append(current_group)
return paragraphs
def _group_to_element(self, line_group):
"""Convert a group of line-infos into a single page element dict."""
text = " ".join(info["stripped"] for info in line_group if info["stripped"])
if not text.strip():
return None
total_chars = sum(info["char_count"] for info in line_group)
if total_chars > 0:
font_size = sum(
info["font_size"] * info["char_count"] for info in line_group
) / total_chars
else:
font_size = self.body_size
flags = 0
for info in line_group:
flags |= info["flags"]
x0 = min(info["bbox"][0] for info in line_group)
y0 = min(info["bbox"][1] for info in line_group)
x1 = max(info["bbox"][2] for info in line_group)
y1 = max(info["bbox"][3] for info in line_group)
is_italic = bool(flags & (2 ** 1))
elem_type = self._classify_element(text, font_size, flags, is_italic, [x0, y0, x1, y1])
if elem_type:
return {
"type": elem_type,
"text": text.strip(),
"bbox": [x0, y0, x1, y1],
"font_size": font_size,
"flags": flags,
}
return None
# ================================================================
# POST-PROCESSING
# ================================================================
def _should_merge_elements(self, prev_elem, curr_elem):
"""
Determine if two consecutive elements should be merged because
they are continuations of the same paragraph split across PyMuPDF blocks.
"""
# Only merge paragraph + paragraph
if prev_elem["type"] != "paragraph" or curr_elem["type"] != "paragraph":
return False
# Font size must be similar
if abs(prev_elem["font_size"] - curr_elem["font_size"]) > 1.5:
return False
# Don't merge if styles differ
prev_bold = bool(prev_elem.get("flags", 0) & (2 ** 4))
curr_bold = bool(curr_elem.get("flags", 0) & (2 ** 4))
if prev_bold != curr_bold:
return False
prev_text = prev_elem["text"].strip()
curr_text = curr_elem["text"].strip()
if not prev_text or not curr_text:
return False
# Don't merge if prev contains dot leaders (TOC entry)
if self.TOC_LEADER_PATTERN.search(prev_text):
return False
last_char = prev_text[-1]
if last_char in '.!?':
if curr_text and curr_text[0].islower():
return True
return False
if last_char in '"\u201D\u2019':
if len(prev_text) >= 2 and prev_text[-2] in '.!?':
if curr_text and curr_text[0].islower():
return True
return False
# Doesn't end with sentence-ending punctuation — likely mid-paragraph
return True
def _merge_continuation_paragraphs(self, elements):
"""Merge consecutive paragraph elements that are continuations."""
if len(elements) <= 1:
return elements
merged = [elements[0]]
for i in range(1, len(elements)):
prev = merged[-1]
curr = elements[i]
if self._should_merge_elements(prev, curr):
combined_text = prev["text"].rstrip() + " " + curr["text"].lstrip()
prev_bbox = prev["bbox"]
curr_bbox = curr["bbox"]
combined_bbox = [
min(prev_bbox[0], curr_bbox[0]),
min(prev_bbox[1], curr_bbox[1]),
max(prev_bbox[2], curr_bbox[2]),
max(prev_bbox[3], curr_bbox[3]),
]
merged[-1] = {
"type": "paragraph",
"text": combined_text,
"bbox": combined_bbox,
"font_size": prev["font_size"],
"flags": prev.get("flags", 0),
}
else:
merged.append(curr)
return merged
def _split_combined_list_items(self, elements):
"""Split list_item elements that contain multiple inline bullet items."""
result = []
for elem in elements:
if elem["type"] != "list_item":
result.append(elem)
continue
text = elem["text"].strip()
cleaned = text
for pattern in self.LIST_PATTERNS:
cleaned = re.sub(pattern, '', cleaned, count=1).strip()
parts = self.INLINE_BULLET_SPLIT.split(cleaned)
parts = [p.strip() for p in parts if p.strip()]
if len(parts) <= 1:
result.append(elem)
else:
bbox = elem["bbox"]
total_height = bbox[3] - bbox[1]
item_height = total_height / len(parts) if len(parts) > 0 else total_height
for idx, part in enumerate(parts):
item_bbox = [
bbox[0],
bbox[1] + idx * item_height,
bbox[2],
bbox[1] + (idx + 1) * item_height,
]
result.append({
"type": "list_item",
"text": part.strip(),
"bbox": item_bbox,
"font_size": elem["font_size"],
"flags": elem.get("flags", 0),
})
return result
def process(self):
"""Process the entire PDF and extract all elements."""
self._analyze_font_distribution()
all_pages = []
total_images = 0
for page_num, page in enumerate(self.doc):
page_elements = []
page_rect = page.rect
dict_blocks = page.get_text("dict", flags=11)["blocks"]
tables = self._extract_tables(page, page_num)
table_bboxes = [t["bbox"] for t in tables if t.get("bbox")]
images = self._extract_images(page, page_num)
total_images += len(images)
for block in dict_blocks:
if block.get("type") != 0:
continue
block_bbox = block.get("bbox", [0, 0, 0, 0])
skip_block = False
for table_bbox in table_bboxes:
if self._bboxes_overlap(block_bbox, table_bbox):
skip_block = True
break
if skip_block:
continue
para_groups = self._split_block_into_paragraphs(block, page_rect.width)
for group in para_groups:
element = self._group_to_element(group)
if element:
page_elements.append(element)
page_elements = [e for e in page_elements if e["text"].strip()]
page_elements = self._merge_continuation_paragraphs(page_elements)
page_elements = self._split_combined_list_items(page_elements)
page_elements.extend(tables)
page_elements.extend(images)
page_elements = self._get_reading_order(page_elements, page_rect.width)
all_pages.append({
"page_number": page_num,
"width": page_rect.width,
"height": page_rect.height,
"elements": page_elements
})
print(f"📄 PDF processed: {len(self.doc)} pages, {total_images} images extracted")
return {
"page_count": len(self.doc),
"metadata": {
"title": self.doc.metadata.get("title", ""),
"author": self.doc.metadata.get("author", ""),
"subject": self.doc.metadata.get("subject", ""),
},
"pages": all_pages
}
def to_markdown(self, processed_data):
"""Convert processed PDF data to Markdown blocks."""
blocks = []
for page in processed_data.get("pages", []):
for elem in page.get("elements", []):
elem_type = elem.get("type")
if elem_type == "title":
blocks.append({
"type": "heading1",
"content": f"# {elem.get('text', '')}"
})
elif elem_type == "subtitle":
blocks.append({
"type": "heading2",
"content": f"## {elem.get('text', '')}"
})
elif elem_type == "heading":
blocks.append({
"type": "heading3",
"content": f"### {elem.get('text', '')}"
})
elif elem_type == "paragraph":
blocks.append({
"type": "paragraph",
"content": elem.get('text', '')
})
elif elem_type == "list_item":
text = elem.get('text', '')
for pattern in self.LIST_PATTERNS:
text = re.sub(pattern, '', text)
blocks.append({
"type": "list_item",
"content": f"- {text.strip()}"
})
elif elem_type == "quote":
blocks.append({
"type": "quote",
"content": f"> {elem.get('text', '')}"
})
elif elem_type == "table":
blocks.append({
"type": "table",
"content": elem.get('markdown', '')
})
elif elem_type == "image":
img_data = elem.get("data", "")
img_format = elem.get("format", "png")
if img_data:
blocks.append({
"type": "image",
"content": f"![PDF Image](embedded-image.{img_format})",
"data": img_data,
"format": img_format
})
return blocks
def process_pdf_to_markdown(pdf_bytes):
"""Process PDF bytes and return markdown blocks."""
processor = PDFProcessor(pdf_bytes)
try:
processed_data = processor.process()
markdown_blocks = processor.to_markdown(processed_data)
return {
"page_count": processed_data["page_count"],
"metadata": processed_data["metadata"],
"markdown_blocks": markdown_blocks
}
finally:
processor.close()