first commit
This commit is contained in:
596
transcribe_No_llm.py
Executable file
596
transcribe_No_llm.py
Executable file
@@ -0,0 +1,596 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Real-time transcription of Windows speaker output using loopback capture.
|
||||
Captures system audio and transcribes with Whisper in near real-time.
|
||||
"""
|
||||
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import threading
|
||||
import queue
|
||||
import time
|
||||
import os
|
||||
import argparse
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
# Choose your Whisper backend here:
|
||||
# For faster-whisper (recommended):
|
||||
from faster_whisper import WhisperModel
|
||||
|
||||
# LLM integration
|
||||
try:
|
||||
import ollama
|
||||
OLLAMA_AVAILABLE = True
|
||||
except ImportError:
|
||||
OLLAMA_AVAILABLE = False
|
||||
|
||||
|
||||
# # For regular whisper (comment out the line above and uncomment these):
|
||||
# import whisper
|
||||
|
||||
|
||||
class WindowsLoopbackAudioCapture:
    """Capture system audio from a loopback-style input device.

    A device is chosen from ``sd.query_devices()``, opened as a mono int16
    ``sd.InputStream``, and every block delivered to the stream callback is
    pushed onto ``audio_queue`` for the consumer to pull with ``read_chunk``.
    """

    # Lower-cased name fragments used when auto-detecting a device.
    _OUTPUT_HINTS = ('speakers', 'headphones', 'output')
    _FALLBACK_HINT = 'stereo mix'

    def __init__(self, device_name=None, sample_rate=16000, chunk_size=2048):
        """Select a device and start capturing immediately.

        Args:
            device_name: Optional (partial, case-insensitive) device name.
            sample_rate: Sample rate requested from the stream.
            chunk_size: Frames per callback block.

        Raises:
            RuntimeError: If no suitable device exists or the stream cannot
                be opened/started.
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size

        # Find loopback device
        self.device_info = self._find_loopback_device(device_name)
        if not self.device_info:
            raise RuntimeError(
                "No loopback device found.\n"
                "1. Ensure your speakers/headphones are connected\n"
                "2. Enable 'Stereo Mix' in Sound settings\n"
                "3. Or install VB-Cable virtual audio device"
            )

        print(f"✓ Using device: {self.device_info['name']} (index {self.device_info['index']})")

        # Queue for audio data
        self.audio_queue = queue.Queue()
        self.stop_event = threading.Event()

        # Start the stream
        stream = None
        try:
            stream = sd.InputStream(
                device=self.device_info['index'],
                channels=1,
                samplerate=sample_rate,
                blocksize=chunk_size,
                dtype='int16',
                latency='low',
                callback=self._audio_callback,
            )
            stream.start()
        except Exception as e:
            # The stream may have been created but failed to start; release
            # it so the device handle is not leaked.
            if stream is not None:
                try:
                    stream.close()
                except Exception:
                    pass  # best-effort cleanup; the start failure is what matters
            raise RuntimeError(f"Failed to start audio stream: {e}") from e

        self.stream = stream
        print("✓ Audio capture stream started")

    @staticmethod
    def _pick_device(devices, device_name=None):
        """Choose a capture device from an iterable of device-info mappings.

        Only devices with at least one input channel are considered.
        Priority: (1) name contains ``device_name``; (2) name contains one of
        the output-style hints; (3) name contains "stereo mix". All name
        comparisons are case-insensitive.

        Returns:
            The first matching device mapping, or None.
        """
        candidates = [dev for dev in devices if dev['max_input_channels'] > 0]

        if device_name:
            wanted = device_name.lower()
            for dev in candidates:
                if wanted in dev['name'].lower():
                    return dev

        hints = WindowsLoopbackAudioCapture._OUTPUT_HINTS
        for dev in candidates:
            lowered = dev['name'].lower()
            if any(hint in lowered for hint in hints):
                return dev

        fallback = WindowsLoopbackAudioCapture._FALLBACK_HINT
        for dev in candidates:
            if fallback in dev['name'].lower():
                return dev

        return None

    def _find_loopback_device(self, device_name):
        """Query the available devices and pick one (see ``_pick_device``)."""
        return self._pick_device(sd.query_devices(), device_name)

    def _audio_callback(self, indata, frames, time_info, status):
        """Stream callback: report any status flag and enqueue a copy of the block."""
        if status:
            print(f"⚠ Audio status: {status}")
        # Copy so the queued data does not alias the buffer handed to the callback.
        self.audio_queue.put(indata.copy())

    def read_chunk(self, timeout=0.05):
        """Return the next queued block flattened to 1-D, or None if none arrives.

        Args:
            timeout: Seconds to wait for a block before giving up.
        """
        try:
            return self.audio_queue.get(timeout=timeout).flatten()
        except queue.Empty:
            return None

    def close(self):
        """Stop and close the stream if one was opened; safe to call repeatedly."""
        stream = getattr(self, 'stream', None)
        if stream is None:
            return
        self.stream = None
        stream.stop()
        stream.close()
