# (extraction-viewer chrome — "903 lines / 40 KiB / Python" — removed;
#  it was not part of the original source file)
# docx_processor.py - DOCX/DOC Processing and Content Extraction
|
|
# FIXED: Split paragraphs that contain soft returns (<w:br/>) into separate blocks
|
|
# FIXED: Extract text from hyperlinks in DOCX
|
|
|
|
import io
|
|
import re
|
|
import base64
|
|
import email
|
|
import email.policy
|
|
import quopri
|
|
from html.parser import HTMLParser
|
|
from urllib.parse import unquote, urlparse
|
|
|
|
|
|
# ================================================================
|
|
# FORMAT DETECTION
|
|
# ================================================================
|
|
|
|
def detect_doc_format(file_bytes):
    """Identify the real container format of a Word-like document.

    Magic bytes are checked first (ZIP/OLE2), then the leading bytes are
    sniffed for the text-based formats Word also saves as (RTF, MHTML,
    Word-HTML).

    Returns one of: 'docx', 'ole2', 'rtf', 'mhtml', 'html', 'unknown'.
    """
    if not file_bytes or len(file_bytes) < 4:
        return 'unknown'

    # Binary magic numbers: ZIP container (docx) / OLE2 compound file (.doc).
    if file_bytes.startswith(b'PK\x03\x04'):
        return 'docx'
    if file_bytes.startswith(b'\xd0\xcf\x11\xe0'):
        # Covers both the full 8-byte OLE2 signature and its 4-byte prefix.
        return 'ole2'

    # RTF: "{\rtf" possibly preceded by a BOM and/or whitespace.
    header = file_bytes[:64]
    for bom in (b'\xef\xbb\xbf', b'\xff\xfe', b'\xfe\xff'):
        if header.startswith(bom):
            header = header[len(bom):]
            break
    if header.lstrip().startswith(b'{\\rtf'):
        return 'rtf'

    # MHTML: either starts with a MIME-Version header, or the first 512
    # bytes look like a multipart MIME message.
    if header.decode('ascii', errors='ignore').strip().upper().startswith('MIME-VERSION'):
        return 'mhtml'
    sample_512 = file_bytes[:512].decode('ascii', errors='ignore').lower()
    if all(token in sample_512 for token in ('mime-version', 'content-type', 'boundary')):
        return 'mhtml'

    # Word-HTML: decode a 4 KiB sample with the first encoding that yields
    # non-empty text, then look for HTML / Microsoft Office markers.
    sample = file_bytes[:4096]
    decoded = ''
    for enc in ('utf-8', 'utf-16-le', 'utf-16-be', 'cp1252', 'latin-1'):
        try:
            decoded = sample.decode(enc, errors='ignore').lower().strip()
        except Exception:
            continue
        if decoded:
            break
    if decoded:
        markers = (
            '<html', '<!doctype html', '<head', '<meta ',
            'xmlns:w="urn:schemas-microsoft-com',
            'xmlns:o="urn:schemas-microsoft-com',
            '<o:documentproperties>', 'mso-',
        )
        if any(marker in decoded for marker in markers):
            return 'html'
    return 'unknown'
