#!/usr/bin/env python3
"""
Real-time transcription of Windows speaker output using loopback capture.
Captures system audio and transcribes with Whisper in near real-time.
"""

import argparse
import json
import os
import queue
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import numpy as np
import sounddevice as sd

# Whisper transcription (using faster-whisper for optimal performance)
from faster_whisper import WhisperModel

# Sentence extraction for stitching chunks
from sentence_extractor import SentenceExtractor, SentenceCleaner

# LLM integration (optional)
try:
    import ollama
    OLLAMA_AVAILABLE = True
except ImportError:
    OLLAMA_AVAILABLE = False

# Sample rate (Hz) used for capture and for converting sample counts to seconds.
SAMPLE_RATE = 16000


class WindowsLoopbackAudioCapture:
    """Capture audio from a speaker/loopback-style input device.

    Opens a mono int16 ``sounddevice.InputStream`` on the selected device and
    pushes every callback block onto an internal queue that ``read_chunk``
    drains.
    """

    def __init__(self, device_name=None, sample_rate=16000, chunk_size=2048):
        """Select a device and start the capture stream.

        Args:
            device_name: Optional case-insensitive substring of the device
                name. When omitted (or when nothing matches) the device is
                auto-detected.
            sample_rate: Stream sample rate in Hz.
            chunk_size: Number of frames per callback block.

        Raises:
            RuntimeError: If no usable device is found or the stream cannot
                be opened/started.
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size

        # Find loopback device
        self.device_info = self._find_loopback_device(device_name)
        if not self.device_info:
            raise RuntimeError(
                "No loopback device found.\n"
                "1. Ensure your speakers/headphones are connected\n"
                "2. Enable 'Stereo Mix' in Sound settings\n"
                "3. Or install VB-Cable virtual audio device"
            )

        print(f"✓ Using device: {self.device_info['name']} (index {self.device_info['index']})")

        # Queue for audio data
        self.audio_queue = queue.Queue()
        self.stop_event = threading.Event()

        # Start the stream
        self.stream = None
        try:
            self.stream = sd.InputStream(
                device=self.device_info['index'],
                channels=1,
                samplerate=sample_rate,
                blocksize=chunk_size,
                dtype='int16',
                latency='low',
                callback=self._audio_callback
            )
            self.stream.start()
            print("✓ Audio capture stream started")
        except Exception as e:
            # Do not leak a stream that was opened but failed to start.
            if self.stream is not None:
                try:
                    self.stream.close()
                except Exception:
                    pass
                self.stream = None
            raise RuntimeError(f"Failed to start audio stream: {e}") from e

    def _find_loopback_device(self, device_name):
        """Return the device dict to capture from, or ``None`` if none fits.

        Search order: name substring match (if ``device_name`` is given),
        devices whose name contains Speakers/Headphones/Output, a
        'Stereo Mix' device, and finally the default input device. Every
        candidate must expose at least one input channel because the stream
        is opened as an input stream.
        """
        devices = sd.query_devices()

        # If device name specified, look for a (partial) name match
        if device_name:
            wanted = device_name.lower()
            for dev in devices:
                if wanted in dev['name'].lower() and dev['max_input_channels'] > 0:
                    return dev

        # Auto-detect: look for WASAPI speakers/headphones (Windows)
        for dev in devices:
            if (dev['max_input_channels'] > 0 and
                    any(x in dev['name'] for x in ['Speakers', 'Headphones', 'Output'])):
                return dev

        # Fallback: Stereo Mix or similar (Windows)
        for dev in devices:
            if 'Stereo Mix' in dev['name'] and dev['max_input_channels'] > 0:
                return dev

        # Linux fallback: use default input device (pipewire/pulse)
        try:
            default_input_idx = sd.default.device[0]  # Default input device
            # A negative index means "no default device"; without this guard
            # devices[-1] would silently pick the last device in the list.
            if isinstance(default_input_idx, int) and default_input_idx >= 0:
                dev = devices[default_input_idx]
                if dev['max_input_channels'] > 0:
                    print("⚠️ Note: Using default input device (microphone). "
                          "For speaker capture on Linux, use transcribe_dual_linux.py")
                    return dev
        except Exception:
            # Best-effort fallback: any lookup failure just means "not found".
            pass

        return None

    def _audio_callback(self, indata, frames, time_info, status):
        """Stream callback: report status flags and enqueue a copy of the block."""
        if status:
            print(f"⚠ Audio status: {status}")
        # Copy because the callback buffer is only handed to us for this call.
        self.audio_queue.put(indata.copy())

    def read_chunk(self):
        """Return the next queued block as a flat array, or ``None`` after 50 ms."""
        try:
            return self.audio_queue.get(timeout=0.05).flatten()
        except queue.Empty:
            return None

    def close(self):
        """Stop and close the stream. Safe to call more than once."""
        stream = getattr(self, 'stream', None)
        if stream is None:
            return
        self.stream = None
        try:
            stream.stop()
        finally:
            stream.close()


class WhisperStreamTranscriber:
    """Buffer audio chunks and transcribe them with faster-whisper."""

    def __init__(self, model_name="base", language="en", force_cpu=False, device_index=0):
        """Load the model on GPU when usable, otherwise on CPU.

        Args:
            model_name: Model size/name passed to ``WhisperModel``.
            language: Language code passed to ``transcribe``.
            force_cpu: Skip the CUDA probe and run on CPU.
            device_index: CUDA device index; falls back to 0 when out of range.
        """
        print(f"Loading Whisper model '{model_name}'...")

        # Check for CUDA availability
        import torch
        has_cuda = torch.cuda.is_available() and not force_cpu

        # Default to CPU; upgraded below only if CTranslate2 can use CUDA
        device = "cpu"
        compute_type = "int8"

        if has_cuda:
            try:
                # Test if CTranslate2 can actually use CUDA
                import ctranslate2
                cuda_count = ctranslate2.get_cuda_device_count()
                if cuda_count > 0:
                    # Validate device index (negative values are invalid too)
                    if not 0 <= device_index < cuda_count:
                        print(f"⚠️ GPU index {device_index} not available. "
                              f"Found {cuda_count} GPU(s). Using GPU 0.")
                        device_index = 0
                    # CTranslate2 uses "cuda" + device_index parameter, not "cuda:N"
                    device = "cuda"
                    compute_type = "float16"
                    print(f"Using device: cuda:{device_index} "
                          f"({torch.cuda.get_device_name(device_index)})")
                else:
                    print(f"CUDA available in PyTorch but not in CTranslate2. Using CPU.")
            except Exception as e:
                print(f"CUDA libraries not found ({e}). Using CPU.")
                device = "cpu"
                compute_type = "int8"
        else:
            print("Using device: cpu")

        # FASTER-WHISPER (recommended):
        model_kwargs = {
            "device": device,
            "compute_type": compute_type
        }
        if device == "cuda":
            model_kwargs["device_index"] = device_index
        elif device == "cpu":
            model_kwargs["cpu_threads"] = 4

        self.model = WhisperModel(model_name, **model_kwargs)
        self.language = language
        self.audio_buffer = np.array([], dtype=np.float32)
        self.lock = threading.Lock()

    def add_audio(self, audio_chunk):
        """Append a chunk to the buffer, scaling by 1/32768 into float32."""
        with self.lock:
            audio_float = audio_chunk.astype(np.float32) / 32768.0
            self.audio_buffer = np.concatenate([self.audio_buffer, audio_float])

    def transcribe_chunk(self, min_duration=5.0, fast_mode=False):
        """Transcribe and clear the buffer once it holds enough audio.

        Args:
            min_duration: Minimum buffered seconds required to transcribe.
            fast_mode: Use greedy decoding without VAD instead of beam
                search with VAD filtering.

        Returns:
            The joined segment text, or ``None`` when the buffer is too
            short, the result is empty, or transcription failed.
        """
        with self.lock:
            duration = len(self.audio_buffer) / SAMPLE_RATE
            if duration < min_duration:
                return None
            audio_to_process = self.audio_buffer.copy()
            self.audio_buffer = np.array([], dtype=np.float32)

        # Process with FASTER-WHISPER:
        try:
            # Optimize parameters for speed vs accuracy
            if fast_mode:
                # Fast mode: lower beam size, no VAD
                segments, _ = self.model.transcribe(
                    audio_to_process,
                    language=self.language,
                    beam_size=1,  # Greedy decoding (fastest)
                    best_of=1,
                    temperature=0.0,
                    vad_filter=False,
                    word_timestamps=False
                )
            else:
                # Balanced mode: moderate beam size with VAD
                segments, _ = self.model.transcribe(
                    audio_to_process,
                    language=self.language,
                    beam_size=3,  # Reduced from 5
                    vad_filter=True,
                    vad_parameters=dict(min_silence_duration_ms=500),
                    word_timestamps=False
                )
            text = " ".join(segment.text for segment in segments).strip()
            return text if text else None
        except Exception as e:
            print(f"❌ Transcription error: {e}")
            return None


class LocalLLMAnalyzer:
    """Local LLM for fact-checking and question generation using Ollama"""

    def __init__(self, model="llama3.2", debug=False):
        """Store settings and verify the Ollama service is reachable.

        Raises:
            RuntimeError: If the ``ollama`` package is missing or the service
                cannot be contacted.
        """
        if not OLLAMA_AVAILABLE:
            raise RuntimeError(
                "Ollama package not installed.\n"
                "Install with: pip install ollama"
            )
        self.model = model
        self.debug = debug
        self._test_connection()

    def _test_connection(self):
        """Test connection to Ollama service"""
        try:
            ollama.list()
            print(f"✓ Ollama connected using model: {self.model}")
        except Exception as e:
            raise RuntimeError(
                f"Cannot connect to Ollama. Ensure it's installed and running.\n"
                f"Error: {e}\n"
                f"Install from: https://ollama.ai\n"
                f"Then run: ollama pull {self.model}"
            ) from e

    def _extract_json(self, text):
        """Extract JSON from text that might contain markdown or other formatting"""
        # Try to find JSON block in markdown code fence
        json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
        if json_match:
            return json_match.group(1)

        # Try to find raw JSON object
        json_match = re.search(r'\{.*\}', text, re.DOTALL)
        if json_match:
            return json_match.group(0)

        return text

    def fact_check(self, text, context=""):
        """Ask the model for a verdict on ``text`` and parse its reply.

        Args:
            text: Statement to analyze.
            context: Accepted for interface compatibility; not used in the
                prompt.

        Returns:
            Dict with keys ``verdict``, ``confidence`` (clamped to 0..1),
            ``explanation`` (at most 250 chars), ``sources`` and
            ``corrections``. On any failure ``verdict`` is ``"error"``.
        """
        # Try simple structured format first
        prompt = f"""Analyze this for accuracy. Reply in this exact format:

VERDICT: [factual/dubious/not_factual]
CONFIDENCE: [0.0-1.0]
EXPLANATION: [one sentence]

Statement: "{text}"
"""
        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                options={"temperature": 0.1, "num_predict": 250}
            )
            response_text = response['response'].strip()

            if self.debug:
                print(f"\n[DEBUG] Fact-check response:\n{response_text}\n")

            # Defaults used when a field cannot be parsed
            verdict = "dubious"
            confidence = 0.5
            explanation = "No explanation provided"

            # Extract VERDICT
            verdict_match = re.search(r'VERDICT:\s*(\w+)', response_text, re.IGNORECASE)
            if verdict_match:
                verdict = verdict_match.group(1).lower()

            # Extract CONFIDENCE
            conf_match = re.search(r'CONFIDENCE:\s*([\d.]+)', response_text, re.IGNORECASE)
            if conf_match:
                try:
                    confidence = float(conf_match.group(1))
                    confidence = max(0.0, min(1.0, confidence))  # Clamp to 0-1
                except ValueError:
                    pass  # e.g. "0.8." - keep the default

            # Extract EXPLANATION
            expl_match = re.search(r'EXPLANATION:\s*(.+?)(?:\n\n|\Z)',
                                   response_text, re.IGNORECASE | re.DOTALL)
            if expl_match:
                explanation = expl_match.group(1).strip()
                # Clean up incomplete sentences (output may be cut by num_predict)
                if explanation and explanation[-1] not in '.!?':
                    # Try to find last complete sentence
                    last_period = max(explanation.rfind('.'),
                                      explanation.rfind('!'),
                                      explanation.rfind('?'))
                    if last_period > 20:  # Keep at least some text
                        explanation = explanation[:last_period + 1]

            return {
                "verdict": verdict,
                "confidence": confidence,
                "explanation": explanation[:250] if explanation else "Analysis completed",
                "sources": [],
                "corrections": ""
            }
        except Exception as e:
            if self.debug:
                print(f"[DEBUG] Fact-check error: {e}")
            return {
                "verdict": "error",
                "confidence": 0.0,
                "explanation": f"Analysis failed: {str(e)}",
                "sources": [],
                "corrections": ""
            }

    def generate_augmenting_questions(self, text, context=""):
        """Ask the model for three follow-up questions about ``text``.

        Args:
            text: Statement to ask about.
            context: Accepted for interface compatibility; not used in the
                prompt.

        Returns:
            Dict with ``questions`` (always exactly three strings) and
            ``topics`` (always an empty list).
        """
        prompt = f"""Generate 3 questions about this. Reply in this exact format:

Q1: [question]
Q2: [question]
Q3: [question]

Statement: "{text}"
"""
        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                options={"temperature": 0.7, "num_predict": 250}
            )
            response_text = response['response'].strip()

            if self.debug:
                print(f"\n[DEBUG] Questions response:\n{response_text}\n")

            # Extract questions
            questions = []
            for i in range(1, 4):
                # [ \t]* rather than \s*: \s* could swallow the newline after
                # an empty "Q1:" and capture the following "Q2: ..." line.
                q_match = re.search(rf'Q{i}:[ \t]*(.+?)(?:\n|$)', response_text, re.IGNORECASE)
                if not q_match:
                    continue
                question = q_match.group(1).strip()
                if not question:
                    continue
                # Clean up incomplete questions
                if question[-1] != '?':
                    # Try to find last complete question
                    last_q = question.rfind('?')
                    if last_q > 10:
                        question = question[:last_q + 1]
                    else:
                        question = question + "?"
                questions.append(question)

            # If we couldn't parse all three, fall back to free-form lines.
            if len(questions) < 3:
                for line in response_text.split('\n'):
                    line = line.strip()
                    # Skip blanks and every "Qn:" line: those were either
                    # parsed above or are empty markers, and re-adding them
                    # would duplicate an already extracted question.
                    if not line or re.match(r'^Q\d+\s*:', line, re.IGNORECASE):
                        continue
                    if not line.endswith('?'):
                        line = line + "?"
                    questions.append(line)
                    if len(questions) >= 3:
                        break

            # Ensure we have exactly 3 questions
            default_questions = [
                "What are the key points here?",
                "What evidence supports this?",
                "What are the implications?"
            ]
            while len(questions) < 3:
                questions.append(default_questions[len(questions)])

            return {
                "questions": questions[:3],
                "topics": []
            }
        except Exception as e:
            if self.debug:
                print(f"[DEBUG] Questions error: {e}")
            return {
                "questions": [
                    "What are the key points?",
                    "What supports this claim?",
                    "What are the implications?"
                ],
                "topics": []
            }


def list_audio_devices():
    """Print all available audio input devices"""
    print("\nAvailable audio capture devices:")
    for i, dev in enumerate(sd.query_devices()):
        if dev['max_input_channels'] > 0:
            print(f" [{i}] {dev['name']}")
            print(f" Channels: {dev['max_input_channels']} | "
                  f"Sample Rate: {dev['default_samplerate']}")
    print()


def _ensure_parent_dir(filename):
    """Create the parent directory of ``filename`` if it names one."""
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)


def save_transcript(text, timestamp, filename):
    """Append one ``[timestamp] text`` line to ``filename`` (UTF-8)."""
    _ensure_parent_dir(filename)
    with open(filename, "a", encoding="utf-8") as f:
        f.write(f"[{timestamp}] {text}\n")


def save_enriched_transcript(data, filename):
    """Append a transcript entry plus its LLM analysis to ``filename``.

    Args:
        data: Dict with ``timestamp`` and ``text`` and optionally
            ``fact_check`` / ``questions`` dicts as produced by
            ``LocalLLMAnalyzer``.
        filename: Output path; parent directories are created as needed.
    """
    _ensure_parent_dir(filename)
    with open(filename, "a", encoding="utf-8") as f:
        f.write(f"\n{'='*70}\n")
        f.write(f"[{data['timestamp']}] {data['text']}\n\n")

        if 'fact_check' in data:
            fc = data['fact_check']
            f.write(f"📊 Fact Check: {fc.get('verdict', 'N/A').upper()} "
                    f"(confidence: {fc.get('confidence', 0):.2f})\n")
            f.write(f"💡 {fc.get('explanation', 'N/A')}\n")
            if fc.get('corrections'):
                f.write(f"✏️ Correction: {fc['corrections']}\n")
            f.write("\n")

        if 'questions' in data and data['questions'].get('questions'):
            f.write("❓ Questions:\n")
            for i, q in enumerate(data['questions']['questions'], 1):
                f.write(f"{i}. {q}\n")
            f.write("\n")


def display_enriched_output(text, timestamp, fact_check=None, questions=None):
    """Print a transcript entry followed by any fact-check and questions."""
    print(f"\n[{timestamp}] {text}")

    if fact_check:
        verdict_emoji = {
            'factual': '✅',
            'dubious': '⚠️',
            'not_factual': '❌',
            'error': '⚠️'
        }
        emoji = verdict_emoji.get(fact_check.get('verdict', 'error'), '❓')
        print(f"\n{emoji} Fact Check: {fact_check.get('verdict', 'N/A').upper()} "
              f"(confidence: {fact_check.get('confidence', 0):.2f})")
        print(f"💡 {fact_check.get('explanation', 'N/A')}")
        if fact_check.get('corrections'):
            print(f"✏️ Correction: {fact_check['corrections']}")

    if questions and questions.get('questions'):
        print(f"\n❓ Questions:")
        for i, q in enumerate(questions['questions'], 1):
            print(f" {i}. {q}")


def _run_llm_analysis(llm_analyzer, text, context, timestamp, segment_count):
    """Run fact-check and question generation and bundle the results."""
    return {
        'timestamp': timestamp,
        'text': text,
        'segment_count': segment_count,
        'fact_check': llm_analyzer.fact_check(text, context),
        'questions': llm_analyzer.generate_augmenting_questions(text, context)
    }


def _emit_enriched(result, output_file):
    """Display one LLM analysis result and append it to the output file, if any."""
    display_enriched_output(
        result['text'],
        result['timestamp'],
        result['fact_check'],
        result['questions']
    )
    if output_file:
        save_enriched_transcript(result, output_file)


def main():
    """Parse arguments, then capture, transcribe and optionally analyze audio until Ctrl+C."""
    parser = argparse.ArgumentParser(
        description="Real-time transcription of Windows speaker output",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python transcribe_speakers.py
  python transcribe_speakers.py --model small --language es --interval 5
  python transcribe_speakers.py --device "Speakers" --output "meeting.txt"
  python transcribe_speakers.py --model medium --interval 10 --output transcripts/live.txt
        """
    )
    parser.add_argument("--model", default="base",
                        choices=["tiny", "base", "small", "medium", "large"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--language", default="en",
                        help="Language code (default: en)")
    parser.add_argument("--device", metavar="NAME",
                        help="Audio device name (partial match). If not specified, auto-detects")
    parser.add_argument("--interval", type=float, default=8.0,
                        help="Processing interval in seconds (default: 8.0)")
    parser.add_argument("--min-duration", type=float, default=3.0,
                        help="Minimum audio duration before transcription (default: 3.0)")
    parser.add_argument("--fast-mode", action="store_true",
                        help="Enable fast mode (lower accuracy, faster transcription)")
    parser.add_argument("--output", "-o", metavar="FILE",
                        help="Save transcript to file (e.g., transcript.txt)")
    parser.add_argument("--list-devices", action="store_true",
                        help="List all available audio devices and exit")
    parser.add_argument("--force-cpu", action="store_true",
                        help="Force CPU processing (disable GPU acceleration)")
    parser.add_argument("--gpu-index", type=int, default=0,
                        help="GPU device index to use (default: 0)")
    parser.add_argument("--enable-llm", action="store_true",
                        help="Enable LLM analysis (fact-checking and questions)")
    parser.add_argument("--llm-model", default="llama3.2:latest",
                        help="Ollama model to use for LLM analysis (default: llama3.2:latest)")
    parser.add_argument("--llm-debug", action="store_true",
                        help="Show LLM raw responses for debugging")
    parser.add_argument("--sentence-mode", action="store_true",
                        help="Extract complete sentences by stitching chunks together")

    args = parser.parse_args()

    if args.list_devices:
        list_audio_devices()
        return

    print("=== Windows Real-Time Audio Transcription ===")
    print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s")
    if args.output:
        print(f"Output: {args.output}")
    if args.enable_llm:
        print(f"LLM Analysis: Enabled ({args.llm_model})")
    if args.sentence_mode:
        print(f"Sentence Mode: Enabled (stitching chunks into complete sentences)")

    # Initialize audio capture
    try:
        capturer = WindowsLoopbackAudioCapture(
            device_name=args.device,
            sample_rate=SAMPLE_RATE,
            chunk_size=2048
        )
    except RuntimeError as e:
        print(f"\n❌ Audio Error: {e}")
        print("\nTo fix this:")
        print("1. Right-click speaker icon → Sounds → Recording tab")
        print("2. Right-click in empty area → Show Disabled Devices")
        print("3. Enable 'Stereo Mix' → Set as Default Device")
        print("\nAlternative: Install VB-Cable (free) from vb-audio.com")
        print(" Then use: --device 'CABLE Output'")
        list_audio_devices()
        return

    # Initialize transcriber
    try:
        transcriber = WhisperStreamTranscriber(
            model_name=args.model,
            language=args.language,
            force_cpu=args.force_cpu,
            device_index=args.gpu_index
        )
    except Exception as e:
        print(f"\n❌ Model Error: {e}")
        print("Make sure you installed Whisper correctly")
        # The capture stream is already running; release it before bailing out.
        capturer.close()
        return

    # Initialize LLM analyzer (optional)
    llm_analyzer = None
    if args.enable_llm:
        try:
            llm_analyzer = LocalLLMAnalyzer(model=args.llm_model, debug=args.llm_debug)
        except RuntimeError as e:
            print(f"\n❌ LLM Error: {e}")
            print("Continuing without LLM analysis...")
            llm_analyzer = None

    # Initialize sentence extractor (optional)
    sentence_extractor = None
    sentence_cleaner = None
    if args.sentence_mode:
        sentence_extractor = SentenceExtractor(max_buffer_words=150)
        sentence_cleaner = SentenceCleaner()
        print("✓ Sentence extraction initialized")

    # Main processing loop
    print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}")

    last_process_time = time.time()
    total_duration = 0
    segment_count = 0

    # Thread pool for concurrent LLM processing
    llm_executor = ThreadPoolExecutor(max_workers=2) if llm_analyzer else None
    # (segment number, future) pairs. A list rather than a dict keyed by
    # segment number: one segment can yield several sentences in sentence
    # mode, and a dict would keep only the last future per segment.
    pending_llm_tasks = []

    def submit_llm(txt, ctx, ts, seg_num):
        """Queue one background LLM analysis and remember its future."""
        future = llm_executor.submit(_run_llm_analysis, llm_analyzer, txt, ctx, ts, seg_num)
        pending_llm_tasks.append((seg_num, future))

    try:
        while True:
            # Collect audio
            chunk = capturer.read_chunk()
            if chunk is not None:
                transcriber.add_audio(chunk)
                total_duration += len(chunk) / SAMPLE_RATE

            # Process at intervals
            current_time = time.time()
            if current_time - last_process_time >= args.interval:
                text = transcriber.transcribe_chunk(
                    min_duration=args.min_duration,
                    fast_mode=args.fast_mode
                )
                if text:
                    segment_count += 1
                    timestamp = datetime.now().strftime("%H:%M:%S")

                    if sentence_extractor:
                        # Sentence mode: emit only complete, cleaned sentences
                        for sentence in sentence_extractor.add_chunk(text):
                            cleaned = (sentence_cleaner.clean(sentence)
                                       if sentence_cleaner else sentence)
                            if not cleaned:
                                continue
                            print(f"[{timestamp}] 📝 {cleaned}")
                            if llm_analyzer:
                                # Enriched result is saved when the task completes
                                submit_llm(cleaned,
                                           f"Sentence from segment {segment_count}",
                                           timestamp, segment_count)
                            elif args.output:
                                save_transcript(cleaned, timestamp, args.output)
                    else:
                        # Standard mode: display chunks as-is, immediately
                        # (don't wait for LLM)
                        print(f"[{timestamp}] {text}")
                        if llm_analyzer:
                            submit_llm(text, f"Segment {segment_count}",
                                       timestamp, segment_count)
                        elif args.output:
                            # Save transcript immediately without LLM
                            save_transcript(text, timestamp, args.output)

                last_process_time = current_time

            # Check for completed LLM tasks (non-blocking)
            if llm_analyzer and pending_llm_tasks:
                still_pending = []
                for seg_num, future in pending_llm_tasks:
                    if not future.done():
                        still_pending.append((seg_num, future))
                        continue
                    try:
                        _emit_enriched(future.result(), args.output)
                    except Exception as e:
                        print(f"⚠️ LLM processing error for segment {seg_num}: {e}")
                pending_llm_tasks = still_pending

    except KeyboardInterrupt:
        print(f"\n{'=' * 50}\n🛑 Stopping transcription...")

        # Wait for pending LLM tasks to complete
        if llm_analyzer and pending_llm_tasks:
            print(f"\n⏳ Waiting for {len(pending_llm_tasks)} pending LLM tasks to complete...")
            for seg_num, future in pending_llm_tasks:
                try:
                    _emit_enriched(future.result(timeout=30), args.output)
                except Exception as e:
                    print(f"⚠️ LLM task {seg_num} failed: {e}")

        # Shutdown executor
        if llm_executor:
            llm_executor.shutdown(wait=True)

        # Cleanup
        capturer.close()

        # Flush sentence buffer if in sentence mode
        if sentence_extractor:
            print("\n📝 Flushing sentence buffer...")
            for sentence in sentence_extractor.flush():
                cleaned = sentence_cleaner.clean(sentence) if sentence_cleaner else sentence
                if not cleaned:
                    continue
                timestamp = datetime.now().strftime("%H:%M:%S")
                print(f"[{timestamp}] 📝 {cleaned}")
                if llm_analyzer:
                    # Synchronous: the executor is already shut down
                    _emit_enriched(
                        _run_llm_analysis(llm_analyzer, cleaned, "Final sentence",
                                          timestamp, segment_count),
                        args.output
                    )
                elif args.output:
                    save_transcript(cleaned, timestamp, args.output)

        # Process remaining audio
        print("\nProcessing remaining audio...")
        final_text = transcriber.transcribe_chunk(min_duration=0)
        if final_text:
            timestamp = datetime.now().strftime("%H:%M:%S")
            print(f"[{timestamp}] {final_text}")

            # LLM Analysis for final segment (synchronous since we're shutting down)
            if llm_analyzer:
                _emit_enriched(
                    _run_llm_analysis(llm_analyzer, final_text, "Final segment",
                                      timestamp, segment_count),
                    args.output
                )
            elif args.output:
                save_transcript(final_text, timestamp, args.output)

        # Summary
        print(f"\n✅ Complete! Processed {total_duration:.1f}s of audio")
        print(f" Generated {segment_count} transcript segments")
        if args.output and os.path.exists(args.output):
            abs_path = os.path.abspath(args.output)
            print(f"💾 Transcript saved to: {abs_path}")

    finally:
        # Runs on every exit path (including unexpected errors); both calls
        # are no-ops if the graceful shutdown above already performed them.
        capturer.close()
        if llm_executor:
            llm_executor.shutdown(wait=False)


if __name__ == "__main__":
    main()