Files
Ashim Kumar 11d715eb85 first commit
2026-01-09 21:06:30 +06:00

1779 lines
64 KiB
Python

import os
import json
import uuid
import base64
import sqlite3
import io
import zipfile
from datetime import datetime
from functools import wraps
from contextlib import contextmanager
# --- ENV SETUP ---
from dotenv import load_dotenv
load_dotenv()
import requests
from flask import Flask, request, jsonify, send_from_directory, send_file, g, session, redirect, url_for, render_template
# Flask application object; static assets live in ./static and are exposed under /static.
app = Flask(__name__, static_folder='static', static_url_path='/static')
# --- APP CONFIG ---
# Database path - use environment variable or default to /app/data for container
# NOTE: ensure_db_directory() may reassign this global to a fallback location.
DATABASE_DIR = os.getenv('DATABASE_DIR', '/app/data')
# Ensure database directory exists with proper error handling
def _probe_writable_dir(path):
    """Create ``path`` if needed and prove it is writable.

    A small marker file is written and removed again. Raises ``OSError``
    (``PermissionError`` is a subclass) when the directory cannot be created
    or written to.
    """
    os.makedirs(path, exist_ok=True)
    marker = os.path.join(path, '.write_test')
    with open(marker, 'w') as f:
        f.write('test')
    os.remove(marker)


def ensure_db_directory():
    """Ensure a writable database directory exists, updating ``DATABASE_DIR``.

    Candidates are tried in order: the configured ``DATABASE_DIR``, the
    container default ``/app/data``, and finally a ``data`` folder next to
    this file. Every candidate -- including the last resort, which was
    previously only created and never write-tested -- must pass the same
    create-and-write probe before it is accepted.

    Returns:
        bool: True when a usable directory was found (``DATABASE_DIR`` then
        names it), False when every candidate failed.
    """
    global DATABASE_DIR
    # Try the configured directory first
    try:
        _probe_writable_dir(DATABASE_DIR)
        print(f"✅ Database directory ready: {DATABASE_DIR}")
        return True
    except OSError as e:
        print(f"⚠️ Cannot use {DATABASE_DIR}: {e}")

    # Fallback to /app/data
    fallback_dir = '/app/data'
    try:
        _probe_writable_dir(fallback_dir)
        DATABASE_DIR = fallback_dir
        print(f"✅ Using fallback database directory: {DATABASE_DIR}")
        return True
    except OSError as e:
        print(f"⚠️ Cannot use fallback {fallback_dir}: {e}")

    # Last resort: a data directory beside this source file
    fallback_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
    try:
        _probe_writable_dir(fallback_dir)
        DATABASE_DIR = fallback_dir
        print(f"✅ Using local data directory: {DATABASE_DIR}")
        return True
    except Exception as e:
        print(f"❌ All directory attempts failed: {e}")
        return False
# Initialize database directory
# (runs at import time and may repoint DATABASE_DIR to a writable fallback,
# so DATABASE has to be computed after this call)
ensure_db_directory()
DATABASE = os.path.join(DATABASE_DIR, 'audio_editor.db')
# NOTE(review): when SECRET_KEY is unset the fallback ends in a fresh uuid4, so
# the key differs on every process start -- presumably existing session
# cookies stop validating after a restart; confirm this is intended.
app.secret_key = os.getenv('SECRET_KEY', 'your-secret-key-change-in-production-' + str(uuid.uuid4()))
# --- API CONFIG ---
# Base URL and API key for the external TTS service called via `requests`.
TTS_API_URL = os.getenv('TTS_API_URL', 'http://localhost:5010/api/v1')
TTS_API_KEY = os.getenv('TTS_API_KEY', '')
# --- READER TEMPLATE FILES ---
# Directory named `reader_templates` located next to this file.
READER_TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'reader_templates')
def get_api_headers():
    """Build the header dict attached to TTS API calls (carries the API key)."""
    headers = {}
    headers['X-API-Key'] = TTS_API_KEY
    return headers
# --- AUDIO CONVERSION ---
def convert_to_mp3(audio_base64, source_format='wav'):
    """Re-encode base64 audio as 192k MP3 and return it as base64 text.

    Best-effort: when pydub cannot be imported, or decoding/exporting raises,
    a warning is printed and the input string is handed back unchanged.

    Args:
        audio_base64: Base64-encoded audio in ``source_format``.
        source_format: Format name passed to pydub when reading the input.

    Returns:
        Base64 string of the MP3 data, or ``audio_base64`` itself on failure.
    """
    try:
        from pydub import AudioSegment

        raw_audio = io.BytesIO(base64.b64decode(audio_base64))
        segment = AudioSegment.from_file(raw_audio, format=source_format)
        encoded = io.BytesIO()
        segment.export(encoded, format='mp3', bitrate='192k')
        encoded.seek(0)
        return base64.b64encode(encoded.read()).decode('utf-8')
    except ImportError:
        print("⚠️ pydub not installed, returning original format")
    except Exception as e:
        print(f"⚠️ MP3 conversion failed: {e}, returning original")
    # Either failure path falls through to returning the caller's data as-is.
    return audio_base64
# --- DATABASE SETUP ---
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    Rows come back as ``sqlite3.Row`` so columns can be read by name.
    """
    if 'db' in g:
        return g.db
    g.db = sqlite3.connect(DATABASE)
    g.db.row_factory = sqlite3.Row
    return g.db
@app.teardown_appcontext
def close_db(error):
    """Teardown hook: close the connection stored on ``g``, if one was opened."""
    connection = g.pop('db', None)
    if connection is None:
        return
    connection.close()
@contextmanager
def get_db_connection():
    """Yield a standalone SQLite connection and always close it afterwards.

    Intended for code that runs outside a request (startup, maintenance),
    where the ``g``-cached connection from get_db() is not used.
    """
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    finally:
        # Runs on normal exit and when the with-body raises.
        connection.close()
def init_db():
    """Create all tables, seed the default admin and backfill missing columns.

    Uses its own connection from get_db_connection(). Steps, in order:
    CREATE TABLE IF NOT EXISTS for users, uploads, projects,
    project_sections and generations; insert the ``admin`` account when no
    such username exists; then, for each expected column, probe it with a
    SELECT and run the matching ALTER TABLE when the probe raises
    ``sqlite3.OperationalError``.

    NOTE(review): the seeded admin password is a fixed value stored as plain
    text.
    """
    schema_statements = (
        # Users table
        '''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password TEXT NOT NULL,
is_admin INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP
)
''',
        # Uploaded audio + text pairs
        '''
CREATE TABLE IF NOT EXISTS uploads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
filename TEXT NOT NULL,
audio_data TEXT NOT NULL,
audio_format TEXT DEFAULT 'mp3',
text_content TEXT NOT NULL,
transcription TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''',
        # Projects (unique name per user)
        '''
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
UNIQUE(user_id, name)
)
''',
        # Sections of a project (unique chapter/section per project)
        '''
CREATE TABLE IF NOT EXISTS project_sections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL,
chapter INTEGER NOT NULL,
section INTEGER NOT NULL,
audio_data TEXT,
audio_format TEXT DEFAULT 'mp3',
text_content TEXT NOT NULL,
html_content TEXT,
tts_text TEXT,
transcription TEXT,
voice TEXT DEFAULT 'af_heart',
image_data TEXT,
image_format TEXT DEFAULT 'png',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE,
UNIQUE(project_id, chapter, section)
)
''',
        # Stand-alone generated audio
        '''
CREATE TABLE IF NOT EXISTS generations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
name TEXT,
audio_data TEXT NOT NULL,
audio_format TEXT DEFAULT 'mp3',
text_content TEXT NOT NULL,
transcription TEXT,
voice TEXT DEFAULT 'af_heart',
speed REAL DEFAULT 1.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
''',
    )
    # (column, table, ALTER statement) for columns added after the first schema
    migrations = [
        ("user_id", "uploads", "ALTER TABLE uploads ADD COLUMN user_id INTEGER DEFAULT 1"),
        ("user_id", "projects", "ALTER TABLE projects ADD COLUMN user_id INTEGER DEFAULT 1"),
        ("user_id", "generations", "ALTER TABLE generations ADD COLUMN user_id INTEGER DEFAULT 1"),
        ("html_content", "project_sections", "ALTER TABLE project_sections ADD COLUMN html_content TEXT"),
        ("tts_text", "project_sections", "ALTER TABLE project_sections ADD COLUMN tts_text TEXT"),
        ("image_data", "project_sections", "ALTER TABLE project_sections ADD COLUMN image_data TEXT"),
        ("image_format", "project_sections", "ALTER TABLE project_sections ADD COLUMN image_format TEXT DEFAULT 'png'"),
    ]
    with get_db_connection() as conn:
        cursor = conn.cursor()
        for ddl in schema_statements:
            cursor.execute(ddl)
        conn.commit()

        # Create default admin user if not exists
        cursor.execute("SELECT id FROM users WHERE username = 'admin'")
        admin_row = cursor.fetchone()
        if not admin_row:
            cursor.execute('''
INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?)
''', ('admin', 'admin123', 1))
            conn.commit()
            print("📦 Created default admin user (admin/admin123)")

        # Migration: add columns that an older database file may lack
        for col_name, table_name, sql in migrations:
            try:
                cursor.execute(f"SELECT {col_name} FROM {table_name} LIMIT 1")
            except sqlite3.OperationalError:
                column_missing = True
            else:
                column_missing = False
            if column_missing:
                print(f"📦 Adding {col_name} column to {table_name}...")
                cursor.execute(sql)
                conn.commit()
def vacuum_db():
    """Issue VACUUM on a dedicated connection to reclaim space after deletions."""
    with get_db_connection() as connection:
        connection.execute('VACUUM')
# Initialize database on startup
# (executed at import time: logs the resolved paths, then creates/migrates the schema)
print(f"📁 Database directory: {DATABASE_DIR}")
print(f"📁 Database file: {DATABASE}")
init_db()
# --- AUTHENTICATION DECORATORS ---
def login_required(f):
    """Wrap a view so it only runs when the session holds a ``user_id``.

    Unauthenticated callers get a JSON 401 when the request is JSON or its
    Accept header is exactly ``application/json``; otherwise they are
    redirected to the login page.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if 'user_id' in session:
            return f(*args, **kwargs)
        wants_json = request.is_json or request.headers.get('Accept') == 'application/json'
        if wants_json:
            return jsonify({'error': 'Authentication required'}), 401
        return redirect(url_for('login_page'))
    return guarded
