"""
Sentence extraction from chunked transcriptions.

Stitches partial chunks together and extracts complete sentences.
"""
|
|
|
|
import re
|
|
from typing import List, Tuple, Optional
|
|
from collections import deque
|
|
|
|
|
|
class SentenceExtractor:
    """
    Buffers transcription chunks and extracts complete sentences.

    Handles sentence boundaries that span across audio chunks: incoming text
    is accumulated in ``self.buffer`` and a sentence is released only once
    the start of the next one has been seen (whitespace followed by an
    uppercase ASCII letter after ``.``, ``!`` or ``?``), when the buffer
    grows past ``max_buffer_words``, or when ``flush()`` is called.
    """

    def __init__(self, max_buffer_words=200):
        """
        Initialize the sentence extractor.

        Args:
            max_buffer_words: Maximum words to keep in buffer before forcing extraction
        """
        self.buffer = ""
        self.max_buffer_words = max_buffer_words
        # Kept for backward compatibility; no method of this class reads it.
        self.completed_sentences = deque()

        # Sentence boundary patterns
        # Kept for backward compatibility; no method of this class reads it.
        self.sentence_end_pattern = re.compile(r'([.!?]+)\s+')
        # Whitespace that follows ., ! or ? and precedes an uppercase A-Z.
        self.sentence_boundaries = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def add_chunk(self, text: str) -> List[str]:
        """
        Add a new transcription chunk and extract any complete sentences.

        Args:
            text: New transcription text chunk

        Returns:
            List of complete sentences extracted (possibly empty). If the
            buffer exceeds ``max_buffer_words`` after extraction, the list
            also contains the segment returned by the forced extraction.
        """
        if not text or not text.strip():
            return []

        chunk = text.strip()

        # The chunk is appended stripped, so a separator is needed whenever
        # the buffer does not already end in whitespace. Testing the raw
        # chunk's leading whitespace here would glue words together
        # ("Hello" + " world" -> "Helloworld").
        if self.buffer and not self.buffer[-1].isspace():
            self.buffer += " "
        self.buffer += chunk

        # Extract complete sentences
        sentences = self._extract_sentences()

        # Check if buffer is too large
        if len(self.buffer.split()) > self.max_buffer_words:
            # Force extraction of what we have
            sentences.extend(self._force_extract())

        return sentences

    def _extract_sentences(self) -> List[str]:
        """
        Extract complete sentences from buffer.

        Keeps the trailing (possibly incomplete) sentence in the buffer.
        Text that ends in punctuation at the very end of the buffer is NOT
        released here, because no following capital letter has been seen yet.

        Returns:
            List of complete sentences
        """
        parts = self.sentence_boundaries.split(self.buffer)
        if len(parts) <= 1:
            # No boundary found; leave the buffer untouched.
            return []

        # Everything before the last boundary is complete; the last part
        # stays buffered until more text arrives.
        self.buffer = parts[-1].strip()
        return [part.strip() for part in parts[:-1] if part.strip()]

    def _force_extract(self) -> List[str]:
        """
        Force extraction when buffer is too large.

        Tries to break after the last ". ", "! " or "? " in the buffer. If
        there is none and the buffer holds more than ``max_buffer_words``
        words, the first 80% of ``max_buffer_words`` words (at least one) are
        cut off and returned with a trailing "...".

        Returns:
            List of extracted text segments (empty if nothing was extracted)
        """
        # Try to find the last sentence-like boundary
        last_boundary = max(
            self.buffer.rfind('. '),
            self.buffer.rfind('! '),
            self.buffer.rfind('? '),
        )

        if last_boundary > 0:
            # Extract up to and including the punctuation mark
            extracted = self.buffer[:last_boundary + 1].strip()
            self.buffer = self.buffer[last_boundary + 1:].strip()
            return [extracted]

        # No sentence boundary found, extract by word limit
        words = self.buffer.split()
        if len(words) > self.max_buffer_words:
            # Take 80% of max_buffer_words, but never fewer than one word:
            # a zero-word cut would emit a bare "..." and leave the buffer
            # exactly as large as before.
            split_point = max(1, int(self.max_buffer_words * 0.8))
            extracted = " ".join(words[:split_point])
            self.buffer = " ".join(words[split_point:])
            return [extracted + "..."]

        return []

    def flush(self) -> List[str]:
        """
        Flush remaining buffer and return as sentence(s).

        Call this at end of transcription. A "." is appended to the final
        fragment if it does not already end in ".", "!" or "?". The buffer
        is empty afterwards.

        Returns:
            List of remaining text as sentences
        """
        sentences = []

        if self.buffer.strip():
            # Try to extract any remaining complete sentences first
            sentences.extend(self._extract_sentences())

            # Inspect the stripped remainder so trailing whitespace cannot
            # be mistaken for a missing terminator.
            remainder = self.buffer.strip()
            if remainder:
                if remainder[-1] not in '.!?':
                    remainder += "."
                sentences.append(remainder)

        self.buffer = ""
        return sentences

    def get_buffer_status(self) -> dict:
        """
        Get current buffer status for debugging.

        Returns:
            Dictionary with ``buffer_length`` (characters), ``buffer_words``
            and ``buffer_preview`` (first 100 characters, followed by "..."
            when the buffer is longer than that).
        """
        if len(self.buffer) > 100:
            preview = self.buffer[:100] + "..."
        else:
            preview = self.buffer

        return {
            "buffer_length": len(self.buffer),
            "buffer_words": len(self.buffer.split()),
            "buffer_preview": preview,
        }
