226 lines
8.2 KiB
Python
226 lines
8.2 KiB
Python
# routes/generation_routes.py - TTS Audio Generation Routes
|
|
|
|
import json
|
|
import base64
|
|
import requests
|
|
from flask import Blueprint, request, jsonify
|
|
|
|
from db import get_db
|
|
from config import TTS_API_URL, get_api_headers, get_api_headers_json
|
|
from utils import convert_to_mp3, strip_markdown
|
|
from auth import login_required
|
|
|
|
# Blueprint named 'generation'; the /api/generate and /api/generate-chapter
# routes in this module are registered on it.
generation_bp = Blueprint('generation', __name__)
|
|
|
|
|
|
@generation_bp.route('/api/generate', methods=['POST'])
@login_required
def generate_audio():
    """Generate audio for a single block.

    Reads a JSON body with ``text`` (required), ``voice`` (defaults to
    ``'af_heart'``) and an optional ``block_id``. The cleaned text is sent to
    the TTS API's ``/generate-audio`` endpoint, word timestamps are requested
    from ``/timestamp`` on a best-effort basis, and any non-MP3 audio is
    passed through ``convert_to_mp3``. When ``block_id`` is given, the audio
    and timestamps are written to the matching ``markdown_blocks`` row.

    Returns:
        A JSON response with ``success``, ``audio_data``, ``audio_format``
        and ``transcription`` on success. On failure, ``{'error': ...}`` with
        status 400 for unusable input, the upstream status code when the TTS
        API answers with a non-200 status, or 500 for connection, timeout and
        any other errors.
    """
    # silent=True: a missing or malformed JSON body falls through to the
    # "No text provided" response instead of raising inside the view.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    text = data.get('text', '')
    voice = data.get('voice', 'af_heart')
    block_id = data.get('block_id')

    # Non-string text would crash on .strip() below, so reject it up front.
    if not text or not isinstance(text, str):
        return jsonify({'error': 'No text provided'}), 400

    # Markdown image syntax starts with "![". Testing against an empty prefix
    # would match every string and reject all requests.
    if text.strip().startswith('!['):
        return jsonify({'error': 'Cannot generate audio for image content'}), 400

    clean_text = strip_markdown(text)

    if not clean_text.strip():
        return jsonify({'error': 'No speakable text content'}), 400

    try:
        print(f"🔊 Generating audio: voice={voice}, text length={len(clean_text)}")
        print(f" Text preview: {clean_text[:100]}...")

        response = requests.post(
            f"{TTS_API_URL}/generate-audio",
            headers=get_api_headers_json(),
            json={'text': clean_text, 'voice': voice, 'speed': 1.0},
            timeout=180
        )

        if response.status_code != 200:
            # The error body may not be JSON; fall back to the bare status.
            try:
                error_detail = response.json().get('error', 'Unknown error')
            except Exception:
                error_detail = f'HTTP {response.status_code}'
            print(f"❌ TTS API Error: {error_detail}")
            return jsonify({'error': f'TTS API Error: {error_detail}'}), response.status_code

        result = response.json()
        audio_base64 = result.get('audio_base64', '')
        source_format = result.get('audio_format', 'wav')

        if not audio_base64:
            return jsonify({'error': 'No audio data received from TTS API'}), 500

        audio_bytes = base64.b64decode(audio_base64)
        files = {'audio_file': (f'audio.{source_format}', audio_bytes)}
        ts_data = {'text': clean_text}

        # Timestamps are best-effort: any failure here is logged and the
        # audio is still returned with an empty transcription.
        transcription = []
        try:
            ts_response = requests.post(
                f"{TTS_API_URL}/timestamp",
                headers=get_api_headers(),
                files=files,
                data=ts_data,
                timeout=120
            )

            if ts_response.status_code == 200:
                ts_result = ts_response.json()
                transcription = ts_result.get('timestamps', [])
                print(f"✅ Got {len(transcription)} word timestamps")
            else:
                print(f"⚠️ Timestamp API returned {ts_response.status_code}, continuing without timestamps")
        except Exception as ts_err:
            print(f"⚠️ Timestamp generation failed: {ts_err}, continuing without timestamps")

        # The timestamp request above uses the audio in its source format;
        # conversion happens only afterwards, for storage and the response.
        if source_format != 'mp3':
            audio_base64 = convert_to_mp3(audio_base64, source_format)

        if block_id:
            db = get_db()
            cursor = db.cursor()
            cursor.execute('''
                UPDATE markdown_blocks
                SET audio_data = ?, audio_format = 'mp3', transcription = ?
                WHERE id = ?
            ''', (audio_base64, json.dumps(transcription), block_id))
            db.commit()

        print(f"✅ Audio generated successfully: {len(audio_base64)} bytes base64")

        return jsonify({
            'success': True,
            'audio_data': audio_base64,
            'audio_format': 'mp3',
            'transcription': transcription
        })

    except requests.exceptions.ConnectionError as e:
        print(f"❌ Cannot connect to TTS API at {TTS_API_URL}: {e}")
        return jsonify({'error': f'Cannot connect to TTS API server. Is it running at {TTS_API_URL}?'}), 500
    except requests.exceptions.Timeout as e:
        print(f"❌ TTS API timeout: {e}")
        return jsonify({'error': 'TTS API request timed out. Text may be too long.'}), 500
    except requests.exceptions.RequestException as e:
        print(f"❌ TTS API request error: {e}")
        return jsonify({'error': f'API connection error: {str(e)}'}), 500
    except Exception as e:
        # Last-resort boundary for the view: log the traceback and report.
        import traceback
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@generation_bp.route('/api/generate-chapter', methods=['POST'])
@login_required
def generate_chapter_audio():
    """Generate audio for all blocks in a chapter.

    Reads a JSON body with ``chapter_id`` (required) and ``voice`` (defaults
    to ``'af_heart'``). Every ``markdown_blocks`` row of the chapter is
    processed in ``block_order``: image blocks and blocks without speakable
    text are skipped, the rest are sent to the TTS API, optionally
    timestamped, converted to MP3 when needed and written back to their row.
    A failure on one block is recorded in its result entry and does not stop
    the remaining blocks. All updates are committed once after the loop.

    Returns:
        ``{'success': True, 'results': [...]}`` with one entry per block
        (``block_id``, ``success`` and either ``skipped``/``reason``,
        ``error``, or ``audio_data``/``transcription``); 400 when
        ``chapter_id`` is missing; 404 when the chapter has no blocks.
    """
    # silent=True: a missing or malformed JSON body falls through to the
    # "Chapter ID required" response instead of raising inside the view.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    chapter_id = data.get('chapter_id')
    voice = data.get('voice', 'af_heart')

    if not chapter_id:
        return jsonify({'error': 'Chapter ID required'}), 400

    db = get_db()
    cursor = db.cursor()

    cursor.execute('''
        SELECT id, content, tts_text, block_type FROM markdown_blocks
        WHERE chapter_id = ? ORDER BY block_order
    ''', (chapter_id,))
    blocks = cursor.fetchall()

    if not blocks:
        return jsonify({'error': 'No blocks found in chapter'}), 404

    results = []

    for block in blocks:
        block_id = block['id']
        block_type = block['block_type'] if 'block_type' in block.keys() else 'paragraph'
        content = block['content'] or ''
        # Prefer the dedicated TTS text when the row has one.
        text = block['tts_text'] or content

        if block_type == 'image':
            results.append({'block_id': block_id, 'success': True, 'skipped': True, 'reason': 'image block'})
            continue

        # Markdown image syntax starts with "![". Testing against an empty
        # prefix would match every string and skip every block.
        if text.strip().startswith('!['):
            results.append({'block_id': block_id, 'success': True, 'skipped': True, 'reason': 'image markdown'})
            continue

        clean_text = strip_markdown(text)
        if not clean_text.strip():
            results.append({'block_id': block_id, 'success': True, 'skipped': True, 'reason': 'empty text'})
            continue

        try:
            response = requests.post(
                f"{TTS_API_URL}/generate-audio",
                headers=get_api_headers_json(),
                json={'text': clean_text, 'voice': voice, 'speed': 1.0},
                timeout=180
            )

            if response.status_code != 200:
                results.append({'block_id': block_id, 'success': False, 'error': 'TTS generation failed'})
                continue

            result = response.json()
            audio_base64 = result.get('audio_base64', '')
            source_format = result.get('audio_format', 'wav')

            # Same guard as the single-block route: never convert or store
            # an empty payload as if it were audio.
            if not audio_base64:
                results.append({
                    'block_id': block_id,
                    'success': False,
                    'error': 'No audio data received from TTS API'
                })
                continue

            # Timestamps are best-effort: failures are logged and the block
            # is stored with an empty transcription.
            transcription = []
            try:
                audio_bytes = base64.b64decode(audio_base64)
                files = {'audio_file': (f'audio.{source_format}', audio_bytes)}
                ts_data = {'text': clean_text}

                ts_response = requests.post(
                    f"{TTS_API_URL}/timestamp",
                    headers=get_api_headers(),
                    files=files,
                    data=ts_data,
                    timeout=120
                )

                if ts_response.status_code == 200:
                    ts_result = ts_response.json()
                    transcription = ts_result.get('timestamps', [])
                else:
                    print(
                        f"⚠️ Timestamp API returned {ts_response.status_code} "
                        f"for block {block_id}, continuing without timestamps"
                    )
            except Exception as ts_err:
                print(
                    f"⚠️ Timestamp generation failed for block {block_id}: "
                    f"{ts_err}, continuing without timestamps"
                )

            if source_format != 'mp3':
                audio_base64 = convert_to_mp3(audio_base64, source_format)

            cursor.execute('''
                UPDATE markdown_blocks
                SET audio_data = ?, audio_format = 'mp3', transcription = ?
                WHERE id = ?
            ''', (audio_base64, json.dumps(transcription), block_id))

            results.append({
                'block_id': block_id,
                'success': True,
                'audio_data': audio_base64,
                'transcription': transcription
            })

        except Exception as e:
            # Per-block boundary: record the failure and move on so one bad
            # block does not abort the whole chapter.
            results.append({'block_id': block_id, 'success': False, 'error': str(e)})

    db.commit()

    return jsonify({'success': True, 'results': results})
|