#!/usr/bin/env python3
"""
Real-time transcription with dual audio capture (microphone + speaker monitor).
Linux/PipeWire optimized with Ollama LLM fact-checking.
"""

import argparse
import queue
import re
import threading
import time
from datetime import datetime

import numpy as np
import sounddevice as sd
from faster_whisper import WhisperModel

try:
    import ollama
    OLLAMA_AVAILABLE = True
except ImportError:
    OLLAMA_AVAILABLE = False


class DualAudioCapture:
    """Capture microphone and (optionally) speaker-monitor audio simultaneously.

    Each source gets its own ``sounddevice.InputStream``. Both stream
    callbacks push ``(source, samples)`` tuples onto one shared queue, where
    ``source`` is ``'mic'`` or ``'monitor'`` and ``samples`` is a copy of the
    int16 block handed to the callback.
    """

    def __init__(self, mic_device=None, monitor_device=None, sample_rate=16000, chunk_size=2048):
        """Resolve the devices, open the input streams and start capturing.

        Args:
            mic_device: Name substring of the microphone. ``None`` selects
                the default input device.
            monitor_device: Name substring of the monitor/loopback input used
                to capture speaker output. Falsy disables monitor capture.
            sample_rate: Sample rate requested from both streams, in Hz.
            chunk_size: Number of frames per callback block.

        Raises:
            RuntimeError: If a named device cannot be found or there is no
                default input device.
            Exception: Whatever ``sounddevice`` raises while opening or
                starting a stream; any stream already opened is closed first.
        """
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.audio_queue = queue.Queue()
        # Defined before any stream is opened so close() is safe even when
        # initialisation fails half-way through.
        self.mic_stream = None
        self.monitor_stream = None

        devices = sd.query_devices()

        # Microphone (default input or specified)
        if mic_device is None:
            self.mic_device = sd.default.device[0]
            # PortAudio reports "no default device" as a negative index, which
            # would otherwise silently select devices[-1] in the print below.
            if isinstance(self.mic_device, int) and self.mic_device < 0:
                raise RuntimeError("No default input device available")
        else:
            self.mic_device = self._find_device(mic_device, input_required=True)

        # Monitor/loopback (for speaker output)
        if monitor_device:
            self.monitor_device = self._find_device(monitor_device, input_required=True)
        else:
            self.monitor_device = None

        print(f"✓ Microphone: {devices[self.mic_device]['name']} (index {self.mic_device})")
        # Device index 0 is a valid device, so test against None, not truthiness.
        if self.monitor_device is not None:
            print(f"✓ Monitor: {devices[self.monitor_device]['name']} (index {self.monitor_device})")
        else:
            print("⚠ No monitor device - capturing microphone only")

        try:
            self.mic_stream = self._open_stream(self.mic_device, self._mic_callback)
            if self.monitor_device is not None:
                self.monitor_stream = self._open_stream(self.monitor_device, self._monitor_callback)

            self.mic_stream.start()
            if self.monitor_stream is not None:
                self.monitor_stream.start()
        except Exception:
            # Do not leak a stream that was opened before the failure.
            self.close()
            raise

        print("✓ Audio capture started")

    def _open_stream(self, device, callback):
        """Create (without starting) a mono int16 input stream for ``device``."""
        return sd.InputStream(
            device=device,
            channels=1,
            samplerate=self.sample_rate,
            blocksize=self.chunk_size,
            dtype='int16',
            callback=callback,
        )

    def _find_device(self, device_name, input_required=True):
        """Return the index of the first device whose name contains ``device_name``.

        The match is case-insensitive. When ``input_required`` is true, devices
        without input channels are skipped.

        Raises:
            RuntimeError: If no device matches.
        """
        wanted = device_name.lower()
        for index, dev in enumerate(sd.query_devices()):
            if wanted not in dev['name'].lower():
                continue
            if not input_required or dev['max_input_channels'] > 0:
                return index
        raise RuntimeError(f"Device '{device_name}' not found")

    def _mic_callback(self, indata, frames, time_info, status):
        """Stream callback: queue a copy of the microphone block."""
        if status:
            print(f"⚠ Mic status: {status}")
        # Copy because the callback's buffer is not ours to keep.
        self.audio_queue.put(('mic', indata.copy()))

    def _monitor_callback(self, indata, frames, time_info, status):
        """Stream callback: queue a copy of the monitor/speaker block."""
        if status:
            print(f"⚠ Monitor status: {status}")
        self.audio_queue.put(('monitor', indata.copy()))

    def read_chunk(self, timeout=0.05):
        """Return the next ``(source, samples)`` tuple, or ``None`` on timeout.

        Args:
            timeout: Seconds to wait for a block before giving up.
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def close(self):
        """Stop and close every open stream.

        Safe to call more than once and on a partially initialised instance:
        each stream reference is cleared before it is released.
        """
        for attr in ("mic_stream", "monitor_stream"):
            stream = getattr(self, attr, None)
            if stream is None:
                continue
            setattr(self, attr, None)
            try:
                stream.stop()
            finally:
                stream.close()


class WhisperTranscriber:
    """Buffer per-source audio and transcribe it with a faster-whisper model.

    Audio arrives as int16 blocks via :meth:`add_audio` and is kept in two
    float32 buffers (microphone and monitor). :meth:`transcribe_chunk`
    drains each buffer that has reached a minimum duration and returns the
    recognised text per source.
    """

    # Buffer durations are computed assuming audio captured at this rate (Hz).
    SAMPLE_RATE = 16000

    def __init__(self, model_name="base", language="en", force_cpu=False):
        """Load the Whisper model on GPU when usable, otherwise on CPU.

        Args:
            model_name: Model size/name passed to ``WhisperModel``.
            language: Language code passed to every transcription call.
            force_cpu: When true, skip CUDA detection and use the CPU.
        """
        print(f"Loading Whisper model '{model_name}'...")

        device, compute_type = self._select_device(force_cpu)

        model_kwargs = {"device": device, "compute_type": compute_type}
        if device == "cpu":
            print("✓ Using CPU")
            model_kwargs["cpu_threads"] = 4

        self.model = WhisperModel(model_name, **model_kwargs)
        self.language = language
        self.mic_buffer = np.array([], dtype=np.float32)
        self.monitor_buffer = np.array([], dtype=np.float32)
        self.lock = threading.Lock()

    @staticmethod
    def _select_device(force_cpu):
        """Return the ``(device, compute_type)`` pair to load the model with.

        ``torch`` is treated as optional: when it is installed its CUDA check
        must pass, and when it is missing only CTranslate2's device count is
        consulted. Any failure during detection falls back to the CPU.
        """
        cpu_choice = ("cpu", "int8")
        if force_cpu:
            return cpu_choice

        try:
            import torch
        except ImportError:
            # A CPU-only install without torch must still be able to run.
            torch = None

        if torch is not None and not torch.cuda.is_available():
            return cpu_choice

        try:
            import ctranslate2
            if ctranslate2.get_cuda_device_count() > 0:
                gpu_name = torch.cuda.get_device_name(0) if torch is not None else "CUDA device 0"
                print(f"✓ Using GPU: {gpu_name}")
                return ("cuda", "float16")
        except Exception as e:
            print(f"⚠ CUDA unavailable: {e}")

        return cpu_choice

    def add_audio(self, source, audio_chunk):
        """Append an int16 block to the buffer for ``source``.

        Args:
            source: ``'mic'`` for the microphone buffer; any other value goes
                to the monitor buffer.
            audio_chunk: int16 sample array; it is flattened and scaled by
                1/32768 to float32.
        """
        # Convert outside the lock; only the buffer swap needs protection.
        audio_float = audio_chunk.flatten().astype(np.float32) / 32768.0
        with self.lock:
            if source == 'mic':
                self.mic_buffer = np.concatenate([self.mic_buffer, audio_float])
            else:
                self.monitor_buffer = np.concatenate([self.monitor_buffer, audio_float])

    def transcribe_chunk(self, min_duration=3.0):
        """Transcribe every buffer holding at least ``min_duration`` seconds.

        Returns:
            A dict mapping ``'mic'`` / ``'monitor'`` to the recognised text
            (``None`` when nothing was recognised or transcription failed),
            or ``None`` when no buffer was long enough.
        """
        pending = {}
        with self.lock:
            # Only detach the buffers while holding the lock. add_audio always
            # rebinds to a new array, so the detached ones are never mutated.
            if len(self.mic_buffer) / self.SAMPLE_RATE >= min_duration:
                pending['mic'] = self.mic_buffer
                self.mic_buffer = np.array([], dtype=np.float32)
            if len(self.monitor_buffer) / self.SAMPLE_RATE >= min_duration:
                pending['monitor'] = self.monitor_buffer
                self.monitor_buffer = np.array([], dtype=np.float32)

        # Inference is slow; run it without the lock so add_audio is not blocked.
        results = {source: self._transcribe(audio) for source, audio in pending.items()}
        return results if results else None

    def _transcribe(self, audio):
        """Run the model on ``audio``; return stripped text or ``None``.

        Errors are reported on stdout and turned into ``None`` so a single
        bad chunk does not stop the caller's loop.
        """
        try:
            segments, _ = self.model.transcribe(
                audio,
                language=self.language,
                beam_size=3,  # Faster than default 5
                vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500),
            )
            text = " ".join(seg.text for seg in segments).strip()
            return text if text else None
        except Exception as e:
            print(f"❌ Transcription error: {e}")
            return None


class LLMFactChecker:
    """Fast fact-checking of short statements with a local Ollama model."""

    # Patterns for the three labelled lines the prompt asks the model to emit.
    _VERDICT_RE = re.compile(r'VERDICT:\s*(\w+)', re.I)
    # Match one well-formed number only, so trailing punctuation ("0.9.")
    # cannot end up inside the captured group and break float().
    _CONFIDENCE_RE = re.compile(r'CONFIDENCE:\s*(\d*\.?\d+)', re.I)
    _REASON_RE = re.compile(r'REASON:\s*(.+?)(?:\n|$)', re.I | re.DOTALL)

    def __init__(self, model="qwen2.5:3b"):
        """Remember the model name and verify that the Ollama server responds.

        Args:
            model: Name of the Ollama model used for generation.

        Raises:
            RuntimeError: If the ``ollama`` package is missing or the server
                cannot be reached.
        """
        if not OLLAMA_AVAILABLE:
            raise RuntimeError("Ollama not installed: pip install ollama")

        self.model = model
        try:
            ollama.list()
            print(f"✓ Ollama connected: {self.model}")
        except Exception as e:
            raise RuntimeError(f"Ollama not running: {e}") from e

    @staticmethod
    def _parse_response(reply):
        """Extract verdict, confidence and reason from the model's reply text.

        Missing fields fall back to ``'unknown'``, ``0.5`` and the first 150
        characters of the reply respectively. Confidence is clamped to the
        ``0.0``-``1.0`` range requested by the prompt.
        """
        verdict = LLMFactChecker._VERDICT_RE.search(reply)
        confidence = LLMFactChecker._CONFIDENCE_RE.search(reply)
        reason = LLMFactChecker._REASON_RE.search(reply)

        if confidence:
            score = min(1.0, max(0.0, float(confidence.group(1))))
        else:
            score = 0.5

        return {
            'verdict': verdict.group(1).lower() if verdict else 'unknown',
            'confidence': score,
            'reason': reason.group(1).strip() if reason else reply[:150],
        }

    def fact_check(self, text):
        """Ask the model to fact-check ``text``.

        Returns:
            A dict with keys ``'verdict'``, ``'confidence'`` and ``'reason'``.
            Any failure while generating or parsing yields verdict
            ``'error'``, confidence ``0.0`` and the exception text as reason.
        """
        prompt = f"""Fact-check this statement. Reply ONLY with:
