105 lines
3.3 KiB
Python
105 lines
3.3 KiB
Python
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
import json
|
|
|
|
class ContentExtractor:
    """Dispatch a file to the extractor registered for an extractor-type key.

    Every extractor takes a ``Path`` and returns a plain ``dict``. Failures
    are reported as a dict containing an ``'error'`` key instead of being
    raised. Image, audio and video extractors do not read the file at all;
    they only return a description of the processing still to be done.
    """

    # Maximum number of characters read from a text or code file.
    MAX_TEXT_CHARS = 1024 * 1024
    # Number of leading PDF pages whose text is extracted.
    MAX_PDF_PAGES = 10

    def __init__(self):
        """Build the extractor-type -> bound-method dispatch table."""
        self.extractors = {
            'pdf_text': self._extract_pdf,
            'ocr+caption': self._extract_image,
            'transcribe': self._extract_audio,
            'transcribe+scenes': self._extract_video,
            'office_text': self._extract_document,
            'read': self._extract_text,
            'read+syntax': self._extract_code,
        }

    def extract(self, file_path: Path, extractor_type: str) -> Dict:
        """Run the extractor registered under *extractor_type* on *file_path*.

        Returns the extractor's result dict. An unknown *extractor_type*, or
        any exception escaping the extractor, yields ``{'error': <message>}``.
        """
        extractor = self.extractors.get(extractor_type)
        if extractor is None:
            return {'error': f'Unknown extractor: {extractor_type}'}

        try:
            return extractor(file_path)
        except Exception as e:
            # Boundary handler: callers get an error dict, never an exception.
            return {'error': str(e)}

    def _extract_text(self, file_path: Path) -> Dict:
        """Read up to ``MAX_TEXT_CHARS`` characters of *file_path* as UTF-8.

        Undecodable bytes are dropped (``errors='ignore'``). Returns
        ``text``, ``char_count`` and ``needs_llm=False``, or an error dict.
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read(self.MAX_TEXT_CHARS)
        except Exception as e:
            return {'error': str(e)}

        return {
            'text': content,
            'char_count': len(content),
            'needs_llm': False,
        }

    def _extract_code(self, file_path: Path) -> Dict:
        """Read *file_path* as text and tag a successful result as code.

        On success the text result gains ``type='code'`` and
        ``needs_llm=True``; an error dict is passed through unchanged.
        """
        result = self._extract_text(file_path)
        if 'error' not in result:
            result['type'] = 'code'
            result['needs_llm'] = True
        return result

    def _extract_pdf(self, file_path: Path) -> Dict:
        """Extract text from the first ``MAX_PDF_PAGES`` pages using PyPDF2.

        Returns ``text``, ``pages_extracted``, ``type='document'`` and
        ``needs_llm`` (true when more than 100 non-blank characters were
        extracted). Any failure, including PyPDF2 not being importable,
        returns ``{'error': ..., 'needs_ocr': True}``.
        """
        try:
            # Imported lazily so the class stays usable without PyPDF2.
            import PyPDF2

            with open(file_path, 'rb') as f:
                pdf = PyPDF2.PdfReader(f)
                # Coerce a falsy result (e.g. None) to '' so one page with no
                # extractable text cannot make the join below raise.
                text_parts = [
                    page.extract_text() or ''
                    for page in pdf.pages[:self.MAX_PDF_PAGES]
                ]

            text = '\n'.join(text_parts)
            return {
                'text': text,
                'pages_extracted': len(text_parts),
                'needs_llm': len(text.strip()) > 100,
                'type': 'document',
            }
        except Exception as e:
            return {'error': str(e), 'needs_ocr': True}

    def _extract_image(self, file_path: Path) -> Dict:
        """Return the pending OCR/caption work description for an image.

        *file_path* is not read.
        """
        return {
            'type': 'image',
            'needs_ocr': True,
            'needs_caption': True,
            'needs_llm': True,
            'pipeline': ['ocr', 'caption', 'embedding'],
            'status': 'pending',
        }

    def _extract_audio(self, file_path: Path) -> Dict:
        """Return the pending transcription work description for audio.

        *file_path* is not read.
        """
        return {
            'type': 'audio',
            'needs_transcription': True,
            'needs_llm': True,
            'pipeline': ['transcribe', 'summarize'],
            'status': 'pending',
        }

    def _extract_video(self, file_path: Path) -> Dict:
        """Return the pending transcription/scene work description for video.

        *file_path* is not read.
        """
        return {
            'type': 'video',
            'needs_transcription': True,
            'needs_scene_detection': True,
            'needs_llm': True,
            'pipeline': ['transcribe', 'scenes', 'summarize'],
            'status': 'pending',
        }

    def _extract_document(self, file_path: Path) -> Dict:
        """Extract text from an office-style document using textract.

        Returns ``text``, ``type='document'`` and ``needs_llm`` (true when
        more than 100 non-blank characters were extracted). On any failure,
        including textract not being importable, returns
        ``{'error': 'textract failed', 'detail': <reason>, 'needs_llm': True}``.
        """
        try:
            # Imported lazily so the class stays usable without textract.
            import textract

            text = textract.process(str(file_path)).decode('utf-8')
        except Exception as e:
            # Not a bare ``except``: KeyboardInterrupt/SystemExit must still
            # propagate. The 'error' value is kept stable for existing
            # callers; the underlying reason goes in 'detail'.
            return {
                'error': 'textract failed',
                'detail': str(e),
                'needs_llm': True,
            }

        return {
            'text': text,
            'type': 'document',
            'needs_llm': len(text.strip()) > 100,
        }
|