# pdf_processor.py - PDF Processing and Content Extraction
# UPDATED: Blocks now include font_size and is_bold metadata for AI analysis

import base64
import logging
import re

import fitz  # PyMuPDF

logger = logging.getLogger(__name__)

# Bits of a span's "flags" value that this module inspects.
_FLAG_ITALIC = 2 ** 1
_FLAG_BOLD = 2 ** 4


# ================================================================
# LIST GROUPING HELPER
# ================================================================
def _group_lists(blocks):
    """Group consecutive list items into cohesive Markdown list blocks.

    Args:
        blocks: Iterable of block dicts carrying ``type`` and ``content``.

    Returns:
        A new list of blocks. Every run of adjacent bullet items (or adjacent
        numbered items) is replaced by one block of type ``'paragraph'`` whose
        content is the newline-joined Markdown list; numbered runs are
        renumbered from 1. All other blocks are passed through unchanged.
    """
    final_blocks = []
    list_buffer = []
    list_type = None

    def flush():
        """Emit the buffered items as a single Markdown list block."""
        if not list_buffer:
            return
        lines = []
        for number, text in enumerate(list_buffer, start=1):
            # Drop whatever marker the item already carries before re-adding
            # a uniform one.
            clean_text = re.sub(r'^\s*[-*+]\s+', '', text)
            clean_text = re.sub(r'^\s*\d+\.\s+', '', clean_text)
            marker = f"{number}." if list_type == 'numbered_list' else "-"
            lines.append(f"{marker} {clean_text}")
        final_blocks.append({
            'type': 'paragraph',
            'content': '\n'.join(lines),
        })
        list_buffer.clear()

    for b in blocks:
        # Images and tables always terminate a running list.
        if b.get('type') in ('image', 'table'):
            flush()
            list_type = None
            final_blocks.append(b)
            continue

        content = b.get('content', '')
        bt = b.get('type', '')

        if bt.startswith('heading'):
            # Headings are never list items, whatever their text looks like.
            is_bullet = False
            is_number = False
        else:
            is_bullet = bt == 'list_item' or content.startswith(('- ', '* '))
            is_number = (
                bt == 'numbered_list'
                or re.match(r'^\s*\d+\.\s+', content) is not None
            )

        if is_bullet or is_number:
            current_type = 'numbered_list' if is_number else 'bullet_list'
            # A switch between bullet and numbered starts a new list.
            if list_type and list_type != current_type:
                flush()
            list_type = current_type
            list_buffer.append(content)
        else:
            flush()
            list_type = None
            final_blocks.append(b)

    flush()
    return final_blocks


