diff --git a/.gitignore b/.gitignore
index 1612ed8..d086f68 100755
--- a/.gitignore
+++ b/.gitignore
@@ -303,3 +303,4 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
+.idea/
diff --git a/.idea/.gitignore b/.idea/.gitignore
deleted file mode 100644
index ab1f416..0000000
--- a/.idea/.gitignore
+++ /dev/null
@@ -1,10 +0,0 @@
-# Default ignored files
-/shelf/
-/workspace.xml
-# Ignored default folder with query files
-/queries/
-# Datasource local storage ignored files
-/dataSources/
-/dataSources.local.xml
-# Editor-based HTTP Client requests
-/httpRequests/
diff --git a/.idea/go.imports.xml b/.idea/go.imports.xml
deleted file mode 100644
index d7202f0..0000000
--- a/.idea/go.imports.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
deleted file mode 100644
index e1e41cb..0000000
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ /dev/null
@@ -1,621 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/material_theme_project_new.xml b/.idea/material_theme_project_new.xml
deleted file mode 100644
index 7b10447..0000000
--- a/.idea/material_theme_project_new.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
deleted file mode 100644
index 88d9a0d..0000000
--- a/.idea/misc.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
deleted file mode 100644
index f95f667..0000000
--- a/.idea/modules.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
deleted file mode 100644
index 94a25f7..0000000
--- a/.idea/vcs.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/README.md b/README.md
index ab61283..6d1c5ef 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,7 @@ Real-time audio transcription using Whisper AI with optional LLM-powered analysi
- Real-time transcription of system audio (Windows/Linux)
- Multiple Whisper model sizes (tiny to large)
- Multi-language support
+- **Sentence extraction mode** - Stitches audio chunks into complete sentences
- Optional LLM analysis for fact-checking and question generation (via Ollama)
- GPU acceleration support
- Flexible audio device configuration
@@ -17,12 +18,15 @@ Real-time audio transcription using Whisper AI with optional LLM-powered analysi
# Install dependencies
pip install -r requirements.txt
-# Basic transcription
+# Basic transcription (no LLM)
python transcribe_speakers.py
-# With LLM analysis
+# With LLM analysis (optional)
python transcribe_speakers.py --enable-llm
+# With sentence extraction
+python transcribe_speakers.py --sentence-mode
+
# List audio devices
python transcribe_speakers.py --list-devices
```
@@ -80,9 +84,7 @@ ollama pull llama3.2
### Available Scripts
-- `transcribe_speakers.py` - Main script with all features
-- `transcribe_speakers_llm.py` - LLM-enabled version
-- `transcribe_No_llm.py` - Basic version without LLM support
+- `transcribe_speakers.py` - Main script with all features (LLM optional via `--enable-llm`)
- `transcribe_dual_linux.py` - Linux-specific with dual audio support
### Common Commands
@@ -97,8 +99,11 @@ python transcribe_speakers.py --language es --output transcript.txt
# Fast mode (low latency)
python transcribe_speakers.py --fast-mode --model tiny --interval 3
-# Maximum accuracy with LLM
-python transcribe_speakers.py --model large --enable-llm --output enriched.txt
+# Extract complete sentences from chunks
+python transcribe_speakers.py --sentence-mode --output sentences.txt
+
+# Maximum accuracy with LLM and sentence extraction
+python transcribe_speakers.py --model large --enable-llm --sentence-mode --output enriched.txt
# Force CPU (avoid GPU issues)
python transcribe_speakers.py --force-cpu
@@ -119,6 +124,7 @@ python transcribe_speakers.py --force-cpu
| `--output` | Save to file | None |
| `--force-cpu` | Disable GPU | False |
| `--gpu-index` | GPU device index | 0 |
+| `--sentence-mode` | Extract complete sentences from chunks | False |
## Model Performance
diff --git a/sentence_extractor.py b/sentence_extractor.py
new file mode 100644
index 0000000..06fcb85
--- /dev/null
+++ b/sentence_extractor.py
@@ -0,0 +1,260 @@
+"""
+Sentence extraction from chunked transcriptions.
+Stitches partial chunks together and extracts complete sentences.
+"""
+
+import re
+from typing import List, Tuple, Optional
+from collections import deque
+
+
+class SentenceExtractor:
+    """
+    Buffers transcription chunks and extracts complete sentences.
+    Handles sentence boundaries that span across audio chunks.
+    """
+
+    def __init__(self, max_buffer_words=200):
+        """
+        Initialize the sentence extractor.
+
+        Args:
+            max_buffer_words: Maximum words to keep in buffer before forcing extraction
+        """
+        self.buffer = ""
+        self.max_buffer_words = max_buffer_words
+        self.completed_sentences = deque()  # not touched by any method of this class
+
+        # Sentence boundary patterns (only sentence_boundaries is used below)
+        self.sentence_end_pattern = re.compile(r'([.!?]+)\s+')
+        self.sentence_boundaries = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')
+
+    def add_chunk(self, text: str) -> List[str]:
+        """
+        Add a new transcription chunk and extract any complete sentences.
+
+        Args:
+            text: New transcription text chunk (empty/whitespace-only is ignored)
+
+        Returns:
+            List of complete sentences extracted, plus any forced segments
+        """
+        chunk = text.strip() if text else ""
+        if not chunk:
+            return []
+
+        # The chunk is stripped before appending, so the separator must be
+        # added here whenever the buffer does not already end in whitespace.
+        if self.buffer and not self.buffer[-1].isspace():
+            self.buffer += " "
+        self.buffer += chunk
+
+        # Extract complete sentences
+        sentences = self._extract_sentences()
+
+        # Check if buffer is too large
+        word_count = len(self.buffer.split())
+        if word_count > self.max_buffer_words:
+            # Force extraction of what we have
+            forced = self._force_extract()
+            if forced:
+                sentences.extend(forced)
+
+        return sentences
+
+    def _extract_sentences(self) -> List[str]:
+        """
+        Extract complete sentences from buffer.
+        Keeps incomplete sentence in buffer.
+
+        Returns:
+            List of complete sentences
+        """
+        sentences = []
+
+        # Find sentence boundaries
+        # Pattern: sentence ending punctuation followed by space and capital letter.
+        # Text after the last such boundary stays buffered, even if it ends in punctuation.
+        parts = self.sentence_boundaries.split(self.buffer)
+
+        if len(parts) > 1:
+            # We have complete sentences
+            # Keep the last part (incomplete sentence) in buffer
+            sentences = [s.strip() for s in parts[:-1] if s.strip()]
+            self.buffer = parts[-1].strip()
+
+        return sentences
+
+    def _force_extract(self) -> List[str]:
+        """
+        Force extraction when buffer is too large.
+        Tries to break at reasonable points.
+
+        Returns:
+            List of extracted text segments (word-limit cuts end with "...")
+        """
+        # Try to find the last sentence-like boundary
+        last_period = max(
+            self.buffer.rfind('. '),
+            self.buffer.rfind('! '),
+            self.buffer.rfind('? ')
+        )
+
+        if last_period > 0:
+            # Extract up to and including the punctuation mark
+            extracted = self.buffer[:last_period + 1].strip()
+            self.buffer = self.buffer[last_period + 1:].strip()
+            return [extracted]
+
+        # No sentence boundary found, extract by word limit
+        words = self.buffer.split()
+        if len(words) > self.max_buffer_words:
+            # Take 80% of the limit, but at least one word so the buffer shrinks
+            split_point = max(1, int(self.max_buffer_words * 0.8))
+            extracted = " ".join(words[:split_point])
+            self.buffer = " ".join(words[split_point:])
+            return [extracted + "..."]
+
+        return []
+
+    def flush(self) -> List[str]:
+        """
+        Flush remaining buffer and return as sentence(s).
+        Call this at end of transcription.
+
+        Returns:
+            List of remaining text as sentences
+        """
+        sentences = []
+
+        if self.buffer.strip():
+            # Try to extract any remaining complete sentences first
+            extracted = self._extract_sentences()
+            sentences.extend(extracted)
+
+            remainder = self.buffer.strip()
+            if remainder:
+                # Terminate the trailing fragment so it reads as a sentence
+                if remainder[-1] not in '.!?':
+                    remainder += "."
+                sentences.append(remainder)
+            self.buffer = ""
+
+        return sentences
+
+    def get_buffer_status(self) -> dict:
+        """
+        Get current buffer status for debugging.
+
+        Returns:
+            Dictionary with buffer_length, buffer_words and buffer_preview
+        """
+        return {
+            "buffer_length": len(self.buffer),
+            "buffer_words": len(self.buffer.split()) if self.buffer else 0,
+            "buffer_preview": self.buffer[:100] + "..." if len(self.buffer) > 100 else self.buffer
+        }
+
+
+class SentenceCleaner:
+    """
+    Normalizes sentence text and filters out fragments and exact repeats.
+    Tidies whitespace, punctuation spacing, capitalization and endings.
+    """
+
+    def __init__(self):
+        self.seen_sentences = set()
+        self.similarity_threshold = 0.85
+
+    def clean(self, sentence: str) -> Optional[str]:
+        """
+        Normalize one sentence and decide whether it should be kept.
+
+        Args:
+            sentence: Raw sentence text
+
+        Returns:
+            Normalized sentence, or None when the input is empty,
+            shorter than three words, or an exact repeat
+        """
+        stripped = sentence.strip() if sentence else ""
+        if not stripped:
+            return None
+
+        # Collapse every whitespace run into a single space
+        normalized = re.sub(r'\s+', ' ', stripped)
+
+        # Drop whitespace that sits directly before punctuation
+        normalized = re.sub(r'\s+([.!?,;:])', r'\1', normalized)
+
+        # Open with an uppercase character
+        first_char = normalized[0]
+        if not first_char.isupper():
+            normalized = first_char.upper() + normalized[1:]
+
+        # Close with terminal punctuation
+        if not normalized.endswith(('.', '!', '?')):
+            normalized += '.'
+
+        # Fewer than three words is treated as a fragment
+        if len(normalized.split()) < 3:
+            return None
+
+        # Exact repeats of an already returned sentence are suppressed
+        if normalized in self.seen_sentences:
+            return None
+
+        self.seen_sentences.add(normalized)
+        return normalized
+
+    def reset(self):
+        """Forget every sentence recorded so far."""
+        self.seen_sentences.clear()
+
+
+def demo():
+    """Run a small console walkthrough of chunk stitching and cleaning."""
+    # Simulated chunked transcription: sentences span chunk boundaries
+    chunks = [
+        "Hello everyone welcome to",
+        "to this presentation today we will",
+        "will discuss the importance of AI. Artificial intelligence is",
+        "is transforming many industries. It helps us automate",
+        "automate tasks and make better decisions. What do you",
+        "you think about this technology? I believe it has",
+        "has great potential for the future."
+    ]
+    divider = "=" * 50
+
+    print("=== Sentence Extraction Demo ===\n")
+    print("Input chunks:")
+    for idx, chunk in enumerate(chunks, 1):
+        print(f" Chunk {idx}: '{chunk}'")
+
+    print("\n" + divider)
+    print("Extracted sentences:\n")
+
+    extractor = SentenceExtractor()
+    cleaner = SentenceCleaner()
+
+    def show(label, raw_sentences):
+        # Print every sentence that survives cleaning, tagged with its origin
+        for raw in raw_sentences:
+            tidy = cleaner.clean(raw)
+            if tidy:
+                print(f" [{label}] {tidy}")
+
+    for idx, chunk in enumerate(chunks, 1):
+        show(idx, extractor.add_chunk(chunk))
+
+    # Flush remaining buffer
+    print("\nFlushing buffer...")
+    show("final", extractor.flush())
+
+    print("\n" + divider)
+    print("Buffer status:")
+    print(extractor.get_buffer_status())
+
+
+if __name__ == "__main__":  # run the demo only when executed as a script
+    demo()
diff --git a/transcribe.iml b/transcribe.iml
index 4382db5..50c95d3 100644
--- a/transcribe.iml
+++ b/transcribe.iml
@@ -7,6 +7,7 @@
+
\ No newline at end of file
diff --git a/transcribe_No_llm.py b/transcribe_No_llm.py
deleted file mode 100755
index 634da43..0000000
--- a/transcribe_No_llm.py
+++ /dev/null
@@ -1,596 +0,0 @@
-#!/usr/bin/env python3
-"""
-Real-time transcription of Windows speaker output using loopback capture.
-Captures system audio and transcribes with Whisper in near real-time.
-"""
-
-import sounddevice as sd
-import numpy as np
-import threading
-import queue
-import time
-import os
-import argparse
-import json
-from datetime import datetime
-
-# Choose your Whisper backend here:
-# For faster-whisper (recommended):
-from faster_whisper import WhisperModel
-
-# LLM integration
-try:
- import ollama
- OLLAMA_AVAILABLE = True
-except ImportError:
- OLLAMA_AVAILABLE = False
-
-
-# # For regular whisper (comment out the line above and uncomment these):
-# import whisper
-
-
-class WindowsLoopbackAudioCapture:
- """Capture Windows speaker output using WASAPI loopback"""
-
- def __init__(self, device_name=None, sample_rate=16000, chunk_size=2048):
- self.sample_rate = sample_rate
- self.chunk_size = chunk_size
-
- # Find loopback device
- self.device_info = self._find_loopback_device(device_name)
- if not self.device_info:
- raise RuntimeError(
- "No loopback device found.\n"
- "1. Ensure your speakers/headphones are connected\n"
- "2. Enable 'Stereo Mix' in Sound settings\n"
- "3. Or install VB-Cable virtual audio device"
- )
-
- print(f"✓ Using device: {self.device_info['name']} (index {self.device_info['index']})")
-
- # Queue for audio data
- self.audio_queue = queue.Queue()
- self.stop_event = threading.Event()
-
- # Start the stream
- try:
- self.stream = sd.InputStream(
- device=self.device_info['index'],
- channels=1,
- samplerate=sample_rate,
- blocksize=chunk_size,
- dtype='int16',
- latency='low',
- callback=self._audio_callback
- )
- self.stream.start()
- print("✓ Audio capture stream started")
- except Exception as e:
- raise RuntimeError(f"Failed to start audio stream: {e}")
-
- def _find_loopback_device(self, device_name):
- """Find the speaker device with loopback capability"""
- devices = sd.query_devices()
-
- # If device name specified, find exact match
- if device_name:
- for dev in devices:
- if (device_name.lower() in dev['name'].lower() and
- dev['max_input_channels'] > 0):
- return dev
-
- # Auto-detect: look for WASAPI speakers/headphones
- for dev in devices:
- if (dev['max_input_channels'] > 0 and
- any(x in dev['name'] for x in ['Speakers', 'Headphones', 'Output'])):
- return dev
-
- # Fallback: Stereo Mix or similar
- for dev in devices:
- if 'Stereo Mix' in dev['name']:
- return dev
-
- return None
-
- def _audio_callback(self, indata, frames, time_info, status):
- """Callback for audio data"""
- if status:
- print(f"⚠ Audio status: {status}")
- self.audio_queue.put(indata.copy())
-
- def read_chunk(self):
- """Read audio data from queue"""
- try:
- return self.audio_queue.get(timeout=0.05).flatten()
- except queue.Empty:
- return None
-
- def close(self):
- """Cleanup resources"""
- if hasattr(self, 'stream'):
- self.stream.stop()
- self.stream.close()
-
-
-class WhisperStreamTranscriber:
- """Process audio chunks with Whisper/faster-whisper"""
-
- def __init__(self, model_name="base", language="en", force_cpu=False):
- print(f"Loading Whisper model '{model_name}'...")
-
- # Check for CUDA availability
- import torch
- has_cuda = torch.cuda.is_available() and not force_cpu
-
- # Force CPU if CUDA libraries incompatible
- device = "cpu"
- compute_type = "int8"
-
- if has_cuda:
- try:
- # Test if CTranslate2 can actually use CUDA
- import ctranslate2
- cuda_count = ctranslate2.get_cuda_device_count()
- if cuda_count > 0:
- device = "cuda"
- compute_type = "float16"
- print(f"Using device: cuda ({torch.cuda.get_device_name(0)})")
- else:
- print(f"CUDA available in PyTorch but not in CTranslate2. Using CPU.")
- except Exception as e:
- print(f"CUDA libraries not found ({e}). Using CPU.")
- else:
- print("Using device: cpu")
-
- # FASTER-WHISPER (recommended):
- model_kwargs = {
- "device": device,
- "compute_type": compute_type
- }
- if not has_cuda:
- model_kwargs["cpu_threads"] = 4
-
- self.model = WhisperModel(model_name, **model_kwargs)
- self.language = language
- self.audio_buffer = np.array([], dtype=np.float32)
- self.lock = threading.Lock()
-
- # # REGULAR WHISPER:
- # self.model = whisper.load_model(model_name)
- # self.language = language
- # self.audio_buffer = np.array([], dtype=np.float32)
- # self.lock = threading.Lock()
-
- def add_audio(self, audio_chunk):
- """Add new audio data to buffer"""
- with self.lock:
- audio_float = audio_chunk.astype(np.float32) / 32768.0
- self.audio_buffer = np.concatenate([self.audio_buffer, audio_float])
-
- def transcribe_chunk(self, min_duration=5.0):
- """Transcribe accumulated audio if enough duration"""
- with self.lock:
- duration = len(self.audio_buffer) / 16000
- if duration < min_duration:
- return None
-
- audio_to_process = self.audio_buffer.copy()
- self.audio_buffer = np.array([], dtype=np.float32)
-
- # Process with FASTER-WHISPER:
- try:
- segments, _ = self.model.transcribe(
- audio_to_process,
- language=self.language,
- beam_size=5,
- vad_filter=True,
- vad_parameters=dict(min_silence_duration_ms=500),
- word_timestamps=False
- )
- text = " ".join([segment.text for segment in segments]).strip()
- return text if text else None
- except Exception as e:
- print(f"❌ Transcription error: {e}")
- return None
-
- # # REGULAR WHISPER:
- # try:
- # result = self.model.transcribe(
- # audio_to_process,
- # language=self.language,
- # task="transcribe",
- # fp16=False
- # )
- # return result["text"].strip()
- # except Exception as e:
- # print(f"❌ Transcription error: {e}")
- # return None
-
-
-class LocalLLMAnalyzer:
- """Local LLM for fact-checking and question generation using Ollama"""
-
- def __init__(self, model="llama3.2"):
- if not OLLAMA_AVAILABLE:
- raise RuntimeError(
- "Ollama package not installed.\n"
- "Install with: pip install ollama"
- )
-
- self.model = model
- self._test_connection()
-
- def _test_connection(self):
- """Test connection to Ollama service"""
- try:
- ollama.list()
- print(f"✓ Ollama connected using model: {self.model}")
- except Exception as e:
- raise RuntimeError(
- f"Cannot connect to Ollama. Ensure it's installed and running.\n"
- f"Error: {e}\n"
- f"Install from: https://ollama.ai\n"
- f"Then run: ollama pull {self.model}"
- )
-
- def _extract_json(self, text):
- """Extract JSON from text that might contain markdown or other formatting"""
- # Try to find JSON block in markdown code fence
- import re
- json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
- if json_match:
- return json_match.group(1)
-
- # Try to find raw JSON object
- json_match = re.search(r'\{.*\}', text, re.DOTALL)
- if json_match:
- return json_match.group(0)
-
- return text
-
- def fact_check(self, text, context=""):
- """Analyze text for factual accuracy"""
- prompt = f"""You are a fact-checking assistant. Analyze this statement for factual accuracy.
-
-Context: {context}
-Statement: "{text}"
-
-You must respond with ONLY valid JSON in this exact format, no other text:
-{{
- "verdict": "factual",
- "confidence": 0.95,
- "explanation": "Brief explanation here",
- "sources": ["source1"],
- "corrections": ""
-}}
-
-Valid verdict values: "factual", "dubious", "not_factual"
-Confidence must be a number between 0.0 and 1.0."""
-
- try:
- response = ollama.generate(
- model=self.model,
- prompt=prompt,
- options={"temperature": 0.1, "num_predict": 200}
- )
-
- # Extract and parse JSON
- response_text = response['response']
- json_text = self._extract_json(response_text)
- result = json.loads(json_text)
-
- # Validate required fields
- if 'verdict' not in result or 'confidence' not in result:
- raise ValueError("Missing required fields")
-
- # Ensure defaults for optional fields
- result.setdefault('explanation', 'No explanation provided')
- result.setdefault('sources', [])
- result.setdefault('corrections', '')
-
- return result
-
- except (json.JSONDecodeError, ValueError) as e:
- # Return a simple analysis without JSON parsing
- return {
- "verdict": "dubious",
- "confidence": 0.5,
- "explanation": f"Could not parse LLM response properly. Model may need JSON format support.",
- "sources": [],
- "corrections": ""
- }
- except Exception as e:
- return {
- "verdict": "error",
- "confidence": 0.0,
- "explanation": f"Analysis failed: {str(e)}",
- "sources": [],
- "corrections": ""
- }
-
- def generate_augmenting_questions(self, text, context=""):
- """Generate insightful questions based on the text"""
- prompt = f"""Based on this statement, generate 3 insightful questions that would help understand the topic better.
-
-Statement: "{text}"
-Context: {context}
-
-Respond with JSON only:
-{{
- "questions": ["Question 1", "Question 2", "Question 3"],
- "topics": ["key_topic_1", "key_topic_2"]
-}}"""
-
- try:
- response = ollama.generate(
- model=self.model,
- prompt=prompt,
- format="json",
- options={"temperature": 0.7}
- )
- return json.loads(response['response'])
- except json.JSONDecodeError:
- return {
- "questions": ["Error: LLM response was not valid JSON"],
- "topics": []
- }
- except Exception as e:
- return {
- "questions": [f"Error: {str(e)}"],
- "topics": []
- }
-
-
-def list_audio_devices():
- """Print all available audio input devices"""
- print("\nAvailable audio capture devices:")
- devices = sd.query_devices()
- for i, dev in enumerate(devices):
- if dev['max_input_channels'] > 0:
- print(f" [{i}] {dev['name']}")
- print(f" Channels: {dev['max_input_channels']} | Sample Rate: {dev['default_samplerate']}")
- print()
-
-
-def save_transcript(text, timestamp, filename):
- """Append transcript to file"""
- os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
- with open(filename, "a", encoding="utf-8") as f:
- f.write(f"[{timestamp}] {text}\n")
-
-
-def save_enriched_transcript(data, filename):
- """Save enriched transcript with LLM analysis"""
- os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
- with open(filename, "a", encoding="utf-8") as f:
- f.write(f"\n{'='*70}\n")
- f.write(f"[{data['timestamp']}] {data['text']}\n\n")
-
- if 'fact_check' in data:
- fc = data['fact_check']
- f.write(f"📊 Fact Check: {fc.get('verdict', 'N/A').upper()} "
- f"(confidence: {fc.get('confidence', 0):.2f})\n")
- f.write(f"💡 {fc.get('explanation', 'N/A')}\n")
- if fc.get('corrections'):
- f.write(f"✏️ Correction: {fc['corrections']}\n")
- f.write("\n")
-
- if 'questions' in data and data['questions'].get('questions'):
- f.write("❓ Questions:\n")
- for i, q in enumerate(data['questions']['questions'], 1):
- f.write(f"{i}. {q}\n")
- f.write("\n")
-
-
-def display_enriched_output(text, timestamp, fact_check=None, questions=None):
- """Display transcript with LLM analysis"""
- print(f"\n[{timestamp}] {text}")
-
- if fact_check:
- verdict_emoji = {
- 'factual': '✅',
- 'dubious': '⚠️',
- 'not_factual': '❌',
- 'error': '⚠️'
- }
- emoji = verdict_emoji.get(fact_check.get('verdict', 'error'), '❓')
-
- print(f"\n{emoji} Fact Check: {fact_check.get('verdict', 'N/A').upper()} "
- f"(confidence: {fact_check.get('confidence', 0):.2f})")
- print(f"💡 {fact_check.get('explanation', 'N/A')}")
-
- if fact_check.get('corrections'):
- print(f"✏️ Correction: {fact_check['corrections']}")
-
- if questions and questions.get('questions'):
- print(f"\n❓ Questions:")
- for i, q in enumerate(questions['questions'], 1):
- print(f" {i}. {q}")
-
-
-def main():
- parser = argparse.ArgumentParser(
- description="Real-time transcription of Windows speaker output",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
-Examples:
- python transcribe_speakers.py
- python transcribe_speakers.py --model small --language es --interval 5
- python transcribe_speakers.py --device "Speakers" --output "meeting.txt"
- python transcribe_speakers.py --model medium --interval 10 --output transcripts/live.txt
- """
- )
-
- parser.add_argument("--model", default="base",
- choices=["tiny", "base", "small", "medium", "large"],
- help="Whisper model size (default: base)")
- parser.add_argument("--language", default="en",
- help="Language code (default: en)")
- parser.add_argument("--device", metavar="NAME",
- help="Audio device name (partial match). If not specified, auto-detects")
- parser.add_argument("--interval", type=float, default=8.0,
- help="Processing interval in seconds (default: 8.0)")
- parser.add_argument("--output", "-o", metavar="FILE",
- help="Save transcript to file (e.g., transcript.txt)")
- parser.add_argument("--list-devices", action="store_true",
- help="List all available audio devices and exit")
- parser.add_argument("--force-cpu", action="store_true",
- help="Force CPU processing (disable GPU acceleration)")
- parser.add_argument("--enable-llm", action="store_true",
- help="Enable LLM analysis (fact-checking and questions)")
- parser.add_argument("--llm-model", default="gpt-oss:20b",
- help="Ollama model to use for LLM analysis (default: gpt-oss:20b)")
-
- args = parser.parse_args()
-
- if args.list_devices:
- list_audio_devices()
- return
-
- print("=== Windows Real-Time Audio Transcription ===")
- print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s")
- if args.output:
- print(f"Output: {args.output}")
- if args.enable_llm:
- print(f"LLM Analysis: Enabled ({args.llm_model})")
-
- # Initialize audio capture
- try:
- capturer = WindowsLoopbackAudioCapture(
- device_name=args.device,
- sample_rate=16000,
- chunk_size=2048
- )
- except RuntimeError as e:
- print(f"\n❌ Audio Error: {e}")
- print("\nTo fix this:")
- print("1. Right-click speaker icon → Sounds → Recording tab")
- print("2. Right-click in empty area → Show Disabled Devices")
- print("3. Enable 'Stereo Mix' → Set as Default Device")
- print("\nAlternative: Install VB-Cable (free) from vb-audio.com")
- print(" Then use: --device 'CABLE Output'")
- list_audio_devices()
- return
-
- # Initialize transcriber
- try:
- transcriber = WhisperStreamTranscriber(
- model_name=args.model,
- language=args.language,
- force_cpu=args.force_cpu
- )
- except Exception as e:
- print(f"\n❌ Model Error: {e}")
- print("Make sure you installed Whisper correctly")
- return
-
- # Initialize LLM analyzer (optional)
- llm_analyzer = None
- if args.enable_llm:
- try:
- llm_analyzer = LocalLLMAnalyzer(model=args.llm_model)
- except RuntimeError as e:
- print(f"\n❌ LLM Error: {e}")
- print("Continuing without LLM analysis...")
- llm_analyzer = None
-
- # Main processing loop
- print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}")
- last_process_time = time.time()
- total_duration = 0
- segment_count = 0
-
- try:
- while True:
- # Collect audio
- chunk = capturer.read_chunk()
- if chunk is not None:
- transcriber.add_audio(chunk)
- total_duration += len(chunk) / 16000
-
- # Process at intervals
- current_time = time.time()
- if current_time - last_process_time >= args.interval:
- text = transcriber.transcribe_chunk()
- if text:
- segment_count += 1
- timestamp = datetime.now().strftime("%H:%M:%S")
-
- # LLM Analysis
- fact_check = None
- questions = None
- if llm_analyzer:
- context = f"Segment {segment_count}"
- fact_check = llm_analyzer.fact_check(text, context)
- questions = llm_analyzer.generate_augmenting_questions(text, context)
-
- # Display output
- if llm_analyzer:
- display_enriched_output(text, timestamp, fact_check, questions)
- else:
- print(f"[{timestamp}] {text}")
-
- # Save output
- if args.output:
- if llm_analyzer:
- data = {
- 'timestamp': timestamp,
- 'text': text,
- 'fact_check': fact_check,
- 'questions': questions
- }
- save_enriched_transcript(data, args.output)
- else:
- save_transcript(text, timestamp, args.output)
-
- last_process_time = current_time
-
- except KeyboardInterrupt:
- print(f"\n{'=' * 50}\n🛑 Stopping transcription...")
-
- # Cleanup
- capturer.close()
-
- # Process remaining audio
- print("\nProcessing remaining audio...")
- final_text = transcriber.transcribe_chunk(min_duration=0)
- if final_text:
- timestamp = datetime.now().strftime("%H:%M:%S")
-
- # LLM Analysis for final segment
- fact_check = None
- questions = None
- if llm_analyzer:
- fact_check = llm_analyzer.fact_check(final_text, "Final segment")
- questions = llm_analyzer.generate_augmenting_questions(final_text)
-
- # Display output
- if llm_analyzer:
- display_enriched_output(final_text, timestamp, fact_check, questions)
- else:
- print(f"[{timestamp}] {final_text}")
-
- # Save output
- if args.output:
- if llm_analyzer:
- data = {
- 'timestamp': timestamp,
- 'text': final_text,
- 'fact_check': fact_check,
- 'questions': questions
- }
- save_enriched_transcript(data, args.output)
- else:
- save_transcript(final_text, timestamp, args.output)
-
- # Summary
- print(f"\n✅ Complete! Processed {total_duration:.1f}s of audio")
- print(f" Generated {segment_count} transcript segments")
- if args.output and os.path.exists(args.output):
- abs_path = os.path.abspath(args.output)
- print(f"💾 Transcript saved to: {abs_path}")
-
-
-if __name__ == "__main__":
- main()
\ No newline at end of file
diff --git a/transcribe_speakers.py b/transcribe_speakers.py
index a35d105..1157130 100755
--- a/transcribe_speakers.py
+++ b/transcribe_speakers.py
@@ -15,11 +15,13 @@ import json
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
-# Choose your Whisper backend here:
-# For faster-whisper (recommended):
+# Whisper transcription (using faster-whisper for optimal performance)
from faster_whisper import WhisperModel
-# LLM integration
+# Sentence extraction for stitching chunks
+from sentence_extractor import SentenceExtractor, SentenceCleaner
+
+# LLM integration (optional)
try:
import ollama
OLLAMA_AVAILABLE = True
@@ -27,10 +29,6 @@ except ImportError:
OLLAMA_AVAILABLE = False
-# # For regular whisper (comment out the line above and uncomment these):
-# import whisper
-
-
class WindowsLoopbackAudioCapture:
"""Capture Windows speaker output using WASAPI loopback"""
@@ -170,12 +168,6 @@ class WhisperStreamTranscriber:
self.audio_buffer = np.array([], dtype=np.float32)
self.lock = threading.Lock()
- # # REGULAR WHISPER:
- # self.model = whisper.load_model(model_name)
- # self.language = language
- # self.audio_buffer = np.array([], dtype=np.float32)
- # self.lock = threading.Lock()
-
def add_audio(self, audio_chunk):
"""Add new audio data to buffer"""
with self.lock:
@@ -222,19 +214,6 @@ class WhisperStreamTranscriber:
print(f"❌ Transcription error: {e}")
return None
- # # REGULAR WHISPER:
- # try:
- # result = self.model.transcribe(
- # audio_to_process,
- # language=self.language,
- # task="transcribe",
- # fp16=False
- # )
- # return result["text"].strip()
- # except Exception as e:
- # print(f"❌ Transcription error: {e}")
- # return None
-
class LocalLLMAnalyzer:
"""Local LLM for fact-checking and question generation using Ollama"""
@@ -536,6 +515,8 @@ Examples:
help="Ollama model to use for LLM analysis (default: gpt-oss:20b)")
parser.add_argument("--llm-debug", action="store_true",
help="Show LLM raw responses for debugging")
+ parser.add_argument("--sentence-mode", action="store_true",
+ help="Extract complete sentences by stitching chunks together")
args = parser.parse_args()
@@ -549,6 +530,8 @@ Examples:
print(f"Output: {args.output}")
if args.enable_llm:
print(f"LLM Analysis: Enabled ({args.llm_model})")
+ if args.sentence_mode:
+ print(f"Sentence Mode: Enabled (stitching chunks into complete sentences)")
# Initialize audio capture
try:
@@ -591,6 +574,14 @@ Examples:
print("Continuing without LLM analysis...")
llm_analyzer = None
+ # Initialize sentence extractor (optional)
+ sentence_extractor = None
+ sentence_cleaner = None
+ if args.sentence_mode:
+ sentence_extractor = SentenceExtractor(max_buffer_words=150)
+ sentence_cleaner = SentenceCleaner()
+ print("✓ Sentence extraction initialized")
+
# Main processing loop
print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}")
last_process_time = time.time()
@@ -620,11 +611,45 @@ Examples:
segment_count += 1
timestamp = datetime.now().strftime("%H:%M:%S")
- # Display transcription immediately (don't wait for LLM)
- print(f"[{timestamp}] {text}")
+ # Sentence extraction mode
+ if sentence_extractor:
+ # Add chunk to extractor and get complete sentences
+ sentences = sentence_extractor.add_chunk(text)
- # LLM Analysis (run concurrently in background)
- if llm_analyzer:
+ for sentence in sentences:
+ # Clean the sentence
+ cleaned = sentence_cleaner.clean(sentence) if sentence_cleaner else sentence
+ if cleaned:
+ print(f"[{timestamp}] 📝 {cleaned}")
+
+ # Save individual sentences
+ if args.output and not llm_analyzer:
+ save_transcript(cleaned, timestamp, args.output)
+
+ # LLM analysis on complete sentences
+ if llm_analyzer:
+ context = f"Sentence from segment {segment_count}"
+
+ def run_llm_analysis(txt, ctx, ts, seg_num):
+ fc = llm_analyzer.fact_check(txt, ctx)
+ qs = llm_analyzer.generate_augmenting_questions(txt, ctx)
+ return {
+ 'timestamp': ts,
+ 'text': txt,
+ 'segment_count': seg_num,
+ 'fact_check': fc,
+ 'questions': qs
+ }
+
+ future = llm_executor.submit(run_llm_analysis, cleaned, context, timestamp, segment_count)
+ pending_llm_tasks[segment_count] = future
+ else:
+ # Standard mode: display chunks as-is
+ # Display transcription immediately (don't wait for LLM)
+ print(f"[{timestamp}] {text}")
+
+ # LLM Analysis (run concurrently in background) - only in non-sentence mode
+ if llm_analyzer and not sentence_extractor:
context = f"Segment {segment_count}"
# Submit LLM tasks to thread pool
@@ -701,6 +726,34 @@ Examples:
# Cleanup
capturer.close()
+ # Flush sentence buffer if in sentence mode
+ if sentence_extractor:
+ print("\n📝 Flushing sentence buffer...")
+ final_sentences = sentence_extractor.flush()
+ for sentence in final_sentences:
+ cleaned = sentence_cleaner.clean(sentence) if sentence_cleaner else sentence
+ if cleaned:
+ timestamp = datetime.now().strftime("%H:%M:%S")
+ print(f"[{timestamp}] 📝 {cleaned}")
+
+ if args.output and not llm_analyzer:
+ save_transcript(cleaned, timestamp, args.output)
+
+ # LLM analysis for flushed sentences
+ if llm_analyzer:
+ fact_check = llm_analyzer.fact_check(cleaned, "Final sentence")
+ questions = llm_analyzer.generate_augmenting_questions(cleaned)
+ display_enriched_output(cleaned, timestamp, fact_check, questions)
+
+ if args.output:
+ data = {
+ 'timestamp': timestamp,
+ 'text': cleaned,
+ 'fact_check': fact_check,
+ 'questions': questions
+ }
+ save_enriched_transcript(data, args.output)
+
# Process remaining audio
print("\nProcessing remaining audio...")
final_text = transcriber.transcribe_chunk(min_duration=0)
diff --git a/transcribe_speakers_llm.py b/transcribe_speakers_llm.py
deleted file mode 100755
index a3ff667..0000000
--- a/transcribe_speakers_llm.py
+++ /dev/null
@@ -1,636 +0,0 @@
-#!/usr/bin/env python3
-"""
-Real-time transcription of Windows speaker output using loopback capture.
-Captures system audio and transcribes with Whisper in near real-time.
-"""
-
-import sounddevice as sd
-import numpy as np
-import threading
-import queue
-import time
-import os
-import argparse
-import json
-from datetime import datetime
-
-# Choose your Whisper backend here:
-# For faster-whisper (recommended):
-from faster_whisper import WhisperModel
-
-# LLM integration
-try:
- import ollama
- OLLAMA_AVAILABLE = True
-except ImportError:
- OLLAMA_AVAILABLE = False
-
-
-# # For regular whisper (comment out the line above and uncomment these):
-# import whisper
-
-
-class WindowsLoopbackAudioCapture:
- """Capture Windows speaker output using WASAPI loopback"""
-
- def __init__(self, device_name=None, sample_rate=16000, chunk_size=2048):
- self.sample_rate = sample_rate
- self.chunk_size = chunk_size
-
- # Find loopback device
- self.device_info = self._find_loopback_device(device_name)
- if not self.device_info:
- raise RuntimeError(
- "No loopback device found.\n"
- "1. Ensure your speakers/headphones are connected\n"
- "2. Enable 'Stereo Mix' in Sound settings\n"
- "3. Or install VB-Cable virtual audio device"
- )
-
- print(f"✓ Using device: {self.device_info['name']} (index {self.device_info['index']})")
-
- # Queue for audio data
- self.audio_queue = queue.Queue()
- self.stop_event = threading.Event()
-
- # Start the stream
- try:
- self.stream = sd.InputStream(
- device=self.device_info['index'],
- channels=1,
- samplerate=sample_rate,
- blocksize=chunk_size,
- dtype='int16',
- latency='low',
- callback=self._audio_callback
- )
- self.stream.start()
- print("✓ Audio capture stream started")
- except Exception as e:
- raise RuntimeError(f"Failed to start audio stream: {e}")
-
- def _find_loopback_device(self, device_name):
- """Find the speaker device with loopback capability"""
- devices = sd.query_devices()
-
- # If device name specified, find exact match
- if device_name:
- for dev in devices:
- if (device_name.lower() in dev['name'].lower() and
- dev['max_input_channels'] > 0):
- return dev
-
- # Auto-detect: look for WASAPI speakers/headphones
- for dev in devices:
- if (dev['max_input_channels'] > 0 and
- any(x in dev['name'] for x in ['Speakers', 'Headphones', 'Output'])):
- return dev
-
- # Fallback: Stereo Mix or similar
- for dev in devices:
- if 'Stereo Mix' in dev['name']:
- return dev
-
- return None
-
- def _audio_callback(self, indata, frames, time_info, status):
- """Callback for audio data"""
- if status:
- print(f"⚠ Audio status: {status}")
- self.audio_queue.put(indata.copy())
-
- def read_chunk(self):
- """Read audio data from queue"""
- try:
- return self.audio_queue.get(timeout=0.05).flatten()
- except queue.Empty:
- return None
-
- def close(self):
- """Cleanup resources"""
- if hasattr(self, 'stream'):
- self.stream.stop()
- self.stream.close()
-
-
-class WhisperStreamTranscriber:
- """Process audio chunks with Whisper/faster-whisper"""
-
- def __init__(self, model_name="base", language="en", force_cpu=False):
- print(f"Loading Whisper model '{model_name}'...")
-
- # Check for CUDA availability
- import torch
- has_cuda = torch.cuda.is_available() and not force_cpu
-
- # Force CPU if CUDA libraries incompatible
- device = "cpu"
- compute_type = "int8"
-
- if has_cuda:
- try:
- # Test if CTranslate2 can actually use CUDA
- import ctranslate2
- cuda_count = ctranslate2.get_cuda_device_count()
- if cuda_count > 0:
- device = "cuda"
- compute_type = "float16"
- print(f"Using device: cuda ({torch.cuda.get_device_name(0)})")
- else:
- print(f"CUDA available in PyTorch but not in CTranslate2. Using CPU.")
- except Exception as e:
- print(f"CUDA libraries not found ({e}). Using CPU.")
- else:
- print("Using device: cpu")
-
- # FASTER-WHISPER (recommended):
- model_kwargs = {
- "device": device,
- "compute_type": compute_type
- }
- if not has_cuda:
- model_kwargs["cpu_threads"] = 4
-
- self.model = WhisperModel(model_name, **model_kwargs)
- self.language = language
- self.audio_buffer = np.array([], dtype=np.float32)
- self.lock = threading.Lock()
-
- # # REGULAR WHISPER:
- # self.model = whisper.load_model(model_name)
- # self.language = language
- # self.audio_buffer = np.array([], dtype=np.float32)
- # self.lock = threading.Lock()
-
- def add_audio(self, audio_chunk):
- """Add new audio data to buffer"""
- with self.lock:
- audio_float = audio_chunk.astype(np.float32) / 32768.0
- self.audio_buffer = np.concatenate([self.audio_buffer, audio_float])
-
- def transcribe_chunk(self, min_duration=5.0):
- """Transcribe accumulated audio if enough duration"""
- with self.lock:
- duration = len(self.audio_buffer) / 16000
- if duration < min_duration:
- return None
-
- audio_to_process = self.audio_buffer.copy()
- self.audio_buffer = np.array([], dtype=np.float32)
-
- # Process with FASTER-WHISPER:
- try:
- segments, _ = self.model.transcribe(
- audio_to_process,
- language=self.language,
- beam_size=5,
- vad_filter=True,
- vad_parameters=dict(min_silence_duration_ms=500),
- word_timestamps=False
- )
- text = " ".join([segment.text for segment in segments]).strip()
- return text if text else None
- except Exception as e:
- print(f"❌ Transcription error: {e}")
- return None
-
- # # REGULAR WHISPER:
- # try:
- # result = self.model.transcribe(
- # audio_to_process,
- # language=self.language,
- # task="transcribe",
- # fp16=False
- # )
- # return result["text"].strip()
- # except Exception as e:
- # print(f"❌ Transcription error: {e}")
- # return None
-
-
-class LocalLLMAnalyzer:
- """Local LLM for fact-checking and question generation using Ollama"""
-
- def __init__(self, model="llama3.2", debug=False):
- if not OLLAMA_AVAILABLE:
- raise RuntimeError(
- "Ollama package not installed.\n"
- "Install with: pip install ollama"
- )
-
- self.model = model
- self.debug = debug
- self._test_connection()
-
- def _test_connection(self):
- """Test connection to Ollama service"""
- try:
- ollama.list()
- print(f"✓ Ollama connected using model: {self.model}")
- except Exception as e:
- raise RuntimeError(
- f"Cannot connect to Ollama. Ensure it's installed and running.\n"
- f"Error: {e}\n"
- f"Install from: https://ollama.ai\n"
- f"Then run: ollama pull {self.model}"
- )
-
- def _extract_json(self, text):
- """Extract JSON from text that might contain markdown or other formatting"""
- # Try to find JSON block in markdown code fence
- import re
- json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
- if json_match:
- return json_match.group(1)
-
- # Try to find raw JSON object
- json_match = re.search(r'\{.*\}', text, re.DOTALL)
- if json_match:
- return json_match.group(0)
-
- return text
-
- def fact_check(self, text, context=""):
- """Analyze text for factual accuracy"""
- # Try simple structured format first
- prompt = f"""Analyze this for accuracy. Reply in this exact format:
-
-VERDICT: [factual/dubious/not_factual]
-CONFIDENCE: [0.0-1.0]
-EXPLANATION: [one sentence]
-
-Statement: "{text}"
-"""
-
- try:
- response = ollama.generate(
- model=self.model,
- prompt=prompt,
- options={"temperature": 0.1, "num_predict": 150}
- )
-
- response_text = response['response'].strip()
-
- if self.debug:
- print(f"\n[DEBUG] Fact-check response:\n{response_text}\n")
-
- # Try to parse structured text format
- verdict = "dubious"
- confidence = 0.5
- explanation = response_text
-
- # Extract VERDICT
- import re
- verdict_match = re.search(r'VERDICT:\s*(\w+)', response_text, re.IGNORECASE)
- if verdict_match:
- verdict = verdict_match.group(1).lower()
-
- # Extract CONFIDENCE
- conf_match = re.search(r'CONFIDENCE:\s*([\d.]+)', response_text, re.IGNORECASE)
- if conf_match:
- try:
- confidence = float(conf_match.group(1))
- confidence = max(0.0, min(1.0, confidence)) # Clamp to 0-1
- except ValueError:
- pass
-
- # Extract EXPLANATION
- expl_match = re.search(r'EXPLANATION:\s*(.+?)(?:\n|$)', response_text, re.IGNORECASE | re.DOTALL)
- if expl_match:
- explanation = expl_match.group(1).strip()
-
- return {
- "verdict": verdict,
- "confidence": confidence,
- "explanation": explanation[:200], # Truncate if too long
- "sources": [],
- "corrections": ""
- }
-
- except Exception as e:
- if self.debug:
- print(f"[DEBUG] Fact-check error: {e}")
- return {
- "verdict": "error",
- "confidence": 0.0,
- "explanation": f"Analysis failed: {str(e)}",
- "sources": [],
- "corrections": ""
- }
-
- def generate_augmenting_questions(self, text, context=""):
- """Generate insightful questions based on the text"""
- prompt = f"""Generate 3 questions about this. Reply in this exact format:
-
-Q1: [question]
-Q2: [question]
-Q3: [question]
-
-Statement: "{text}"
-"""
-
- try:
- response = ollama.generate(
- model=self.model,
- prompt=prompt,
- options={"temperature": 0.7, "num_predict": 150}
- )
-
- response_text = response['response'].strip()
-
- if self.debug:
- print(f"\n[DEBUG] Questions response:\n{response_text}\n")
-
- # Extract questions
- import re
- questions = []
- for i in range(1, 4):
- q_match = re.search(rf'Q{i}:\s*(.+?)(?:\n|$)', response_text, re.IGNORECASE)
- if q_match:
- questions.append(q_match.group(1).strip())
-
- # If we couldn't parse, try to split by newlines and take first 3 non-empty lines
- if len(questions) < 3:
- lines = [line.strip() for line in response_text.split('\n') if line.strip()]
- questions = lines[:3] if lines else [
- "What are the key points here?",
- "What evidence supports this?",
- "What are the implications?"
- ]
-
- # Ensure we have exactly 3 questions
- while len(questions) < 3:
- questions.append("What else should we consider?")
-
- return {
- "questions": questions[:3],
- "topics": []
- }
-
- except Exception as e:
- if self.debug:
- print(f"[DEBUG] Questions error: {e}")
- return {
- "questions": [
- "What are the key points?",
- "What supports this claim?",
- "What are the implications?"
- ],
- "topics": []
- }
-
-
-def list_audio_devices():
- """Print all available audio input devices"""
- print("\nAvailable audio capture devices:")
- devices = sd.query_devices()
- for i, dev in enumerate(devices):
- if dev['max_input_channels'] > 0:
- print(f" [{i}] {dev['name']}")
- print(f" Channels: {dev['max_input_channels']} | Sample Rate: {dev['default_samplerate']}")
- print()
-
-
-def save_transcript(text, timestamp, filename):
- """Append transcript to file"""
- os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
- with open(filename, "a", encoding="utf-8") as f:
- f.write(f"[{timestamp}] {text}\n")
-
-
-def save_enriched_transcript(data, filename):
- """Save enriched transcript with LLM analysis"""
- os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
- with open(filename, "a", encoding="utf-8") as f:
- f.write(f"\n{'='*70}\n")
- f.write(f"[{data['timestamp']}] {data['text']}\n\n")
-
- if 'fact_check' in data:
- fc = data['fact_check']
- f.write(f"📊 Fact Check: {fc.get('verdict', 'N/A').upper()} "
- f"(confidence: {fc.get('confidence', 0):.2f})\n")
- f.write(f"💡 {fc.get('explanation', 'N/A')}\n")
- if fc.get('corrections'):
- f.write(f"✏️ Correction: {fc['corrections']}\n")
- f.write("\n")
-
- if 'questions' in data and data['questions'].get('questions'):
- f.write("❓ Questions:\n")
- for i, q in enumerate(data['questions']['questions'], 1):
- f.write(f"{i}. {q}\n")
- f.write("\n")
-
-
-def display_enriched_output(text, timestamp, fact_check=None, questions=None):
- """Display transcript with LLM analysis"""
- print(f"\n[{timestamp}] {text}")
-
- if fact_check:
- verdict_emoji = {
- 'factual': '✅',
- 'dubious': '⚠️',
- 'not_factual': '❌',
- 'error': '⚠️'
- }
- emoji = verdict_emoji.get(fact_check.get('verdict', 'error'), '❓')
-
- print(f"\n{emoji} Fact Check: {fact_check.get('verdict', 'N/A').upper()} "
- f"(confidence: {fact_check.get('confidence', 0):.2f})")
- print(f"💡 {fact_check.get('explanation', 'N/A')}")
-
- if fact_check.get('corrections'):
- print(f"✏️ Correction: {fact_check['corrections']}")
-
- if questions and questions.get('questions'):
- print(f"\n❓ Questions:")
- for i, q in enumerate(questions['questions'], 1):
- print(f" {i}. {q}")
-
-
-def main():
- parser = argparse.ArgumentParser(
- description="Real-time transcription of Windows speaker output",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
-Examples:
- python transcribe_speakers.py
- python transcribe_speakers.py --model small --language es --interval 5
- python transcribe_speakers.py --device "Speakers" --output "meeting.txt"
- python transcribe_speakers.py --model medium --interval 10 --output transcripts/live.txt
- """
- )
-
- parser.add_argument("--model", default="base",
- choices=["tiny", "base", "small", "medium", "large"],
- help="Whisper model size (default: base)")
- parser.add_argument("--language", default="en",
- help="Language code (default: en)")
- parser.add_argument("--device", metavar="NAME",
- help="Audio device name (partial match). If not specified, auto-detects")
- parser.add_argument("--interval", type=float, default=8.0,
- help="Processing interval in seconds (default: 8.0)")
- parser.add_argument("--output", "-o", metavar="FILE",
- help="Save transcript to file (e.g., transcript.txt)")
- parser.add_argument("--list-devices", action="store_true",
- help="List all available audio devices and exit")
- parser.add_argument("--force-cpu", action="store_true",
- help="Force CPU processing (disable GPU acceleration)")
- parser.add_argument("--enable-llm", action="store_true",
- help="Enable LLM analysis (fact-checking and questions)")
- parser.add_argument("--llm-model", default="gpt-oss:20b",
- help="Ollama model to use for LLM analysis (default: gpt-oss:20b)")
- parser.add_argument("--llm-debug", action="store_true",
- help="Show LLM raw responses for debugging")
-
- args = parser.parse_args()
-
- if args.list_devices:
- list_audio_devices()
- return
-
- print("=== Windows Real-Time Audio Transcription ===")
- print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s")
- if args.output:
- print(f"Output: {args.output}")
- if args.enable_llm:
- print(f"LLM Analysis: Enabled ({args.llm_model})")
-
- # Initialize audio capture
- try:
- capturer = WindowsLoopbackAudioCapture(
- device_name=args.device,
- sample_rate=16000,
- chunk_size=2048
- )
- except RuntimeError as e:
- print(f"\n❌ Audio Error: {e}")
- print("\nTo fix this:")
- print("1. Right-click speaker icon → Sounds → Recording tab")
- print("2. Right-click in empty area → Show Disabled Devices")
- print("3. Enable 'Stereo Mix' → Set as Default Device")
- print("\nAlternative: Install VB-Cable (free) from vb-audio.com")
- print(" Then use: --device 'CABLE Output'")
- list_audio_devices()
- return
-
- # Initialize transcriber
- try:
- transcriber = WhisperStreamTranscriber(
- model_name=args.model,
- language=args.language,
- force_cpu=args.force_cpu
- )
- except Exception as e:
- print(f"\n❌ Model Error: {e}")
- print("Make sure you installed Whisper correctly")
- return
-
- # Initialize LLM analyzer (optional)
- llm_analyzer = None
- if args.enable_llm:
- try:
- llm_analyzer = LocalLLMAnalyzer(model=args.llm_model, debug=args.llm_debug)
- except RuntimeError as e:
- print(f"\n❌ LLM Error: {e}")
- print("Continuing without LLM analysis...")
- llm_analyzer = None
-
- # Main processing loop
- print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}")
- last_process_time = time.time()
- total_duration = 0
- segment_count = 0
-
- try:
- while True:
- # Collect audio
- chunk = capturer.read_chunk()
- if chunk is not None:
- transcriber.add_audio(chunk)
- total_duration += len(chunk) / 16000
-
- # Process at intervals
- current_time = time.time()
- if current_time - last_process_time >= args.interval:
- text = transcriber.transcribe_chunk()
- if text:
- segment_count += 1
- timestamp = datetime.now().strftime("%H:%M:%S")
-
- # LLM Analysis
- fact_check = None
- questions = None
- if llm_analyzer:
- context = f"Segment {segment_count}"
- fact_check = llm_analyzer.fact_check(text, context)
- questions = llm_analyzer.generate_augmenting_questions(text, context)
-
- # Display output
- if llm_analyzer:
- display_enriched_output(text, timestamp, fact_check, questions)
- else:
- print(f"[{timestamp}] {text}")
-
- # Save output
- if args.output:
- if llm_analyzer:
- data = {
- 'timestamp': timestamp,
- 'text': text,
- 'fact_check': fact_check,
- 'questions': questions
- }
- save_enriched_transcript(data, args.output)
- else:
- save_transcript(text, timestamp, args.output)
-
- last_process_time = current_time
-
- except KeyboardInterrupt:
- print(f"\n{'=' * 50}\n🛑 Stopping transcription...")
-
- # Cleanup
- capturer.close()
-
- # Process remaining audio
- print("\nProcessing remaining audio...")
- final_text = transcriber.transcribe_chunk(min_duration=0)
- if final_text:
- timestamp = datetime.now().strftime("%H:%M:%S")
-
- # LLM Analysis for final segment
- fact_check = None
- questions = None
- if llm_analyzer:
- fact_check = llm_analyzer.fact_check(final_text, "Final segment")
- questions = llm_analyzer.generate_augmenting_questions(final_text)
-
- # Display output
- if llm_analyzer:
- display_enriched_output(final_text, timestamp, fact_check, questions)
- else:
- print(f"[{timestamp}] {final_text}")
-
- # Save output
- if args.output:
- if llm_analyzer:
- data = {
- 'timestamp': timestamp,
- 'text': final_text,
- 'fact_check': fact_check,
- 'questions': questions
- }
- save_enriched_transcript(data, args.output)
- else:
- save_transcript(final_text, timestamp, args.output)
-
- # Summary
- print(f"\n✅ Complete! Processed {total_duration:.1f}s of audio")
- print(f" Generated {segment_count} transcript segments")
- if args.output and os.path.exists(args.output):
- abs_path = os.path.abspath(args.output)
- print(f"💾 Transcript saved to: {abs_path}")
-
-
-if __name__ == "__main__":
- main()
\ No newline at end of file