VERDICT: factual/dubious/false
CONFIDENCE: 0.0-1.0
REASON: one sentence

Statement: "{text}" """

        try:
            response = ollama.generate(
                model=self.model,
                prompt=prompt,
                options={"temperature": 0.1, "num_predict": 80},
            )
            return self._parse_response(response['response'])
        except Exception as e:
            return {'verdict': 'error', 'confidence': 0.0, 'reason': str(e)}


def _print_input_devices():
    """Print every audio device that exposes at least one input channel."""
    print("\nAvailable audio devices:")
    for i, dev in enumerate(sd.query_devices()):
        in_ch = dev['max_input_channels']
        out_ch = dev['max_output_channels']
        if in_ch > 0:
            print(f" [{i:2d}] {dev['name']:<50} IN:{in_ch} OUT:{out_ch}")


def _report_results(results, fact_checker):
    """Print each transcribed text and, when enabled, its fact-check verdict."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    verdict_emojis = {'factual': '✅', 'dubious': '⚠️', 'false': '❌'}

    for source, text in results.items():
        if not text:
            continue
        source_emoji = "🎤" if source == 'mic' else "🔊"
        print(f"\n{source_emoji} [{timestamp}] {text}")

        if fact_checker:
            fc = fact_checker.fact_check(text)
            verdict_emoji = verdict_emojis.get(fc['verdict'], '❓')
            print(f" {verdict_emoji} {fc['verdict'].upper()} ({fc['confidence']:.2f}): {fc['reason']}")


