Files
audiobook-maker-pro-v4/docx_processor.py
Ashim Kumar 8e02b9ad09 first commit
2026-02-20 13:53:36 +06:00

903 lines
40 KiB
Python

# docx_processor.py - DOCX/DOC Processing and Content Extraction
# FIXED: Split paragraphs that contain soft returns (<w:br/>) into separate blocks
# FIXED: Extract text from hyperlinks in DOCX
import io
import re
import base64
import email
import email.policy
import quopri
from html.parser import HTMLParser
from urllib.parse import unquote, urlparse
# ================================================================
# FORMAT DETECTION
# ================================================================
def detect_doc_format(file_bytes):
    """Sniff the container format of a word-processing document.

    Returns one of: 'docx', 'ole2', 'rtf', 'mhtml', 'html', 'unknown'.
    Detection is by magic bytes first, then lightweight text heuristics.
    """
    if not file_bytes or len(file_bytes) < 4:
        return 'unknown'
    # ZIP local-file header => OOXML container (.docx)
    if file_bytes.startswith(b'PK\x03\x04'):
        return 'docx'
    # OLE2 compound-file magic (the 4-byte prefix covers the full 8-byte magic too)
    if file_bytes.startswith(b'\xd0\xcf\x11\xe0'):
        return 'ole2'
    header = file_bytes[:64]
    # Strip a UTF-8 / UTF-16 BOM if present before the text checks
    for bom in (b'\xef\xbb\xbf', b'\xff\xfe', b'\xfe\xff'):
        if header.startswith(bom):
            header = header[len(bom):]
            break
    if header.lstrip().startswith(b'{\\rtf'):
        return 'rtf'
    header_str = header.decode('ascii', errors='ignore').strip()
    if header_str.upper().startswith('MIME-VERSION'):
        return 'mhtml'
    # MHTML saved with preamble headers: all three MIME tokens near the top
    sample_512 = file_bytes[:512].decode('ascii', errors='ignore').lower()
    if all(tok in sample_512 for tok in ('mime-version', 'content-type', 'boundary')):
        return 'mhtml'
    # HTML (including Word's "filtered HTML" export): try several encodings
    sample = file_bytes[:4096]
    decoded = ''
    for enc in ('utf-8', 'utf-16-le', 'utf-16-be', 'cp1252', 'latin-1'):
        try:
            decoded = sample.decode(enc, errors='ignore').lower().strip()
        except Exception:
            continue
        if decoded:
            break
    if decoded:
        html_markers = (
            '<html', '<!doctype html', '<head', '<meta ',
            'xmlns:w="urn:schemas-microsoft-com',
            'xmlns:o="urn:schemas-microsoft-com',
            '<o:documentproperties>', 'mso-',
        )
        if any(marker in decoded for marker in html_markers):
            return 'html'
    return 'unknown'