|
||||
|
||||
|
||||
class WhisperStreamTranscriber:
    """Buffer incoming audio and transcribe it in batches with faster-whisper.

    Audio is accumulated in ``audio_buffer`` (float32) under ``lock`` and handed
    to ``WhisperModel.transcribe`` once enough has been collected.

    NOTE: the file previously carried a commented-out openai-whisper variant
    (``whisper.load_model`` / ``model.transcribe(..., fp16=False)`` returning
    ``result["text"]``); it is not wired in here.
    """

    # Sample rate used to convert buffered sample counts into seconds.
    SAMPLE_RATE = 16000

    def __init__(self, model_name="base", language="en", force_cpu=False):
        """Load the model on the best available device.

        Args:
            model_name: Model size/name passed to ``WhisperModel``.
            language: Language code passed to every ``transcribe`` call.
            force_cpu: Skip CUDA detection and run on the CPU.
        """
        print(f"Loading Whisper model '{model_name}'...")

        device, compute_type = self._detect_device(force_cpu)

        model_kwargs = {
            "device": device,
            "compute_type": compute_type,
        }
        # Key on the device actually chosen, so the CPU fallback taken when
        # CTranslate2 lacks CUDA also gets its thread count configured.
        if device == "cpu":
            model_kwargs["cpu_threads"] = 4

        self.model = WhisperModel(model_name, **model_kwargs)
        self.language = language
        self.audio_buffer = np.array([], dtype=np.float32)
        self.lock = threading.Lock()

    @staticmethod
    def _detect_device(force_cpu=False):
        """Return ``(device, compute_type)``: ("cuda", "float16") or ("cpu", "int8").

        PyTorch is consulted only if it can be imported; CTranslate2's CUDA
        device count makes the final decision.
        """
        cpu_choice = ("cpu", "int8")

        if force_cpu:
            print("Using device: cpu")
            return cpu_choice

        try:
            import torch
        except (ImportError, OSError):
            torch = None  # optional: only used for detection and the GPU name

        if torch is not None and not torch.cuda.is_available():
            print("Using device: cpu")
            return cpu_choice

        try:
            # Test if CTranslate2 can actually use CUDA
            import ctranslate2
            cuda_count = ctranslate2.get_cuda_device_count()
        except Exception as e:
            print(f"CUDA libraries not found ({e}). Using CPU.")
            return cpu_choice

        if cuda_count <= 0:
            if torch is not None:
                print("CUDA available in PyTorch but not in CTranslate2. Using CPU.")
            else:
                print("Using device: cpu")
            return cpu_choice

        gpu_name = "CUDA device"
        if torch is not None:
            try:
                gpu_name = torch.cuda.get_device_name(0)
            except Exception:
                pass  # the name is cosmetic; keep the generic label
        print(f"Using device: cuda ({gpu_name})")
        return ("cuda", "float16")

    def add_audio(self, audio_chunk):
        """Append a chunk of int16 samples to the buffer, scaled by 1/32768 to float32."""
        # Convert outside the lock so it is held only for the buffer swap.
        audio_float = np.asarray(audio_chunk).astype(np.float32) / 32768.0
        with self.lock:
            self.audio_buffer = np.concatenate([self.audio_buffer, audio_float])

    def transcribe_chunk(self, min_duration=5.0):
        """Transcribe and clear the buffer if it holds at least ``min_duration`` seconds.

        Returns:
            The stripped transcript text, or None when the buffer is empty or
            too short, the transcript is empty, or transcription fails (the
            error is printed).
        """
        with self.lock:
            sample_count = len(self.audio_buffer)
            # An empty buffer is never worth a model call, even with min_duration=0.
            if sample_count == 0 or sample_count / self.SAMPLE_RATE < min_duration:
                return None

            # add_audio rebinds the attribute instead of mutating in place, so
            # taking the reference here is safe once the buffer is reset.
            audio_to_process = self.audio_buffer
            self.audio_buffer = np.array([], dtype=np.float32)

        # Run the (slow) model call outside the lock so capture can continue.
        try:
            segments, _ = self.model.transcribe(
                audio_to_process,
                language=self.language,
                beam_size=5,
                vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500),
                word_timestamps=False,
            )
            text = " ".join(segment.text for segment in segments).strip()
            return text if text else None
        except Exception as e:
            print(f"❌ Transcription error: {e}")
            return None