def main():
    """Command-line entry point: capture audio, transcribe, optionally fact-check.

    Parses the CLI options, starts audio capture, loads the Whisper model and
    then loops until Ctrl+C: audio blocks are moved from the capture queue to
    the transcriber, and every ``--interval`` seconds the buffered audio is
    transcribed and printed. The capture streams are always released once
    they have been started, whatever path leaves the function.
    """
    parser = argparse.ArgumentParser(description="Dual audio transcription with fact-checking")
    parser.add_argument("--model", default="tiny", choices=["tiny", "base", "small", "medium"],
                        help="Whisper model (default: tiny for speed)")
    parser.add_argument("--language", default="en", help="Language code")
    parser.add_argument("--mic", help="Microphone device name (partial match)")
    parser.add_argument("--monitor", help="Monitor device name for speaker capture")
    parser.add_argument("--interval", type=float, default=5.0, help="Processing interval (seconds)")
    parser.add_argument("--min-duration", type=float, default=2.0, help="Min audio duration")
    parser.add_argument("--enable-llm", action="store_true", help="Enable fact-checking")
    parser.add_argument("--llm-model", default="qwen2.5:3b", help="Ollama model")
    parser.add_argument("--list-devices", action="store_true", help="List audio devices")
    parser.add_argument("--force-cpu", action="store_true", help="Force CPU")

    args = parser.parse_args()

    if args.list_devices:
        _print_input_devices()
        return

    print("=== Dual Audio Transcription with Fact-Checking ===")
    print(f"Model: {args.model} | Language: {args.language} | Interval: {args.interval}s")

    # Initialize capture
    try:
        capturer = DualAudioCapture(
            mic_device=args.mic,
            monitor_device=args.monitor,
            sample_rate=16000,
            chunk_size=2048,
        )
    except Exception as e:
        print(f"\n❌ Audio Error: {e}")
        print("\nTip: Use --list-devices to see available devices")
        print(" Use --mic and --monitor to specify devices")
        return

    # The audio streams are live from here on; the finally clause releases
    # them on every exit path (Whisper failure, Ctrl+C, unexpected error).
    try:
        # Initialize transcriber
        try:
            transcriber = WhisperTranscriber(
                model_name=args.model,
                language=args.language,
                force_cpu=args.force_cpu,
            )
        except Exception as e:
            print(f"\n❌ Whisper Error: {e}")
            return

        # Initialize fact checker (optional; failure only disables the feature)
        fact_checker = None
        if args.enable_llm:
            try:
                fact_checker = LLMFactChecker(model=args.llm_model)
            except Exception as e:
                print(f"\n⚠ LLM Error: {e}")
                print("Continuing without fact-checking...")

        # Main loop
        print(f"\n✅ Started. Press Ctrl+C to stop.\n{'='*60}")
        last_process = time.time()

        try:
            while True:
                # Collect audio
                chunk = capturer.read_chunk()
                if chunk is not None:
                    source, audio = chunk
                    transcriber.add_audio(source, audio)

                # Process at intervals
                if time.time() - last_process >= args.interval:
                    results = transcriber.transcribe_chunk(min_duration=args.min_duration)
                    if results:
                        _report_results(results, fact_checker)
                    # Restart the interval after processing so slow
                    # transcription does not trigger back-to-back runs.
                    last_process = time.time()
        except KeyboardInterrupt:
            print(f"\n{'='*60}\n🛑 Stopping...")
    finally:
        capturer.close()

    print("\n✅ Done!")


# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()