From 918e96ad216ca481c3a282e74d7a26b8272e701b Mon Sep 17 00:00:00 2001 From: mike Date: Wed, 17 Dec 2025 16:30:46 +0100 Subject: [PATCH] init --- .gitignore | 1 + .idea/.gitignore | 10 - .idea/go.imports.xml | 11 - .idea/inspectionProfiles/Project_Default.xml | 621 ------------------ .idea/material_theme_project_new.xml | 17 - .idea/misc.xml | 9 - .idea/modules.xml | 8 - .idea/vcs.xml | 6 - README.md | 20 +- sentence_extractor.py | 260 ++++++++ transcribe.iml | 1 + transcribe_No_llm.py | 596 ----------------- transcribe_speakers.py | 113 +++- transcribe_speakers_llm.py | 636 ------------------- 14 files changed, 358 insertions(+), 1951 deletions(-) delete mode 100644 .idea/.gitignore delete mode 100644 .idea/go.imports.xml delete mode 100644 .idea/inspectionProfiles/Project_Default.xml delete mode 100644 .idea/material_theme_project_new.xml delete mode 100644 .idea/misc.xml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/vcs.xml create mode 100644 sentence_extractor.py delete mode 100755 transcribe_No_llm.py delete mode 100755 transcribe_speakers_llm.py diff --git a/.gitignore b/.gitignore index 1612ed8..d086f68 100755 --- a/.gitignore +++ b/.gitignore @@ -303,3 +303,4 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +.idea/ diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index ab1f416..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Ignored default folder with query files -/queries/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml -# Editor-based HTTP Client requests -/httpRequests/ diff --git a/.idea/go.imports.xml b/.idea/go.imports.xml deleted file mode 100644 index d7202f0..0000000 --- a/.idea/go.imports.xml +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml deleted file mode 100644 index e1e41cb..0000000 --- a/.idea/inspectionProfiles/Project_Default.xml +++ /dev/null @@ -1,621 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/material_theme_project_new.xml b/.idea/material_theme_project_new.xml deleted file mode 100644 index 7b10447..0000000 --- a/.idea/material_theme_project_new.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 88d9a0d..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index f95f667..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 94a25f7..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/README.md b/README.md index ab61283..6d1c5ef 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Real-time audio transcription using Whisper AI with optional LLM-powered analysi - Real-time transcription of system audio (Windows/Linux) - Multiple Whisper model sizes (tiny to large) - Multi-language support +- **Sentence extraction mode** - Stitches audio chunks into complete sentences - Optional LLM analysis for fact-checking and question generation (via Ollama) - GPU acceleration support - Flexible audio device configuration @@ -17,12 +18,15 @@ Real-time audio transcription using Whisper AI with optional LLM-powered analysi # Install dependencies pip install -r requirements.txt -# Basic transcription +# Basic transcription (no LLM) python transcribe_speakers.py -# With LLM analysis +# With LLM analysis (optional) python transcribe_speakers.py --enable-llm +# With sentence extraction +python transcribe_speakers.py --sentence-mode + # List audio devices python transcribe_speakers.py --list-devices ``` @@ -80,9 +84,7 @@ ollama pull llama3.2 ### Available Scripts -- `transcribe_speakers.py` - Main script with all features -- `transcribe_speakers_llm.py` - LLM-enabled version -- `transcribe_No_llm.py` - Basic version without LLM support +- `transcribe_speakers.py` - Main script with all features (LLM optional via `--enable-llm`) - `transcribe_dual_linux.py` - Linux-specific with dual audio support ### Common Commands @@ -97,8 +99,11 @@ python transcribe_speakers.py --language es --output transcript.txt # Fast mode (low latency) python transcribe_speakers.py --fast-mode --model tiny --interval 3 -# Maximum accuracy with LLM -python transcribe_speakers.py --model large --enable-llm --output enriched.txt +# Extract complete sentences from chunks +python transcribe_speakers.py --sentence-mode --output sentences.txt + +# Maximum accuracy with LLM and sentence extraction +python transcribe_speakers.py --model large --enable-llm --sentence-mode --output enriched.txt # Force CPU (avoid GPU issues) python transcribe_speakers.py --force-cpu @@ -119,6 +124,7 @@ python transcribe_speakers.py --force-cpu | `--output` | Save to file | None | | `--force-cpu` | Disable GPU | False | | `--gpu-index` | GPU device index | 0 | +| `--sentence-mode` | Extract complete sentences from chunks | False | ## Model Performance diff --git a/sentence_extractor.py b/sentence_extractor.py new file mode 100644 index 0000000..06fcb85 --- /dev/null +++ b/sentence_extractor.py @@ -0,0 +1,260 @@ +""" +Sentence extraction from chunked transcriptions. +Stitches partial chunks together and extracts complete sentences. +""" + +import re +from typing import List, Tuple, Optional +from collections import deque + + +class SentenceExtractor: + """ + Buffers transcription chunks and extracts complete sentences. + Handles sentence boundaries that span across audio chunks. + """ + + def __init__(self, max_buffer_words=200): + """ + Initialize the sentence extractor. + + Args: + max_buffer_words: Maximum words to keep in buffer before forcing extraction + """ + self.buffer = "" + self.max_buffer_words = max_buffer_words + self.completed_sentences = deque() + + # Sentence boundary patterns + self.sentence_end_pattern = re.compile(r'([.!?]+)\s+') + self.sentence_boundaries = re.compile(r'(?<=[.!?])\s+(?=[A-Z])') + + def add_chunk(self, text: str) -> List[str]: + """ + Add a new transcription chunk and extract any complete sentences. + + Args: + text: New transcription text chunk + + Returns: + List of complete sentences extracted + """ + if not text or not text.strip(): + return [] + + # Add to buffer + if self.buffer: + # Smart joining: check if we need a space + if not self.buffer[-1].isspace() and not text[0].isspace(): + self.buffer += " " + self.buffer += text.strip() + + # Extract complete sentences + sentences = self._extract_sentences() + + # Check if buffer is too large + word_count = len(self.buffer.split()) + if word_count > self.max_buffer_words: + # Force extraction of what we have + forced = self._force_extract() + if forced: + sentences.extend(forced) + + return sentences + + def _extract_sentences(self) -> List[str]: + """ + Extract complete sentences from buffer. + Keeps incomplete sentence in buffer. + + Returns: + List of complete sentences + """ + sentences = [] + + # Find sentence boundaries + # Pattern: sentence ending punctuation followed by space and capital letter + # or sentence ending at punctuation before end of buffer + parts = self.sentence_boundaries.split(self.buffer) + + if len(parts) > 1: + # We have complete sentences + # Keep the last part (incomplete sentence) in buffer + sentences = [s.strip() for s in parts[:-1] if s.strip()] + self.buffer = parts[-1].strip() + + return sentences + + def _force_extract(self) -> List[str]: + """ + Force extraction when buffer is too large. + Tries to break at reasonable points. + + Returns: + List of extracted text segments + """ + # Try to find the last sentence-like boundary + last_period = max( + self.buffer.rfind('. '), + self.buffer.rfind('! '), + self.buffer.rfind('? ') + ) + + if last_period > 0: + # Extract up to last period + extracted = self.buffer[:last_period + 1].strip() + self.buffer = self.buffer[last_period + 1:].strip() + return [extracted] + else: + # No sentence boundary found, extract by word limit + words = self.buffer.split() + if len(words) > self.max_buffer_words: + # Take 80% of max_buffer_words + split_point = int(self.max_buffer_words * 0.8) + extracted = " ".join(words[:split_point]) + self.buffer = " ".join(words[split_point:]) + return [extracted + "..."] + + return [] + + def flush(self) -> List[str]: + """ + Flush remaining buffer and return as sentence(s). + Call this at end of transcription. + + Returns: + List of remaining text as sentences + """ + sentences = [] + + if self.buffer.strip(): + # Try to extract any remaining complete sentences first + extracted = self._extract_sentences() + sentences.extend(extracted) + + # Return remaining buffer if it has content + if self.buffer.strip(): + # Check if it ends with punctuation + if not self.buffer[-1] in '.!?': + self.buffer += "." + sentences.append(self.buffer.strip()) + self.buffer = "" + + return sentences + + def get_buffer_status(self) -> dict: + """ + Get current buffer status for debugging. + + Returns: + Dictionary with buffer stats + """ + return { + "buffer_length": len(self.buffer), + "buffer_words": len(self.buffer.split()) if self.buffer else 0, + "buffer_preview": self.buffer[:100] + "..." if len(self.buffer) > 100 else self.buffer + } + + +class SentenceCleaner: + """ + Cleans and normalizes extracted sentences. + Removes duplicates, fixes common transcription issues. + """ + + def __init__(self): + self.seen_sentences = set() + self.similarity_threshold = 0.85 + + def clean(self, sentence: str) -> Optional[str]: + """ + Clean and normalize a sentence. + + Args: + sentence: Raw sentence text + + Returns: + Cleaned sentence or None if should be filtered + """ + if not sentence or not sentence.strip(): + return None + + # Basic cleaning + cleaned = sentence.strip() + + # Remove multiple spaces + cleaned = re.sub(r'\s+', ' ', cleaned) + + # Fix spacing around punctuation + cleaned = re.sub(r'\s+([.!?,;:])', r'\1', cleaned) + + # Capitalize first letter + if cleaned and not cleaned[0].isupper(): + cleaned = cleaned[0].upper() + cleaned[1:] + + # Ensure ends with punctuation + if cleaned and not cleaned[-1] in '.!?': + cleaned += '.' + + # Filter very short sentences (likely fragments) + if len(cleaned.split()) < 3: + return None + + # Check for duplicates (exact) + if cleaned in self.seen_sentences: + return None + + self.seen_sentences.add(cleaned) + return cleaned + + def reset(self): + """Reset seen sentences cache.""" + self.seen_sentences.clear() + + +def demo(): + """Demo usage of sentence extractor.""" + extractor = SentenceExtractor() + cleaner = SentenceCleaner() + + # Simulate chunked transcription + chunks = [ + "Hello everyone welcome to", + "to this presentation today we will", + "will discuss the importance of AI. Artificial intelligence is", + "is transforming many industries. It helps us automate", + "automate tasks and make better decisions. What do you", + "you think about this technology? I believe it has", + "has great potential for the future." + ] + + print("=== Sentence Extraction Demo ===\n") + print("Input chunks:") + for i, chunk in enumerate(chunks, 1): + print(f" Chunk {i}: '{chunk}'") + + print("\n" + "="*50) + print("Extracted sentences:\n") + + for i, chunk in enumerate(chunks, 1): + sentences = extractor.add_chunk(chunk) + for sent in sentences: + cleaned = cleaner.clean(sent) + if cleaned: + print(f" [{i}] {cleaned}") + + # Flush remaining buffer + print("\nFlushing buffer...") + final_sentences = extractor.flush() + for sent in final_sentences: + cleaned = cleaner.clean(sent) + if cleaned: + print(f" [final] {cleaned}") + + print("\n" + "="*50) + print("Buffer status:") + print(extractor.get_buffer_status()) + + +if __name__ == "__main__": + demo() diff --git a/transcribe.iml b/transcribe.iml index 4382db5..50c95d3 100644 --- a/transcribe.iml +++ b/transcribe.iml @@ -7,6 +7,7 @@ + \ No newline at end of file diff --git a/transcribe_No_llm.py b/transcribe_No_llm.py deleted file mode 100755 index 634da43..0000000 --- a/transcribe_No_llm.py +++ /dev/null @@ -1,596 +0,0 @@ -#!/usr/bin/env python3 -""" -Real-time transcription of Windows speaker output using loopback capture. -Captures system audio and transcribes with Whisper in near real-time. -""" - -import sounddevice as sd -import numpy as np -import threading -import queue -import time -import os -import argparse -import json -from datetime import datetime - -# Choose your Whisper backend here: -# For faster-whisper (recommended): -from faster_whisper import WhisperModel - -# LLM integration -try: - import ollama - OLLAMA_AVAILABLE = True -except ImportError: - OLLAMA_AVAILABLE = False - - -# # For regular whisper (comment out the line above and uncomment these): -# import whisper - - -class WindowsLoopbackAudioCapture: - """Capture Windows speaker output using WASAPI loopback""" - - def __init__(self, device_name=None, sample_rate=16000, chunk_size=2048): - self.sample_rate = sample_rate - self.chunk_size = chunk_size - - # Find loopback device - self.device_info = self._find_loopback_device(device_name) - if not self.device_info: - raise RuntimeError( - "No loopback device found.\n" - "1. Ensure your speakers/headphones are connected\n" - "2. Enable 'Stereo Mix' in Sound settings\n" - "3. Or install VB-Cable virtual audio device" - ) - - print(f"✓ Using device: {self.device_info['name']} (index {self.device_info['index']})") - - # Queue for audio data - self.audio_queue = queue.Queue() - self.stop_event = threading.Event() - - # Start the stream - try: - self.stream = sd.InputStream( - device=self.device_info['index'], - channels=1, - samplerate=sample_rate, - blocksize=chunk_size, - dtype='int16', - latency='low', - callback=self._audio_callback - ) - self.stream.start() - print("✓ Audio capture stream started") - except Exception as e: - raise RuntimeError(f"Failed to start audio stream: {e}") - - def _find_loopback_device(self, device_name): - """Find the speaker device with loopback capability""" - devices = sd.query_devices() - - # If device name specified, find exact match - if device_name: - for dev in devices: - if (device_name.lower() in dev['name'].lower() and - dev['max_input_channels'] > 0): - return dev - - # Auto-detect: look for WASAPI speakers/headphones - for dev in devices: - if (dev['max_input_channels'] > 0 and - any(x in dev['name'] for x in ['Speakers', 'Headphones', 'Output'])): - return dev - - # Fallback: Stereo Mix or similar - for dev in devices: - if 'Stereo Mix' in dev['name']: - return dev - - return None - - def _audio_callback(self, indata, frames, time_info, status): - """Callback for audio data""" - if status: - print(f"⚠ Audio status: {status}") - self.audio_queue.put(indata.copy()) - - def read_chunk(self): - """Read audio data from queue""" - try: - return self.audio_queue.get(timeout=0.05).flatten() - except queue.Empty: - return None - - def close(self): - """Cleanup resources""" - if hasattr(self, 'stream'): - self.stream.stop() - self.stream.close() - - -class WhisperStreamTranscriber: - """Process audio chunks with Whisper/faster-whisper""" - - def __init__(self, model_name="base", language="en", force_cpu=False): - print(f"Loading Whisper model '{model_name}'...") - - # Check for CUDA availability - import torch - has_cuda = torch.cuda.is_available() and not force_cpu - - # Force CPU if CUDA libraries incompatible - device = "cpu" - compute_type = "int8" - - if has_cuda: - try: - # Test if CTranslate2 can actually use CUDA - import ctranslate2 - cuda_count = ctranslate2.get_cuda_device_count() - if cuda_count > 0: - device = "cuda" - compute_type = "float16" - print(f"Using device: cuda ({torch.cuda.get_device_name(0)})") - else: - print(f"CUDA available in PyTorch but not in CTranslate2. Using CPU.") - except Exception as e: - print(f"CUDA libraries not found ({e}). Using CPU.") - else: - print("Using device: cpu") - - # FASTER-WHISPER (recommended): - model_kwargs = { - "device": device, - "compute_type": compute_type - } - if not has_cuda: - model_kwargs["cpu_threads"] = 4 - - self.model = WhisperModel(model_name, **model_kwargs) - self.language = language - self.audio_buffer = np.array([], dtype=np.float32) - self.lock = threading.Lock() - - # # REGULAR WHISPER: - # self.model = whisper.load_model(model_name) - # self.language = language - # self.audio_buffer = np.array([], dtype=np.float32) - # self.lock = threading.Lock() - - def add_audio(self, audio_chunk): - """Add new audio data to buffer""" - with self.lock: - audio_float = audio_chunk.astype(np.float32) / 32768.0 - self.audio_buffer = np.concatenate([self.audio_buffer, audio_float]) - - def transcribe_chunk(self, min_duration=5.0): - """Transcribe accumulated audio if enough duration""" - with self.lock: - duration = len(self.audio_buffer) / 16000 - if duration < min_duration: - return None - - audio_to_process = self.audio_buffer.copy() - self.audio_buffer = np.array([], dtype=np.float32) - - # Process with FASTER-WHISPER: - try: - segments, _ = self.model.transcribe( - audio_to_process, - language=self.language, - beam_size=5, - vad_filter=True, - vad_parameters=dict(min_silence_duration_ms=500), - word_timestamps=False - ) - text = " ".join([segment.text for segment in segments]).strip() - return text if text else None - except Exception as e: - print(f"❌ Transcription error: {e}") - return None - - # # REGULAR WHISPER: - # try: - # result = self.model.transcribe( - # audio_to_process, - # language=self.language, - # task="transcribe", - # fp16=False - # ) - # return result["text"].strip() - # except Exception as e: - # print(f"❌ Transcription error: {e}") - # return None - - -class LocalLLMAnalyzer: - """Local LLM for fact-checking and question generation using Ollama""" - - def __init__(self, model="llama3.2"): - if not OLLAMA_AVAILABLE: - raise RuntimeError( - "Ollama package not installed.\n" - "Install with: pip install ollama" - ) - - self.model = model - self._test_connection() - - def _test_connection(self): - """Test connection to Ollama service""" - try: - ollama.list() - print(f"✓ Ollama connected using model: {self.model}") - except Exception as e: - raise RuntimeError( - f"Cannot connect to Ollama. Ensure it's installed and running.\n" - f"Error: {e}\n" - f"Install from: https://ollama.ai\n" - f"Then run: ollama pull {self.model}" - ) - - def _extract_json(self, text): - """Extract JSON from text that might contain markdown or other formatting""" - # Try to find JSON block in markdown code fence - import re - json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL) - if json_match: - return json_match.group(1) - - # Try to find raw JSON object - json_match = re.search(r'\{.*\}', text, re.DOTALL) - if json_match: - return json_match.group(0) - - return text - - def fact_check(self, text, context=""): - """Analyze text for factual accuracy""" - prompt = f"""You are a fact-checking assistant. Analyze this statement for factual accuracy. - -Context: {context} -Statement: "{text}" - -You must respond with ONLY valid JSON in this exact format, no other text: -{{ - "verdict": "factual", - "confidence": 0.95, - "explanation": "Brief explanation here", - "sources": ["source1"], - "corrections": "" -}} - -Valid verdict values: "factual", "dubious", "not_factual" -Confidence must be a number between 0.0 and 1.0.""" - - try: - response = ollama.generate( - model=self.model, - prompt=prompt, - options={"temperature": 0.1, "num_predict": 200} - ) - - # Extract and parse JSON - response_text = response['response'] - json_text = self._extract_json(response_text) - result = json.loads(json_text) - - # Validate required fields - if 'verdict' not in result or 'confidence' not in result: - raise ValueError("Missing required fields") - - # Ensure defaults for optional fields - result.setdefault('explanation', 'No explanation provided') - result.setdefault('sources', []) - result.setdefault('corrections', '') - - return result - - except (json.JSONDecodeError, ValueError) as e: - # Return a simple analysis without JSON parsing - return { - "verdict": "dubious", - "confidence": 0.5, - "explanation": f"Could not parse LLM response properly. Model may need JSON format support.", - "sources": [], - "corrections": "" - } - except Exception as e: - return { - "verdict": "error", - "confidence": 0.0, - "explanation": f"Analysis failed: {str(e)}", - "sources": [], - "corrections": "" - } - - def generate_augmenting_questions(self, text, context=""): - """Generate insightful questions based on the text""" - prompt = f"""Based on this statement, generate 3 insightful questions that would help understand the topic better. - -Statement: "{text}" -Context: {context} - -Respond with JSON only: -{{ - "questions": ["Question 1", "Question 2", "Question 3"], - "topics": ["key_topic_1", "key_topic_2"] -}}""" - - try: - response = ollama.generate( - model=self.model, - prompt=prompt, - format="json", - options={"temperature": 0.7} - ) - return json.loads(response['response']) - except json.JSONDecodeError: - return { - "questions": ["Error: LLM response was not valid JSON"], - "topics": [] - } - except Exception as e: - return { - "questions": [f"Error: {str(e)}"], - "topics": [] - } - - -def list_audio_devices(): - """Print all available audio input devices""" - print("\nAvailable audio capture devices:") - devices = sd.query_devices() - for i, dev in enumerate(devices): - if dev['max_input_channels'] > 0: - print(f" [{i}] {dev['name']}") - print(f" Channels: {dev['max_input_channels']} | Sample Rate: {dev['default_samplerate']}") - print() - - -def save_transcript(text, timestamp, filename): - """Append transcript to file""" - os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - with open(filename, "a", encoding="utf-8") as f: - f.write(f"[{timestamp}] {text}\n") - - -def save_enriched_transcript(data, filename): - """Save enriched transcript with LLM analysis""" - os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - with open(filename, "a", encoding="utf-8") as f: - f.write(f"\n{'='*70}\n") - f.write(f"[{data['timestamp']}] {data['text']}\n\n") - - if 'fact_check' in data: - fc = data['fact_check'] - f.write(f"📊 Fact Check: {fc.get('verdict', 'N/A').upper()} " - f"(confidence: {fc.get('confidence', 0):.2f})\n") - f.write(f"💡 {fc.get('explanation', 'N/A')}\n") - if fc.get('corrections'): - f.write(f"✏️ Correction: {fc['corrections']}\n") - f.write("\n") - - if 'questions' in data and data['questions'].get('questions'): - f.write("❓ Questions:\n") - for i, q in enumerate(data['questions']['questions'], 1): - f.write(f"{i}. {q}\n") - f.write("\n") - - -def display_enriched_output(text, timestamp, fact_check=None, questions=None): - """Display transcript with LLM analysis""" - print(f"\n[{timestamp}] {text}") - - if fact_check: - verdict_emoji = { - 'factual': '✅', - 'dubious': '⚠️', - 'not_factual': '❌', - 'error': '⚠️' - } - emoji = verdict_emoji.get(fact_check.get('verdict', 'error'), '❓') - - print(f"\n{emoji} Fact Check: {fact_check.get('verdict', 'N/A').upper()} " - f"(confidence: {fact_check.get('confidence', 0):.2f})") - print(f"💡 {fact_check.get('explanation', 'N/A')}") - - if fact_check.get('corrections'): - print(f"✏️ Correction: {fact_check['corrections']}") - - if questions and questions.get('questions'): - print(f"\n❓ Questions:") - for i, q in enumerate(questions['questions'], 1): - print(f" {i}. {q}") - - -def main(): - parser = argparse.ArgumentParser( - description="Real-time transcription of Windows speaker output", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - python transcribe_speakers.py - python transcribe_speakers.py --model small --language es --interval 5 - python transcribe_speakers.py --device "Speakers" --output "meeting.txt" - python transcribe_speakers.py --model medium --interval 10 --output transcripts/live.txt - """ - ) - - parser.add_argument("--model", default="base", - choices=["tiny", "base", "small", "medium", "large"], - help="Whisper model size (default: base)") - parser.add_argument("--language", default="en", - help="Language code (default: en)") - parser.add_argument("--device", metavar="NAME", - help="Audio device name (partial match). If not specified, auto-detects") - parser.add_argument("--interval", type=float, default=8.0, - help="Processing interval in seconds (default: 8.0)") - parser.add_argument("--output", "-o", metavar="FILE", - help="Save transcript to file (e.g., transcript.txt)") - parser.add_argument("--list-devices", action="store_true", - help="List all available audio devices and exit") - parser.add_argument("--force-cpu", action="store_true", - help="Force CPU processing (disable GPU acceleration)") - parser.add_argument("--enable-llm", action="store_true", - help="Enable LLM analysis (fact-checking and questions)") - parser.add_argument("--llm-model", default="gpt-oss:20b", - help="Ollama model to use for LLM analysis (default: gpt-oss:20b)") - - args = parser.parse_args() - - if args.list_devices: - list_audio_devices() - return - - print("=== Windows Real-Time Audio Transcription ===") - print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s") - if args.output: - print(f"Output: {args.output}") - if args.enable_llm: - print(f"LLM Analysis: Enabled ({args.llm_model})") - - # Initialize audio capture - try: - capturer = WindowsLoopbackAudioCapture( - device_name=args.device, - sample_rate=16000, - chunk_size=2048 - ) - except RuntimeError as e: - print(f"\n❌ Audio Error: {e}") - print("\nTo fix this:") - print("1. Right-click speaker icon → Sounds → Recording tab") - print("2. Right-click in empty area → Show Disabled Devices") - print("3. Enable 'Stereo Mix' → Set as Default Device") - print("\nAlternative: Install VB-Cable (free) from vb-audio.com") - print(" Then use: --device 'CABLE Output'") - list_audio_devices() - return - - # Initialize transcriber - try: - transcriber = WhisperStreamTranscriber( - model_name=args.model, - language=args.language, - force_cpu=args.force_cpu - ) - except Exception as e: - print(f"\n❌ Model Error: {e}") - print("Make sure you installed Whisper correctly") - return - - # Initialize LLM analyzer (optional) - llm_analyzer = None - if args.enable_llm: - try: - llm_analyzer = LocalLLMAnalyzer(model=args.llm_model) - except RuntimeError as e: - print(f"\n❌ LLM Error: {e}") - print("Continuing without LLM analysis...") - llm_analyzer = None - - # Main processing loop - print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}") - last_process_time = time.time() - total_duration = 0 - segment_count = 0 - - try: - while True: - # Collect audio - chunk = capturer.read_chunk() - if chunk is not None: - transcriber.add_audio(chunk) - total_duration += len(chunk) / 16000 - - # Process at intervals - current_time = time.time() - if current_time - last_process_time >= args.interval: - text = transcriber.transcribe_chunk() - if text: - segment_count += 1 - timestamp = datetime.now().strftime("%H:%M:%S") - - # LLM Analysis - fact_check = None - questions = None - if llm_analyzer: - context = f"Segment {segment_count}" - fact_check = llm_analyzer.fact_check(text, context) - questions = llm_analyzer.generate_augmenting_questions(text, context) - - # Display output - if llm_analyzer: - display_enriched_output(text, timestamp, fact_check, questions) - else: - print(f"[{timestamp}] {text}") - - # Save output - if args.output: - if llm_analyzer: - data = { - 'timestamp': timestamp, - 'text': text, - 'fact_check': fact_check, - 'questions': questions - } - save_enriched_transcript(data, args.output) - else: - save_transcript(text, timestamp, args.output) - - last_process_time = current_time - - except KeyboardInterrupt: - print(f"\n{'=' * 50}\n🛑 Stopping transcription...") - - # Cleanup - capturer.close() - - # Process remaining audio - print("\nProcessing remaining audio...") - final_text = transcriber.transcribe_chunk(min_duration=0) - if final_text: - timestamp = datetime.now().strftime("%H:%M:%S") - - # LLM Analysis for final segment - fact_check = None - questions = None - if llm_analyzer: - fact_check = llm_analyzer.fact_check(final_text, "Final segment") - questions = llm_analyzer.generate_augmenting_questions(final_text) - - # Display output - if llm_analyzer: - display_enriched_output(final_text, timestamp, fact_check, questions) - else: - print(f"[{timestamp}] {final_text}") - - # Save output - if args.output: - if llm_analyzer: - data = { - 'timestamp': timestamp, - 'text': final_text, - 'fact_check': fact_check, - 'questions': questions - } - save_enriched_transcript(data, args.output) - else: - save_transcript(final_text, timestamp, args.output) - - # Summary - print(f"\n✅ Complete! Processed {total_duration:.1f}s of audio") - print(f" Generated {segment_count} transcript segments") - if args.output and os.path.exists(args.output): - abs_path = os.path.abspath(args.output) - print(f"💾 Transcript saved to: {abs_path}") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/transcribe_speakers.py b/transcribe_speakers.py index a35d105..1157130 100755 --- a/transcribe_speakers.py +++ b/transcribe_speakers.py @@ -15,11 +15,13 @@ import json from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed -# Choose your Whisper backend here: -# For faster-whisper (recommended): +# Whisper transcription (using faster-whisper for optimal performance) from faster_whisper import WhisperModel -# LLM integration +# Sentence extraction for stitching chunks +from sentence_extractor import SentenceExtractor, SentenceCleaner + +# LLM integration (optional) try: import ollama OLLAMA_AVAILABLE = True @@ -27,10 +29,6 @@ except ImportError: OLLAMA_AVAILABLE = False -# # For regular whisper (comment out the line above and uncomment these): -# import whisper - - class WindowsLoopbackAudioCapture: """Capture Windows speaker output using WASAPI loopback""" @@ -170,12 +168,6 @@ class WhisperStreamTranscriber: self.audio_buffer = np.array([], dtype=np.float32) self.lock = threading.Lock() - # # REGULAR WHISPER: - # self.model = whisper.load_model(model_name) - # self.language = language - # self.audio_buffer = np.array([], dtype=np.float32) - # self.lock = threading.Lock() - def add_audio(self, audio_chunk): """Add new audio data to buffer""" with self.lock: @@ -222,19 +214,6 @@ class WhisperStreamTranscriber: print(f"❌ Transcription error: {e}") return None - # # REGULAR WHISPER: - # try: - # result = self.model.transcribe( - # audio_to_process, - # language=self.language, - # task="transcribe", - # fp16=False - # ) - # return result["text"].strip() - # except Exception as e: - # print(f"❌ Transcription error: {e}") - # return None - class LocalLLMAnalyzer: """Local LLM for fact-checking and question generation using Ollama""" @@ -536,6 +515,8 @@ Examples: help="Ollama model to use for LLM analysis (default: gpt-oss:20b)") parser.add_argument("--llm-debug", action="store_true", help="Show LLM raw responses for debugging") + parser.add_argument("--sentence-mode", action="store_true", + help="Extract complete sentences by stitching chunks together") args = parser.parse_args() @@ -549,6 +530,8 @@ Examples: print(f"Output: {args.output}") if args.enable_llm: print(f"LLM Analysis: Enabled ({args.llm_model})") + if args.sentence_mode: + print(f"Sentence Mode: Enabled (stitching chunks into complete sentences)") # Initialize audio capture try: @@ -591,6 +574,14 @@ Examples: print("Continuing without LLM analysis...") llm_analyzer = None + # Initialize sentence extractor (optional) + sentence_extractor = None + sentence_cleaner = None + if args.sentence_mode: + sentence_extractor = SentenceExtractor(max_buffer_words=150) + sentence_cleaner = SentenceCleaner() + print("✓ Sentence extraction initialized") + # Main processing loop print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}") last_process_time = time.time() @@ -620,11 +611,45 @@ Examples: segment_count += 1 timestamp = datetime.now().strftime("%H:%M:%S") - # Display transcription immediately (don't wait for LLM) - print(f"[{timestamp}] {text}") + # Sentence extraction mode + if sentence_extractor: + # Add chunk to extractor and get complete sentences + sentences = sentence_extractor.add_chunk(text) - # LLM Analysis (run concurrently in background) - if llm_analyzer: + for sentence in sentences: + # Clean the sentence + cleaned = sentence_cleaner.clean(sentence) if sentence_cleaner else sentence + if cleaned: + print(f"[{timestamp}] 📝 {cleaned}") + + # Save individual sentences + if args.output and not llm_analyzer: + save_transcript(cleaned, timestamp, args.output) + + # LLM analysis on complete sentences + if llm_analyzer: + context = f"Sentence from segment {segment_count}" + + def run_llm_analysis(txt, ctx, ts, seg_num): + fc = llm_analyzer.fact_check(txt, ctx) + qs = llm_analyzer.generate_augmenting_questions(txt, ctx) + return { + 'timestamp': ts, + 'text': txt, + 'segment_count': seg_num, + 'fact_check': fc, + 'questions': qs + } + + future = llm_executor.submit(run_llm_analysis, cleaned, context, timestamp, segment_count) + pending_llm_tasks[segment_count] = future + else: + # Standard mode: display chunks as-is + # Display transcription immediately (don't wait for LLM) + print(f"[{timestamp}] {text}") + + # LLM Analysis (run concurrently in background) - only in non-sentence mode + if llm_analyzer and not sentence_extractor: context = f"Segment {segment_count}" # Submit LLM tasks to thread pool @@ -701,6 +726,34 @@ Examples: # Cleanup capturer.close() + # Flush sentence buffer if in sentence mode + if sentence_extractor: + print("\n📝 Flushing sentence buffer...") + final_sentences = sentence_extractor.flush() + for sentence in final_sentences: + cleaned = sentence_cleaner.clean(sentence) if sentence_cleaner else sentence + if cleaned: + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"[{timestamp}] 📝 {cleaned}") + + if args.output and not llm_analyzer: + save_transcript(cleaned, timestamp, args.output) + + # LLM analysis for flushed sentences + if llm_analyzer: + fact_check = llm_analyzer.fact_check(cleaned, "Final sentence") + questions = llm_analyzer.generate_augmenting_questions(cleaned) + display_enriched_output(cleaned, timestamp, fact_check, questions) + + if args.output: + data = { + 'timestamp': timestamp, + 'text': cleaned, + 'fact_check': fact_check, + 'questions': questions + } + save_enriched_transcript(data, args.output) + # Process remaining audio print("\nProcessing remaining audio...") final_text = transcriber.transcribe_chunk(min_duration=0) diff --git a/transcribe_speakers_llm.py b/transcribe_speakers_llm.py deleted file mode 100755 index a3ff667..0000000 --- a/transcribe_speakers_llm.py +++ /dev/null @@ -1,636 +0,0 @@ -#!/usr/bin/env python3 -""" -Real-time transcription of Windows speaker output using loopback capture. -Captures system audio and transcribes with Whisper in near real-time. -""" - -import sounddevice as sd -import numpy as np -import threading -import queue -import time -import os -import argparse -import json -from datetime import datetime - -# Choose your Whisper backend here: -# For faster-whisper (recommended): -from faster_whisper import WhisperModel - -# LLM integration -try: - import ollama - OLLAMA_AVAILABLE = True -except ImportError: - OLLAMA_AVAILABLE = False - - -# # For regular whisper (comment out the line above and uncomment these): -# import whisper - - -class WindowsLoopbackAudioCapture: - """Capture Windows speaker output using WASAPI loopback""" - - def __init__(self, device_name=None, sample_rate=16000, chunk_size=2048): - self.sample_rate = sample_rate - self.chunk_size = chunk_size - - # Find loopback device - self.device_info = self._find_loopback_device(device_name) - if not self.device_info: - raise RuntimeError( - "No loopback device found.\n" - "1. Ensure your speakers/headphones are connected\n" - "2. Enable 'Stereo Mix' in Sound settings\n" - "3. Or install VB-Cable virtual audio device" - ) - - print(f"✓ Using device: {self.device_info['name']} (index {self.device_info['index']})") - - # Queue for audio data - self.audio_queue = queue.Queue() - self.stop_event = threading.Event() - - # Start the stream - try: - self.stream = sd.InputStream( - device=self.device_info['index'], - channels=1, - samplerate=sample_rate, - blocksize=chunk_size, - dtype='int16', - latency='low', - callback=self._audio_callback - ) - self.stream.start() - print("✓ Audio capture stream started") - except Exception as e: - raise RuntimeError(f"Failed to start audio stream: {e}") - - def _find_loopback_device(self, device_name): - """Find the speaker device with loopback capability""" - devices = sd.query_devices() - - # If device name specified, find exact match - if device_name: - for dev in devices: - if (device_name.lower() in dev['name'].lower() and - dev['max_input_channels'] > 0): - return dev - - # Auto-detect: look for WASAPI speakers/headphones - for dev in devices: - if (dev['max_input_channels'] > 0 and - any(x in dev['name'] for x in ['Speakers', 'Headphones', 'Output'])): - return dev - - # Fallback: Stereo Mix or similar - for dev in devices: - if 'Stereo Mix' in dev['name']: - return dev - - return None - - def _audio_callback(self, indata, frames, time_info, status): - """Callback for audio data""" - if status: - print(f"⚠ Audio status: {status}") - self.audio_queue.put(indata.copy()) - - def read_chunk(self): - """Read audio data from queue""" - try: - return self.audio_queue.get(timeout=0.05).flatten() - except queue.Empty: - return None - - def close(self): - """Cleanup resources""" - if hasattr(self, 'stream'): - self.stream.stop() - self.stream.close() - - -class WhisperStreamTranscriber: - """Process audio chunks with Whisper/faster-whisper""" - - def __init__(self, model_name="base", language="en", force_cpu=False): - print(f"Loading Whisper model '{model_name}'...") - - # Check for CUDA availability - import torch - has_cuda = torch.cuda.is_available() and not force_cpu - - # Force CPU if CUDA libraries incompatible - device = "cpu" - compute_type = "int8" - - if has_cuda: - try: - # Test if CTranslate2 can actually use CUDA - import ctranslate2 - cuda_count = ctranslate2.get_cuda_device_count() - if cuda_count > 0: - device = "cuda" - compute_type = "float16" - print(f"Using device: cuda ({torch.cuda.get_device_name(0)})") - else: - print(f"CUDA available in PyTorch but not in CTranslate2. Using CPU.") - except Exception as e: - print(f"CUDA libraries not found ({e}). Using CPU.") - else: - print("Using device: cpu") - - # FASTER-WHISPER (recommended): - model_kwargs = { - "device": device, - "compute_type": compute_type - } - if not has_cuda: - model_kwargs["cpu_threads"] = 4 - - self.model = WhisperModel(model_name, **model_kwargs) - self.language = language - self.audio_buffer = np.array([], dtype=np.float32) - self.lock = threading.Lock() - - # # REGULAR WHISPER: - # self.model = whisper.load_model(model_name) - # self.language = language - # self.audio_buffer = np.array([], dtype=np.float32) - # self.lock = threading.Lock() - - def add_audio(self, audio_chunk): - """Add new audio data to buffer""" - with self.lock: - audio_float = audio_chunk.astype(np.float32) / 32768.0 - self.audio_buffer = np.concatenate([self.audio_buffer, audio_float]) - - def transcribe_chunk(self, min_duration=5.0): - """Transcribe accumulated audio if enough duration""" - with self.lock: - duration = len(self.audio_buffer) / 16000 - if duration < min_duration: - return None - - audio_to_process = self.audio_buffer.copy() - self.audio_buffer = np.array([], dtype=np.float32) - - # Process with FASTER-WHISPER: - try: - segments, _ = self.model.transcribe( - audio_to_process, - language=self.language, - beam_size=5, - vad_filter=True, - vad_parameters=dict(min_silence_duration_ms=500), - word_timestamps=False - ) - text = " ".join([segment.text for segment in segments]).strip() - return text if text else None - except Exception as e: - print(f"❌ Transcription error: {e}") - return None - - # # REGULAR WHISPER: - # try: - # result = self.model.transcribe( - # audio_to_process, - # language=self.language, - # task="transcribe", - # fp16=False - # ) - # return result["text"].strip() - # except Exception as e: - # print(f"❌ Transcription error: {e}") - # return None - - -class LocalLLMAnalyzer: - """Local LLM for fact-checking and question generation using Ollama""" - - def __init__(self, model="llama3.2", debug=False): - if not OLLAMA_AVAILABLE: - raise RuntimeError( - "Ollama package not installed.\n" - "Install with: pip install ollama" - ) - - self.model = model - self.debug = debug - self._test_connection() - - def _test_connection(self): - """Test connection to Ollama service""" - try: - ollama.list() - print(f"✓ Ollama connected using model: {self.model}") - except Exception as e: - raise RuntimeError( - f"Cannot connect to Ollama. Ensure it's installed and running.\n" - f"Error: {e}\n" - f"Install from: https://ollama.ai\n" - f"Then run: ollama pull {self.model}" - ) - - def _extract_json(self, text): - """Extract JSON from text that might contain markdown or other formatting""" - # Try to find JSON block in markdown code fence - import re - json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL) - if json_match: - return json_match.group(1) - - # Try to find raw JSON object - json_match = re.search(r'\{.*\}', text, re.DOTALL) - if json_match: - return json_match.group(0) - - return text - - def fact_check(self, text, context=""): - """Analyze text for factual accuracy""" - # Try simple structured format first - prompt = f"""Analyze this for accuracy. Reply in this exact format: - -VERDICT: [factual/dubious/not_factual] -CONFIDENCE: [0.0-1.0] -EXPLANATION: [one sentence] - -Statement: "{text}" -""" - - try: - response = ollama.generate( - model=self.model, - prompt=prompt, - options={"temperature": 0.1, "num_predict": 150} - ) - - response_text = response['response'].strip() - - if self.debug: - print(f"\n[DEBUG] Fact-check response:\n{response_text}\n") - - # Try to parse structured text format - verdict = "dubious" - confidence = 0.5 - explanation = response_text - - # Extract VERDICT - import re - verdict_match = re.search(r'VERDICT:\s*(\w+)', response_text, re.IGNORECASE) - if verdict_match: - verdict = verdict_match.group(1).lower() - - # Extract CONFIDENCE - conf_match = re.search(r'CONFIDENCE:\s*([\d.]+)', response_text, re.IGNORECASE) - if conf_match: - try: - confidence = float(conf_match.group(1)) - confidence = max(0.0, min(1.0, confidence)) # Clamp to 0-1 - except ValueError: - pass - - # Extract EXPLANATION - expl_match = re.search(r'EXPLANATION:\s*(.+?)(?:\n|$)', response_text, re.IGNORECASE | re.DOTALL) - if expl_match: - explanation = expl_match.group(1).strip() - - return { - "verdict": verdict, - "confidence": confidence, - "explanation": explanation[:200], # Truncate if too long - "sources": [], - "corrections": "" - } - - except Exception as e: - if self.debug: - print(f"[DEBUG] Fact-check error: {e}") - return { - "verdict": "error", - "confidence": 0.0, - "explanation": f"Analysis failed: {str(e)}", - "sources": [], - "corrections": "" - } - - def generate_augmenting_questions(self, text, context=""): - """Generate insightful questions based on the text""" - prompt = f"""Generate 3 questions about this. Reply in this exact format: - -Q1: [question] -Q2: [question] -Q3: [question] - -Statement: "{text}" -""" - - try: - response = ollama.generate( - model=self.model, - prompt=prompt, - options={"temperature": 0.7, "num_predict": 150} - ) - - response_text = response['response'].strip() - - if self.debug: - print(f"\n[DEBUG] Questions response:\n{response_text}\n") - - # Extract questions - import re - questions = [] - for i in range(1, 4): - q_match = re.search(rf'Q{i}:\s*(.+?)(?:\n|$)', response_text, re.IGNORECASE) - if q_match: - questions.append(q_match.group(1).strip()) - - # If we couldn't parse, try to split by newlines and take first 3 non-empty lines - if len(questions) < 3: - lines = [line.strip() for line in response_text.split('\n') if line.strip()] - questions = lines[:3] if lines else [ - "What are the key points here?", - "What evidence supports this?", - "What are the implications?" - ] - - # Ensure we have exactly 3 questions - while len(questions) < 3: - questions.append("What else should we consider?") - - return { - "questions": questions[:3], - "topics": [] - } - - except Exception as e: - if self.debug: - print(f"[DEBUG] Questions error: {e}") - return { - "questions": [ - "What are the key points?", - "What supports this claim?", - "What are the implications?" - ], - "topics": [] - } - - -def list_audio_devices(): - """Print all available audio input devices""" - print("\nAvailable audio capture devices:") - devices = sd.query_devices() - for i, dev in enumerate(devices): - if dev['max_input_channels'] > 0: - print(f" [{i}] {dev['name']}") - print(f" Channels: {dev['max_input_channels']} | Sample Rate: {dev['default_samplerate']}") - print() - - -def save_transcript(text, timestamp, filename): - """Append transcript to file""" - os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - with open(filename, "a", encoding="utf-8") as f: - f.write(f"[{timestamp}] {text}\n") - - -def save_enriched_transcript(data, filename): - """Save enriched transcript with LLM analysis""" - os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True) - with open(filename, "a", encoding="utf-8") as f: - f.write(f"\n{'='*70}\n") - f.write(f"[{data['timestamp']}] {data['text']}\n\n") - - if 'fact_check' in data: - fc = data['fact_check'] - f.write(f"📊 Fact Check: {fc.get('verdict', 'N/A').upper()} " - f"(confidence: {fc.get('confidence', 0):.2f})\n") - f.write(f"💡 {fc.get('explanation', 'N/A')}\n") - if fc.get('corrections'): - f.write(f"✏️ Correction: {fc['corrections']}\n") - f.write("\n") - - if 'questions' in data and data['questions'].get('questions'): - f.write("❓ Questions:\n") - for i, q in enumerate(data['questions']['questions'], 1): - f.write(f"{i}. {q}\n") - f.write("\n") - - -def display_enriched_output(text, timestamp, fact_check=None, questions=None): - """Display transcript with LLM analysis""" - print(f"\n[{timestamp}] {text}") - - if fact_check: - verdict_emoji = { - 'factual': '✅', - 'dubious': '⚠️', - 'not_factual': '❌', - 'error': '⚠️' - } - emoji = verdict_emoji.get(fact_check.get('verdict', 'error'), '❓') - - print(f"\n{emoji} Fact Check: {fact_check.get('verdict', 'N/A').upper()} " - f"(confidence: {fact_check.get('confidence', 0):.2f})") - print(f"💡 {fact_check.get('explanation', 'N/A')}") - - if fact_check.get('corrections'): - print(f"✏️ Correction: {fact_check['corrections']}") - - if questions and questions.get('questions'): - print(f"\n❓ Questions:") - for i, q in enumerate(questions['questions'], 1): - print(f" {i}. {q}") - - -def main(): - parser = argparse.ArgumentParser( - description="Real-time transcription of Windows speaker output", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - python transcribe_speakers.py - python transcribe_speakers.py --model small --language es --interval 5 - python transcribe_speakers.py --device "Speakers" --output "meeting.txt" - python transcribe_speakers.py --model medium --interval 10 --output transcripts/live.txt - """ - ) - - parser.add_argument("--model", default="base", - choices=["tiny", "base", "small", "medium", "large"], - help="Whisper model size (default: base)") - parser.add_argument("--language", default="en", - help="Language code (default: en)") - parser.add_argument("--device", metavar="NAME", - help="Audio device name (partial match). If not specified, auto-detects") - parser.add_argument("--interval", type=float, default=8.0, - help="Processing interval in seconds (default: 8.0)") - parser.add_argument("--output", "-o", metavar="FILE", - help="Save transcript to file (e.g., transcript.txt)") - parser.add_argument("--list-devices", action="store_true", - help="List all available audio devices and exit") - parser.add_argument("--force-cpu", action="store_true", - help="Force CPU processing (disable GPU acceleration)") - parser.add_argument("--enable-llm", action="store_true", - help="Enable LLM analysis (fact-checking and questions)") - parser.add_argument("--llm-model", default="gpt-oss:20b", - help="Ollama model to use for LLM analysis (default: gpt-oss:20b)") - parser.add_argument("--llm-debug", action="store_true", - help="Show LLM raw responses for debugging") - - args = parser.parse_args() - - if args.list_devices: - list_audio_devices() - return - - print("=== Windows Real-Time Audio Transcription ===") - print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s") - if args.output: - print(f"Output: {args.output}") - if args.enable_llm: - print(f"LLM Analysis: Enabled ({args.llm_model})") - - # Initialize audio capture - try: - capturer = WindowsLoopbackAudioCapture( - device_name=args.device, - sample_rate=16000, - chunk_size=2048 - ) - except RuntimeError as e: - print(f"\n❌ Audio Error: {e}") - print("\nTo fix this:") - print("1. Right-click speaker icon → Sounds → Recording tab") - print("2. Right-click in empty area → Show Disabled Devices") - print("3. Enable 'Stereo Mix' → Set as Default Device") - print("\nAlternative: Install VB-Cable (free) from vb-audio.com") - print(" Then use: --device 'CABLE Output'") - list_audio_devices() - return - - # Initialize transcriber - try: - transcriber = WhisperStreamTranscriber( - model_name=args.model, - language=args.language, - force_cpu=args.force_cpu - ) - except Exception as e: - print(f"\n❌ Model Error: {e}") - print("Make sure you installed Whisper correctly") - return - - # Initialize LLM analyzer (optional) - llm_analyzer = None - if args.enable_llm: - try: - llm_analyzer = LocalLLMAnalyzer(model=args.llm_model, debug=args.llm_debug) - except RuntimeError as e: - print(f"\n❌ LLM Error: {e}") - print("Continuing without LLM analysis...") - llm_analyzer = None - - # Main processing loop - print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}") - last_process_time = time.time() - total_duration = 0 - segment_count = 0 - - try: - while True: - # Collect audio - chunk = capturer.read_chunk() - if chunk is not None: - transcriber.add_audio(chunk) - total_duration += len(chunk) / 16000 - - # Process at intervals - current_time = time.time() - if current_time - last_process_time >= args.interval: - text = transcriber.transcribe_chunk() - if text: - segment_count += 1 - timestamp = datetime.now().strftime("%H:%M:%S") - - # LLM Analysis - fact_check = None - questions = None - if llm_analyzer: - context = f"Segment {segment_count}" - fact_check = llm_analyzer.fact_check(text, context) - questions = llm_analyzer.generate_augmenting_questions(text, context) - - # Display output - if llm_analyzer: - display_enriched_output(text, timestamp, fact_check, questions) - else: - print(f"[{timestamp}] {text}") - - # Save output - if args.output: - if llm_analyzer: - data = { - 'timestamp': timestamp, - 'text': text, - 'fact_check': fact_check, - 'questions': questions - } - save_enriched_transcript(data, args.output) - else: - save_transcript(text, timestamp, args.output) - - last_process_time = current_time - - except KeyboardInterrupt: - print(f"\n{'=' * 50}\n🛑 Stopping transcription...") - - # Cleanup - capturer.close() - - # Process remaining audio - print("\nProcessing remaining audio...") - final_text = transcriber.transcribe_chunk(min_duration=0) - if final_text: - timestamp = datetime.now().strftime("%H:%M:%S") - - # LLM Analysis for final segment - fact_check = None - questions = None - if llm_analyzer: - fact_check = llm_analyzer.fact_check(final_text, "Final segment") - questions = llm_analyzer.generate_augmenting_questions(final_text) - - # Display output - if llm_analyzer: - display_enriched_output(final_text, timestamp, fact_check, questions) - else: - print(f"[{timestamp}] {final_text}") - - # Save output - if args.output: - if llm_analyzer: - data = { - 'timestamp': timestamp, - 'text': final_text, - 'fact_check': fact_check, - 'questions': questions - } - save_enriched_transcript(data, args.output) - else: - save_transcript(final_text, timestamp, args.output) - - # Summary - print(f"\n✅ Complete! Processed {total_duration:.1f}s of audio") - print(f" Generated {segment_count} transcript segments") - if args.output and os.path.exists(args.output): - abs_path = os.path.abspath(args.output) - print(f"💾 Transcript saved to: {abs_path}") - - -if __name__ == "__main__": - main() \ No newline at end of file