|
|
|
|
|
|
# ================================================================
|
|
# IMAGE NORMALIZATION HELPERS
|
|
# ================================================================
|
|
|
|
def _normalize_image_key(raw_location):
|
|
if not raw_location:
|
|
return ''
|
|
loc = raw_location.strip()
|
|
if loc.lower().startswith('cid:'):
|
|
loc = loc[4:]
|
|
for _ in range(3):
|
|
decoded = unquote(loc)
|
|
if decoded == loc:
|
|
break
|
|
loc = decoded
|
|
try:
|
|
parsed = urlparse(loc)
|
|
path = parsed.path if parsed.path else loc
|
|
except Exception:
|
|
path = loc
|
|
filename = path.replace('\\', '/').rsplit('/', 1)[-1].strip()
|
|
return filename.lower()
|
|
|
|
|
|
# ================================================================
|
|
# MHTML PROCESSOR
|
|
# ================================================================
|
|
|
|
class MHTMLProcessor:
    """Extract document content from an MHTML (MIME HTML) archive.

    Locates the HTML body part and all image parts, then delegates block
    extraction to HTMLDocProcessor with the images made resolvable by
    Content-Location, Content-ID, and normalized filename.
    """

    def __init__(self, file_bytes):
        # Raw MHTML bytes as received.
        self._file_bytes = file_bytes
        # Image lookup keyed by every identifier an <img src> might use:
        # Content-Location, cid:<id>, bare Content-ID, normalized filename.
        # Values are (base64_data, format).
        self._embedded_images = {}
        # (base64_data, format, location-or-cid) tuples in part order,
        # used for positional fallback when no key matches.
        self._ordered_images = []

    def process(self):
        """Return {'metadata': ..., 'blocks': ...} extracted from the archive."""
        html_content = self._extract_html_from_mhtml()
        if not html_content:
            # Strict MIME parsing failed — try manual boundary splitting.
            html_content = self._fallback_extract()
        if not html_content:
            return {
                'metadata': {'title': '', 'author': '', 'subject': ''},
                'blocks': [{'type': 'paragraph', 'content': '⚠️ Could not extract content from MHTML file.'}],
            }

        print(f" 📷 MHTML: {len(self._ordered_images)} image parts")
        # Hand the decoded HTML plus the image lookup off to the HTML pipeline.
        processor = HTMLDocProcessor(
            html_content,
            embedded_images=self._embedded_images,
            ordered_images=self._ordered_images
        )
        result = processor.process()
        img_blocks = sum(1 for b in result.get('blocks', []) if b.get('type') == 'image')
        print(f"📄 MHTML processed: {len(result.get('blocks', []))} blocks ({img_blocks} images)")
        return result

    def _store_image(self, payload_bytes, content_type, content_location, content_id):
        """Register one image part under every key it may be referenced by."""
        fmt = content_type.split('/')[-1].lower()
        # Skip Windows metafile formats — not renderable downstream.
        if fmt in ('x-wmf', 'x-emf', 'wmf', 'emf'):
            return
        if fmt == 'jpg':
            fmt = 'jpeg'
        b64_data = base64.b64encode(payload_bytes).decode('ascii')
        self._ordered_images.append((b64_data, fmt, content_location or content_id or ''))
        if content_location:
            self._embedded_images[content_location] = (b64_data, fmt)
            norm = _normalize_image_key(content_location)
            if norm:
                self._embedded_images[norm] = (b64_data, fmt)
        if content_id:
            self._embedded_images[f'cid:{content_id}'] = (b64_data, fmt)
            self._embedded_images[content_id] = (b64_data, fmt)

    def _extract_html_from_mhtml(self):
        """Parse the archive with the stdlib email parser.

        Returns the decoded HTML body string, or None if parsing failed or
        no HTML part was found. Image parts encountered along the way are
        stored via _store_image.
        """
        try:
            msg = email.message_from_bytes(self._file_bytes, policy=email.policy.default)
            html_body = None
            if msg.is_multipart():
                for part in msg.walk():
                    ct = part.get_content_type()
                    cl = part.get('Content-Location', '').strip()
                    cid = part.get('Content-ID', '').strip('<> ')
                    if ct == 'text/html':
                        payload = part.get_payload(decode=True)
                        if payload:
                            # Prefer the part's declared charset; fall back to UTF-8.
                            cs = part.get_content_charset() or 'utf-8'
                            try: html_body = payload.decode(cs, errors='ignore')
                            except: html_body = payload.decode('utf-8', errors='ignore')
                    elif ct and ct.startswith('image/'):
                        payload = part.get_payload(decode=True)
                        # Ignore tiny payloads (tracking pixels / broken parts).
                        if payload and len(payload) > 100:
                            self._store_image(payload, ct, cl, cid)
            else:
                # Single-part message: the whole body is the document.
                ct = msg.get_content_type()
                if ct in ('text/html', 'multipart/related'):
                    payload = msg.get_payload(decode=True)
                    if payload:
                        cs = msg.get_content_charset() or 'utf-8'
                        try: html_body = payload.decode(cs, errors='ignore')
                        except: html_body = payload.decode('utf-8', errors='ignore')
            return html_body
        except Exception as e:
            print(f" ⚠️ MIME parsing failed: {e}")
            return None

    def _fallback_extract(self):
        """Manual MIME split used when the email parser fails.

        Splits the raw text on the multipart boundary, decodes
        quoted-printable/base64 bodies by hand, stores any image parts
        found, and returns the HTML body string (or None).
        """
        try:
            text = self._file_bytes.decode('ascii', errors='ignore')
            bm = re.search(r'boundary="?([^\s";\r\n]+)"?', text, re.IGNORECASE)
            if not bm: return None
            boundary = bm.group(1)
            parts = text.split(f'--{boundary}')
            html_body = None
            for part in parts:
                # Separate headers from body at the first blank line.
                he = part.find('\r\n\r\n')
                if he == -1: he = part.find('\n\n')
                if he == -1: continue
                hs = part[:he]; body = part[he:].strip()
                ctm = re.search(r'Content-Type:\s*([^\s;]+)', hs, re.IGNORECASE)
                ct = ctm.group(1).lower() if ctm else ''
                is_qp = bool(re.search(r'Content-Transfer-Encoding:\s*quoted-printable', hs, re.IGNORECASE))
                is_b64 = bool(re.search(r'Content-Transfer-Encoding:\s*base64', hs, re.IGNORECASE))
                clm = re.search(r'Content-Location:\s*(.+?)[\r\n]', hs, re.IGNORECASE)
                cl = clm.group(1).strip() if clm else ''
                cidm = re.search(r'Content-ID:\s*<?([^>\s\r\n]+)>?', hs, re.IGNORECASE)
                cid = cidm.group(1).strip() if cidm else ''
                if ct == 'text/html':
                    # Undo the transfer encoding before sniffing for HTML.
                    if is_qp: body = quopri.decodestring(body.encode('ascii', errors='ignore')).decode('utf-8', errors='ignore')
                    elif is_b64:
                        try: body = base64.b64decode(body).decode('utf-8', errors='ignore')
                        except: pass
                    if '<html' in body.lower() or '<body' in body.lower(): html_body = body
                elif ct.startswith('image/') and is_b64 and body:
                    # base64 bodies are line-wrapped; strip all whitespace first.
                    clean_b64 = re.sub(r'\s+', '', body)
                    try:
                        pb = base64.b64decode(clean_b64)
                        if len(pb) > 100: self._store_image(pb, ct, cl, cid)
                    except: pass
            return html_body
        except Exception as e:
            print(f" ⚠️ Fallback MHTML failed: {e}")
            return None