|
||||
|
||||
|
||||
class LocalLLMAnalyzer:
    """Local LLM for fact-checking and question generation using Ollama.

    Model output is untrusted text: every response is parsed and normalized
    before it is returned, so callers can format ``confidence`` as a number and
    treat ``verdict`` and the question lists as strings.
    """

    def __init__(self, model="llama3.2"):
        """Remember the model name and verify the Ollama service is reachable.

        Raises:
            RuntimeError: If the ``ollama`` package is missing or the service
                cannot be contacted.
        """
        if not OLLAMA_AVAILABLE:
            raise RuntimeError(
                "Ollama package not installed.\n"
                "Install with: pip install ollama"
            )

        self.model = model
        self._test_connection()

    def _test_connection(self):
        """Test connection to Ollama service; raise RuntimeError on failure."""
        try:
            ollama.list()
        except Exception as e:
            raise RuntimeError(
                f"Cannot connect to Ollama. Ensure it's installed and running.\n"
                f"Error: {e}\n"
                f"Install from: https://ollama.ai\n"
                f"Then run: ollama pull {self.model}"
            ) from e
        print(f"✓ Ollama connected using model: {self.model}")

    def _extract_json(self, text):
        """Extract JSON from text that might contain markdown or other formatting.

        Tries a fenced code block first, then the widest ``{...}`` span, and
        finally returns the text unchanged.
        """
        import re

        # Try to find JSON block in markdown code fence
        fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
        if fenced:
            return fenced.group(1)

        # Try to find raw JSON object
        raw = re.search(r'\{.*\}', text, re.DOTALL)
        if raw:
            return raw.group(0)

        return text

    @staticmethod
    def _normalize_fact_check(result):
        """Validate and coerce a parsed fact-check object in place.

        Ensures ``verdict`` is a lower-cased string, ``confidence`` is a float
        clamped to [0.0, 1.0], ``sources`` is a list and the optional text
        fields are strings.

        Raises:
            ValueError: If the value is not an object, lacks ``verdict`` or
                ``confidence``, or ``confidence`` is not numeric.
        """
        if not isinstance(result, dict):
            raise ValueError("LLM response is not a JSON object")
        if 'verdict' not in result or 'confidence' not in result:
            raise ValueError("Missing required fields")

        try:
            confidence = float(result['confidence'])
        except (TypeError, ValueError):
            raise ValueError("Confidence is not a number") from None
        result['confidence'] = min(1.0, max(0.0, confidence))
        result['verdict'] = str(result['verdict']).strip().lower()

        # Ensure defaults for optional fields
        result.setdefault('explanation', 'No explanation provided')
        result.setdefault('sources', [])
        result.setdefault('corrections', '')

        if not isinstance(result['sources'], list):
            result['sources'] = [str(result['sources'])]
        for key in ('explanation', 'corrections'):
            value = result[key]
            if value is None:
                result[key] = ''
            elif not isinstance(value, str):
                result[key] = str(value)
        return result

    @staticmethod
    def _normalize_questions(result):
        """Coerce a parsed questions object so both fields are lists of strings.

        Raises:
            ValueError: If the parsed value is not an object.
        """
        if not isinstance(result, dict):
            raise ValueError("LLM response is not a JSON object")
        for key in ('questions', 'topics'):
            value = result.get(key)
            if isinstance(value, str):
                value = [value]
            elif not isinstance(value, list):
                value = []
            result[key] = [str(item) for item in value]
        return result

    def fact_check(self, text, context=""):
        """Analyze text for factual accuracy.

        Returns:
            A dict with ``verdict``, ``confidence``, ``explanation``,
            ``sources`` and ``corrections``. Unparseable responses yield a
            "dubious" placeholder; any other failure yields an "error" verdict.
        """
        prompt = f"""You are a fact-checking assistant. Analyze this statement for factual accuracy.

Context: {context}
Statement: "{text}"

You must respond with ONLY valid JSON in this exact format, no other text:
{{
"verdict": "factual",
"confidence": 0.95,
"explanation": "Brief explanation here",
"sources": ["source1"],
"corrections": ""
}}

Valid verdict values: "factual", "dubious", "not_factual"
Confidence must be a number between 0.0 and 1.0."""

        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                options={"temperature": 0.1, "num_predict": 200},
            )

            # Extract and parse JSON
            json_text = self._extract_json(response['response'])
            return self._normalize_fact_check(json.loads(json_text))

        except (json.JSONDecodeError, ValueError):
            # Return a simple analysis without JSON parsing
            return {
                "verdict": "dubious",
                "confidence": 0.5,
                "explanation": "Could not parse LLM response properly. Model may need JSON format support.",
                "sources": [],
                "corrections": "",
            }
        except Exception as e:
            return {
                "verdict": "error",
                "confidence": 0.0,
                "explanation": f"Analysis failed: {str(e)}",
                "sources": [],
                "corrections": "",
            }

    def generate_augmenting_questions(self, text, context=""):
        """Generate insightful questions based on the text.

        Returns:
            A dict with ``questions`` and ``topics`` lists; on failure the
            ``questions`` list carries a single error message.
        """
        prompt = f"""Based on this statement, generate 3 insightful questions that would help understand the topic better.

Statement: "{text}"
Context: {context}

Respond with JSON only:
{{
"questions": ["Question 1", "Question 2", "Question 3"],
"topics": ["key_topic_1", "key_topic_2"]
}}"""

        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                format="json",
                options={"temperature": 0.7},
            )
            return self._normalize_questions(json.loads(response['response']))
        except (json.JSONDecodeError, ValueError):
            return {
                "questions": ["Error: LLM response was not valid JSON"],
                "topics": [],
            }
        except Exception as e:
            return {
                "questions": [f"Error: {str(e)}"],
                "topics": [],
            }
