From 7ce8c8c73d2742e2d8617f52f3c569db72429840 Mon Sep 17 00:00:00 2001
From: mike
Date: Sat, 13 Dec 2025 12:00:34 +0100
Subject: [PATCH] clean up code

---
 app/deduplication/__init__.py   |  21 +-
 app/deduplication/_protocols.py |   1 +
 app/deduplication/chunker.py    | 162 ++-------------
 app/deduplication/engine.py     | 272 ++++----------------------
 app/deduplication/store.py      | 290 +++------------------------
 app/migration/__init__.py       |  28 +--
 app/migration/_protocols.py     |  85 +-------
 app/migration/copy.py           | 175 ++---------------
 app/migration/engine.py         | 337 ++++----------------------------
 app/migration/hardlink.py       | 258 ++----------------------
 10 files changed, 158 insertions(+), 1471 deletions(-)

diff --git a/app/deduplication/__init__.py b/app/deduplication/__init__.py
index 9d2e261..264efca 100644
--- a/app/deduplication/__init__.py
+++ b/app/deduplication/__init__.py
@@ -1,21 +1,4 @@
-"""Deduplication package exports"""
-from .chunker import (
-    RabinChunker,
-    SimpleChunker,
-    hash_chunk,
-    hash_file,
-    compute_file_signature
-)
+from .chunker import RabinChunker, SimpleChunker, hash_chunk, hash_file, compute_file_signature
 from .store import HashStore, MemoryHashStore
 from .engine import DeduplicationEngine
-
-__all__ = [
-    'RabinChunker',
-    'SimpleChunker',
-    'hash_chunk',
-    'hash_file',
-    'compute_file_signature',
-    'HashStore',
-    'MemoryHashStore',
-    'DeduplicationEngine',
-]
+__all__ = ['RabinChunker', 'SimpleChunker', 'hash_chunk', 'hash_file', 'compute_file_signature', 'HashStore', 'MemoryHashStore', 'DeduplicationEngine']
diff --git a/app/deduplication/_protocols.py b/app/deduplication/_protocols.py
index e69de29..8b13789 100644
--- a/app/deduplication/_protocols.py
+++ b/app/deduplication/_protocols.py
@@ -0,0 +1 @@
+
diff --git a/app/deduplication/chunker.py b/app/deduplication/chunker.py
index 63b5554..e00d966 100644
--- a/app/deduplication/chunker.py
+++ b/app/deduplication/chunker.py
@@ -1,75 +1,29 @@
-"""Rabin fingerprint chunker for content-defined chunking"""
 import hashlib
 from pathlib import Path
 from typing import Iterator, Optional
-
 class RabinChunker:
-    """Content-defined chunking using Rabin fingerprinting
-    Uses a rolling hash to identify chunk boundaries based on content,
-    allowing for efficient deduplication even when data is modified.
- """ - - def __init__( - self, - avg_chunk_size: int = 8192, - min_chunk_size: Optional[int] = None, - max_chunk_size: Optional[int] = None, - window_size: int = 48 - ): - """Initialize Rabin chunker - - Args: - avg_chunk_size: Target average chunk size in bytes - min_chunk_size: Minimum chunk size (default: avg_chunk_size // 4) - max_chunk_size: Maximum chunk size (default: avg_chunk_size * 8) - window_size: Rolling hash window size - """ + def __init__(self, avg_chunk_size: int=8192, min_chunk_size: Optional[int]=None, max_chunk_size: Optional[int]=None, window_size: int=48): self.avg_chunk_size = avg_chunk_size - self.min_chunk_size = min_chunk_size or (avg_chunk_size // 4) - self.max_chunk_size = max_chunk_size or (avg_chunk_size * 8) + self.min_chunk_size = min_chunk_size or avg_chunk_size // 4 + self.max_chunk_size = max_chunk_size or avg_chunk_size * 8 self.window_size = window_size - - # Calculate mask for boundary detection - # For avg_chunk_size, we want boundaries at 1/avg_chunk_size probability bits = 0 size = avg_chunk_size while size > 1: bits += 1 size >>= 1 self.mask = (1 << bits) - 1 + self.poly = 17349423945073011 - # Polynomial for rolling hash (prime number) - self.poly = 0x3DA3358B4DC173 - - def chunk_file(self, file_path: Path, chunk_size: Optional[int] = None) -> Iterator[bytes]: - """Chunk a file using Rabin fingerprinting - - Args: - file_path: Path to file to chunk - chunk_size: If provided, use fixed-size chunking instead - - Yields: - Chunk data as bytes - """ + def chunk_file(self, file_path: Path, chunk_size: Optional[int]=None) -> Iterator[bytes]: if chunk_size: - # Use fixed-size chunking yield from self._chunk_fixed(file_path, chunk_size) else: - # Use content-defined chunking yield from self._chunk_rabin(file_path) def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]: - """Fixed-size chunking - - Args: - file_path: Path to file - chunk_size: Chunk size in bytes - - Yields: - Fixed-size chunks - """ with open(file_path, 'rb') as f: while True: chunk = f.read(chunk_size) @@ -78,46 +32,22 @@ class RabinChunker: yield chunk def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]: - """Content-defined chunking using Rabin fingerprinting - - Args: - file_path: Path to file - - Yields: - Variable-size chunks based on content - """ with open(file_path, 'rb') as f: chunk_data = bytearray() window = bytearray() hash_value = 0 - while True: byte = f.read(1) if not byte: - # End of file - yield remaining data if chunk_data: yield bytes(chunk_data) break - chunk_data.extend(byte) window.extend(byte) - - # Maintain window size if len(window) > self.window_size: window.pop(0) - - # Update rolling hash hash_value = self._rolling_hash(window) - - # Check if we should create a boundary - should_break = ( - len(chunk_data) >= self.min_chunk_size and - ( - (hash_value & self.mask) == 0 or - len(chunk_data) >= self.max_chunk_size - ) - ) - + should_break = len(chunk_data) >= self.min_chunk_size and (hash_value & self.mask == 0 or len(chunk_data) >= self.max_chunk_size) if should_break: yield bytes(chunk_data) chunk_data = bytearray() @@ -125,40 +55,17 @@ class RabinChunker: hash_value = 0 def _rolling_hash(self, window: bytearray) -> int: - """Calculate rolling hash for window - - Args: - window: Byte window - - Returns: - Hash value - """ hash_value = 0 for byte in window: - hash_value = ((hash_value << 1) + byte) & 0xFFFFFFFFFFFFFFFF + hash_value = (hash_value << 1) + byte & 18446744073709551615 return hash_value - class SimpleChunker: - """Simple 
fixed-size chunker for comparison""" - def __init__(self, chunk_size: int = 8192): - """Initialize simple chunker - - Args: - chunk_size: Fixed chunk size in bytes - """ + def __init__(self, chunk_size: int=8192): self.chunk_size = chunk_size def chunk_file(self, file_path: Path) -> Iterator[bytes]: - """Chunk file into fixed-size pieces - - Args: - file_path: Path to file - - Yields: - Fixed-size chunks - """ with open(file_path, 'rb') as f: while True: chunk = f.read(self.chunk_size) @@ -166,76 +73,31 @@ class SimpleChunker: break yield chunk - -def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str: - """Hash a chunk of data - - Args: - chunk: Chunk data - algorithm: Hash algorithm (default: sha256) - - Returns: - Hex digest of hash - """ +def hash_chunk(chunk: bytes, algorithm: str='sha256') -> str: hasher = hashlib.new(algorithm) hasher.update(chunk) return hasher.hexdigest() - -def hash_file(file_path: Path, algorithm: str = 'sha256', chunk_size: int = 65536) -> str: - """Hash entire file - - Args: - file_path: Path to file - algorithm: Hash algorithm (default: sha256) - chunk_size: Size of chunks to read - - Returns: - Hex digest of file hash - """ +def hash_file(file_path: Path, algorithm: str='sha256', chunk_size: int=65536) -> str: hasher = hashlib.new(algorithm) - with open(file_path, 'rb') as f: while True: chunk = f.read(chunk_size) if not chunk: break hasher.update(chunk) - return hasher.hexdigest() - -def compute_file_signature( - file_path: Path, - use_rabin: bool = True, - avg_chunk_size: int = 8192 -) -> tuple[str, list[str]]: - """Compute file signature with chunk hashes - - Args: - file_path: Path to file - use_rabin: Whether to use Rabin chunking (vs fixed-size) - avg_chunk_size: Average chunk size for Rabin or fixed size - - Returns: - Tuple of (file_hash, list of chunk hashes) - """ +def compute_file_signature(file_path: Path, use_rabin: bool=True, avg_chunk_size: int=8192) -> tuple[str, list[str]]: if use_rabin: chunker = RabinChunker(avg_chunk_size=avg_chunk_size) else: chunker = SimpleChunker(chunk_size=avg_chunk_size) - chunk_hashes = [] file_hasher = hashlib.sha256() - for chunk in chunker.chunk_file(file_path): - # Hash individual chunk chunk_hash = hash_chunk(chunk) chunk_hashes.append(chunk_hash) - - # Update file hash file_hasher.update(chunk) - file_hash = file_hasher.hexdigest() - - return file_hash, chunk_hashes + return (file_hash, chunk_hashes) diff --git a/app/deduplication/engine.py b/app/deduplication/engine.py index c80d51c..2771cad 100644 --- a/app/deduplication/engine.py +++ b/app/deduplication/engine.py @@ -1,32 +1,16 @@ -"""Deduplication engine""" from pathlib import Path from typing import Optional, Callable from concurrent.futures import ThreadPoolExecutor, as_completed import psycopg2 - from .chunker import compute_file_signature, hash_file from .store import HashStore from ..shared.models import FileRecord, ProcessingStats from ..shared.config import DatabaseConfig, ProcessingConfig from ..shared.logger import ProgressLogger - class DeduplicationEngine: - """Engine for deduplicating files""" - def __init__( - self, - db_config: DatabaseConfig, - processing_config: ProcessingConfig, - logger: ProgressLogger - ): - """Initialize deduplication engine - - Args: - db_config: Database configuration - processing_config: Processing configuration - logger: Progress logger - """ + def __init__(self, db_config: DatabaseConfig, processing_config: ProcessingConfig, logger: ProgressLogger): self.db_config = db_config self.processing_config = 
processing_config self.logger = logger @@ -34,320 +18,130 @@ class DeduplicationEngine: self._connection = None def _get_connection(self): - """Get or create database connection""" if self._connection is None or self._connection.closed: - self._connection = psycopg2.connect( - host=self.db_config.host, - port=self.db_config.port, - database=self.db_config.database, - user=self.db_config.user, - password=self.db_config.password - ) + self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password) return self._connection - def deduplicate_all( - self, - disk: Optional[str] = None, - use_chunks: bool = True, - progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None - ) -> ProcessingStats: - """Deduplicate all files in database - - Args: - disk: Optional disk filter - use_chunks: Whether to use chunk-level deduplication - progress_callback: Optional callback for progress updates - - Returns: - ProcessingStats with deduplication statistics - """ - self.logger.section("Starting Deduplication") - + def deduplicate_all(self, disk: Optional[str]=None, use_chunks: bool=True, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats: + self.logger.section('Starting Deduplication') conn = self._get_connection() cursor = conn.cursor() - - # Get files without checksums if disk: - cursor.execute(""" - SELECT path, size - FROM files - WHERE disk_label = %s AND checksum IS NULL - ORDER BY size DESC - """, (disk,)) + cursor.execute('\n SELECT path, size\n FROM files\n WHERE disk_label = %s AND checksum IS NULL\n ORDER BY size DESC\n ', (disk,)) else: - cursor.execute(""" - SELECT path, size - FROM files - WHERE checksum IS NULL - ORDER BY size DESC - """) - + cursor.execute('\n SELECT path, size\n FROM files\n WHERE checksum IS NULL\n ORDER BY size DESC\n ') files_to_process = cursor.fetchall() total_files = len(files_to_process) - - self.logger.info(f"Found {total_files} files to process") - + self.logger.info(f'Found {total_files} files to process') stats = ProcessingStats() - - # Process files with thread pool with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor: futures = {} - for path_str, size in files_to_process: path = Path(path_str) future = executor.submit(self._process_file, path, use_chunks) futures[future] = (path, size) - - # Process completed futures for future in as_completed(futures): path, size = futures[future] - try: checksum, duplicate_of = future.result() - if checksum: - # Update database - cursor.execute(""" - UPDATE files - SET checksum = %s, duplicate_of = %s - WHERE path = %s - """, (checksum, duplicate_of, str(path))) - + cursor.execute('\n UPDATE files\n SET checksum = %s, duplicate_of = %s\n WHERE path = %s\n ', (checksum, duplicate_of, str(path))) stats.files_succeeded += 1 stats.bytes_processed += size - stats.files_processed += 1 - - # Commit periodically if stats.files_processed % self.processing_config.commit_interval == 0: conn.commit() - - # Progress callback if progress_callback: progress_callback(stats.files_processed, total_files, stats) - - # Log progress - self.logger.progress( - stats.files_processed, - total_files, - prefix="Files processed", - bytes_processed=stats.bytes_processed, - elapsed_seconds=stats.elapsed_seconds - ) - + self.logger.progress(stats.files_processed, total_files, prefix='Files processed', bytes_processed=stats.bytes_processed, 
elapsed_seconds=stats.elapsed_seconds) except Exception as e: - self.logger.warning(f"Failed to process {path}: {e}") + self.logger.warning(f'Failed to process {path}: {e}') stats.files_failed += 1 stats.files_processed += 1 - - # Final commit conn.commit() cursor.close() - - self.logger.info( - f"Deduplication complete: {stats.files_succeeded}/{total_files} files, " - f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s" - ) - + self.logger.info(f'Deduplication complete: {stats.files_succeeded}/{total_files} files, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s') return stats - def _process_file( - self, - path: Path, - use_chunks: bool - ) -> tuple[Optional[str], Optional[str]]: - """Process a single file for deduplication - - Args: - path: Path to file - use_chunks: Whether to use chunk-level deduplication - - Returns: - Tuple of (checksum, duplicate_of_path) - """ + def _process_file(self, path: Path, use_chunks: bool) -> tuple[Optional[str], Optional[str]]: if not path.exists(): - return None, None - + return (None, None) try: if use_chunks: - # Compute file signature with chunks - checksum, chunk_hashes = compute_file_signature( - path, - use_rabin=True, - avg_chunk_size=self.processing_config.chunk_size - ) + checksum, chunk_hashes = compute_file_signature(path, use_rabin=True, avg_chunk_size=self.processing_config.chunk_size) else: - # Just compute file hash - checksum = hash_file( - path, - algorithm=self.processing_config.hash_algorithm - ) + checksum = hash_file(path, algorithm=self.processing_config.hash_algorithm) chunk_hashes = None - - # Check if hash exists if self.hash_store.exists(checksum): - # Duplicate found canonical_path = self.hash_store.get_canonical(checksum) - return checksum, canonical_path + return (checksum, canonical_path) else: - # New unique file size = path.stat().st_size - self.hash_store.store_canonical( - checksum, - path, - size, - chunk_hashes - ) - return checksum, None - + self.hash_store.store_canonical(checksum, path, size, chunk_hashes) + return (checksum, None) except Exception as e: - self.logger.debug(f"Error processing {path}: {e}") + self.logger.debug(f'Error processing {path}: {e}') raise - def find_duplicates( - self, - disk: Optional[str] = None - ) -> dict[str, list[str]]: - """Find all duplicate files - - Args: - disk: Optional disk filter - - Returns: - Dictionary mapping canonical path to list of duplicate paths - """ - self.logger.subsection("Finding Duplicates") - + def find_duplicates(self, disk: Optional[str]=None) -> dict[str, list[str]]: + self.logger.subsection('Finding Duplicates') conn = self._get_connection() cursor = conn.cursor() - - # Query for duplicates if disk: - cursor.execute(""" - SELECT checksum, array_agg(path ORDER BY path) as paths - FROM files - WHERE disk_label = %s AND checksum IS NOT NULL - GROUP BY checksum - HAVING COUNT(*) > 1 - """, (disk,)) + cursor.execute('\n SELECT checksum, array_agg(path ORDER BY path) as paths\n FROM files\n WHERE disk_label = %s AND checksum IS NOT NULL\n GROUP BY checksum\n HAVING COUNT(*) > 1\n ', (disk,)) else: - cursor.execute(""" - SELECT checksum, array_agg(path ORDER BY path) as paths - FROM files - WHERE checksum IS NOT NULL - GROUP BY checksum - HAVING COUNT(*) > 1 - """) - + cursor.execute('\n SELECT checksum, array_agg(path ORDER BY path) as paths\n FROM files\n WHERE checksum IS NOT NULL\n GROUP BY checksum\n HAVING COUNT(*) > 1\n ') duplicates = {} for checksum, paths in cursor.fetchall(): canonical = paths[0] duplicates[canonical] = 
paths[1:] - cursor.close() - - self.logger.info(f"Found {len(duplicates)} sets of duplicates") - + self.logger.info(f'Found {len(duplicates)} sets of duplicates') return duplicates def get_deduplication_stats(self) -> dict: - """Get deduplication statistics - - Returns: - Dictionary with statistics - """ conn = self._get_connection() cursor = conn.cursor() - stats = {} - - # Total files - cursor.execute("SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL") + cursor.execute('SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL') stats['total_files'] = cursor.fetchone()[0] - - # Unique files - cursor.execute("SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL") + cursor.execute('SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL') stats['unique_files'] = cursor.fetchone()[0] - - # Duplicate files stats['duplicate_files'] = stats['total_files'] - stats['unique_files'] - - # Total size - cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL") + cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL') stats['total_size'] = cursor.fetchone()[0] - - # Unique size - cursor.execute(""" - SELECT COALESCE(SUM(size), 0) - FROM ( - SELECT DISTINCT ON (checksum) size - FROM files - WHERE checksum IS NOT NULL - ) AS unique_files - """) + cursor.execute('\n SELECT COALESCE(SUM(size), 0)\n FROM (\n SELECT DISTINCT ON (checksum) size\n FROM files\n WHERE checksum IS NOT NULL\n ) AS unique_files\n ') stats['unique_size'] = cursor.fetchone()[0] - - # Wasted space stats['wasted_space'] = stats['total_size'] - stats['unique_size'] - - # Deduplication ratio if stats['total_size'] > 0: stats['dedup_ratio'] = stats['unique_size'] / stats['total_size'] else: stats['dedup_ratio'] = 1.0 - - # Space saved percentage if stats['total_size'] > 0: - stats['space_saved_percent'] = (stats['wasted_space'] / stats['total_size']) * 100 + stats['space_saved_percent'] = stats['wasted_space'] / stats['total_size'] * 100 else: stats['space_saved_percent'] = 0.0 - cursor.close() - return stats def mark_canonical_files(self) -> int: - """Mark canonical (first occurrence) files in database - - Returns: - Number of canonical files marked - """ - self.logger.subsection("Marking Canonical Files") - + self.logger.subsection('Marking Canonical Files') conn = self._get_connection() cursor = conn.cursor() - - # Find first occurrence of each checksum and mark as canonical - cursor.execute(""" - WITH canonical AS ( - SELECT DISTINCT ON (checksum) path, checksum - FROM files - WHERE checksum IS NOT NULL - ORDER BY checksum, path - ) - UPDATE files - SET duplicate_of = NULL - WHERE path IN (SELECT path FROM canonical) - """) - + cursor.execute('\n WITH canonical AS (\n SELECT DISTINCT ON (checksum) path, checksum\n FROM files\n WHERE checksum IS NOT NULL\n ORDER BY checksum, path\n )\n UPDATE files\n SET duplicate_of = NULL\n WHERE path IN (SELECT path FROM canonical)\n ') count = cursor.rowcount conn.commit() cursor.close() - - self.logger.info(f"Marked {count} canonical files") - + self.logger.info(f'Marked {count} canonical files') return count def close(self): - """Close connections""" self.hash_store.close() - if self._connection and not self._connection.closed: + if self._connection and (not self._connection.closed): self._connection.close() def __enter__(self): - """Context manager entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" self.close() diff --git a/app/deduplication/store.py 
b/app/deduplication/store.py index 117a580..82532b8 100644 --- a/app/deduplication/store.py +++ b/app/deduplication/store.py @@ -1,412 +1,174 @@ -"""Hash store for deduplication with optional Redis support""" from typing import Optional, Dict, Set from pathlib import Path import psycopg2 from psycopg2.extras import execute_batch - from ..shared.config import DatabaseConfig - class HashStore: - """PostgreSQL-based hash store for deduplication""" def __init__(self, db_config: DatabaseConfig): - """Initialize hash store - - Args: - db_config: Database configuration - """ self.db_config = db_config self._connection = None def _get_connection(self): - """Get or create database connection""" if self._connection is None or self._connection.closed: - self._connection = psycopg2.connect( - host=self.db_config.host, - port=self.db_config.port, - database=self.db_config.database, - user=self.db_config.user, - password=self.db_config.password - ) + self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password) return self._connection def _ensure_tables(self): - """Ensure hash store tables exist""" conn = self._get_connection() cursor = conn.cursor() - - # Create hashes table for file-level deduplication - cursor.execute(""" - CREATE TABLE IF NOT EXISTS file_hashes ( - checksum TEXT PRIMARY KEY, - canonical_path TEXT NOT NULL, - size BIGINT NOT NULL, - first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - ref_count INTEGER DEFAULT 1 - ) - """) - - # Create chunk hashes table for chunk-level deduplication - cursor.execute(""" - CREATE TABLE IF NOT EXISTS chunk_hashes ( - chunk_hash TEXT PRIMARY KEY, - size INTEGER NOT NULL, - first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - ref_count INTEGER DEFAULT 1 - ) - """) - - # Create file-chunk mapping table - cursor.execute(""" - CREATE TABLE IF NOT EXISTS file_chunks ( - id SERIAL PRIMARY KEY, - file_checksum TEXT NOT NULL, - chunk_hash TEXT NOT NULL, - chunk_index INTEGER NOT NULL, - FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum), - FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash), - UNIQUE (file_checksum, chunk_index) - ) - """) - - # Create indexes - cursor.execute(""" - CREATE INDEX IF NOT EXISTS idx_file_chunks_file - ON file_chunks(file_checksum) - """) - - cursor.execute(""" - CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk - ON file_chunks(chunk_hash) - """) - + cursor.execute('\n CREATE TABLE IF NOT EXISTS file_hashes (\n checksum TEXT PRIMARY KEY,\n canonical_path TEXT NOT NULL,\n size BIGINT NOT NULL,\n first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n ref_count INTEGER DEFAULT 1\n )\n ') + cursor.execute('\n CREATE TABLE IF NOT EXISTS chunk_hashes (\n chunk_hash TEXT PRIMARY KEY,\n size INTEGER NOT NULL,\n first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n ref_count INTEGER DEFAULT 1\n )\n ') + cursor.execute('\n CREATE TABLE IF NOT EXISTS file_chunks (\n id SERIAL PRIMARY KEY,\n file_checksum TEXT NOT NULL,\n chunk_hash TEXT NOT NULL,\n chunk_index INTEGER NOT NULL,\n FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),\n FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),\n UNIQUE (file_checksum, chunk_index)\n )\n ') + cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_file_chunks_file\n ON file_chunks(file_checksum)\n ') + cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk\n ON file_chunks(chunk_hash)\n ') conn.commit() cursor.close() def exists(self, checksum: str) -> 
bool: - """Check if hash exists in store - - Args: - checksum: File hash to check - - Returns: - True if hash exists - """ self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - - cursor.execute( - "SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1", - (checksum,) - ) - + cursor.execute('SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1', (checksum,)) exists = cursor.fetchone() is not None cursor.close() - return exists def get_canonical(self, checksum: str) -> Optional[str]: - """Get canonical path for a hash - - Args: - checksum: File hash - - Returns: - Canonical file path or None if not found - """ self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - - cursor.execute( - "SELECT canonical_path FROM file_hashes WHERE checksum = %s", - (checksum,) - ) - + cursor.execute('SELECT canonical_path FROM file_hashes WHERE checksum = %s', (checksum,)) result = cursor.fetchone() cursor.close() - return result[0] if result else None - def store_canonical( - self, - checksum: str, - path: Path, - size: int, - chunk_hashes: Optional[list[str]] = None - ) -> None: - """Store canonical reference for a hash - - Args: - checksum: File hash - path: Canonical file path - size: File size in bytes - chunk_hashes: Optional list of chunk hashes - """ + def store_canonical(self, checksum: str, path: Path, size: int, chunk_hashes: Optional[list[str]]=None) -> None: self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - try: - # Store file hash - cursor.execute(""" - INSERT INTO file_hashes (checksum, canonical_path, size) - VALUES (%s, %s, %s) - ON CONFLICT (checksum) DO UPDATE SET - ref_count = file_hashes.ref_count + 1 - """, (checksum, str(path), size)) - - # Store chunk hashes if provided + cursor.execute('\n INSERT INTO file_hashes (checksum, canonical_path, size)\n VALUES (%s, %s, %s)\n ON CONFLICT (checksum) DO UPDATE SET\n ref_count = file_hashes.ref_count + 1\n ', (checksum, str(path), size)) if chunk_hashes: - # Insert chunk hashes chunk_data = [(chunk_hash, 0) for chunk_hash in chunk_hashes] - execute_batch(cursor, """ - INSERT INTO chunk_hashes (chunk_hash, size) - VALUES (%s, %s) - ON CONFLICT (chunk_hash) DO UPDATE SET - ref_count = chunk_hashes.ref_count + 1 - """, chunk_data, page_size=1000) - - # Create file-chunk mappings - mapping_data = [ - (checksum, chunk_hash, idx) - for idx, chunk_hash in enumerate(chunk_hashes) - ] - execute_batch(cursor, """ - INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index) - VALUES (%s, %s, %s) - ON CONFLICT (file_checksum, chunk_index) DO NOTHING - """, mapping_data, page_size=1000) - + execute_batch(cursor, '\n INSERT INTO chunk_hashes (chunk_hash, size)\n VALUES (%s, %s)\n ON CONFLICT (chunk_hash) DO UPDATE SET\n ref_count = chunk_hashes.ref_count + 1\n ', chunk_data, page_size=1000) + mapping_data = [(checksum, chunk_hash, idx) for idx, chunk_hash in enumerate(chunk_hashes)] + execute_batch(cursor, '\n INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)\n VALUES (%s, %s, %s)\n ON CONFLICT (file_checksum, chunk_index) DO NOTHING\n ', mapping_data, page_size=1000) conn.commit() - except Exception as e: conn.rollback() raise - finally: cursor.close() def get_chunk_hashes(self, checksum: str) -> list[str]: - """Get chunk hashes for a file - - Args: - checksum: File hash - - Returns: - List of chunk hashes in order - """ self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - - cursor.execute(""" - SELECT chunk_hash - FROM file_chunks - WHERE 
file_checksum = %s - ORDER BY chunk_index - """, (checksum,)) - + cursor.execute('\n SELECT chunk_hash\n FROM file_chunks\n WHERE file_checksum = %s\n ORDER BY chunk_index\n ', (checksum,)) chunk_hashes = [row[0] for row in cursor.fetchall()] cursor.close() - return chunk_hashes def get_duplicates(self) -> Dict[str, list[str]]: - """Get all duplicate file groups - - Returns: - Dictionary mapping canonical path to list of duplicate paths - """ self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - - # Get all files with their hashes - cursor.execute(""" - SELECT f.path, f.checksum - FROM files f - WHERE f.checksum IS NOT NULL - """) - - # Group by checksum + cursor.execute('\n SELECT f.path, f.checksum\n FROM files f\n WHERE f.checksum IS NOT NULL\n ') hash_to_paths: Dict[str, list[str]] = {} for path, checksum in cursor.fetchall(): if checksum not in hash_to_paths: hash_to_paths[checksum] = [] hash_to_paths[checksum].append(path) - cursor.close() - - # Filter to only duplicates (more than one file) - duplicates = { - paths[0]: paths[1:] - for checksum, paths in hash_to_paths.items() - if len(paths) > 1 - } - + duplicates = {paths[0]: paths[1:] for checksum, paths in hash_to_paths.items() if len(paths) > 1} return duplicates def get_stats(self) -> Dict[str, int]: - """Get hash store statistics - - Returns: - Dictionary with statistics - """ self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - stats = {} - - # Count unique file hashes - cursor.execute("SELECT COUNT(*) FROM file_hashes") + cursor.execute('SELECT COUNT(*) FROM file_hashes') stats['unique_files'] = cursor.fetchone()[0] - - # Count unique chunk hashes - cursor.execute("SELECT COUNT(*) FROM chunk_hashes") + cursor.execute('SELECT COUNT(*) FROM chunk_hashes') stats['unique_chunks'] = cursor.fetchone()[0] - - # Count total references - cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes") + cursor.execute('SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes') stats['total_file_refs'] = cursor.fetchone()[0] - - # Count total chunk references - cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes") + cursor.execute('SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes') stats['total_chunk_refs'] = cursor.fetchone()[0] - - # Calculate deduplication ratio if stats['total_file_refs'] > 0: stats['dedup_ratio'] = stats['unique_files'] / stats['total_file_refs'] else: stats['dedup_ratio'] = 1.0 - cursor.close() - return stats - def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]: - """Find files similar to given hash based on chunk overlap - - Args: - checksum: File hash to compare - threshold: Similarity threshold (0.0 to 1.0) - - Returns: - List of tuples (other_checksum, similarity_score) - """ + def find_similar_files(self, checksum: str, threshold: float=0.8) -> list[tuple[str, float]]: self._ensure_tables() conn = self._get_connection() cursor = conn.cursor() - - # Get chunks for the target file target_chunks = set(self.get_chunk_hashes(checksum)) - if not target_chunks: cursor.close() return [] - - # Find files sharing chunks - cursor.execute(""" - SELECT DISTINCT fc.file_checksum - FROM file_chunks fc - WHERE fc.chunk_hash = ANY(%s) - AND fc.file_checksum != %s - """, (list(target_chunks), checksum)) - + cursor.execute('\n SELECT DISTINCT fc.file_checksum\n FROM file_chunks fc\n WHERE fc.chunk_hash = ANY(%s)\n AND fc.file_checksum != %s\n ', (list(target_chunks), checksum)) similar_files = [] - - for 
(other_checksum,) in cursor.fetchall(): + for other_checksum, in cursor.fetchall(): other_chunks = set(self.get_chunk_hashes(other_checksum)) - - # Calculate Jaccard similarity intersection = len(target_chunks & other_chunks) union = len(target_chunks | other_chunks) - if union > 0: similarity = intersection / union - if similarity >= threshold: similar_files.append((other_checksum, similarity)) - cursor.close() - - # Sort by similarity descending similar_files.sort(key=lambda x: x[1], reverse=True) - return similar_files def close(self): - """Close database connection""" - if self._connection and not self._connection.closed: + if self._connection and (not self._connection.closed): self._connection.close() def __enter__(self): - """Context manager entry""" self._ensure_tables() return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" self.close() - class MemoryHashStore: - """In-memory hash store for testing and small datasets""" def __init__(self): - """Initialize in-memory hash store""" self.hashes: Dict[str, tuple[str, int]] = {} self.chunks: Dict[str, int] = {} self.file_chunks: Dict[str, list[str]] = {} def exists(self, checksum: str) -> bool: - """Check if hash exists""" return checksum in self.hashes def get_canonical(self, checksum: str) -> Optional[str]: - """Get canonical path""" return self.hashes.get(checksum, (None, 0))[0] - def store_canonical( - self, - checksum: str, - path: Path, - size: int, - chunk_hashes: Optional[list[str]] = None - ) -> None: - """Store canonical reference""" + def store_canonical(self, checksum: str, path: Path, size: int, chunk_hashes: Optional[list[str]]=None) -> None: self.hashes[checksum] = (str(path), size) - if chunk_hashes: self.file_chunks[checksum] = chunk_hashes for chunk_hash in chunk_hashes: self.chunks[chunk_hash] = self.chunks.get(chunk_hash, 0) + 1 def get_chunk_hashes(self, checksum: str) -> list[str]: - """Get chunk hashes""" return self.file_chunks.get(checksum, []) def get_stats(self) -> Dict[str, int]: - """Get statistics""" - return { - 'unique_files': len(self.hashes), - 'unique_chunks': len(self.chunks), - 'total_file_refs': len(self.hashes), - 'total_chunk_refs': sum(self.chunks.values()), - 'dedup_ratio': 1.0 - } + return {'unique_files': len(self.hashes), 'unique_chunks': len(self.chunks), 'total_file_refs': len(self.hashes), 'total_chunk_refs': sum(self.chunks.values()), 'dedup_ratio': 1.0} def close(self): - """No-op for compatibility""" pass def __enter__(self): - """Context manager entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" pass diff --git a/app/migration/__init__.py b/app/migration/__init__.py index 601434d..b29638f 100644 --- a/app/migration/__init__.py +++ b/app/migration/__init__.py @@ -1,27 +1,5 @@ -"""Migration package exports""" -from .copy import ( - CopyMigrationStrategy, - FastCopyStrategy, - SafeCopyStrategy, - ReferenceCopyStrategy -) -from .hardlink import ( - HardlinkMigrationStrategy, - SymlinkMigrationStrategy, - DedupHardlinkStrategy -) +from .copy import CopyMigrationStrategy, FastCopyStrategy, SafeCopyStrategy, ReferenceCopyStrategy +from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy, DedupHardlinkStrategy from .engine import MigrationEngine from ._protocols import IMigrationStrategy, IMigrationEngine - -__all__ = [ - 'CopyMigrationStrategy', - 'FastCopyStrategy', - 'SafeCopyStrategy', - 'ReferenceCopyStrategy', - 'HardlinkMigrationStrategy', - 'SymlinkMigrationStrategy', - 
    'DedupHardlinkStrategy',
-    'MigrationEngine',
-    'IMigrationStrategy',
-    'IMigrationEngine',
-]
+__all__ = ['CopyMigrationStrategy', 'FastCopyStrategy', 'SafeCopyStrategy', 'ReferenceCopyStrategy', 'HardlinkMigrationStrategy', 'SymlinkMigrationStrategy', 'DedupHardlinkStrategy', 'MigrationEngine', 'IMigrationStrategy', 'IMigrationEngine']
diff --git a/app/migration/_protocols.py b/app/migration/_protocols.py
index 188b8ee..6d7b996 100644
--- a/app/migration/_protocols.py
+++ b/app/migration/_protocols.py
@@ -1,107 +1,28 @@
-"""Protocol definitions for the migration package"""
 from typing import Protocol
 from pathlib import Path
 from ..shared.models import OperationRecord
-
 class IMigrationStrategy(Protocol):
-    """Protocol for migration strategies"""
-    def migrate(
-        self,
-        source: Path,
-        destination: Path,
-        verify: bool = True
-    ) -> bool:
-        """Migrate a file from source to destination
-
-        Args:
-            source: Source file path
-            destination: Destination file path
-            verify: Whether to verify the operation
-
-        Returns:
-            True if migration successful
-        """
+    def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
         ...
     def can_migrate(self, source: Path, destination: Path) -> bool:
-        """Check if migration is possible
-
-        Args:
-            source: Source file path
-            destination: Destination file path
-
-        Returns:
-            True if migration is possible
-        """
         ...
     def estimate_time(self, source: Path) -> float:
-        """Estimate migration time in seconds
-
-        Args:
-            source: Source file path
-
-        Returns:
-            Estimated time in seconds
-        """
         ...
     def cleanup(self, source: Path) -> bool:
-        """Cleanup source file after successful migration
-
-        Args:
-            source: Source file path
-
-        Returns:
-            True if cleanup successful
-        """
         ...
-
 class IMigrationEngine(Protocol):
-    """Protocol for migration engine"""
-    def plan_migration(
-        self,
-        disk: str,
-        target_base: Path
-    ) -> list[OperationRecord]:
-        """Plan migration for a disk
-
-        Args:
-            disk: Disk identifier
-            target_base: Target base directory
-
-        Returns:
-            List of planned operations
-        """
+    def plan_migration(self, disk: str, target_base: Path) -> list[OperationRecord]:
         ...
-    def execute_migration(
-        self,
-        operations: list[OperationRecord],
-        dry_run: bool = False
-    ) -> dict:
-        """Execute migration operations
-
-        Args:
-            operations: List of operations to execute
-            dry_run: Whether to perform a dry run
-
-        Returns:
-            Dictionary with execution statistics
-        """
+    def execute_migration(self, operations: list[OperationRecord], dry_run: bool=False) -> dict:
         ...
     def rollback(self, operation: OperationRecord) -> bool:
-        """Rollback a migration operation
-
-        Args:
-            operation: Operation to rollback
-
-        Returns:
-            True if rollback successful
-        """
         ...
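The _protocols.py hunk above strips docstrings but leaves the structural contract intact: IMigrationStrategy is a typing.Protocol, so any class exposing migrate, can_migrate, estimate_time, and cleanup with these signatures satisfies it without inheriting from it. A minimal, self-contained Python sketch of that duck-typed contract follows; NoopStrategy, pick_strategy, and the example paths are hypothetical names for illustration only, not part of this changeset.

from pathlib import Path
from typing import Optional, Protocol, runtime_checkable

@runtime_checkable
class IMigrationStrategy(Protocol):
    # Mirrors the stubs kept in app/migration/_protocols.py after this patch.
    def migrate(self, source: Path, destination: Path, verify: bool = True) -> bool: ...
    def can_migrate(self, source: Path, destination: Path) -> bool: ...
    def estimate_time(self, source: Path) -> float: ...
    def cleanup(self, source: Path) -> bool: ...

class NoopStrategy:
    # Hypothetical strategy: no inheritance, satisfies the protocol purely by shape.
    def migrate(self, source: Path, destination: Path, verify: bool = True) -> bool:
        return False
    def can_migrate(self, source: Path, destination: Path) -> bool:
        return True
    def estimate_time(self, source: Path) -> float:
        return 0.0
    def cleanup(self, source: Path) -> bool:
        return True

def pick_strategy(candidates: list[IMigrationStrategy], source: Path, destination: Path) -> Optional[IMigrationStrategy]:
    # Return the first strategy that claims it can handle this source/destination pair.
    return next((s for s in candidates if s.can_migrate(source, destination)), None)

if __name__ == '__main__':
    chosen = pick_strategy([NoopStrategy()], Path('/tmp/a.bin'), Path('/tmp/b.bin'))
    print(isinstance(NoopStrategy(), IMigrationStrategy), type(chosen).__name__)

This structural typing is also why the engine in app/migration/engine.py can hold the copy, hardlink, and symlink strategies as plain attributes and dispatch on operation_type without any shared base class.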
diff --git a/app/migration/copy.py b/app/migration/copy.py index 7304c0f..3e35a8e 100644 --- a/app/migration/copy.py +++ b/app/migration/copy.py @@ -1,268 +1,129 @@ -"""Copy-based migration strategy""" import shutil from pathlib import Path from typing import Optional import os - from ..shared.logger import ProgressLogger - class CopyMigrationStrategy: - """Copy files to destination with verification""" - def __init__( - self, - logger: Optional[ProgressLogger] = None, - preserve_metadata: bool = True, - verify_checksums: bool = True - ): - """Initialize copy migration strategy - - Args: - logger: Optional progress logger - preserve_metadata: Whether to preserve file metadata - verify_checksums: Whether to verify checksums after copy - """ + def __init__(self, logger: Optional[ProgressLogger]=None, preserve_metadata: bool=True, verify_checksums: bool=True): self.logger = logger self.preserve_metadata = preserve_metadata self.verify_checksums = verify_checksums - def migrate( - self, - source: Path, - destination: Path, - verify: bool = True - ) -> bool: - """Migrate file by copying - - Args: - source: Source file path - destination: Destination file path - verify: Whether to verify the operation - - Returns: - True if migration successful - """ + def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool: if not source.exists(): if self.logger: - self.logger.error(f"Source file does not exist: {source}") + self.logger.error(f'Source file does not exist: {source}') return False - - # Create destination directory destination.parent.mkdir(parents=True, exist_ok=True) - try: - # Copy file if self.preserve_metadata: shutil.copy2(source, destination) else: shutil.copy(source, destination) - - # Verify if requested if verify and self.verify_checksums: if not self._verify_copy(source, destination): if self.logger: - self.logger.error(f"Verification failed: {source} -> {destination}") + self.logger.error(f'Verification failed: {source} -> {destination}') destination.unlink() return False - return True - except Exception as e: if self.logger: - self.logger.error(f"Copy failed: {source} -> {destination}: {e}") + self.logger.error(f'Copy failed: {source} -> {destination}: {e}') return False def _verify_copy(self, source: Path, destination: Path) -> bool: - """Verify copied file - - Args: - source: Source file path - destination: Destination file path - - Returns: - True if verification successful - """ - # Check size source_size = source.stat().st_size dest_size = destination.stat().st_size - if source_size != dest_size: return False - - # Compare checksums for files larger than 1MB if source_size > 1024 * 1024: from ..deduplication.chunker import hash_file - source_hash = hash_file(source) dest_hash = hash_file(destination) - return source_hash == dest_hash - - # For small files, compare content directly with open(source, 'rb') as f1, open(destination, 'rb') as f2: return f1.read() == f2.read() def can_migrate(self, source: Path, destination: Path) -> bool: - """Check if migration is possible - - Args: - source: Source file path - destination: Destination file path - - Returns: - True if migration is possible - """ if not source.exists(): return False - - # Check if destination directory is writable dest_dir = destination.parent if dest_dir.exists(): return os.access(dest_dir, os.W_OK) - - # Check if parent directory exists and is writable parent = dest_dir.parent while not parent.exists() and parent != parent.parent: parent = parent.parent - return parent.exists() and 
os.access(parent, os.W_OK) def estimate_time(self, source: Path) -> float: - """Estimate migration time in seconds - - Args: - source: Source file path - - Returns: - Estimated time in seconds - """ if not source.exists(): return 0.0 - size = source.stat().st_size - - # Estimate based on typical copy speed (100 MB/s) - typical_speed = 100 * 1024 * 1024 # bytes per second + typical_speed = 100 * 1024 * 1024 return size / typical_speed def cleanup(self, source: Path) -> bool: - """Cleanup source file after successful migration - - Args: - source: Source file path - - Returns: - True if cleanup successful - """ try: if source.exists(): source.unlink() return True except Exception as e: if self.logger: - self.logger.warning(f"Failed to cleanup {source}: {e}") + self.logger.warning(f'Failed to cleanup {source}: {e}') return False - class FastCopyStrategy(CopyMigrationStrategy): - """Fast copy strategy without verification""" - - def __init__(self, logger: Optional[ProgressLogger] = None): - """Initialize fast copy strategy""" - super().__init__( - logger=logger, - preserve_metadata=True, - verify_checksums=False - ) + def __init__(self, logger: Optional[ProgressLogger]=None): + super().__init__(logger=logger, preserve_metadata=True, verify_checksums=False) class SafeCopyStrategy(CopyMigrationStrategy): - """Safe copy strategy with full verification""" - - def __init__(self, logger: Optional[ProgressLogger] = None): - """Initialize safe copy strategy""" - super().__init__( - logger=logger, - preserve_metadata=True, - verify_checksums=True - ) + def __init__(self, logger: Optional[ProgressLogger]=None): + super().__init__(logger=logger, preserve_metadata=True, verify_checksums=True) class ReferenceCopyStrategy: - """Create reference copy using reflinks (CoW) if supported""" - def __init__(self, logger: Optional[ProgressLogger] = None): - """Initialize reflink copy strategy""" + def __init__(self, logger: Optional[ProgressLogger]=None): self.logger = logger - def migrate( - self, - source: Path, - destination: Path, - verify: bool = True - ) -> bool: - """Migrate using reflink (copy-on-write) - - Args: - source: Source file path - destination: Destination file path - verify: Whether to verify the operation - - Returns: - True if migration successful - """ + def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool: if not source.exists(): if self.logger: - self.logger.error(f"Source file does not exist: {source}") + self.logger.error(f'Source file does not exist: {source}') return False - - # Create destination directory destination.parent.mkdir(parents=True, exist_ok=True) - try: - # Try reflink copy (works on btrfs, xfs, etc.) 
import subprocess - - result = subprocess.run( - ['cp', '--reflink=auto', str(source), str(destination)], - capture_output=True, - check=False - ) - + result = subprocess.run(['cp', '--reflink=auto', str(source), str(destination)], capture_output=True, check=False) if result.returncode != 0: - # Fallback to regular copy shutil.copy2(source, destination) - return True - except Exception as e: if self.logger: - self.logger.error(f"Reflink copy failed: {source} -> {destination}: {e}") + self.logger.error(f'Reflink copy failed: {source} -> {destination}: {e}') return False def can_migrate(self, source: Path, destination: Path) -> bool: - """Check if migration is possible""" if not source.exists(): return False - dest_dir = destination.parent if dest_dir.exists(): return os.access(dest_dir, os.W_OK) - return True def estimate_time(self, source: Path) -> float: - """Estimate migration time (reflinks are fast)""" - return 0.1 # Reflinks are nearly instant + return 0.1 def cleanup(self, source: Path) -> bool: - """Cleanup source file""" try: if source.exists(): source.unlink() return True except Exception as e: if self.logger: - self.logger.warning(f"Failed to cleanup {source}: {e}") + self.logger.warning(f'Failed to cleanup {source}: {e}') return False diff --git a/app/migration/engine.py b/app/migration/engine.py index bf742a5..17ddcb0 100644 --- a/app/migration/engine.py +++ b/app/migration/engine.py @@ -1,254 +1,100 @@ -"""Migration engine""" from pathlib import Path from typing import Optional, Callable from datetime import datetime import psycopg2 from psycopg2.extras import execute_batch - from .copy import CopyMigrationStrategy, SafeCopyStrategy from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy from ..shared.models import OperationRecord, ProcessingStats, MigrationPlan from ..shared.config import DatabaseConfig, ProcessingConfig from ..shared.logger import ProgressLogger - class MigrationEngine: - """Engine for migrating files""" - def __init__( - self, - db_config: DatabaseConfig, - processing_config: ProcessingConfig, - logger: ProgressLogger, - target_base: Path - ): - """Initialize migration engine - - Args: - db_config: Database configuration - processing_config: Processing configuration - logger: Progress logger - target_base: Target base directory for migrations - """ + def __init__(self, db_config: DatabaseConfig, processing_config: ProcessingConfig, logger: ProgressLogger, target_base: Path): self.db_config = db_config self.processing_config = processing_config self.logger = logger self.target_base = Path(target_base) self._connection = None - - # Initialize strategies self.copy_strategy = SafeCopyStrategy(logger=logger) self.hardlink_strategy = HardlinkMigrationStrategy(logger=logger) self.symlink_strategy = SymlinkMigrationStrategy(logger=logger) def _get_connection(self): - """Get or create database connection""" if self._connection is None or self._connection.closed: - self._connection = psycopg2.connect( - host=self.db_config.host, - port=self.db_config.port, - database=self.db_config.database, - user=self.db_config.user, - password=self.db_config.password - ) + self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password) return self._connection def _ensure_tables(self): - """Ensure migration tables exist""" conn = self._get_connection() cursor = conn.cursor() - - # Create operations table - cursor.execute(""" - CREATE TABLE IF NOT 
EXISTS operations ( - id SERIAL PRIMARY KEY, - source_path TEXT NOT NULL, - target_path TEXT NOT NULL, - operation_type TEXT NOT NULL, - size BIGINT DEFAULT 0, - status TEXT DEFAULT 'pending', - error TEXT, - executed_at TIMESTAMP, - verified BOOLEAN DEFAULT FALSE, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """) - - # Create index on status - cursor.execute(""" - CREATE INDEX IF NOT EXISTS idx_operations_status - ON operations(status) - """) - + cursor.execute("\n CREATE TABLE IF NOT EXISTS operations (\n id SERIAL PRIMARY KEY,\n source_path TEXT NOT NULL,\n target_path TEXT NOT NULL,\n operation_type TEXT NOT NULL,\n size BIGINT DEFAULT 0,\n status TEXT DEFAULT 'pending',\n error TEXT,\n executed_at TIMESTAMP,\n verified BOOLEAN DEFAULT FALSE,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n ") + cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_operations_status\n ON operations(status)\n ') conn.commit() cursor.close() - def plan_migration( - self, - disk: Optional[str] = None, - category: Optional[str] = None - ) -> MigrationPlan: - """Plan migration for files - - Args: - disk: Optional disk filter - category: Optional category filter - - Returns: - MigrationPlan with planned operations - """ - self.logger.section("Planning Migration") - + def plan_migration(self, disk: Optional[str]=None, category: Optional[str]=None) -> MigrationPlan: + self.logger.section('Planning Migration') conn = self._get_connection() cursor = conn.cursor() - - # Build query - conditions = ["category IS NOT NULL"] + conditions = ['category IS NOT NULL'] params = [] - if disk: - conditions.append("disk_label = %s") + conditions.append('disk_label = %s') params.append(disk) - if category: - conditions.append("category = %s") + conditions.append('category = %s') params.append(category) - - query = f""" - SELECT path, size, category, duplicate_of - FROM files - WHERE {' AND '.join(conditions)} - ORDER BY category, path - """ - + query = f"\n SELECT path, size, category, duplicate_of\n FROM files\n WHERE {' AND '.join(conditions)}\n ORDER BY category, path\n " cursor.execute(query, params) files = cursor.fetchall() - - self.logger.info(f"Found {len(files)} files to migrate") - + self.logger.info(f'Found {len(files)} files to migrate') operations = [] total_size = 0 - for path_str, size, file_category, duplicate_of in files: source = Path(path_str) - - # Determine destination target_path = self.target_base / file_category / source.name - - # Determine operation type if duplicate_of: - # Use hardlink for duplicates operation_type = 'hardlink' else: - # Use copy for unique files operation_type = 'copy' - - operation = OperationRecord( - source_path=source, - target_path=target_path, - operation_type=operation_type, - size=size - ) - + operation = OperationRecord(source_path=source, target_path=target_path, operation_type=operation_type, size=size) operations.append(operation) total_size += size - cursor.close() - - plan = MigrationPlan( - target_disk=str(self.target_base), - destination_disks=[str(self.target_base)], - operations=operations, - total_size=total_size, - file_count=len(operations) - ) - - self.logger.info( - f"Migration plan created: {plan.file_count} files, " - f"{plan.total_size:,} bytes" - ) - + plan = MigrationPlan(target_disk=str(self.target_base), destination_disks=[str(self.target_base)], operations=operations, total_size=total_size, file_count=len(operations)) + self.logger.info(f'Migration plan created: {plan.file_count} files, {plan.total_size:,} bytes') return plan - def 
execute_migration( - self, - operations: list[OperationRecord], - dry_run: bool = False, - progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None - ) -> ProcessingStats: - """Execute migration operations - - Args: - operations: List of operations to execute - dry_run: Whether to perform a dry run - progress_callback: Optional callback for progress updates - - Returns: - ProcessingStats with execution statistics - """ - self.logger.section("Executing Migration" + (" (DRY RUN)" if dry_run else "")) - + def execute_migration(self, operations: list[OperationRecord], dry_run: bool=False, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats: + self.logger.section('Executing Migration' + (' (DRY RUN)' if dry_run else '')) self._ensure_tables() - stats = ProcessingStats() total_ops = len(operations) - for operation in operations: stats.files_processed += 1 - if dry_run: - # In dry run, just log what would happen - self.logger.debug( - f"[DRY RUN] Would {operation.operation_type}: " - f"{operation.source_path} -> {operation.target_path}" - ) + self.logger.debug(f'[DRY RUN] Would {operation.operation_type}: {operation.source_path} -> {operation.target_path}') stats.files_succeeded += 1 else: - # Execute actual migration success = self._execute_operation(operation) - if success: stats.files_succeeded += 1 stats.bytes_processed += operation.size else: stats.files_failed += 1 - - # Progress callback if progress_callback and stats.files_processed % 100 == 0: progress_callback(stats.files_processed, total_ops, stats) - - # Log progress if stats.files_processed % 1000 == 0: - self.logger.progress( - stats.files_processed, - total_ops, - prefix="Operations executed", - bytes_processed=stats.bytes_processed, - elapsed_seconds=stats.elapsed_seconds - ) - - self.logger.info( - f"Migration {'dry run' if dry_run else 'execution'} complete: " - f"{stats.files_succeeded}/{total_ops} operations, " - f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s" - ) - + self.logger.progress(stats.files_processed, total_ops, prefix='Operations executed', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds) + self.logger.info(f"Migration {('dry run' if dry_run else 'execution')} complete: {stats.files_succeeded}/{total_ops} operations, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s") return stats def _execute_operation(self, operation: OperationRecord) -> bool: - """Execute a single migration operation - - Args: - operation: Operation to execute - - Returns: - True if successful - """ operation.status = 'in_progress' operation.executed_at = datetime.now() - try: - # Select strategy based on operation type if operation.operation_type == 'copy': strategy = self.copy_strategy elif operation.operation_type == 'hardlink': @@ -256,15 +102,8 @@ class MigrationEngine: elif operation.operation_type == 'symlink': strategy = self.symlink_strategy else: - raise ValueError(f"Unknown operation type: {operation.operation_type}") - - # Execute migration - success = strategy.migrate( - operation.source_path, - operation.target_path, - verify=self.processing_config.verify_operations - ) - + raise ValueError(f'Unknown operation type: {operation.operation_type}') + success = strategy.migrate(operation.source_path, operation.target_path, verify=self.processing_config.verify_operations) if success: operation.status = 'completed' operation.verified = True @@ -272,183 +111,85 @@ class MigrationEngine: return True else: 
operation.status = 'failed' - operation.error = "Migration failed" + operation.error = 'Migration failed' self._record_operation(operation) return False - except Exception as e: operation.status = 'failed' operation.error = str(e) self._record_operation(operation) - self.logger.error(f"Operation failed: {operation.source_path}: {e}") + self.logger.error(f'Operation failed: {operation.source_path}: {e}') return False def _record_operation(self, operation: OperationRecord): - """Record operation in database - - Args: - operation: Operation to record - """ conn = self._get_connection() cursor = conn.cursor() - - cursor.execute(""" - INSERT INTO operations ( - source_path, target_path, operation_type, bytes_processed, - status, error, executed_at, verified - ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - """, ( - str(operation.source_path), - str(operation.target_path), - operation.operation_type, - operation.size, - operation.status, - operation.error, - operation.executed_at, - operation.verified - )) - + cursor.execute('\n INSERT INTO operations (\n source_path, target_path, operation_type, bytes_processed,\n status, error, executed_at, verified\n )\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n ', (str(operation.source_path), str(operation.target_path), operation.operation_type, operation.size, operation.status, operation.error, operation.executed_at, operation.verified)) conn.commit() cursor.close() def rollback(self, operation: OperationRecord) -> bool: - """Rollback a migration operation - - Args: - operation: Operation to rollback - - Returns: - True if rollback successful - """ - self.logger.warning(f"Rolling back: {operation.target_path}") - + self.logger.warning(f'Rolling back: {operation.target_path}') try: - # Remove destination if operation.target_path.exists(): operation.target_path.unlink() - - # Update database conn = self._get_connection() cursor = conn.cursor() - - cursor.execute(""" - UPDATE operations - SET status = 'rolled_back' - WHERE source_path = %s AND target_path = %s - """, (str(operation.source_path), str(operation.target_path))) - + cursor.execute("\n UPDATE operations\n SET status = 'rolled_back'\n WHERE source_path = %s AND target_path = %s\n ", (str(operation.source_path), str(operation.target_path))) conn.commit() cursor.close() - return True - except Exception as e: - self.logger.error(f"Rollback failed: {operation.target_path}: {e}") + self.logger.error(f'Rollback failed: {operation.target_path}: {e}') return False def get_migration_stats(self) -> dict: - """Get migration statistics - - Returns: - Dictionary with statistics - """ conn = self._get_connection() cursor = conn.cursor() - stats = {} - - # Total operations - cursor.execute("SELECT COUNT(*) FROM operations") + cursor.execute('SELECT COUNT(*) FROM operations') stats['total_operations'] = cursor.fetchone()[0] - - # Operations by status - cursor.execute(""" - SELECT status, COUNT(*) - FROM operations - GROUP BY status - """) - + cursor.execute('\n SELECT status, COUNT(*)\n FROM operations\n GROUP BY status\n ') for status, count in cursor.fetchall(): stats[f'{status}_operations'] = count - - # Total size migrated - cursor.execute(""" - SELECT COALESCE(SUM(size), 0) - FROM operations - WHERE status = 'completed' - """) + cursor.execute("\n SELECT COALESCE(SUM(size), 0)\n FROM operations\n WHERE status = 'completed'\n ") stats['total_size_migrated'] = cursor.fetchone()[0] - cursor.close() - return stats def verify_migrations(self) -> dict: - """Verify completed migrations - - Returns: - Dictionary with 
verification results - """ - self.logger.subsection("Verifying Migrations") - + self.logger.subsection('Verifying Migrations') conn = self._get_connection() cursor = conn.cursor() - - cursor.execute(""" - SELECT source_path, target_path, operation_type - FROM operations - WHERE status = 'completed' AND verified = FALSE - """) - + cursor.execute("\n SELECT source_path, target_path, operation_type\n FROM operations\n WHERE status = 'completed' AND verified = FALSE\n ") operations = cursor.fetchall() cursor.close() - - results = { - 'total': len(operations), - 'verified': 0, - 'failed': 0 - } - + results = {'total': len(operations), 'verified': 0, 'failed': 0} for source_str, dest_str, op_type in operations: source = Path(source_str) dest = Path(dest_str) - - # Verify destination exists if not dest.exists(): results['failed'] += 1 - self.logger.warning(f"Verification failed: {dest} does not exist") + self.logger.warning(f'Verification failed: {dest} does not exist') continue - - # Verify based on operation type if op_type == 'hardlink': - # Check if hardlinked if source.exists() and source.stat().st_ino == dest.stat().st_ino: results['verified'] += 1 else: results['failed'] += 1 + elif dest.exists(): + results['verified'] += 1 else: - # Check if destination exists and has correct size - if dest.exists(): - results['verified'] += 1 - else: - results['failed'] += 1 - - self.logger.info( - f"Verification complete: {results['verified']}/{results['total']} verified" - ) - + results['failed'] += 1 + self.logger.info(f"Verification complete: {results['verified']}/{results['total']} verified") return results def close(self): - """Close database connection""" - if self._connection and not self._connection.closed: + if self._connection and (not self._connection.closed): self._connection.close() def __enter__(self): - """Context manager entry""" return self def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit""" self.close() diff --git a/app/migration/hardlink.py b/app/migration/hardlink.py index d7d2987..62efe5b 100644 --- a/app/migration/hardlink.py +++ b/app/migration/hardlink.py @@ -1,90 +1,43 @@ -"""Hardlink-based migration strategy""" import os from pathlib import Path from typing import Optional - from ..shared.logger import ProgressLogger - class HardlinkMigrationStrategy: - """Create hardlinks to files instead of copying""" - def __init__(self, logger: Optional[ProgressLogger] = None): - """Initialize hardlink migration strategy - - Args: - logger: Optional progress logger - """ + def __init__(self, logger: Optional[ProgressLogger]=None): self.logger = logger - def migrate( - self, - source: Path, - destination: Path, - verify: bool = True - ) -> bool: - """Migrate file by creating hardlink - - Args: - source: Source file path - destination: Destination file path - verify: Whether to verify the operation - - Returns: - True if migration successful - """ + def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool: if not source.exists(): if self.logger: - self.logger.error(f"Source file does not exist: {source}") + self.logger.error(f'Source file does not exist: {source}') return False - - # Check if source and destination are on same filesystem if not self._same_filesystem(source, destination.parent): if self.logger: - self.logger.warning( - f"Cannot hardlink across filesystems: {source} -> {destination}" - ) + self.logger.warning(f'Cannot hardlink across filesystems: {source} -> {destination}') return False - - # Create destination directory 
destination.parent.mkdir(parents=True, exist_ok=True) - try: - # Create hardlink os.link(source, destination) - - # Verify if requested if verify: if not self._verify_hardlink(source, destination): if self.logger: - self.logger.error(f"Verification failed: {source} -> {destination}") + self.logger.error(f'Verification failed: {source} -> {destination}') destination.unlink() return False - return True - except FileExistsError: if self.logger: - self.logger.warning(f"Destination already exists: {destination}") + self.logger.warning(f'Destination already exists: {destination}') return False - except Exception as e: if self.logger: - self.logger.error(f"Hardlink failed: {source} -> {destination}: {e}") + self.logger.error(f'Hardlink failed: {source} -> {destination}: {e}') return False def _same_filesystem(self, path1: Path, path2: Path) -> bool: - """Check if two paths are on the same filesystem - - Args: - path1: First path - path2: Second path - - Returns: - True if on same filesystem - """ try: - # Get device IDs stat1 = path1.stat() stat2 = path2.stat() return stat1.st_dev == stat2.st_dev @@ -92,286 +45,117 @@ class HardlinkMigrationStrategy: return False def _verify_hardlink(self, source: Path, destination: Path) -> bool: - """Verify hardlink - - Args: - source: Source file path - destination: Destination file path - - Returns: - True if verification successful - """ try: - # Check if they have the same inode source_stat = source.stat() dest_stat = destination.stat() - return source_stat.st_ino == dest_stat.st_ino - except Exception: return False def can_migrate(self, source: Path, destination: Path) -> bool: - """Check if migration is possible - - Args: - source: Source file path - destination: Destination file path - - Returns: - True if migration is possible - """ if not source.exists(): return False - - # Check if on same filesystem dest_dir = destination.parent if dest_dir.exists(): return self._same_filesystem(source, dest_dir) - - # Check parent directories parent = dest_dir.parent while not parent.exists() and parent != parent.parent: parent = parent.parent - return parent.exists() and self._same_filesystem(source, parent) def estimate_time(self, source: Path) -> float: - """Estimate migration time in seconds - - Args: - source: Source file path - - Returns: - Estimated time in seconds (hardlinks are instant) - """ - return 0.01 # Hardlinks are nearly instant + return 0.01 def cleanup(self, source: Path) -> bool: - """Cleanup source file after successful migration - - Note: For hardlinks, we typically don't remove the source - immediately as both links point to the same inode. 
- - Args: - source: Source file path - - Returns: - True (no cleanup needed for hardlinks) - """ - # For hardlinks, we don't remove the source - # Both source and destination point to the same data return True - class SymlinkMigrationStrategy: - """Create symbolic links to files""" - def __init__( - self, - logger: Optional[ProgressLogger] = None, - absolute_links: bool = True - ): - """Initialize symlink migration strategy - - Args: - logger: Optional progress logger - absolute_links: Whether to create absolute symlinks - """ + def __init__(self, logger: Optional[ProgressLogger]=None, absolute_links: bool=True): self.logger = logger self.absolute_links = absolute_links - def migrate( - self, - source: Path, - destination: Path, - verify: bool = True - ) -> bool: - """Migrate file by creating symlink - - Args: - source: Source file path - destination: Destination file path - verify: Whether to verify the operation - - Returns: - True if migration successful - """ + def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool: if not source.exists(): if self.logger: - self.logger.error(f"Source file does not exist: {source}") + self.logger.error(f'Source file does not exist: {source}') return False - - # Create destination directory destination.parent.mkdir(parents=True, exist_ok=True) - try: - # Determine link target if self.absolute_links: target = source.resolve() else: - # Create relative symlink target = os.path.relpath(source, destination.parent) - - # Create symlink destination.symlink_to(target) - - # Verify if requested if verify: if not self._verify_symlink(destination, source): if self.logger: - self.logger.error(f"Verification failed: {source} -> {destination}") + self.logger.error(f'Verification failed: {source} -> {destination}') destination.unlink() return False - return True - except FileExistsError: if self.logger: - self.logger.warning(f"Destination already exists: {destination}") + self.logger.warning(f'Destination already exists: {destination}') return False - except Exception as e: if self.logger: - self.logger.error(f"Symlink failed: {source} -> {destination}: {e}") + self.logger.error(f'Symlink failed: {source} -> {destination}: {e}') return False def _verify_symlink(self, symlink: Path, expected_target: Path) -> bool: - """Verify symlink - - Args: - symlink: Symlink path - expected_target: Expected target path - - Returns: - True if verification successful - """ try: - # Check if it's a symlink if not symlink.is_symlink(): return False - - # Resolve and compare resolved = symlink.resolve() expected = expected_target.resolve() - return resolved == expected - except Exception: return False def can_migrate(self, source: Path, destination: Path) -> bool: - """Check if migration is possible - - Args: - source: Source file path - destination: Destination file path - - Returns: - True if migration is possible - """ if not source.exists(): return False - - # Check if destination directory is writable dest_dir = destination.parent if dest_dir.exists(): return os.access(dest_dir, os.W_OK) - return True def estimate_time(self, source: Path) -> float: - """Estimate migration time in seconds - - Args: - source: Source file path - - Returns: - Estimated time in seconds (symlinks are instant) - """ - return 0.01 # Symlinks are instant + return 0.01 def cleanup(self, source: Path) -> bool: - """Cleanup source file after successful migration - - Note: For symlinks, we don't remove the source as the - symlink points to it. 
- - Args: - source: Source file path - - Returns: - True (no cleanup needed for symlinks) - """ - # For symlinks, we don't remove the source return True - class DedupHardlinkStrategy(HardlinkMigrationStrategy): - """Hardlink strategy for deduplication - Creates hardlinks for duplicate files to save space. - """ - - def __init__(self, logger: Optional[ProgressLogger] = None): - """Initialize dedup hardlink strategy""" + def __init__(self, logger: Optional[ProgressLogger]=None): super().__init__(logger=logger) - def deduplicate( - self, - canonical: Path, - duplicate: Path - ) -> bool: - """Replace duplicate with hardlink to canonical - - Args: - canonical: Canonical file path - duplicate: Duplicate file path - - Returns: - True if deduplication successful - """ + def deduplicate(self, canonical: Path, duplicate: Path) -> bool: if not canonical.exists(): if self.logger: - self.logger.error(f"Canonical file does not exist: {canonical}") + self.logger.error(f'Canonical file does not exist: {canonical}') return False - if not duplicate.exists(): if self.logger: - self.logger.error(f"Duplicate file does not exist: {duplicate}") + self.logger.error(f'Duplicate file does not exist: {duplicate}') return False - - # Check if already hardlinked if self._verify_hardlink(canonical, duplicate): return True - - # Check if on same filesystem if not self._same_filesystem(canonical, duplicate): if self.logger: - self.logger.warning( - f"Cannot hardlink across filesystems: {canonical} -> {duplicate}" - ) + self.logger.warning(f'Cannot hardlink across filesystems: {canonical} -> {duplicate}') return False - try: - # Create temporary backup backup = duplicate.with_suffix(duplicate.suffix + '.bak') duplicate.rename(backup) - - # Create hardlink os.link(canonical, duplicate) - - # Remove backup backup.unlink() - return True - except Exception as e: if self.logger: - self.logger.error(f"Deduplication failed: {duplicate}: {e}") - - # Restore from backup + self.logger.error(f'Deduplication failed: {duplicate}: {e}') if backup.exists(): backup.rename(duplicate) - return False
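
For reviewers, a minimal usage sketch of the migration strategies touched by this patch. It is not part of the change itself: it assumes the package is importable as `app`, the file paths are hypothetical, and no ProgressLogger is wired in (pass one to surface the warnings and errors the strategies emit).

    from pathlib import Path

    from app.migration.hardlink import (
        DedupHardlinkStrategy,
        HardlinkMigrationStrategy,
        SymlinkMigrationStrategy,
    )

    source = Path('/data/source/report.pdf')        # hypothetical paths
    destination = Path('/data/archive/report.pdf')

    hardlink = HardlinkMigrationStrategy()
    if hardlink.can_migrate(source, destination):
        # Same filesystem: create the hardlink and verify the inodes match.
        migrated = hardlink.migrate(source, destination, verify=True)
    else:
        # Different filesystem: fall back to a relative symlink.
        symlink = SymlinkMigrationStrategy(absolute_links=False)
        migrated = symlink.migrate(source, destination, verify=True)

    # Replace a known duplicate with a hardlink to its canonical copy;
    # the duplicate is renamed to a .bak backup first and restored if linking fails.
    dedup = DedupHardlinkStrategy()
    dedup.deduplicate(Path('/data/a.bin'), Path('/data/b.bin'))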