def admin_required(f):
    """Wrap a view so it only runs for a logged-in user flagged ``is_admin``.

    Not logged in: JSON 401 or redirect to the login page. Logged in but not
    admin: JSON 403 or redirect to the index page. The JSON variant is used
    when the request is JSON or its Accept header is exactly
    ``application/json``.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        def wants_json():
            return request.is_json or request.headers.get('Accept') == 'application/json'

        if 'user_id' not in session:
            if wants_json():
                return jsonify({'error': 'Authentication required'}), 401
            return redirect(url_for('login_page'))
        if session.get('is_admin'):
            return f(*args, **kwargs)
        if wants_json():
            return jsonify({'error': 'Admin access required'}), 403
        return redirect(url_for('index'))
    return guarded
def get_current_user_id():
    """Return the ``user_id`` stored in the session, or None when absent."""
    current_id = session.get('user_id')
    return current_id
# --- AUTH ROUTES ---
@app.route('/login', methods=['GET'])
def login_page():
    """Serve the login page, or send logged-in users on to the index."""
    already_signed_in = 'user_id' in session
    if not already_signed_in:
        return send_from_directory('templates', 'login.html')
    return redirect(url_for('index'))
@app.route('/api/login', methods=['POST'])
def api_login():
    """Authenticate a user from a JSON body and start a session.

    Expects ``{"username": <str>, "password": <str>}``.

    Returns:
        200 with the user's id, username and admin flag on success;
        400 when the body is not a JSON object or a credential is missing
        or not a string;
        401 when the credentials do not match a stored user.
    """
    import hmac  # local import: only needed for the constant-time comparison

    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so malformed requests get the 400 below rather than a crash.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username', '')
    password = data.get('password', '')
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Username and password required'}), 400
    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT id, username, password, is_admin FROM users WHERE username = ?', (username,))
    user = cursor.fetchone()
    # NOTE(review): passwords are stored and compared as plain text; moving to
    # hashes needs a coordinated change with the code that writes
    # users.password.
    # compare_digest avoids leaking how many leading characters matched;
    # bytes are used because the str form only accepts ASCII.
    stored = str(user['password']).encode('utf-8', 'surrogatepass') if user else b''
    supplied = password.encode('utf-8', 'surrogatepass')
    if not user or not hmac.compare_digest(stored, supplied):
        return jsonify({'error': 'Invalid username or password'}), 401
    # Update last login
    cursor.execute('UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?', (user['id'],))
    db.commit()
    # Set session
    session['user_id'] = user['id']
    session['username'] = user['username']
    session['is_admin'] = bool(user['is_admin'])
    return jsonify({
        'message': 'Login successful',
        'user': {
            'id': user['id'],
            'username': user['username'],
            'is_admin': bool(user['is_admin'])
        }
    })
@app.route('/api/logout', methods=['POST'])
def api_logout():
    """Drop every key from the session and confirm with a JSON message."""
    session.clear()
    payload = {'message': 'Logged out successfully'}
    return jsonify(payload)
@app.route('/logout')
def logout():
    """Clear the session, then redirect the browser to the login page."""
    session.clear()
    login_url = url_for('login_page')
    return redirect(login_url)
@app.route('/api/me', methods=['GET'])
@login_required
def api_me():
    """Report the id, username and admin flag held in the current session."""
    current_user = {
        'id': session.get('user_id'),
        'username': session.get('username'),
        'is_admin': session.get('is_admin', False)
    }
    return jsonify({'user': current_user})
# --- ADMIN ROUTES ---
@app.route('/admin')
@admin_required
def admin_dashboard():
    """Serve ``admin.html`` from the templates directory (admins only)."""
    page = 'admin.html'
    return send_from_directory('templates', page)
@app.route('/api/admin/users', methods=['GET'])
@admin_required
def list_users():
    """Return every user, newest first, with a per-user project count."""
    cursor = get_db().cursor()
    cursor.execute('''
SELECT u.id, u.username, u.is_admin, u.created_at, u.last_login,
(SELECT COUNT(*) FROM projects WHERE user_id = u.id) as project_count
FROM users u
ORDER BY u.created_at DESC
''')
    users = []
    for record in cursor.fetchall():
        users.append({
            'id': record['id'],
            'username': record['username'],
            # stored as 0/1; expose a real boolean
            'is_admin': bool(record['is_admin']),
            'created_at': record['created_at'],
            'last_login': record['last_login'],
            'project_count': record['project_count']
        })
    return jsonify({'users': users})
@app.route('/api/admin/users', methods=['POST'])
@admin_required
def create_user():
    """Create a user from a JSON body (admins only).

    Expects ``{"username": <str>, "password": <str>, "is_admin": <bool>}``;
    ``is_admin`` is optional and defaults to False.

    Returns:
        200 with the new ``user_id`` on success;
        400 when the body is not a JSON object, a credential is missing or
        not a string, the username is shorter than 3 characters, the
        password is shorter than 4, or the username is already taken.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so malformed requests hit the validation below instead of
    # crashing on ``None.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username', '')
    password = data.get('password', '')
    is_admin = data.get('is_admin', False)
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Username and password required'}), 400
    username = username.strip()
    if not username or not password:
        return jsonify({'error': 'Username and password required'}), 400
    if len(username) < 3:
        return jsonify({'error': 'Username must be at least 3 characters'}), 400
    if len(password) < 4:
        return jsonify({'error': 'Password must be at least 4 characters'}), 400
    db = get_db()
    cursor = db.cursor()
    try:
        # NOTE(review): the password is written as plain text, matching how
        # the login route compares it.
        cursor.execute('''
INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?)
''', (username, password, 1 if is_admin else 0))
        db.commit()
        return jsonify({
            'message': 'User created successfully',
            'user_id': cursor.lastrowid
        })
    except sqlite3.IntegrityError:
        # users.username is UNIQUE
        return jsonify({'error': 'Username already exists'}), 400