|
||||
|
||||
|
||||
def list_audio_devices():
    """Print every device reported by sounddevice that has input channels."""
    print("\nAvailable audio capture devices:")
    for index, info in enumerate(sd.query_devices()):
        if info['max_input_channels'] <= 0:
            continue  # output-only device, cannot be captured from
        print(f" [{index}] {info['name']}")
        print(f" Channels: {info['max_input_channels']} | Sample Rate: {info['default_samplerate']}")
    print()
|
||||
|
||||
|
||||
def save_transcript(text, timestamp, filename):
    """Append one ``[timestamp] text`` line to *filename*, creating its folder if needed."""
    # A bare file name has no directory part; fall back to the current directory.
    parent_dir = os.path.dirname(filename) or '.'
    os.makedirs(parent_dir, exist_ok=True)

    entry = f"[{timestamp}] {text}\n"
    with open(filename, "a", encoding="utf-8") as out:
        out.write(entry)
|
||||
|
||||
|
||||
def save_enriched_transcript(data, filename):
    """Append a transcript entry plus any LLM analysis to *filename*.

    Args:
        data: Mapping with ``timestamp`` and ``text`` and, optionally,
            ``fact_check`` (dict) and ``questions`` (dict holding a
            ``questions`` list). Missing, None or malformed analysis values
            are skipped or given placeholder values instead of raising.
        filename: Output path; its parent directory is created if needed.
    """
    os.makedirs(os.path.dirname(filename) or '.', exist_ok=True)

    # Build the whole entry first so a formatting problem can never leave a
    # half-written entry in the file.
    lines = [
        f"\n{'=' * 70}\n",
        f"[{data['timestamp']}] {data['text']}\n\n",
    ]

    fc = data.get('fact_check')
    if isinstance(fc, dict):
        verdict = fc.get('verdict', 'N/A')
        if verdict is None:
            verdict = 'N/A'
        try:
            confidence = float(fc.get('confidence', 0))
        except (TypeError, ValueError):
            confidence = 0.0  # LLM output is untrusted; never crash on it
        lines.append(f"📊 Fact Check: {str(verdict).upper()} "
                     f"(confidence: {confidence:.2f})\n")
        lines.append(f"💡 {fc.get('explanation', 'N/A')}\n")
        if fc.get('corrections'):
            lines.append(f"✏️ Correction: {fc['corrections']}\n")
        lines.append("\n")

    question_data = data.get('questions')
    question_list = question_data.get('questions') if isinstance(question_data, dict) else None
    if isinstance(question_list, str):
        question_list = [question_list]  # avoid enumerating a string per character
    if question_list:
        lines.append("❓ Questions:\n")
        for i, q in enumerate(question_list, 1):
            lines.append(f"{i}. {q}\n")
        lines.append("\n")

    with open(filename, "a", encoding="utf-8") as f:
        f.writelines(lines)
