# pdf_processor.py - PDF Processing and Content Extraction
import base64
import re

import fitz  # PyMuPDF


class PDFProcessor:
    """Process PDF files and extract structured content.

    Parses a PDF (via PyMuPDF "dict" text extraction) into typed page
    elements — title / subtitle / heading / paragraph / list_item /
    quote / table / image — using font-size statistics and multi-signal
    layout heuristics, then renders them as Markdown blocks.
    """

    # Absolute font-size thresholds (points) for heading classification.
    TITLE_SIZE_THRESHOLD = 24
    SUBTITLE_SIZE_THRESHOLD = 18
    HEADING_SIZE_THRESHOLD = 14

    # Relative thresholds: element font size divided by the detected body size.
    TITLE_RATIO = 1.8
    SUBTITLE_RATIO = 1.4
    HEADING_RATIO = 1.2

    # Patterns that mark a line as a list item (bullets, dashes,
    # numeric / alpha / roman-numeral enumerations).
    LIST_PATTERNS = [
        r'^\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣⁃·∙]\s*',
        r'^\s*[-–—]\s+',
        r'^\s*\d+[.)]\s+',
        r'^\s*[a-zA-Z][.)]\s+',
        r'^\s*[ivxIVX]+[.)]\s+',
    ]

    # Characters treated as stand-alone bullet glyphs (a line consisting
    # only of these is merged with the following text line).
    BULLET_CHARS = set('•●○◦‣⁃·∙\u2022\u2023\u25E6\u2043\u2219-–—')

    # Splits a single extracted line that contains several inline bullets.
    INLINE_BULLET_SPLIT = re.compile(
        r'\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣⁃·∙]\s+'
    )

    # A whole line wrapped in straight or curly quotes is treated as a quote.
    QUOTE_PATTERNS = [
        r'^[\"\'\u201C\u201D\u2018\u2019].+[\"\'\u201C\u201D\u2018\u2019]$',
    ]

    # Pattern for TOC-style dot leaders: text followed by dots and a page number
    TOC_LEADER_PATTERN = re.compile(r'[.…·]{3,}\s*\.?\s*\d+\s*$')

    def __init__(self, pdf_bytes):
        """Open the PDF from raw bytes and initialise font statistics.

        Args:
            pdf_bytes: The PDF file content as a bytes-like object.
        """
        self.doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        self.elements = []
        self.font_sizes = []    # all span sizes seen, sorted by _analyze_font_distribution
        self.median_size = 12   # median span size; default 12pt until analysed
        self.body_size = 12     # most common size weighted by character count

    def close(self):
        """Release the underlying PyMuPDF document."""
        if self.doc:
            self.doc.close()

    def _analyze_font_distribution(self):
        """Scan every span in the document to compute font-size statistics.

        Populates ``self.font_sizes`` (sorted list of span sizes),
        ``self.median_size`` (upper median), and ``self.body_size`` (the
        size covering the most characters — assumed to be body text).
        """
        font_size_counts = {}
        for page in self.doc:
            # flags=11: PyMuPDF extraction flags used consistently with process()
            blocks = page.get_text("dict", flags=11)["blocks"]
            for block in blocks:
                if block.get("type") == 0:  # type 0 = text block
                    for line in block.get("lines", []):
                        for span in line.get("spans", []):
                            size = round(span.get("size", 12), 1)
                            text = span.get("text", "").strip()
                            if text:
                                self.font_sizes.append(size)
                                # Weight each size by how many characters use it.
                                font_size_counts[size] = font_size_counts.get(size, 0) + len(text)
        if self.font_sizes:
            self.font_sizes.sort()
            n = len(self.font_sizes)
            self.median_size = self.font_sizes[n // 2]
        if font_size_counts:
            self.body_size = max(font_size_counts.keys(), key=lambda x: font_size_counts[x])
        else:
            self.body_size = self.median_size

    def _is_likely_heading(self, text, font_size, flags):
        """Decide whether a text run is a title/subtitle/heading.

        Args:
            text: The candidate text.
            font_size: Its (weighted) font size in points.
            flags: PyMuPDF span flags (bit 4 = bold per usage below).

        Returns:
            Tuple ``(is_heading, kind)`` where kind is "title",
            "subtitle", "heading", or None.
        """
        text_stripped = text.strip()
        if not text_stripped:
            return False, None
        is_bold = bool(flags & 2 ** 4)
        is_all_caps = text_stripped.isupper() and len(text_stripped) > 3
        size_ratio = font_size / self.body_size if self.body_size > 0 else 1
        # Each tier pairs a size test with a max-length sanity check so a
        # large-font paragraph is not misclassified.
        if size_ratio >= self.TITLE_RATIO or font_size >= self.TITLE_SIZE_THRESHOLD:
            if len(text_stripped) < 200:
                return True, "title"
        if size_ratio >= self.SUBTITLE_RATIO or font_size >= self.SUBTITLE_SIZE_THRESHOLD:
            if len(text_stripped) < 150:
                return True, "subtitle"
        if size_ratio >= self.HEADING_RATIO and is_bold:
            if len(text_stripped) < 100:
                return True, "heading"
        if is_all_caps and is_bold and len(text_stripped) < 80:
            return True, "heading"
        # Short bold text is likely a heading even at body font size
        # NOTE(review): this also captures short bold lead-ins inside body
        # text — confirm this is the intended trade-off.
        if is_bold and len(text_stripped) < 60:
            return True, "heading"
        return False, None

    def _classify_element(self, text, font_size, flags, is_italic=False, bbox=None):
        """Classify a paragraph group into an element type string.

        Returns one of "title"/"subtitle"/"heading"/"list_item"/"quote"/
        "paragraph", or None for empty text. ``bbox`` is accepted but
        currently unused by the classification logic.
        """
        text_stripped = text.strip()
        if not text_stripped:
            return None
        is_bold = bool(flags & 2 ** 4)  # NOTE(review): computed but unused here
        # Check headings FIRST (before list patterns)
        is_heading, heading_type = self._is_likely_heading(text_stripped, font_size, flags)
        if is_heading:
            return heading_type
        # Then check list patterns
        for pattern in self.LIST_PATTERNS:
            if re.match(pattern, text_stripped):
                return "list_item"
        # Then check quotes
        if is_italic and len(text_stripped) > 50:
            return "quote"
        for pattern in self.QUOTE_PATTERNS:
            if re.match(pattern, text_stripped):
                return "quote"
        return "paragraph"

    def _extract_images(self, page, page_num):
        """Extract embedded images from a page as base64-encoded dicts.

        Failures on individual images are logged and skipped so one bad
        image does not abort the page.
        """
        images = []
        image_list = page.get_images(full=True)
        for img_index, img in enumerate(image_list):
            try:
                xref = img[0]  # first tuple item is the image xref
                base_image = self.doc.extract_image(xref)
                if base_image:
                    image_bytes = base_image["image"]
                    image_ext = base_image["ext"]
                    # Placement rectangle(s) on the page; only the first is kept.
                    img_rects = page.get_image_rects(img)
                    bbox = None
                    if img_rects:
                        rect = img_rects[0]
                        bbox = [rect.x0, rect.y0, rect.x1, rect.y1]
                    images.append({
                        "type": "image",
                        "data": base64.b64encode(image_bytes).decode('utf-8'),
                        "format": image_ext,
                        "bbox": bbox,
                        "width": base_image.get("width", 0),
                        "height": base_image.get("height", 0),
                    })
            except Exception as e:
                print(f"Error extracting image {img_index} from page {page_num}: {e}")
        return images

    def _extract_tables(self, page, page_num):
        """Detect tables on a page via PyMuPDF and render each to Markdown.

        Returns a list of dicts with raw cell data, a Markdown rendering,
        and the table bbox. Errors are logged per table and for the
        detection pass as a whole.
        """
        tables = []
        try:
            table_finder = page.find_tables()
            for table_index, table in enumerate(table_finder):
                try:
                    table_data = table.extract()
                    bbox = list(table.bbox)
                    markdown_table = self._table_to_markdown(table_data)
                    tables.append({
                        "type": "table",
                        "data": table_data,
                        "markdown": markdown_table,
                        "bbox": bbox,
                    })
                except Exception as e:
                    print(f"Error extracting table {table_index} from page {page_num}: {e}")
        except Exception as e:
            print(f"Error finding tables on page {page_num}: {e}")
        return tables

    def _table_to_markdown(self, table_data):
        """Render a list-of-rows table as a Markdown pipe table.

        The first row is treated as the header (a ``---`` separator row is
        inserted after it). Pipes are escaped and newlines flattened.
        """
        if not table_data:
            return ""
        lines = []
        for row_idx, row in enumerate(table_data):
            # NOTE(review): `if cell` blanks falsy cells, so 0 / 0.0 / ""
            # all render as empty — confirm numeric zeros should survive.
            cells = [str(cell).replace('|', '\\|').replace('\n', ' ') if cell else '' for cell in row]
            lines.append('| ' + ' | '.join(cells) + ' |')
            if row_idx == 0:
                lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
        return '\n'.join(lines)

    def _get_reading_order(self, elements, page_width):
        """Assign a reading order to page elements and return them sorted.

        Buckets elements into left column / right column / full width
        based on bbox position relative to the page midline, then sorts.
        Annotates each element with a ``reading_order`` index (mutates the
        element dicts in place).
        """
        if not elements:
            return elements
        mid_x = page_width / 2
        left_col, right_col, full_width = [], [], []
        for elem in elements:
            bbox = elem.get("bbox")
            if not bbox:
                full_width.append(elem)
                continue
            x0, y0, x1, y1 = bbox
            width = x1 - x0
            if width > page_width * 0.6:
                full_width.append(elem)        # spans most of the page
            elif x1 < mid_x:
                left_col.append(elem)          # entirely left of midline
            elif x0 > mid_x:
                right_col.append(elem)         # entirely right of midline
            else:
                full_width.append(elem)        # straddles the midline
        sort_by_y = lambda e: e.get("bbox", [0, 0, 0, 0])[1]
        left_col.sort(key=sort_by_y)
        right_col.sort(key=sort_by_y)
        full_width.sort(key=sort_by_y)
        all_elements = [(e, "full") for e in full_width]
        all_elements += [(e, "left") for e in left_col]
        all_elements += [(e, "right") for e in right_col]
        # NOTE(review): this final sort orders ALL elements purely by top-y,
        # which interleaves the left/right columns again — the per-column
        # grouping above has no effect on the result. Confirm whether
        # column-major order (left column fully before right) was intended.
        all_elements.sort(key=lambda x: x[0].get("bbox", [0, 0, 0, 0])[1])
        result = [e[0] for e in all_elements]
        for idx, elem in enumerate(result):
            elem["reading_order"] = idx
        return result

    def _bboxes_overlap(self, bbox1, bbox2, threshold=0.5):
        """Return True if more than ``threshold`` of bbox1's area lies in bbox2.

        The test is asymmetric: the intersection is compared to bbox1's
        area only (used to drop text blocks covered by a detected table).
        """
        if not bbox1 or not bbox2:
            return False
        x1_min, y1_min, x1_max, y1_max = bbox1
        x2_min, y2_min, x2_max, y2_max = bbox2
        x_overlap = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
        y_overlap = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
        intersection = x_overlap * y_overlap
        area1 = (x1_max - x1_min) * (y1_max - y1_min)
        if area1 == 0:
            return False
        return intersection / area1 > threshold

    # ================================================================
    # LINE-LEVEL ANALYSIS
    # ================================================================

    def _extract_line_info(self, line):
        """Extract enriched info from a single dict-mode line.

        Concatenates the line's non-blank spans and computes a
        character-weighted average font size plus OR-combined style flags.
        Whitespace-only spans are skipped entirely (their text is dropped).
        """
        text = ""
        total_chars = 0
        weighted_size = 0.0
        combined_flags = 0
        for span in line.get("spans", []):
            span_text = span.get("text", "")
            span_size = span.get("size", 12)
            span_flags = span.get("flags", 0)
            if span_text.strip():
                char_count = len(span_text)
                text += span_text
                # Running character-weighted mean of span sizes.
                weighted_size = (
                    (weighted_size * total_chars + span_size * char_count)
                    / (total_chars + char_count)
                ) if (total_chars + char_count) > 0 else span_size
                total_chars += char_count
                combined_flags |= span_flags
        stripped = text.strip()
        return {
            "text": text,
            "stripped": stripped,
            "bbox": list(line.get("bbox", [0, 0, 0, 0])),
            "font_size": round(weighted_size, 1),
            "flags": combined_flags,
            # Flag bits follow the usage convention in this file:
            # bit 4 = bold, bit 1 = italic.
            "is_bold": bool(combined_flags & (2 ** 4)),
            "is_italic": bool(combined_flags & (2 ** 1)),
            "char_count": total_chars,
            # A "bullet line" is 1–2 chars, all drawn from BULLET_CHARS.
            "is_bullet": len(stripped) <= 2 and bool(stripped) and all(c in self.BULLET_CHARS for c in stripped),
            "is_single_line_entry": False,  # Set during analysis
        }

    def _is_single_line_entry(self, info, page_width):
        """
        Determine if a line is a self-contained single-line entry
        (like a TOC entry, a short heading, etc.) rather than a wrapped
        continuation of a multi-line paragraph.

        Signals:
        - Contains dot leaders (TOC pattern)
        - Is a single line that doesn't reach near the right margin (not a wrapped line)
        - Ends with a number (page reference)

        NOTE(review): ``page_width`` is accepted but not used; only the
        dot-leader / trailing-number signals are implemented.
        """
        text = info["stripped"]
        if not text:
            return False
        # TOC dot leader pattern: "Chapter 1 - Something.............3"
        if self.TOC_LEADER_PATTERN.search(text):
            return True
        # Ends with a digit (possible page number) and has dots
        if re.search(r'\d+\s*$', text) and '…' in text:
            return True
        return False

    # ================================================================
    # MULTI-SIGNAL PARAGRAPH SPLITTING
    # ================================================================

    def _should_break_between(self, prev_info, curr_info, median_gap, avg_line_height, page_width):
        """Decide whether a paragraph break should be inserted between two lines.

        Evaluates seven independent layout/style signals in order; the
        first one that fires forces a break. A bullet-only previous line
        never breaks (it must merge with its following text).
        """
        if prev_info["is_bullet"]:
            return False
        prev_bbox = prev_info["bbox"]
        curr_bbox = curr_info["bbox"]
        # Vertical whitespace between prev's bottom and curr's top.
        gap = curr_bbox[1] - prev_bbox[3]
        # --- Signal 1: FONT SIZE CHANGE ---
        size_diff = abs(curr_info["font_size"] - prev_info["font_size"])
        if size_diff > 1.5:
            return True
        # --- Signal 2: STYLE CHANGE (bold boundary) ---
        if prev_info["is_bold"] != curr_info["is_bold"]:
            # Any bold/non-bold transition = structural break
            return True
        # --- Signal 3: VERTICAL GAP (relative) ---
        if median_gap > 0:
            gap_ratio = gap / median_gap if median_gap > 0 else 1
            if gap_ratio >= 2.0:
                return True
            # A moderately large gap breaks only after sentence-ending
            # punctuation (weaker evidence needs corroboration).
            if gap_ratio >= 1.5:
                prev_text = prev_info["stripped"]
                if prev_text and prev_text[-1] in '.!?:"\u201D\u2019':
                    return True
        # --- Signal 4: ABSOLUTE GAP ---
        if gap > avg_line_height * 1.0:
            return True
        # --- Signal 5: INDENTATION CHANGE ---
        x_diff = abs(curr_bbox[0] - prev_bbox[0])
        if x_diff > 25:
            return True
        # --- Signal 6: SINGLE-LINE ENTRIES ---
        # If previous line is a self-contained entry (e.g. TOC line with dot leaders),
        # break even if font/style/gap are the same
        if prev_info.get("is_single_line_entry"):
            return True
        # --- Signal 7: BOTH BOLD + PREVIOUS IS SHORT ---
        # When both lines are bold and the previous line is relatively short
        # (not reaching near the right margin), each is likely a separate heading/entry.
        # This handles TOC entries, section headings, etc. that are all bold+same size.
        if prev_info["is_bold"] and curr_info["is_bold"]:
            prev_line_width = prev_bbox[2] - prev_bbox[0]
            # Compare against page content width (approximate)
            if page_width > 0 and prev_line_width < page_width * 0.75:
                return True
        return False

    def _merge_bullet_lines(self, line_infos):
        """Merge bullet character lines with their following text lines.

        A line flagged ``is_bullet`` is fused with its successor: texts
        are joined with a space, bboxes unioned, and the successor's
        font/style metadata kept. The merged info is no longer a bullet.
        """
        if not line_infos:
            return line_infos
        merged = []
        i = 0
        while i < len(line_infos):
            info = line_infos[i]
            if info["is_bullet"] and i + 1 < len(line_infos):
                next_info = line_infos[i + 1]
                bullet_char = info["stripped"]
                merged_text = bullet_char + " " + next_info["text"]
                merged_stripped = bullet_char + " " + next_info["stripped"]
                merged_info = {
                    "text": merged_text,
                    "stripped": merged_stripped,
                    "bbox": [
                        min(info["bbox"][0], next_info["bbox"][0]),
                        min(info["bbox"][1], next_info["bbox"][1]),
                        max(info["bbox"][2], next_info["bbox"][2]),
                        max(info["bbox"][3], next_info["bbox"][3]),
                    ],
                    # Style metadata comes from the text line, not the glyph.
                    "font_size": next_info["font_size"],
                    "flags": next_info["flags"],
                    "is_bold": next_info["is_bold"],
                    "is_italic": next_info["is_italic"],
                    "char_count": info["char_count"] + next_info["char_count"],
                    "is_bullet": False,
                    "is_single_line_entry": False,
                }
                merged.append(merged_info)
                i += 2  # consumed both the bullet and its text line
            else:
                merged.append(info)
                i += 1
        return merged

    def _split_block_into_paragraphs(self, block, page_width):
        """Split a single dict-mode block into paragraph groups.

        Pipeline: extract per-line info, merge bullet glyph lines, mark
        single-line entries, compute gap statistics, then walk adjacent
        line pairs applying _should_break_between. Returns a list of
        line-info groups (each group = one paragraph).
        """
        lines = block.get("lines", [])
        if not lines:
            return []
        line_infos = []
        for line in lines:
            info = self._extract_line_info(line)
            if info["stripped"]:
                line_infos.append(info)
        if not line_infos:
            return []
        line_infos = self._merge_bullet_lines(line_infos)
        # Mark single-line entries (TOC lines, etc.)
        for info in line_infos:
            info["is_single_line_entry"] = self._is_single_line_entry(info, page_width)
        if len(line_infos) == 1:
            return [line_infos]
        # Gather inter-line gaps and line heights for the gap signals.
        gaps = []
        line_heights = []
        for i in range(len(line_infos)):
            h = line_infos[i]["bbox"][3] - line_infos[i]["bbox"][1]
            line_heights.append(h)
            if i > 0:
                gap = line_infos[i]["bbox"][1] - line_infos[i - 1]["bbox"][3]
                gaps.append(gap)
        avg_line_height = sum(line_heights) / len(line_heights) if line_heights else 12
        if gaps:
            sorted_gaps = sorted(gaps)
            median_gap = sorted_gaps[len(sorted_gaps) // 2]
        else:
            # No gap samples: fall back to a fraction of the line height.
            median_gap = avg_line_height * 0.3
        paragraphs = []
        current_group = [line_infos[0]]
        for i in range(1, len(line_infos)):
            if self._should_break_between(
                line_infos[i - 1], line_infos[i], median_gap, avg_line_height, page_width
            ):
                paragraphs.append(current_group)
                current_group = [line_infos[i]]
            else:
                current_group.append(line_infos[i])
        if current_group:
            paragraphs.append(current_group)
        return paragraphs

    def _group_to_element(self, line_group):
        """Convert a group of line-infos into a single page element dict.

        Joins line texts with spaces, computes a character-weighted font
        size, ORs the style flags, unions the bboxes, and classifies the
        result. Returns None for empty/unclassifiable groups.
        """
        text = " ".join(info["stripped"] for info in line_group if info["stripped"])
        if not text.strip():
            return None
        total_chars = sum(info["char_count"] for info in line_group)
        if total_chars > 0:
            font_size = sum(
                info["font_size"] * info["char_count"] for info in line_group
            ) / total_chars
        else:
            font_size = self.body_size
        flags = 0
        for info in line_group:
            flags |= info["flags"]
        # Union bbox across all lines in the group.
        x0 = min(info["bbox"][0] for info in line_group)
        y0 = min(info["bbox"][1] for info in line_group)
        x1 = max(info["bbox"][2] for info in line_group)
        y1 = max(info["bbox"][3] for info in line_group)
        is_italic = bool(flags & (2 ** 1))
        elem_type = self._classify_element(text, font_size, flags, is_italic, [x0, y0, x1, y1])
        if elem_type:
            return {
                "type": elem_type,
                "text": text.strip(),
                "bbox": [x0, y0, x1, y1],
                "font_size": font_size,
                "flags": flags,
            }
        return None

    # ================================================================
    # POST-PROCESSING
    # ================================================================

    def _should_merge_elements(self, prev_elem, curr_elem):
        """
        Determine if two consecutive elements should be merged because they
        are continuations of the same paragraph split across PyMuPDF blocks.
        """
        # Only merge paragraph + paragraph
        if prev_elem["type"] != "paragraph" or curr_elem["type"] != "paragraph":
            return False
        # Font size must be similar
        if abs(prev_elem["font_size"] - curr_elem["font_size"]) > 1.5:
            return False
        # Don't merge if styles differ
        prev_bold = bool(prev_elem.get("flags", 0) & (2 ** 4))
        curr_bold = bool(curr_elem.get("flags", 0) & (2 ** 4))
        if prev_bold != curr_bold:
            return False
        prev_text = prev_elem["text"].strip()
        curr_text = curr_elem["text"].strip()
        if not prev_text or not curr_text:
            return False
        # Don't merge if prev contains dot leaders (TOC entry)
        if self.TOC_LEADER_PATTERN.search(prev_text):
            return False
        last_char = prev_text[-1]
        if last_char in '.!?':
            # Sentence ended; merge only if the next piece starts lowercase
            # (i.e. the "sentence" was actually an abbreviation mid-flow).
            if curr_text and curr_text[0].islower():
                return True
            return False
        if last_char in '"\u201D\u2019':
            # Closing quote: look one char back for the sentence terminator.
            # NOTE(review): when prev_text[-2] is NOT a terminator, control
            # falls through to the final `return True` — confirm intended.
            if len(prev_text) >= 2 and prev_text[-2] in '.!?':
                if curr_text and curr_text[0].islower():
                    return True
                return False
        # Doesn't end with sentence-ending punctuation — likely mid-paragraph
        return True

    def _merge_continuation_paragraphs(self, elements):
        """Merge consecutive paragraph elements that are continuations.

        Folds each mergeable element into the previous one: texts joined
        with a space, bboxes unioned, and the earlier element's font
        size/flags retained. Merging is transitive via ``merged[-1]``.
        """
        if len(elements) <= 1:
            return elements
        merged = [elements[0]]
        for i in range(1, len(elements)):
            prev = merged[-1]
            curr = elements[i]
            if self._should_merge_elements(prev, curr):
                combined_text = prev["text"].rstrip() + " " + curr["text"].lstrip()
                prev_bbox = prev["bbox"]
                curr_bbox = curr["bbox"]
                combined_bbox = [
                    min(prev_bbox[0], curr_bbox[0]),
                    min(prev_bbox[1], curr_bbox[1]),
                    max(prev_bbox[2], curr_bbox[2]),
                    max(prev_bbox[3], curr_bbox[3]),
                ]
                merged[-1] = {
                    "type": "paragraph",
                    "text": combined_text,
                    "bbox": combined_bbox,
                    "font_size": prev["font_size"],
                    "flags": prev.get("flags", 0),
                }
            else:
                merged.append(curr)
        return merged

    def _split_combined_list_items(self, elements):
        """Split list_item elements that contain multiple inline bullet items.

        Strips the leading list marker, splits on inline bullet glyphs,
        and fabricates per-item bboxes by slicing the original bbox into
        equal vertical bands.
        """
        result = []
        for elem in elements:
            if elem["type"] != "list_item":
                result.append(elem)
                continue
            text = elem["text"].strip()
            cleaned = text
            # Remove at most one leading marker per pattern.
            for pattern in self.LIST_PATTERNS:
                cleaned = re.sub(pattern, '', cleaned, count=1).strip()
            parts = self.INLINE_BULLET_SPLIT.split(cleaned)
            parts = [p.strip() for p in parts if p.strip()]
            if len(parts) <= 1:
                result.append(elem)
            else:
                bbox = elem["bbox"]
                total_height = bbox[3] - bbox[1]
                item_height = total_height / len(parts) if len(parts) > 0 else total_height
                for idx, part in enumerate(parts):
                    # Approximate bbox: equal horizontal slices of the original.
                    item_bbox = [
                        bbox[0],
                        bbox[1] + idx * item_height,
                        bbox[2],
                        bbox[1] + (idx + 1) * item_height,
                    ]
                    result.append({
                        "type": "list_item",
                        "text": part.strip(),
                        "bbox": item_bbox,
                        "font_size": elem["font_size"],
                        "flags": elem.get("flags", 0),
                    })
        return result

    def process(self):
        """Process the entire PDF and extract all elements.

        Per page: detect tables and images, convert text blocks (that do
        not overlap a table) into classified elements, merge continuation
        paragraphs, split combined list items, then assign reading order.

        Returns:
            Dict with "page_count", "metadata" (title/author/subject),
            and "pages" (each with page_number, width, height, elements).
        """
        self._analyze_font_distribution()
        all_pages = []
        total_images = 0
        for page_num, page in enumerate(self.doc):
            page_elements = []
            page_rect = page.rect
            dict_blocks = page.get_text("dict", flags=11)["blocks"]
            tables = self._extract_tables(page, page_num)
            table_bboxes = [t["bbox"] for t in tables if t.get("bbox")]
            images = self._extract_images(page, page_num)
            total_images += len(images)
            for block in dict_blocks:
                if block.get("type") != 0:  # skip non-text blocks
                    continue
                block_bbox = block.get("bbox", [0, 0, 0, 0])
                # Drop text blocks already represented by a detected table.
                skip_block = False
                for table_bbox in table_bboxes:
                    if self._bboxes_overlap(block_bbox, table_bbox):
                        skip_block = True
                        break
                if skip_block:
                    continue
                para_groups = self._split_block_into_paragraphs(block, page_rect.width)
                for group in para_groups:
                    element = self._group_to_element(group)
                    if element:
                        page_elements.append(element)
            page_elements = [e for e in page_elements if e["text"].strip()]
            page_elements = self._merge_continuation_paragraphs(page_elements)
            page_elements = self._split_combined_list_items(page_elements)
            page_elements.extend(tables)
            page_elements.extend(images)
            page_elements = self._get_reading_order(page_elements, page_rect.width)
            all_pages.append({
                "page_number": page_num,
                "width": page_rect.width,
                "height": page_rect.height,
                "elements": page_elements
            })
        print(f"📄 PDF processed: {len(self.doc)} pages, {total_images} images extracted")
        return {
            "page_count": len(self.doc),
            "metadata": {
                "title": self.doc.metadata.get("title", ""),
                "author": self.doc.metadata.get("author", ""),
                "subject": self.doc.metadata.get("subject", ""),
            },
            "pages": all_pages
        }

    def to_markdown(self, processed_data):
        """Convert processed PDF data to Markdown blocks.

        Maps each element type to a block dict with "type" and "content"
        keys; images additionally carry "data" (base64) and "format".
        Elements with unrecognised types are silently skipped.
        """
        blocks = []
        for page in processed_data.get("pages", []):
            for elem in page.get("elements", []):
                elem_type = elem.get("type")
                if elem_type == "title":
                    blocks.append({
                        "type": "heading1",
                        "content": f"# {elem.get('text', '')}"
                    })
                elif elem_type == "subtitle":
                    blocks.append({
                        "type": "heading2",
                        "content": f"## {elem.get('text', '')}"
                    })
                elif elem_type == "heading":
                    blocks.append({
                        "type": "heading3",
                        "content": f"### {elem.get('text', '')}"
                    })
                elif elem_type == "paragraph":
                    blocks.append({
                        "type": "paragraph",
                        "content": elem.get('text', '')
                    })
                elif elem_type == "list_item":
                    # Strip any original list marker, then re-prefix with "- ".
                    text = elem.get('text', '')
                    for pattern in self.LIST_PATTERNS:
                        text = re.sub(pattern, '', text)
                    blocks.append({
                        "type": "list_item",
                        "content": f"- {text.strip()}"
                    })
                elif elem_type == "quote":
                    blocks.append({
                        "type": "quote",
                        "content": f"> {elem.get('text', '')}"
                    })
                elif elem_type == "table":
                    blocks.append({
                        "type": "table",
                        "content": elem.get('markdown', '')
                    })
                elif elem_type == "image":
                    img_data = elem.get("data", "")
                    img_format = elem.get("format", "png")
                    if img_data:
                        blocks.append({
                            "type": "image",
                            "content": f"![PDF Image](embedded-image.{img_format})",
                            "data": img_data,
                            "format": img_format
                        })
        return blocks


def process_pdf_to_markdown(pdf_bytes):
    """Process PDF bytes and return markdown blocks.

    Convenience wrapper: opens, processes, and converts the document,
    guaranteeing the underlying PyMuPDF document is closed.

    Args:
        pdf_bytes: Raw PDF content.

    Returns:
        Dict with "page_count", "metadata", and "markdown_blocks".
    """
    processor = PDFProcessor(pdf_bytes)
    try:
        processed_data = processor.process()
        markdown_blocks = processor.to_markdown(processed_data)
        return {
            "page_count": processed_data["page_count"],
            "metadata": processed_data["metadata"],
            "markdown_blocks": markdown_blocks
        }
    finally:
        processor.close()