diff --git a/app/analysis/inventory.py b/app/analysis/inventory.py
new file mode 100644
index 0000000..6d5f4c4
--- /dev/null
+++ b/app/analysis/inventory.py
@@ -0,0 +1,150 @@
+import psycopg2
+from typing import Dict, Optional
+import logging
+
+logger = logging.getLogger(__name__)
+
+class FileTypeInventory:
+    def __init__(self, db_config: Dict):
+        self.db_config = db_config
+
+        self.parseable_extensions = {
+            'text': {'txt', 'md', 'log', 'json', 'yaml', 'yml', 'xml', 'csv', 'tsv', 'ini', 'cfg', 'conf'},
+            'code': {'py', 'js', 'java', 'go', 'rs', 'ts', 'tsx', 'jsx', 'cpp', 'h', 'c', 'cs', 'rb', 'php', 'sh', 'bat', 'ps1', 'sql', 'r', 'scala', 'kt'},
+            'pdf': {'pdf'},
+            'document': {'doc', 'docx', 'odt', 'rtf', 'pages'},
+            'spreadsheet': {'xls', 'xlsx', 'ods', 'numbers'},
+            'presentation': {'ppt', 'pptx', 'odp', 'key'},
+            'image': {'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'svg', 'ico'},
+            'audio': {'mp3', 'wav', 'flac', 'm4a', 'ogg', 'wma', 'aac', 'opus'},
+            'video': {'mp4', 'avi', 'mkv', 'mov', 'wmv', 'flv', 'webm', 'mpg', 'mpeg'},
+            'archive': {'zip', 'tar', 'gz', 'bz2', '7z', 'rar', 'xz'},
+            'executable': {'exe', 'dll', 'so', 'dylib', 'bin', 'app'},
+            'data': {'db', 'sqlite', 'mdb', 'accdb', 'pkl', 'parquet', 'feather', 'arrow'}
+        }
+
+        self.implemented_parsers = {
+            'text': True,
+            'code': True,
+            'pdf': True,
+            'document': False,
+            'spreadsheet': False,
+            'presentation': False,
+            'image': False,
+            'audio': False,
+            'video': False,
+            'archive': False,
+            'executable': False,
+            'data': False
+        }
+
+    def get_connection(self):
+        return psycopg2.connect(**self.db_config)
+
+    def analyze(self, disk: Optional[str] = None, limit: int = 100):
+        conn = self.get_connection()
+        cursor = conn.cursor()
+        try:
+            query = '''
+                SELECT
+                    CASE
+                        WHEN path ~ '\\.([a-zA-Z0-9]+)$' THEN
+                            LOWER(SUBSTRING(path FROM '\\.([a-zA-Z0-9]+)$'))
+                        ELSE 'no_extension'
+                    END as extension,
+                    COUNT(*) as count,
+                    SUM(size)::bigint as total_size,
+                    ROUND(AVG(size)::numeric, 0) as avg_size,
+                    MAX(size) as max_size,
+                    COUNT(CASE WHEN extracted_text IS NOT NULL THEN 1 END) as parsed_count
+                FROM files
+            '''
+            params = []
+            if disk:
+                query += ' WHERE disk_label = %s'
+                params.append(disk)
+
+            query += ' GROUP BY extension ORDER BY count DESC'
+            if limit:
+                query += f' LIMIT {int(limit)}'
+
+            cursor.execute(query, params)
+            results = cursor.fetchall()
+
+            return self._format_results(results)
+
+        finally:
+            cursor.close()
+            conn.close()
+
+    def _format_results(self, results):
+        total_files = 0
+        total_size = 0
+        parseable_files = 0
+        parsed_files = 0
+        unparsed_by_type = {}
+
+        extension_details = []
+        for row in results:
+            ext, count, size, avg, max_sz, parsed = row
+            total_files += int(count)
+            total_size += int(size or 0)
+            parsed_files += int(parsed or 0)
+
+            parser_type = self._get_parser_type(ext)
+            is_parseable = parser_type != 'none' and self.implemented_parsers.get(parser_type, False)
+
+            if is_parseable:
+                parseable_files += int(count)
+                unparsed_count = int(count) - int(parsed or 0)
+                if unparsed_count > 0:
+                    if parser_type not in unparsed_by_type:
+                        unparsed_by_type[parser_type] = {'count': 0, 'extensions': set()}
+                    unparsed_by_type[parser_type]['count'] += unparsed_count
+                    unparsed_by_type[parser_type]['extensions'].add(ext)
+
+            extension_details.append({
+                'extension': ext,
+                'count': int(count),
+                'total_size': int(size or 0),
+                'avg_size': int(avg or 0),
+                'max_size': int(max_sz or 0),
+                'parsed': int(parsed or 0),
+                'parser_type': parser_type,
+                'is_parseable': is_parseable
+            })
+
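+        # Per-extension rows are rolled up into totals below; 'coverage' is the
+        # percentage of parseable files that already have extracted text.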
+        return {
+            'extensions': extension_details,
+            'summary': {
+                'total_files': total_files,
+                'total_size': total_size,
+                'parseable_files': parseable_files,
+                'parsed_files': parsed_files,
+                'coverage': (parsed_files / parseable_files * 100) if parseable_files > 0 else 0
+            },
+            'unparsed_by_type': unparsed_by_type,
+            'parser_status': self._get_parser_status()
+        }
+
+    def _get_parser_type(self, ext: str) -> str:
+        for ptype, extensions in self.parseable_extensions.items():
+            if ext in extensions:
+                return ptype
+        return 'none'
+
+    def _get_parser_status(self):
+        return {
+            ptype: {
+                'implemented': self.implemented_parsers.get(ptype, False),
+                'extensions': list(exts)
+            }
+            for ptype, exts in self.parseable_extensions.items()
+        }
+
+    def format_size(self, size_bytes: int) -> str:
+        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+            if size_bytes < 1024:
+                return f'{size_bytes:.1f}{unit}'
+            size_bytes /= 1024
+        return f'{size_bytes:.1f}PB'
diff --git a/app/enrichment/llm_client.py b/app/enrichment/llm_client.py
index d25ead6..50b3279 100644
--- a/app/enrichment/llm_client.py
+++ b/app/enrichment/llm_client.py
@@ -6,12 +6,21 @@ from typing import Dict, Optional, List
 logger = logging.getLogger(__name__)
 
 class LLMClient:
-    def __init__(self, endpoint: str = 'http://localhost:11434', model: str = 'llama3', use_local: bool = True):
+    def __init__(self, endpoint: str = 'http://localhost:11434', model: str = 'llama3', use_local: bool = True, lm_studio_host: Optional[str] = None):
         self.endpoint = endpoint
         self.model = model
         self.use_local = use_local
-        self.lm_studio_endpoint = 'http://192.168.1.74:1234'
-        self.lm_studio_model = 'openai/gpt-oss-20b'
+
+        self.lm_studio_endpoints = {
+            'plato': {'url': 'http://192.168.1.74:1234', 'model': 'openai/gpt-oss-20b'},
+            'postgres': {'url': 'http://192.168.1.159:1234', 'model': 'mistralai/devstral-small-2507'},
+            'local': {'url': 'http://localhost:11434', 'model': 'llama3'}
+        }
+
+        self.lm_studio_host = lm_studio_host or 'postgres'
+        studio_config = self.lm_studio_endpoints.get(self.lm_studio_host, self.lm_studio_endpoints['postgres'])
+        self.lm_studio_endpoint = studio_config['url']
+        self.lm_studio_model = studio_config['model']
 
     def summarize(self, text: str, max_length: int = 200) -> Dict:
         prompt = f"Summarize this concisely in under {max_length} characters:\n\n{text[:3000]}"
diff --git a/app/extraction/incremental.py b/app/extraction/incremental.py
new file mode 100644
index 0000000..b06e8c8
--- /dev/null
+++ b/app/extraction/incremental.py
@@ -0,0 +1,196 @@
+import hashlib
+import psycopg2
+from pathlib import Path
+from typing import Dict, Optional, List
+import logging
+import time
+import json
+
+logger = logging.getLogger(__name__)
+
+class IncrementalExtractor:
+    def __init__(self, db_config: Dict):
+        self.db_config = db_config
+
+    def get_connection(self):
+        return psycopg2.connect(**self.db_config)
+
+    def should_extract(self, file_path: str, file_checksum: str) -> bool:
+        conn = self.get_connection()
+        cursor = conn.cursor()
+        try:
+            cursor.execute('''
+                SELECT file_checksum, status
+                FROM extraction_log
+                WHERE file_path = %s
+                ORDER BY created_at DESC
+                LIMIT 1
+            ''', (file_path,))
+
+            result = cursor.fetchone()
+            if not result:
+                return True
+
+            last_checksum, status = result
+            if last_checksum != file_checksum:
+                logger.info(f'File changed: {file_path}')
+                return True
+
+            if status == 'success':
+                return False
+
+            if status == 'error':
+                return True
+
+            return True
+
+        finally:
+            cursor.close()
+            conn.close()
+
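+    # Sketch of the intended flow (wired together by batch_extract below):
+    #   if self.should_extract(path, checksum): parse the file, upsert a content node,
+    #   then record the outcome via log_extraction() so later runs can skip or retry it.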
+    def log_extraction(self, node_id: Optional[str], file_path: str, file_checksum: str,
+                       method: str, status: str, error_msg: Optional[str] = None,
+                       extracted_size: Optional[int] = None, processing_time_ms: Optional[int] = None):
+        conn = self.get_connection()
+        cursor = conn.cursor()
+        try:
+            cursor.execute('''
+                INSERT INTO extraction_log (node_id, file_path, file_checksum, extraction_method,
+                                            status, error_message, extracted_size, processing_time_ms)
+                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+            ''', (node_id, file_path, file_checksum, method, status, error_msg, extracted_size, processing_time_ms))
+            conn.commit()
+        finally:
+            cursor.close()
+            conn.close()
+
+    def create_or_update_node(self, node_type: str, path: str, disk_label: str,
+                              checksum: Optional[str], size: Optional[int],
+                              content_hash: Optional[str], metadata: Optional[Dict]) -> str:
+        conn = self.get_connection()
+        cursor = conn.cursor()
+        try:
+            cursor.execute('''
+                INSERT INTO content_nodes (node_type, path, disk_label, checksum, size,
+                                           content_hash, extracted_at, metadata)
+                VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, %s)
+                ON CONFLICT (node_type, path, disk_label) DO UPDATE SET
+                    checksum = EXCLUDED.checksum,
+                    size = EXCLUDED.size,
+                    content_hash = EXCLUDED.content_hash,
+                    extracted_at = CURRENT_TIMESTAMP,
+                    metadata = EXCLUDED.metadata,
+                    updated_at = CURRENT_TIMESTAMP
+                RETURNING id
+            ''', (node_type, path, disk_label, checksum, size, content_hash, json.dumps(metadata) if metadata else None))
+
+            node_id = cursor.fetchone()[0]
+            conn.commit()
+            return str(node_id)
+
+        finally:
+            cursor.close()
+            conn.close()
+
+    def batch_extract(self, file_list: List[Dict], parser_func, parser_name: str,
+                      batch_size: int = 100, skip_existing: bool = True) -> Dict:
+        stats = {
+            'processed': 0,
+            'extracted': 0,
+            'skipped': 0,
+            'errors': 0,
+            'total_time_ms': 0
+        }
+
+        conn = self.get_connection()
+        cursor = conn.cursor()
+
+        try:
+            for idx, file_info in enumerate(file_list, 1):
+                path = file_info['path']
+                checksum = file_info.get('checksum')
+                disk_label = file_info.get('disk_label')
+
+                if skip_existing and not self.should_extract(path, checksum):
+                    stats['skipped'] += 1
+                    continue
+
+                start_time = time.time()
+                try:
+                    result = parser_func(Path(path))
+                    processing_time_ms = int((time.time() - start_time) * 1000)
+
+                    if 'error' not in result and result.get('text'):
+                        text = result['text']
+                        content_hash = hashlib.sha256(text.encode()).hexdigest()
+
+                        node_id = self.create_or_update_node(
+                            node_type='file',
+                            path=path,
+                            disk_label=disk_label,
+                            checksum=checksum,
+                            size=file_info.get('size'),
+                            content_hash=content_hash,
+                            metadata={
+                                'extraction': result.get('method', parser_name),
+                                'quality': result.get('quality', 'unknown')
+                            }
+                        )
+
+                        cursor.execute('''
+                            UPDATE files
+                            SET extracted_text = %s,
+                                text_quality = %s
+                            WHERE path = %s
+                        ''', (text[:50000], result.get('quality'), path))
+
+                        self.log_extraction(
+                            node_id=node_id,
+                            file_path=path,
+                            file_checksum=checksum,
+                            method=parser_name,
+                            status='success',
+                            extracted_size=len(text),
+                            processing_time_ms=processing_time_ms
+                        )
+
+                        stats['extracted'] += 1
+                        stats['total_time_ms'] += processing_time_ms
+                    else:
+                        error_msg = result.get('error', 'No text extracted')
+                        self.log_extraction(
+                            node_id=None,
+                            file_path=path,
+                            file_checksum=checksum,
+                            method=parser_name,
+                            status='error',
+                            error_msg=error_msg
+                        )
+                        stats['errors'] += 1
+
+                except Exception as e:
+                    logger.error(f'Extract failed for {path}: {e}')
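+                    # Record the failure; should_extract() treats the latest 'error' row
+                    # as retryable, so this file will be attempted again on the next run.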
+                    self.log_extraction(
+                        node_id=None,
+                        file_path=path,
+                        file_checksum=checksum,
+                        method=parser_name,
+                        status='error',
+                        error_msg=str(e)
+                    )
+                    stats['errors'] += 1
+
+                stats['processed'] += 1
+
+                if stats['processed'] % batch_size == 0:
+                    conn.commit()
+                    logger.info(f'Batch progress: {stats["processed"]}/{len(file_list)} '
+                                f'({stats["extracted"]} extracted, {stats["skipped"]} skipped, {stats["errors"]} errors)')
+
+            conn.commit()
+
+        finally:
+            cursor.close()
+            conn.close()
+
+        return stats
diff --git a/app/main.py b/app/main.py
index 8f5a1e4..a47eeb9 100644
--- a/app/main.py
+++ b/app/main.py
@@ -894,137 +894,3 @@ class DiskReorganizer:
             cursor.close()
             conn.close()
 
-    def review_migration(self, category: Optional[str]=None, show_build: bool=False):
-        from classification.classifier import FileClassifier
-        classifier = FileClassifier()
-        conn = self.get_connection()
-        cursor = conn.cursor()
-        try:
-            query = 'SELECT path, size, category FROM files WHERE 1=1'
-            params = []
-            if category:
-                query += ' AND category = %s'
-                params.append(category)
-            if not show_build:
-                query += " AND (metadata->>'labels' IS NULL OR metadata->>'labels' NOT LIKE '%build-artifact%')"
-            query += ' ORDER BY category, size DESC LIMIT 100'
-            cursor.execute(query, params)
-            files = cursor.fetchall()
-            if not files:
-                print('No files found matching criteria')
-                return
-            print(f'\n=== MIGRATION PREVIEW ===')
-            print(f'Showing {len(files)} files\n')
-            current_category = None
-            for path, size, cat in files:
-                if cat != current_category:
-                    current_category = cat
-                    print(f'\n{cat}:')
-                labels, suggested_cat, is_build = classifier.classify_path(path, int(size))
-                target = classifier.suggest_target_path(path, suggested_cat, labels)
-                print(f' {path}')
-                print(f' → {target} ({self.format_size(int(size))})')
-        finally:
-            cursor.close()
-            conn.close()
-
-    @staticmethod
-    def format_size(size: int) -> str:
-        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
-            if size < 1024:
-                return f'{size:.1f}{unit}'
-            size /= 1024
-        return f'{size:.1f}PB'
-
-def main():
-    parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
-    subparsers = parser.add_subparsers(dest='command', required=True)
-    index_parser = subparsers.add_parser('index', help='Index files on a disk')
-    index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
-    index_parser.add_argument('disk_name', help='Logical name for the disk')
-    plan_parser = subparsers.add_parser('plan', help='Create migration plan')
-    plan_parser.add_argument('target_disk', help='Disk to free up')
-    plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
-    exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
-    exec_parser.add_argument('plan_file', help='Path to plan JSON file')
-    exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
-    dedupe_parser = subparsers.add_parser('dedupe', help='Deduplicate files and compute checksums')
-    dedupe_parser.add_argument('--disk', help='Optional: Only dedupe specific disk')
-    dedupe_parser.add_argument('--no-chunks', action='store_true', help='Disable chunk-level deduplication')
-    merge_parser = subparsers.add_parser('merge', help='Plan multi-disk merge with deduplication')
-    merge_parser.add_argument('--sources', nargs='+', required=True, help='Source disks to merge')
-    merge_parser.add_argument('--target', required=True, help='Target disk')
-    merge_parser.add_argument('--output', default='merge_plan.json', help='Output plan file')
-    merge_parser.add_argument('--filter-system', action='store_true', help='Filter system/build files')
-    merge_parser.add_argument('--network', help='Network target (e.g., user@host:/path)')
-    profile_parser = subparsers.add_parser('profile', help='Create content profiles (inventory + triage)')
-    profile_parser.add_argument('--disk', help='Profile specific disk')
-    profile_parser.add_argument('--update', action='store_true', help='Update database with profiles')
-    profile_parser.add_argument('--limit', type=int, help='Limit number of files')
-    extract_parser = subparsers.add_parser('extract', help='Extract content from files')
-    extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)')
-    extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch')
-
-    parse_parser = subparsers.add_parser('parse', help='Parse files to extract text')
-    parse_parser.add_argument('--kind', help='Parse specific kind (text, code, pdf)')
-    parse_parser.add_argument('--limit', type=int, default=100, help='Limit parse batch')
-    parse_parser.add_argument('--update', action='store_true', help='Save extracted text to database')
-
-    enrich_parser = subparsers.add_parser('enrich', help='Enrich content with LLM analysis')
-    enrich_parser.add_argument('--limit', type=int, default=10, help='Limit enrichment batch')
-    enrich_parser.add_argument('--use-llm', action='store_true', help='Use LLM for summarization')
-    enrich_parser.add_argument('--network', action='store_true', help='Use network LM_STUDIO instead of local OLLAMA')
-
-    classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization')
-    classify_parser.add_argument('--disk', help='Classify specific disk')
-    classify_parser.add_argument('--update', action='store_true', help='Update database with classifications')
-    classify_parser.add_argument('--no-resume', action='store_true', help='Start from scratch instead of resuming')
-    folders_parser = subparsers.add_parser('analyze-folders', help='Analyze folder structure and infer project intent')
-    folders_parser.add_argument('--disk', help='Analyze specific disk')
-    folders_parser.add_argument('--min-files', type=int, default=3, help='Minimum files per folder')
-    search_parser = subparsers.add_parser('search', help='Search indexed content')
-    search_parser.add_argument('query', help='Search query')
-    search_parser.add_argument('--type', choices=['text', 'enrichment', 'path'], default='enrichment', help='Search type')
-    search_parser.add_argument('--limit', type=int, default=20, help='Max results')
-    review_parser = subparsers.add_parser('review', help='Review proposed migration structure')
-    review_parser.add_argument('--category', help='Review specific category')
-    review_parser.add_argument('--show-build', action='store_true', help='Include build artifacts')
-    report_parser = subparsers.add_parser('report', help='Show current status')
-    report_parser.add_argument('--format', choices=['text', 'json'], default='text', help='Report format')
-    report_parser.add_argument('--show-duplicates', action='store_true', help='Show duplicate files')
-    report_parser.add_argument('--preview-merge', help='Preview merge plan from file')
-    args = parser.parse_args()
-    tool = DiskReorganizer()
-    if args.command == 'index':
-        tool.index_disk(args.disk_root, args.disk_name)
-    elif args.command == 'dedupe':
-        tool.run_deduplication(disk=args.disk, use_chunks=not args.no_chunks)
-    elif args.command == 'merge':
-        tool.plan_merge(sources=args.sources, target=args.target, output_file=args.output, filter_system=args.filter_system, network_target=args.network)
-    elif args.command == 'plan':
-        plan = tool.plan_migration(args.target_disk, args.dest_disks)
-        if plan:
-            print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
-            print(f"Destination disks: {', '.join(plan['destination_disks'])}")
-    elif args.command == 'execute':
-        tool.execute_migration(args.plan_file, dry_run=args.dry_run)
-    elif args.command == 'profile':
-        tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit)
-    elif args.command == 'extract':
-        tool.extract_content(kind=args.kind, limit=args.limit)
-    elif args.command == 'parse':
-        tool.parse_files(kind=args.kind, limit=args.limit, update_db=args.update)
-    elif args.command == 'enrich':
-        tool.enrich_files(limit=args.limit, use_llm=args.use_llm, use_local=not args.network)
-    elif args.command == 'classify':
-        tool.classify_files(disk=args.disk, update_db=args.update, resume=not args.no_resume)
-    elif args.command == 'analyze-folders':
-        tool.analyze_folders(disk=args.disk, min_files=args.min_files)
-    elif args.command == 'search':
-        tool.search_content(query=args.query, limit=args.limit, search_type=args.type)
-    elif args.command == 'review':
-        tool.review_migration(category=args.category, show_build=args.show_build)
-    elif args.command == 'report':
-        tool.generate_report(format=args.format, show_duplicates=args.show_duplicates, preview_merge=args.preview_merge)
-if __name__ == '__main__':
-    main()
diff --git a/app/parsers/audio_parser.py b/app/parsers/audio_parser.py
new file mode 100644
index 0000000..5c52de3
--- /dev/null
+++ b/app/parsers/audio_parser.py
@@ -0,0 +1,62 @@
+from pathlib import Path
+from typing import Dict
+import logging
+
+logger = logging.getLogger(__name__)
+
+class AudioParser:
+    def __init__(self, whisper_model: str = 'base'):
+        self.supported_formats = {'.mp3', '.wav', '.flac', '.m4a', '.ogg', '.wma', '.aac'}
+        self.whisper_model = whisper_model
+
+    def parse(self, file_path: Path) -> Dict:
+        if file_path.suffix.lower() not in self.supported_formats:
+            return {'error': f'Unsupported format: {file_path.suffix}'}
+
+        try:
+            return self._transcribe_with_whisper(file_path)
+        except Exception as e:
+            logger.error(f'Audio parse failed for {file_path}: {e}')
+            return {'error': str(e), 'text': ''}
+
+    def _transcribe_with_whisper(self, file_path: Path) -> Dict:
+        try:
+            import whisper
+
+            model = whisper.load_model(self.whisper_model)
+            result = model.transcribe(str(file_path))
+
+            return {
+                'text': result['text'].strip(),
+                'quality': 'good',
+                'method': f'whisper-{self.whisper_model}',
+                'language': result.get('language', 'unknown'),
+                'segments': len(result.get('segments', [])),
+                'metadata': {
+                    'duration': result.get('duration'),
+                    'language': result.get('language')
+                }
+            }
+        except ImportError:
+            logger.warning('Whisper not installed')
+            return {'error': 'Whisper not installed', 'text': '', 'needs': 'pip install openai-whisper'}
+        except Exception as e:
+            return {'error': str(e), 'text': ''}
+
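+    # Note: each call above loads the Whisper model from scratch; when transcribing many
+    # files, caching the loaded model on the instance would avoid that repeated cost.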
+    def extract_metadata(self, file_path: Path) -> Dict:
+        try:
+            import mutagen
+            audio = mutagen.File(str(file_path))
+            if audio is None:
+                return {'error': 'Could not read audio file'}
+
+            return {
+                'duration': audio.info.length if hasattr(audio.info, 'length') else None,
+                'bitrate': audio.info.bitrate if hasattr(audio.info, 'bitrate') else None,
+                'sample_rate': audio.info.sample_rate if hasattr(audio.info, 'sample_rate') else None,
+                'channels': audio.info.channels if hasattr(audio.info, 'channels') else None
+            }
+        except ImportError:
+            return {'error': 'mutagen not installed', 'needs': 'pip install mutagen'}
+        except Exception as e:
+            return {'error': str(e)}
diff --git a/app/parsers/document_parser.py b/app/parsers/document_parser.py
new file mode 100644
index 0000000..a0f5ae0
--- /dev/null
+++ b/app/parsers/document_parser.py
@@ -0,0 +1,60 @@
+from pathlib import Path
+from typing import Dict
+import logging
+import subprocess
+
+logger = logging.getLogger(__name__)
+
+class DocumentParser:
+    def __init__(self):
+        self.supported_formats = {'.doc', '.docx', '.odt', '.rtf'}
+
+    def parse(self, file_path: Path) -> Dict:
+        if file_path.suffix.lower() not in self.supported_formats:
+            return {'error': f'Unsupported format: {file_path.suffix}'}
+
+        try:
+            if file_path.suffix.lower() in {'.docx', '.odt'}:
+                return self._parse_with_python(file_path)
+            else:
+                return self._parse_with_external(file_path)
+        except Exception as e:
+            logger.error(f'Document parse failed for {file_path}: {e}')
+            return {'error': str(e), 'text': ''}
+
+    def _parse_with_python(self, file_path: Path) -> Dict:
+        try:
+            if file_path.suffix.lower() == '.docx':
+                import docx
+                doc = docx.Document(str(file_path))
+                text = '\n'.join([para.text for para in doc.paragraphs if para.text.strip()])
+                return {'text': text, 'quality': 'good', 'method': 'python-docx'}
+            elif file_path.suffix.lower() == '.odt':
+                from odf import text as odf_text, teletype
+                from odf.opendocument import load
+                doc = load(str(file_path))
+                paragraphs = doc.getElementsByType(odf_text.P)
+                text = '\n'.join([teletype.extractText(p) for p in paragraphs if teletype.extractText(p).strip()])
+                return {'text': text, 'quality': 'good', 'method': 'odfpy'}
+        except ImportError as ie:
+            logger.warning(f'Missing library for {file_path.suffix}: {ie}')
+            return {'error': f'Missing library: {ie}', 'text': '', 'needs': 'python-docx or odfpy'}
+        except Exception as e:
+            return {'error': str(e), 'text': ''}
+
+    def _parse_with_external(self, file_path: Path) -> Dict:
+        try:
+            result = subprocess.run(
+                ['antiword', str(file_path)],
+                capture_output=True,
+                text=True,
+                timeout=30
+            )
+            if result.returncode == 0:
+                return {'text': result.stdout, 'quality': 'good', 'method': 'antiword'}
+            else:
+                return {'error': 'antiword failed', 'text': '', 'needs': 'antiword tool'}
+        except FileNotFoundError:
+            return {'error': 'antiword not installed', 'text': '', 'needs': 'sudo apt install antiword'}
+        except Exception as e:
+            return {'error': str(e), 'text': ''}
diff --git a/app/parsers/image_parser.py b/app/parsers/image_parser.py
new file mode 100644
index 0000000..c095051
--- /dev/null
+++ b/app/parsers/image_parser.py
@@ -0,0 +1,63 @@
+from pathlib import Path
+from typing import Dict
+import logging
+
+logger = logging.getLogger(__name__)
+
+class ImageParser:
+    def __init__(self):
+        self.supported_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
+
+    def parse(self, file_path: Path) -> Dict:
+        if file_path.suffix.lower() not in self.supported_formats:
+            return {'error': f'Unsupported format: {file_path.suffix}'}
+
+        try:
+            return self._parse_with_ocr(file_path)
+        except Exception as e:
+            logger.error(f'Image parse failed for {file_path}: {e}')
+            return {'error': str(e), 'text': ''}
+
+    def _parse_with_ocr(self, file_path: Path) -> Dict:
+        try:
+            from PIL import Image
+            import pytesseract
+
+            img = Image.open(str(file_path))
+            text = pytesseract.image_to_string(img)
+
+            data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
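+            # image_to_data returns per-word confidence values (strings or ints depending on
+            # the pytesseract version); average the non-negative ones to grade OCR quality.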
+            conf_scores = [float(c) for c in data['conf'] if float(c) >= 0]
+            avg_confidence = sum(conf_scores) / len(conf_scores) if conf_scores else 0
+
+            quality = 'good' if avg_confidence > 80 else 'medium' if avg_confidence > 60 else 'low'
+
+            return {
+                'text': text.strip(),
+                'quality': quality,
+                'confidence': avg_confidence,
+                'method': 'tesseract',
+                'metadata': {
+                    'width': img.width,
+                    'height': img.height,
+                    'format': img.format
+                }
+            }
+        except ImportError as ie:
+            logger.warning(f'Missing library for OCR: {ie}')
+            return {'error': f'Missing library: {ie}', 'text': '', 'needs': 'pytesseract and tesseract-ocr'}
+        except Exception as e:
+            return {'error': str(e), 'text': ''}
+
+    def extract_metadata(self, file_path: Path) -> Dict:
+        try:
+            from PIL import Image
+            img = Image.open(str(file_path))
+            return {
+                'width': img.width,
+                'height': img.height,
+                'format': img.format,
+                'mode': img.mode
+            }
+        except Exception as e:
+            return {'error': str(e)}
diff --git a/requirements.txt b/requirements.txt
index ace4d77..7534558 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -39,4 +39,6 @@
 mypy>=1.0.0
 flake8>=6.0.0
 chardet
-pillow
\ No newline at end of file
+pillow
+requests
+openai-whisper
\ No newline at end of file
diff --git a/sql/migrations/003_content_graph.sql b/sql/migrations/003_content_graph.sql
new file mode 100644
index 0000000..5bc5638
--- /dev/null
+++ b/sql/migrations/003_content_graph.sql
@@ -0,0 +1,76 @@
+CREATE TABLE IF NOT EXISTS content_nodes (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    node_type VARCHAR(50) NOT NULL,
+    path TEXT NOT NULL,
+    disk_label VARCHAR(50),
+    parent_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,
+
+    checksum VARCHAR(64),
+    size BIGINT,
+    modified_time TIMESTAMP,
+
+    content_hash VARCHAR(64),
+    extracted_at TIMESTAMP,
+    extraction_method VARCHAR(100),
+
+    metadata JSONB,
+
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
+    UNIQUE(node_type, path, disk_label)
+);
+
+CREATE INDEX IF NOT EXISTS idx_content_nodes_type ON content_nodes(node_type);
+CREATE INDEX IF NOT EXISTS idx_content_nodes_path ON content_nodes(path);
+CREATE INDEX IF NOT EXISTS idx_content_nodes_parent ON content_nodes(parent_id);
+CREATE INDEX IF NOT EXISTS idx_content_nodes_checksum ON content_nodes(checksum);
+CREATE INDEX IF NOT EXISTS idx_content_nodes_content_hash ON content_nodes(content_hash);
+
+CREATE TABLE IF NOT EXISTS content_edges (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    source_id UUID NOT NULL REFERENCES content_nodes(id) ON DELETE CASCADE,
+    target_id UUID NOT NULL REFERENCES content_nodes(id) ON DELETE CASCADE,
+    edge_type VARCHAR(50) NOT NULL,
+
+    metadata JSONB,
+    confidence FLOAT,
+
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+
+    UNIQUE(source_id, target_id, edge_type)
+);
+
+CREATE INDEX IF NOT EXISTS idx_content_edges_source ON content_edges(source_id);
+CREATE INDEX IF NOT EXISTS idx_content_edges_target ON content_edges(target_id);
+CREATE INDEX IF NOT EXISTS idx_content_edges_type ON content_edges(edge_type);
+
+CREATE TABLE IF NOT EXISTS extraction_log (
+    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
+    node_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,
+    file_path TEXT NOT NULL,
+    file_checksum VARCHAR(64),
+
+    extraction_method VARCHAR(100),
+    status VARCHAR(50),
+    error_message TEXT,
+
+    extracted_size BIGINT,
+    processing_time_ms INT,
+
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
+);
+
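+-- should_extract() in app/extraction/incremental.py reads the newest extraction_log row for a
+-- given file_path (ordered by created_at) to decide whether a file needs re-parsing.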
+CREATE INDEX IF NOT EXISTS idx_extraction_log_node ON extraction_log(node_id);
+CREATE INDEX IF NOT EXISTS idx_extraction_log_file ON extraction_log(file_path);
+CREATE INDEX IF NOT EXISTS idx_extraction_log_checksum ON extraction_log(file_checksum);
+CREATE INDEX IF NOT EXISTS idx_extraction_log_status ON extraction_log(status);
+CREATE INDEX IF NOT EXISTS idx_extraction_log_created ON extraction_log(created_at DESC);
+
+COMMENT ON TABLE content_nodes IS 'Content graph nodes: directories, files, chunks';
+COMMENT ON TABLE content_edges IS 'Content graph edges: contains, derived_from, references, duplicates';
+COMMENT ON TABLE extraction_log IS 'Tracks extraction history for incremental updates';
+
+COMMENT ON COLUMN content_nodes.node_type IS 'directory, file, chunk, embedding';
+COMMENT ON COLUMN content_nodes.content_hash IS 'Hash of extracted content (not file bytes)';
+COMMENT ON COLUMN content_edges.edge_type IS 'contains, derived_from, references, duplicates, similar_to';
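+
+-- Illustrative query (not part of the migration): latest extraction attempt for one file.
+--   SELECT status, error_message, created_at
+--   FROM extraction_log
+--   WHERE file_path = '/some/path'
+--   ORDER BY created_at DESC
+--   LIMIT 1;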