|
||||
|
||||
|
||||
def display_enriched_output(text, timestamp, fact_check=None, questions=None):
    """Print a transcript line followed by any fact-check and question analysis.

    Args:
        text: Transcribed text.
        timestamp: Label printed in brackets before the text.
        fact_check: Optional dict with ``verdict``, ``confidence``,
            ``explanation`` and ``corrections``; malformed values are
            replaced by placeholders rather than raising.
        questions: Optional dict holding a ``questions`` list.
    """
    print(f"\n[{timestamp}] {text}")

    if fact_check and isinstance(fact_check, dict):
        verdict_emoji = {
            'factual': '✅',
            'dubious': '⚠️',
            'not_factual': '❌',
            'error': '⚠️',
        }
        verdict = fact_check.get('verdict')
        if verdict is None:
            # Missing verdict: shown as N/A with the error marker.
            emoji, verdict_label = verdict_emoji['error'], 'N/A'
        else:
            verdict_label = str(verdict)
            emoji = verdict_emoji.get(verdict_label, '❓')

        try:
            confidence = float(fact_check.get('confidence', 0))
        except (TypeError, ValueError):
            confidence = 0.0  # LLM output is untrusted; never crash on it

        print(f"\n{emoji} Fact Check: {verdict_label.upper()} "
              f"(confidence: {confidence:.2f})")
        print(f"💡 {fact_check.get('explanation', 'N/A')}")

        if fact_check.get('corrections'):
            print(f"✏️ Correction: {fact_check['corrections']}")

    question_list = questions.get('questions') if isinstance(questions, dict) else None
    if isinstance(question_list, str):
        question_list = [question_list]  # avoid enumerating a string per character
    if question_list:
        print("\n❓ Questions:")
        for i, q in enumerate(question_list, 1):
            print(f" {i}. {q}")
|
||||
|
||||
|
||||
def _handle_segment(text, timestamp, llm_analyzer, output_path, context):
    """Analyze (when an analyzer is given), display and optionally save one segment."""
    if llm_analyzer:
        fact_check = llm_analyzer.fact_check(text, context)
        questions = llm_analyzer.generate_augmenting_questions(text, context)
        display_enriched_output(text, timestamp, fact_check, questions)
        if output_path:
            data = {
                'timestamp': timestamp,
                'text': text,
                'fact_check': fact_check,
                'questions': questions,
            }
            save_enriched_transcript(data, output_path)
    else:
        print(f"[{timestamp}] {text}")
        if output_path:
            save_transcript(text, timestamp, output_path)


