438 lines
16 KiB
Python
438 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
Real-time audio transcription with dual capture and optional LLM analysis.
Supports microphone + speaker monitor, file output, and fact-checking.
"""
|
|
|
|
import sounddevice as sd
|
|
import numpy as np
|
|
import threading
|
|
import queue
|
|
import time
|
|
import os
|
|
import argparse
|
|
from datetime import datetime
|
|
from faster_whisper import WhisperModel
|
|
|
|
try:
|
|
import ollama
|
|
OLLAMA_AVAILABLE = True
|
|
except ImportError:
|
|
OLLAMA_AVAILABLE = False
|
|
|
|
|
|
class DualAudioCapture:
    """Capture microphone input and, optionally, speaker (monitor) output.

    Each opened ``sd.InputStream`` pushes its blocks onto one shared queue as
    ``(source, samples)`` tuples, where ``source`` is ``'mic'`` or
    ``'monitor'``. Consumers drain the queue with :meth:`read_chunk`.
    """

    def __init__(self, mic_device=None, monitor_device=None, sample_rate=16000, chunk_size=2048):
        """Resolve the devices, open the input streams and start capturing.

        Args:
            mic_device: Name substring of the microphone; ``None`` selects
                ``sd.default.device[0]``.
            monitor_device: Name substring of the monitor/loopback input;
                a falsy value disables speaker capture.
            sample_rate: Sample rate passed to every stream.
            chunk_size: Block size (frames per callback) passed to every stream.

        Raises:
            RuntimeError: If a requested device name matches no input device
                or no default input device is available.
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.audio_queue = queue.Queue()
        # Set up front so close() is safe even when opening a stream fails.
        self.mic_stream = None
        self.monitor_stream = None

        devices = sd.query_devices()

        # Microphone (default input or specified)
        if mic_device is None:
            self.mic_device = sd.default.device[0]
            # NOTE(review): a negative index is taken to mean "no default
            # input device" -- confirm against the sounddevice documentation.
            if isinstance(self.mic_device, int) and self.mic_device < 0:
                raise RuntimeError("No default input device available")
        else:
            self.mic_device = self._find_device(mic_device, input_required=True)

        # Monitor/loopback (for speaker output)
        if monitor_device:
            self.monitor_device = self._find_device(monitor_device, input_required=True)
        else:
            self.monitor_device = None

        print(f"✓ Microphone: {devices[self.mic_device]['name']} (index {self.mic_device})")
        # Compare against None: device index 0 is valid but falsy.
        if self.monitor_device is not None:
            print(f"✓ Monitor: {devices[self.monitor_device]['name']} (index {self.monitor_device})")
        else:
            print("⚠ No monitor device - capturing microphone only")

        try:
            self.mic_stream = self._open_stream(self.mic_device, self._mic_callback)
            if self.monitor_device is not None:
                self.monitor_stream = self._open_stream(self.monitor_device, self._monitor_callback)

            self.mic_stream.start()
            if self.monitor_stream is not None:
                self.monitor_stream.start()
        except Exception:
            # Do not leak an already-opened stream when a later step fails.
            self.close()
            raise

        print("✓ Audio capture started")

    def _open_stream(self, device, callback):
        """Create (but do not start) a mono int16 input stream for *device*."""
        return sd.InputStream(
            device=device,
            channels=1,
            samplerate=self.sample_rate,
            blocksize=self.chunk_size,
            dtype='int16',
            callback=callback,
        )

    def _find_device(self, device_name, input_required=True):
        """Return the index of the first device whose name contains *device_name*.

        The match is case-insensitive. When *input_required* is true, devices
        without input channels are skipped.

        Raises:
            RuntimeError: If no device matches.
        """
        wanted = device_name.lower()
        for index, dev in enumerate(sd.query_devices()):
            if wanted not in dev['name'].lower():
                continue
            if not input_required or dev['max_input_channels'] > 0:
                return index
        raise RuntimeError(f"Device '{device_name}' not found")

    def _mic_callback(self, indata, frames, time_info, status):
        """Stream callback: queue a copy of the microphone block."""
        if status:
            print(f"⚠ Mic status: {status}")
        # Copy because the caller may reuse the buffer after the callback returns.
        self.audio_queue.put(('mic', indata.copy()))

    def _monitor_callback(self, indata, frames, time_info, status):
        """Stream callback: queue a copy of the monitor/speaker block."""
        if status:
            print(f"⚠ Monitor status: {status}")
        self.audio_queue.put(('monitor', indata.copy()))

    def read_chunk(self):
        """Return the next ``(source, samples)`` tuple, or ``None`` after 50 ms."""
        try:
            return self.audio_queue.get(timeout=0.05)
        except queue.Empty:
            return None

    def close(self):
        """Stop and close every stream that was opened; safe to call twice."""
        for attr in ('mic_stream', 'monitor_stream'):
            stream = getattr(self, attr, None)
            if stream is None:
                continue
            # Clear the attribute first so a second close() is a no-op.
            setattr(self, attr, None)
            stream.stop()
            stream.close()