@app.route('/api/admin/users/<int:user_id>', methods=['PUT'])
@admin_required
def update_user(user_id):
    """Update a user's username, password and/or admin flag (admins only).

    Only keys present in the JSON body are changed; blank usernames and empty
    passwords are ignored.

    Returns:
        200 on success;
        404 when the user does not exist;
        400 when an admin tries to change their own ``is_admin``, a supplied
        username/password is not a string, nothing updatable was provided,
        or the new username is already taken.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body; a non-object body is treated as "no updates".
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    db = get_db()
    cursor = db.cursor()
    # Check if user exists
    cursor.execute('SELECT id, username FROM users WHERE id = ?', (user_id,))
    user = cursor.fetchone()
    if not user:
        return jsonify({'error': 'User not found'}), 404
    # Prevent modifying own admin status
    if user_id == session.get('user_id') and 'is_admin' in data:
        return jsonify({'error': 'Cannot modify your own admin status'}), 400
    new_username = data.get('username')
    new_password = data.get('password')
    # Reject wrong types up front instead of crashing on ``.strip()``.
    if new_username is not None and not isinstance(new_username, str):
        return jsonify({'error': 'Username and password must be strings'}), 400
    if new_password and not isinstance(new_password, str):
        return jsonify({'error': 'Username and password must be strings'}), 400
    updates = []
    params = []
    if new_username and new_username.strip():
        updates.append('username = ?')
        params.append(new_username.strip())
    if new_password:
        updates.append('password = ?')
        params.append(new_password)
    if 'is_admin' in data:
        updates.append('is_admin = ?')
        params.append(1 if data['is_admin'] else 0)
    if not updates:
        return jsonify({'error': 'No updates provided'}), 400
    params.append(user_id)
    try:
        # Only fixed "column = ?" fragments are interpolated; every value is bound.
        cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", params)
        db.commit()
        return jsonify({'message': 'User updated successfully'})
    except sqlite3.IntegrityError:
        return jsonify({'error': 'Username already exists'}), 400
@app.route('/api/admin/users/<int:user_id>', methods=['DELETE'])
@admin_required
def delete_user(user_id):
    """Remove a user together with their sections, projects, uploads and generations.

    Refuses with 400 when the target is the caller's own account and with 404
    when the user does not exist. The rows are deleted explicitly, child
    tables first, then the database is vacuumed.
    """
    # Prevent self-deletion
    if session.get('user_id') == user_id:
        return jsonify({'error': 'Cannot delete your own account'}), 400
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT id FROM users WHERE id = ?', (user_id,))
    target = cursor.fetchone()
    if not target:
        return jsonify({'error': 'User not found'}), 404
    # Children before parents; each statement takes the user id as its only parameter.
    cleanup_statements = (
        'DELETE FROM project_sections WHERE project_id IN (SELECT id FROM projects WHERE user_id = ?)',
        'DELETE FROM projects WHERE user_id = ?',
        'DELETE FROM uploads WHERE user_id = ?',
        'DELETE FROM generations WHERE user_id = ?',
        'DELETE FROM users WHERE id = ?',
    )
    for statement in cleanup_statements:
        cursor.execute(statement, (user_id,))
    db.commit()
    vacuum_db()
    return jsonify({'message': 'User deleted successfully'})
@app.route('/api/admin/stats', methods=['GET'])
@admin_required
def admin_stats():
    """Return row counts for the main tables plus the database file size in MB."""
    cursor = get_db().cursor()

    def count_rows(table):
        # `table` only ever comes from the literal names used below.
        cursor.execute(f'SELECT COUNT(*) as count FROM {table}')
        return cursor.fetchone()['count']

    user_count = count_rows('users')
    project_count = count_rows('projects')
    section_count = count_rows('project_sections')
    upload_count = count_rows('uploads')
    generation_count = count_rows('generations')
    if os.path.exists(DATABASE):
        db_size = os.path.getsize(DATABASE)
    else:
        db_size = 0
    return jsonify({
        'users': user_count,
        'projects': project_count,
        'sections': section_count,
        'uploads': upload_count,
        'generations': generation_count,
        'database_size_mb': round(db_size / (1024 * 1024), 2)
    })
# --- ROUTES ---
@app.route('/')
@login_required
def index():
    """Serve the main page (``index.html``) to logged-in users."""
    page = 'index.html'
    return send_from_directory('templates', page)
@app.route('/static/<path:filename>')
def serve_static(filename):
    """Return ``filename`` from the ``static`` directory (CSS, JS)."""
    static_dir = 'static'
    return send_from_directory(static_dir, filename)
@app.route('/api_status', methods=['GET'])
@login_required
def api_status():
    """Probe the TTS API's ``/health`` endpoint and report the result.

    Always answers with JSON containing ``connected`` and ``api_url``; on
    success the upstream status, services and device are included, otherwise
    an ``error`` string.
    """
    try:
        response = requests.get(f"{TTS_API_URL}/health", timeout=10)
        if response.status_code != 200:
            return jsonify({
                'connected': False,
                'api_url': TTS_API_URL,
                'error': f'Status code: {response.status_code}'
            })
        health_data = response.json()
        return jsonify({
            'connected': True,
            'api_url': TTS_API_URL,
            'status': health_data.get('status'),
            'services': health_data.get('services'),
            'device': health_data.get('device')
        })
    except Exception as e:
        # Any failure (network, bad JSON, ...) is reported as "not connected".
        return jsonify({
            'connected': False,
            'api_url': TTS_API_URL,
            'error': str(e)
        })
@app.route('/voices', methods=['GET'])
@login_required
def get_voices():
    """Return the voices offered by the TTS API, with built-in fallbacks.

    On a 200 response the API's ``voices`` list is passed through. A non-200
    response yields a fixed list of three voices; any error while calling or
    parsing the API yields a single default voice.
    """
    try:
        response = requests.get(
            f"{TTS_API_URL}/voices",
            headers=get_api_headers(),
            timeout=30
        )
        if response.status_code == 200:
            result = response.json()
            return jsonify({'voices': result.get('voices', [])})
        else:
            return jsonify({'voices': [
                {'id': 'af_heart', 'name': 'Heart (US Fem)'},
                {'id': 'af_bella', 'name': 'Bella (US Fem)'},
                {'id': 'am_adam', 'name': 'Adam (US Masc)'}
            ]})
    except Exception as e:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt and hid the reason. Still best-effort, but the
        # failure is now logged.
        print(f"⚠️ Could not fetch voices from API: {e}")
        return jsonify({'voices': [
            {'id': 'af_heart', 'name': 'Heart (US Fem)'}
        ]})
# --- UPLOAD HANDLING ---
@app.route('/upload', methods=['POST'])
@login_required
def upload_files():
    """Store an uploaded audio/text pair together with word timestamps.

    Reads the ``audioFile`` and ``txtFile`` form files, asks the timestamp
    API to align them, converts the audio to MP3 where possible and saves
    the result in ``uploads`` for the current user.

    Returns:
        200 with the upload id, audio (base64), its format, the text and the
        timestamps;
        400 when a file is missing/invalid or the text is not UTF-8;
        the upstream status code when the timestamp API rejects the request;
        500 on connection or unexpected errors.
    """
    if 'audioFile' not in request.files or 'txtFile' not in request.files:
        return jsonify({'error': 'Missing files'}), 400
    audio_file = request.files['audioFile']
    txt_file = request.files['txtFile']
    if not audio_file or not txt_file:
        return jsonify({'error': 'Invalid files'}), 400
    audio_bytes = audio_file.read()
    audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')
    filename = audio_file.filename or ''
    extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
    # A name such as "clip." has an empty extension; treat it like no extension.
    source_format = extension or 'wav'
    try:
        text_content = txt_file.read().decode('utf-8')
    except UnicodeDecodeError:
        # Previously raised outside any handler and surfaced as an opaque 500.
        return jsonify({'error': 'Text file must be UTF-8 encoded'}), 400
    try:
        print("⏳ Calling Timestamp API...")
        files = {'audio_file': (audio_file.filename, audio_bytes)}
        data = {'text': text_content}
        response = requests.post(
            f"{TTS_API_URL}/timestamp",
            headers=get_api_headers(),
            files=files,
            data=data,
            timeout=120
        )
        if response.status_code != 200:
            error_detail = response.json().get('error', 'Unknown error')
            return jsonify({'error': f'API Error: {error_detail}'}), response.status_code
        result = response.json()
        transcription_data = result.get('timestamps', [])
        audio_format = 'mp3'
        if source_format != 'mp3':
            print("🔄 Converting to MP3...")
            converted = convert_to_mp3(audio_base64, source_format)
            # convert_to_mp3 returns its input unchanged when conversion is
            # unavailable or fails; in that case keep the real format rather
            # than mislabelling the data as MP3.
            if converted == audio_base64:
                audio_format = source_format
            audio_base64 = converted
        db = get_db()
        cursor = db.cursor()
        cursor.execute('''
INSERT INTO uploads (user_id, filename, audio_data, audio_format, text_content, transcription)
VALUES (?, ?, ?, ?, ?, ?)
''', (
            get_current_user_id(),
            audio_file.filename,
            audio_base64,
            audio_format,
            text_content,
            json.dumps(transcription_data)
        ))
        db.commit()
        upload_id = cursor.lastrowid
        print(f"✅ Upload saved with ID: {upload_id}")
        return jsonify({
            'message': 'Success',
            'upload_id': upload_id,
            'audio_data': audio_base64,
            'audio_format': audio_format,
            'text_content': text_content,
            'transcription': transcription_data
        })
    except requests.exceptions.RequestException as e:
        return jsonify({'error': f'API connection error: {str(e)}'}), 500
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@app.route('/uploads', methods=['GET'])
@login_required
def list_uploads():
    """Return metadata (no audio payload) for the current user's uploads, newest first."""
    cursor = get_db().cursor()
    cursor.execute('''
SELECT id, filename, audio_format, created_at,
LENGTH(audio_data) as audio_size,
LENGTH(text_content) as text_size
FROM uploads
WHERE user_id = ?
ORDER BY created_at DESC
''', (get_current_user_id(),))
    fields = ('id', 'filename', 'audio_format', 'created_at', 'audio_size', 'text_size')
    uploads = [
        {field: record[field] for field in fields}
        for record in cursor.fetchall()
    ]
    return jsonify({'uploads': uploads})