def main():
    """Command-line entry point: capture audio, transcribe it, and report.

    Parses the arguments, opens the capture stream, loads the Whisper model
    (and the optional LLM analyzer), then transcribes the buffered audio every
    ``--interval`` seconds until Ctrl+C. On shutdown the stream is closed, any
    audio still queued is flushed through a final transcription, and a summary
    is printed.
    """
    parser = argparse.ArgumentParser(
        description="Real-time transcription of Windows speaker output",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # %(prog)s keeps the examples in sync with the actual script name.
        epilog="""
Examples:
python %(prog)s
python %(prog)s --model small --language es --interval 5
python %(prog)s --device "Speakers" --output "meeting.txt"
python %(prog)s --model medium --interval 10 --output transcripts/live.txt
"""
    )

    parser.add_argument("--model", default="base",
                        choices=["tiny", "base", "small", "medium", "large"],
                        help="Whisper model size (default: base)")
    parser.add_argument("--language", default="en",
                        help="Language code (default: en)")
    parser.add_argument("--device", metavar="NAME",
                        help="Audio device name (partial match). If not specified, auto-detects")
    parser.add_argument("--interval", type=float, default=8.0,
                        help="Processing interval in seconds (default: 8.0)")
    parser.add_argument("--output", "-o", metavar="FILE",
                        help="Save transcript to file (e.g., transcript.txt)")
    parser.add_argument("--list-devices", action="store_true",
                        help="List all available audio devices and exit")
    parser.add_argument("--force-cpu", action="store_true",
                        help="Force CPU processing (disable GPU acceleration)")
    parser.add_argument("--enable-llm", action="store_true",
                        help="Enable LLM analysis (fact-checking and questions)")
    parser.add_argument("--llm-model", default="gpt-oss:20b",
                        help="Ollama model to use for LLM analysis (default: gpt-oss:20b)")

    args = parser.parse_args()

    if args.list_devices:
        list_audio_devices()
        return

    print("=== Windows Real-Time Audio Transcription ===")
    print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s")
    if args.output:
        print(f"Output: {args.output}")
    if args.enable_llm:
        print(f"LLM Analysis: Enabled ({args.llm_model})")

    # Initialize audio capture
    try:
        capturer = WindowsLoopbackAudioCapture(
            device_name=args.device,
            sample_rate=16000,
            chunk_size=2048
        )
    except RuntimeError as e:
        print(f"\n❌ Audio Error: {e}")
        print("\nTo fix this:")
        print("1. Right-click speaker icon → Sounds → Recording tab")
        print("2. Right-click in empty area → Show Disabled Devices")
        print("3. Enable 'Stereo Mix' → Set as Default Device")
        print("\nAlternative: Install VB-Cable (free) from vb-audio.com")
        print(" Then use: --device 'CABLE Output'")
        list_audio_devices()
        return

    total_duration = 0
    segment_count = 0

    # The capture stream is already running from here on; the finally clause
    # guarantees it is released on every exit path (model failure, Ctrl+C, or
    # an unexpected error inside the loop).
    try:
        # Initialize transcriber
        try:
            transcriber = WhisperStreamTranscriber(
                model_name=args.model,
                language=args.language,
                force_cpu=args.force_cpu
            )
        except Exception as e:
            print(f"\n❌ Model Error: {e}")
            print("Make sure you installed Whisper correctly")
            return

        # Initialize LLM analyzer (optional)
        llm_analyzer = None
        if args.enable_llm:
            try:
                llm_analyzer = LocalLLMAnalyzer(model=args.llm_model)
            except RuntimeError as e:
                print(f"\n❌ LLM Error: {e}")
                print("Continuing without LLM analysis...")
                llm_analyzer = None

        # Main processing loop
        print(f"\n✅ Started transcription. Press Ctrl+C to stop.\n{'=' * 50}")
        last_process_time = time.time()

        try:
            while True:
                # Collect audio
                chunk = capturer.read_chunk()
                if chunk is not None:
                    transcriber.add_audio(chunk)
                    total_duration += len(chunk) / 16000

                # Process at intervals
                current_time = time.time()
                if current_time - last_process_time >= args.interval:
                    text = transcriber.transcribe_chunk()
                    if text:
                        segment_count += 1
                        timestamp = datetime.now().strftime("%H:%M:%S")
                        _handle_segment(text, timestamp, llm_analyzer,
                                        args.output, f"Segment {segment_count}")

                    last_process_time = current_time

        except KeyboardInterrupt:
            print(f"\n{'=' * 50}\n🛑 Stopping transcription...")
    finally:
        # Cleanup
        capturer.close()

    # Blocks the callback queued but the loop never read would otherwise be
    # dropped; feed them to the transcriber before the final pass.
    while True:
        leftover = capturer.read_chunk()
        if leftover is None:
            break
        transcriber.add_audio(leftover)
        total_duration += len(leftover) / 16000

    # Process remaining audio
    print("\nProcessing remaining audio...")
    final_text = transcriber.transcribe_chunk(min_duration=0)
    if final_text:
        segment_count += 1  # the final segment counts toward the summary too
        timestamp = datetime.now().strftime("%H:%M:%S")
        _handle_segment(final_text, timestamp, llm_analyzer,
                        args.output, "Final segment")

    # Summary
    print(f"\n✅ Complete! Processed {total_duration:.1f}s of audio")
    print(f" Generated {segment_count} transcript segments")
    if args.output and os.path.exists(args.output):
        abs_path = os.path.abspath(args.output)
        print(f"💾 Transcript saved to: {abs_path}")
|
||||
|
||||
|
||||
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user