|
|
|
|
|
|
class WhisperTranscriber:
    """Buffer captured audio per source and transcribe it with ``WhisperModel``."""

    def __init__(self, model_name="base", language="en", force_cpu=False, sample_rate=16000):
        """Load the model and create empty per-source buffers.

        Args:
            model_name: Model name passed to ``WhisperModel``.
            language: Language code passed to every transcription call.
            force_cpu: Skip CUDA detection and run on the CPU.
            sample_rate: Sample rate of the audio given to :meth:`add_audio`;
                used to convert buffer lengths into seconds.
        """
        print(f"Loading Whisper model '{model_name}'...")

        device = "cpu"
        compute_type = "int8"

        if not force_cpu:
            gpu_name = self._detect_cuda_device()
            if gpu_name is not None:
                device = "cuda"
                compute_type = "float16"
                print(f"✓ Using GPU: {gpu_name}")

        if device == "cpu":
            print("✓ Using CPU")

        model_kwargs = {"device": device, "compute_type": compute_type}
        if device == "cpu":
            model_kwargs["cpu_threads"] = 4

        self.model = WhisperModel(model_name, **model_kwargs)
        self.language = language
        self.sample_rate = sample_rate
        self.mic_buffer = np.array([], dtype=np.float32)
        self.monitor_buffer = np.array([], dtype=np.float32)
        self.lock = threading.Lock()

    @staticmethod
    def _detect_cuda_device():
        """Return a display name for the CUDA device, or ``None`` to use the CPU.

        ``torch`` is optional: when it imports, its CUDA check is honoured and
        it supplies the device name; when it does not, the decision rests on
        ``ctranslate2.get_cuda_device_count()`` alone.
        """
        try:
            import torch
        except (ImportError, OSError):
            torch = None

        if torch is not None and not torch.cuda.is_available():
            return None

        try:
            import ctranslate2
            if ctranslate2.get_cuda_device_count() <= 0:
                return None
            if torch is not None:
                return torch.cuda.get_device_name(0)
            return "CUDA device 0"
        except Exception as e:
            # Any failure here means the GPU path is unusable; fall back to CPU.
            print(f"⚠ CUDA unavailable: {e}")
            return None

    def add_audio(self, source, audio_chunk):
        """Append an int16 chunk to the buffer for *source*, scaled to float32.

        Samples are divided by 32768.0. Any *source* other than ``'mic'`` goes
        to the monitor buffer.
        """
        # Convert outside the lock; only the buffer swap needs protection.
        audio_float = audio_chunk.flatten().astype(np.float32) / 32768.0
        with self.lock:
            if source == 'mic':
                self.mic_buffer = np.concatenate([self.mic_buffer, audio_float])
            else:
                self.monitor_buffer = np.concatenate([self.monitor_buffer, audio_float])

    def transcribe_chunk(self, min_duration=3.0):
        """Transcribe every buffer holding at least *min_duration* seconds.

        Qualifying buffers are emptied. Buffers that are too short are left
        untouched so they keep accumulating.

        Returns:
            A dict mapping ``'mic'`` / ``'monitor'`` to the transcribed text
            (``None`` when nothing was recognised or transcription failed),
            or ``None`` when no buffer qualified.
        """
        pending = {}
        empty = np.array([], dtype=np.float32)

        # Detach the audio under the lock, then run the slow model outside it.
        with self.lock:
            if len(self.mic_buffer) > 0 and len(self.mic_buffer) / self.sample_rate >= min_duration:
                pending['mic'] = self.mic_buffer
                self.mic_buffer = empty.copy()
            if len(self.monitor_buffer) > 0 and len(self.monitor_buffer) / self.sample_rate >= min_duration:
                pending['monitor'] = self.monitor_buffer
                self.monitor_buffer = empty.copy()

        results = {source: self._transcribe(audio) for source, audio in pending.items()}
        return results if results else None

    def _transcribe(self, audio):
        """Run the model on *audio*; return the joined text or ``None``."""
        try:
            segments, _ = self.model.transcribe(
                audio,
                language=self.language,
                beam_size=3,
                vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500),
            )
            # Keep the join inside the try so errors raised while iterating
            # the segments are handled as well.
            text = " ".join(seg.text for seg in segments).strip()
            return text if text else None
        except Exception as e:
            print(f"❌ Transcription error: {e}")
            return None
