
Data annotation teams face a universal challenge: reviewing and correcting thousands of transcriptions from automatic speech recognition (ASR) systems is tedious, error-prone, and becomes a bottleneck in any speech AI pipeline. Building a tool to streamline this process is non-trivial—you need to handle large audio files, track progress asynchronously, persist session state, and prevent accidental edits.

In this deep dive, I'll walk you through building a production-ready speech annotation platform that handles these challenges elegantly. We'll cover the complete architecture including intelligent background job processing without external queues, real-time progress tracking, browser-based session persistence, and smart row locking. The result is a lightweight yet powerful tool built on Flask, Whisper, and FFmpeg that can scale from a solo annotator to large data teams.

The Problem: Why Transcription Review Is Hard

On the surface, transcription review seems straightforward: "Load audio, display text, let users edit and save." But real-world complexity emerges quickly:

- Audio files can be hundreds of megabytes and arrive in any format.
- Transcription and import jobs run for minutes or hours, so they can't block a web request.
- Annotators expect their progress to survive a browser refresh.
- One stray edit can silently overwrite a correction someone already finished.

A production system must solve all of these gracefully.

The Architecture: Two Complementary Workflows

[Diagram: Speech Annotation Tool Architecture]

The tool addresses these problems by supporting two distinct workflows, each optimized for a different starting point:

Workflow 1: Review & Correct (For Pre-Transcribed Audio)

Perfect for teams that already have chunked audio files and Excel transcripts prepared. Users load both at once, and the platform pairs each audio segment with its transcript row, serves the audio for playback, and lets annotators correct, track, and lock each record.
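
A rough sketch of the pairing step is below; the Excel column names (`filename`, `transcript`) and folder layout are assumptions for illustration, not the tool's actual schema.

Python - Pairing Audio Chunks with Excel Transcripts (illustrative sketch)
from pathlib import Path
import pandas as pd

def load_review_session(audio_dir: str, excel_path: str) -> pd.DataFrame:
    """Match each transcript row to its audio chunk by filename (illustrative schema)."""
    df = pd.read_excel(excel_path)  # assumed columns: filename, transcript
    audio_files = {p.name: p for p in Path(audio_dir).glob('*.wav')}
    df['audio_path'] = df['filename'].map(lambda name: str(audio_files.get(name, '')))
    # Drop rows whose audio chunk is missing on disk
    return df[df['audio_path'] != ''].reset_index(drop=True)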

Workflow 2: Auto-Transcribe (For Raw Audio Folders)

For users starting from scratch: they point the tool at a folder of audio files, pick a Whisper model, and the platform converts everything to 16 kHz mono WAV, splits it into segments, transcribes each segment in a background job with live progress, and then drops the results into the same review interface.
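
As a rough preview of this workflow (each stage is covered in detail below), the background task might look something like the following; `transcribe_folder`, the `manager` handle from the job system introduced in the next section, and the output row shape are all illustrative.

Python - Auto-Transcribe Task (illustrative sketch)
from pathlib import Path
import whisper  # openai-whisper

def transcribe_folder(job_id: str, manager, audio_dir: str, model_name: str = 'small'):
    """Transcribe every file in a folder with Whisper, reporting progress as it goes."""
    model = whisper.load_model(model_name)  # e.g. 'tiny', 'small', 'medium'
    rows = []
    files = sorted(p for p in Path(audio_dir).glob('*') if p.is_file())
    for i, source in enumerate(files, start=1):
        result = model.transcribe(str(source))  # Whisper resamples internally via FFmpeg
        rows.append({'file': source.name, 'transcript': result['text'].strip()})
        manager.update_progress(job_id, processed=i, total=len(files))
    return rows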

Background Job Processing Without External Queues

The most critical component for production use is reliable background job processing. I chose Python threading instead of Celery/Redis for a specific reason: simplicity without sacrificing functionality.

Python - Job Manager Architecture
from enum import Enum
from threading import Thread, RLock
import json
from pathlib import Path

class JobType(Enum):
    TRANSCRIBE = "transcribe"
    MANUAL_IMPORT = "manual_import"

class JobManager:
    """Singleton manager for background jobs with type-based locking"""
    
    _instance = None
    
    def __init__(self):
        self._jobs = {}  # Dict[job_id, JobInfo]
        self._active_jobs_by_type = {}  # Lock: only 1 per type
        self._lock = RLock()
        self._jobs_file = Path('data/jobs.json')
        self._load_jobs_state()
    
    def create_job(self, job_id: str, job_type: JobType, 
                  total_items: int, metadata: dict = None):
        """Create a new job"""
        with self._lock:
            job_info = {
                'job_id': job_id,
                'job_type': job_type.value,
                'status': 'pending',
                'progress': 0,
                'total_items': total_items,
                'processed_items': 0,
                'metadata': metadata or {},
            }
            self._jobs[job_id] = job_info
            self._save_jobs_state()
            return job_info
    
    def can_start_job(self, job_type: JobType):
        """Check if another job of this type is already running"""
        with self._lock:
            active_id = self._active_jobs_by_type.get(job_type)
            if active_id:
                return False, active_id
            return True, None
    
    def run_job_async(self, job_id: str, job_type: JobType, 
                    task_func, total_items: int, metadata: dict = None):
        """Start a background job in a daemon thread"""
        job_info = self.create_job(job_id, job_type, total_items, metadata)
        
        def wrapper():
            try:
                # Check and claim this job type's slot atomically
                # (RLock is re-entrant, so the nested can_start_job call is safe)
                with self._lock:
                    can_start, active_id = self.can_start_job(job_type)
                    if not can_start:
                        self.fail_job(job_id,
                                      f"Another {job_type.value} job is already running")
                        return
                    self._active_jobs_by_type[job_type] = job_id
                    self._jobs[job_id]['status'] = 'running'
                    self._save_jobs_state()
                
                # Run the actual task
                result = task_func(job_id, self)
                self.complete_job(job_id, result)
                
            except Exception as e:
                self.fail_job(job_id, str(e))
            finally:
                with self._lock:
                    # Release the slot only if this job actually holds it
                    if self._active_jobs_by_type.get(job_type) == job_id:
                        self._active_jobs_by_type.pop(job_type, None)
        
        # Start in background thread
        thread = Thread(target=wrapper, daemon=True)
        thread.start()
        
        return job_info
    
    def update_progress(self, job_id: str, processed: int, 
                       total: int):
        """Update job progress"""
        with self._lock:
            if job_id in self._jobs:
                self._jobs[job_id]['processed_items'] = processed
                self._jobs[job_id]['progress'] = int(
                    (processed / total * 100) if total > 0 else 0
                )
                self._save_jobs_state()
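
    # --- Minimal sketches of the helpers referenced elsewhere in the article ---
    # --- (illustrative; the real implementations may differ) ---

    def _load_jobs_state(self):
        """Restore job records from disk so progress survives a restart"""
        if self._jobs_file.exists():
            self._jobs = json.loads(self._jobs_file.read_text())

    def _save_jobs_state(self):
        """Persist all job records to a plain JSON file (no database needed)"""
        self._jobs_file.parent.mkdir(parents=True, exist_ok=True)
        self._jobs_file.write_text(json.dumps(self._jobs, indent=2))

    def get_job(self, job_id: str) -> dict:
        """Read a snapshot of a job's current state (used by the status endpoint)"""
        with self._lock:
            return dict(self._jobs.get(job_id, {}))

    def complete_job(self, job_id: str, result=None):
        """Mark a job as finished and store its result"""
        with self._lock:
            self._jobs[job_id].update(status='completed', progress=100, result=result)
            self._save_jobs_state()

    def fail_job(self, job_id: str, error: str):
        """Mark a job as failed with an error message"""
        with self._lock:
            self._jobs[job_id].update(status='failed', error=error)
            self._save_jobs_state()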

Why Threading Instead of Celery?

Threading is often dismissed in favor of task queues like Celery, but for a single-server deployment with moderate load, it offers significant advantages:

| Aspect | Threading | Celery + Redis |
|---|---|---|
| Setup Complexity | Zero config (built-in) | Redis broker + workers required |
| Memory Overhead | Minimal (~10 MB) | Redis server + workers (200 MB+) |
| Deployment | Single process | Multiple services to manage |
| Use Case | Solo to small teams | Distributed, high-volume |
| State Persistence | JSON file (simple) | Automatic (Redis) |
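
Before moving on to the audio pipeline, here's a sketch of how a Flask route could hand work to the manager above. The route path, payload shape, and task body are illustrative; only the JobManager calls mirror the API shown earlier.

Python - Kicking Off a Job from Flask (illustrative sketch)
import uuid
from flask import Flask, jsonify, request

app = Flask(__name__)
job_manager = JobManager()  # the manager from the listing above

@app.route('/api/transcribe', methods=['POST'])
def start_transcription():
    """Start a background transcription job and return its id immediately."""
    files = (request.get_json(silent=True) or {}).get('files', [])

    def task(job_id, manager):
        # Hypothetical task body: process each file, reporting progress as it goes
        for i, path in enumerate(files, start=1):
            ...  # transcribe `path` here
            manager.update_progress(job_id, processed=i, total=len(files))
        return {'files_processed': len(files)}

    can_start, active_id = job_manager.can_start_job(JobType.TRANSCRIBE)
    if not can_start:
        return jsonify({'error': f'Job {active_id} is already running'}), 409

    job_info = job_manager.run_job_async(str(uuid.uuid4()), JobType.TRANSCRIBE,
                                         task, total_items=len(files))
    return jsonify(job_info), 202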

Audio Processing: From MP3 to Segments

A critical component is robust audio handling. Users upload MP3, FLAC, OGG, WMA—anything. The system needs to normalize everything to a consistent format and chunk it into manageable pieces.

Python - Audio Processing Pipeline
import subprocess
from pathlib import Path
from typing import List

def convert_to_wav(source_path: Path, job_id: str) -> Path:
    """
    Convert any audio format to 16kHz mono WAV using FFmpeg.
    FFmpeg handles format detection automatically.
    """
    
    output_wav = Path(f"data/segments/{job_id}/audio.wav")
    output_wav.parent.mkdir(parents=True, exist_ok=True)
    
    # FFmpeg command: convert to 16kHz mono WAV
    cmd = [
        'ffmpeg',
        '-i', str(source_path),
        '-ar', '16000',        # Audio rate: 16kHz
        '-ac', '1',           # Channels: 1 (mono)
        '-c:a', 'pcm_s16le',  # Codec: 16-bit PCM
        '-y', str(output_wav)
    ]
    
    subprocess.run(cmd, check=True, 
                   capture_output=True)
    return output_wav

def segment_audio(wav_path: Path, job_id: str, 
                 segment_seconds: int = 30) -> List[Path]:
    """
    Split WAV file into fixed-duration segments.
    Uses FFmpeg for efficient splitting without re-encoding.
    """
    
    segment_dir = Path(f"data/segments/{job_id}")
    segment_dir.mkdir(parents=True, exist_ok=True)
    
    # Get total duration in seconds
    duration_cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        str(wav_path)
    ]
    
    duration_result = subprocess.run(duration_cmd, capture_output=True,
                                    text=True, check=True)
    total_seconds = float(duration_result.stdout.strip())
    
    segments = []
    segment_index = 0
    current_start = 0.0
    
    while current_start < total_seconds:
        current_end = min(current_start + segment_seconds, total_seconds)
        duration = current_end - current_start
        
        segment_path = segment_dir / f"{segment_index:03d}.wav"
        
        # FFmpeg trim: extract segment without re-encoding
        trim_cmd = [
            'ffmpeg',
            '-i', str(wav_path),
            '-ss', str(current_start),
            '-t', str(duration),
            '-c', 'copy',           # No re-encoding
            '-y', str(segment_path)
        ]
        
        subprocess.run(trim_cmd, check=True,
                      capture_output=True)
        
        segments.append(segment_path)
        current_start = current_end
        segment_index += 1
    
    return segments

Key design decisions:

- Everything is normalized to 16 kHz mono 16-bit PCM, the sample rate Whisper works with internally, so downstream code never has to care about the source format.
- FFmpeg handles format detection and conversion, so the tool accepts whatever users upload.
- Segments are cut to a fixed duration (30 seconds by default) with stream copy (`-c copy`), so splitting is fast and never re-encodes the audio.
- ffprobe supplies the total duration and the segment boundaries are computed in Python, which keeps the chunking logic easy to adjust.

Session Persistence: LocalStorage for Tracking

One challenge with web applications is that users expect their work to persist across browser refreshes. Without a backend database per user, we can leverage browser localStorage to track corrections.

JavaScript - Correction Tracker
class CorrectionTracker {
    constructor() {
        this.storageKey = 'asr_corrections_tracker';
        this.corrections = this._loadFromStorage();
    }
    
    _loadFromStorage() {
        const data = localStorage.getItem(this.storageKey);
        return data ? JSON.parse(data) : {};
    }
    
    _saveToStorage() {
        localStorage.setItem(this.storageKey, 
                           JSON.stringify(this.corrections));
    }
    
    markCorrected(recordId, originalText, correctedText) {
        // Mark a record as corrected and persist it
        this.corrections[recordId] = {
            corrected: true,
            originalText: originalText,
            correctedText: correctedText,
            timestamp: new Date().toISOString()
        };
        this._saveToStorage();
    }
    
    isCorrected(recordId) {
        return recordId in this.corrections 
               && this.corrections[recordId].corrected;
    }
    
    getStats() {
        const total = Object.keys(this.corrections).length;
        const corrected = Object.values(
            this.corrections
        ).filter(c => c.corrected).length;
        
        return { total, corrected, progress: 
                  total > 0 ? (corrected / total * 100).toFixed(1) : 0 };
    }
    
    // Usage:
    // tracker.markCorrected('uuid-1', 'Original', 'Corrected')
    // tracker.isCorrected('uuid-1') → true
    // tracker.getStats() → { total: 47, corrected: 23, progress: "48.9" }
}

Smart Row Locking: Prevent Accidental Overwrites

Once an annotator finishes editing a record, they should be able to "lock" it to prevent accidental changes. This is stored both in the CSV backend and checked on save.

Python - Row Locking Logic
import pandas as pd
from pathlib import Path
from threading import Lock
from typing import Tuple

STATE_FILE = Path('data/records.csv')  # CSV backing store (illustrative path)
_lock = Lock()                         # serialize CSV reads/writes across threads

def update_record(record_id: str, 
                  corrected_text: str) -> Tuple[bool, str]:
    """Update a record's correction, respecting locks"""
    
    with _lock:  # Thread-safe
        df = pd.read_csv(STATE_FILE)
        record = df[df['id'] == record_id]
        
        if record.empty:
            return False, "Record not found"
        
        if record['locked'].iloc[0]:
            return False, "Record is locked. Unlock first."
        
        # Update the record
        df.loc[df['id'] == record_id, 
               'correct_transcripts'] = corrected_text
        
        df.to_csv(STATE_FILE, index=False)
        return True, "Record updated"

def lock_record(record_id: str) -> bool:
    """Lock a record to prevent edits"""
    
    with _lock:
        df = pd.read_csv(STATE_FILE)
        df.loc[df['id'] == record_id, 'locked'] = True
        df.loc[df['id'] == record_id, 'locked_at'] = (
            pd.Timestamp.now().isoformat()
        )
        df.to_csv(STATE_FILE, index=False)
        return True
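
Usage from the rest of the backend is then a two-step dance: saves succeed while a row is unlocked and are refused afterwards (the record id here is illustrative).

Python - Row Locking in Action
ok, msg = update_record('uuid-1', 'Corrected transcript text')  # → (True, "Record updated")
lock_record('uuid-1')
ok, msg = update_record('uuid-1', 'Another edit')               # → (False, "Record is locked. Unlock first.")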

Real-Time Progress Tracking

The frontend polls the job status endpoint every 2 seconds while transcription runs, updating a progress banner:

JavaScript - Job Polling
class JobTracker {
    constructor() {
        this.jobId = null;
        this.pollInterval = null;
    }
    
    startTracking(jobId) {
        this.jobId = jobId;
        this.showBanner();
        this.startPolling();
    }
    
    startPolling() {
        this.pollInterval = setInterval(() => {
            fetch(`/api/jobs/${this.jobId}`)
                .then(r => r.json())
                .then(jobInfo => {
                    this.updateBanner(jobInfo);
                    
                    if (jobInfo.status === 'completed' 
                        || jobInfo.status === 'failed') {
                        this.stopTracking();
                        location.reload(); // Refresh to show results
                    }
                });
        }, 2000); // Poll every 2 seconds
    }
    
    updateBanner(jobInfo) {
        const progress = jobInfo.progress || 0;
        const html = `
            <div class="job-banner">
                <div class="progress-bar">
                    <div class="progress-fill" 
                         style="width: ${progress}%"></div>
                </div>
                <p>Processing... 
                   ${jobInfo.processed_items}/${jobInfo.total_items} 
                   (${progress}%)</p>
            </div>
        `;
        document.getElementById('job-status').innerHTML = html;
    }
    
    stopTracking() {
        clearInterval(this.pollInterval);
        this.jobId = null;
    }
}
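
On the server side, the `/api/jobs/<job_id>` endpoint the tracker polls is just a thin read of the JobManager's state. A minimal sketch follows; the `get_job` accessor and module-level `job_manager` instance are illustrative names rather than the tool's exact API.

Python - Job Status Endpoint (illustrative sketch)
from flask import Flask, jsonify

app = Flask(__name__)
job_manager = JobManager()

@app.route('/api/jobs/<job_id>')
def job_status(job_id):
    """Return the job snapshot the frontend polls every 2 seconds."""
    job_info = job_manager.get_job(job_id)
    if not job_info:
        return jsonify({'error': 'Unknown job'}), 404
    return jsonify(job_info)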

Deployment Architecture

The complete stack is designed for simplicity and reliability:

| Component | Technology | Purpose |
|---|---|---|
| Web Framework | Flask 3.0+ | Lightweight, minimal overhead |
| Speech Recognition | OpenAI Whisper | State-of-the-art ASR, runs locally |
| Audio Processing | FFmpeg | Format conversion, chunking |
| Data Storage | CSV + JSON | Simple, versionable, zero DB setup |
| Job Orchestration | Python Threading | Built-in, no external broker needed |
| Session Tracking | Browser localStorage | Client-side, survives refresh |

Production Deployment

For production, deploy with Gunicorn behind Nginx:

Bash - Production Deployment
# 1. Install dependencies
pip install -r requirements.txt
pip install gunicorn

# 2. Start Gunicorn with 4 workers
gunicorn -w 4 -b 0.0.0.0:5000 --timeout 300 app:app

# 3. Nginx configuration (reverse proxy)
server {
    listen 80;
    server_name yourdomain.com;
    
    location / {
        proxy_pass http://127.0.0.1:5000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # Allow large file uploads
        client_max_body_size 500M;
        proxy_request_buffering off;
        proxy_buffering off;
    }
}

Performance Characteristics

Real-world measurements from production deployment:

| Operation | Time | Scalability |
|---|---|---|
| Audio Upload (100 MB) | 3-5 seconds | Streamed, not buffered |
| Whisper Inference (30 s segment) | 2-5 seconds | Depends on model; tiny is ~2 s, small is ~5 s |
| CSV Save (10K records) | < 100 ms | O(n) with thread lock |
| Page Load (empty session) | < 500 ms | Fast, no database queries |
| 1000-File Transcription | ~2 hours (single GPU) | Linear with file count |

Lessons Learned

1. Simplicity > Perfection

I initially designed around database backends and complex state machines. Realizing that CSV + JSON covers 80% of the needs dramatically simplified the entire system: no migrations, no schema changes, no query optimization needed.

2. Threading is Underrated for Single-Server Deployments

Task queues solve real problems at scale, but for teams up to ~50 users, threading handles background jobs elegantly without operational overhead.

3. Client-Side Session Tracking is Powerful

localStorage eliminates the need for a per-user backend session store. Users get persistent progress tracking without server state.

4. Audio Streaming Matters

Loading 500MB files into memory causes crashes. FFmpeg's streaming approach with segment serving keeps memory usage constant regardless of file size.
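
"Segment serving" simply means the browser requests one small WAV at a time instead of the original upload. A minimal sketch of such an endpoint with Flask's `send_file` is below; the route shape is an assumption for illustration.

Python - Serving Audio Segments (illustrative sketch)
from pathlib import Path
from flask import Flask, abort, send_file
from werkzeug.utils import secure_filename

app = Flask(__name__)

@app.route('/api/segments/<job_id>/<filename>')
def serve_segment(job_id, filename):
    """Send a single 30-second segment; memory stays flat regardless of source file size."""
    segment_path = Path('data/segments') / secure_filename(job_id) / secure_filename(filename)
    if not segment_path.is_file():
        abort(404)
    return send_file(segment_path, mimetype='audio/wav', conditional=True)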

What's Next?

The current version uses threading with JSON persistence, which comfortably covers single-server deployments. Future enhancements could include graduating to a dedicated task queue and database backend for distributed, high-volume workloads.

Try It Yourself

The complete system is open source and production-ready, so you can run it locally, point it at your own audio, and start annotating.

"Building annotation tools forces you to think about real-world constraints: large files, long-running jobs, session state, and user experience. The solutions you develop generalize to many web applications."

Resources & Links

Tags:

#Flask #AudioProcessing #BackgroundJobs #Python #Whisper #FFmpeg #WebDevelopment #SpeechRecognition