# ================================================================
# IMAGE NORMALIZATION HELPERS
# ================================================================
def _normalize_image_key(raw_location):
if not raw_location:
return ''
loc = raw_location.strip()
if loc.lower().startswith('cid:'):
loc = loc[4:]
for _ in range(3):
decoded = unquote(loc)
if decoded == loc:
break
loc = decoded
try:
parsed = urlparse(loc)
path = parsed.path if parsed.path else loc
except Exception:
path = loc
filename = path.replace('\\', '/').rsplit('/', 1)[-1].strip()
return filename.lower()
# ================================================================
# MHTML PROCESSOR
# ================================================================
class MHTMLProcessor:
    """Extract the HTML body and embedded image parts from an MHTML
    (MIME HTML archive) file, then delegate block extraction to
    HTMLDocProcessor.
    """
    def __init__(self, file_bytes):
        self._file_bytes = file_bytes
        # Image lookup keyed by Content-Location / Content-ID variants:
        # value is (base64_data, format)
        self._embedded_images = {}
        # Images in document order as (base64_data, format, location) —
        # used as a fallback when src lookup fails
        self._ordered_images = []
    def process(self):
        """Parse the MHTML archive and return {'metadata', 'blocks'}."""
        html_content = self._extract_html_from_mhtml()
        if not html_content:
            # MIME parsing failed — try a manual boundary-split fallback
            html_content = self._fallback_extract()
        if not html_content:
            return {
                'metadata': {'title': '', 'author': '', 'subject': ''},
                'blocks': [{'type': 'paragraph', 'content': '⚠️ Could not extract content from MHTML file.'}],
            }
        print(f" 📷 MHTML: {len(self._ordered_images)} image parts")
        # Hand the HTML plus the collected image parts to the HTML processor
        processor = HTMLDocProcessor(
            html_content,
            embedded_images=self._embedded_images,
            ordered_images=self._ordered_images
        )
        result = processor.process()
        img_blocks = sum(1 for b in result.get('blocks', []) if b.get('type') == 'image')
        print(f"📄 MHTML processed: {len(result.get('blocks', []))} blocks ({img_blocks} images)")
        return result
    def _store_image(self, payload_bytes, content_type, content_location, content_id):
        """Register one decoded image part under every key a later
        <img src=...> lookup might use (raw location, normalized
        filename, cid: form, bare content-id).
        """
        fmt = content_type.split('/')[-1].lower()
        # WMF/EMF vector formats are skipped (not renderable downstream)
        if fmt in ('x-wmf', 'x-emf', 'wmf', 'emf'):
            return
        if fmt == 'jpg':
            fmt = 'jpeg'
        b64_data = base64.b64encode(payload_bytes).decode('ascii')
        self._ordered_images.append((b64_data, fmt, content_location or content_id or ''))
        if content_location:
            self._embedded_images[content_location] = (b64_data, fmt)
            norm = _normalize_image_key(content_location)
            if norm:
                self._embedded_images[norm] = (b64_data, fmt)
        if content_id:
            self._embedded_images[f'cid:{content_id}'] = (b64_data, fmt)
            self._embedded_images[content_id] = (b64_data, fmt)
    def _extract_html_from_mhtml(self):
        """Parse the archive with the stdlib email parser.

        Returns the decoded HTML body (last text/html part wins), or None
        on failure. Image parts encountered along the way are stored as a
        side effect.
        """
        try:
            msg = email.message_from_bytes(self._file_bytes, policy=email.policy.default)
            html_body = None
            if msg.is_multipart():
                for part in msg.walk():
                    ct = part.get_content_type()
                    cl = part.get('Content-Location', '').strip()
                    cid = part.get('Content-ID', '').strip('<> ')
                    if ct == 'text/html':
                        payload = part.get_payload(decode=True)
                        if payload:
                            cs = part.get_content_charset() or 'utf-8'
                            try: html_body = payload.decode(cs, errors='ignore')
                            except: html_body = payload.decode('utf-8', errors='ignore')
                    elif ct and ct.startswith('image/'):
                        payload = part.get_payload(decode=True)
                        # Ignore tiny payloads (likely placeholders/tracking pixels)
                        if payload and len(payload) > 100:
                            self._store_image(payload, ct, cl, cid)
            else:
                # Single-part message: accept it only if it claims to be HTML
                ct = msg.get_content_type()
                if ct in ('text/html', 'multipart/related'):
                    payload = msg.get_payload(decode=True)
                    if payload:
                        cs = msg.get_content_charset() or 'utf-8'
                        try: html_body = payload.decode(cs, errors='ignore')
                        except: html_body = payload.decode('utf-8', errors='ignore')
            return html_body
        except Exception as e:
            print(f" ⚠️ MIME parsing failed: {e}")
            return None
    def _fallback_extract(self):
        """Manual MIME split used when email parsing fails: find the
        boundary, split parts, decode quoted-printable/base64 bodies by
        hand. Returns the HTML body or None.
        """
        try:
            text = self._file_bytes.decode('ascii', errors='ignore')
            bm = re.search(r'boundary="?([^\s";\r\n]+)"?', text, re.IGNORECASE)
            if not bm: return None
            boundary = bm.group(1)
            parts = text.split(f'--{boundary}')
            html_body = None
            for part in parts:
                # Header/body split at the first blank line
                he = part.find('\r\n\r\n')
                if he == -1: he = part.find('\n\n')
                if he == -1: continue
                hs = part[:he]; body = part[he:].strip()
                ctm = re.search(r'Content-Type:\s*([^\s;]+)', hs, re.IGNORECASE)
                ct = ctm.group(1).lower() if ctm else ''
                is_qp = bool(re.search(r'Content-Transfer-Encoding:\s*quoted-printable', hs, re.IGNORECASE))
                is_b64 = bool(re.search(r'Content-Transfer-Encoding:\s*base64', hs, re.IGNORECASE))
                clm = re.search(r'Content-Location:\s*(.+?)[\r\n]', hs, re.IGNORECASE)
                cl = clm.group(1).strip() if clm else ''
                cidm = re.search(r'Content-ID:\s*<?([^>\s\r\n]+)>?', hs, re.IGNORECASE)
                cid = cidm.group(1).strip() if cidm else ''
                if ct == 'text/html':
                    if is_qp: body = quopri.decodestring(body.encode('ascii', errors='ignore')).decode('utf-8', errors='ignore')
                    elif is_b64:
                        try: body = base64.b64decode(body).decode('utf-8', errors='ignore')
                        except: pass
                    # Only accept bodies that actually look like HTML
                    if '<html' in body.lower() or '<body' in body.lower(): html_body = body
                elif ct.startswith('image/') and is_b64 and body:
                    clean_b64 = re.sub(r'\s+', '', body)
                    try:
                        pb = base64.b64decode(clean_b64)
                        if len(pb) > 100: self._store_image(pb, ct, cl, cid)
                    except: pass
            return html_body
        except Exception as e:
            print(f" ⚠️ Fallback MHTML failed: {e}")
            return None
