From 9759001f4c3469968eec687f1980435a268962ca Mon Sep 17 00:00:00 2001 From: mike Date: Sat, 13 Dec 2025 04:23:04 +0100 Subject: [PATCH] remove_doc --- app/content/__init__.py | 3 + app/content/extractors.py | 58 +- app/content/profiler.py | 63 +- app/discovery/__init__.py | 14 +- app/discovery/_protocols.py | 17 - app/discovery/engine.py | 240 +---- app/discovery/scanner.py | 114 +-- app/discovery/system.py | 139 +-- app/main.py | 1734 ++++++++++++++--------------------- 9 files changed, 741 insertions(+), 1641 deletions(-) create mode 100644 app/content/__init__.py diff --git a/app/content/__init__.py b/app/content/__init__.py new file mode 100644 index 0000000..674d9ad --- /dev/null +++ b/app/content/__init__.py @@ -0,0 +1,3 @@ +from .profiler import ContentProfiler +from .extractors import ContentExtractor +__all__ = ['ContentProfiler', 'ContentExtractor'] diff --git a/app/content/extractors.py b/app/content/extractors.py index bb55212..b750c98 100644 --- a/app/content/extractors.py +++ b/app/content/extractors.py @@ -3,22 +3,14 @@ from typing import Dict, Optional import json class ContentExtractor: + def __init__(self): - self.extractors = { - 'pdf_text': self._extract_pdf, - 'ocr+caption': self._extract_image, - 'transcribe': self._extract_audio, - 'transcribe+scenes': self._extract_video, - 'office_text': self._extract_document, - 'read': self._extract_text, - 'read+syntax': self._extract_code - } + self.extractors = {'pdf_text': self._extract_pdf, 'ocr+caption': self._extract_image, 'transcribe': self._extract_audio, 'transcribe+scenes': self._extract_video, 'office_text': self._extract_document, 'read': self._extract_text, 'read+syntax': self._extract_code} def extract(self, file_path: Path, extractor_type: str) -> Dict: extractor = self.extractors.get(extractor_type) if not extractor: return {'error': f'Unknown extractor: {extractor_type}'} - try: return extractor(file_path) except Exception as e: @@ -28,11 +20,7 @@ class ContentExtractor: try: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read(1024 * 1024) - return { - 'text': content, - 'char_count': len(content), - 'needs_llm': False - } + return {'text': content, 'char_count': len(content), 'needs_llm': False} except Exception as e: return {'error': str(e)} @@ -51,54 +39,24 @@ class ContentExtractor: pdf = PyPDF2.PdfReader(f) for page in pdf.pages[:10]: text_parts.append(page.extract_text()) - text = '\n'.join(text_parts) - return { - 'text': text, - 'pages_extracted': len(text_parts), - 'needs_llm': len(text.strip()) > 100, - 'type': 'document' - } + return {'text': text, 'pages_extracted': len(text_parts), 'needs_llm': len(text.strip()) > 100, 'type': 'document'} except Exception as e: return {'error': str(e), 'needs_ocr': True} def _extract_image(self, file_path: Path) -> Dict: - return { - 'type': 'image', - 'needs_ocr': True, - 'needs_caption': True, - 'needs_llm': True, - 'pipeline': ['ocr', 'caption', 'embedding'], - 'status': 'pending' - } + return {'type': 'image', 'needs_ocr': True, 'needs_caption': True, 'needs_llm': True, 'pipeline': ['ocr', 'caption', 'embedding'], 'status': 'pending'} def _extract_audio(self, file_path: Path) -> Dict: - return { - 'type': 'audio', - 'needs_transcription': True, - 'needs_llm': True, - 'pipeline': ['transcribe', 'summarize'], - 'status': 'pending' - } + return {'type': 'audio', 'needs_transcription': True, 'needs_llm': True, 'pipeline': ['transcribe', 'summarize'], 'status': 'pending'} def _extract_video(self, file_path: Path) -> Dict: - return { 
- 'type': 'video', - 'needs_transcription': True, - 'needs_scene_detection': True, - 'needs_llm': True, - 'pipeline': ['transcribe', 'scenes', 'summarize'], - 'status': 'pending' - } + return {'type': 'video', 'needs_transcription': True, 'needs_scene_detection': True, 'needs_llm': True, 'pipeline': ['transcribe', 'scenes', 'summarize'], 'status': 'pending'} def _extract_document(self, file_path: Path) -> Dict: try: import textract text = textract.process(str(file_path)).decode('utf-8') - return { - 'text': text, - 'type': 'document', - 'needs_llm': len(text.strip()) > 100 - } + return {'text': text, 'type': 'document', 'needs_llm': len(text.strip()) > 100} except: return {'error': 'textract failed', 'needs_llm': True} diff --git a/app/content/profiler.py b/app/content/profiler.py index 75e0776..274df5b 100644 --- a/app/content/profiler.py +++ b/app/content/profiler.py @@ -6,21 +6,10 @@ import json from datetime import datetime class ContentProfiler: + def __init__(self): self.mime_detector = magic.Magic(mime=True) - - self.kind_mapping = { - 'text': ['text/plain', 'text/html', 'text/css', 'text/javascript', 'text/markdown'], - 'code': ['application/x-python', 'application/javascript', 'text/x-java', 'text/x-c'], - 'pdf': ['application/pdf'], - 'image': ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/svg+xml'], - 'audio': ['audio/mpeg', 'audio/wav', 'audio/ogg', 'audio/flac'], - 'video': ['video/mp4', 'video/x-matroska', 'video/avi', 'video/webm'], - 'archive': ['application/zip', 'application/x-tar', 'application/gzip', 'application/x-7z-compressed'], - 'document': ['application/msword', 'application/vnd.openxmlformats-officedocument'], - 'spreadsheet': ['application/vnd.ms-excel', 'text/csv'] - } - + self.kind_mapping = {'text': ['text/plain', 'text/html', 'text/css', 'text/javascript', 'text/markdown'], 'code': ['application/x-python', 'application/javascript', 'text/x-java', 'text/x-c'], 'pdf': ['application/pdf'], 'image': ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/svg+xml'], 'audio': ['audio/mpeg', 'audio/wav', 'audio/ogg', 'audio/flac'], 'video': ['video/mp4', 'video/x-matroska', 'video/avi', 'video/webm'], 'archive': ['application/zip', 'application/x-tar', 'application/gzip', 'application/x-7z-compressed'], 'document': ['application/msword', 'application/vnd.openxmlformats-officedocument'], 'spreadsheet': ['application/vnd.ms-excel', 'text/csv']} self.text_exts = {'.txt', '.md', '.rst', '.log', '.json', '.xml', '.yaml', '.yml', '.toml', '.ini', '.cfg'} self.code_exts = {'.py', '.js', '.ts', '.java', '.go', '.rs', '.c', '.cpp', '.h', '.cs', '.rb', '.php'} self.processable_kinds = {'text', 'code', 'pdf', 'image', 'audio', 'video', 'document'} @@ -30,29 +19,12 @@ class ContentProfiler: stat = file_path.stat() size = stat.st_size mtime = datetime.fromtimestamp(stat.st_mtime) - mime_type = self._detect_mime(file_path) kind = self._determine_kind(file_path, mime_type) - - profile = { - 'path': str(file_path), - 'size': size, - 'mtime': mtime.isoformat(), - 'mime': mime_type, - 'kind': kind, - 'processable': kind in self.processable_kinds, - 'extractor': self._suggest_extractor(kind, mime_type), - 'hints': self._extract_hints(file_path, kind, mime_type, size) - } - + profile = {'path': str(file_path), 'size': size, 'mtime': mtime.isoformat(), 'mime': mime_type, 'kind': kind, 'processable': kind in self.processable_kinds, 'extractor': self._suggest_extractor(kind, mime_type), 'hints': self._extract_hints(file_path, kind, mime_type, size)} return profile - 
except Exception as e: - return { - 'path': str(file_path), - 'error': str(e), - 'processable': False - } + return {'path': str(file_path), 'error': str(e), 'processable': False} def _detect_mime(self, file_path: Path) -> str: try: @@ -63,61 +35,42 @@ class ContentProfiler: def _determine_kind(self, file_path: Path, mime_type: str) -> str: for kind, mimes in self.kind_mapping.items(): - if any(mime in mime_type for mime in mimes): + if any((mime in mime_type for mime in mimes)): return kind - suffix = file_path.suffix.lower() if suffix in self.text_exts: return 'text' if suffix in self.code_exts: return 'code' - return 'unknown' def _suggest_extractor(self, kind: str, mime_type: str) -> Optional[str]: - extractors = { - 'pdf': 'pdf_text', - 'image': 'ocr+caption', - 'audio': 'transcribe', - 'video': 'transcribe+scenes', - 'document': 'office_text', - 'text': 'read', - 'code': 'read+syntax' - } + extractors = {'pdf': 'pdf_text', 'image': 'ocr+caption', 'audio': 'transcribe', 'video': 'transcribe+scenes', 'document': 'office_text', 'text': 'read', 'code': 'read+syntax'} return extractors.get(kind) def _extract_hints(self, file_path: Path, kind: str, mime_type: str, size: int) -> Dict: hints = {} - if kind == 'text' or kind == 'code': hints['language'] = self._guess_language(file_path) if size < 1024 * 1024: hints['lines'] = self._count_lines(file_path) - if kind == 'pdf': hints['page_count'] = self._get_pdf_pages(file_path) - if kind in ['audio', 'video']: hints['duration'] = self._get_media_duration(file_path) - if kind == 'image': hints['has_exif'] = self._has_exif(file_path) hints['dimensions'] = self._get_image_dimensions(file_path) - return hints def _guess_language(self, file_path: Path) -> Optional[str]: - lang_map = { - '.py': 'python', '.js': 'javascript', '.ts': 'typescript', - '.java': 'java', '.go': 'go', '.rs': 'rust', '.c': 'c', - '.cpp': 'cpp', '.cs': 'csharp', '.rb': 'ruby', '.php': 'php' - } + lang_map = {'.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.java': 'java', '.go': 'go', '.rs': 'rust', '.c': 'c', '.cpp': 'cpp', '.cs': 'csharp', '.rb': 'ruby', '.php': 'php'} return lang_map.get(file_path.suffix.lower()) def _count_lines(self, file_path: Path) -> Optional[int]: try: with open(file_path, 'rb') as f: - return sum(1 for _ in f) + return sum((1 for _ in f)) except: return None diff --git a/app/discovery/__init__.py b/app/discovery/__init__.py index 575a15d..22bbd0f 100644 --- a/app/discovery/__init__.py +++ b/app/discovery/__init__.py @@ -1,17 +1,5 @@ -"""Discovery package exports""" from .scanner import FileScanner, FilteredScanner from .system import SystemAPI from .engine import DiscoveryEngine from ._protocols import FileMeta, MountInfo, DiskInfo, IFileScanner, ISystemAPI - -__all__ = [ - 'FileScanner', - 'FilteredScanner', - 'SystemAPI', - 'DiscoveryEngine', - 'FileMeta', - 'MountInfo', - 'DiskInfo', - 'IFileScanner', - 'ISystemAPI', -] +__all__ = ['FileScanner', 'FilteredScanner', 'SystemAPI', 'DiscoveryEngine', 'FileMeta', 'MountInfo', 'DiskInfo', 'IFileScanner', 'ISystemAPI'] diff --git a/app/discovery/_protocols.py b/app/discovery/_protocols.py index 45898bf..7705915 100644 --- a/app/discovery/_protocols.py +++ b/app/discovery/_protocols.py @@ -1,54 +1,37 @@ -"""Protocol definitions for the discovery package""" from typing import Iterator, Protocol, Any from pathlib import Path from dataclasses import dataclass - @dataclass class FileMeta: - """Metadata for a discovered file""" path: Path size: int modified_time: float created_time: float - # Add 
other metadata fields as needed - @dataclass class MountInfo: - """Information about a mounted filesystem""" device: str mount_point: str fs_type: str options: str - # Add other mount info fields as needed - @dataclass class DiskInfo: - """Information about a disk/NVMe device""" device: str model: str size: int serial: str - # Add other disk info fields as needed - class IFileScanner(Protocol): - """Protocol for file scanning operations""" def scan(self, root: Path) -> Iterator[FileMeta]: - """Scan a directory tree and yield file metadata""" ... - class ISystemAPI(Protocol): - """Protocol for system information queries""" def query_mounts(self) -> list[MountInfo]: - """Query mounted filesystems""" ... def query_nvmes(self) -> list[DiskInfo]: - """Query NVMe/disk information""" ... diff --git a/app/discovery/engine.py b/app/discovery/engine.py index cd2891a..cf9aecd 100644 --- a/app/discovery/engine.py +++ b/app/discovery/engine.py @@ -1,10 +1,8 @@ -"""Discovery engine coordinating scanner and system APIs""" from pathlib import Path from typing import Optional, Callable from datetime import datetime import psycopg2 from psycopg2.extras import execute_batch - from .scanner import FileScanner from .system import SystemAPI from ._protocols import FileMeta @@ -12,23 +10,9 @@ from ..shared.models import FileRecord, DiskInfo, ProcessingStats from ..shared.config import DatabaseConfig from ..shared.logger import ProgressLogger - class DiscoveryEngine: - """Discovery engine for scanning and cataloging files""" - def __init__( - self, - db_config: DatabaseConfig, - logger: ProgressLogger, - batch_size: int = 1000 - ): - """Initialize discovery engine - - Args: - db_config: Database configuration - logger: Progress logger - batch_size: Number of records to batch before database commit - """ + def __init__(self, db_config: DatabaseConfig, logger: ProgressLogger, batch_size: int=1000): self.db_config = db_config self.logger = logger self.batch_size = batch_size @@ -36,286 +20,114 @@ class DiscoveryEngine: self._connection = None def _get_connection(self): - """Get or create database connection""" if self._connection is None or self._connection.closed: - self._connection = psycopg2.connect( - host=self.db_config.host, - port=self.db_config.port, - database=self.db_config.database, - user=self.db_config.user, - password=self.db_config.password - ) + self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password) return self._connection def _ensure_tables(self): - """Ensure database tables exist""" conn = self._get_connection() cursor = conn.cursor() - - # Create files table - cursor.execute(""" - CREATE TABLE IF NOT EXISTS files ( - id SERIAL PRIMARY KEY, - path TEXT NOT NULL UNIQUE, - size BIGINT NOT NULL, - modified_time DOUBLE PRECISION NOT NULL, - created_time DOUBLE PRECISION NOT NULL, - disk_label TEXT NOT NULL, - checksum TEXT, - status TEXT DEFAULT 'indexed', - category TEXT, - duplicate_of TEXT, - discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Create index on path - cursor.execute(""" - CREATE INDEX IF NOT EXISTS idx_files_path ON files(path) - """) - - # Create index on disk - cursor.execute(""" - CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk_label) - """) - - # Create index on checksum - cursor.execute(""" - CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum) - """) - + 
cursor.execute("\n CREATE TABLE IF NOT EXISTS files (\n id SERIAL PRIMARY KEY,\n path TEXT NOT NULL UNIQUE,\n size BIGINT NOT NULL,\n modified_time DOUBLE PRECISION NOT NULL,\n created_time DOUBLE PRECISION NOT NULL,\n disk_label TEXT NOT NULL,\n checksum TEXT,\n status TEXT DEFAULT 'indexed',\n category TEXT,\n duplicate_of TEXT,\n discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n ") + cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_files_path ON files(path)\n ') + cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk_label)\n ') + cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum)\n ') conn.commit() cursor.close() - def discover_path( - self, - root: Path, - scanner: Optional[FileScanner] = None, - progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None - ) -> ProcessingStats: - """Discover and catalog files in a path - - Args: - root: Root path to discover - scanner: Optional custom scanner (default: FileScanner()) - progress_callback: Optional callback for progress updates - - Returns: - ProcessingStats with discovery statistics - """ - self.logger.section(f"Discovering: {root}") - - # Ensure tables exist + def discover_path(self, root: Path, scanner: Optional[FileScanner]=None, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats: + self.logger.section(f'Discovering: {root}') self._ensure_tables() - - # Create scanner if not provided if scanner is None: - scanner = FileScanner( - error_handler=lambda e, p: self.logger.warning(f"Error scanning {p}: {e}") - ) - - # Get disk info for the root path + scanner = FileScanner(error_handler=lambda e, p: self.logger.warning(f'Error scanning {p}: {e}')) disk = self.system_api.get_disk_for_path(root) if disk is None: disk = str(root) - - # Initialize statistics stats = ProcessingStats() batch = [] - conn = self._get_connection() cursor = conn.cursor() - try: - # Scan files for file_meta in scanner.scan(root): - # Create file record - record = FileRecord( - path=file_meta.path, - size=file_meta.size, - modified_time=file_meta.modified_time, - created_time=file_meta.created_time, - disk_label=disk - ) - + record = FileRecord(path=file_meta.path, size=file_meta.size, modified_time=file_meta.modified_time, created_time=file_meta.created_time, disk_label=disk) batch.append(record) stats.files_processed += 1 stats.bytes_processed += record.size - - # Batch insert if len(batch) >= self.batch_size: self._insert_batch(cursor, batch) conn.commit() batch.clear() - - # Progress callback if progress_callback: progress_callback(stats.files_processed, 0, stats) - - # Log progress if stats.files_processed % (self.batch_size * 10) == 0: - self.logger.progress( - stats.files_processed, - stats.files_processed, # We don't know total - prefix="Files discovered", - bytes_processed=stats.bytes_processed, - elapsed_seconds=stats.elapsed_seconds - ) - - # Insert remaining batch + self.logger.progress(stats.files_processed, stats.files_processed, prefix='Files discovered', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds) if batch: self._insert_batch(cursor, batch) conn.commit() - stats.files_succeeded = stats.files_processed - except Exception as e: conn.rollback() - self.logger.error(f"Discovery failed: {e}") + self.logger.error(f'Discovery failed: {e}') raise - finally: cursor.close() - - self.logger.info( - f"Discovery complete: {stats.files_processed} files, " 
- f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s" - ) - + self.logger.info(f'Discovery complete: {stats.files_processed} files, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s') return stats def _insert_batch(self, cursor, batch: list[FileRecord]): - """Insert batch of file records - - Args: - cursor: Database cursor - batch: List of FileRecord objects - """ - query = """ - INSERT INTO files (path, size, modified_time, created_time, disk_label, checksum, status, category, duplicate_of) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (path) DO UPDATE SET - size = EXCLUDED.size, - modified_time = EXCLUDED.modified_time, - updated_at = CURRENT_TIMESTAMP - """ - - data = [ - ( - str(record.path), - record.size, - record.modified_time, - record.created_time, - record.disk_label, - record.checksum, - record.status, - record.category, - record.duplicate_of - ) - for record in batch - ] - + query = '\n INSERT INTO files (path, size, modified_time, created_time, disk_label, checksum, status, category, duplicate_of)\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)\n ON CONFLICT (path) DO UPDATE SET\n size = EXCLUDED.size,\n modified_time = EXCLUDED.modified_time,\n updated_at = CURRENT_TIMESTAMP\n ' + data = [(str(record.path), record.size, record.modified_time, record.created_time, record.disk_label, record.checksum, record.status, record.category, record.duplicate_of) for record in batch] execute_batch(cursor, query, data, page_size=self.batch_size) def get_disk_info(self) -> list[DiskInfo]: - """Get information about all disks - - Returns: - List of DiskInfo objects - """ - self.logger.subsection("Querying disk information") - + self.logger.subsection('Querying disk information') disks = [] for disk_info in self.system_api.query_nvmes(): - # Get mount point if available mount_point = None - fs_type = "unknown" - + fs_type = 'unknown' for mount in self.system_api.query_mounts(): if mount.device == disk_info.device: mount_point = Path(mount.mount_point) fs_type = mount.fs_type break - if mount_point: total, used, free = self.system_api.get_disk_usage(mount_point) else: total = disk_info.size used = 0 free = disk_info.size - - disk = DiskInfo( - name=disk_info.device, - device=disk_info.device, - mount_point=mount_point or Path("/"), - total_size=total, - used_size=used, - free_size=free, - fs_type=fs_type - ) + disk = DiskInfo(name=disk_info.device, device=disk_info.device, mount_point=mount_point or Path('/'), total_size=total, used_size=used, free_size=free, fs_type=fs_type) disks.append(disk) - - self.logger.info( - f" {disk.name}: {disk.usage_percent:.1f}% used " - f"({disk.used_size:,} / {disk.total_size:,} bytes)" - ) - + self.logger.info(f' {disk.name}: {disk.usage_percent:.1f}% used ({disk.used_size:,} / {disk.total_size:,} bytes)') return disks - def get_file_count(self, disk: Optional[str] = None) -> int: - """Get count of discovered files - - Args: - disk: Optional disk filter - - Returns: - Count of files - """ + def get_file_count(self, disk: Optional[str]=None) -> int: conn = self._get_connection() cursor = conn.cursor() - if disk: - cursor.execute("SELECT COUNT(*) FROM files WHERE disk_label = %s", (disk,)) + cursor.execute('SELECT COUNT(*) FROM files WHERE disk_label = %s', (disk,)) else: - cursor.execute("SELECT COUNT(*) FROM files") - + cursor.execute('SELECT COUNT(*) FROM files') count = cursor.fetchone()[0] cursor.close() - return count - def get_total_size(self, disk: Optional[str] = None) -> int: - """Get total size of discovered 
files - - Args: - disk: Optional disk filter - - Returns: - Total size in bytes - """ + def get_total_size(self, disk: Optional[str]=None) -> int: conn = self._get_connection() cursor = conn.cursor() - if disk: - cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE disk_label = %s", (disk,)) + cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files WHERE disk_label = %s', (disk,)) else: - cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files") - + cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files') total = cursor.fetchone()[0] cursor.close() - return total def close(self): - """Close database connection""" - if self._connection and not self._connection.closed: + if self._connection and (not self._connection.closed): self._connection.close() def __enter__(self): - """Context manager entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" self.close() diff --git a/app/discovery/scanner.py b/app/discovery/scanner.py index f2fc338..f874571 100644 --- a/app/discovery/scanner.py +++ b/app/discovery/scanner.py @@ -1,28 +1,12 @@ -"""File system scanner implementing IFileScanner protocol""" import os from pathlib import Path from typing import Iterator, Optional, Callable from datetime import datetime - from ._protocols import FileMeta - class FileScanner: - """File system scanner with filtering and error handling""" - def __init__( - self, - follow_symlinks: bool = False, - skip_hidden: bool = True, - error_handler: Optional[Callable[[Exception, Path], None]] = None - ): - """Initialize file scanner - - Args: - follow_symlinks: Whether to follow symbolic links - skip_hidden: Whether to skip hidden files/directories - error_handler: Optional callback for handling errors during scan - """ + def __init__(self, follow_symlinks: bool=False, skip_hidden: bool=True, error_handler: Optional[Callable[[Exception, Path], None]]=None): self.follow_symlinks = follow_symlinks self.skip_hidden = skip_hidden self.error_handler = error_handler @@ -31,24 +15,14 @@ class FileScanner: self._errors = 0 def scan(self, root: Path) -> Iterator[FileMeta]: - """Scan a directory tree and yield file metadata - - Args: - root: Root directory to scan - - Yields: - FileMeta objects for each discovered file - """ if not root.exists(): - error = FileNotFoundError(f"Path does not exist: {root}") + error = FileNotFoundError(f'Path does not exist: {root}') if self.error_handler: self.error_handler(error, root) else: raise error return - if not root.is_dir(): - # If root is a file, just return its metadata try: yield self._get_file_meta(root) except Exception as e: @@ -58,115 +32,59 @@ class FileScanner: else: raise return - - # Walk directory tree for dirpath, dirnames, filenames in os.walk(root, followlinks=self.follow_symlinks): current_dir = Path(dirpath) - - # Filter directories if needed if self.skip_hidden: dirnames[:] = [d for d in dirnames if not d.startswith('.')] - - # Process files for filename in filenames: if self.skip_hidden and filename.startswith('.'): continue - file_path = current_dir / filename - try: - # Skip broken symlinks - if file_path.is_symlink() and not file_path.exists(): + if file_path.is_symlink() and (not file_path.exists()): continue - meta = self._get_file_meta(file_path) self._files_scanned += 1 self._bytes_scanned += meta.size - yield meta - except PermissionError as e: self._errors += 1 if self.error_handler: self.error_handler(e, file_path) - # Continue scanning continue - except Exception as e: self._errors += 1 if self.error_handler: 
self.error_handler(e, file_path) - # Continue scanning continue def _get_file_meta(self, path: Path) -> FileMeta: - """Get file metadata - - Args: - path: Path to file - - Returns: - FileMeta object with file metadata - - Raises: - OSError: If file cannot be accessed - """ stat = path.stat() - - # Get creation time (platform dependent) created_time = stat.st_ctime if hasattr(stat, 'st_birthtime'): created_time = stat.st_birthtime - - return FileMeta( - path=path, - size=stat.st_size, - modified_time=stat.st_mtime, - created_time=created_time - ) + return FileMeta(path=path, size=stat.st_size, modified_time=stat.st_mtime, created_time=created_time) @property def files_scanned(self) -> int: - """Get count of files scanned""" return self._files_scanned @property def bytes_scanned(self) -> int: - """Get total bytes scanned""" return self._bytes_scanned @property def errors(self) -> int: - """Get count of errors encountered""" return self._errors def reset_stats(self) -> None: - """Reset scanning statistics""" self._files_scanned = 0 self._bytes_scanned = 0 self._errors = 0 - class FilteredScanner(FileScanner): - """Scanner with additional filtering capabilities""" - def __init__( - self, - min_size: Optional[int] = None, - max_size: Optional[int] = None, - extensions: Optional[list[str]] = None, - exclude_patterns: Optional[list[str]] = None, - **kwargs - ): - """Initialize filtered scanner - - Args: - min_size: Minimum file size in bytes - max_size: Maximum file size in bytes - extensions: List of file extensions to include (e.g., ['.txt', '.py']) - exclude_patterns: List of path patterns to exclude - **kwargs: Additional arguments passed to FileScanner - """ + def __init__(self, min_size: Optional[int]=None, max_size: Optional[int]=None, extensions: Optional[list[str]]=None, exclude_patterns: Optional[list[str]]=None, **kwargs): super().__init__(**kwargs) self.min_size = min_size self.max_size = max_size @@ -174,41 +92,19 @@ class FilteredScanner(FileScanner): self.exclude_patterns = exclude_patterns or [] def scan(self, root: Path) -> Iterator[FileMeta]: - """Scan with additional filtering - - Args: - root: Root directory to scan - - Yields: - FileMeta objects for files matching filter criteria - """ for meta in super().scan(root): - # Size filtering if self.min_size is not None and meta.size < self.min_size: continue if self.max_size is not None and meta.size > self.max_size: continue - - # Extension filtering if self.extensions is not None: if meta.path.suffix.lower() not in self.extensions: continue - - # Exclude pattern filtering if self._should_exclude(meta.path): continue - yield meta def _should_exclude(self, path: Path) -> bool: - """Check if path matches any exclude pattern - - Args: - path: Path to check - - Returns: - True if path should be excluded - """ path_str = str(path) for pattern in self.exclude_patterns: if pattern in path_str: diff --git a/app/discovery/system.py b/app/discovery/system.py index 245baef..01f6621 100644 --- a/app/discovery/system.py +++ b/app/discovery/system.py @@ -1,167 +1,80 @@ -"""System API for querying mounts and disks""" import os import subprocess from pathlib import Path from typing import Optional import psutil - from ._protocols import MountInfo, DiskInfo - class SystemAPI: - """System information API for querying mounts and disks""" def query_mounts(self) -> list[MountInfo]: - """Query mounted filesystems - - Returns: - List of MountInfo objects for all mounted filesystems - """ mounts = [] - for partition in psutil.disk_partitions(all=False): 
- mount_info = MountInfo( - device=partition.device, - mount_point=partition.mountpoint, - fs_type=partition.fstype, - options=partition.opts - ) + mount_info = MountInfo(device=partition.device, mount_point=partition.mountpoint, fs_type=partition.fstype, options=partition.opts) mounts.append(mount_info) - return mounts def query_nvmes(self) -> list[DiskInfo]: - """Query NVMe/disk information - - Returns: - List of DiskInfo objects for all disks - """ disks = [] - - # Try to get disk information using lsblk try: - result = subprocess.run( - ['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'], - capture_output=True, - text=True, - check=False - ) - + result = subprocess.run(['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'], capture_output=True, text=True, check=False) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line.strip(): continue - parts = line.split(maxsplit=3) if len(parts) >= 3: - device = f"/dev/{parts[0]}" - model = parts[1] if len(parts) > 1 else "Unknown" - size_str = parts[2] if len(parts) > 2 else "0" - serial = parts[3] if len(parts) > 3 else "Unknown" - + device = f'/dev/{parts[0]}' + model = parts[1] if len(parts) > 1 else 'Unknown' + size_str = parts[2] if len(parts) > 2 else '0' + serial = parts[3] if len(parts) > 3 else 'Unknown' try: size = int(size_str) except ValueError: size = 0 - - disk_info = DiskInfo( - device=device, - model=model, - size=size, - serial=serial - ) + disk_info = DiskInfo(device=device, model=model, size=size, serial=serial) disks.append(disk_info) - except FileNotFoundError: - # lsblk not available, fall back to basic info pass - - # If lsblk failed or unavailable, try alternative method if not disks: disks = self._query_disks_fallback() - return disks def _query_disks_fallback(self) -> list[DiskInfo]: - """Fallback method for querying disk information - - Returns: - List of DiskInfo objects using psutil - """ disks = [] seen_devices = set() - for partition in psutil.disk_partitions(all=True): device = partition.device - - # Skip non-disk devices if not device.startswith('/dev/'): continue - - # Get base device (e.g., /dev/sda from /dev/sda1) base_device = self._get_base_device(device) - if base_device in seen_devices: continue - seen_devices.add(base_device) - try: usage = psutil.disk_usage(partition.mountpoint) size = usage.total except (PermissionError, OSError): size = 0 - - disk_info = DiskInfo( - device=base_device, - model="Unknown", - size=size, - serial="Unknown" - ) + disk_info = DiskInfo(device=base_device, model='Unknown', size=size, serial='Unknown') disks.append(disk_info) - return disks def _get_base_device(self, device: str) -> str: - """Extract base device name from partition device - - Args: - device: Device path (e.g., /dev/sda1, /dev/nvme0n1p1) - - Returns: - Base device path (e.g., /dev/sda, /dev/nvme0n1) - """ - # Handle NVMe devices if 'nvme' in device: - # /dev/nvme0n1p1 -> /dev/nvme0n1 if 'p' in device: return device.rsplit('p', 1)[0] return device - - # Handle standard devices (sda, sdb, etc.) 
- # /dev/sda1 -> /dev/sda import re - match = re.match(r'(/dev/[a-z]+)', device) + match = re.match('(/dev/[a-z]+)', device) if match: return match.group(1) - return device def get_disk_for_path(self, path: Path) -> Optional[str]: - """Get the disk/mount point for a given path - - Args: - path: Path to check - - Returns: - Mount point device or None if not found - """ path = path.resolve() - - # Find the mount point that contains this path best_match = None best_match_len = 0 - for partition in psutil.disk_partitions(): mount_point = Path(partition.mountpoint) try: @@ -172,39 +85,19 @@ class SystemAPI: best_match_len = mount_len except (ValueError, OSError): continue - return best_match def get_disk_usage(self, path: Path) -> tuple[int, int, int]: - """Get disk usage for a path - - Args: - path: Path to check - - Returns: - Tuple of (total, used, free) in bytes - """ try: usage = psutil.disk_usage(str(path)) - return usage.total, usage.used, usage.free + return (usage.total, usage.used, usage.free) except (PermissionError, OSError): - return 0, 0, 0 + return (0, 0, 0) def get_mount_point(self, path: Path) -> Optional[Path]: - """Get the mount point for a given path - - Args: - path: Path to check - - Returns: - Mount point path or None if not found - """ path = path.resolve() - - # Find the mount point that contains this path best_match = None best_match_len = 0 - for partition in psutil.disk_partitions(): mount_point = Path(partition.mountpoint) try: @@ -215,19 +108,9 @@ class SystemAPI: best_match_len = mount_len except (ValueError, OSError): continue - return best_match def is_same_filesystem(self, path1: Path, path2: Path) -> bool: - """Check if two paths are on the same filesystem - - Args: - path1: First path - path2: Second path - - Returns: - True if paths are on the same filesystem - """ try: stat1 = path1.stat() stat2 = path2.stat() diff --git a/app/main.py b/app/main.py index 7c60bb0..545017c 100644 --- a/app/main.py +++ b/app/main.py @@ -1,1055 +1,679 @@ -#!/usr/bin/env python3 -import os -import sys -from dataclasses import dataclass - -import psycopg2 -import shutil -import hashlib -import argparse -import json -from pathlib import Path -from typing import List, Dict, Optional -from datetime import datetime -import logging -import time - -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s', - handlers=[logging.FileHandler('disk_reorganizer.log'), logging.StreamHandler(sys.stdout)] -) -logger = logging.getLogger(__name__) - -@dataclass -class FileRecord: - """Represents a file in the index""" - path: str - size: int - modified_time: float - disk_label: str - checksum: Optional[str] = None - status: str = 'indexed' # indexed, planned, moved, verified - -class DiskReorganizer: - def __init__(self, db_config: Dict = None): - """ - Initialize DiskReorganizer with PostgreSQL connection - :param db_config: Database configuration dict with host, port, database, user, password - """ - if db_config is None: - db_config = { - 'host': os.getenv('DB_HOST', '192.168.1.159'), - 'port': int(os.getenv('DB_PORT', 5432)), - 'database': os.getenv('DB_NAME', 'disk_reorganizer_db'), - 'user': os.getenv('DB_USER', 'disk_reorg_user'), - 'password': os.getenv('DB_PASSWORD', 'heel-goed-wachtwoord') - } - self.db_config = db_config - self.init_database() - - def get_connection(self): - """Get PostgreSQL database connection""" - return psycopg2.connect(**self.db_config) - - def init_database(self): - """Verify PostgreSQL database connection and tables exist""" 
- try: - conn = self.get_connection() - cursor = conn.cursor() - - # Test connection and verify tables exist - cursor.execute(""" - SELECT table_name FROM information_schema.tables - WHERE table_schema = 'public' AND table_name IN ('files', 'operations') - """) - tables = cursor.fetchall() - - if len(tables) < 2: - logger.error("Database tables not found! Please run setup_database.sh first.") - raise Exception("Database not properly initialized. Run setup_database.sh") - - cursor.close() - conn.close() - logger.info("Database connection verified successfully") - except psycopg2.Error as e: - logger.error(f"Database connection failed: {e}") - raise - - def index_disk(self, disk_root: str, disk_name: str): - """ - Index all files on a disk/partition with dynamic progress display - :param disk_root: Root path of disk (e.g., 'D:\\') - :param disk_name: Logical name for the disk - """ - logger.info(f"Indexing disk: {disk_name} at {disk_root}") - disk_path = Path(disk_root) - - if not disk_path.exists(): - logger.error(f"Disk path {disk_root} does not exist!") - return - - files_count = 0 - total_size = 0 - start_time = time.time() - - conn = self.get_connection() - cursor = conn.cursor() - - try: - # Walk through all files - for root, dirs, files in os.walk(disk_path): - # Skip system directories - dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))] - - for file in files: - try: - file_path = Path(root) / file - if not file_path.is_file(): - continue - - stat = file_path.stat() - size = stat.st_size - mtime = datetime.fromtimestamp(stat.st_mtime) - - # Calculate relative path for portability - rel_path = str(file_path.relative_to(disk_path)) - - # PostgreSQL INSERT ... ON CONFLICT for upsert - cursor.execute(""" - INSERT INTO files (path, size, modified_time, disk_label, checksum, status) - VALUES (%s, %s, %s, %s, %s, %s) - ON CONFLICT (path) DO UPDATE SET - size = EXCLUDED.size, - modified_time = EXCLUDED.modified_time, - disk_label = EXCLUDED.disk_label, - status = EXCLUDED.status - """, (rel_path, size, mtime, disk_name, None, 'indexed')) - - files_count += 1 - total_size += size - - # Dynamic progress display - update every 100 files - if files_count % 100 == 0: - elapsed = time.time() - start_time - rate = files_count / elapsed if elapsed > 0 else 0 - # Truncate path for display - display_path = str(file_path) - if len(display_path) > 60: - display_path = '...' 
+ display_path[-57:] - - # Use \r to overwrite the line - print(f"\rIndexing: {files_count:,} files | {self.format_size(total_size)} | {rate:.0f} files/s | {display_path}", end='', flush=True) - - # Commit every 1000 files for performance - if files_count % 1000 == 0: - conn.commit() - - except Exception as e: - conn.rollback() - logger.warning(f"\nSkipping {file_path}: {e}") - continue - - conn.commit() - print() # New line after progress display - logger.info(f"Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}") - - finally: - cursor.close() - conn.close() - - def calculate_disk_usage(self) -> Dict[str, Dict]: - """Calculate current usage per disk""" - conn = self.get_connection() - cursor = conn.cursor() - - try: - cursor.execute(""" - SELECT disk_label, SUM(size) as total_size, COUNT(*) as file_count - FROM files - GROUP BY disk_label - """) - - usage = {} - for row in cursor.fetchall(): - disk = row[0] - size = int(row[1] or 0) - count = int(row[2]) - usage[disk] = { - 'size': size, - 'count': count, - 'formatted_size': self.format_size(size) - } - - return usage - finally: - cursor.close() - conn.close() - - def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict: - """ - Create a migration plan to free up target_disk - :param target_disk: Disk to free up (e.g., 'D:') - :param destination_disks: List of disks to move files to - :return: Migration plan dictionary - """ - logger.info(f"Planning migration to free up {target_disk}") - - usage = self.calculate_disk_usage() - - if target_disk not in usage: - logger.error(f"Target disk {target_disk} not found in index!") - return {} - - # Get files on target disk - conn = self.get_connection() - cursor = conn.cursor() - - cursor.execute( - "SELECT path, size, modified_time FROM files WHERE disk_label = %s ORDER BY size DESC", - (target_disk,) - ) - files_to_move = cursor.fetchall() - cursor.close() - conn.close() - - target_disk_usage = usage[target_disk]['size'] - logger.info(f"Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}") - - # Calculate available space on destination disks - dest_availability = [] - for disk in destination_disks: - if disk not in usage: - # Assume empty disk - available = float('inf') - else: - # In real scenario, query actual disk free space - available = float('inf') # Placeholder - - dest_availability.append({ - 'disk': disk, - 'available': available, - 'planned_usage': 0 - }) - - # Generate move plan - plan = { - 'target_disk': target_disk, - 'total_size': target_disk_usage, - 'file_count': len(files_to_move), - 'operations': [], - 'destination_disks': destination_disks - } - - conn = self.get_connection() - cursor = conn.cursor() - - try: - for file_info in files_to_move: - rel_path, size, mtime = file_info - - # Find best destination (simple round-robin for balance) - dest_disk = destination_disks[len(plan['operations']) % len(destination_disks)] - - # Record operation - op = { - 'source_disk': target_disk, - 'source_path': rel_path, - 'dest_disk': dest_disk, - 'target_path': rel_path, # Keep same relative path - 'size': int(size) - } - plan['operations'].append(op) - - # Store in database - cursor.execute( - "INSERT INTO operations (source_path, target_path, operation_type, status) VALUES (%s, %s, %s, %s)", - (f"{target_disk}:{rel_path}", f"{dest_disk}:{rel_path}", 'move', 'pending') - ) - - conn.commit() - finally: - cursor.close() - conn.close() - - # Save plan to JSON - plan_file = 
f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" - with open(plan_file, 'w') as f: - json.dump(plan, f, indent=2) - - logger.info(f"Plan created with {len(plan['operations'])} operations") - logger.info(f"Plan saved to {plan_file}") - - return plan - - def verify_operation(self, source: Path, dest: Path) -> bool: - """Verify file was copied correctly (size + optional checksum)""" - if not dest.exists(): - return False - - try: - source_stat = source.stat() - dest_stat = dest.stat() - - if source_stat.st_size != dest_stat.st_size: - return False - - # Optional: checksum verification for critical files - # if source_stat.st_size < 100*1024*1024: # Only for files < 100MB - # return self.file_checksum(source) == self.file_checksum(dest) - - return True - except Exception as e: - logger.error(f"Verification error: {e}") - return False - - @staticmethod - def file_checksum(path: Path) -> str: - """Calculate MD5 checksum of file""" - hash_md5 = hashlib.md5() - with open(path, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_md5.update(chunk) - return hash_md5.hexdigest() - - def execute_migration(self, plan_file: str, dry_run: bool = True): - """ - Execute migration plan - :param plan_file: Path to plan JSON file - :param dry_run: If True, only simulate operations - """ - logger.info(f"{'DRY RUN' if dry_run else 'EXECUTING'} migration from {plan_file}") - - with open(plan_file, 'r') as f: - plan = json.load(f) - - operations = plan['operations'] - logger.info(f"Processing {len(operations)} operations...") - - success_count = 0 - error_count = 0 - start_time = time.time() - - conn = self.get_connection() - cursor = conn.cursor() - - try: - for i, op in enumerate(operations, 1): - source_disk = op['source_disk'] - source_path = op['source_path'] - dest_disk = op['dest_disk'] - target_path = op['target_path'] - - source_full = Path(source_disk) / source_path - dest_full = Path(dest_disk) / target_path - - # Dynamic progress display - elapsed = time.time() - start_time - rate = i / elapsed if elapsed > 0 else 0 - eta = (len(operations) - i) / rate if rate > 0 else 0 - display_path = str(source_path) - if len(display_path) > 50: - display_path = '...' 
+ display_path[-47:] - - print(f"\r[{i}/{len(operations)}] {success_count} OK, {error_count} ERR | {rate:.1f} files/s | ETA: {int(eta)}s | {display_path}", end='', flush=True) - - if dry_run: - # Simulate - if source_full.exists(): - success_count += 1 - else: - logger.warning(f"\n Source does not exist: {source_full}") - error_count += 1 - continue - - try: - # Create destination directory - dest_full.parent.mkdir(parents=True, exist_ok=True) - - # Move file (copy + verify + delete) - if source_full.exists(): - # Copy with metadata - shutil.copy2(source_full, dest_full) - - # Verify - if self.verify_operation(source_full, dest_full): - # Update database - cursor.execute( - "UPDATE files SET disk_label = %s, status = 'moved' WHERE path = %s AND disk_label = %s", - (dest_disk, source_path, source_disk) - ) - - # Safe delete (could be made optional) - # source_full.unlink() - - # Log operation as executed - cursor.execute( - "UPDATE operations SET executed = 1, executed_at = CURRENT_TIMESTAMP WHERE source_path = %s", - (f"{source_disk}:{source_path}",) - ) - - success_count += 1 - else: - raise Exception("Verification failed") - else: - logger.warning(f"\n Source missing: {source_full}") - error_count += 1 - - except Exception as e: - logger.error(f"\n Error processing {source_path}: {e}") - cursor.execute( - "UPDATE operations SET error = %s WHERE source_path = %s", - (str(e), f"{source_disk}:{source_path}") - ) - error_count += 1 - - # Commit every 10 operations - if i % 10 == 0: - conn.commit() - - conn.commit() - print() # New line after progress display - - finally: - cursor.close() - conn.close() - - logger.info(f"Migration complete: {success_count} success, {error_count} errors") - - if not dry_run and error_count == 0: - logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!") - logger.info(f" Remember to safely delete original files from {plan['target_disk']}") - - def run_deduplication(self, disk: Optional[str] = None, use_chunks: bool = True): - logger.info(f"Starting deduplication{' for disk ' + disk if disk else ''}") - - disk_mount_map = { - 'SMT': '/media/mike/SMT', - 'DISK1': '/media/mike/DISK1', - 'LLM': '/media/mike/LLM' - } - - conn = self.get_connection() - cursor = conn.cursor() - - def hash_file_local(file_path: Path) -> str: - hasher = hashlib.sha256() - with open(file_path, 'rb') as f: - while chunk := f.read(65536): - hasher.update(chunk) - return hasher.hexdigest() - - try: - if disk: - cursor.execute("SELECT path, size, disk_label FROM files WHERE disk_label = %s AND checksum IS NULL ORDER BY size DESC", (disk,)) - else: - cursor.execute("SELECT path, size, disk_label FROM files WHERE checksum IS NULL ORDER BY size DESC") - - files_to_process = cursor.fetchall() - total = len(files_to_process) - logger.info(f"Found {total} files to hash") - - processed = 0 - skipped = 0 - start_time = time.time() - batch = [] - - print(f"Phase 1: Computing checksums...") - - for idx, (path_str, size, disk_label) in enumerate(files_to_process, 1): - try: - mount_point = disk_mount_map.get(disk_label, disk_label) - full_path = Path(mount_point) / path_str if not Path(path_str).is_absolute() else Path(path_str) - - if not full_path.exists(): - skipped += 1 - if idx % 100 == 0: - elapsed = time.time() - start_time - rate = (processed + skipped) / elapsed if elapsed > 0 else 0 - remaining = (total - idx) / rate if rate > 0 else 0 - pct = 100 * idx / total - print(f"\r[{pct:5.1f}%] {processed:,}/{total:,} | {rate:.0f}/s | ETA: 
{int(remaining/60)}m{int(remaining%60):02d}s | Skip: {skipped:,}", end='', flush=True) - continue - - checksum = hash_file_local(full_path) - batch.append((checksum, path_str)) - - processed += 1 - if len(batch) >= 1000: - try: - cursor.executemany("UPDATE files SET checksum = %s WHERE path = %s", batch) - conn.commit() - batch.clear() - except Exception as e: - conn.rollback() - batch.clear() - print(f"\nBatch update failed: {e}") - - if idx % 100 == 0: - elapsed = time.time() - start_time - rate = (processed + skipped) / elapsed if elapsed > 0 else 0 - remaining = (total - idx) / rate if rate > 0 else 0 - pct = 100 * idx / total - print(f"\r[{pct:5.1f}%] {processed:,}/{total:,} | {rate:.0f}/s | ETA: {int(remaining/60)}m{int(remaining%60):02d}s | Skip: {skipped:,}", end='', flush=True) - - except Exception as e: - skipped += 1 - if idx <= 5: - print(f"\nDebug: {full_path} - {e}") - - if batch: - try: - cursor.executemany("UPDATE files SET checksum = %s WHERE path = %s", batch) - conn.commit() - except Exception as e: - conn.rollback() - print(f"\nFinal batch failed: {e}") - - print() - elapsed = time.time() - start_time - logger.info(f"Phase 1 done: {processed:,} files in {int(elapsed/60)}m{int(elapsed%60):02d}s ({skipped:,} skipped)") - - print("Phase 2: Finding duplicates...") - cursor.execute(""" - UPDATE files f1 SET duplicate_of = ( - SELECT MIN(path) FROM files f2 - WHERE f2.checksum = f1.checksum AND f2.path < f1.path - ) - WHERE checksum IS NOT NULL - """) - conn.commit() - - cursor.execute("SELECT COUNT(*) FROM files WHERE duplicate_of IS NOT NULL") - dup_count = cursor.fetchone()[0] - logger.info(f"Phase 2 done: Found {dup_count:,} duplicates") - - finally: - cursor.close() - conn.close() - - def plan_merge(self, sources: List[str], target: str, output_file: str, - filter_system: bool = False, network_target: str = None): - """Plan merge of multiple source disks to target with deduplication""" - logger.info(f"Planning merge: {', '.join(sources)} → {target or network_target}") - - if filter_system: - sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - from filters import GitignoreFilter - file_filter = GitignoreFilter() - logger.info("System/build file filtering enabled") - - conn = self.get_connection() - cursor = conn.cursor() - - try: - placeholders = ','.join(['%s'] * len(sources)) - cursor.execute(f""" - SELECT path, size, checksum, disk_label, duplicate_of - FROM files - WHERE disk_label IN ({placeholders}) - ORDER BY size DESC - """, tuple(sources)) - - files = cursor.fetchall() - total_files = len(files) - total_size = sum(int(f[1]) for f in files) - - unique_files = {} - duplicate_count = 0 - duplicate_size = 0 - filtered_count = 0 - filtered_size = 0 - - for path, size, checksum, disk_label, duplicate_of in files: - if filter_system and file_filter.should_exclude(path): - filtered_count += 1 - filtered_size += int(size) - continue - - if checksum and checksum in unique_files: - duplicate_count += 1 - duplicate_size += int(size) - else: - if checksum: - unique_files[checksum] = (path, int(size), disk_label) - - unique_count = len(unique_files) - unique_size = sum(f[1] for f in unique_files.values()) - - plan = { - 'sources': sources, - 'target': target or network_target, - 'network': network_target is not None, - 'total_files': total_files, - 'total_size': total_size, - 'unique_files': unique_count, - 'unique_size': unique_size, - 'duplicate_files': duplicate_count, - 'duplicate_size': duplicate_size, - 'filtered_files': filtered_count if 
filter_system else 0, - 'filtered_size': filtered_size if filter_system else 0, - 'space_saved': duplicate_size + (filtered_size if filter_system else 0), - 'operations': [] - } - - for checksum, (path, size, disk_label) in unique_files.items(): - plan['operations'].append({ - 'source_disk': disk_label, - 'source_path': path, - 'target_disk': target or network_target, - 'target_path': path, - 'size': size, - 'checksum': checksum - }) - - with open(output_file, 'w') as f: - json.dump(plan, f, indent=2) - - logger.info(f"Merge plan saved to {output_file}") - print(f"\n=== MERGE PLAN SUMMARY ===") - print(f"Sources: {', '.join(sources)}") - print(f"Target: {target or network_target}") - print(f"Total files: {total_files:,} ({self.format_size(total_size)})") - if filter_system: - print(f"Filtered (system/build): {filtered_count:,} ({self.format_size(filtered_size)})") - print(f"Unique files: {unique_count:,} ({self.format_size(unique_size)})") - print(f"Duplicates: {duplicate_count:,} ({self.format_size(duplicate_size)})") - print(f"Total space saved: {self.format_size(plan['space_saved'])}") - print(f"Space needed on target: {self.format_size(unique_size)}") - - finally: - cursor.close() - conn.close() - - def generate_report(self, format='text', show_duplicates=False, preview_merge=None): - """Generate status report""" - conn = self.get_connection() - cursor = conn.cursor() - - try: - if preview_merge: - # Load and display merge plan - with open(preview_merge, 'r') as f: - plan = json.load(f) - - print("\n=== MERGE PLAN PREVIEW ===") - print(f"Sources: {', '.join(plan['sources'])}") - print(f"Target: {plan['target']}") - print(f"Total files: {plan['total_files']:,} ({self.format_size(plan['total_size'])})") - print(f"Unique files: {plan['unique_files']:,} ({self.format_size(plan['unique_size'])})") - print(f"Duplicates: {plan['duplicate_files']:,} ({self.format_size(plan['duplicate_size'])})") - print(f"Space saved: {self.format_size(plan['space_saved'])}") - print(f"Space needed on target: {self.format_size(plan['unique_size'])}") - return - - cursor.execute(""" - SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status - """) - - print("\n=== FILE MIGRATION REPORT ===") - for row in cursor.fetchall(): - status, count, size = row - print(f"{status:15}: {count:6} files, {self.format_size(int(size or 0))}") - - # Disk usage summary - cursor.execute(""" - SELECT disk_label, COUNT(*), SUM(size) FROM files GROUP BY disk_label - """) - - print("\n=== DISK USAGE ===") - for row in cursor.fetchall(): - disk, count, size = row - print(f"{disk:20}: {count:6} files, {self.format_size(int(size or 0))}") - - # Deduplication stats - cursor.execute(""" - SELECT COUNT(*), SUM(size) FROM files WHERE checksum IS NOT NULL - """) - hashed_count, hashed_size = cursor.fetchone() - - cursor.execute(""" - SELECT COUNT(*), SUM(size) FROM files WHERE duplicate_of IS NOT NULL - """) - dup_count, dup_size = cursor.fetchone() - - print("\n=== DEDUPLICATION STATS ===") - print(f"Files with checksums: {hashed_count or 0:6}") - print(f"Duplicate files: {dup_count or 0:6} ({self.format_size(int(dup_size or 0))})") - - if show_duplicates and dup_count: - print("\n=== DUPLICATE FILES ===") - cursor.execute(""" - SELECT path, size, duplicate_of FROM files - WHERE duplicate_of IS NOT NULL - ORDER BY size DESC - LIMIT 20 - """) - for path, size, dup_of in cursor.fetchall(): - print(f" {path} ({self.format_size(int(size))}) → {dup_of}") - - cursor.execute(""" - SELECT operation_type, executed, verified, COUNT(*) FROM 
operations GROUP BY operation_type, executed, verified - """) - - print("\n=== OPERATIONS REPORT ===") - for row in cursor.fetchall(): - op_type, executed, verified, count = row - status = "EXECUTED" if executed else "PENDING" - if verified: - status += "+VERIFIED" - print(f"{op_type:10} {status:15}: {count} operations") - - finally: - cursor.close() - conn.close() - - def profile_content(self, disk: Optional[str] = None, update_db: bool = False, limit: Optional[int] = None): - from content.profiler import ContentProfiler - - profiler = ContentProfiler() - disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'} - - conn = self.get_connection() - cursor = conn.cursor() - - try: - query = "SELECT path, size, disk_label FROM files WHERE 1=1" - params = [] - if disk: - query += " AND disk_label = %s" - params.append(disk) - if limit: - query += f" LIMIT {limit}" - - cursor.execute(query, params) - files = cursor.fetchall() - total = len(files) - logger.info(f"Profiling {total:,} files...") - - kind_stats = {} - processable = 0 - batch = [] - - for idx, (path, size, disk_label) in enumerate(files, 1): - mount_point = disk_mount_map.get(disk_label, disk_label) - full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path) - - if not full_path.exists(): - continue - - profile = profiler.profile_file(full_path) - - if 'error' not in profile: - kind = profile['kind'] - if kind not in kind_stats: - kind_stats[kind] = {'count': 0, 'processable': 0} - kind_stats[kind]['count'] += 1 - if profile['processable']: - kind_stats[kind]['processable'] += 1 - processable += 1 - - if update_db: - profile_json = json.dumps(profile) - batch.append((kind, profile_json, path)) - - if len(batch) >= 500: - cursor.executemany( - "UPDATE files SET metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{profile}', %s::jsonb) WHERE path = %s", - [(pj, p) for k, pj, p in batch] - ) - conn.commit() - batch.clear() - - if idx % 100 == 0: - print(f"\rProfiled: {idx:,}/{total:,}", end='', flush=True) - - if update_db and batch: - cursor.executemany( - "UPDATE files SET metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{profile}', %s::jsonb) WHERE path = %s", - [(pj, p) for k, pj, p in batch] - ) - conn.commit() - - print() - print(f"\n=== CONTENT PROFILE SUMMARY ===") - print(f"Total files: {total:,}") - print(f"Processable: {processable:,}\n") - print(f"{'Kind':<15} {'Total':<10} {'Processable':<12} {'Extractor'}") - print("-" * 60) - for kind in sorted(kind_stats.keys()): - stats = kind_stats[kind] - extractor = profiler._suggest_extractor(kind, '') - print(f"{kind:<15} {stats['count']:<10,} {stats['processable']:<12,} {extractor or 'none'}") - - finally: - cursor.close() - conn.close() - - def extract_content(self, kind: Optional[str] = None, limit: int = 10): - from content.profiler import ContentProfiler - from content.extractors import ContentExtractor - - profiler = ContentProfiler() - extractor = ContentExtractor() - disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'} - - conn = self.get_connection() - cursor = conn.cursor() - - try: - query = "SELECT path, size, disk_label, metadata FROM files WHERE metadata->'profile'->>'processable' = 'true'" - params = [] - if kind: - query += " AND metadata->'profile'->>'kind' = %s" - params.append(kind) - query += f" LIMIT {limit}" - - cursor.execute(query, params) - files = cursor.fetchall() - - print(f"\n=== EXTRACTING CONTENT ===") - print(f"Processing 
{len(files)} files\n") - - for path, size, disk_label, metadata in files: - mount_point = disk_mount_map.get(disk_label, disk_label) - full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path) - - if not full_path.exists(): - continue - - profile = metadata.get('profile', {}) if metadata else {} - extractor_type = profile.get('extractor') - - if not extractor_type: - continue - - print(f"Extracting: {path}") - print(f" Type: {profile.get('kind')} | Extractor: {extractor_type}") - - result = extractor.extract(full_path, extractor_type) - - if 'text' in result: - preview = result['text'][:200] - print(f" Preview: {preview}...") - elif 'pipeline' in result: - print(f" Pipeline: {' → '.join(result['pipeline'])}") - print(f" Status: {result.get('status', 'pending')}") - - print() - - finally: - cursor.close() - conn.close() - - def classify_files(self, disk: Optional[str] = None, update_db: bool = False): - from classification.classifier import FileClassifier - - classifier = FileClassifier() - conn = self.get_connection() - cursor = conn.cursor() - - try: - if disk: - cursor.execute("SELECT path, size, disk_label FROM files WHERE disk_label = %s", (disk,)) - else: - cursor.execute("SELECT path, size, disk_label FROM files") - - files = cursor.fetchall() - total = len(files) - logger.info(f"Classifying {total:,} files...") - - categories = {} - build_artifacts = 0 - batch = [] - - for idx, (path, size, disk_label) in enumerate(files, 1): - labels, category, is_build = classifier.classify_path(path, int(size)) - - if is_build: - build_artifacts += 1 - - if category not in categories: - categories[category] = {'count': 0, 'size': 0} - categories[category]['count'] += 1 - categories[category]['size'] += int(size) - - if update_db: - labels_str = ','.join(labels) - batch.append((category, labels_str, path)) - - if len(batch) >= 1000: - cursor.executemany("UPDATE files SET category = %s WHERE path = %s", [(cat, p) for cat, lbl, p in batch]) - conn.commit() - batch.clear() - - if idx % 1000 == 0: - print(f"\rClassified: {idx:,}/{total:,}", end='', flush=True) - - if update_db and batch: - cursor.executemany("UPDATE files SET category = %s WHERE path = %s", [(cat, p) for cat, lbl, p in batch]) - conn.commit() - - print() - print(f"\n=== CLASSIFICATION SUMMARY ===") - print(f"Total files: {total:,}") - print(f"Build artifacts: {build_artifacts:,}") - print(f"\nCategories:") - for category in sorted(categories.keys()): - info = categories[category] - print(f" {category:30}: {info['count']:8,} files, {self.format_size(info['size'])}") - - finally: - cursor.close() - conn.close() - - def review_migration(self, category: Optional[str] = None, show_build: bool = False): - from classification.classifier import FileClassifier - - classifier = FileClassifier() - conn = self.get_connection() - cursor = conn.cursor() - - try: - query = "SELECT path, size, category FROM files WHERE 1=1" - params = [] - - if category: - query += " AND category = %s" - params.append(category) - - if not show_build: - query += " AND (metadata->>'labels' IS NULL OR metadata->>'labels' NOT LIKE '%build-artifact%')" - - query += " ORDER BY category, size DESC LIMIT 100" - - cursor.execute(query, params) - files = cursor.fetchall() - - if not files: - print("No files found matching criteria") - return - - print(f"\n=== MIGRATION PREVIEW ===") - print(f"Showing {len(files)} files\n") - - current_category = None - for path, size, cat in files: - if cat != current_category: - current_category = cat - 
print(f"\n{cat}:") - - labels, suggested_cat, is_build = classifier.classify_path(path, int(size)) - target = classifier.suggest_target_path(path, suggested_cat, labels) - print(f" {path}") - print(f" → {target} ({self.format_size(int(size))})") - - finally: - cursor.close() - conn.close() - - @staticmethod - def format_size(size: int) -> str: - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: - if size < 1024: - return f"{size:.1f}{unit}" - size /= 1024 - return f"{size:.1f}PB" - -def main(): - parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot') - subparsers = parser.add_subparsers(dest='command', required=True) - - # Index command - index_parser = subparsers.add_parser('index', help='Index files on a disk') - index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)') - index_parser.add_argument('disk_name', help='Logical name for the disk') - - # Plan command - plan_parser = subparsers.add_parser('plan', help='Create migration plan') - plan_parser.add_argument('target_disk', help='Disk to free up') - plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks') - - # Execute command - exec_parser = subparsers.add_parser('execute', help='Execute migration plan') - exec_parser.add_argument('plan_file', help='Path to plan JSON file') - exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations') - - # Dedupe command - dedupe_parser = subparsers.add_parser('dedupe', help='Deduplicate files and compute checksums') - dedupe_parser.add_argument('--disk', help='Optional: Only dedupe specific disk') - dedupe_parser.add_argument('--no-chunks', action='store_true', help='Disable chunk-level deduplication') - - # Merge command - merge_parser = subparsers.add_parser('merge', help='Plan multi-disk merge with deduplication') - merge_parser.add_argument('--sources', nargs='+', required=True, help='Source disks to merge') - merge_parser.add_argument('--target', required=True, help='Target disk') - merge_parser.add_argument('--output', default='merge_plan.json', help='Output plan file') - merge_parser.add_argument('--filter-system', action='store_true', help='Filter system/build files') - merge_parser.add_argument('--network', help='Network target (e.g., user@host:/path)') - - # Profile command - profile_parser = subparsers.add_parser('profile', help='Create content profiles (inventory + triage)') - profile_parser.add_argument('--disk', help='Profile specific disk') - profile_parser.add_argument('--update', action='store_true', help='Update database with profiles') - profile_parser.add_argument('--limit', type=int, help='Limit number of files') - - # Extract command - extract_parser = subparsers.add_parser('extract', help='Extract content from files') - extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)') - extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch') - - # Classify command - classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization') - classify_parser.add_argument('--disk', help='Classify specific disk') - classify_parser.add_argument('--update', action='store_true', help='Update database with classifications') - - # Review command - review_parser = subparsers.add_parser('review', help='Review proposed migration structure') - review_parser.add_argument('--category', help='Review specific category') - review_parser.add_argument('--show-build', 
action='store_true', help='Include build artifacts') - - # Report command - report_parser = subparsers.add_parser('report', help='Show current status') - report_parser.add_argument('--format', choices=['text', 'json'], default='text', help='Report format') - report_parser.add_argument('--show-duplicates', action='store_true', help='Show duplicate files') - report_parser.add_argument('--preview-merge', help='Preview merge plan from file') - - args = parser.parse_args() - tool = DiskReorganizer() - - if args.command == 'index': - tool.index_disk(args.disk_root, args.disk_name) - - elif args.command == 'dedupe': - tool.run_deduplication(disk=args.disk, use_chunks=not args.no_chunks) - - elif args.command == 'merge': - tool.plan_merge(sources=args.sources, target=args.target, output_file=args.output, - filter_system=args.filter_system, network_target=args.network) - - elif args.command == 'plan': - plan = tool.plan_migration(args.target_disk, args.dest_disks) - if plan: - print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}") - print(f"Destination disks: {', '.join(plan['destination_disks'])}") - - elif args.command == 'execute': - tool.execute_migration(args.plan_file, dry_run=args.dry_run) - - elif args.command == 'profile': - tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit) - - elif args.command == 'extract': - tool.extract_content(kind=args.kind, limit=args.limit) - - elif args.command == 'classify': - tool.classify_files(disk=args.disk, update_db=args.update) - - elif args.command == 'review': - tool.review_migration(category=args.category, show_build=args.show_build) - - elif args.command == 'report': - tool.generate_report(format=args.format, show_duplicates=args.show_duplicates, preview_merge=args.preview_merge) - -if __name__ == '__main__': - main() \ No newline at end of file +import os +import sys +from dataclasses import dataclass +import psycopg2 +import shutil +import hashlib +import argparse +import json +from pathlib import Path +from typing import List, Dict, Optional +from datetime import datetime +import logging +import time +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler('disk_reorganizer.log'), logging.StreamHandler(sys.stdout)]) +logger = logging.getLogger(__name__) + +@dataclass +class FileRecord: + path: str + size: int + modified_time: float + disk_label: str + checksum: Optional[str] = None + status: str = 'indexed' + +class DiskReorganizer: + + def __init__(self, db_config: Dict=None): + if db_config is None: + db_config = {'host': os.getenv('DB_HOST', '192.168.1.159'), 'port': int(os.getenv('DB_PORT', 5432)), 'database': os.getenv('DB_NAME', 'disk_reorganizer_db'), 'user': os.getenv('DB_USER', 'disk_reorg_user'), 'password': os.getenv('DB_PASSWORD', 'heel-goed-wachtwoord')} + self.db_config = db_config + self.init_database() + + def get_connection(self): + return psycopg2.connect(**self.db_config) + + def init_database(self): + try: + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute("\n SELECT table_name FROM information_schema.tables\n WHERE table_schema = 'public' AND table_name IN ('files', 'operations')\n ") + tables = cursor.fetchall() + if len(tables) < 2: + logger.error('Database tables not found! Please run setup_database.sh first.') + raise Exception('Database not properly initialized. 
Run setup_database.sh') + cursor.close() + conn.close() + logger.info('Database connection verified successfully') + except psycopg2.Error as e: + logger.error(f'Database connection failed: {e}') + raise + + def index_disk(self, disk_root: str, disk_name: str): + logger.info(f'Indexing disk: {disk_name} at {disk_root}') + disk_path = Path(disk_root) + if not disk_path.exists(): + logger.error(f'Disk path {disk_root} does not exist!') + return + files_count = 0 + total_size = 0 + start_time = time.time() + conn = self.get_connection() + cursor = conn.cursor() + try: + for root, dirs, files in os.walk(disk_path): + dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))] + for file in files: + try: + file_path = Path(root) / file + if not file_path.is_file(): + continue + stat = file_path.stat() + size = stat.st_size + mtime = datetime.fromtimestamp(stat.st_mtime) + rel_path = str(file_path.relative_to(disk_path)) + cursor.execute('\n INSERT INTO files (path, size, modified_time, disk_label, checksum, status)\n VALUES (%s, %s, %s, %s, %s, %s)\n ON CONFLICT (path) DO UPDATE SET\n size = EXCLUDED.size,\n modified_time = EXCLUDED.modified_time,\n disk_label = EXCLUDED.disk_label,\n status = EXCLUDED.status\n ', (rel_path, size, mtime, disk_name, None, 'indexed')) + files_count += 1 + total_size += size + if files_count % 100 == 0: + elapsed = time.time() - start_time + rate = files_count / elapsed if elapsed > 0 else 0 + display_path = str(file_path) + if len(display_path) > 60: + display_path = '...' + display_path[-57:] + print(f'\rIndexing: {files_count:,} files | {self.format_size(total_size)} | {rate:.0f} files/s | {display_path}', end='', flush=True) + if files_count % 1000 == 0: + conn.commit() + except Exception as e: + conn.rollback() + logger.warning(f'\nSkipping {file_path}: {e}') + continue + conn.commit() + print() + logger.info(f'Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}') + finally: + cursor.close() + conn.close() + + def calculate_disk_usage(self) -> Dict[str, Dict]: + conn = self.get_connection() + cursor = conn.cursor() + try: + cursor.execute('\n SELECT disk_label, SUM(size) as total_size, COUNT(*) as file_count\n FROM files\n GROUP BY disk_label\n ') + usage = {} + for row in cursor.fetchall(): + disk = row[0] + size = int(row[1] or 0) + count = int(row[2]) + usage[disk] = {'size': size, 'count': count, 'formatted_size': self.format_size(size)} + return usage + finally: + cursor.close() + conn.close() + + def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict: + logger.info(f'Planning migration to free up {target_disk}') + usage = self.calculate_disk_usage() + if target_disk not in usage: + logger.error(f'Target disk {target_disk} not found in index!') + return {} + conn = self.get_connection() + cursor = conn.cursor() + cursor.execute('SELECT path, size, modified_time FROM files WHERE disk_label = %s ORDER BY size DESC', (target_disk,)) + files_to_move = cursor.fetchall() + cursor.close() + conn.close() + target_disk_usage = usage[target_disk]['size'] + logger.info(f'Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}') + dest_availability = [] + for disk in destination_disks: + if disk not in usage: + available = float('inf') + else: + available = float('inf') + dest_availability.append({'disk': disk, 'available': available, 'planned_usage': 0}) + plan = {'target_disk': target_disk, 'total_size': target_disk_usage, 'file_count': 
len(files_to_move), 'operations': [], 'destination_disks': destination_disks} + conn = self.get_connection() + cursor = conn.cursor() + try: + for file_info in files_to_move: + rel_path, size, mtime = file_info + dest_disk = destination_disks[len(plan['operations']) % len(destination_disks)] + op = {'source_disk': target_disk, 'source_path': rel_path, 'dest_disk': dest_disk, 'target_path': rel_path, 'size': int(size)} + plan['operations'].append(op) + cursor.execute('INSERT INTO operations (source_path, target_path, operation_type, status) VALUES (%s, %s, %s, %s)', (f'{target_disk}:{rel_path}', f'{dest_disk}:{rel_path}', 'move', 'pending')) + conn.commit() + finally: + cursor.close() + conn.close() + plan_file = f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + with open(plan_file, 'w') as f: + json.dump(plan, f, indent=2) + logger.info(f"Plan created with {len(plan['operations'])} operations") + logger.info(f'Plan saved to {plan_file}') + return plan + + def verify_operation(self, source: Path, dest: Path) -> bool: + if not dest.exists(): + return False + try: + source_stat = source.stat() + dest_stat = dest.stat() + if source_stat.st_size != dest_stat.st_size: + return False + return True + except Exception as e: + logger.error(f'Verification error: {e}') + return False + + @staticmethod + def file_checksum(path: Path) -> str: + hash_md5 = hashlib.md5() + with open(path, 'rb') as f: + for chunk in iter(lambda: f.read(4096), b''): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def execute_migration(self, plan_file: str, dry_run: bool=True): + logger.info(f"{('DRY RUN' if dry_run else 'EXECUTING')} migration from {plan_file}") + with open(plan_file, 'r') as f: + plan = json.load(f) + operations = plan['operations'] + logger.info(f'Processing {len(operations)} operations...') + success_count = 0 + error_count = 0 + start_time = time.time() + conn = self.get_connection() + cursor = conn.cursor() + try: + for i, op in enumerate(operations, 1): + source_disk = op['source_disk'] + source_path = op['source_path'] + dest_disk = op['dest_disk'] + target_path = op['target_path'] + source_full = Path(source_disk) / source_path + dest_full = Path(dest_disk) / target_path + elapsed = time.time() - start_time + rate = i / elapsed if elapsed > 0 else 0 + eta = (len(operations) - i) / rate if rate > 0 else 0 + display_path = str(source_path) + if len(display_path) > 50: + display_path = '...' 
+ display_path[-47:] + print(f'\r[{i}/{len(operations)}] {success_count} OK, {error_count} ERR | {rate:.1f} files/s | ETA: {int(eta)}s | {display_path}', end='', flush=True) + if dry_run: + if source_full.exists(): + success_count += 1 + else: + logger.warning(f'\n Source does not exist: {source_full}') + error_count += 1 + continue + try: + dest_full.parent.mkdir(parents=True, exist_ok=True) + if source_full.exists(): + shutil.copy2(source_full, dest_full) + if self.verify_operation(source_full, dest_full): + cursor.execute("UPDATE files SET disk_label = %s, status = 'moved' WHERE path = %s AND disk_label = %s", (dest_disk, source_path, source_disk)) + cursor.execute('UPDATE operations SET executed = 1, executed_at = CURRENT_TIMESTAMP WHERE source_path = %s', (f'{source_disk}:{source_path}',)) + success_count += 1 + else: + raise Exception('Verification failed') + else: + logger.warning(f'\n Source missing: {source_full}') + error_count += 1 + except Exception as e: + logger.error(f'\n Error processing {source_path}: {e}') + cursor.execute('UPDATE operations SET error = %s WHERE source_path = %s', (str(e), f'{source_disk}:{source_path}')) + error_count += 1 + if i % 10 == 0: + conn.commit() + conn.commit() + print() + finally: + cursor.close() + conn.close() + logger.info(f'Migration complete: {success_count} success, {error_count} errors') + if not dry_run and error_count == 0: + logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!") + logger.info(f" Remember to safely delete original files from {plan['target_disk']}") + + def run_deduplication(self, disk: Optional[str]=None, use_chunks: bool=True): + logger.info(f"Starting deduplication{(' for disk ' + disk if disk else '')}") + disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'} + conn = self.get_connection() + cursor = conn.cursor() + + def hash_file_local(file_path: Path) -> str: + hasher = hashlib.sha256() + with open(file_path, 'rb') as f: + while (chunk := f.read(65536)): + hasher.update(chunk) + return hasher.hexdigest() + try: + if disk: + cursor.execute('SELECT path, size, disk_label FROM files WHERE disk_label = %s AND checksum IS NULL ORDER BY size DESC', (disk,)) + else: + cursor.execute('SELECT path, size, disk_label FROM files WHERE checksum IS NULL ORDER BY size DESC') + files_to_process = cursor.fetchall() + total = len(files_to_process) + logger.info(f'Found {total} files to hash') + processed = 0 + skipped = 0 + start_time = time.time() + batch = [] + print(f'Phase 1: Computing checksums...') + for idx, (path_str, size, disk_label) in enumerate(files_to_process, 1): + try: + mount_point = disk_mount_map.get(disk_label, disk_label) + full_path = Path(mount_point) / path_str if not Path(path_str).is_absolute() else Path(path_str) + if not full_path.exists(): + skipped += 1 + if idx % 100 == 0: + elapsed = time.time() - start_time + rate = (processed + skipped) / elapsed if elapsed > 0 else 0 + remaining = (total - idx) / rate if rate > 0 else 0 + pct = 100 * idx / total + print(f'\r[{pct:5.1f}%] {processed:,}/{total:,} | {rate:.0f}/s | ETA: {int(remaining / 60)}m{int(remaining % 60):02d}s | Skip: {skipped:,}', end='', flush=True) + continue + checksum = hash_file_local(full_path) + batch.append((checksum, path_str)) + processed += 1 + if len(batch) >= 1000: + try: + cursor.executemany('UPDATE files SET checksum = %s WHERE path = %s', batch) + conn.commit() + batch.clear() + except Exception as e: + conn.rollback() + batch.clear() + print(f'\nBatch 
update failed: {e}') + if idx % 100 == 0: + elapsed = time.time() - start_time + rate = (processed + skipped) / elapsed if elapsed > 0 else 0 + remaining = (total - idx) / rate if rate > 0 else 0 + pct = 100 * idx / total + print(f'\r[{pct:5.1f}%] {processed:,}/{total:,} | {rate:.0f}/s | ETA: {int(remaining / 60)}m{int(remaining % 60):02d}s | Skip: {skipped:,}', end='', flush=True) + except Exception as e: + skipped += 1 + if idx <= 5: + print(f'\nDebug: {full_path} - {e}') + if batch: + try: + cursor.executemany('UPDATE files SET checksum = %s WHERE path = %s', batch) + conn.commit() + except Exception as e: + conn.rollback() + print(f'\nFinal batch failed: {e}') + print() + elapsed = time.time() - start_time + logger.info(f'Phase 1 done: {processed:,} files in {int(elapsed / 60)}m{int(elapsed % 60):02d}s ({skipped:,} skipped)') + print('Phase 2: Finding duplicates...') + cursor.execute('\n UPDATE files f1 SET duplicate_of = (\n SELECT MIN(path) FROM files f2\n WHERE f2.checksum = f1.checksum AND f2.path < f1.path\n )\n WHERE checksum IS NOT NULL\n ') + conn.commit() + cursor.execute('SELECT COUNT(*) FROM files WHERE duplicate_of IS NOT NULL') + dup_count = cursor.fetchone()[0] + logger.info(f'Phase 2 done: Found {dup_count:,} duplicates') + finally: + cursor.close() + conn.close() + + def plan_merge(self, sources: List[str], target: str, output_file: str, filter_system: bool=False, network_target: str=None): + logger.info(f"Planning merge: {', '.join(sources)} → {target or network_target}") + if filter_system: + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from filters import GitignoreFilter + file_filter = GitignoreFilter() + logger.info('System/build file filtering enabled') + conn = self.get_connection() + cursor = conn.cursor() + try: + placeholders = ','.join(['%s'] * len(sources)) + cursor.execute(f'\n SELECT path, size, checksum, disk_label, duplicate_of\n FROM files\n WHERE disk_label IN ({placeholders})\n ORDER BY size DESC\n ', tuple(sources)) + files = cursor.fetchall() + total_files = len(files) + total_size = sum((int(f[1]) for f in files)) + unique_files = {} + duplicate_count = 0 + duplicate_size = 0 + filtered_count = 0 + filtered_size = 0 + for path, size, checksum, disk_label, duplicate_of in files: + if filter_system and file_filter.should_exclude(path): + filtered_count += 1 + filtered_size += int(size) + continue + if checksum and checksum in unique_files: + duplicate_count += 1 + duplicate_size += int(size) + elif checksum: + unique_files[checksum] = (path, int(size), disk_label) + unique_count = len(unique_files) + unique_size = sum((f[1] for f in unique_files.values())) + plan = {'sources': sources, 'target': target or network_target, 'network': network_target is not None, 'total_files': total_files, 'total_size': total_size, 'unique_files': unique_count, 'unique_size': unique_size, 'duplicate_files': duplicate_count, 'duplicate_size': duplicate_size, 'filtered_files': filtered_count if filter_system else 0, 'filtered_size': filtered_size if filter_system else 0, 'space_saved': duplicate_size + (filtered_size if filter_system else 0), 'operations': []} + for checksum, (path, size, disk_label) in unique_files.items(): + plan['operations'].append({'source_disk': disk_label, 'source_path': path, 'target_disk': target or network_target, 'target_path': path, 'size': size, 'checksum': checksum}) + with open(output_file, 'w') as f: + json.dump(plan, f, indent=2) + logger.info(f'Merge plan saved to {output_file}') + print(f'\n=== MERGE 
PLAN SUMMARY ===') + print(f"Sources: {', '.join(sources)}") + print(f'Target: {target or network_target}') + print(f'Total files: {total_files:,} ({self.format_size(total_size)})') + if filter_system: + print(f'Filtered (system/build): {filtered_count:,} ({self.format_size(filtered_size)})') + print(f'Unique files: {unique_count:,} ({self.format_size(unique_size)})') + print(f'Duplicates: {duplicate_count:,} ({self.format_size(duplicate_size)})') + print(f"Total space saved: {self.format_size(plan['space_saved'])}") + print(f'Space needed on target: {self.format_size(unique_size)}') + finally: + cursor.close() + conn.close() + + def generate_report(self, format='text', show_duplicates=False, preview_merge=None): + conn = self.get_connection() + cursor = conn.cursor() + try: + if preview_merge: + with open(preview_merge, 'r') as f: + plan = json.load(f) + print('\n=== MERGE PLAN PREVIEW ===') + print(f"Sources: {', '.join(plan['sources'])}") + print(f"Target: {plan['target']}") + print(f"Total files: {plan['total_files']:,} ({self.format_size(plan['total_size'])})") + print(f"Unique files: {plan['unique_files']:,} ({self.format_size(plan['unique_size'])})") + print(f"Duplicates: {plan['duplicate_files']:,} ({self.format_size(plan['duplicate_size'])})") + print(f"Space saved: {self.format_size(plan['space_saved'])}") + print(f"Space needed on target: {self.format_size(plan['unique_size'])}") + return + cursor.execute('\n SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status\n ') + print('\n=== FILE MIGRATION REPORT ===') + for row in cursor.fetchall(): + status, count, size = row + print(f'{status:15}: {count:6} files, {self.format_size(int(size or 0))}') + cursor.execute('\n SELECT disk_label, COUNT(*), SUM(size) FROM files GROUP BY disk_label\n ') + print('\n=== DISK USAGE ===') + for row in cursor.fetchall(): + disk, count, size = row + print(f'{disk:20}: {count:6} files, {self.format_size(int(size or 0))}') + cursor.execute('\n SELECT COUNT(*), SUM(size) FROM files WHERE checksum IS NOT NULL\n ') + hashed_count, hashed_size = cursor.fetchone() + cursor.execute('\n SELECT COUNT(*), SUM(size) FROM files WHERE duplicate_of IS NOT NULL\n ') + dup_count, dup_size = cursor.fetchone() + print('\n=== DEDUPLICATION STATS ===') + print(f'Files with checksums: {hashed_count or 0:6}') + print(f'Duplicate files: {dup_count or 0:6} ({self.format_size(int(dup_size or 0))})') + if show_duplicates and dup_count: + print('\n=== DUPLICATE FILES ===') + cursor.execute('\n SELECT path, size, duplicate_of FROM files\n WHERE duplicate_of IS NOT NULL\n ORDER BY size DESC\n LIMIT 20\n ') + for path, size, dup_of in cursor.fetchall(): + print(f' {path} ({self.format_size(int(size))}) → {dup_of}') + cursor.execute('\n SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified\n ') + print('\n=== OPERATIONS REPORT ===') + for row in cursor.fetchall(): + op_type, executed, verified, count = row + status = 'EXECUTED' if executed else 'PENDING' + if verified: + status += '+VERIFIED' + print(f'{op_type:10} {status:15}: {count} operations') + finally: + cursor.close() + conn.close() + + def profile_content(self, disk: Optional[str]=None, update_db: bool=False, limit: Optional[int]=None): + from content.profiler import ContentProfiler + profiler = ContentProfiler() + disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'} + conn = self.get_connection() + cursor = conn.cursor() + try: + query = 'SELECT path, size, 
disk_label FROM files WHERE 1=1' + params = [] + if disk: + query += ' AND disk_label = %s' + params.append(disk) + if limit: + query += f' LIMIT {limit}' + cursor.execute(query, params) + files = cursor.fetchall() + total = len(files) + logger.info(f'Profiling {total:,} files...') + kind_stats = {} + processable = 0 + batch = [] + for idx, (path, size, disk_label) in enumerate(files, 1): + mount_point = disk_mount_map.get(disk_label, disk_label) + full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path) + if not full_path.exists(): + continue + profile = profiler.profile_file(full_path) + if 'error' not in profile: + kind = profile['kind'] + if kind not in kind_stats: + kind_stats[kind] = {'count': 0, 'processable': 0} + kind_stats[kind]['count'] += 1 + if profile['processable']: + kind_stats[kind]['processable'] += 1 + processable += 1 + if update_db: + profile_json = json.dumps(profile) + batch.append((kind, profile_json, path)) + if len(batch) >= 500: + cursor.executemany("UPDATE files SET metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{profile}', %s::jsonb) WHERE path = %s", [(pj, p) for k, pj, p in batch]) + conn.commit() + batch.clear() + if idx % 100 == 0: + print(f'\rProfiled: {idx:,}/{total:,}', end='', flush=True) + if update_db and batch: + cursor.executemany("UPDATE files SET metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{profile}', %s::jsonb) WHERE path = %s", [(pj, p) for k, pj, p in batch]) + conn.commit() + print() + print(f'\n=== CONTENT PROFILE SUMMARY ===') + print(f'Total files: {total:,}') + print(f'Processable: {processable:,}\n') + print(f"{'Kind':<15} {'Total':<10} {'Processable':<12} {'Extractor'}") + print('-' * 60) + for kind in sorted(kind_stats.keys()): + stats = kind_stats[kind] + extractor = profiler._suggest_extractor(kind, '') + print(f"{kind:<15} {stats['count']:<10,} {stats['processable']:<12,} {extractor or 'none'}") + finally: + cursor.close() + conn.close() + + def extract_content(self, kind: Optional[str]=None, limit: int=10): + from content.profiler import ContentProfiler + from content.extractors import ContentExtractor + profiler = ContentProfiler() + extractor = ContentExtractor() + disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'} + conn = self.get_connection() + cursor = conn.cursor() + try: + query = "SELECT path, size, disk_label, metadata FROM files WHERE metadata->'profile'->>'processable' = 'true'" + params = [] + if kind: + query += " AND metadata->'profile'->>'kind' = %s" + params.append(kind) + query += f' LIMIT {limit}' + cursor.execute(query, params) + files = cursor.fetchall() + print(f'\n=== EXTRACTING CONTENT ===') + print(f'Processing {len(files)} files\n') + for path, size, disk_label, metadata in files: + mount_point = disk_mount_map.get(disk_label, disk_label) + full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path) + if not full_path.exists(): + continue + profile = metadata.get('profile', {}) if metadata else {} + extractor_type = profile.get('extractor') + if not extractor_type: + continue + print(f'Extracting: {path}') + print(f" Type: {profile.get('kind')} | Extractor: {extractor_type}") + result = extractor.extract(full_path, extractor_type) + if 'text' in result: + preview = result['text'][:200] + print(f' Preview: {preview}...') + elif 'pipeline' in result: + print(f" Pipeline: {' → '.join(result['pipeline'])}") + print(f" Status: {result.get('status', 'pending')}") + print() + finally: + 
cursor.close() + conn.close() + + def classify_files(self, disk: Optional[str]=None, update_db: bool=False): + from classification.classifier import FileClassifier + classifier = FileClassifier() + conn = self.get_connection() + cursor = conn.cursor() + try: + if disk: + cursor.execute('SELECT path, size, disk_label FROM files WHERE disk_label = %s', (disk,)) + else: + cursor.execute('SELECT path, size, disk_label FROM files') + files = cursor.fetchall() + total = len(files) + logger.info(f'Classifying {total:,} files...') + categories = {} + build_artifacts = 0 + batch = [] + for idx, (path, size, disk_label) in enumerate(files, 1): + labels, category, is_build = classifier.classify_path(path, int(size)) + if is_build: + build_artifacts += 1 + if category not in categories: + categories[category] = {'count': 0, 'size': 0} + categories[category]['count'] += 1 + categories[category]['size'] += int(size) + if update_db: + labels_str = ','.join(labels) + batch.append((category, labels_str, path)) + if len(batch) >= 1000: + cursor.executemany('UPDATE files SET category = %s WHERE path = %s', [(cat, p) for cat, lbl, p in batch]) + conn.commit() + batch.clear() + if idx % 1000 == 0: + print(f'\rClassified: {idx:,}/{total:,}', end='', flush=True) + if update_db and batch: + cursor.executemany('UPDATE files SET category = %s WHERE path = %s', [(cat, p) for cat, lbl, p in batch]) + conn.commit() + print() + print(f'\n=== CLASSIFICATION SUMMARY ===') + print(f'Total files: {total:,}') + print(f'Build artifacts: {build_artifacts:,}') + print(f'\nCategories:') + for category in sorted(categories.keys()): + info = categories[category] + print(f" {category:30}: {info['count']:8,} files, {self.format_size(info['size'])}") + finally: + cursor.close() + conn.close() + + def review_migration(self, category: Optional[str]=None, show_build: bool=False): + from classification.classifier import FileClassifier + classifier = FileClassifier() + conn = self.get_connection() + cursor = conn.cursor() + try: + query = 'SELECT path, size, category FROM files WHERE 1=1' + params = [] + if category: + query += ' AND category = %s' + params.append(category) + if not show_build: + query += " AND (metadata->>'labels' IS NULL OR metadata->>'labels' NOT LIKE '%build-artifact%')" + query += ' ORDER BY category, size DESC LIMIT 100' + cursor.execute(query, params) + files = cursor.fetchall() + if not files: + print('No files found matching criteria') + return + print(f'\n=== MIGRATION PREVIEW ===') + print(f'Showing {len(files)} files\n') + current_category = None + for path, size, cat in files: + if cat != current_category: + current_category = cat + print(f'\n{cat}:') + labels, suggested_cat, is_build = classifier.classify_path(path, int(size)) + target = classifier.suggest_target_path(path, suggested_cat, labels) + print(f' {path}') + print(f' → {target} ({self.format_size(int(size))})') + finally: + cursor.close() + conn.close() + + @staticmethod + def format_size(size: int) -> str: + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if size < 1024: + return f'{size:.1f}{unit}' + size /= 1024 + return f'{size:.1f}PB' + +def main(): + parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot') + subparsers = parser.add_subparsers(dest='command', required=True) + index_parser = subparsers.add_parser('index', help='Index files on a disk') + index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)') + index_parser.add_argument('disk_name', help='Logical name for the 
disk') + plan_parser = subparsers.add_parser('plan', help='Create migration plan') + plan_parser.add_argument('target_disk', help='Disk to free up') + plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks') + exec_parser = subparsers.add_parser('execute', help='Execute migration plan') + exec_parser.add_argument('plan_file', help='Path to plan JSON file') + exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations') + dedupe_parser = subparsers.add_parser('dedupe', help='Deduplicate files and compute checksums') + dedupe_parser.add_argument('--disk', help='Optional: Only dedupe specific disk') + dedupe_parser.add_argument('--no-chunks', action='store_true', help='Disable chunk-level deduplication') + merge_parser = subparsers.add_parser('merge', help='Plan multi-disk merge with deduplication') + merge_parser.add_argument('--sources', nargs='+', required=True, help='Source disks to merge') + merge_parser.add_argument('--target', required=True, help='Target disk') + merge_parser.add_argument('--output', default='merge_plan.json', help='Output plan file') + merge_parser.add_argument('--filter-system', action='store_true', help='Filter system/build files') + merge_parser.add_argument('--network', help='Network target (e.g., user@host:/path)') + profile_parser = subparsers.add_parser('profile', help='Create content profiles (inventory + triage)') + profile_parser.add_argument('--disk', help='Profile specific disk') + profile_parser.add_argument('--update', action='store_true', help='Update database with profiles') + profile_parser.add_argument('--limit', type=int, help='Limit number of files') + extract_parser = subparsers.add_parser('extract', help='Extract content from files') + extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)') + extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch') + classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization') + classify_parser.add_argument('--disk', help='Classify specific disk') + classify_parser.add_argument('--update', action='store_true', help='Update database with classifications') + review_parser = subparsers.add_parser('review', help='Review proposed migration structure') + review_parser.add_argument('--category', help='Review specific category') + review_parser.add_argument('--show-build', action='store_true', help='Include build artifacts') + report_parser = subparsers.add_parser('report', help='Show current status') + report_parser.add_argument('--format', choices=['text', 'json'], default='text', help='Report format') + report_parser.add_argument('--show-duplicates', action='store_true', help='Show duplicate files') + report_parser.add_argument('--preview-merge', help='Preview merge plan from file') + args = parser.parse_args() + tool = DiskReorganizer() + if args.command == 'index': + tool.index_disk(args.disk_root, args.disk_name) + elif args.command == 'dedupe': + tool.run_deduplication(disk=args.disk, use_chunks=not args.no_chunks) + elif args.command == 'merge': + tool.plan_merge(sources=args.sources, target=args.target, output_file=args.output, filter_system=args.filter_system, network_target=args.network) + elif args.command == 'plan': + plan = tool.plan_migration(args.target_disk, args.dest_disks) + if plan: + print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}") + print(f"Destination disks: {', 
'.join(plan['destination_disks'])}") + elif args.command == 'execute': + tool.execute_migration(args.plan_file, dry_run=args.dry_run) + elif args.command == 'profile': + tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit) + elif args.command == 'extract': + tool.extract_content(kind=args.kind, limit=args.limit) + elif args.command == 'classify': + tool.classify_files(disk=args.disk, update_db=args.update) + elif args.command == 'review': + tool.review_migration(category=args.category, show_build=args.show_build) + elif args.command == 'report': + tool.generate_report(format=args.format, show_duplicates=args.show_duplicates, preview_merge=args.preview_merge) +if __name__ == '__main__': + main()
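
Note on the rewritten plan_migration above: both branches of the destination-availability check assign available = float('inf'), so dest_availability carries no real information and destinations end up chosen purely round-robin. A minimal sketch of a capacity-aware choice, assuming the same /media/mike/<label> mount layout that disk_mount_map uses elsewhere in this patch (pick_destination and the planned dict are hypothetical names, not part of the patch):

    import shutil
    from pathlib import Path
    from typing import Dict, List

    def pick_destination(destination_disks: List[str], planned: Dict[str, int], file_size: int) -> str:
        """Pick the destination disk with the most free space left after already-planned copies."""
        best_disk, best_free = None, -1
        for disk in destination_disks:
            mount = Path('/media/mike') / disk        # assumption: mounts follow disk_mount_map's layout
            try:
                free = shutil.disk_usage(mount).free  # stdlib; returns (total, used, free) in bytes
            except OSError:
                continue                              # skip disks that are not mounted
            remaining = free - planned.get(disk, 0)
            if remaining > best_free:
                best_disk, best_free = disk, remaining
        if best_disk is None or best_free < file_size:
            raise RuntimeError('no destination disk has enough free space')
        planned[best_disk] = planned.get(best_disk, 0) + file_size
        return best_disk

Inside the planning loop this would replace the round-robin pick: initialise planned = {} once, then call dest_disk = pick_destination(destination_disks, planned, int(size)) per file.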
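
Similarly, verify_operation in the new main.py compares only file sizes after a copy, even though the patch already streams SHA-256 in hash_file_local. A sketch of a content-level check using the same hashing pattern (checksums_match is a hypothetical helper, not part of the patch):

    import hashlib
    from pathlib import Path

    def checksums_match(source: Path, dest: Path, chunk_size: int = 65536) -> bool:
        """Compare size first, then streamed SHA-256 digests (same pattern as hash_file_local)."""
        def digest(p: Path) -> str:
            h = hashlib.sha256()
            with open(p, 'rb') as f:
                while data := f.read(chunk_size):
                    h.update(data)
            return h.hexdigest()
        if source.stat().st_size != dest.stat().st_size:
            return False
        return digest(source) == digest(dest)

This trades an extra read of both files for catching silent corruption that a size comparison alone cannot detect.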