|
|
|
|
|
|
class LLMAnalyzer:
    """LLM analysis with fact-checking and question generation via Ollama."""

    # Used to pad the list when the model returns fewer than three questions.
    _PADDING_QUESTIONS = (
        "What are the implications?",
        "What evidence supports this?",
        "What's the context?",
    )

    def __init__(self, model="qwen2.5:3b"):
        """Store the model name and verify that Ollama answers ``ollama.list()``.

        Raises:
            RuntimeError: If the ``ollama`` package is missing or the
                ``ollama.list()`` call fails.
        """
        if not OLLAMA_AVAILABLE:
            raise RuntimeError("Ollama not installed: pip install ollama")

        self.model = model
        try:
            ollama.list()
        except Exception as e:
            raise RuntimeError(f"Ollama not running: {e}") from e
        print(f"✓ Ollama connected: {self.model}")

    def fact_check(self, text):
        """Ask the model for a verdict on *text*.

        Returns:
            A dict with ``'verdict'`` (lower-cased word, ``'unknown'`` when
            absent, ``'error'`` on failure), ``'confidence'`` (float in
            ``[0.0, 1.0]``) and ``'reason'`` (string). Never raises for
            request or parsing failures.
        """
        prompt = (
            "Fact-check this statement. Reply ONLY with:\n"
            "VERDICT: factual/dubious/false\n"
            "CONFIDENCE: 0.0-1.0\n"
            "REASON: one sentence\n"
            "\n"
            f'Statement: "{text}" '
        )

        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                options={"temperature": 0.1, "num_predict": 80},
            )
            return self._parse_fact_check(response['response'])
        except Exception as e:
            # Best effort: report the failure as a result instead of crashing.
            return {'verdict': 'error', 'confidence': 0.0, 'reason': str(e)}

    @staticmethod
    def _parse_fact_check(response_text):
        """Extract verdict, confidence and reason from the model's reply."""
        import re

        verdict = re.search(r'VERDICT:\s*(\w+)', response_text, re.I)
        # Match one well-formed number so trailing punctuation ("0.9.") is
        # not captured and cannot break float().
        confidence = re.search(r'CONFIDENCE:\s*(\d+(?:\.\d+)?|\.\d+)', response_text, re.I)
        reason = re.search(r'REASON:\s*(.+?)(?:\n|$)', response_text, re.I)

        score = float(confidence.group(1)) if confidence else 0.5
        score = min(max(score, 0.0), 1.0)
        reason_text = reason.group(1).strip() if reason else ''

        return {
            'verdict': verdict.group(1).lower() if verdict else 'unknown',
            'confidence': score,
            # Fall back to the raw reply when no usable reason was found.
            'reason': reason_text or response_text[:150],
        }

    def generate_questions(self, text):
        """Return exactly three follow-up questions about *text*.

        Missing questions are padded with defaults; if the request itself
        fails, a fixed fallback list is returned.
        """
        prompt = (
            "Generate 3 insightful questions about this. Reply ONLY with:\n"
            "Q1: [question]\n"
            "Q2: [question]\n"
            "Q3: [question]\n"
            "\n"
            f'Statement: "{text}" '
        )

        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                options={"temperature": 0.7, "num_predict": 120},
            )
            return self._parse_questions(response['response'])
        except Exception:
            return ["What are the key points?", "What supports this?", "What are the implications?"]

    @classmethod
    def _parse_questions(cls, response_text):
        """Extract ``Q1``..``Q3`` from the reply and pad to three entries."""
        import re

        questions = []
        for i in range(1, 4):
            q_match = re.search(rf'Q{i}:\s*(.+?)(?:\n|$)', response_text, re.I)
            if not q_match:
                continue
            question = q_match.group(1).strip()
            if not question.endswith('?'):
                question += '?'
            questions.append(question)

        # Pad with the default that sits at the next free position.
        while len(questions) < 3:
            questions.append(cls._PADDING_QUESTIONS[len(questions)])

        return questions[:3]