|
|
|
|
|
|
class SentenceCleaner:
    """
    Normalizes extracted sentences and drops unusable ones.

    Whitespace and punctuation spacing are tidied, the first letter is
    capitalized, a terminator is ensured, and fragments or exact repeats of
    an earlier result are rejected.
    """

    def __init__(self):
        # Every cleaned sentence returned so far; used for exact-duplicate checks.
        self.seen_sentences = set()
        # Not read by any method of this class.
        self.similarity_threshold = 0.85

    def clean(self, sentence: str) -> Optional[str]:
        """
        Normalize one sentence.

        Args:
            sentence: Raw sentence text

        Returns:
            The normalized sentence, or None when the input is blank, has
            fewer than three words after cleaning, or has been returned before.
        """
        stripped = sentence.strip() if sentence else ""
        if not stripped:
            return None

        # Collapse whitespace runs, then drop whitespace before punctuation.
        single_spaced = re.sub(r'\s+', ' ', stripped)
        result = re.sub(r'\s+([.!?,;:])', r'\1', single_spaced)

        # Upper-case the leading character when it is not already upper-case.
        head = result[0]
        if not head.isupper():
            result = head.upper() + result[1:]

        # Guarantee a sentence terminator.
        if not result.endswith(('.', '!', '?')):
            result = result + '.'

        # Fewer than three words is treated as a fragment; an exact repeat
        # of a previous result is suppressed.
        too_short = len(result.split()) < 3
        if too_short or result in self.seen_sentences:
            return None

        self.seen_sentences.add(result)
        return result

    def reset(self):
        """Forget every sentence seen so far."""
        self.seen_sentences.clear()
|
|
|
|
|
|
def demo():
    """Print a walkthrough of sentence extraction over sample chunks.

    Feeds a fixed list of transcription chunks through a SentenceExtractor,
    passes every extracted sentence through a SentenceCleaner, prints the
    ones that survive cleaning, then flushes the extractor and prints its
    buffer status.
    """
    splitter = SentenceExtractor()
    normalizer = SentenceCleaner()
    divider = "\n" + "=" * 50

    # Simulated chunked transcription input
    chunks = [
        "Hello everyone welcome to",
        "to this presentation today we will",
        "will discuss the importance of AI. Artificial intelligence is",
        "is transforming many industries. It helps us automate",
        "automate tasks and make better decisions. What do you",
        "you think about this technology? I believe it has",
        "has great potential for the future.",
    ]

    print("=== Sentence Extraction Demo ===\n")
    print("Input chunks:")
    for i, chunk in enumerate(chunks, 1):
        print(f" Chunk {i}: '{chunk}'")

    print(divider)
    print("Extracted sentences:\n")

    # map/filter are lazy, so each sentence is cleaned right before it is
    # printed; sentences rejected by the cleaner (None) are skipped.
    for i, chunk in enumerate(chunks, 1):
        for cleaned in filter(None, map(normalizer.clean, splitter.add_chunk(chunk))):
            print(f" [{i}] {cleaned}")

    # Drain whatever is still buffered
    print("\nFlushing buffer...")
    for cleaned in filter(None, map(normalizer.clean, splitter.flush())):
        print(f" [final] {cleaned}")

    print(divider)
    print("Buffer status:")
    print(splitter.get_buffer_status())
|
|
|
|
|
|
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo()
|