import os import json import uuid import base64 import sqlite3 import io import zipfile from datetime import datetime from functools import wraps from contextlib import contextmanager # --- ENV SETUP --- from dotenv import load_dotenv load_dotenv() import requests from flask import Flask, request, jsonify, send_from_directory, send_file, g, session, redirect, url_for, render_template app = Flask(__name__, static_folder='static', static_url_path='/static') # --- APP CONFIG --- # Database path - use environment variable or default to /app/data for container DATABASE_DIR = os.getenv('DATABASE_DIR', '/app/data') # Ensure database directory exists with proper error handling def ensure_db_directory(): """Ensure database directory exists and is writable.""" global DATABASE_DIR # Try the configured directory first try: os.makedirs(DATABASE_DIR, exist_ok=True) # Test if we can write test_file = os.path.join(DATABASE_DIR, '.write_test') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) print(f"✅ Database directory ready: {DATABASE_DIR}") return True except (PermissionError, OSError) as e: print(f"⚠️ Cannot use {DATABASE_DIR}: {e}") # Fallback to /app/data fallback_dir = '/app/data' try: os.makedirs(fallback_dir, exist_ok=True) test_file = os.path.join(fallback_dir, '.write_test') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) DATABASE_DIR = fallback_dir print(f"✅ Using fallback database directory: {DATABASE_DIR}") return True except (PermissionError, OSError) as e: print(f"⚠️ Cannot use fallback {fallback_dir}: {e}") # Last resort: current directory fallback_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data') try: os.makedirs(fallback_dir, exist_ok=True) DATABASE_DIR = fallback_dir print(f"✅ Using local data directory: {DATABASE_DIR}") return True except Exception as e: print(f"❌ All directory attempts failed: {e}") return False # Initialize database directory ensure_db_directory() DATABASE = os.path.join(DATABASE_DIR, 'audio_editor.db') app.secret_key = os.getenv('SECRET_KEY', 'your-secret-key-change-in-production-' + str(uuid.uuid4())) # --- API CONFIG --- TTS_API_URL = os.getenv('TTS_API_URL', 'http://localhost:5010/api/v1') TTS_API_KEY = os.getenv('TTS_API_KEY', '') # --- READER TEMPLATE FILES --- READER_TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'reader_templates') def get_api_headers(): """Get headers for API requests.""" return { 'X-API-Key': TTS_API_KEY } # --- AUDIO CONVERSION --- def convert_to_mp3(audio_base64, source_format='wav'): """Convert audio from any format to MP3 base64.""" try: from pydub import AudioSegment audio_bytes = base64.b64decode(audio_base64) audio_buffer = io.BytesIO(audio_bytes) audio = AudioSegment.from_file(audio_buffer, format=source_format) mp3_buffer = io.BytesIO() audio.export(mp3_buffer, format='mp3', bitrate='192k') mp3_buffer.seek(0) mp3_base64 = base64.b64encode(mp3_buffer.read()).decode('utf-8') return mp3_base64 except ImportError: print("⚠️ pydub not installed, returning original format") return audio_base64 except Exception as e: print(f"⚠️ MP3 conversion failed: {e}, returning original") return audio_base64 # --- DATABASE SETUP --- def get_db(): """Get database connection for current request.""" if 'db' not in g: g.db = sqlite3.connect(DATABASE) g.db.row_factory = sqlite3.Row return g.db @app.teardown_appcontext def close_db(error): """Close database connection at end of request.""" db = g.pop('db', None) if db is not None: db.close() @contextmanager def get_db_connection(): """Context manager for database connections outside request context.""" conn = sqlite3.connect(DATABASE) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def init_db(): """Initialize database tables.""" with get_db_connection() as conn: cursor = conn.cursor() # Users table cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password TEXT NOT NULL, is_admin INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_login TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS uploads ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, filename TEXT NOT NULL, audio_data TEXT NOT NULL, audio_format TEXT DEFAULT 'mp3', text_content TEXT NOT NULL, transcription TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS projects ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, name TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, UNIQUE(user_id, name) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS project_sections ( id INTEGER PRIMARY KEY AUTOINCREMENT, project_id INTEGER NOT NULL, chapter INTEGER NOT NULL, section INTEGER NOT NULL, audio_data TEXT, audio_format TEXT DEFAULT 'mp3', text_content TEXT NOT NULL, html_content TEXT, tts_text TEXT, transcription TEXT, voice TEXT DEFAULT 'af_heart', image_data TEXT, image_format TEXT DEFAULT 'png', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE, UNIQUE(project_id, chapter, section) ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS generations ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, name TEXT, audio_data TEXT NOT NULL, audio_format TEXT DEFAULT 'mp3', text_content TEXT NOT NULL, transcription TEXT, voice TEXT DEFAULT 'af_heart', speed REAL DEFAULT 1.0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) ''') conn.commit() # Create default admin user if not exists cursor.execute("SELECT id FROM users WHERE username = 'admin'") if not cursor.fetchone(): cursor.execute(''' INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?) ''', ('admin', 'admin123', 1)) conn.commit() print("📦 Created default admin user (admin/admin123)") # Migration: Add user_id columns if they don't exist migrations = [ ("user_id", "uploads", "ALTER TABLE uploads ADD COLUMN user_id INTEGER DEFAULT 1"), ("user_id", "projects", "ALTER TABLE projects ADD COLUMN user_id INTEGER DEFAULT 1"), ("user_id", "generations", "ALTER TABLE generations ADD COLUMN user_id INTEGER DEFAULT 1"), ("html_content", "project_sections", "ALTER TABLE project_sections ADD COLUMN html_content TEXT"), ("tts_text", "project_sections", "ALTER TABLE project_sections ADD COLUMN tts_text TEXT"), ("image_data", "project_sections", "ALTER TABLE project_sections ADD COLUMN image_data TEXT"), ("image_format", "project_sections", "ALTER TABLE project_sections ADD COLUMN image_format TEXT DEFAULT 'png'"), ] for col_name, table_name, sql in migrations: try: cursor.execute(f"SELECT {col_name} FROM {table_name} LIMIT 1") except sqlite3.OperationalError: print(f"📦 Adding {col_name} column to {table_name}...") cursor.execute(sql) conn.commit() def vacuum_db(): """Run VACUUM to reclaim space after deletions.""" with get_db_connection() as conn: conn.execute('VACUUM') # Initialize database on startup print(f"📁 Database directory: {DATABASE_DIR}") print(f"📁 Database file: {DATABASE}") init_db() # --- AUTHENTICATION DECORATORS --- def login_required(f): """Decorator to require login for a route.""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: if request.is_json or request.headers.get('Accept') == 'application/json': return jsonify({'error': 'Authentication required'}), 401 return redirect(url_for('login_page')) return f(*args, **kwargs) return decorated_function def admin_required(f): """Decorator to require admin access for a route.""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: if request.is_json or request.headers.get('Accept') == 'application/json': return jsonify({'error': 'Authentication required'}), 401 return redirect(url_for('login_page')) if not session.get('is_admin'): if request.is_json or request.headers.get('Accept') == 'application/json': return jsonify({'error': 'Admin access required'}), 403 return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function def get_current_user_id(): """Get the current logged-in user's ID.""" return session.get('user_id') # --- AUTH ROUTES --- @app.route('/login', methods=['GET']) def login_page(): """Render login page.""" if 'user_id' in session: return redirect(url_for('index')) return send_from_directory('templates', 'login.html') @app.route('/api/login', methods=['POST']) def api_login(): """Handle login API request.""" data = request.json username = data.get('username', '').strip() password = data.get('password', '') if not username or not password: return jsonify({'error': 'Username and password required'}), 400 db = get_db() cursor = db.cursor() cursor.execute('SELECT id, username, password, is_admin FROM users WHERE username = ?', (username,)) user = cursor.fetchone() if not user or user['password'] != password: return jsonify({'error': 'Invalid username or password'}), 401 # Update last login cursor.execute('UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?', (user['id'],)) db.commit() # Set session session['user_id'] = user['id'] session['username'] = user['username'] session['is_admin'] = bool(user['is_admin']) return jsonify({ 'message': 'Login successful', 'user': { 'id': user['id'], 'username': user['username'], 'is_admin': bool(user['is_admin']) } }) @app.route('/api/logout', methods=['POST']) def api_logout(): """Handle logout API request.""" session.clear() return jsonify({'message': 'Logged out successfully'}) @app.route('/logout') def logout(): """Logout and redirect to login page.""" session.clear() return redirect(url_for('login_page')) @app.route('/api/me', methods=['GET']) @login_required def api_me(): """Get current user info.""" return jsonify({ 'user': { 'id': session.get('user_id'), 'username': session.get('username'), 'is_admin': session.get('is_admin', False) } }) # --- ADMIN ROUTES --- @app.route('/admin') @admin_required def admin_dashboard(): """Render admin dashboard.""" return send_from_directory('templates', 'admin.html') @app.route('/api/admin/users', methods=['GET']) @admin_required def list_users(): """List all users.""" db = get_db() cursor = db.cursor() cursor.execute(''' SELECT u.id, u.username, u.is_admin, u.created_at, u.last_login, (SELECT COUNT(*) FROM projects WHERE user_id = u.id) as project_count FROM users u ORDER BY u.created_at DESC ''') users = cursor.fetchall() return jsonify({ 'users': [{ 'id': u['id'], 'username': u['username'], 'is_admin': bool(u['is_admin']), 'created_at': u['created_at'], 'last_login': u['last_login'], 'project_count': u['project_count'] } for u in users] }) @app.route('/api/admin/users', methods=['POST']) @admin_required def create_user(): """Create a new user.""" data = request.json username = data.get('username', '').strip() password = data.get('password', '') is_admin = data.get('is_admin', False) if not username or not password: return jsonify({'error': 'Username and password required'}), 400 if len(username) < 3: return jsonify({'error': 'Username must be at least 3 characters'}), 400 if len(password) < 4: return jsonify({'error': 'Password must be at least 4 characters'}), 400 db = get_db() cursor = db.cursor() try: cursor.execute(''' INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?) ''', (username, password, 1 if is_admin else 0)) db.commit() return jsonify({ 'message': 'User created successfully', 'user_id': cursor.lastrowid }) except sqlite3.IntegrityError: return jsonify({'error': 'Username already exists'}), 400 @app.route('/api/admin/users/', methods=['PUT']) @admin_required def update_user(user_id): """Update a user.""" data = request.json db = get_db() cursor = db.cursor() # Check if user exists cursor.execute('SELECT id, username FROM users WHERE id = ?', (user_id,)) user = cursor.fetchone() if not user: return jsonify({'error': 'User not found'}), 404 # Prevent modifying own admin status if user_id == session.get('user_id') and 'is_admin' in data: return jsonify({'error': 'Cannot modify your own admin status'}), 400 updates = [] params = [] if 'username' in data and data['username'].strip(): updates.append('username = ?') params.append(data['username'].strip()) if 'password' in data and data['password']: updates.append('password = ?') params.append(data['password']) if 'is_admin' in data: updates.append('is_admin = ?') params.append(1 if data['is_admin'] else 0) if not updates: return jsonify({'error': 'No updates provided'}), 400 params.append(user_id) try: cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", params) db.commit() return jsonify({'message': 'User updated successfully'}) except sqlite3.IntegrityError: return jsonify({'error': 'Username already exists'}), 400 @app.route('/api/admin/users/', methods=['DELETE']) @admin_required def delete_user(user_id): """Delete a user.""" # Prevent self-deletion if user_id == session.get('user_id'): return jsonify({'error': 'Cannot delete your own account'}), 400 db = get_db() cursor = db.cursor() cursor.execute('SELECT id FROM users WHERE id = ?', (user_id,)) if not cursor.fetchone(): return jsonify({'error': 'User not found'}), 404 # Delete user and all related data (cascade) cursor.execute('DELETE FROM project_sections WHERE project_id IN (SELECT id FROM projects WHERE user_id = ?)', (user_id,)) cursor.execute('DELETE FROM projects WHERE user_id = ?', (user_id,)) cursor.execute('DELETE FROM uploads WHERE user_id = ?', (user_id,)) cursor.execute('DELETE FROM generations WHERE user_id = ?', (user_id,)) cursor.execute('DELETE FROM users WHERE id = ?', (user_id,)) db.commit() vacuum_db() return jsonify({'message': 'User deleted successfully'}) @app.route('/api/admin/stats', methods=['GET']) @admin_required def admin_stats(): """Get admin dashboard statistics.""" db = get_db() cursor = db.cursor() cursor.execute('SELECT COUNT(*) as count FROM users') user_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM projects') project_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM project_sections') section_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM uploads') upload_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM generations') generation_count = cursor.fetchone()['count'] db_size = os.path.getsize(DATABASE) if os.path.exists(DATABASE) else 0 return jsonify({ 'users': user_count, 'projects': project_count, 'sections': section_count, 'uploads': upload_count, 'generations': generation_count, 'database_size_mb': round(db_size / (1024 * 1024), 2) }) # --- ROUTES --- @app.route('/') @login_required def index(): return send_from_directory('templates', 'index.html') @app.route('/static/') def serve_static(filename): """Serve static files (CSS, JS).""" return send_from_directory('static', filename) @app.route('/api_status', methods=['GET']) @login_required def api_status(): """Check API server status.""" try: response = requests.get(f"{TTS_API_URL}/health", timeout=10) if response.status_code == 200: health_data = response.json() return jsonify({ 'connected': True, 'api_url': TTS_API_URL, 'status': health_data.get('status'), 'services': health_data.get('services'), 'device': health_data.get('device') }) else: return jsonify({ 'connected': False, 'api_url': TTS_API_URL, 'error': f'Status code: {response.status_code}' }) except Exception as e: return jsonify({ 'connected': False, 'api_url': TTS_API_URL, 'error': str(e) }) @app.route('/voices', methods=['GET']) @login_required def get_voices(): """Get available voices from the API.""" try: response = requests.get( f"{TTS_API_URL}/voices", headers=get_api_headers(), timeout=30 ) if response.status_code == 200: result = response.json() return jsonify({'voices': result.get('voices', [])}) else: return jsonify({'voices': [ {'id': 'af_heart', 'name': 'Heart (US Fem)'}, {'id': 'af_bella', 'name': 'Bella (US Fem)'}, {'id': 'am_adam', 'name': 'Adam (US Masc)'} ]}) except: return jsonify({'voices': [ {'id': 'af_heart', 'name': 'Heart (US Fem)'} ]}) # --- UPLOAD HANDLING --- @app.route('/upload', methods=['POST']) @login_required def upload_files(): """Upload audio and text files, get timestamps using the API.""" if 'audioFile' not in request.files or 'txtFile' not in request.files: return jsonify({'error': 'Missing files'}), 400 audio_file = request.files['audioFile'] txt_file = request.files['txtFile'] if not audio_file or not txt_file: return jsonify({'error': 'Invalid files'}), 400 audio_bytes = audio_file.read() audio_base64 = base64.b64encode(audio_bytes).decode('utf-8') source_format = audio_file.filename.rsplit('.', 1)[1].lower() if '.' in audio_file.filename else 'wav' text_content = txt_file.read().decode('utf-8') try: print("⏳ Calling Timestamp API...") files = {'audio_file': (audio_file.filename, audio_bytes)} data = {'text': text_content} response = requests.post( f"{TTS_API_URL}/timestamp", headers=get_api_headers(), files=files, data=data, timeout=120 ) if response.status_code != 200: error_detail = response.json().get('error', 'Unknown error') return jsonify({'error': f'API Error: {error_detail}'}), response.status_code result = response.json() transcription_data = result.get('timestamps', []) if source_format != 'mp3': print("🔄 Converting to MP3...") audio_base64 = convert_to_mp3(audio_base64, source_format) db = get_db() cursor = db.cursor() cursor.execute(''' INSERT INTO uploads (user_id, filename, audio_data, audio_format, text_content, transcription) VALUES (?, ?, ?, ?, ?, ?) ''', ( get_current_user_id(), audio_file.filename, audio_base64, 'mp3', text_content, json.dumps(transcription_data) )) db.commit() upload_id = cursor.lastrowid print(f"✅ Upload saved with ID: {upload_id}") return jsonify({ 'message': 'Success', 'upload_id': upload_id, 'audio_data': audio_base64, 'audio_format': 'mp3', 'text_content': text_content, 'transcription': transcription_data }) except requests.exceptions.RequestException as e: return jsonify({'error': f'API connection error: {str(e)}'}), 500 except Exception as e: import traceback traceback.print_exc() return jsonify({'error': str(e)}), 500 @app.route('/uploads', methods=['GET']) @login_required def list_uploads(): """List all uploaded files for current user.""" db = get_db() cursor = db.cursor() cursor.execute(''' SELECT id, filename, audio_format, created_at, LENGTH(audio_data) as audio_size, LENGTH(text_content) as text_size FROM uploads WHERE user_id = ? ORDER BY created_at DESC ''', (get_current_user_id(),)) rows = cursor.fetchall() uploads = [] for row in rows: uploads.append({ 'id': row['id'], 'filename': row['filename'], 'audio_format': row['audio_format'], 'created_at': row['created_at'], 'audio_size': row['audio_size'], 'text_size': row['text_size'] }) return jsonify({'uploads': uploads}) @app.route('/uploads/', methods=['GET']) @login_required def get_upload(upload_id): """Get a specific upload.""" db = get_db() cursor = db.cursor() cursor.execute('SELECT * FROM uploads WHERE id = ? AND user_id = ?', (upload_id, get_current_user_id())) row = cursor.fetchone() if not row: return jsonify({'error': 'Upload not found'}), 404 return jsonify({ 'id': row['id'], 'filename': row['filename'], 'audio_data': row['audio_data'], 'audio_format': row['audio_format'], 'text_content': row['text_content'], 'transcription': json.loads(row['transcription']) if row['transcription'] else [], 'created_at': row['created_at'] }) @app.route('/uploads/', methods=['DELETE']) @login_required def delete_upload(upload_id): """Delete an upload.""" db = get_db() cursor = db.cursor() cursor.execute('DELETE FROM uploads WHERE id = ? AND user_id = ?', (upload_id, get_current_user_id())) db.commit() if cursor.rowcount == 0: return jsonify({'error': 'Upload not found'}), 404 vacuum_db() return jsonify({'message': 'Upload deleted successfully'}) @app.route('/uploads//download', methods=['GET']) @login_required def download_upload(upload_id): """Download upload as zip file.""" db = get_db() cursor = db.cursor() cursor.execute('SELECT * FROM uploads WHERE id = ? AND user_id = ?', (upload_id, get_current_user_id())) row = cursor.fetchone() if not row: return jsonify({'error': 'Upload not found'}), 404 zip_buffer = io.BytesIO() base_name = row['filename'].rsplit('.', 1)[0] if '.' in row['filename'] else row['filename'] with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: audio_bytes = base64.b64decode(row['audio_data']) zf.writestr(f"{base_name}.{row['audio_format']}", audio_bytes) zf.writestr(f"{base_name}.txt", row['text_content']) if row['transcription']: zf.writestr(f"{base_name}.json", row['transcription']) zip_buffer.seek(0) return send_file( zip_buffer, mimetype='application/zip', as_attachment=True, download_name=f"{base_name}.zip" ) # --- GENERATION HANDLING --- @app.route('/generate', methods=['POST']) @login_required def generate_audio(): """Generate audio from text using the TTS API, then get timestamps.""" data = request.json text_content = data.get('text', '') voice = data.get('voice', 'af_heart') speed = data.get('speed', 1.0) name = data.get('name', '') save_to_db = data.get('save_to_db', True) if not text_content: return jsonify({'error': 'No text provided'}), 400 try: print(f"🔊 Calling Generate Audio API: voice={voice}, speed={speed}") response = requests.post( f"{TTS_API_URL}/generate-audio", headers={**get_api_headers(), 'Content-Type': 'application/json'}, json={'text': text_content, 'voice': voice, 'speed': speed}, timeout=180 ) if response.status_code != 200: error_detail = response.json().get('error', 'Unknown error') return jsonify({'error': f'Generate Audio API Error: {error_detail}'}), response.status_code result = response.json() audio_base64 = result.get('audio_base64', '') source_format = result.get('audio_format', 'wav') if not audio_base64: return jsonify({'error': 'No audio data received from API'}), 500 print(f"💾 Audio generated successfully") print("⏳ Calling Timestamp API...") audio_bytes = base64.b64decode(audio_base64) files = {'audio_file': (f'audio.{source_format}', audio_bytes)} ts_data = {'text': text_content} ts_response = requests.post( f"{TTS_API_URL}/timestamp", headers=get_api_headers(), files=files, data=ts_data, timeout=120 ) if ts_response.status_code != 200: error_detail = ts_response.json().get('error', 'Unknown error') return jsonify({'error': f'Timestamp API Error: {error_detail}'}), ts_response.status_code ts_result = ts_response.json() transcription_data = ts_result.get('timestamps', []) if source_format != 'mp3': print("🔄 Converting to MP3...") audio_base64 = convert_to_mp3(audio_base64, source_format) generation_id = None if save_to_db: db = get_db() cursor = db.cursor() cursor.execute(''' INSERT INTO generations (user_id, name, audio_data, audio_format, text_content, transcription, voice, speed) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ''', ( get_current_user_id(), name or f"Generation {datetime.now().strftime('%Y-%m-%d %H:%M')}", audio_base64, 'mp3', text_content, json.dumps(transcription_data), voice, speed )) db.commit() generation_id = cursor.lastrowid print(f"✅ Generation saved with ID: {generation_id}") return jsonify({ 'message': 'Generated', 'generation_id': generation_id, 'audio_data': audio_base64, 'audio_format': 'mp3', 'text_content': text_content, 'transcription': transcription_data, 'voice': voice, 'speed': speed }) except requests.exceptions.RequestException as e: return jsonify({'error': f'API connection error: {str(e)}'}), 500 except Exception as e: import traceback traceback.print_exc() return jsonify({'error': str(e)}), 500 @app.route('/generations', methods=['GET']) @login_required def list_generations(): """List all generations for current user.""" db = get_db() cursor = db.cursor() cursor.execute(''' SELECT id, name, audio_format, voice, speed, created_at, LENGTH(audio_data) as audio_size, LENGTH(text_content) as text_size FROM generations WHERE user_id = ? ORDER BY created_at DESC ''', (get_current_user_id(),)) rows = cursor.fetchall() generations = [] for row in rows: generations.append({ 'id': row['id'], 'name': row['name'], 'audio_format': row['audio_format'], 'voice': row['voice'], 'speed': row['speed'], 'created_at': row['created_at'], 'audio_size': row['audio_size'], 'text_size': row['text_size'] }) return jsonify({'generations': generations}) @app.route('/generations/', methods=['GET']) @login_required def get_generation(gen_id): """Get a specific generation.""" db = get_db() cursor = db.cursor() cursor.execute('SELECT * FROM generations WHERE id = ? AND user_id = ?', (gen_id, get_current_user_id())) row = cursor.fetchone() if not row: return jsonify({'error': 'Generation not found'}), 404 return jsonify({ 'id': row['id'], 'name': row['name'], 'audio_data': row['audio_data'], 'audio_format': row['audio_format'], 'text_content': row['text_content'], 'transcription': json.loads(row['transcription']) if row['transcription'] else [], 'voice': row['voice'], 'speed': row['speed'], 'created_at': row['created_at'] }) @app.route('/generations/', methods=['DELETE']) @login_required def delete_generation(gen_id): """Delete a generation.""" db = get_db() cursor = db.cursor() cursor.execute('DELETE FROM generations WHERE id = ? AND user_id = ?', (gen_id, get_current_user_id())) db.commit() if cursor.rowcount == 0: return jsonify({'error': 'Generation not found'}), 404 vacuum_db() return jsonify({'message': 'Generation deleted successfully'}) @app.route('/generations//download', methods=['GET']) @login_required def download_generation(gen_id): """Download generation as zip file.""" db = get_db() cursor = db.cursor() cursor.execute('SELECT * FROM generations WHERE id = ? AND user_id = ?', (gen_id, get_current_user_id())) row = cursor.fetchone() if not row: return jsonify({'error': 'Generation not found'}), 404 zip_buffer = io.BytesIO() base_name = row['name'].replace(' ', '_') if row['name'] else f"generation_{gen_id}" base_name = "".join(c for c in base_name if c.isalnum() or c in ('_', '-')) with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: audio_bytes = base64.b64decode(row['audio_data']) zf.writestr(f"{base_name}.{row['audio_format']}", audio_bytes) zf.writestr(f"{base_name}.txt", row['text_content']) if row['transcription']: zf.writestr(f"{base_name}.json", row['transcription']) zip_buffer.seek(0) return send_file( zip_buffer, mimetype='application/zip', as_attachment=True, download_name=f"{base_name}.zip" ) # --- PROJECT HANDLING (BULK) --- @app.route('/projects', methods=['GET']) @login_required def list_projects(): """List all projects for current user.""" db = get_db() cursor = db.cursor() cursor.execute(''' SELECT p.id, p.name, p.created_at, p.updated_at, COUNT(ps.id) as section_count FROM projects p LEFT JOIN project_sections ps ON p.id = ps.project_id WHERE p.user_id = ? GROUP BY p.id ORDER BY p.updated_at DESC ''', (get_current_user_id(),)) rows = cursor.fetchall() projects = [] for row in rows: projects.append({ 'id': row['id'], 'name': row['name'], 'created_at': row['created_at'], 'updated_at': row['updated_at'], 'section_count': row['section_count'] }) return jsonify({'projects': projects}) @app.route('/projects', methods=['POST']) @login_required def create_project(): """Create a new project.""" data = request.json name = data.get('name', '').strip() if not name: return jsonify({'error': 'Project name is required'}), 400 db = get_db() cursor = db.cursor() try: cursor.execute('INSERT INTO projects (user_id, name) VALUES (?, ?)', (get_current_user_id(), name)) db.commit() return jsonify({ 'message': 'Project created', 'project_id': cursor.lastrowid, 'name': name }) except sqlite3.IntegrityError: return jsonify({'error': 'Project with this name already exists'}), 400 @app.route('/projects/', methods=['GET']) @login_required def get_project(project_id): """Get a project with all its sections.""" db = get_db() cursor = db.cursor() cursor.execute('SELECT * FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) project = cursor.fetchone() if not project: return jsonify({'error': 'Project not found'}), 404 cursor.execute(''' SELECT id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format, created_at FROM project_sections WHERE project_id = ? ORDER BY chapter, section ''', (project_id,)) sections = cursor.fetchall() section_list = [] for s in sections: html_content = s['html_content'] if 'html_content' in s.keys() else None tts_text = s['tts_text'] if 'tts_text' in s.keys() else None image_data = s['image_data'] if 'image_data' in s.keys() else None image_format = s['image_format'] if 'image_format' in s.keys() else 'png' section_list.append({ 'id': s['id'], 'chapter': s['chapter'], 'section': s['section'], 'audio_data': s['audio_data'], 'audio_format': s['audio_format'] or 'mp3', 'text_content': s['text_content'], 'html_content': html_content, 'tts_text': tts_text, 'transcription': json.loads(s['transcription']) if s['transcription'] else [], 'voice': s['voice'] or 'af_heart', 'image_data': image_data, 'image_format': image_format, 'created_at': s['created_at'], 'has_audio': s['audio_data'] is not None and len(s['audio_data'] or '') > 0, 'has_image': image_data is not None and len(image_data or '') > 0 }) return jsonify({ 'id': project['id'], 'name': project['name'], 'created_at': project['created_at'], 'updated_at': project['updated_at'], 'sections': section_list }) @app.route('/projects/', methods=['DELETE']) @login_required def delete_project(project_id): """Delete a project and all its sections.""" db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) if not cursor.fetchone(): return jsonify({'error': 'Project not found'}), 404 cursor.execute('DELETE FROM project_sections WHERE project_id = ?', (project_id,)) cursor.execute('DELETE FROM projects WHERE id = ?', (project_id,)) db.commit() vacuum_db() return jsonify({'message': 'Project deleted successfully'}) @app.route('/projects//sections', methods=['POST']) @login_required def add_or_update_section(project_id): """Add or update a section in a project.""" data = request.json chapter = data.get('chapter', 1) section = data.get('section', 1) text_content = data.get('text', '') html_content = data.get('html_content', '') tts_text = data.get('tts_text', '') voice = data.get('voice', 'af_heart') image_data = data.get('image_data', '') image_format = data.get('image_format', 'png') generate_audio_flag = data.get('generate_audio', True) if not text_content: return jsonify({'error': 'Text content is required'}), 400 db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) if not cursor.fetchone(): return jsonify({'error': 'Project not found'}), 404 audio_base64 = None audio_format = 'mp3' transcription_data = [] if generate_audio_flag: try: gen_text = tts_text if tts_text else text_content print(f"🔊 Generating audio for Ch{chapter}.Sec{section}") response = requests.post( f"{TTS_API_URL}/generate-audio", headers={**get_api_headers(), 'Content-Type': 'application/json'}, json={'text': gen_text, 'voice': voice, 'speed': 1.0}, timeout=180 ) if response.status_code != 200: error_detail = response.json().get('error', 'Unknown error') return jsonify({'error': f'Generate Audio API Error: {error_detail}'}), response.status_code result = response.json() audio_base64 = result.get('audio_base64', '') source_format = result.get('audio_format', 'wav') if audio_base64: audio_bytes = base64.b64decode(audio_base64) files = {'audio_file': (f'audio.{source_format}', audio_bytes)} ts_data = {'text': gen_text} ts_response = requests.post( f"{TTS_API_URL}/timestamp", headers=get_api_headers(), files=files, data=ts_data, timeout=120 ) if ts_response.status_code == 200: ts_result = ts_response.json() transcription_data = ts_result.get('timestamps', []) if source_format != 'mp3': print("🔄 Converting to MP3...") audio_base64 = convert_to_mp3(audio_base64, source_format) except Exception as e: print(f"Error generating audio: {e}") return jsonify({'error': str(e)}), 500 cursor.execute(''' INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(project_id, chapter, section) DO UPDATE SET audio_data = excluded.audio_data, audio_format = excluded.audio_format, text_content = excluded.text_content, html_content = excluded.html_content, tts_text = excluded.tts_text, transcription = excluded.transcription, voice = excluded.voice, image_data = excluded.image_data, image_format = excluded.image_format ''', ( project_id, chapter, section, audio_base64, audio_format, text_content, html_content, tts_text, json.dumps(transcription_data), voice, image_data if image_data else None, image_format )) cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,)) db.commit() section_id = cursor.lastrowid return jsonify({ 'message': 'Section saved', 'section_id': section_id, 'chapter': chapter, 'section': section, 'audio_data': audio_base64, 'audio_format': audio_format, 'text_content': text_content, 'html_content': html_content, 'tts_text': tts_text, 'transcription': transcription_data, 'voice': voice, 'image_data': image_data, 'image_format': image_format }) @app.route('/projects//sections/save', methods=['POST']) @login_required def save_section_directly(project_id): """Save a section directly without generating audio.""" data = request.json chapter = data.get('chapter', 1) section = data.get('section', 1) text_content = data.get('text', '') html_content = data.get('html_content', '') tts_text = data.get('tts_text', '') voice = data.get('voice', 'af_heart') audio_data = data.get('audio_data', '') audio_format = data.get('audio_format', 'mp3') transcription = data.get('transcription', []) image_data = data.get('image_data', '') image_format = data.get('image_format', 'png') db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) if not cursor.fetchone(): return jsonify({'error': 'Project not found'}), 404 print(f"💾 Direct save: Ch{chapter}.Sec{section} to project {project_id}") cursor.execute(''' INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(project_id, chapter, section) DO UPDATE SET audio_data = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_data ELSE project_sections.audio_data END, audio_format = excluded.audio_format, text_content = excluded.text_content, html_content = excluded.html_content, tts_text = excluded.tts_text, transcription = CASE WHEN excluded.transcription IS NOT NULL AND excluded.transcription != '[]' THEN excluded.transcription ELSE project_sections.transcription END, voice = excluded.voice, image_data = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_data ELSE project_sections.image_data END, image_format = CASE WHEN excluded.image_format IS NOT NULL THEN excluded.image_format ELSE project_sections.image_format END ''', ( project_id, chapter, section, audio_data if audio_data else None, audio_format, text_content, html_content, tts_text, json.dumps(transcription) if transcription else '[]', voice, image_data if image_data else None, image_format )) cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,)) db.commit() return jsonify({ 'message': 'Section saved', 'chapter': chapter, 'section': section }) @app.route('/projects//save_all', methods=['POST']) @login_required def save_all_sections(project_id): """Save all sections at once.""" data = request.json sections = data.get('sections', []) db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) if not cursor.fetchone(): return jsonify({'error': 'Project not found'}), 404 saved_count = 0 for sec in sections: chapter = sec.get('chapter', 1) section = sec.get('section', 1) text_content = sec.get('text', '') html_content = sec.get('htmlContent') or sec.get('html_content', '') tts_text = sec.get('ttsText') or sec.get('tts_text', '') voice = sec.get('voice', 'af_heart') audio_data = sec.get('audioData') or sec.get('audio_data', '') audio_format = sec.get('audioFormat') or sec.get('audio_format', 'mp3') transcription = sec.get('transcription', []) image_data = sec.get('imageData') or sec.get('image_data', '') image_format = sec.get('imageFormat') or sec.get('image_format', 'png') print(f"💾 Saving Ch{chapter}.Sec{section} (image: {len(image_data) if image_data else 0} bytes)") cursor.execute(''' INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(project_id, chapter, section) DO UPDATE SET audio_data = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_data ELSE project_sections.audio_data END, audio_format = excluded.audio_format, text_content = excluded.text_content, html_content = excluded.html_content, tts_text = excluded.tts_text, transcription = CASE WHEN excluded.transcription IS NOT NULL AND excluded.transcription != '[]' THEN excluded.transcription ELSE project_sections.transcription END, voice = excluded.voice, image_data = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_data ELSE project_sections.image_data END, image_format = CASE WHEN excluded.image_format IS NOT NULL AND excluded.image_format != '' THEN excluded.image_format ELSE project_sections.image_format END ''', ( project_id, chapter, section, audio_data if audio_data else None, audio_format, text_content, html_content, tts_text, json.dumps(transcription) if transcription else '[]', voice, image_data if image_data else None, image_format )) saved_count += 1 cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,)) db.commit() print(f"✅ Saved {saved_count} sections to project {project_id}") return jsonify({ 'message': f'Saved {saved_count} sections', 'saved_count': saved_count }) @app.route('/projects//sections/', methods=['GET']) @login_required def get_section(project_id, section_id): """Get a specific section with audio data.""" db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) if not cursor.fetchone(): return jsonify({'error': 'Project not found'}), 404 cursor.execute(''' SELECT * FROM project_sections WHERE id = ? AND project_id = ? ''', (section_id, project_id)) row = cursor.fetchone() if not row: return jsonify({'error': 'Section not found'}), 404 html_content = row['html_content'] if 'html_content' in row.keys() else None tts_text = row['tts_text'] if 'tts_text' in row.keys() else None image_data = row['image_data'] if 'image_data' in row.keys() else None image_format = row['image_format'] if 'image_format' in row.keys() else 'png' return jsonify({ 'id': row['id'], 'chapter': row['chapter'], 'section': row['section'], 'audio_data': row['audio_data'], 'audio_format': row['audio_format'] or 'mp3', 'text_content': row['text_content'], 'html_content': html_content, 'tts_text': tts_text, 'transcription': json.loads(row['transcription']) if row['transcription'] else [], 'voice': row['voice'] or 'af_heart', 'image_data': image_data, 'image_format': image_format, 'created_at': row['created_at'] }) @app.route('/projects//sections/', methods=['DELETE']) @login_required def delete_section(project_id, section_id): """Delete a section.""" db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) if not cursor.fetchone(): return jsonify({'error': 'Project not found'}), 404 cursor.execute(''' DELETE FROM project_sections WHERE id = ? AND project_id = ? ''', (section_id, project_id)) db.commit() if cursor.rowcount == 0: return jsonify({'error': 'Section not found'}), 404 vacuum_db() return jsonify({'message': 'Section deleted successfully'}) @app.route('/projects//download', methods=['GET']) @login_required def download_project(project_id): """Download entire project as zip file (simple format).""" db = get_db() cursor = db.cursor() # Verify ownership cursor.execute('SELECT * FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id())) project = cursor.fetchone() if not project: return jsonify({'error': 'Project not found'}), 404 cursor.execute(''' SELECT * FROM project_sections WHERE project_id = ? ORDER BY chapter, section ''', (project_id,)) sections = cursor.fetchall() zip_buffer = io.BytesIO() project_name = "".join(c for c in project['name'] if c.isalnum() or c in ('_', '-', ' ')) with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: for s in sections: file_prefix = f"{s['chapter']}.{s['section']}_{project_name}" if s['audio_data']: audio_bytes = base64.b64decode(s['audio_data']) zf.writestr(f"{file_prefix}.{s['audio_format'] or 'mp3'}", audio_bytes) zf.writestr(f"{file_prefix}.txt", s['text_content'] or '') if s['transcription']: zf.writestr(f"{file_prefix}.json", s['transcription']) image_data = s['image_data'] if 'image_data' in s.keys() else None image_format = s['image_format'] if 'image_format' in s.keys() else 'png' if image_data: image_bytes = base64.b64decode(image_data) zf.writestr(f"{file_prefix}.{image_format}", image_bytes) zip_buffer.seek(0) return send_file( zip_buffer, mimetype='application/zip', as_attachment=True, download_name=f"{project_name}.zip" ) # --- BULK EXPORT WITH MANIFEST AND READER --- def get_reader_template(filename): """Load reader template file.""" template_path = os.path.join(READER_TEMPLATES_DIR, filename) if os.path.exists(template_path): with open(template_path, 'r', encoding='utf-8') as f: return f.read() return None @app.route('/export_bulk', methods=['POST']) @login_required def export_bulk(): """Export bulk project files with manifest.json, index.html, and Reader.html.""" try: data = request.json project_name = data.get('projectName', 'Book-1').strip() files = data.get('files', []) print(f"📦 Export bulk: project={project_name}, files={len(files)}") if not files: return jsonify({'error': 'No files to export'}), 400 db = get_db() cursor = db.cursor() # Create or get project for current user cursor.execute('SELECT id FROM projects WHERE name = ? AND user_id = ?', (project_name, get_current_user_id())) row = cursor.fetchone() if row: project_id = row['id'] print(f"📁 Using existing project ID: {project_id}") else: cursor.execute('INSERT INTO projects (user_id, name) VALUES (?, ?)', (get_current_user_id(), project_name)) db.commit() project_id = cursor.lastrowid print(f"📁 Created new project ID: {project_id}") # Save all sections to database for file_item in files: chapter = file_item.get('chapter', 0) section = file_item.get('section', 0) text = file_item.get('text', '') html_content = file_item.get('htmlContent') or file_item.get('html_content', '') tts_text = file_item.get('ttsText') or file_item.get('tts_text', '') transcription = file_item.get('transcription', []) audio_data = file_item.get('audioData') or file_item.get('audio_data', '') audio_format = file_item.get('audioFormat') or file_item.get('audio_format', 'mp3') voice = file_item.get('voice', 'af_heart') image_data = file_item.get('imageData') or file_item.get('image_data', '') image_format = file_item.get('imageFormat') or file_item.get('image_format', 'png') print(f" 💾 Saving Ch{chapter}.Sec{section}") cursor.execute(''' INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(project_id, chapter, section) DO UPDATE SET audio_data = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_data ELSE project_sections.audio_data END, audio_format = excluded.audio_format, text_content = excluded.text_content, html_content = excluded.html_content, tts_text = excluded.tts_text, transcription = CASE WHEN excluded.transcription IS NOT NULL AND excluded.transcription != '[]' THEN excluded.transcription ELSE project_sections.transcription END, voice = excluded.voice, image_data = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_data ELSE project_sections.image_data END, image_format = CASE WHEN excluded.image_format IS NOT NULL AND excluded.image_format != '' THEN excluded.image_format ELSE project_sections.image_format END ''', ( project_id, chapter, section, audio_data if audio_data else None, audio_format, text, html_content, tts_text, json.dumps(transcription) if transcription else '[]', voice, image_data if image_data else None, image_format )) cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,)) db.commit() print(f"✅ Saved {len(files)} sections to project {project_id}") # Fetch all sections for export cursor.execute('SELECT * FROM project_sections WHERE project_id = ? ORDER BY chapter, section', (project_id,)) sections = cursor.fetchall() # Create ZIP with proper structure zip_buffer = io.BytesIO() safe_name = "".join(c for c in project_name if c.isalnum() or c in ('_', '-', ' ')) # Build manifest assets list manifest_assets = [] with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: for s in sections: chapter = s['chapter'] section = s['section'] file_prefix = f"{chapter}.{section}_{safe_name}" asset_entry = { "prefix": f"{chapter}.{section}_", "textFile": f"book/{file_prefix}.txt", "audioFile": None, "jsonFile": None, "imageFile": None } # Add text file zf.writestr(f"book/{file_prefix}.txt", s['text_content'] or '') # Add audio file if s['audio_data']: audio_bytes = base64.b64decode(s['audio_data']) audio_ext = s['audio_format'] or 'mp3' zf.writestr(f"book/{file_prefix}.{audio_ext}", audio_bytes) asset_entry["audioFile"] = f"book/{file_prefix}.{audio_ext}" # Add JSON timestamps if s['transcription'] and s['transcription'] != '[]': zf.writestr(f"book/{file_prefix}.json", s['transcription']) asset_entry["jsonFile"] = f"book/{file_prefix}.json" # Add image file image_data = s['image_data'] if 'image_data' in s.keys() else None image_format = s['image_format'] if 'image_format' in s.keys() else 'png' if image_data: image_bytes = base64.b64decode(image_data) zf.writestr(f"book/{file_prefix}.{image_format}", image_bytes) asset_entry["imageFile"] = f"book/{file_prefix}.{image_format}" manifest_assets.append(asset_entry) # Create manifest.json manifest = { "title": project_name, "assets": manifest_assets } zf.writestr("manifest.json", json.dumps(manifest, indent=2)) # Add index.html index_html = get_reader_template('index.html') if index_html: zf.writestr("index.html", index_html) else: print("⚠️ index.html template not found") # Add Reader.html reader_html = get_reader_template('Reader.html') if reader_html: zf.writestr("Reader.html", reader_html) else: print("⚠️ Reader.html template not found") zip_buffer.seek(0) print(f"📤 Sending zip: {safe_name}.zip") return send_file( zip_buffer, mimetype='application/zip', as_attachment=True, download_name=f"{safe_name}.zip" ) except Exception as e: import traceback traceback.print_exc() return jsonify({'error': str(e)}), 500 # --- LEGACY SINGLE EXPORT --- @app.route('/export', methods=['POST']) @login_required def export_project(): """Export single project files as zip.""" try: data = request.json base_name = data.get('filename', '').strip() if not base_name: base_name = 'task-1' base_name = "".join(c for c in base_name if c.isalnum() or c in ('_', '-')) text = data.get('text', '') json_data = data.get('transcription', []) audio_data = data.get('audio_data', '') audio_format = data.get('audio_format', 'mp3') zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: zf.writestr(f"{base_name}.txt", text) zf.writestr(f"{base_name}.json", json.dumps(json_data, indent=2)) if audio_data: audio_bytes = base64.b64decode(audio_data) zf.writestr(f"{base_name}.{audio_format}", audio_bytes) zip_buffer.seek(0) return send_file( zip_buffer, mimetype='application/zip', as_attachment=True, download_name=f"{base_name}.zip" ) except Exception as e: import traceback traceback.print_exc() return jsonify({'error': str(e)}), 500 # --- DATABASE STATS --- @app.route('/db/stats', methods=['GET']) @login_required def db_stats(): """Get database statistics for current user.""" db = get_db() cursor = db.cursor() user_id = get_current_user_id() cursor.execute('SELECT COUNT(*) as count FROM uploads WHERE user_id = ?', (user_id,)) uploads_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM generations WHERE user_id = ?', (user_id,)) generations_count = cursor.fetchone()['count'] cursor.execute('SELECT COUNT(*) as count FROM projects WHERE user_id = ?', (user_id,)) projects_count = cursor.fetchone()['count'] cursor.execute(''' SELECT COUNT(*) as count FROM project_sections WHERE project_id IN (SELECT id FROM projects WHERE user_id = ?) ''', (user_id,)) sections_count = cursor.fetchone()['count'] db_size = os.path.getsize(DATABASE) if os.path.exists(DATABASE) else 0 return jsonify({ 'uploads': uploads_count, 'generations': generations_count, 'projects': projects_count, 'sections': sections_count, 'database_size_bytes': db_size, 'database_size_mb': round(db_size / (1024 * 1024), 2) }) # --- HEALTH CHECK ENDPOINT --- @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint for container orchestration.""" try: # Check database connection with get_db_connection() as conn: conn.execute('SELECT 1') return jsonify({ 'status': 'healthy', 'database': 'connected', 'database_path': DATABASE }), 200 except Exception as e: return jsonify({ 'status': 'unhealthy', 'error': str(e) }), 500 if __name__ == '__main__': print("=" * 60) print("🚀 Audio Transcription Editor Starting...") print("=" * 60) print(f"📍 API Server: {TTS_API_URL}") print(f"📍 API Key: {'✅ Configured' if TTS_API_KEY else '❌ NOT CONFIGURED!'}") print(f"📍 Database Directory: {DATABASE_DIR}") print(f"📍 Database File: {DATABASE}") print(f"📍 Static Files: {app.static_folder}") print(f"📍 Reader Templates: {READER_TEMPLATES_DIR}") print("-" * 60) print("🔐 Default Admin: admin / admin123") print("-" * 60) # Create reader_templates directory if it doesn't exist if not os.path.exists(READER_TEMPLATES_DIR): os.makedirs(READER_TEMPLATES_DIR) print(f"📁 Created reader_templates directory") if not TTS_API_KEY: print("⚠️ WARNING: TTS_API_KEY not set in .env file!") print("=" * 60) app.run(debug=True, port=5009, host='0.0.0.0')