|
|
|
|
|
|
def save_transcript(text, source, timestamp, filename):
    """Append a single ``[timestamp] LABEL: text`` line to *filename*.

    The parent directory is created if needed. The label is ``MIC`` when
    *source* is ``'mic'`` and ``SPEAKER`` for anything else.
    """
    parent_dir = os.path.dirname(filename) or '.'
    os.makedirs(parent_dir, exist_ok=True)

    label = "SPEAKER" if source != 'mic' else "MIC"
    with open(filename, "a", encoding="utf-8") as out:
        out.write(f"[{timestamp}] {label}: {text}\n")
|
|
|
|
|
|
def save_enriched_transcript(text, source, timestamp, fact_check, questions, filename):
    """Append a transcript entry plus its LLM analysis to *filename*.

    Each entry starts with a 70-character ``=`` rule and the labelled
    transcript line. A fact-check section is written when *fact_check* is
    truthy, and a numbered question list when *questions* is truthy. The
    parent directory is created if needed.
    """
    parent_dir = os.path.dirname(filename) or '.'
    os.makedirs(parent_dir, exist_ok=True)

    label = "SPEAKER" if source != 'mic' else "MIC"
    rule = "=" * 70

    with open(filename, "a", encoding="utf-8") as out:
        emit = out.write
        emit("\n" + rule + "\n")
        emit(f"[{timestamp}] {label}: {text}\n\n")

        if fact_check:
            emit(f"📊 Fact Check: {fact_check['verdict'].upper()} ")
            emit(f"(confidence: {fact_check['confidence']:.2f})\n")
            emit(f"💡 {fact_check['reason']}\n\n")

        if questions:
            emit("❓ Questions:\n")
            number = 1
            for question in questions:
                emit(f"{number}. {question}\n")
                number += 1
            emit("\n")
|
|
|
|
|
|
def _build_arg_parser():
    """Create the command-line parser for the transcription tool."""
    parser = argparse.ArgumentParser(description="Real-time audio transcription with dual capture")
    parser.add_argument("--model", default="tiny", choices=["tiny", "base", "small", "medium", "large"],
                        help="Whisper model (default: tiny)")
    parser.add_argument("--language", default="en", help="Language code (default: en)")
    parser.add_argument("--mic", help="Microphone device name (partial match)")
    parser.add_argument("--monitor", help="Monitor device name for speaker capture")
    parser.add_argument("--interval", type=float, default=5.0, help="Processing interval in seconds (default: 5.0)")
    parser.add_argument("--min-duration", type=float, default=2.0, help="Minimum audio duration (default: 2.0)")
    parser.add_argument("--enable-llm", action="store_true", help="Enable LLM analysis (fact-checking + questions)")
    parser.add_argument("--llm-model", default="qwen2.5:3b", help="Ollama model (default: qwen2.5:3b)")
    parser.add_argument("--output", "-o", help="Save transcript to file")
    parser.add_argument("--list-devices", action="store_true", help="List audio devices and exit")
    parser.add_argument("--force-cpu", action="store_true", help="Force CPU processing")
    return parser