class PDFProcessor:
    """Process PDF files and extract structured content.

    Typical use is ``process()`` followed by ``to_markdown()``; the instance
    owns the opened document and must be closed (``close()`` or a ``with``
    statement).
    """

    TITLE_SIZE_THRESHOLD = 24
    SUBTITLE_SIZE_THRESHOLD = 18
    HEADING_SIZE_THRESHOLD = 14
    TITLE_RATIO = 1.8
    SUBTITLE_RATIO = 1.4
    HEADING_RATIO = 1.2

    LIST_PATTERNS = [
        r'^\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣⁃·∙]\s*',
        r'^\s*[-–—]\s+',
        r'^\s*\d+[.)]\s+',
        r'^\s*[a-zA-Z][.)]\s+',
        r'^\s*[ivxIVX]+[.)]\s+',
    ]
    BULLET_CHARS = set('•●○◦‣⁃·∙\u2022\u2023\u25E6\u2043\u2219-–—')
    INLINE_BULLET_SPLIT = re.compile(
        r'\s*[\u2022\u2023\u25E6\u2043\u2219•●○◦‣⁃·∙]\s+'
    )
    QUOTE_PATTERNS = [
        r'^[\"\'\u201C\u201D\u2018\u2019].+[\"\'\u201C\u201D\u2018\u2019]$',
    ]
    TOC_LEADER_PATTERN = re.compile(r'[.…·]{3,}\s*\.?\s*\d+\s*$')

    # Flag bitmask handed to Page.get_text("dict", ...).
    # NOTE(review): presumably preserve-ligatures | preserve-whitespace |
    # inhibit-spaces -- confirm against PyMuPDF's TEXT_* constants.
    _TEXT_FLAGS = 11

    # Element type -> (markdown block type, markdown heading prefix).
    _HEADING_BLOCKS = {
        "title": ("heading1", "#"),
        "subtitle": ("heading2", "##"),
        "heading": ("heading3", "###"),
    }

    def __init__(self, pdf_bytes):
        """Open the PDF held in ``pdf_bytes`` (an in-memory byte stream)."""
        self.doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        self.elements = []
        self.font_sizes = []
        self.median_size = 12
        self.body_size = 12

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying document; safe to call more than once."""
        # Compare against None instead of relying on truthiness, which would
        # call len() on the document.
        doc, self.doc = self.doc, None
        if doc is not None:
            doc.close()

    def _analyze_font_distribution(self):
        """Compute ``median_size`` and ``body_size`` from every text span.

        ``body_size`` is the rounded font size covering the most characters;
        it falls back to ``median_size`` when the document has no text.
        """
        # Start fresh so a second process() call does not double-count.
        self.font_sizes = []
        font_size_counts = {}

        for page in self.doc:
            blocks = page.get_text("dict", flags=self._TEXT_FLAGS)["blocks"]
            for block in blocks:
                if block.get("type") != 0:
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span.get("text", "").strip()
                        if not text:
                            continue
                        size = round(span.get("size", 12), 1)
                        self.font_sizes.append(size)
                        font_size_counts[size] = (
                            font_size_counts.get(size, 0) + len(text)
                        )

        if self.font_sizes:
            self.font_sizes.sort()
            self.median_size = self.font_sizes[len(self.font_sizes) // 2]

        if font_size_counts:
            self.body_size = max(font_size_counts, key=font_size_counts.get)
        else:
            self.body_size = self.median_size

    def _is_likely_heading(self, text, font_size, flags):
        """Decide whether ``text`` looks like a heading.

        Returns:
            ``(True, kind)`` with kind in ``"title"``, ``"subtitle"``,
            ``"heading"``, or ``(False, None)``.
        """
        text_stripped = text.strip()
        if not text_stripped:
            return False, None

        length = len(text_stripped)
        is_bold = bool(flags & _FLAG_BOLD)
        is_all_caps = text_stripped.isupper() and length > 3
        size_ratio = font_size / self.body_size if self.body_size > 0 else 1

        # Each tier also has a length cap; text that is large but too long
        # falls through to the next, weaker tier.
        if size_ratio >= self.TITLE_RATIO or font_size >= self.TITLE_SIZE_THRESHOLD:
            if length < 200:
                return True, "title"
        if size_ratio >= self.SUBTITLE_RATIO or font_size >= self.SUBTITLE_SIZE_THRESHOLD:
            if length < 150:
                return True, "subtitle"
        if size_ratio >= self.HEADING_RATIO and is_bold:
            if length < 100:
                return True, "heading"
        if is_all_caps and is_bold and length < 80:
            return True, "heading"
        if is_bold and length < 60:
            return True, "heading"
        return False, None

    def _classify_element(self, text, font_size, flags, is_italic=False, bbox=None):
        """Return the element type for ``text`` or ``None`` when it is blank.

        Possible results: ``"title"``, ``"subtitle"``, ``"heading"``,
        ``"numbered_list"``, ``"list_item"``, ``"quote"``, ``"paragraph"``.
        ``bbox`` is accepted for interface compatibility and is not used.
        """
        text_stripped = text.strip()
        if not text_stripped:
            return None

        is_heading, heading_type = self._is_likely_heading(text_stripped, font_size, flags)
        if is_heading:
            return heading_type

        if any(re.match(pattern, text_stripped) for pattern in self.LIST_PATTERNS):
            is_enumerated = (
                re.match(r'^\s*\d+[.)]\s+', text_stripped)
                or re.match(r'^\s*[a-zA-Z][.)]\s+', text_stripped)
                or re.match(r'^\s*[ivxIVX]+[.)]\s+', text_stripped)
            )
            return "numbered_list" if is_enumerated else "list_item"

        if is_italic and len(text_stripped) > 50:
            return "quote"
        if any(re.match(pattern, text_stripped) for pattern in self.QUOTE_PATTERNS):
            return "quote"
        return "paragraph"

    def _strip_list_marker(self, text):
        """Remove the single leading list marker from ``text``, if any.

        Only the first matching entry of ``LIST_PATTERNS`` is applied, so text
        that merely resembles a marker after the real one (for example the
        "A. " in "• A. Smith") is preserved.
        """
        for pattern in self.LIST_PATTERNS:
            stripped, count = re.subn(pattern, '', text, count=1)
            if count:
                return stripped
        return text

    def _extract_images(self, page, page_num):
        """Extract the page's images as base64-encoded element dicts.

        Extraction is best-effort: an image that cannot be read is logged and
        skipped.
        """
        images = []
        for img_index, img in enumerate(page.get_images(full=True)):
            try:
                xref = img[0]
                base_image = self.doc.extract_image(xref)
                if not base_image:
                    continue
                img_rects = page.get_image_rects(img)
                bbox = None
                if img_rects:
                    rect = img_rects[0]
                    bbox = [rect.x0, rect.y0, rect.x1, rect.y1]
                images.append({
                    "type": "image",
                    "data": base64.b64encode(base_image["image"]).decode('utf-8'),
                    "format": base_image["ext"],
                    "bbox": bbox,
                    "width": base_image.get("width", 0),
                    "height": base_image.get("height", 0),
                })
            except Exception as exc:
                logger.warning(
                    "Skipping image %d on page %d: %s", img_index, page_num, exc
                )
        return images

    def _extract_tables(self, page, page_num):
        """Extract the page's tables as element dicts (raw data + markdown).

        Extraction is best-effort: failures are logged and the affected table
        (or the whole page's table detection) is skipped.
        """
        tables = []
        try:
            for table_index, table in enumerate(page.find_tables()):
                try:
                    table_data = table.extract()
                    tables.append({
                        "type": "table",
                        "data": table_data,
                        "markdown": self._table_to_markdown(table_data),
                        "bbox": list(table.bbox),
                    })
                except Exception as exc:
                    logger.warning(
                        "Skipping table %d on page %d: %s",
                        table_index, page_num, exc,
                    )
        except Exception as exc:
            logger.warning("Table detection failed on page %d: %s", page_num, exc)
        return tables

    def _table_to_markdown(self, table_data):
        """Render rows of cells as a Markdown table; first row is the header."""
        if not table_data:
            return ""
        lines = []
        for row_idx, row in enumerate(table_data):
            # Only None means "no cell"; other falsy values such as 0 are data.
            cells = [
                str(cell).replace('|', '\\|').replace('\n', ' ')
                if cell is not None else ''
                for cell in row
            ]
            lines.append('| ' + ' | '.join(cells) + ' |')
            if row_idx == 0:
                lines.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
        return '\n'.join(lines)

    def _get_reading_order(self, elements, page_width):
        """Order elements for reading and stamp each with ``reading_order``.

        Elements are classified as left column, right column, or full width
        (wider than 60% of the page, straddling the page centre, or lacking a
        bbox). Full-width elements are emitted top to bottom; in each vertical
        band between them the left column is emitted before the right column,
        so two-column text is not interleaved line by line.
        """
        if not elements:
            return elements

        def top(elem):
            return (elem.get("bbox") or [0, 0, 0, 0])[1]

        mid_x = page_width / 2
        left_col, right_col, full_width = [], [], []
        for elem in elements:
            bbox = elem.get("bbox")
            if not bbox:
                full_width.append(elem)
                continue
            x0, _, x1, _ = bbox
            if x1 - x0 > page_width * 0.6:
                full_width.append(elem)
            elif x1 < mid_x:
                left_col.append(elem)
            elif x0 > mid_x:
                right_col.append(elem)
            else:
                full_width.append(elem)

        left_col.sort(key=top)
        right_col.sort(key=top)
        full_width.sort(key=top)

        result = []
        left_idx = right_idx = 0
        for spanning in full_width:
            limit = top(spanning)
            # Flush everything in each column that starts above this
            # full-width element, left column first.
            while left_idx < len(left_col) and top(left_col[left_idx]) < limit:
                result.append(left_col[left_idx])
                left_idx += 1
            while right_idx < len(right_col) and top(right_col[right_idx]) < limit:
                result.append(right_col[right_idx])
                right_idx += 1
            result.append(spanning)
        result.extend(left_col[left_idx:])
        result.extend(right_col[right_idx:])

        for idx, elem in enumerate(result):
            elem["reading_order"] = idx
        return result

    def _bboxes_overlap(self, bbox1, bbox2, threshold=0.5):
        """Return True when more than ``threshold`` of bbox1 lies inside bbox2."""
        if not bbox1 or not bbox2:
            return False
        x1_min, y1_min, x1_max, y1_max = bbox1
        x2_min, y2_min, x2_max, y2_max = bbox2

        x_overlap = max(0, min(x1_max, x2_max) - max(x1_min, x2_min))
        y_overlap = max(0, min(y1_max, y2_max) - max(y1_min, y2_min))
        area1 = (x1_max - x1_min) * (y1_max - y1_min)
        if area1 == 0:
            return False
        return (x_overlap * y_overlap) / area1 > threshold

    def _extract_line_info(self, line):
        """Summarise one text line: text, bbox, weighted font size and flags.

        Whitespace-only spans contribute their text (so words on either side
        of a style change stay separated) but not to the font-size weighting,
        the flags, or ``char_count``.
        """
        pieces = []
        total_chars = 0
        size_sum = 0.0
        combined_flags = 0

        for span in line.get("spans", []):
            span_text = span.get("text", "")
            if not span_text:
                continue
            pieces.append(span_text)
            if not span_text.strip():
                continue
            char_count = len(span_text)
            size_sum += span.get("size", 12) * char_count
            total_chars += char_count
            combined_flags |= span.get("flags", 0)

        text = "".join(pieces)
        stripped = text.strip()
        weighted_size = size_sum / total_chars if total_chars else 0.0
        return {
            "text": text,
            "stripped": stripped,
            "bbox": list(line.get("bbox", [0, 0, 0, 0])),
            "font_size": round(weighted_size, 1),
            "flags": combined_flags,
            "is_bold": bool(combined_flags & _FLAG_BOLD),
            "is_italic": bool(combined_flags & _FLAG_ITALIC),
            "char_count": total_chars,
            # A line holding nothing but one or two bullet glyphs.
            "is_bullet": (
                bool(stripped)
                and len(stripped) <= 2
                and all(c in self.BULLET_CHARS for c in stripped)
            ),
            "is_single_line_entry": False,
        }

    def _is_single_line_entry(self, info, page_width):
        """Return True for table-of-contents style lines (leader + page no.).

        ``page_width`` is accepted for interface compatibility and is unused.
        """
        text = info["stripped"]
        if not text:
            return False
        if self.TOC_LEADER_PATTERN.search(text):
            return True
        return bool(re.search(r'\d+\s*$', text)) and '…' in text

    def _should_break_between(self, prev_info, curr_info, median_gap,
                              avg_line_height, page_width):
        """Return True when a new paragraph should start at ``curr_info``."""
        # A lone bullet glyph always belongs to the line that follows it.
        if prev_info["is_bullet"]:
            return False

        prev_bbox = prev_info["bbox"]
        curr_bbox = curr_info["bbox"]
        gap = curr_bbox[1] - prev_bbox[3]

        if abs(curr_info["font_size"] - prev_info["font_size"]) > 1.5:
            return True
        if prev_info["is_bold"] != curr_info["is_bold"]:
            return True

        if median_gap > 0:
            gap_ratio = gap / median_gap
            if gap_ratio >= 2.0:
                return True
            # A moderately larger gap only counts after sentence-final text.
            if gap_ratio >= 1.5:
                prev_text = prev_info["stripped"]
                if prev_text and prev_text[-1] in '.!?:"\u201D\u2019':
                    return True

        if gap > avg_line_height * 1.0:
            return True
        if abs(curr_bbox[0] - prev_bbox[0]) > 25:
            return True
        if prev_info.get("is_single_line_entry"):
            return True

        # Consecutive bold lines: a short previous line is treated as complete.
        if prev_info["is_bold"] and curr_info["is_bold"]:
            prev_line_width = prev_bbox[2] - prev_bbox[0]
            if page_width > 0 and prev_line_width < page_width * 0.75:
                return True
        return False

    def _merge_bullet_lines(self, line_infos):
        """Attach each stand-alone bullet glyph line to the line after it."""
        if not line_infos:
            return line_infos

        merged = []
        i = 0
        while i < len(line_infos):
            info = line_infos[i]
            if info["is_bullet"] and i + 1 < len(line_infos):
                next_info = line_infos[i + 1]
                bullet_char = info["stripped"]
                a, b = info["bbox"], next_info["bbox"]
                merged.append({
                    "text": bullet_char + " " + next_info["text"],
                    "stripped": bullet_char + " " + next_info["stripped"],
                    "bbox": [
                        min(a[0], b[0]),
                        min(a[1], b[1]),
                        max(a[2], b[2]),
                        max(a[3], b[3]),
                    ],
                    # Typography comes from the text line, not the glyph.
                    "font_size": next_info["font_size"],
                    "flags": next_info["flags"],
                    "is_bold": next_info["is_bold"],
                    "is_italic": next_info["is_italic"],
                    "char_count": info["char_count"] + next_info["char_count"],
                    "is_bullet": False,
                    "is_single_line_entry": False,
                })
                i += 2
            else:
                merged.append(info)
                i += 1
        return merged

    def _split_block_into_paragraphs(self, block, page_width):
        """Split a text block into groups of line infos, one per paragraph."""
        line_infos = [
            info
            for info in (self._extract_line_info(line) for line in block.get("lines", []))
            if info["stripped"]
        ]
        if not line_infos:
            return []

        line_infos = self._merge_bullet_lines(line_infos)
        for info in line_infos:
            info["is_single_line_entry"] = self._is_single_line_entry(info, page_width)

        if len(line_infos) == 1:
            return [line_infos]

        line_heights = [info["bbox"][3] - info["bbox"][1] for info in line_infos]
        gaps = [
            curr["bbox"][1] - prev["bbox"][3]
            for prev, curr in zip(line_infos, line_infos[1:])
        ]
        avg_line_height = sum(line_heights) / len(line_heights) if line_heights else 12
        median_gap = sorted(gaps)[len(gaps) // 2] if gaps else avg_line_height * 0.3

        paragraphs = []
        current_group = [line_infos[0]]
        for prev, curr in zip(line_infos, line_infos[1:]):
            if self._should_break_between(prev, curr, median_gap,
                                          avg_line_height, page_width):
                paragraphs.append(current_group)
                current_group = [curr]
            else:
                current_group.append(curr)
        paragraphs.append(current_group)
        return paragraphs

    def _group_to_element(self, line_group):
        """Turn a group of line infos into one classified element dict.

        Returns ``None`` when the group has no text.
        """
        text = " ".join(info["stripped"] for info in line_group if info["stripped"])
        if not text.strip():
            return None

        total_chars = sum(info["char_count"] for info in line_group)
        if total_chars > 0:
            font_size = sum(
                info["font_size"] * info["char_count"] for info in line_group
            ) / total_chars
        else:
            font_size = self.body_size

        flags = 0
        for info in line_group:
            flags |= info["flags"]

        bbox = [
            min(info["bbox"][0] for info in line_group),
            min(info["bbox"][1] for info in line_group),
            max(info["bbox"][2] for info in line_group),
            max(info["bbox"][3] for info in line_group),
        ]

        elem_type = self._classify_element(
            text, font_size, flags, bool(flags & _FLAG_ITALIC), bbox
        )
        if not elem_type:
            return None
        return {
            "type": elem_type,
            "text": text.strip(),
            "bbox": bbox,
            "font_size": round(font_size, 1),
            "flags": flags,
        }

    def _should_merge_elements(self, prev_elem, curr_elem):
        """Return True when ``curr_elem`` continues the paragraph ``prev_elem``."""
        if prev_elem["type"] != "paragraph" or curr_elem["type"] != "paragraph":
            return False
        if abs(prev_elem["font_size"] - curr_elem["font_size"]) > 1.5:
            return False

        prev_bold = bool(prev_elem.get("flags", 0) & _FLAG_BOLD)
        curr_bold = bool(curr_elem.get("flags", 0) & _FLAG_BOLD)
        if prev_bold != curr_bold:
            return False

        prev_text = prev_elem["text"].strip()
        curr_text = curr_elem["text"].strip()
        if not prev_text or not curr_text:
            return False
        if self.TOC_LEADER_PATTERN.search(prev_text):
            return False

        continues_lowercase = curr_text[0].islower()
        last_char = prev_text[-1]
        # After sentence-final punctuation only a lowercase start continues.
        if last_char in '.!?':
            return continues_lowercase
        # Same rule when the punctuation sits inside a closing quote.
        if last_char in '"\u201D\u2019':
            if len(prev_text) >= 2 and prev_text[-2] in '.!?':
                return continues_lowercase
        return True

    def _merge_continuation_paragraphs(self, elements):
        """Merge adjacent paragraph elements that continue one another."""
        if len(elements) <= 1:
            return elements

        merged = [elements[0]]
        for curr in elements[1:]:
            prev = merged[-1]
            if not self._should_merge_elements(prev, curr):
                merged.append(curr)
                continue
            prev_bbox = prev["bbox"]
            curr_bbox = curr["bbox"]
            merged[-1] = {
                "type": "paragraph",
                "text": prev["text"].rstrip() + " " + curr["text"].lstrip(),
                "bbox": [
                    min(prev_bbox[0], curr_bbox[0]),
                    min(prev_bbox[1], curr_bbox[1]),
                    max(prev_bbox[2], curr_bbox[2]),
                    max(prev_bbox[3], curr_bbox[3]),
                ],
                "font_size": prev["font_size"],
                "flags": prev.get("flags", 0),
            }
        return merged

    def _split_combined_list_items(self, elements):
        """Split list items that hold several inline-bulleted entries.

        The original bbox is divided evenly (by height) among the parts.
        """
        result = []
        for elem in elements:
            if elem["type"] != "list_item":
                result.append(elem)
                continue

            cleaned = self._strip_list_marker(elem["text"].strip()).strip()
            parts = [p.strip() for p in self.INLINE_BULLET_SPLIT.split(cleaned)]
            parts = [p for p in parts if p]
            if len(parts) <= 1:
                result.append(elem)
                continue

            bbox = elem["bbox"]
            item_height = (bbox[3] - bbox[1]) / len(parts)
            for idx, part in enumerate(parts):
                result.append({
                    "type": "list_item",
                    "text": part,
                    "bbox": [
                        bbox[0],
                        bbox[1] + idx * item_height,
                        bbox[2],
                        bbox[1] + (idx + 1) * item_height,
                    ],
                    "font_size": elem["font_size"],
                    "flags": elem.get("flags", 0),
                })
        return result

    def process(self):
        """Extract structured elements from every page.

        Returns:
            Dict with ``page_count``, ``metadata`` (title/author/subject) and
            ``pages``; each page holds ``page_number`` (the enumerate index,
            starting at 0), ``width``, ``height`` and ordered ``elements``.
        """
        self._analyze_font_distribution()

        all_pages = []
        for page_num, page in enumerate(self.doc):
            page_rect = page.rect
            dict_blocks = page.get_text("dict", flags=self._TEXT_FLAGS)["blocks"]
            tables = self._extract_tables(page, page_num)
            table_bboxes = [t["bbox"] for t in tables if t.get("bbox")]
            images = self._extract_images(page, page_num)

            page_elements = []
            for block in dict_blocks:
                if block.get("type") != 0:
                    continue
                block_bbox = block.get("bbox", [0, 0, 0, 0])
                # Text lying inside a detected table is already represented
                # by the table element.
                if any(self._bboxes_overlap(block_bbox, t_bbox)
                       for t_bbox in table_bboxes):
                    continue
                for group in self._split_block_into_paragraphs(block, page_rect.width):
                    element = self._group_to_element(group)
                    if element:
                        page_elements.append(element)

            page_elements = [e for e in page_elements if e["text"].strip()]
            page_elements = self._merge_continuation_paragraphs(page_elements)
            page_elements = self._split_combined_list_items(page_elements)
            page_elements.extend(tables)
            page_elements.extend(images)
            page_elements = self._get_reading_order(page_elements, page_rect.width)

            all_pages.append({
                "page_number": page_num,
                "width": page_rect.width,
                "height": page_rect.height,
                "elements": page_elements,
            })

        # Normalise a missing metadata dict or None values to empty strings.
        metadata = self.doc.metadata or {}
        return {
            "page_count": len(self.doc),
            "metadata": {
                "title": metadata.get("title") or "",
                "author": metadata.get("author") or "",
                "subject": metadata.get("subject") or "",
            },
            "pages": all_pages,
        }

    def to_markdown(self, processed_data):
        """Convert processed data to markdown blocks WITH typography metadata.

        Args:
            processed_data: The dict returned by ``process()``.

        Returns:
            List of block dicts (``type``, ``content`` and, where available,
            ``font_size`` / ``is_bold``), with consecutive list items grouped.
        """
        blocks = []
        for page in processed_data.get("pages", []):
            for elem in page.get("elements", []):
                elem_type = elem.get("type")
                block = None

                if elem_type in self._HEADING_BLOCKS:
                    block_type, prefix = self._HEADING_BLOCKS[elem_type]
                    block = {
                        "type": block_type,
                        "content": f"{prefix} {elem.get('text', '')}",
                    }
                elif elem_type == "paragraph":
                    block = {"type": "paragraph", "content": elem.get('text', '')}
                elif elem_type == "list_item":
                    text = self._strip_list_marker(elem.get('text', ''))
                    block = {"type": "list_item", "content": f"- {text.strip()}"}
                elif elem_type == "numbered_list":
                    text = self._strip_list_marker(elem.get('text', ''))
                    block = {"type": "numbered_list", "content": f"1. {text.strip()}"}
                elif elem_type == "quote":
                    block = {"type": "quote", "content": f"> {elem.get('text', '')}"}
                elif elem_type == "table":
                    block = {"type": "table", "content": elem.get('markdown', '')}
                elif elem_type == "image":
                    if elem.get("data"):
                        block = {
                            "type": "image",
                            "content": f"![PDF Image](embedded-image.{elem.get('format', 'png')})",
                            "data": elem.get("data"),
                            "format": elem.get("format", "png"),
                        }

                if block:
                    # ADD typography metadata for AI analysis
                    if elem.get("font_size"):
                        block["font_size"] = elem["font_size"]
                    if elem.get("flags") is not None:
                        block["is_bold"] = bool(elem["flags"] & _FLAG_BOLD)
                    blocks.append(block)

        # Apply the list grouping logic
        return _group_lists(blocks)


def process_pdf_to_markdown(pdf_bytes):
    """Process a PDF byte stream and return its markdown blocks.

    Returns:
        Dict with ``page_count``, ``metadata`` and ``markdown_blocks``.
    """
    with PDFProcessor(pdf_bytes) as processor:
        processed_data = processor.process()
        markdown_blocks = processor.to_markdown(processed_data)
        return {
            "page_count": processed_data["page_count"],
            "metadata": processed_data["metadata"],
            "markdown_blocks": markdown_blocks,
        }