@app.route('/uploads/<int:upload_id>', methods=['GET'])
@login_required
def get_upload(upload_id):
    """Return one upload owned by the current user, or 404 when not found."""
    owner_id = get_current_user_id()
    cursor = get_db().cursor()
    cursor.execute('SELECT * FROM uploads WHERE id = ? AND user_id = ?', (upload_id, owner_id))
    record = cursor.fetchone()
    if not record:
        return jsonify({'error': 'Upload not found'}), 404
    # transcription is stored as a JSON string; empty/NULL becomes an empty list
    if record['transcription']:
        transcription = json.loads(record['transcription'])
    else:
        transcription = []
    return jsonify({
        'id': record['id'],
        'filename': record['filename'],
        'audio_data': record['audio_data'],
        'audio_format': record['audio_format'],
        'text_content': record['text_content'],
        'transcription': transcription,
        'created_at': record['created_at']
    })
@app.route('/uploads/<int:upload_id>', methods=['DELETE'])
@login_required
def delete_upload(upload_id):
    """Delete one of the current user's uploads; 404 when no row matched."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute(
        'DELETE FROM uploads WHERE id = ? AND user_id = ?',
        (upload_id, get_current_user_id()),
    )
    db.commit()
    nothing_deleted = cursor.rowcount == 0
    if nothing_deleted:
        return jsonify({'error': 'Upload not found'}), 404
    vacuum_db()
    return jsonify({'message': 'Upload deleted successfully'})
@app.route('/uploads/<int:upload_id>/download', methods=['GET'])
@login_required
def download_upload(upload_id):
    """Send one of the current user's uploads as a zip archive.

    The archive holds the audio, the text and -- when present -- the
    transcription JSON, all named after the original filename's stem.

    Returns:
        The zip as an attachment, or a JSON 404 when the upload is not found.
    """
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT * FROM uploads WHERE id = ? AND user_id = ?', (upload_id, get_current_user_id()))
    row = cursor.fetchone()
    if not row:
        return jsonify({'error': 'Upload not found'}), 404
    zip_buffer = io.BytesIO()
    # The stored filename came from the uploader. Keep only its final path
    # component (either separator style) so archive entries cannot carry
    # directories or '..' segments.
    stored_name = row['filename'] or ''
    leaf_name = os.path.basename(stored_name.replace('\\', '/'))
    base_name = leaf_name.rsplit('.', 1)[0] if '.' in leaf_name else leaf_name
    if not base_name.strip('. '):
        # e.g. ".mp3" or "..": nothing usable is left as a stem
        base_name = f"upload_{upload_id}"
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        audio_bytes = base64.b64decode(row['audio_data'])
        zf.writestr(f"{base_name}.{row['audio_format']}", audio_bytes)
        zf.writestr(f"{base_name}.txt", row['text_content'])
        if row['transcription']:
            zf.writestr(f"{base_name}.json", row['transcription'])
    zip_buffer.seek(0)
    return send_file(
        zip_buffer,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f"{base_name}.zip"
    )
# --- GENERATION HANDLING ---
@app.route('/generate', methods=['POST'])
@login_required
def generate_audio():
    """Generate speech for the posted text, align it, and optionally save it.

    JSON body keys: ``text`` (required), ``voice`` (default ``af_heart``),
    ``speed`` (default 1.0), ``name`` (optional label) and ``save_to_db``
    (default True).

    Returns:
        200 with the audio (base64), its format, the text, timestamps, voice,
        speed and the saved ``generation_id`` (None when not saved);
        400 when no text was provided (including a missing or non-object
        JSON body);
        the upstream status code when either API call is rejected;
        500 when no audio came back, or on connection/unexpected errors.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so such requests get the 400 below rather than crashing.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    text_content = data.get('text', '')
    voice = data.get('voice', 'af_heart')
    speed = data.get('speed', 1.0)
    name = data.get('name', '')
    save_to_db = data.get('save_to_db', True)
    if not text_content:
        return jsonify({'error': 'No text provided'}), 400
    try:
        print(f"🔊 Calling Generate Audio API: voice={voice}, speed={speed}")
        response = requests.post(
            f"{TTS_API_URL}/generate-audio",
            headers={**get_api_headers(), 'Content-Type': 'application/json'},
            json={'text': text_content, 'voice': voice, 'speed': speed},
            timeout=180
        )
        if response.status_code != 200:
            error_detail = response.json().get('error', 'Unknown error')
            return jsonify({'error': f'Generate Audio API Error: {error_detail}'}), response.status_code
        result = response.json()
        audio_base64 = result.get('audio_base64', '')
        source_format = result.get('audio_format', 'wav')
        if not audio_base64:
            return jsonify({'error': 'No audio data received from API'}), 500
        print(f"💾 Audio generated successfully")
        print("⏳ Calling Timestamp API...")
        audio_bytes = base64.b64decode(audio_base64)
        files = {'audio_file': (f'audio.{source_format}', audio_bytes)}
        ts_data = {'text': text_content}
        ts_response = requests.post(
            f"{TTS_API_URL}/timestamp",
            headers=get_api_headers(),
            files=files,
            data=ts_data,
            timeout=120
        )
        if ts_response.status_code != 200:
            error_detail = ts_response.json().get('error', 'Unknown error')
            return jsonify({'error': f'Timestamp API Error: {error_detail}'}), ts_response.status_code
        ts_result = ts_response.json()
        transcription_data = ts_result.get('timestamps', [])
        audio_format = 'mp3'
        if source_format != 'mp3':
            print("🔄 Converting to MP3...")
            converted = convert_to_mp3(audio_base64, source_format)
            # convert_to_mp3 returns its input unchanged when conversion is
            # unavailable or fails; in that case keep the real format rather
            # than mislabelling the data as MP3.
            if converted == audio_base64:
                audio_format = source_format
            audio_base64 = converted
        generation_id = None
        if save_to_db:
            db = get_db()
            cursor = db.cursor()
            cursor.execute('''
INSERT INTO generations (user_id, name, audio_data, audio_format, text_content, transcription, voice, speed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
                get_current_user_id(),
                # unnamed generations get a timestamp label
                name or f"Generation {datetime.now().strftime('%Y-%m-%d %H:%M')}",
                audio_base64,
                audio_format,
                text_content,
                json.dumps(transcription_data),
                voice,
                speed
            ))
            db.commit()
            generation_id = cursor.lastrowid
            print(f"✅ Generation saved with ID: {generation_id}")
        return jsonify({
            'message': 'Generated',
            'generation_id': generation_id,
            'audio_data': audio_base64,
            'audio_format': audio_format,
            'text_content': text_content,
            'transcription': transcription_data,
            'voice': voice,
            'speed': speed
        })
    except requests.exceptions.RequestException as e:
        return jsonify({'error': f'API connection error: {str(e)}'}), 500
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
@app.route('/generations', methods=['GET'])
@login_required
def list_generations():
    """Return metadata (no audio payload) for the current user's generations, newest first."""
    cursor = get_db().cursor()
    cursor.execute('''
SELECT id, name, audio_format, voice, speed, created_at,
LENGTH(audio_data) as audio_size,
LENGTH(text_content) as text_size
FROM generations
WHERE user_id = ?
ORDER BY created_at DESC
''', (get_current_user_id(),))
    fields = (
        'id', 'name', 'audio_format', 'voice', 'speed',
        'created_at', 'audio_size', 'text_size',
    )
    generations = [
        {field: record[field] for field in fields}
        for record in cursor.fetchall()
    ]
    return jsonify({'generations': generations})
@app.route('/generations/<int:gen_id>', methods=['GET'])
@login_required
def get_generation(gen_id):
    """Return one generation owned by the current user, or 404 when not found."""
    owner_id = get_current_user_id()
    cursor = get_db().cursor()
    cursor.execute('SELECT * FROM generations WHERE id = ? AND user_id = ?', (gen_id, owner_id))
    record = cursor.fetchone()
    if not record:
        return jsonify({'error': 'Generation not found'}), 404
    # transcription is stored as a JSON string; empty/NULL becomes an empty list
    if record['transcription']:
        transcription = json.loads(record['transcription'])
    else:
        transcription = []
    return jsonify({
        'id': record['id'],
        'name': record['name'],
        'audio_data': record['audio_data'],
        'audio_format': record['audio_format'],
        'text_content': record['text_content'],
        'transcription': transcription,
        'voice': record['voice'],
        'speed': record['speed'],
        'created_at': record['created_at']
    })
@app.route('/generations/<int:gen_id>', methods=['DELETE'])
@login_required
def delete_generation(gen_id):
    """Delete one of the current user's generations; 404 when no row matched."""
    db = get_db()
    cursor = db.cursor()
    cursor.execute(
        'DELETE FROM generations WHERE id = ? AND user_id = ?',
        (gen_id, get_current_user_id()),
    )
    db.commit()
    nothing_deleted = cursor.rowcount == 0
    if nothing_deleted:
        return jsonify({'error': 'Generation not found'}), 404
    vacuum_db()
    return jsonify({'message': 'Generation deleted successfully'})
@app.route('/generations/<int:gen_id>/download', methods=['GET'])
@login_required
def download_generation(gen_id):
    """Send one of the current user's generations as a zip archive.

    The archive holds the audio, the text and -- when present -- the
    transcription JSON. Entry names derive from the generation's name with
    spaces turned into underscores and everything except alphanumerics,
    ``_`` and ``-`` removed.

    Returns:
        The zip as an attachment, or a JSON 404 when the generation is not
        found.
    """
    db = get_db()
    cursor = db.cursor()
    cursor.execute('SELECT * FROM generations WHERE id = ? AND user_id = ?', (gen_id, get_current_user_id()))
    row = cursor.fetchone()
    if not row:
        return jsonify({'error': 'Generation not found'}), 404
    zip_buffer = io.BytesIO()
    raw_name = row['name'].replace(' ', '_') if row['name'] else ''
    base_name = "".join(c for c in raw_name if c.isalnum() or c in ('_', '-'))
    if not base_name:
        # Covers a missing name and a name made only of stripped characters
        # (e.g. "!!!"), which used to produce entries like ".mp3" and ".zip".
        base_name = f"generation_{gen_id}"
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        audio_bytes = base64.b64decode(row['audio_data'])
        zf.writestr(f"{base_name}.{row['audio_format']}", audio_bytes)
        zf.writestr(f"{base_name}.txt", row['text_content'])
        if row['transcription']:
            zf.writestr(f"{base_name}.json", row['transcription'])
    zip_buffer.seek(0)
    return send_file(
        zip_buffer,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f"{base_name}.zip"
    )
# --- PROJECT HANDLING (BULK) ---
@app.route('/projects', methods=['GET'])
@login_required
def list_projects():
    """Return the current user's projects with section counts, most recently updated first."""
    cursor = get_db().cursor()
    cursor.execute('''
SELECT p.id, p.name, p.created_at, p.updated_at,
COUNT(ps.id) as section_count
FROM projects p
LEFT JOIN project_sections ps ON p.id = ps.project_id
WHERE p.user_id = ?
GROUP BY p.id
ORDER BY p.updated_at DESC
''', (get_current_user_id(),))
    fields = ('id', 'name', 'created_at', 'updated_at', 'section_count')
    projects = [
        {field: record[field] for field in fields}
        for record in cursor.fetchall()
    ]
    return jsonify({'projects': projects})
@app.route('/projects', methods=['POST'])
@login_required
def create_project():
    """Create a project for the current user from ``{"name": <str>}``.

    Returns:
        200 with the new ``project_id`` and the trimmed name;
        400 when the name is missing, blank or not a string (including a
        missing or non-object JSON body), or when the user already has a
        project with that name.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON
    # body, so such requests get the 400 below rather than crashing.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name', '')
    if not isinstance(name, str) or not name.strip():
        return jsonify({'error': 'Project name is required'}), 400
    name = name.strip()
    db = get_db()
    cursor = db.cursor()
    try:
        cursor.execute('INSERT INTO projects (user_id, name) VALUES (?, ?)', (get_current_user_id(), name))
        db.commit()
        return jsonify({
            'message': 'Project created',
            'project_id': cursor.lastrowid,
            'name': name
        })
    except sqlite3.IntegrityError:
        # projects has UNIQUE(user_id, name)
        return jsonify({'error': 'Project with this name already exists'}), 400
@app.route('/projects/<int:project_id>', methods=['GET'])
@login_required
def get_project(project_id):
    """Return a project owned by the current user with all sections, ordered by chapter then section.

    Responds with 404 when the project does not exist or belongs to someone
    else.
    """
    cursor = get_db().cursor()
    cursor.execute('SELECT * FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id()))
    project = cursor.fetchone()
    if not project:
        return jsonify({'error': 'Project not found'}), 404
    cursor.execute('''
SELECT id, chapter, section, audio_data, audio_format,
text_content, html_content, tts_text, transcription, voice,
image_data, image_format, created_at
FROM project_sections
WHERE project_id = ?
ORDER BY chapter, section
''', (project_id,))

    def column_or(record, column, fallback):
        # Read a column only if the row actually carries it.
        return record[column] if column in record.keys() else fallback

    def describe(record):
        image_data = column_or(record, 'image_data', None)
        return {
            'id': record['id'],
            'chapter': record['chapter'],
            'section': record['section'],
            'audio_data': record['audio_data'],
            'audio_format': record['audio_format'] or 'mp3',
            'text_content': record['text_content'],
            'html_content': column_or(record, 'html_content', None),
            'tts_text': column_or(record, 'tts_text', None),
            'transcription': json.loads(record['transcription']) if record['transcription'] else [],
            'voice': record['voice'] or 'af_heart',
            'image_data': image_data,
            'image_format': column_or(record, 'image_format', 'png'),
            'created_at': record['created_at'],
            'has_audio': record['audio_data'] is not None and len(record['audio_data'] or '') > 0,
            'has_image': image_data is not None and len(image_data or '') > 0
        }

    section_list = [describe(record) for record in cursor.fetchall()]
    return jsonify({
        'id': project['id'],
        'name': project['name'],
        'created_at': project['created_at'],
        'updated_at': project['updated_at'],
        'sections': section_list
    })
@app.route('/projects/<int:project_id>', methods=['DELETE'])
@login_required
def delete_project(project_id):
    """Delete one of the current user's projects along with its sections.

    Responds with 404 when the project does not exist or belongs to someone
    else; on success the database is vacuumed.
    """
    db = get_db()
    cursor = db.cursor()
    # Verify ownership
    cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id()))
    owned = cursor.fetchone()
    if not owned:
        return jsonify({'error': 'Project not found'}), 404
    # Sections first, then the project row itself.
    for statement in (
        'DELETE FROM project_sections WHERE project_id = ?',
        'DELETE FROM projects WHERE id = ?',
    ):
        cursor.execute(statement, (project_id,))
    db.commit()
    vacuum_db()
    return jsonify({'message': 'Project deleted successfully'})
@app.route('/projects/<int:project_id>/sections', methods=['POST'])
@login_required
def add_or_update_section(project_id):
    """Add or update a section in a project, optionally generating its audio.

    The JSON request body supplies ``chapter``, ``section``, ``text``
    (required), ``html_content``, ``tts_text``, ``voice``, ``image_data``,
    ``image_format`` and ``generate_audio`` (defaults to True).

    When ``generate_audio`` is truthy, the text (``tts_text`` if given,
    otherwise ``text``) is posted to ``{TTS_API_URL}/generate-audio``. The
    returned audio is then posted to ``{TTS_API_URL}/timestamp`` and, when its
    format is not mp3, passed through ``convert_to_mp3``.

    The row is upserted on (project_id, chapter, section). On conflict every
    listed column is overwritten with the new value, so saving with
    ``generate_audio`` false replaces stored audio with NULL and the stored
    transcription with ``[]``.

    Returns:
        JSON describing the saved section, including its real row id.
        400 when the body is not a JSON object or ``text`` is empty;
        404 when the project does not belong to the current user;
        the TTS service's status code when it rejects audio generation;
        500 on any other error raised while generating audio.
    """
    data = request.json
    if not isinstance(data, dict):
        # A JSON null / list / scalar body would otherwise fail on .get().
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    chapter = data.get('chapter', 1)
    section = data.get('section', 1)
    text_content = data.get('text', '')
    html_content = data.get('html_content', '')
    tts_text = data.get('tts_text', '')
    voice = data.get('voice', 'af_heart')
    image_data = data.get('image_data', '')
    image_format = data.get('image_format', 'png')
    generate_audio_flag = data.get('generate_audio', True)

    if not text_content:
        return jsonify({'error': 'Text content is required'}), 400

    db = get_db()
    cursor = db.cursor()

    # Verify ownership
    cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id()))
    if not cursor.fetchone():
        return jsonify({'error': 'Project not found'}), 404

    audio_base64 = None
    audio_format = 'mp3'
    transcription_data = []

    if generate_audio_flag:
        try:
            # Prefer the dedicated TTS text; fall back to the display text.
            gen_text = tts_text if tts_text else text_content
            print(f"🔊 Generating audio for Ch{chapter}.Sec{section}")
            response = requests.post(
                f"{TTS_API_URL}/generate-audio",
                headers={**get_api_headers(), 'Content-Type': 'application/json'},
                json={'text': gen_text, 'voice': voice, 'speed': 1.0},
                timeout=180
            )
            if response.status_code != 200:
                error_detail = response.json().get('error', 'Unknown error')
                return jsonify({'error': f'Generate Audio API Error: {error_detail}'}), response.status_code

            result = response.json()
            audio_base64 = result.get('audio_base64', '')
            source_format = result.get('audio_format', 'wav')

            if audio_base64:
                audio_bytes = base64.b64decode(audio_base64)
                files = {'audio_file': (f'audio.{source_format}', audio_bytes)}
                ts_data = {'text': gen_text}
                ts_response = requests.post(
                    f"{TTS_API_URL}/timestamp",
                    headers=get_api_headers(),
                    files=files,
                    data=ts_data,
                    timeout=120
                )
                # A failed timestamp call is tolerated: the section is saved
                # with an empty transcription.
                if ts_response.status_code == 200:
                    ts_result = ts_response.json()
                    transcription_data = ts_result.get('timestamps', [])

                # Stored audio is always labelled mp3 (audio_format above), so
                # any other source format is converted before saving.
                if source_format != 'mp3':
                    print("🔄 Converting to MP3...")
                    audio_base64 = convert_to_mp3(audio_base64, source_format)
        except Exception as e:
            print(f"Error generating audio: {e}")
            return jsonify({'error': str(e)}), 500

    cursor.execute('''
        INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(project_id, chapter, section) DO UPDATE SET
            audio_data = excluded.audio_data,
            audio_format = excluded.audio_format,
            text_content = excluded.text_content,
            html_content = excluded.html_content,
            tts_text = excluded.tts_text,
            transcription = excluded.transcription,
            voice = excluded.voice,
            image_data = excluded.image_data,
            image_format = excluded.image_format
    ''', (
        project_id,
        chapter,
        section,
        audio_base64,
        audio_format,
        text_content,
        html_content,
        tts_text,
        json.dumps(transcription_data),
        voice,
        image_data if image_data else None,
        image_format
    ))

    # cursor.lastrowid is only meaningful after a real INSERT. When the upsert
    # takes the DO UPDATE path it holds a stale value, so the id is looked up
    # by the unique (project_id, chapter, section) key instead.
    cursor.execute(
        'SELECT id FROM project_sections WHERE project_id = ? AND chapter = ? AND section = ?',
        (project_id, chapter, section)
    )
    saved_row = cursor.fetchone()
    section_id = saved_row['id'] if saved_row else None

    cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,))
    db.commit()

    return jsonify({
        'message': 'Section saved',
        'section_id': section_id,
        'chapter': chapter,
        'section': section,
        'audio_data': audio_base64,
        'audio_format': audio_format,
        'text_content': text_content,
        'html_content': html_content,
        'tts_text': tts_text,
        'transcription': transcription_data,
        'voice': voice,
        'image_data': image_data,
        'image_format': image_format
    })
@app.route('/projects/<int:project_id>/sections/save', methods=['POST'])
@login_required
def save_section_directly(project_id):
    """Save a section directly without generating audio.

    The JSON request body supplies ``chapter``, ``section``, ``text``,
    ``html_content``, ``tts_text``, ``voice``, ``audio_data``,
    ``audio_format``, ``transcription``, ``image_data`` and ``image_format``.

    The row is upserted on (project_id, chapter, section). On conflict the
    text fields and voice are always overwritten. Audio, transcription and
    image are only replaced when a non-empty value is supplied, and each
    format column changes only together with its payload so the stored format
    always describes the stored data.

    Returns:
        JSON with ``message``, ``chapter`` and ``section``; 400 when the body
        is not a JSON object; 404 when the project does not belong to the
        current user.
    """
    data = request.json
    if not isinstance(data, dict):
        # A JSON null / list / scalar body would otherwise fail on .get().
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    chapter = data.get('chapter', 1)
    section = data.get('section', 1)
    text_content = data.get('text', '')
    html_content = data.get('html_content', '')
    tts_text = data.get('tts_text', '')
    voice = data.get('voice', 'af_heart')
    audio_data = data.get('audio_data', '')
    # Explicit null / empty formats fall back to the defaults as well.
    audio_format = data.get('audio_format') or 'mp3'
    transcription = data.get('transcription', [])
    image_data = data.get('image_data', '')
    image_format = data.get('image_format') or 'png'

    db = get_db()
    cursor = db.cursor()

    # Verify ownership
    cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id()))
    if not cursor.fetchone():
        return jsonify({'error': 'Project not found'}), 404

    print(f"💾 Direct save: Ch{chapter}.Sec{section} to project {project_id}")

    # audio_format / image_format follow the same condition as their payload:
    # the request always carries a (defaulted) format, so overwriting it
    # unconditionally would relabel preserved audio or image data.
    cursor.execute('''
        INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(project_id, chapter, section) DO UPDATE SET
            audio_data = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_data ELSE project_sections.audio_data END,
            audio_format = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_format ELSE project_sections.audio_format END,
            text_content = excluded.text_content,
            html_content = excluded.html_content,
            tts_text = excluded.tts_text,
            transcription = CASE WHEN excluded.transcription IS NOT NULL AND excluded.transcription != '[]' THEN excluded.transcription ELSE project_sections.transcription END,
            voice = excluded.voice,
            image_data = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_data ELSE project_sections.image_data END,
            image_format = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_format ELSE project_sections.image_format END
    ''', (
        project_id,
        chapter,
        section,
        audio_data if audio_data else None,
        audio_format,
        text_content,
        html_content,
        tts_text,
        json.dumps(transcription) if transcription else '[]',
        voice,
        image_data if image_data else None,
        image_format
    ))
    cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,))
    db.commit()

    return jsonify({
        'message': 'Section saved',
        'chapter': chapter,
        'section': section
    })
@app.route('/projects/<int:project_id>/save_all', methods=['POST'])
@login_required
def save_all_sections(project_id):
    """Save all sections of a project at once.

    The JSON request body carries a ``sections`` list. Each item may use
    camelCase (``htmlContent``, ``ttsText``, ``audioData``, ``audioFormat``,
    ``imageData``, ``imageFormat``) or snake_case keys; the camelCase value
    wins when it is truthy.

    Every item is upserted on (project_id, chapter, section). On conflict the
    text fields and voice are always overwritten. Audio, transcription and
    image are only replaced when a non-empty value is supplied, and each
    format column changes only together with its payload so the stored format
    always describes the stored data. All items are committed in a single
    transaction.

    Returns:
        JSON with ``message`` and ``saved_count``; 400 when the body is not a
        JSON object; 404 when the project does not belong to the current user.
    """
    data = request.json
    if not isinstance(data, dict):
        # A JSON null / list / scalar body would otherwise fail on .get().
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # An explicit "sections": null is treated like a missing list.
    sections = data.get('sections') or []

    db = get_db()
    cursor = db.cursor()

    # Verify ownership
    cursor.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id()))
    if not cursor.fetchone():
        return jsonify({'error': 'Project not found'}), 404

    saved_count = 0
    for sec in sections:
        chapter = sec.get('chapter', 1)
        section = sec.get('section', 1)
        text_content = sec.get('text', '')
        html_content = sec.get('htmlContent') or sec.get('html_content', '')
        tts_text = sec.get('ttsText') or sec.get('tts_text', '')
        voice = sec.get('voice', 'af_heart')
        audio_data = sec.get('audioData') or sec.get('audio_data', '')
        audio_format = sec.get('audioFormat') or sec.get('audio_format') or 'mp3'
        transcription = sec.get('transcription', [])
        image_data = sec.get('imageData') or sec.get('image_data', '')
        image_format = sec.get('imageFormat') or sec.get('image_format') or 'png'

        print(f"💾 Saving Ch{chapter}.Sec{section} (image: {len(image_data) if image_data else 0} bytes)")

        # audio_format / image_format follow the same condition as their
        # payload: each item always carries a (defaulted) format, so
        # overwriting it unconditionally would relabel preserved data.
        cursor.execute('''
            INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(project_id, chapter, section) DO UPDATE SET
                audio_data = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_data ELSE project_sections.audio_data END,
                audio_format = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_format ELSE project_sections.audio_format END,
                text_content = excluded.text_content,
                html_content = excluded.html_content,
                tts_text = excluded.tts_text,
                transcription = CASE WHEN excluded.transcription IS NOT NULL AND excluded.transcription != '[]' THEN excluded.transcription ELSE project_sections.transcription END,
                voice = excluded.voice,
                image_data = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_data ELSE project_sections.image_data END,
                image_format = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_format ELSE project_sections.image_format END
        ''', (
            project_id,
            chapter,
            section,
            audio_data if audio_data else None,
            audio_format,
            text_content,
            html_content,
            tts_text,
            json.dumps(transcription) if transcription else '[]',
            voice,
            image_data if image_data else None,
            image_format
        ))
        saved_count += 1

    cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,))
    db.commit()
    print(f"✅ Saved {saved_count} sections to project {project_id}")

    return jsonify({
        'message': f'Saved {saved_count} sections',
        'saved_count': saved_count
    })
@app.route('/projects/<int:project_id>/sections/<int:section_id>', methods=['GET'])
@login_required
def get_section(project_id, section_id):
    """Return one section of a project, including its audio and image data.

    Responds with 404 when the project does not belong to the current user or
    when the section id is not part of that project.
    """
    conn = get_db()
    cur = conn.cursor()

    # Ownership check: the project lookup is scoped to the current user.
    owner_id = get_current_user_id()
    cur.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, owner_id))
    if not cur.fetchone():
        return jsonify({'error': 'Project not found'}), 404

    cur.execute('''
        SELECT * FROM project_sections
        WHERE id = ? AND project_id = ?
    ''', (section_id, project_id))
    row = cur.fetchone()
    if not row:
        return jsonify({'error': 'Section not found'}), 404

    present_columns = row.keys()

    def column_or(name, fallback):
        # These columns are read defensively: the fallback is used when the
        # SELECT * result does not contain the column at all.
        return row[name] if name in present_columns else fallback

    raw_transcription = row['transcription']
    if raw_transcription:
        transcription = json.loads(raw_transcription)
    else:
        transcription = []

    payload = {
        'id': row['id'],
        'chapter': row['chapter'],
        'section': row['section'],
        'audio_data': row['audio_data'],
        'audio_format': row['audio_format'] or 'mp3',
        'text_content': row['text_content'],
        'html_content': column_or('html_content', None),
        'tts_text': column_or('tts_text', None),
        'transcription': transcription,
        'voice': row['voice'] or 'af_heart',
        'image_data': column_or('image_data', None),
        'image_format': column_or('image_format', 'png'),
        'created_at': row['created_at'],
    }
    return jsonify(payload)
@app.route('/projects/<int:project_id>/sections/<int:section_id>', methods=['DELETE'])
@login_required
def delete_section(project_id, section_id):
    """Remove a single section from a project of the current user.

    Responds with 404 when the project does not belong to the caller or when
    no matching section row was deleted.
    """
    conn = get_db()
    cur = conn.cursor()

    # Ownership check: the project lookup is scoped to the current user.
    owner_id = get_current_user_id()
    cur.execute('SELECT id FROM projects WHERE id = ? AND user_id = ?', (project_id, owner_id))
    project_row = cur.fetchone()
    if not project_row:
        return jsonify({'error': 'Project not found'}), 404

    cur.execute('''
        DELETE FROM project_sections
        WHERE id = ? AND project_id = ?
    ''', (section_id, project_id))
    conn.commit()

    # rowcount tells whether the DELETE actually matched a row.
    nothing_deleted = cur.rowcount == 0
    if nothing_deleted:
        return jsonify({'error': 'Section not found'}), 404

    # vacuum_db() is defined elsewhere in this file; it only runs after a
    # successful delete.
    vacuum_db()
    return jsonify({'message': 'Section deleted successfully'})
@app.route('/projects/<int:project_id>/download', methods=['GET'])
@login_required
def download_project(project_id):
    """Send a whole project as a flat ZIP archive.

    For each section, ordered by chapter and section, the archive holds the
    text file, plus the decoded audio, the transcription JSON and the decoded
    image when those are stored. Entry names are
    ``<chapter>.<section>_<project name>.<extension>``.

    Responds with 404 when the project does not belong to the current user.
    """
    conn = get_db()
    cur = conn.cursor()

    # Ownership check doubles as the fetch of the project row.
    cur.execute('SELECT * FROM projects WHERE id = ? AND user_id = ?', (project_id, get_current_user_id()))
    project = cur.fetchone()
    if not project:
        return jsonify({'error': 'Project not found'}), 404

    cur.execute('''
        SELECT * FROM project_sections
        WHERE project_id = ?
        ORDER BY chapter, section
    ''', (project_id,))
    section_rows = cur.fetchall()

    # Keep only alphanumerics, underscore, hyphen and space in file names.
    extra_allowed = ('_', '-', ' ')
    project_name = "".join(ch for ch in project['name'] if ch.isalnum() or ch in extra_allowed)

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zf:
        for row in section_rows:
            stem = f"{row['chapter']}.{row['section']}_{project_name}"

            encoded_audio = row['audio_data']
            if encoded_audio:
                decoded_audio = base64.b64decode(encoded_audio)
                audio_ext = row['audio_format'] or 'mp3'
                zf.writestr(f"{stem}.{audio_ext}", decoded_audio)

            # The text file is written for every section, even when empty.
            zf.writestr(f"{stem}.txt", row['text_content'] or '')

            stored_transcription = row['transcription']
            if stored_transcription:
                zf.writestr(f"{stem}.json", stored_transcription)

            # Image columns are read defensively in case the row lacks them.
            columns = row.keys()
            encoded_image = row['image_data'] if 'image_data' in columns else None
            image_ext = row['image_format'] if 'image_format' in columns else 'png'
            if encoded_image:
                decoded_image = base64.b64decode(encoded_image)
                zf.writestr(f"{stem}.{image_ext}", decoded_image)

    # Rewind so send_file streams the archive from its first byte.
    archive.seek(0)
    return send_file(
        archive,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f"{project_name}.zip"
    )
# --- BULK EXPORT WITH MANIFEST AND READER ---
def get_reader_template(filename):
    """Load a reader template file from READER_TEMPLATES_DIR.

    Args:
        filename: Name of the template, joined onto READER_TEMPLATES_DIR.

    Returns:
        The file's content decoded as UTF-8, or None when no such file exists.
    """
    template_path = os.path.join(READER_TEMPLATES_DIR, filename)
    # Open directly instead of checking os.path.exists() first: the file can
    # disappear between the check and the open, and a directory passes the
    # existence check but cannot be read.
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            return f.read()
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        return None
@app.route('/export_bulk', methods=['POST'])
@login_required
def export_bulk():
    """Save the posted sections to a project and return it as a reader ZIP.

    The JSON request body supplies ``projectName`` (defaults to ``Book-1``,
    also when null or blank) and a ``files`` list. Each file item may use
    camelCase or snake_case keys; the camelCase value wins when it is truthy.

    The project is looked up by name for the current user and created when
    missing. Every file item is upserted on (project_id, chapter, section):
    stored audio, transcription and image are kept unless a non-empty
    replacement is sent, and each format column changes only together with
    its payload so the stored format always describes the stored data.

    The ZIP contains ``book/<chapter>.<section>_<name>.*`` assets for all
    sections of the project, ``manifest.json`` and, when the templates are
    available, ``index.html`` and ``Reader.html``.

    Returns:
        The ZIP as an attachment; 400 when ``files`` is empty; 500 with the
        error text on any exception.
    """
    try:
        data = request.json
        # Null or blank names fall back to the default instead of crashing on
        # .strip() or creating a project with an empty name.
        project_name = str(data.get('projectName') or 'Book-1').strip() or 'Book-1'
        files = data.get('files') or []

        print(f"📦 Export bulk: project={project_name}, files={len(files)}")

        if not files:
            return jsonify({'error': 'No files to export'}), 400

        db = get_db()
        cursor = db.cursor()
        user_id = get_current_user_id()

        # Create or get project for current user
        cursor.execute('SELECT id FROM projects WHERE name = ? AND user_id = ?', (project_name, user_id))
        row = cursor.fetchone()

        if row:
            project_id = row['id']
            print(f"📁 Using existing project ID: {project_id}")
        else:
            cursor.execute('INSERT INTO projects (user_id, name) VALUES (?, ?)', (user_id, project_name))
            db.commit()
            project_id = cursor.lastrowid
            print(f"📁 Created new project ID: {project_id}")

        # Save all sections to database
        for file_item in files:
            chapter = file_item.get('chapter', 0)
            section = file_item.get('section', 0)
            text = file_item.get('text', '')
            html_content = file_item.get('htmlContent') or file_item.get('html_content', '')
            tts_text = file_item.get('ttsText') or file_item.get('tts_text', '')
            transcription = file_item.get('transcription', [])
            audio_data = file_item.get('audioData') or file_item.get('audio_data', '')
            audio_format = file_item.get('audioFormat') or file_item.get('audio_format') or 'mp3'
            voice = file_item.get('voice', 'af_heart')
            image_data = file_item.get('imageData') or file_item.get('image_data', '')
            image_format = file_item.get('imageFormat') or file_item.get('image_format') or 'png'

            print(f"  💾 Saving Ch{chapter}.Sec{section}")

            # audio_format / image_format follow the same condition as their
            # payload: each item always carries a (defaulted) format, so
            # overwriting it unconditionally would relabel preserved data.
            cursor.execute('''
                INSERT INTO project_sections (project_id, chapter, section, audio_data, audio_format, text_content, html_content, tts_text, transcription, voice, image_data, image_format)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(project_id, chapter, section) DO UPDATE SET
                    audio_data = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_data ELSE project_sections.audio_data END,
                    audio_format = CASE WHEN excluded.audio_data IS NOT NULL AND excluded.audio_data != '' THEN excluded.audio_format ELSE project_sections.audio_format END,
                    text_content = excluded.text_content,
                    html_content = excluded.html_content,
                    tts_text = excluded.tts_text,
                    transcription = CASE WHEN excluded.transcription IS NOT NULL AND excluded.transcription != '[]' THEN excluded.transcription ELSE project_sections.transcription END,
                    voice = excluded.voice,
                    image_data = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_data ELSE project_sections.image_data END,
                    image_format = CASE WHEN excluded.image_data IS NOT NULL AND excluded.image_data != '' THEN excluded.image_format ELSE project_sections.image_format END
            ''', (
                project_id,
                chapter,
                section,
                audio_data if audio_data else None,
                audio_format,
                text,
                html_content,
                tts_text,
                json.dumps(transcription) if transcription else '[]',
                voice,
                image_data if image_data else None,
                image_format
            ))

        cursor.execute('UPDATE projects SET updated_at = CURRENT_TIMESTAMP WHERE id = ?', (project_id,))
        db.commit()
        print(f"✅ Saved {len(files)} sections to project {project_id}")

        # Fetch all sections for export (includes sections saved earlier,
        # not only the ones posted in this request).
        cursor.execute('SELECT * FROM project_sections WHERE project_id = ? ORDER BY chapter, section', (project_id,))
        sections = cursor.fetchall()

        # Create ZIP with proper structure
        zip_buffer = io.BytesIO()
        safe_name = "".join(c for c in project_name if c.isalnum() or c in ('_', '-', ' '))

        # Build manifest assets list
        manifest_assets = []

        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            for s in sections:
                chapter = s['chapter']
                section = s['section']
                file_prefix = f"{chapter}.{section}_{safe_name}"

                asset_entry = {
                    "prefix": f"{chapter}.{section}_",
                    "textFile": f"book/{file_prefix}.txt",
                    "audioFile": None,
                    "jsonFile": None,
                    "imageFile": None
                }

                # Add text file
                zf.writestr(f"book/{file_prefix}.txt", s['text_content'] or '')

                # Add audio file
                if s['audio_data']:
                    audio_bytes = base64.b64decode(s['audio_data'])
                    audio_ext = s['audio_format'] or 'mp3'
                    zf.writestr(f"book/{file_prefix}.{audio_ext}", audio_bytes)
                    asset_entry["audioFile"] = f"book/{file_prefix}.{audio_ext}"

                # Add JSON timestamps
                if s['transcription'] and s['transcription'] != '[]':
                    zf.writestr(f"book/{file_prefix}.json", s['transcription'])
                    asset_entry["jsonFile"] = f"book/{file_prefix}.json"

                # Add image file
                image_data = s['image_data'] if 'image_data' in s.keys() else None
                image_format = s['image_format'] if 'image_format' in s.keys() else 'png'
                if image_data:
                    image_bytes = base64.b64decode(image_data)
                    # A NULL/empty stored format would otherwise produce a
                    # ".None" extension.
                    image_ext = image_format or 'png'
                    zf.writestr(f"book/{file_prefix}.{image_ext}", image_bytes)
                    asset_entry["imageFile"] = f"book/{file_prefix}.{image_ext}"

                manifest_assets.append(asset_entry)

            # Create manifest.json
            manifest = {
                "title": project_name,
                "assets": manifest_assets
            }
            zf.writestr("manifest.json", json.dumps(manifest, indent=2))

            # Add index.html
            index_html = get_reader_template('index.html')
            if index_html:
                zf.writestr("index.html", index_html)
            else:
                print("⚠️ index.html template not found")

            # Add Reader.html
            reader_html = get_reader_template('Reader.html')
            if reader_html:
                zf.writestr("Reader.html", reader_html)
            else:
                print("⚠️ Reader.html template not found")

        # Rewind so send_file streams the archive from its first byte.
        zip_buffer.seek(0)
        print(f"📤 Sending zip: {safe_name}.zip")

        return send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f"{safe_name}.zip"
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
# --- LEGACY SINGLE EXPORT ---
@app.route('/export', methods=['POST'])
@login_required
def export_project():
    """Export single project files as zip.

    The JSON request body supplies ``filename``, ``text``, ``transcription``,
    ``audio_data`` (base64) and ``audio_format``. The archive contains
    ``<name>.txt``, ``<name>.json`` and, when audio is present,
    ``<name>.<audio_format>``.

    ``filename`` is reduced to alphanumerics, underscore and hyphen; when
    nothing usable remains the name ``task-1`` is used.

    Returns:
        The ZIP as an attachment; 500 with the error text on any exception.
    """
    try:
        data = request.json

        # Sanitise first and apply the fallback afterwards, so that a name
        # made only of stripped characters (e.g. "???") does not end up as an
        # empty base name. A null filename is treated like a missing one.
        requested_name = str(data.get('filename') or '').strip()
        base_name = "".join(c for c in requested_name if c.isalnum() or c in ('_', '-'))
        if not base_name:
            base_name = 'task-1'

        # A null text would make writestr() fail; export an empty file.
        text = data.get('text') or ''
        json_data = data.get('transcription', [])
        audio_data = data.get('audio_data', '')
        audio_format = data.get('audio_format') or 'mp3'

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr(f"{base_name}.txt", text)
            zf.writestr(f"{base_name}.json", json.dumps(json_data, indent=2))
            if audio_data:
                audio_bytes = base64.b64decode(audio_data)
                zf.writestr(f"{base_name}.{audio_format}", audio_bytes)

        # Rewind so send_file streams the archive from its first byte.
        zip_buffer.seek(0)
        return send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f"{base_name}.zip"
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
# --- DATABASE STATS ---
@app.route('/db/stats', methods=['GET'])
@login_required
def db_stats():
    """Report the current user's row counts and the database file size."""
    conn = get_db()
    cur = conn.cursor()
    user_id = get_current_user_id()

    def count_for_user(query):
        # Each statistics query takes the user id as its only parameter and
        # aliases its single result column as "count".
        cur.execute(query, (user_id,))
        return cur.fetchone()['count']

    uploads_count = count_for_user('SELECT COUNT(*) as count FROM uploads WHERE user_id = ?')
    generations_count = count_for_user('SELECT COUNT(*) as count FROM generations WHERE user_id = ?')
    projects_count = count_for_user('SELECT COUNT(*) as count FROM projects WHERE user_id = ?')
    # Sections carry no user id of their own; they are counted through the
    # projects owned by the user.
    sections_count = count_for_user('''
        SELECT COUNT(*) as count FROM project_sections
        WHERE project_id IN (SELECT id FROM projects WHERE user_id = ?)
    ''')

    # The size is that of the whole database file, not only this user's data.
    if os.path.exists(DATABASE):
        size_in_bytes = os.path.getsize(DATABASE)
    else:
        size_in_bytes = 0
    bytes_per_mb = 1024 * 1024

    stats = {
        'uploads': uploads_count,
        'generations': generations_count,
        'projects': projects_count,
        'sections': sections_count,
        'database_size_bytes': size_in_bytes,
        'database_size_mb': round(size_in_bytes / bytes_per_mb, 2),
    }
    return jsonify(stats)
# --- HEALTH CHECK ENDPOINT ---
@app.route('/health', methods=['GET'])
def health_check():
    """Report service health based on whether the database answers a query.

    Returns 200 with status "healthy" when ``SELECT 1`` succeeds, otherwise
    500 with status "unhealthy" and the error text.
    """
    try:
        # A trivial query is enough to show a connection can be opened and used.
        with get_db_connection() as connection:
            connection.execute('SELECT 1')
        healthy_report = {
            'status': 'healthy',
            'database': 'connected',
            'database_path': DATABASE,
        }
        return jsonify(healthy_report), 200
    except Exception as exc:
        failure_report = {
            'status': 'unhealthy',
            'error': str(exc),
        }
        return jsonify(failure_report), 500
if __name__ == '__main__':
    # Startup banner summarising the effective configuration.
    print("=" * 60)
    print("🚀 Audio Transcription Editor Starting...")
    print("=" * 60)
    print(f"📍 API Server: {TTS_API_URL}")
    print(f"📍 API Key: {'✅ Configured' if TTS_API_KEY else '❌ NOT CONFIGURED!'}")
    print(f"📍 Database Directory: {DATABASE_DIR}")
    print(f"📍 Database File: {DATABASE}")
    print(f"📍 Static Files: {app.static_folder}")
    print(f"📍 Reader Templates: {READER_TEMPLATES_DIR}")
    print("-" * 60)
    print("🔐 Default Admin: admin / admin123")
    print("-" * 60)

    # Create reader_templates directory if it doesn't exist
    if not os.path.exists(READER_TEMPLATES_DIR):
        # exist_ok guards against the directory appearing between the check
        # and the creation.
        os.makedirs(READER_TEMPLATES_DIR, exist_ok=True)
        print("📁 Created reader_templates directory")

    if not TTS_API_KEY:
        print("⚠️ WARNING: TTS_API_KEY not set in .env file!")

    print("=" * 60)

    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary code. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (or true/yes/on) for local development.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, port=5009, host='0.0.0.0')