def _print_input_devices():
    """Print every audio device that has at least one input channel."""
    print("\nAvailable audio devices:")
    for i, dev in enumerate(sd.query_devices()):
        in_ch = dev['max_input_channels']
        out_ch = dev['max_output_channels']
        if in_ch > 0:
            print(f" [{i:2d}] {dev['name']:<50} IN:{in_ch} OUT:{out_ch}")


def _handle_transcript(text, source, timestamp, llm_analyzer, output_path):
    """Print one transcript, run the optional LLM analysis, and save it."""
    source_emoji = "🎤" if source == 'mic' else "🔊"
    print(f"\n{source_emoji} [{timestamp}] {text}")

    fact_check = None
    questions = None
    if llm_analyzer:
        fact_check = llm_analyzer.fact_check(text)
        questions = llm_analyzer.generate_questions(text)

        verdict_emoji = {'factual': '✅', 'dubious': '⚠️', 'false': '❌'}.get(
            fact_check['verdict'], '❓')
        print(f" {verdict_emoji} {fact_check['verdict'].upper()} "
              f"({fact_check['confidence']:.2f}): {fact_check['reason']}")
        print(" ❓ Questions:")
        for i, q in enumerate(questions, 1):
            print(f" {i}. {q}")

    if output_path:
        if llm_analyzer:
            save_enriched_transcript(text, source, timestamp, fact_check, questions, output_path)
        else:
            save_transcript(text, source, timestamp, output_path)


def _run_capture_loop(capturer, transcriber, llm_analyzer, args):
    """Feed captured audio to the transcriber and report results until interrupted."""
    last_process = time.time()
    while True:
        chunk = capturer.read_chunk()
        if chunk:
            source, audio = chunk
            transcriber.add_audio(source, audio)

        if time.time() - last_process < args.interval:
            continue

        results = transcriber.transcribe_chunk(min_duration=args.min_duration)
        if results:
            timestamp = datetime.now().strftime("%H:%M:%S")
            for source, text in results.items():
                if text:
                    _handle_transcript(text, source, timestamp, llm_analyzer, args.output)

        # Restart the interval after processing so slow transcription does
        # not trigger an immediate second pass.
        last_process = time.time()


def main():
    """Command-line entry point: parse arguments, start capture and transcribe.

    Audio capture is always closed before returning, including when the
    Whisper model fails to load or the loop ends with an exception.
    """
    args = _build_arg_parser().parse_args()

    if args.list_devices:
        _print_input_devices()
        return

    print("=== Real-Time Audio Transcription ===")
    print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s")
    if args.output:
        print(f"Output: {args.output}")
    if args.enable_llm:
        print(f"LLM Analysis: Enabled ({args.llm_model})")

    # Initialize capture
    try:
        capturer = DualAudioCapture(
            mic_device=args.mic,
            monitor_device=args.monitor,
            sample_rate=16000,
            chunk_size=2048,
        )
    except Exception as e:
        print(f"\n❌ Audio Error: {e}")
        print("\nTip: Use --list-devices to see available devices")
        print(" Use --mic and --monitor to specify devices")
        return

    # From here on the streams are running, so every exit path must close them.
    try:
        # Initialize transcriber
        try:
            transcriber = WhisperTranscriber(
                model_name=args.model,
                language=args.language,
                force_cpu=args.force_cpu,
            )
        except Exception as e:
            print(f"\n❌ Whisper Error: {e}")
            return

        # Initialize LLM analyzer; failure only disables the analysis.
        llm_analyzer = None
        if args.enable_llm:
            try:
                llm_analyzer = LLMAnalyzer(model=args.llm_model)
            except Exception as e:
                print(f"\n⚠ LLM Error: {e}")
                print("Continuing without LLM analysis...")

        print(f"\n✅ Started. Press Ctrl+C to stop.\n{'='*60}")
        _run_capture_loop(capturer, transcriber, llm_analyzer, args)
    except KeyboardInterrupt:
        print(f"\n{'='*60}\n🛑 Stopping...")
    finally:
        capturer.close()

    if args.output and os.path.exists(args.output):
        print(f"\n💾 Transcript saved: {os.path.abspath(args.output)}")
    print("\n✅ Done!")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|