# ================================================================
# HTML COMMENT CLEANUP
# ================================================================
def _clean_html_comments(html_text):
html_text = re.sub(r'<!--\[if\s+!vml\]-->(.*?)<!--\[endif\]-->', r'\1', html_text, flags=re.DOTALL|re.IGNORECASE)
html_text = re.sub(r'<!--\[if\s+!mso\]-->(.*?)<!--\[endif\]-->', r'\1', html_text, flags=re.DOTALL|re.IGNORECASE)
html_text = re.sub(r'<!--\[if\s[^\]]*\]>.*?<!\[endif\]-->', '', html_text, flags=re.DOTALL|re.IGNORECASE)
html_text = re.sub(r'<!--.*?-->', '', html_text, flags=re.DOTALL)
return html_text
# ================================================================
# HTML DOC PROCESSOR
# ================================================================
class HTMLDocProcessor:
    """Extract headings, paragraphs, lists, tables, quotes, and images
    from a (possibly Word-exported) HTML document into ordered blocks.
    """
    def __init__(self, file_bytes, embedded_images=None, ordered_images=None):
        # Accepts either decoded HTML text (e.g. from MHTMLProcessor) or raw bytes
        if isinstance(file_bytes, str):
            self._html_text = file_bytes
            self._file_bytes = file_bytes.encode('utf-8', errors='ignore')
        else:
            self._file_bytes = file_bytes
            self._html_text = self._decode_html()
        # src/cid -> (base64_data, format), supplied by MHTMLProcessor
        self._embedded_images = embedded_images or {}
        # document-order (base64_data, format, location) fallback list
        self._ordered_images = ordered_images or []
        # Indices into _ordered_images already consumed by a lookup
        self._used_image_indices = set()
    def _decode_html(self):
        """Decode raw bytes: BOM first, then a declared charset, then
        common fallback encodings.
        """
        if self._file_bytes[:3] == b'\xef\xbb\xbf': return self._file_bytes[3:].decode('utf-8', errors='ignore')
        if self._file_bytes[:2] == b'\xff\xfe': return self._file_bytes[2:].decode('utf-16-le', errors='ignore')
        if self._file_bytes[:2] == b'\xfe\xff': return self._file_bytes[2:].decode('utf-16-be', errors='ignore')
        sample = self._file_bytes[:4096]
        try:
            # Look for a charset declaration (meta tag / XML prolog) in the head
            st = sample.decode('ascii', errors='ignore')
            cm = re.search(r'charset[="\s]+([a-zA-Z0-9\-]+)', st, re.IGNORECASE)
            if cm:
                try: return self._file_bytes.decode(cm.group(1).strip().strip('"\''), errors='ignore')
                except: pass
        except: pass
        for enc in ['utf-8', 'cp1252', 'latin-1']:
            try: return self._file_bytes.decode(enc, errors='ignore')
            except: continue
        return self._file_bytes.decode('latin-1', errors='replace')
    def process(self):
        """Return {'metadata': {...}, 'blocks': [...]} from the HTML."""
        metadata = {'title': '', 'author': '', 'subject': ''}
        tm = re.search(r'<title[^>]*>(.*?)</title>', self._html_text, re.IGNORECASE|re.DOTALL)
        if tm: metadata['title'] = self._strip_tags(tm.group(1)).strip()
        blocks = self._extract_all_blocks()
        # Keep only blocks with text content or image data
        blocks = [b for b in blocks if b.get('content', '').strip() or b.get('data')]
        if not blocks: blocks = self._simple_extract()
        img_count = sum(1 for b in blocks if b.get('type') == 'image')
        print(f"📄 HTML-DOC processed: {len(blocks)} blocks ({img_count} images)")
        return {'metadata': metadata, 'blocks': blocks}
    def _strip_tags(self, html_str):
        """Remove all tags and unescape HTML entities."""
        import html as hm
        return hm.unescape(re.sub(r'<[^>]+>', '', html_str))
    def _resolve_image_src(self, src):
        """Resolve an <img src> to (base64_data, format) or (None, None).

        Tries, in order: inline data: URI, exact key, normalized-filename
        key, filename without extension, a scan over all stored keys, and
        finally the next unused ordered image.
        """
        import html as hm
        if not src: return None, None
        src = hm.unescape(src).strip()
        if src.startswith('data:image'):
            dm = re.match(r'data:image/([^;]+);base64,(.+)', src, re.DOTALL)
            if dm: return dm.group(2).strip(), dm.group(1)
        if src in self._embedded_images:
            self._mark_used(self._embedded_images[src][0]); return self._embedded_images[src]
        ns = _normalize_image_key(src)
        if ns and ns in self._embedded_images:
            self._mark_used(self._embedded_images[ns][0]); return self._embedded_images[ns]
        if ns and '.' in ns:
            nne = ns.rsplit('.', 1)[0]
            if nne and nne in self._embedded_images:
                self._mark_used(self._embedded_images[nne][0]); return self._embedded_images[nne]
        if ns:
            for loc, (data, fmt) in self._embedded_images.items():
                ln = _normalize_image_key(loc)
                if ln and ns and ln == ns: self._mark_used(data); return data, fmt
        return self._get_next_unused()
    def _mark_used(self, data_prefix):
        """Mark the ordered image matching this base64 data as consumed
        (matched on the first 60 characters).
        """
        p = data_prefix[:60]
        for i, (b, f, l) in enumerate(self._ordered_images):
            if i not in self._used_image_indices and b[:60] == p:
                self._used_image_indices.add(i); return
    def _get_next_unused(self):
        """Return the next not-yet-consumed ordered image, or (None, None)."""
        for i, (b, f, l) in enumerate(self._ordered_images):
            if i not in self._used_image_indices:
                self._used_image_indices.add(i); return b, f
        return None, None
    def _extract_all_blocks(self):
        """Scan the cleaned HTML with per-element regexes, tag every hit
        with its source position, then sort by position and de-duplicate.
        """
        import html as hm
        blocks = []
        cleaned = re.sub(r'<script[^>]*>.*?</script>', '', self._html_text, flags=re.DOTALL|re.IGNORECASE)
        cleaned = re.sub(r'<style[^>]*>.*?</style>', '', cleaned, flags=re.DOTALL|re.IGNORECASE)
        # Collect VML image refs from conditional comments before they are stripped
        vml_srcs = []
        for vm in re.finditer(r'<!--\[if\s[^\]]*vml[^\]]*\]>(.*?)<!\[endif\]-->', cleaned, re.DOTALL|re.IGNORECASE):
            for im in re.finditer(r'<v:imagedata\b[^>]*?\bsrc\s*=\s*["\']([^"\']+)["\']', vm.group(1), re.IGNORECASE|re.DOTALL):
                vml_srcs.append((hm.unescape(im.group(1)), vm.start()))
        cleaned = _clean_html_comments(cleaned)
        # Drop Office namespace tags (o:, v:, w:)
        cleaned = re.sub(r'</?[ovw]:[^>]+>', '', cleaned, flags=re.IGNORECASE)
        bm = re.search(r'<body[^>]*>(.*)</body>', cleaned, re.IGNORECASE|re.DOTALL)
        if bm: cleaned = bm.group(1)
        # Images: <img src=...>, falling back to VML refs if no <img> found
        img_entries = []
        for m in re.finditer(r'<img\b([^>]*?)/?\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            sm = re.search(r'\bsrc\s*=\s*["\']([^"\']+)["\']', m.group(1), re.IGNORECASE)
            if not sm: sm = re.search(r'\bsrc\s*=\s*(\S+)', m.group(1), re.IGNORECASE)
            if sm: img_entries.append((hm.unescape(sm.group(1)), m.start()))
        if not img_entries and vml_srcs: img_entries = vml_srcs
        self._used_image_indices = set()
        for src, pos in img_entries:
            d, f = self._resolve_image_src(src)
            if d: blocks.append({'type':'image','content':f"![Image](embedded-image.{f})",'data':d,'format':f,'_pos':pos})
        # Headings h1-h6 (h3+ collapse to heading3)
        for m in re.finditer(r'<(h[1-6])\b[^>]*>(.*?)</\1\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            t = re.sub(r'\s+', ' ', self._strip_tags(m.group(2))).strip()
            if t:
                tag = m.group(1).lower()
                p = {'h1':'# ','h2':'## '}.get(tag,'### ')
                bt = {'h1':'heading1','h2':'heading2'}.get(tag,'heading3')
                blocks.append({'type':bt,'content':f"{p}{t}",'_pos':m.start()})
        for m in re.finditer(r'<table\b[^>]*>(.*?)</table\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            md = self._parse_table(m.group(1))
            if md: blocks.append({'type':'table','content':md,'_pos':m.start()})
        # Paragraphs; Mso* class names reveal Word styles (lists, title, quote)
        for m in re.finditer(r'<p\b([^>]*)>(.*?)</p\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            inner = m.group(2); attrs = m.group(1)
            it = self._strip_tags(inner).strip()
            # Skip whitespace-only paragraphs (incl. non-breaking spaces)
            hw = not it or all(c in ' \t\n\r\xa0' for c in it)
            if hw: continue
            t = re.sub(r'[ \t]+', ' ', re.sub(r'\n\s*\n', '\n', it)).strip()
            if not t: continue
            bt = 'paragraph'
            cm = re.search(r'class\s*=\s*["\']?([^"\'>\s]+)', attrs, re.IGNORECASE)
            cn = cm.group(1) if cm else ''
            if 'MsoListParagraph' in cn:
                # Strip the literal bullet/number Word baked into the text
                t = re.sub(r'^[·•●○◦‣⁃]\s*', '', re.sub(r'^\d+[.)]\s*', '', t)); bt = 'list_item'
            elif 'MsoTitle' in cn: bt = 'heading1'
            elif 'MsoSubtitle' in cn: bt = 'heading2'
            elif 'MsoQuote' in cn or 'MsoIntenseQuote' in cn: bt = 'quote'
            pm = {'heading1':'# ','heading2':'## ','list_item':'- ','quote':'> '}
            blocks.append({'type':bt,'content':f"{pm.get(bt,'')}{t}",'_pos':m.start()})
        for m in re.finditer(r'<li\b[^>]*>(.*?)</li\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            t = re.sub(r'\s+', ' ', self._strip_tags(m.group(1))).strip()
            if t: blocks.append({'type':'list_item','content':f"- {t}",'_pos':m.start()})
        for m in re.finditer(r'<blockquote\b[^>]*>(.*?)</blockquote\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            t = re.sub(r'\s+', ' ', self._strip_tags(m.group(1))).strip()
            if t: blocks.append({'type':'quote','content':f"> {t}",'_pos':m.start()})
        # Leaf <div>s only (no nested block elements), deduped against existing text
        for m in re.finditer(r'<div\b([^>]*)>(.*?)</div\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            if re.search(r'<(?:p|h[1-6]|table|div|ul|ol)\b', m.group(2), re.IGNORECASE): continue
            t = re.sub(r'[ \t]+', ' ', self._strip_tags(m.group(2))).strip()
            if t and len(t) > 1 and not all(c in ' \t\n\r\xa0' for c in t):
                if not any(t in b.get('content','') for b in blocks):
                    blocks.append({'type':'paragraph','content':t,'_pos':m.start()})
        # Restore document order, then de-duplicate by content / image data prefix
        blocks.sort(key=lambda b: b.get('_pos', 0))
        seen = set(); deduped = []
        for b in blocks:
            b.pop('_pos', None)
            if b.get('type') == 'image':
                k = b.get('data','')[:60]
                if k and k in seen: continue
                if k: seen.add(k)
                deduped.append(b)
            else:
                c = b.get('content','').strip()
                if c and c not in seen: seen.add(c); deduped.append(b)
        return deduped
    def _parse_table(self, html):
        """Convert a <table> body to a markdown table. Returns '' for
        empty tables and for trivial 1-column/≤2-row layout tables.
        """
        rows = []
        for rm in re.finditer(r'<tr\b[^>]*>(.*?)</tr\s*>', html, re.IGNORECASE|re.DOTALL):
            cells = []
            for cm in re.finditer(r'<t[dh]\b[^>]*>(.*?)</t[dh]\s*>', rm.group(1), re.IGNORECASE|re.DOTALL):
                cells.append(re.sub(r'\s+', ' ', self._strip_tags(cm.group(1))).strip().replace('|','\\|'))
            if cells: rows.append(cells)
        if not rows: return ''
        if all(len(r)==1 for r in rows) and len(rows)<=2: return ''
        lines = []
        for i, r in enumerate(rows):
            lines.append('| '+' | '.join(r)+' |')
            # Markdown header separator after the first row
            if i == 0: lines.append('| '+' | '.join(['---']*len(r))+' |')
        return '\n'.join(lines)
    def _simple_extract(self):
        """Last-resort extraction: flatten all tags to newlines and split
        the remaining text into paragraphs.
        """
        import html as hm
        blocks = []; t = self._html_text
        t = re.sub(r'<script[^>]*>.*?</script>', '', t, flags=re.DOTALL|re.IGNORECASE)
        t = re.sub(r'<style[^>]*>.*?</style>', '', t, flags=re.DOTALL|re.IGNORECASE)
        t = _clean_html_comments(t)
        bm = re.search(r'<body[^>]*>(.*)</body>', t, re.IGNORECASE|re.DOTALL)
        if bm: t = bm.group(1)
        for tag, repl in [('br', '\n'), ('p', '\n\n'), ('div', '\n\n'), ('li', '\n'), ('tr', '\n'), ('table', '\n\n')]:
            t = re.sub(rf'</?{tag}[^>]*>', repl, t, flags=re.IGNORECASE)
        t = hm.unescape(re.sub(r'<[^>]+>', '', t))
        for p in re.split(r'\n{2,}', t):
            p = re.sub(r'[ \t]+', ' ', p).strip()
            if p and len(p) > 1: blocks.append({'type':'paragraph','content':p})
        return blocks
# ================================================================
# RTF DOC PROCESSOR
# ================================================================
class RTFDocProcessor:
    """Extract plain text blocks from an RTF document, preferring the
    striprtf library when installed and falling back to a regex-based
    converter.
    """
    def __init__(self, file_bytes): self._file_bytes = file_bytes
    def process(self):
        """Return {'metadata', 'blocks'} from the RTF source."""
        blocks = []; metadata = {'title':'','author':'','subject':''}
        rtf = self._decode_rtf(); metadata.update(self._extract_meta(rtf))
        pt = self._rtf_to_text(rtf)
        if pt:
            for p in re.split(r'\n{2,}', pt):
                p = p.strip()
                if not p: continue
                # Heuristic: short ALL-CAPS lines are treated as headings
                if len(p) < 80 and p.isupper(): blocks.append({'type':'heading2','content':f"## {p}"})
                else: blocks.append({'type':'paragraph','content':p})
        print(f"📄 RTF-DOC processed: {len(blocks)} blocks")
        return {'metadata': metadata, 'blocks': blocks}
    def _decode_rtf(self):
        """Decode the RTF bytes to str, stripping a BOM if present.
        (RTF is ASCII-based; non-ASCII is carried in \\'xx / \\uNNNN escapes.)
        """
        d = self._file_bytes
        for b in [b'\xef\xbb\xbf',b'\xff\xfe',b'\xfe\xff']:
            if d.startswith(b): d = d[len(b):]; break
        try: return d.decode('ascii', errors='ignore')
        except: return d.decode('latin-1', errors='replace')
    def _extract_meta(self, rtf):
        """Pull \\title/\\author/\\subject values from the RTF info group."""
        m = {}
        for f in ['title','author','subject']:
            r = re.search(r'\\'+f+r'\s+([^}]+)', rtf)
            if r: m[f] = r.group(1).strip()
        return m
    def _rtf_to_text(self, rtf):
        """Convert RTF markup to plain text.

        Uses striprtf if available; otherwise strips groups/control words
        manually and decodes \\'xx (cp1252) and \\uNNNN (unicode) escapes.
        """
        try:
            from striprtf.striprtf import rtf_to_text
            return rtf_to_text(rtf, errors='ignore')
        except ImportError: pass
        except Exception: pass
        t = rtf
        # Remove non-content groups (fonts, colors, styles, metadata, ...)
        for g in ['fonttbl','colortbl','stylesheet','info','header','footer']:
            t = re.sub(r'\{\\'+re.escape(g)+r'[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', '', t, flags=re.DOTALL)
        t = re.sub(r'\\par\b\s*','\n',t); t = re.sub(r'\\pard\b\s*','',t)
        t = re.sub(r'\\line\b\s*','\n',t); t = re.sub(r'\\tab\b\s*','\t',t)
        def hr(m):
            # \'xx hex escape -> cp1252 character
            try: return bytes([int(m.group(1),16)]).decode('cp1252',errors='ignore')
            except: return ''
        t = re.sub(r"\\\'([0-9a-fA-F]{2})", hr, t)
        def ur(m):
            # \uNNNN escape; RTF stores code points as signed 16-bit ints
            try:
                c = int(m.group(1))
                if c < 0: c += 65536
                return chr(c)
            except: return ''
        t = re.sub(r'\\u(-?\d+)\??', ur, t)
        # Drop remaining control words and group braces, then squeeze whitespace
        t = re.sub(r'\\[a-zA-Z]+\d*\s?','',t); t = re.sub(r'[{}]','',t)
        return re.sub(r'\n{3,}','\n\n',re.sub(r' +',' ',t)).strip()
# ================================================================
# DOCX PROCESSOR (using python-docx)
# ================================================================
# XML namespace map used by python-docx element queries (findall/iter).
DOCX_NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'a': 'http://schemas.openxmlformats.org/drawingml/2006/main',
    'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships',
}
# Clark-notation ({namespace}tag) prefixes for direct tag/attribute comparisons.
W_NS = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
R_NS = '{http://schemas.openxmlformats.org/officeDocument/2006/relationships}'
class DOCXProcessor:
    """Process DOCX files using python-docx.

    Extracts text from hyperlinks (<w:hyperlink>) which paragraph.runs
    misses, and splits paragraphs containing <w:br/> (soft line breaks)
    into separate blocks when they represent logically distinct
    paragraphs.
    """
    # Word built-in style name -> block type (startswith match also applies).
    HEADING_PATTERNS = {
        'Title': 'title', 'Subtitle': 'subtitle',
        'Heading 1': 'heading1', 'Heading 2': 'heading2',
        'Heading 3': 'heading3', 'Heading 4': 'heading3',
        'Heading 5': 'heading3', 'Heading 6': 'heading3',
        'Heading 7': 'heading3', 'Heading 8': 'heading3', 'Heading 9': 'heading3',
    }
    QUOTE_STYLES = {'Quote', 'Intense Quote', 'Block Text'}
    LIST_BULLET_STYLES = {'List Bullet', 'List Bullet 2', 'List Bullet 3'}
    LIST_NUMBER_STYLES = {'List Number', 'List Number 2', 'List Number 3', 'List Continue'}

    def __init__(self, docx_bytes):
        """Open the document from raw bytes and cache all embedded images."""
        import docx as docx_module
        self.doc = docx_module.Document(io.BytesIO(docx_bytes))
        self._image_cache = {}  # rel_id -> (base64 string, image format)
        self._extract_all_images()

    def _extract_all_images(self):
        """Populate self._image_cache from the document's image relationships."""
        try:
            for rel_id, rel in self.doc.part.rels.items():
                if "image" in rel.reltype:
                    try:
                        ip = rel.target_part
                        ib = ip.blob
                        ct = ip.content_type or ''
                        # Guess format from the content type, falling back to
                        # the part name extension; default to png.
                        fmt = 'png'
                        if 'jpeg' in ct or 'jpg' in ct:
                            fmt = 'jpeg'
                        elif 'gif' in ct:
                            fmt = 'gif'
                        elif 'bmp' in ct:
                            fmt = 'bmp'
                        elif 'tiff' in ct:
                            fmt = 'tiff'
                        elif 'webp' in ct:
                            fmt = 'webp'
                        else:
                            pn = str(ip.partname) if hasattr(ip, 'partname') else ''
                            if '.jpg' in pn or '.jpeg' in pn:
                                fmt = 'jpeg'
                            elif '.gif' in pn:
                                fmt = 'gif'
                        self._image_cache[rel_id] = (base64.b64encode(ib).decode('utf-8'), fmt)
                    except Exception as e:
                        print(f" ⚠️ Image {rel_id}: {e}")
        except Exception as e:
            print(f" ⚠️ Rels error: {e}")

    def _get_paragraph_images(self, paragraph):
        """Return [{'data', 'format'}] for every cached image anchored in this paragraph."""
        images = []
        try:
            for drawing in paragraph._element.findall('.//w:drawing', DOCX_NSMAP):
                for blip in drawing.findall('.//a:blip', DOCX_NSMAP):
                    eid = blip.get(f'{R_NS}embed')
                    if eid and eid in self._image_cache:
                        d, f = self._image_cache[eid]
                        images.append({'data': d, 'format': f})
        except Exception as e:
            print(f" ⚠️ Para images: {e}")
        return images

    def _get_paragraph_segments(self, paragraph):
        """Extract paragraph text as a list of SEGMENTS split by <w:br/>.

        Each segment is a list of (text, is_bold, is_italic) tuples.
        Splitting on <w:br/> lets one <w:p> that holds several logical
        paragraphs become several blocks.
        """
        segments = [[]]
        # Walk the direct children of <w:p> in document order; runs may be
        # direct (<w:r>) or nested inside hyperlinks / smart tags / SDTs.
        for child in paragraph._element:
            tag = child.tag
            if tag == f'{W_NS}r':
                self._process_run_element(child, segments)
            elif tag == f'{W_NS}hyperlink':
                # Hyperlink text lives in <w:r> children — paragraph.runs misses these.
                for run_elem in child.findall(f'{W_NS}r'):
                    self._process_run_element(run_elem, segments)
            elif tag == f'{W_NS}smartTag':
                for run_elem in child.findall(f'{W_NS}r'):
                    self._process_run_element(run_elem, segments)
            elif tag == f'{W_NS}sdt':
                # Structured document tag — runs may be nested arbitrarily deep.
                for run_elem in child.iter(f'{W_NS}r'):
                    self._process_run_element(run_elem, segments)
        return segments

    def _process_run_element(self, run_elem, segments):
        """Append a <w:r>'s text to the current segment; a <w:br/> child
        starts a new segment.
        """
        for elem in run_elem:
            if elem.tag == f'{W_NS}br':
                segments.append([])
            elif elem.tag == f'{W_NS}t':
                if elem.text:
                    is_bold, is_italic = self._get_run_formatting(run_elem)
                    segments[-1].append((elem.text, is_bold, is_italic))

    def _get_run_formatting(self, run_elem):
        """Return (is_bold, is_italic) for a <w:r> element.

        A <w:b/>/<w:i/> with no w:val, or a w:val other than '0'/'false',
        means the property is on.
        """
        is_bold = False
        is_italic = False
        rpr = run_elem.find(f'{W_NS}rPr')
        if rpr is not None:
            b = rpr.find(f'{W_NS}b')
            if b is not None:
                v = b.get(f'{W_NS}val')
                is_bold = v is None or v not in ('0', 'false')
            i = rpr.find(f'{W_NS}i')
            if i is not None:
                v = i.get(f'{W_NS}val')
                is_italic = v is None or v not in ('0', 'false')
        return is_bold, is_italic

    def _segments_to_text(self, segment):
        """Render a segment's (text, bold, italic) tuples as markdown."""
        parts = []
        for text, is_bold, is_italic in segment:
            if is_bold and is_italic:
                parts.append(f"***{text}***")
            elif is_bold:
                parts.append(f"**{text}**")
            elif is_italic:
                parts.append(f"*{text}*")
            else:
                parts.append(text)
        return ''.join(parts)

    def _segment_plain_text(self, segment):
        """Plain (unformatted) text of a segment."""
        return ''.join(text for text, _, _ in segment)

    def _get_full_paragraph_plain_text(self, paragraph):
        """ALL plain text in the paragraph, including hyperlink runs,
        by iterating every <w:t> descendant.
        """
        texts = []
        for t_elem in paragraph._element.iter(f'{W_NS}t'):
            if t_elem.text:
                texts.append(t_elem.text)
        return ''.join(texts).strip()

    def _classify_paragraph(self, paragraph):
        """Map the paragraph's Word style name to an internal block type."""
        sn = paragraph.style.name if paragraph.style else ''
        for p, bt in self.HEADING_PATTERNS.items():
            if sn == p or sn.startswith(p):
                return bt
        if sn in self.QUOTE_STYLES:
            return 'quote'
        if sn in self.LIST_BULLET_STYLES:
            return 'list_item'
        if sn in self.LIST_NUMBER_STYLES:
            return 'numbered_list'
        if sn == 'List Paragraph':
            return 'list_item'
        if 'toc' in sn.lower():
            return 'list_item'
        return 'paragraph'

    def _table_to_markdown(self, table):
        """Render a docx table as a markdown table ('' if empty)."""
        rd = []
        for r in table.rows:
            rd.append([c.text.replace('|', '\\|').replace('\n', ' ').strip() for c in r.cells])
        if not rd:
            return ""
        lines = []
        for i, r in enumerate(rd):
            lines.append('| ' + ' | '.join(r) + ' |')
            # Header separator row after the first table row
            if i == 0:
                lines.append('| ' + ' | '.join(['---'] * len(r)) + ' |')
        return '\n'.join(lines)

    def _make_block(self, block_type, text):
        """Wrap text in a block dict, adding the markdown prefix for the type."""
        tm = {
            'title': ('heading1', '# '), 'subtitle': ('heading2', '## '),
            'heading1': ('heading1', '# '), 'heading2': ('heading2', '## '),
            'heading3': ('heading3', '### '), 'quote': ('quote', '> '),
            'list_item': ('list_item', '- '), 'numbered_list': ('list_item', '1. '),
        }
        if block_type in tm:
            bt, pf = tm[block_type]
            return {'type': bt, 'content': f"{pf}{text}"}
        return {'type': 'paragraph', 'content': text}

    def _process_element(self, element, blocks):
        """Convert one body element (paragraph or table) into blocks,
        appending to *blocks* in place.
        """
        from docx.table import Table as DocxTable
        from docx.text.paragraph import Paragraph as DocxParagraph
        if isinstance(element, DocxParagraph):
            plain_text = self._get_full_paragraph_plain_text(element)
            if not plain_text:
                # Image-only paragraph: emit its images and stop.
                for img in self._get_paragraph_images(element):
                    blocks.append({
                        'type': 'image',
                        'content': f"![Document Image](embedded-image.{img['format']})",
                        'data': img['data'], 'format': img['format'],
                    })
                return
            # Emit anchored images ahead of the paragraph text.
            for img in self._get_paragraph_images(element):
                blocks.append({
                    'type': 'image',
                    'content': f"![Document Image](embedded-image.{img['format']})",
                    'data': img['data'], 'format': img['format'],
                })
            block_type = self._classify_paragraph(element)
            segments = self._get_paragraph_segments(element)
            non_empty_segments = [s for s in segments if self._segment_plain_text(s).strip()]
            if len(non_empty_segments) <= 1:
                # Normal case: whole paragraph becomes one block.
                text = self._segments_to_text(non_empty_segments[0]) if non_empty_segments else ''
                if text.strip():
                    blocks.append(self._make_block(block_type, text))
            else:
                # Paragraph contained <w:br/> breaks: split into blocks.
                for idx, seg in enumerate(non_empty_segments):
                    seg_text = self._segments_to_text(seg)
                    seg_plain = self._segment_plain_text(seg).strip()
                    if not seg_plain:
                        continue
                    if idx == 0:
                        # First segment keeps the paragraph's style classification.
                        blocks.append(self._make_block(block_type, seg_text))
                    else:
                        # BUGFIX: was `all(b for _, b, _ in seg if _)` — the
                        # duplicated `_` rebinds to the *italic* flag, so the
                        # filter kept only italic runs and the check was
                        # vacuously True when no run was italic. Boldness must
                        # be tested on every run.
                        is_all_bold = all(bold for _, bold, _ in seg)
                        is_short = len(seg_plain) < 100
                        if is_all_bold and is_short and not seg_plain.endswith(('.', ':', ',')):
                            # Short fully-bold line with no sentence punctuation:
                            # treat it as a sub-heading.
                            blocks.append(self._make_block('heading3', seg_text))
                        else:
                            blocks.append(self._make_block('paragraph', seg_text))
        elif isinstance(element, DocxTable):
            md = self._table_to_markdown(element)
            if md.strip():
                blocks.append({'type': 'table', 'content': md})

    def process(self):
        """Return {'metadata': {...}, 'blocks': [...]} for the document."""
        blocks = []
        metadata = {'title': '', 'author': '', 'subject': ''}
        try:
            cp = self.doc.core_properties
            metadata['title'] = cp.title or ''
            metadata['author'] = cp.author or ''
            metadata['subject'] = cp.subject or ''
        except Exception:
            pass
        try:
            # iter_inner_content() yields paragraphs and tables in true
            # document order (python-docx >= 1.1).
            for element in self.doc.iter_inner_content():
                self._process_element(element, blocks)
        except AttributeError:
            # Older python-docx: paragraphs and tables come in two passes,
            # losing interleaved order.
            print(" ⚠️ iter_inner_content() not available, using fallback")
            for p in self.doc.paragraphs:
                self._process_element(p, blocks)
            for t in self.doc.tables:
                self._process_element(t, blocks)
        img_count = sum(1 for b in blocks if b.get('type') == 'image')
        print(f"📄 DOCX processed: {len(blocks)} blocks ({img_count} images)")
        return {'metadata': metadata, 'blocks': blocks}
# ================================================================
# OLE2 DOC PROCESSOR
# ================================================================
class DOCProcessor:
    """Best-effort extraction from legacy OLE2 .doc files using olefile
    when available, with raw byte scanning as a fallback.
    """
    def __init__(self, doc_bytes): self._doc_bytes = doc_bytes
    def process(self):
        """Return {'metadata', 'blocks'}; images are interleaved evenly
        between text blocks since their true positions are unknown.
        """
        blocks = []; metadata = {'title':'','author':'','subject':''}; imgs = []
        try:
            import olefile
            ole = olefile.OleFileIO(io.BytesIO(self._doc_bytes))
            try:
                m = ole.get_metadata()
                for f in ['title','author','subject']:
                    v = getattr(m,f,None)
                    if v: metadata[f] = v.decode('utf-8',errors='ignore') if isinstance(v,bytes) else str(v)
            except: pass
            imgs = self._extract_ole_images(ole)
            if ole.exists('WordDocument'):
                t = self._extract_text(ole)
                if t:
                    for p in re.split(r'\r\n|\r|\n', t):
                        p = p.strip()
                        if p: blocks.append({'type':'paragraph','content':p})
            ole.close()
        except ImportError:
            # olefile not installed — scan the raw bytes directly
            blocks = self._basic_extract(); imgs = self._scan_images(self._doc_bytes)
        except Exception as e:
            print(f" ⚠️ OLE failed: {e}")
            blocks = self._basic_extract(); imgs = self._scan_images(self._doc_bytes)
        if not blocks: blocks = self._basic_extract()
        if imgs and blocks:
            # Distribute images evenly: one every `iv` text blocks
            iv = max(1, len(blocks)//(len(imgs)+1)); r = []; ii = 0
            for i, b in enumerate(blocks):
                if ii < len(imgs) and i > 0 and i % iv == 0: r.append(imgs[ii]); ii += 1
                r.append(b)
            while ii < len(imgs): r.append(imgs[ii]); ii += 1
            blocks = r
        elif imgs: blocks = imgs + blocks
        print(f"📄 DOC (OLE2): {len(blocks)} blocks ({len(imgs)} images)")
        return {'metadata': metadata, 'blocks': blocks}
    def _extract_ole_images(self, ole):
        """Collect JPEG/PNG image blocks from every OLE stream, deduped
        by a base64-prefix key.
        """
        imgs = []
        try:
            for sp in ole.listdir():
                try:
                    d = ole.openstream(sp).read()
                    if len(d) < 100: continue
                    # Stream IS an image (JPEG / PNG magic at offset 0)
                    if d[:3] == b'\xff\xd8\xff':
                        imgs.append({'type':'image','content':'![Image](embedded-image.jpeg)','data':base64.b64encode(d).decode(),'format':'jpeg'}); continue
                    if d[:8] == b'\x89PNG\r\n\x1a\n':
                        imgs.append({'type':'image','content':'![Image](embedded-image.png)','data':base64.b64encode(d).decode(),'format':'png'}); continue
                    # Otherwise scan larger streams for embedded image signatures
                    if len(d) > 2048: imgs.extend(self._scan_images(d))
                except: continue
        except: pass
        seen = set(); return [i for i in imgs if (k:=i.get('data','')[:80]) and k not in seen and not seen.add(k)]
    def _scan_images(self, data):
        """Scan raw bytes for JPEG (SOI..EOI) and PNG (magic..IEND)
        spans of plausible size and return them as image blocks.
        """
        imgs = []; pos = 0
        while pos < len(data)-3:
            i = data.find(b'\xff\xd8\xff', pos)
            if i == -1: break
            e = data.find(b'\xff\xd9', i+3)
            if e == -1: break
            e += 2
            # Size sanity check: 2 KiB .. 50 MiB
            if 2048 < e-i < 50*1024*1024:
                imgs.append({'type':'image','content':'![Image](embedded-image.jpeg)','data':base64.b64encode(data[i:e]).decode(),'format':'jpeg'})
            pos = e
        pos = 0
        while pos < len(data)-8:
            i = data.find(b'\x89PNG\r\n\x1a\n', pos)
            if i == -1: break
            e = data.find(b'IEND\xaeB`\x82', i+8)
            if e == -1: break
            e += 8
            if 1024 < e-i < 50*1024*1024:
                imgs.append({'type':'image','content':'![Image](embedded-image.png)','data':base64.b64encode(data[i:e]).decode(),'format':'png'})
            pos = e
        return imgs
    def _extract_text(self, ole):
        """Crude text recovery: decode the WordDocument stream as
        UTF-16-LE and keep printable characters; failing that, return the
        longest printable text found in any stream.
        """
        t = ''
        try:
            if ole.exists('WordDocument'):
                s = ole.openstream('WordDocument').read()
                d = s.decode('utf-16-le',errors='ignore')
                c = ''.join(ch for ch in d if ch in '\r\n\t' or ch.isprintable())
                if len(c) > 20: return c.strip()
        except: pass
        for sp in ole.listdir():
            try:
                d = ole.openstream(sp).read().decode('utf-16-le',errors='ignore')
                c = ''.join(ch for ch in d if ch.isprintable() or ch in '\r\n\t')
                if len(c) > len(t): t = c
            except: pass
        return t
    def _basic_extract(self):
        """Last resort without olefile: decode the whole file as
        UTF-16-LE and split on carriage returns.
        """
        blocks = []
        try:
            d = self._doc_bytes.decode('utf-16-le',errors='ignore')
            c = ''.join(ch for ch in d if ch.isprintable() or ch in '\r\n\t')
            for p in c.split('\r'):
                p = p.strip()
                if len(p) > 3: blocks.append({'type':'paragraph','content':p})
        except: pass
        return blocks
# ================================================================
# MAIN ENTRY POINT
# ================================================================
def process_docx_to_markdown(file_bytes, filename=''):
    """Convert a Word-family document to markdown blocks.

    Detects the container format (docx/ole2/rtf/mhtml/html), runs the
    matching processor, and brute-forces the remaining processors if
    detection fails or the first attempt yields nothing.

    Returns {'metadata': {...}, 'markdown_blocks': [...]}.
    """
    fmt = detect_doc_format(file_bytes)
    # FIX: the log previously hard-coded "(unknown)" and ignored the
    # `filename` argument entirely.
    print(f" 🔍 File: {filename or '(unknown)'} | Format: {fmt} | Size: {len(file_bytes)}")
    pmap = {'docx': DOCXProcessor, 'mhtml': MHTMLProcessor, 'html': HTMLDocProcessor,
            'rtf': RTFDocProcessor, 'ole2': DOCProcessor}
    if fmt in pmap:
        try:
            r = pmap[fmt](file_bytes).process()
            if r.get('blocks'):
                ic = sum(1 for b in r['blocks'] if b.get('type') == 'image')
                print(f"{len(r['blocks'])} blocks ({ic} images)")
                return {'metadata': r['metadata'], 'markdown_blocks': r['blocks']}
        except Exception as e:
            import traceback
            print(f" ⚠️ {fmt} failed: {e}")
            traceback.print_exc()
    # Fallback: try every processor in turn. FIX: skip the processor for
    # the detected format — it already failed (or produced no blocks) above.
    for fn, PC in [('DOCX', DOCXProcessor), ('MHTML', MHTMLProcessor),
                   ('HTML', HTMLDocProcessor), ('RTF', RTFDocProcessor),
                   ('OLE2', DOCProcessor)]:
        if PC is pmap.get(fmt):
            continue
        try:
            r = PC(file_bytes).process()
            if r.get('blocks'):
                print(f" ✅ Parsed as {fn}")
                return {'metadata': r['metadata'], 'markdown_blocks': r['blocks']}
        except Exception:
            continue
    return {'metadata': {'title': '', 'author': '', 'subject': ''},
            'markdown_blocks': [{'type': 'paragraph',
                                 'content': '⚠️ Could not extract content. Try saving as .docx.'}]}