|
|
|
|
|
|
# ================================================================
|
|
# HTML COMMENT CLEANUP
|
|
# ================================================================
|
|
|
|
def _clean_html_comments(html_text):
|
|
html_text = re.sub(r'<!--\[if\s+!vml\]-->(.*?)<!--\[endif\]-->', r'\1', html_text, flags=re.DOTALL|re.IGNORECASE)
|
|
html_text = re.sub(r'<!--\[if\s+!mso\]-->(.*?)<!--\[endif\]-->', r'\1', html_text, flags=re.DOTALL|re.IGNORECASE)
|
|
html_text = re.sub(r'<!--\[if\s[^\]]*\]>.*?<!\[endif\]-->', '', html_text, flags=re.DOTALL|re.IGNORECASE)
|
|
html_text = re.sub(r'<!--.*?-->', '', html_text, flags=re.DOTALL)
|
|
return html_text
|
|
|
|
|
|
# ================================================================
|
|
# HTML DOC PROCESSOR
|
|
# ================================================================
|
|
|
|
class HTMLDocProcessor:
    """Extract blocks from Word-flavoured HTML (e.g. "Save as Web Page").

    Accepts either decoded HTML text or raw bytes. When fed from an MHTML
    archive, ``embedded_images`` / ``ordered_images`` let <img> tags be
    resolved back to the archived image data.
    """

    def __init__(self, file_bytes, embedded_images=None, ordered_images=None):
        if isinstance(file_bytes, str):
            # Already-decoded HTML (e.g. handed over by MHTMLProcessor).
            self._html_text = file_bytes
            self._file_bytes = file_bytes.encode('utf-8', errors='ignore')
        else:
            self._file_bytes = file_bytes
            self._html_text = self._decode_html()
        # Image lookup: location/cid/filename -> (base64_data, format).
        self._embedded_images = embedded_images or {}
        # (base64_data, format, location) in document order, for fallback.
        self._ordered_images = ordered_images or []
        # Indices into _ordered_images already assigned to an <img>.
        self._used_image_indices = set()

    def _decode_html(self):
        """Decode raw HTML bytes: BOM first, then any declared charset,
        then a short list of common encodings."""
        if self._file_bytes[:3] == b'\xef\xbb\xbf': return self._file_bytes[3:].decode('utf-8', errors='ignore')
        if self._file_bytes[:2] == b'\xff\xfe': return self._file_bytes[2:].decode('utf-16-le', errors='ignore')
        if self._file_bytes[:2] == b'\xfe\xff': return self._file_bytes[2:].decode('utf-16-be', errors='ignore')
        sample = self._file_bytes[:4096]
        try:
            st = sample.decode('ascii', errors='ignore')
            # Honour an explicit charset declaration when present.
            cm = re.search(r'charset[="\s]+([a-zA-Z0-9\-]+)', st, re.IGNORECASE)
            if cm:
                try: return self._file_bytes.decode(cm.group(1).strip().strip('"\''), errors='ignore')
                except: pass
        except: pass
        for enc in ['utf-8', 'cp1252', 'latin-1']:
            try: return self._file_bytes.decode(enc, errors='ignore')
            except: continue
        return self._file_bytes.decode('latin-1', errors='replace')

    def process(self):
        """Return {'metadata', 'blocks'} parsed from the HTML."""
        metadata = {'title': '', 'author': '', 'subject': ''}
        tm = re.search(r'<title[^>]*>(.*?)</title>', self._html_text, re.IGNORECASE|re.DOTALL)
        if tm: metadata['title'] = self._strip_tags(tm.group(1)).strip()
        blocks = self._extract_all_blocks()
        # Keep blocks that have visible text or carry image data.
        blocks = [b for b in blocks if b.get('content', '').strip() or b.get('data')]
        if not blocks: blocks = self._simple_extract()
        img_count = sum(1 for b in blocks if b.get('type') == 'image')
        print(f"📄 HTML-DOC processed: {len(blocks)} blocks ({img_count} images)")
        return {'metadata': metadata, 'blocks': blocks}

    def _strip_tags(self, html_str):
        """Drop all tags and unescape HTML entities."""
        import html as hm
        return hm.unescape(re.sub(r'<[^>]+>', '', html_str))

    def _resolve_image_src(self, src):
        """Map an <img src> value to (base64_data, format).

        Tries, in order: inline data: URI, exact embedded-image key,
        normalized filename, filename without extension, normalized-key
        comparison against every stored location, and finally the next
        unused image in document order. Returns (None, None) on failure.
        """
        import html as hm
        if not src: return None, None
        src = hm.unescape(src).strip()
        if src.startswith('data:image'):
            dm = re.match(r'data:image/([^;]+);base64,(.+)', src, re.DOTALL)
            if dm: return dm.group(2).strip(), dm.group(1)
        if src in self._embedded_images:
            self._mark_used(self._embedded_images[src][0]); return self._embedded_images[src]
        ns = _normalize_image_key(src)
        if ns and ns in self._embedded_images:
            self._mark_used(self._embedded_images[ns][0]); return self._embedded_images[ns]
        if ns and '.' in ns:
            nne = ns.rsplit('.', 1)[0]
            if nne and nne in self._embedded_images:
                self._mark_used(self._embedded_images[nne][0]); return self._embedded_images[nne]
        if ns:
            for loc, (data, fmt) in self._embedded_images.items():
                ln = _normalize_image_key(loc)
                if ln and ns and ln == ns: self._mark_used(data); return data, fmt
        # Positional fallback: hand out the next not-yet-used image part.
        return self._get_next_unused()

    def _mark_used(self, data_prefix):
        """Mark the ordered-image entry whose data matches as consumed.

        Matching on the first 60 base64 chars keeps the comparison cheap.
        """
        p = data_prefix[:60]
        for i, (b, f, l) in enumerate(self._ordered_images):
            if i not in self._used_image_indices and b[:60] == p:
                self._used_image_indices.add(i); return

    def _get_next_unused(self):
        """Return (b64, fmt) of the first unconsumed ordered image, else (None, None)."""
        for i, (b, f, l) in enumerate(self._ordered_images):
            if i not in self._used_image_indices:
                self._used_image_indices.add(i); return b, f
        return None, None

    def _extract_all_blocks(self):
        """Regex-scan the HTML for images, headings, tables, paragraphs,
        list items, quotes and bare <div> text.

        Each hit is tagged with its source position ('_pos') so blocks can
        be restored to document order before deduplication.
        """
        import html as hm
        blocks = []
        cleaned = re.sub(r'<script[^>]*>.*?</script>', '', self._html_text, flags=re.DOTALL|re.IGNORECASE)
        cleaned = re.sub(r'<style[^>]*>.*?</style>', '', cleaned, flags=re.DOTALL|re.IGNORECASE)
        # Collect VML image refs from conditional comments before they are
        # stripped; used only if no regular <img> tags survive.
        vml_srcs = []
        for vm in re.finditer(r'<!--\[if\s[^\]]*vml[^\]]*\]>(.*?)<!\[endif\]-->', cleaned, re.DOTALL|re.IGNORECASE):
            for im in re.finditer(r'<v:imagedata\b[^>]*?\bsrc\s*=\s*["\']([^"\']+)["\']', vm.group(1), re.IGNORECASE|re.DOTALL):
                vml_srcs.append((hm.unescape(im.group(1)), vm.start()))
        cleaned = _clean_html_comments(cleaned)
        # Remove leftover Office namespace tags (o:, v:, w:).
        cleaned = re.sub(r'</?[ovw]:[^>]+>', '', cleaned, flags=re.IGNORECASE)
        bm = re.search(r'<body[^>]*>(.*)</body>', cleaned, re.IGNORECASE|re.DOTALL)
        if bm: cleaned = bm.group(1)

        # <img> tags: quoted src preferred, unquoted as fallback.
        img_entries = []
        for m in re.finditer(r'<img\b([^>]*?)/?\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            sm = re.search(r'\bsrc\s*=\s*["\']([^"\']+)["\']', m.group(1), re.IGNORECASE)
            if not sm: sm = re.search(r'\bsrc\s*=\s*(\S+)', m.group(1), re.IGNORECASE)
            if sm: img_entries.append((hm.unescape(sm.group(1)), m.start()))
        if not img_entries and vml_srcs: img_entries = vml_srcs

        self._used_image_indices = set()
        for src, pos in img_entries:
            d, f = self._resolve_image_src(src)
            if d: blocks.append({'type':'image','content':f"",'data':d,'format':f,'_pos':pos})

        # <h1>-<h6> -> heading blocks (h3+ all collapse to heading3).
        for m in re.finditer(r'<(h[1-6])\b[^>]*>(.*?)</\1\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            t = re.sub(r'\s+', ' ', self._strip_tags(m.group(2))).strip()
            if t:
                tag = m.group(1).lower()
                p = {'h1':'# ','h2':'## '}.get(tag,'### ')
                bt = {'h1':'heading1','h2':'heading2'}.get(tag,'heading3')
                blocks.append({'type':bt,'content':f"{p}{t}",'_pos':m.start()})

        for m in re.finditer(r'<table\b[^>]*>(.*?)</table\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            md = self._parse_table(m.group(1))
            if md: blocks.append({'type':'table','content':md,'_pos':m.start()})

        # <p> blocks; Word's Mso* class names map to specific block types.
        for m in re.finditer(r'<p\b([^>]*)>(.*?)</p\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            inner = m.group(2); attrs = m.group(1)
            it = self._strip_tags(inner).strip()
            # Skip whitespace-only paragraphs (incl. &nbsp; spacing).
            hw = not it or all(c in ' \t\n\r\xa0' for c in it)
            if hw: continue
            t = re.sub(r'[ \t]+', ' ', re.sub(r'\n\s*\n', '\n', it)).strip()
            if not t: continue
            bt = 'paragraph'
            cm = re.search(r'class\s*=\s*["\']?([^"\'>\s]+)', attrs, re.IGNORECASE)
            cn = cm.group(1) if cm else ''
            if 'MsoListParagraph' in cn:
                # Strip the literal bullet/number Word bakes into the text.
                t = re.sub(r'^[·•●○◦‣⁃]\s*', '', re.sub(r'^\d+[.)]\s*', '', t)); bt = 'list_item'
            elif 'MsoTitle' in cn: bt = 'heading1'
            elif 'MsoSubtitle' in cn: bt = 'heading2'
            elif 'MsoQuote' in cn or 'MsoIntenseQuote' in cn: bt = 'quote'
            pm = {'heading1':'# ','heading2':'## ','list_item':'- ','quote':'> '}
            blocks.append({'type':bt,'content':f"{pm.get(bt,'')}{t}",'_pos':m.start()})

        for m in re.finditer(r'<li\b[^>]*>(.*?)</li\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            t = re.sub(r'\s+', ' ', self._strip_tags(m.group(1))).strip()
            if t: blocks.append({'type':'list_item','content':f"- {t}",'_pos':m.start()})

        for m in re.finditer(r'<blockquote\b[^>]*>(.*?)</blockquote\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            t = re.sub(r'\s+', ' ', self._strip_tags(m.group(1))).strip()
            if t: blocks.append({'type':'quote','content':f"> {t}",'_pos':m.start()})

        # Leaf <div>s only (no nested block structure), and only if their
        # text was not already captured by another block.
        for m in re.finditer(r'<div\b([^>]*)>(.*?)</div\s*>', cleaned, re.IGNORECASE|re.DOTALL):
            if re.search(r'<(?:p|h[1-6]|table|div|ul|ol)\b', m.group(2), re.IGNORECASE): continue
            t = re.sub(r'[ \t]+', ' ', self._strip_tags(m.group(2))).strip()
            if t and len(t) > 1 and not all(c in ' \t\n\r\xa0' for c in t):
                if not any(t in b.get('content','') for b in blocks):
                    blocks.append({'type':'paragraph','content':t,'_pos':m.start()})

        # Restore document order, then deduplicate: images by the first 60
        # chars of their base64 data, text blocks by exact content.
        blocks.sort(key=lambda b: b.get('_pos', 0))
        seen = set(); deduped = []
        for b in blocks:
            b.pop('_pos', None)
            if b.get('type') == 'image':
                k = b.get('data','')[:60]
                if k and k in seen: continue
                if k: seen.add(k)
                deduped.append(b)
            else:
                c = b.get('content','').strip()
                if c and c not in seen: seen.add(c); deduped.append(b)
        return deduped

    def _parse_table(self, html):
        """Convert <tr>/<td|th> content to a markdown table; '' if trivial."""
        rows = []
        for rm in re.finditer(r'<tr\b[^>]*>(.*?)</tr\s*>', html, re.IGNORECASE|re.DOTALL):
            cells = []
            for cm in re.finditer(r'<t[dh]\b[^>]*>(.*?)</t[dh]\s*>', rm.group(1), re.IGNORECASE|re.DOTALL):
                cells.append(re.sub(r'\s+', ' ', self._strip_tags(cm.group(1))).strip().replace('|','\\|'))
            if cells: rows.append(cells)
        if not rows: return ''
        # Single-column, <=2-row "tables" are layout artifacts, not data.
        if all(len(r)==1 for r in rows) and len(rows)<=2: return ''
        lines = []
        for i, r in enumerate(rows):
            lines.append('| '+' | '.join(r)+' |')
            if i == 0: lines.append('| '+' | '.join(['---']*len(r))+' |')
        return '\n'.join(lines)

    def _simple_extract(self):
        """Last-resort extraction: strip all tags and split on blank lines."""
        import html as hm
        blocks = []; t = self._html_text
        t = re.sub(r'<script[^>]*>.*?</script>', '', t, flags=re.DOTALL|re.IGNORECASE)
        t = re.sub(r'<style[^>]*>.*?</style>', '', t, flags=re.DOTALL|re.IGNORECASE)
        t = _clean_html_comments(t)
        bm = re.search(r'<body[^>]*>(.*)</body>', t, re.IGNORECASE|re.DOTALL)
        if bm: t = bm.group(1)
        # Turn structural tags into line breaks before stripping the rest.
        for tag, repl in [('br', '\n'), ('p', '\n\n'), ('div', '\n\n'), ('li', '\n'), ('tr', '\n'), ('table', '\n\n')]:
            t = re.sub(rf'</?{tag}[^>]*>', repl, t, flags=re.IGNORECASE)
        t = hm.unescape(re.sub(r'<[^>]+>', '', t))
        for p in re.split(r'\n{2,}', t):
            p = re.sub(r'[ \t]+', ' ', p).strip()
            if p and len(p) > 1: blocks.append({'type':'paragraph','content':p})
        return blocks
|
|
|
|
|
|
# ================================================================
|
|
# RTF DOC PROCESSOR
|
|
# ================================================================
|
|
|
|
class RTFDocProcessor:
    """Convert an RTF document into markdown-style paragraph blocks.

    Uses the third-party ``striprtf`` library when it is installed and
    falls back to a regex-based control-word stripper otherwise.
    """

    def __init__(self, file_bytes):
        self._file_bytes = file_bytes

    def process(self):
        """Return {'metadata', 'blocks'} extracted from the RTF bytes."""
        metadata = {'title': '', 'author': '', 'subject': ''}
        blocks = []
        raw = self._decode_rtf()
        metadata.update(self._extract_meta(raw))
        plain = self._rtf_to_text(raw)
        if plain:
            for para in re.split(r'\n{2,}', plain):
                para = para.strip()
                if not para:
                    continue
                # Short all-caps lines are treated as section headings.
                if len(para) < 80 and para.isupper():
                    blocks.append({'type': 'heading2', 'content': f"## {para}"})
                else:
                    blocks.append({'type': 'paragraph', 'content': para})
        print(f"📄 RTF-DOC processed: {len(blocks)} blocks")
        return {'metadata': metadata, 'blocks': blocks}

    def _decode_rtf(self):
        """Decode the raw bytes to text, dropping a UTF BOM if present."""
        data = self._file_bytes
        for bom in (b'\xef\xbb\xbf', b'\xff\xfe', b'\xfe\xff'):
            if data.startswith(bom):
                data = data[len(bom):]
                break
        try:
            return data.decode('ascii', errors='ignore')
        except:
            return data.decode('latin-1', errors='replace')

    def _extract_meta(self, rtf):
        """Pull \\title, \\author and \\subject values from the RTF source."""
        found = {}
        for field in ('title', 'author', 'subject'):
            hit = re.search(r'\\' + field + r'\s+([^}]+)', rtf)
            if hit:
                found[field] = hit.group(1).strip()
        return found

    def _rtf_to_text(self, rtf):
        """Best-effort plain-text extraction from RTF markup."""
        try:
            from striprtf.striprtf import rtf_to_text
            return rtf_to_text(rtf, errors='ignore')
        except ImportError:
            pass
        except Exception:
            pass
        text = rtf
        # Remove non-content groups (font tables, style sheets, info, ...).
        for group in ('fonttbl', 'colortbl', 'stylesheet', 'info', 'header', 'footer'):
            text = re.sub(r'\{\\' + re.escape(group) + r'[^{}]*(?:\{[^{}]*\}[^{}]*)*\}',
                          '', text, flags=re.DOTALL)
        # Paragraph / line / tab control words become whitespace.
        text = re.sub(r'\\par\b\s*', '\n', text)
        text = re.sub(r'\\pard\b\s*', '', text)
        text = re.sub(r'\\line\b\s*', '\n', text)
        text = re.sub(r'\\tab\b\s*', '\t', text)

        def _hex_escape(match):
            # \'hh -> the cp1252 character for hex byte hh.
            try:
                return bytes([int(match.group(1), 16)]).decode('cp1252', errors='ignore')
            except:
                return ''

        text = re.sub(r"\\\'([0-9a-fA-F]{2})", _hex_escape, text)

        def _unicode_escape(match):
            # \uN -> chr(N); RTF stores code points as signed 16-bit ints.
            try:
                code = int(match.group(1))
                if code < 0:
                    code += 65536
                return chr(code)
            except:
                return ''

        text = re.sub(r'\\u(-?\d+)\??', _unicode_escape, text)
        # Drop remaining control words and group braces, then tidy whitespace.
        text = re.sub(r'\\[a-zA-Z]+\d*\s?', '', text)
        text = re.sub(r'[{}]', '', text)
        return re.sub(r'\n{3,}', '\n\n', re.sub(r' +', ' ', text)).strip()
|
|
|
|
|
|
# ================================================================
|
|
# DOCX PROCESSOR (using python-docx)
|
|
# ================================================================
|
|
|
|
# XML namespace prefix map used for findall()/iter() queries against the
# DOCX document XML (WordprocessingML, DrawingML, relationships).
DOCX_NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'a': 'http://schemas.openxmlformats.org/drawingml/2006/main',
    'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships',
}

# Clark-notation ("{uri}local") namespace prefixes for direct element-tag
# and attribute-name comparisons.
W_NS = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
R_NS = '{http://schemas.openxmlformats.org/officeDocument/2006/relationships}'
|
|
|
|
|
|
class DOCXProcessor:
    """Process DOCX files into markdown-style content blocks.

    CRITICAL FIX: extracts text from hyperlinks (``<w:hyperlink>``), which
    ``paragraph.runs`` misses, and splits paragraphs containing ``<w:br/>``
    (soft line breaks) into separate blocks when they represent logically
    distinct paragraphs.
    """

    # Word built-in style name -> internal block type.
    HEADING_PATTERNS = {
        'Title': 'title', 'Subtitle': 'subtitle',
        'Heading 1': 'heading1', 'Heading 2': 'heading2',
        'Heading 3': 'heading3', 'Heading 4': 'heading3',
        'Heading 5': 'heading3', 'Heading 6': 'heading3',
        'Heading 7': 'heading3', 'Heading 8': 'heading3', 'Heading 9': 'heading3',
    }
    QUOTE_STYLES = {'Quote', 'Intense Quote', 'Block Text'}
    LIST_BULLET_STYLES = {'List Bullet', 'List Bullet 2', 'List Bullet 3'}
    LIST_NUMBER_STYLES = {'List Number', 'List Number 2', 'List Number 3', 'List Continue'}

    def __init__(self, docx_bytes):
        # Imported lazily so the other processors work without python-docx.
        import docx as docx_module
        self.doc = docx_module.Document(io.BytesIO(docx_bytes))
        # rel_id -> (base64-encoded image bytes, format string)
        self._image_cache = {}
        self._extract_all_images()

    def _extract_all_images(self):
        """Cache every image relationship in the package as (b64, format)."""
        try:
            for rel_id, rel in self.doc.part.rels.items():
                if "image" not in rel.reltype:
                    continue
                try:
                    ip = rel.target_part
                    ib = ip.blob
                    ct = ip.content_type or ''
                    # Infer format from the content type, falling back to
                    # the part name's extension, defaulting to png.
                    fmt = 'png'
                    if 'jpeg' in ct or 'jpg' in ct:
                        fmt = 'jpeg'
                    elif 'gif' in ct:
                        fmt = 'gif'
                    elif 'bmp' in ct:
                        fmt = 'bmp'
                    elif 'tiff' in ct:
                        fmt = 'tiff'
                    elif 'webp' in ct:
                        fmt = 'webp'
                    else:
                        pn = str(ip.partname) if hasattr(ip, 'partname') else ''
                        if '.jpg' in pn or '.jpeg' in pn:
                            fmt = 'jpeg'
                        elif '.gif' in pn:
                            fmt = 'gif'
                    self._image_cache[rel_id] = (base64.b64encode(ib).decode('utf-8'), fmt)
                except Exception as e:
                    print(f" ⚠️ Image {rel_id}: {e}")
        except Exception as e:
            print(f" ⚠️ Rels error: {e}")

    def _get_paragraph_images(self, paragraph):
        """Return [{'data', 'format'}] for images anchored in this paragraph."""
        images = []
        try:
            for drawing in paragraph._element.findall('.//w:drawing', DOCX_NSMAP):
                for blip in drawing.findall('.//a:blip', DOCX_NSMAP):
                    eid = blip.get(f'{R_NS}embed')
                    if eid and eid in self._image_cache:
                        d, f = self._image_cache[eid]
                        images.append({'data': d, 'format': f})
        except Exception as e:
            print(f" ⚠️ Para images: {e}")
        return images

    def _get_paragraph_segments(self, paragraph):
        """Extract text from a paragraph as a list of SEGMENTS split by <w:br/>.

        Each segment is a list of (text, is_bold, is_italic) tuples.
        Segments are separated by <w:br/> elements (soft line breaks),
        which allows a single <w:p> containing multiple logical paragraphs
        to be split into separate blocks.
        """
        segments = [[]]  # each segment: list of (text, bold, italic)

        # Walk direct children of <w:p> in order. Children can be <w:r>,
        # <w:hyperlink>, <w:smartTag>, <w:sdt>, <w:bookmarkStart>, etc.
        for child in paragraph._element:
            tag = child.tag

            if tag == f'{W_NS}r':
                # Direct run.
                self._process_run_element(child, segments)

            elif tag == f'{W_NS}hyperlink':
                # Hyperlink — contains <w:r> children (missed by paragraph.runs).
                for run_elem in child.findall(f'{W_NS}r'):
                    self._process_run_element(run_elem, segments)

            elif tag == f'{W_NS}smartTag':
                # Smart tag — contains <w:r> children.
                for run_elem in child.findall(f'{W_NS}r'):
                    self._process_run_element(run_elem, segments)

            elif tag == f'{W_NS}sdt':
                # Structured document tag — may contain runs at any depth.
                for run_elem in child.iter(f'{W_NS}r'):
                    self._process_run_element(run_elem, segments)

        return segments

    def _process_run_element(self, run_elem, segments):
        """Process a single <w:r> element, appending its text to `segments`.

        A <w:br/> inside the run starts a new segment.
        """
        for elem in run_elem:
            if elem.tag == f'{W_NS}br':
                # Soft line break: start a new segment.
                segments.append([])
            elif elem.tag == f'{W_NS}t':
                if elem.text:
                    is_bold, is_italic = self._get_run_formatting(run_elem)
                    segments[-1].append((elem.text, is_bold, is_italic))

    def _get_run_formatting(self, run_elem):
        """Return (is_bold, is_italic) for a <w:r> element.

        A bare <w:b/>/<w:i/> means on; w:val of '0'/'false' means off.
        """
        is_bold = False
        is_italic = False
        rpr = run_elem.find(f'{W_NS}rPr')
        if rpr is not None:
            b = rpr.find(f'{W_NS}b')
            if b is not None:
                v = b.get(f'{W_NS}val')
                is_bold = v is None or v not in ('0', 'false')
            i = rpr.find(f'{W_NS}i')
            if i is not None:
                v = i.get(f'{W_NS}val')
                is_italic = v is None or v not in ('0', 'false')
        return is_bold, is_italic

    def _segments_to_text(self, segment):
        """Render a segment ((text, bold, italic) tuples) as markdown."""
        parts = []
        for text, is_bold, is_italic in segment:
            if is_bold and is_italic:
                parts.append(f"***{text}***")
            elif is_bold:
                parts.append(f"**{text}**")
            elif is_italic:
                parts.append(f"*{text}*")
            else:
                parts.append(text)
        return ''.join(parts)

    def _segment_plain_text(self, segment):
        """Return the unformatted text of a segment."""
        return ''.join(text for text, _, _ in segment)

    def _get_full_paragraph_plain_text(self, paragraph):
        """Return ALL plain text from a paragraph, including hyperlink runs."""
        texts = []
        for t_elem in paragraph._element.iter(f'{W_NS}t'):
            if t_elem.text:
                texts.append(t_elem.text)
        return ''.join(texts).strip()

    def _classify_paragraph(self, paragraph):
        """Map the paragraph's Word style name to an internal block type."""
        sn = paragraph.style.name if paragraph.style else ''
        for p, bt in self.HEADING_PATTERNS.items():
            if sn == p or sn.startswith(p):
                return bt
        if sn in self.QUOTE_STYLES:
            return 'quote'
        if sn in self.LIST_BULLET_STYLES:
            return 'list_item'
        if sn in self.LIST_NUMBER_STYLES:
            return 'numbered_list'
        if sn == 'List Paragraph':
            return 'list_item'
        if 'toc' in sn.lower():
            return 'list_item'
        return 'paragraph'

    def _table_to_markdown(self, table):
        """Render a python-docx Table as a markdown table string."""
        rd = []
        for r in table.rows:
            rd.append([c.text.replace('|', '\\|').replace('\n', ' ').strip() for c in r.cells])
        if not rd:
            return ""
        lines = []
        for i, r in enumerate(rd):
            lines.append('| ' + ' | '.join(r) + ' |')
            if i == 0:
                # Separator row after the header.
                lines.append('| ' + ' | '.join(['---'] * len(r)) + ' |')
        return '\n'.join(lines)

    def _make_block(self, block_type, text):
        """Build a block dict, applying the markdown prefix for its type."""
        tm = {
            'title': ('heading1', '# '), 'subtitle': ('heading2', '## '),
            'heading1': ('heading1', '# '), 'heading2': ('heading2', '## '),
            'heading3': ('heading3', '### '), 'quote': ('quote', '> '),
            'list_item': ('list_item', '- '), 'numbered_list': ('list_item', '1. '),
        }
        if block_type in tm:
            bt, pf = tm[block_type]
            return {'type': bt, 'content': f"{pf}{text}"}
        return {'type': 'paragraph', 'content': text}

    def _process_element(self, element, blocks):
        """Convert one document-order element (paragraph or table) into blocks."""
        from docx.table import Table as DocxTable
        from docx.text.paragraph import Paragraph as DocxParagraph

        if isinstance(element, DocxParagraph):
            plain_text = self._get_full_paragraph_plain_text(element)

            # Emit anchored images first (covers image-only paragraphs too).
            for img in self._get_paragraph_images(element):
                blocks.append({
                    'type': 'image',
                    'content': '',
                    'data': img['data'], 'format': img['format'],
                })
            if not plain_text:
                return

            block_type = self._classify_paragraph(element)

            # Text split into segments at <w:br/> boundaries.
            segments = self._get_paragraph_segments(element)
            non_empty_segments = [s for s in segments if self._segment_plain_text(s).strip()]

            if len(non_empty_segments) <= 1:
                # Normal case: the whole paragraph is one block.
                text = self._segments_to_text(non_empty_segments[0]) if non_empty_segments else ''
                if text.strip():
                    blocks.append(self._make_block(block_type, text))
            else:
                # Multiple segments: emit each as its own block. The first
                # keeps the paragraph's style classification; later ones are
                # plain paragraphs unless they look like bold sub-headings.
                for idx, seg in enumerate(non_empty_segments):
                    seg_text = self._segments_to_text(seg)
                    seg_plain = self._segment_plain_text(seg).strip()
                    if not seg_plain:
                        continue
                    if idx == 0:
                        blocks.append(self._make_block(block_type, seg_text))
                        continue
                    # BUG FIX: was `all(b for _, b, _ in seg if _)`. The
                    # `if _` filter tested the *italic* flag (the last value
                    # bound to `_` when unpacking), not the run text, so the
                    # check was vacuously True whenever no run was italic.
                    # Consider only runs that carry visible text.
                    is_all_bold = all(bold for text, bold, _ in seg if text.strip())
                    is_short = len(seg_plain) < 100
                    if is_all_bold and is_short and not seg_plain.endswith(('.', ':', ',')):
                        # Short, fully-bold, no sentence punctuation: treat
                        # as an inline sub-heading.
                        blocks.append(self._make_block('heading3', seg_text))
                    else:
                        blocks.append(self._make_block('paragraph', seg_text))

        elif isinstance(element, DocxTable):
            md = self._table_to_markdown(element)
            if md.strip():
                blocks.append({'type': 'table', 'content': md})

    def process(self):
        """Return {'metadata', 'blocks'} for the whole document."""
        blocks = []
        metadata = {'title': '', 'author': '', 'subject': ''}
        try:
            cp = self.doc.core_properties
            metadata['title'] = cp.title or ''
            metadata['author'] = cp.author or ''
            metadata['subject'] = cp.subject or ''
        except Exception:
            pass

        try:
            # Newer python-docx yields paragraphs and tables in true
            # document order.
            for element in self.doc.iter_inner_content():
                self._process_element(element, blocks)
        except AttributeError:
            # Older python-docx: paragraphs then tables (interleaving lost).
            print(" ⚠️ iter_inner_content() not available, using fallback")
            for p in self.doc.paragraphs:
                self._process_element(p, blocks)
            for t in self.doc.tables:
                self._process_element(t, blocks)

        img_count = sum(1 for b in blocks if b.get('type') == 'image')
        print(f"📄 DOCX processed: {len(blocks)} blocks ({img_count} images)")
        return {'metadata': metadata, 'blocks': blocks}
|
|
|
|
|
|
# ================================================================
|
|
# OLE2 DOC PROCESSOR
|
|
# ================================================================
|
|
|
|
class DOCProcessor:
    """Legacy binary .doc (OLE2 compound document) processor.

    Best effort only: reads metadata and streams via olefile when it is
    installed, scrapes printable text from the WordDocument stream, and
    scans streams (or the raw bytes) for embedded JPEG/PNG images. Images
    are interleaved between text blocks at roughly even intervals because
    their true positions are unknown.
    """

    def __init__(self, doc_bytes): self._doc_bytes = doc_bytes

    def process(self):
        """Return {'metadata', 'blocks'} extracted from the OLE2 file."""
        blocks = []; metadata = {'title':'','author':'','subject':''}; imgs = []
        try:
            import olefile
            ole = olefile.OleFileIO(io.BytesIO(self._doc_bytes))
            try:
                m = ole.get_metadata()
                for f in ['title','author','subject']:
                    v = getattr(m,f,None)
                    if v: metadata[f] = v.decode('utf-8',errors='ignore') if isinstance(v,bytes) else str(v)
            except: pass
            imgs = self._extract_ole_images(ole)
            if ole.exists('WordDocument'):
                t = self._extract_text(ole)
                if t:
                    for p in re.split(r'\r\n|\r|\n', t):
                        p = p.strip()
                        if p: blocks.append({'type':'paragraph','content':p})
            ole.close()
        except ImportError:
            # olefile not installed — scrape the raw bytes directly.
            blocks = self._basic_extract(); imgs = self._scan_images(self._doc_bytes)
        except Exception as e:
            print(f" ⚠️ OLE failed: {e}")
            blocks = self._basic_extract(); imgs = self._scan_images(self._doc_bytes)
        if not blocks: blocks = self._basic_extract()
        # Spread images evenly through the text, since the binary format
        # gives no anchor positions for them here.
        if imgs and blocks:
            iv = max(1, len(blocks)//(len(imgs)+1)); r = []; ii = 0
            for i, b in enumerate(blocks):
                if ii < len(imgs) and i > 0 and i % iv == 0: r.append(imgs[ii]); ii += 1
                r.append(b)
            while ii < len(imgs): r.append(imgs[ii]); ii += 1
            blocks = r
        elif imgs: blocks = imgs + blocks
        print(f"📄 DOC (OLE2): {len(blocks)} blocks ({len(imgs)} images)")
        return {'metadata': metadata, 'blocks': blocks}

    def _extract_ole_images(self, ole):
        """Collect JPEG/PNG images from every stream in the container."""
        imgs = []
        try:
            for sp in ole.listdir():
                try:
                    d = ole.openstream(sp).read()
                    if len(d) < 100: continue
                    # Stream that IS an image (magic bytes at offset 0).
                    if d[:3] == b'\xff\xd8\xff':
                        imgs.append({'type':'image','content':'','data':base64.b64encode(d).decode(),'format':'jpeg'}); continue
                    if d[:8] == b'\x89PNG\r\n\x1a\n':
                        imgs.append({'type':'image','content':'','data':base64.b64encode(d).decode(),'format':'png'}); continue
                    # Otherwise scan larger streams for embedded image data.
                    if len(d) > 2048: imgs.extend(self._scan_images(d))
                except: continue
        except: pass
        # Deduplicate by base64 prefix. `seen.add()` returns None, so
        # `not seen.add(k)` is always True and just records the key.
        seen = set(); return [i for i in imgs if (k:=i.get('data','')[:80]) and k not in seen and not seen.add(k)]

    def _scan_images(self, data):
        """Scan raw bytes for JPEG (FFD8..FFD9) and PNG (sig..IEND) spans."""
        imgs = []; pos = 0
        while pos < len(data)-3:
            i = data.find(b'\xff\xd8\xff', pos)
            if i == -1: break
            e = data.find(b'\xff\xd9', i+3)
            if e == -1: break
            e += 2
            # Size sanity window: skip tiny fragments and absurd blobs.
            if 2048 < e-i < 50*1024*1024:
                imgs.append({'type':'image','content':'','data':base64.b64encode(data[i:e]).decode(),'format':'jpeg'})
            pos = e
        pos = 0
        while pos < len(data)-8:
            i = data.find(b'\x89PNG\r\n\x1a\n', pos)
            if i == -1: break
            e = data.find(b'IEND\xaeB`\x82', i+8)
            if e == -1: break
            e += 8
            if 1024 < e-i < 50*1024*1024:
                imgs.append({'type':'image','content':'','data':base64.b64encode(data[i:e]).decode(),'format':'png'})
            pos = e
        return imgs

    def _extract_text(self, ole):
        """Heuristically pull printable text out of the container's streams.

        NOTE(review): decoding the WordDocument stream as UTF-16-LE is a
        heuristic, not a real .doc parser — it only yields sensible text
        when the document text happens to be stored as uncompressed
        Unicode; verify against representative sample files.
        """
        t = ''
        try:
            if ole.exists('WordDocument'):
                s = ole.openstream('WordDocument').read()
                d = s.decode('utf-16-le',errors='ignore')
                c = ''.join(ch for ch in d if ch in '\r\n\t' or ch.isprintable())
                if len(c) > 20: return c.strip()
        except: pass
        # Fallback: take whichever stream yields the most printable text.
        for sp in ole.listdir():
            try:
                d = ole.openstream(sp).read().decode('utf-16-le',errors='ignore')
                c = ''.join(ch for ch in d if ch.isprintable() or ch in '\r\n\t')
                if len(c) > len(t): t = c
            except: pass
        return t

    def _basic_extract(self):
        """Raw-bytes fallback when olefile is unavailable or parsing failed."""
        blocks = []
        try:
            d = self._doc_bytes.decode('utf-16-le',errors='ignore')
            c = ''.join(ch for ch in d if ch.isprintable() or ch in '\r\n\t')
            for p in c.split('\r'):
                p = p.strip()
                if len(p) > 3: blocks.append({'type':'paragraph','content':p})
        except: pass
        return blocks
|
|
|
|
|
|
# ================================================================
|
|
# MAIN ENTRY POINT
|
|
# ================================================================
|
|
|
|
def process_docx_to_markdown(file_bytes, filename=''):
    """Convert any Word-family file to {'metadata', 'markdown_blocks'}.

    Detects the real container format, runs the matching processor, and
    if that fails tries every processor in turn. Always returns a dict;
    on total failure the single block carries a warning message.
    """
    fmt = detect_doc_format(file_bytes)
    # BUG FIX: this log line previously printed the literal "(unknown)"
    # and silently ignored the `filename` argument.
    print(f" 🔍 File: {filename or '(unknown)'} | Format: {fmt} | Size: {len(file_bytes)}")

    pmap = {'docx': DOCXProcessor, 'mhtml': MHTMLProcessor, 'html': HTMLDocProcessor,
            'rtf': RTFDocProcessor, 'ole2': DOCProcessor}

    if fmt in pmap:
        try:
            r = pmap[fmt](file_bytes).process()
            if r.get('blocks'):
                ic = sum(1 for b in r['blocks'] if b.get('type') == 'image')
                print(f" ✅ {len(r['blocks'])} blocks ({ic} images)")
                return {'metadata': r['metadata'], 'markdown_blocks': r['blocks']}
        except Exception as e:
            import traceback
            print(f" ⚠️ {fmt} failed: {e}")
            traceback.print_exc()

    # Detection failed or the chosen processor produced nothing:
    # brute-force every processor in order of likelihood.
    for fn, PC in [('DOCX', DOCXProcessor), ('MHTML', MHTMLProcessor),
                   ('HTML', HTMLDocProcessor), ('RTF', RTFDocProcessor),
                   ('OLE2', DOCProcessor)]:
        try:
            r = PC(file_bytes).process()
            if r.get('blocks'):
                print(f" ✅ Parsed as {fn}")
                return {'metadata': r['metadata'], 'markdown_blocks': r['blocks']}
        except Exception:
            # Narrowed from bare `except`, which also swallowed
            # KeyboardInterrupt/SystemExit.
            continue

    return {'metadata': {'title': '', 'author': '', 'subject': ''},
            'markdown_blocks': [{'type': 'paragraph', 'content': '⚠️ Could not extract content. Try saving as .docx.'}]}
|