""" Sentence extraction from chunked transcriptions. Stitches partial chunks together and extracts complete sentences. """ import re from typing import List, Tuple, Optional from collections import deque class SentenceExtractor: """ Buffers transcription chunks and extracts complete sentences. Handles sentence boundaries that span across audio chunks. """ def __init__(self, max_buffer_words=200): """ Initialize the sentence extractor. Args: max_buffer_words: Maximum words to keep in buffer before forcing extraction """ self.buffer = "" self.max_buffer_words = max_buffer_words self.completed_sentences = deque() # Sentence boundary patterns self.sentence_end_pattern = re.compile(r'([.!?]+)\s+') self.sentence_boundaries = re.compile(r'(?<=[.!?])\s+(?=[A-Z])') def add_chunk(self, text: str) -> List[str]: """ Add a new transcription chunk and extract any complete sentences. Args: text: New transcription text chunk Returns: List of complete sentences extracted """ if not text or not text.strip(): return [] # Add to buffer if self.buffer: # Smart joining: check if we need a space if not self.buffer[-1].isspace() and not text[0].isspace(): self.buffer += " " self.buffer += text.strip() # Extract complete sentences sentences = self._extract_sentences() # Check if buffer is too large word_count = len(self.buffer.split()) if word_count > self.max_buffer_words: # Force extraction of what we have forced = self._force_extract() if forced: sentences.extend(forced) return sentences def _extract_sentences(self) -> List[str]: """ Extract complete sentences from buffer. Keeps incomplete sentence in buffer. Returns: List of complete sentences """ sentences = [] # Find sentence boundaries # Pattern: sentence ending punctuation followed by space and capital letter # or sentence ending at punctuation before end of buffer parts = self.sentence_boundaries.split(self.buffer) if len(parts) > 1: # We have complete sentences # Keep the last part (incomplete sentence) in buffer sentences = [s.strip() for s in parts[:-1] if s.strip()] self.buffer = parts[-1].strip() return sentences def _force_extract(self) -> List[str]: """ Force extraction when buffer is too large. Tries to break at reasonable points. Returns: List of extracted text segments """ # Try to find the last sentence-like boundary last_period = max( self.buffer.rfind('. '), self.buffer.rfind('! '), self.buffer.rfind('? ') ) if last_period > 0: # Extract up to last period extracted = self.buffer[:last_period + 1].strip() self.buffer = self.buffer[last_period + 1:].strip() return [extracted] else: # No sentence boundary found, extract by word limit words = self.buffer.split() if len(words) > self.max_buffer_words: # Take 80% of max_buffer_words split_point = int(self.max_buffer_words * 0.8) extracted = " ".join(words[:split_point]) self.buffer = " ".join(words[split_point:]) return [extracted + "..."] return [] def flush(self) -> List[str]: """ Flush remaining buffer and return as sentence(s). Call this at end of transcription. Returns: List of remaining text as sentences """ sentences = [] if self.buffer.strip(): # Try to extract any remaining complete sentences first extracted = self._extract_sentences() sentences.extend(extracted) # Return remaining buffer if it has content if self.buffer.strip(): # Check if it ends with punctuation if not self.buffer[-1] in '.!?': self.buffer += "." sentences.append(self.buffer.strip()) self.buffer = "" return sentences def get_buffer_status(self) -> dict: """ Get current buffer status for debugging. Returns: Dictionary with buffer stats """ return { "buffer_length": len(self.buffer), "buffer_words": len(self.buffer.split()) if self.buffer else 0, "buffer_preview": self.buffer[:100] + "..." if len(self.buffer) > 100 else self.buffer } class SentenceCleaner: """ Cleans and normalizes extracted sentences. Removes duplicates, fixes common transcription issues. """ def __init__(self): self.seen_sentences = set() self.similarity_threshold = 0.85 def clean(self, sentence: str) -> Optional[str]: """ Clean and normalize a sentence. Args: sentence: Raw sentence text Returns: Cleaned sentence or None if should be filtered """ if not sentence or not sentence.strip(): return None # Basic cleaning cleaned = sentence.strip() # Remove multiple spaces cleaned = re.sub(r'\s+', ' ', cleaned) # Fix spacing around punctuation cleaned = re.sub(r'\s+([.!?,;:])', r'\1', cleaned) # Capitalize first letter if cleaned and not cleaned[0].isupper(): cleaned = cleaned[0].upper() + cleaned[1:] # Ensure ends with punctuation if cleaned and not cleaned[-1] in '.!?': cleaned += '.' # Filter very short sentences (likely fragments) if len(cleaned.split()) < 3: return None # Check for duplicates (exact) if cleaned in self.seen_sentences: return None self.seen_sentences.add(cleaned) return cleaned def reset(self): """Reset seen sentences cache.""" self.seen_sentences.clear() def demo(): """Demo usage of sentence extractor.""" extractor = SentenceExtractor() cleaner = SentenceCleaner() # Simulate chunked transcription chunks = [ "Hello everyone welcome to", "to this presentation today we will", "will discuss the importance of AI. Artificial intelligence is", "is transforming many industries. It helps us automate", "automate tasks and make better decisions. What do you", "you think about this technology? I believe it has", "has great potential for the future." ] print("=== Sentence Extraction Demo ===\n") print("Input chunks:") for i, chunk in enumerate(chunks, 1): print(f" Chunk {i}: '{chunk}'") print("\n" + "="*50) print("Extracted sentences:\n") for i, chunk in enumerate(chunks, 1): sentences = extractor.add_chunk(chunk) for sent in sentences: cleaned = cleaner.clean(sent) if cleaned: print(f" [{i}] {cleaned}") # Flush remaining buffer print("\nFlushing buffer...") final_sentences = extractor.flush() for sent in final_sentences: cleaned = cleaner.clean(sent) if cleaned: print(f" [final] {cleaned}") print("\n" + "="*50) print("Buffer status:") print(extractor.get_buffer_status()) if __name__ == "__main__": demo()