"""Deduplication engine: hashes unprocessed files, records canonical copies,
and links duplicates in the PostgreSQL ``files`` table."""

from pathlib import Path
from typing import Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed

import psycopg2

from .chunker import compute_file_signature, hash_file
from .store import HashStore
from ..shared.models import FileRecord, ProcessingStats
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger


class DeduplicationEngine:
    """Computes file checksums in parallel and marks duplicates against the
    first (canonical) copy seen for each checksum."""

    def __init__(self, db_config: DatabaseConfig,
                 processing_config: ProcessingConfig,
                 logger: ProgressLogger):
        self.db_config = db_config
        self.processing_config = processing_config
        self.logger = logger
        self.hash_store = HashStore(db_config)
        self._connection = None

    def _get_connection(self):
        """Lazily open the database connection, reconnecting if it was closed."""
        if self._connection is None or self._connection.closed:
            self._connection = psycopg2.connect(
                host=self.db_config.host,
                port=self.db_config.port,
                database=self.db_config.database,
                user=self.db_config.user,
                password=self.db_config.password,
            )
        return self._connection

    def deduplicate_all(
        self,
        disk: Optional[str] = None,
        use_chunks: bool = True,
        progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None,
    ) -> ProcessingStats:
        """Hash every file that has no checksum yet (optionally limited to one
        disk) and record duplicate links. Hashing runs in a thread pool; all
        database writes stay on the calling thread."""
        self.logger.section('Starting Deduplication')
        conn = self._get_connection()
        cursor = conn.cursor()

        # Largest files first, so the longest-running hashes start early.
        if disk:
            cursor.execute("""
                SELECT path, size
                FROM files
                WHERE disk_label = %s AND checksum IS NULL
                ORDER BY size DESC
            """, (disk,))
        else:
            cursor.execute("""
                SELECT path, size
                FROM files
                WHERE checksum IS NULL
                ORDER BY size DESC
            """)
        files_to_process = cursor.fetchall()
        total_files = len(files_to_process)
        self.logger.info(f'Found {total_files} files to process')

        stats = ProcessingStats()
        with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor:
            futures = {}
            for path_str, size in files_to_process:
                path = Path(path_str)
                future = executor.submit(self._process_file, path, use_chunks)
                futures[future] = (path, size)

            for future in as_completed(futures):
                path, size = futures[future]
                try:
                    checksum, duplicate_of = future.result()
                    if checksum:
                        cursor.execute("""
                            UPDATE files
                            SET checksum = %s, duplicate_of = %s
                            WHERE path = %s
                        """, (checksum, duplicate_of, str(path)))
                        stats.files_succeeded += 1
                        stats.bytes_processed += size

                    stats.files_processed += 1
                    # Commit in batches to keep transactions bounded.
                    if stats.files_processed % self.processing_config.commit_interval == 0:
                        conn.commit()

                    if progress_callback:
                        progress_callback(stats.files_processed, total_files, stats)
                    self.logger.progress(
                        stats.files_processed, total_files,
                        prefix='Files processed',
                        bytes_processed=stats.bytes_processed,
                        elapsed_seconds=stats.elapsed_seconds,
                    )
                except Exception as e:
                    self.logger.warning(f'Failed to process {path}: {e}')
                    stats.files_failed += 1
                    stats.files_processed += 1

        conn.commit()
        cursor.close()
        self.logger.info(
            f'Deduplication complete: {stats.files_succeeded}/{total_files} files, '
            f'{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s'
        )
        return stats

    def _process_file(self, path: Path, use_chunks: bool) -> tuple[Optional[str], Optional[str]]:
        """Hash a single file. Returns (checksum, canonical_path); the
        canonical path is None when this file is the first copy seen."""
        if not path.exists():
            return (None, None)
        try:
            if use_chunks:
                # Content-defined (Rabin) chunking yields a whole-file
                # signature plus per-chunk hashes.
                checksum, chunk_hashes = compute_file_signature(
                    path, use_rabin=True,
                    avg_chunk_size=self.processing_config.chunk_size,
                )
            else:
                checksum = hash_file(path, algorithm=self.processing_config.hash_algorithm)
                chunk_hashes = None

            if self.hash_store.exists(checksum):
                canonical_path = self.hash_store.get_canonical(checksum)
                return (checksum, canonical_path)
            else:
                size = path.stat().st_size
                self.hash_store.store_canonical(checksum, path, size, chunk_hashes)
                return (checksum, None)
        except Exception as e:
            self.logger.debug(f'Error processing {path}: {e}')
            raise

    def find_duplicates(self, disk: Optional[str] = None) -> dict[str, list[str]]:
        """Return a mapping of canonical path -> list of its duplicate paths."""
        self.logger.subsection('Finding Duplicates')
        conn = self._get_connection()
        cursor = conn.cursor()
        if disk:
            cursor.execute("""
                SELECT checksum, array_agg(path ORDER BY path) AS paths
                FROM files
                WHERE disk_label = %s AND checksum IS NOT NULL
                GROUP BY checksum
                HAVING COUNT(*) > 1
            """, (disk,))
        else:
            cursor.execute("""
                SELECT checksum, array_agg(path ORDER BY path) AS paths
                FROM files
                WHERE checksum IS NOT NULL
                GROUP BY checksum
                HAVING COUNT(*) > 1
            """)

        duplicates = {}
        for checksum, paths in cursor.fetchall():
            # The first path in sorted order serves as the canonical copy.
            canonical = paths[0]
            duplicates[canonical] = paths[1:]
        cursor.close()
        self.logger.info(f'Found {len(duplicates)} sets of duplicates')
        return duplicates

    def get_deduplication_stats(self) -> dict:
        """Summarize file counts, sizes, and the space reclaimable by
        removing duplicates."""
        conn = self._get_connection()
        cursor = conn.cursor()
        stats = {}

        cursor.execute('SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL')
        stats['total_files'] = cursor.fetchone()[0]

        cursor.execute('SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL')
        stats['unique_files'] = cursor.fetchone()[0]
        stats['duplicate_files'] = stats['total_files'] - stats['unique_files']

        cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL')
        stats['total_size'] = cursor.fetchone()[0]

        # Sum one size per distinct checksum: the space required if every
        # duplicate were removed.
        cursor.execute("""
            SELECT COALESCE(SUM(size), 0)
            FROM (
                SELECT DISTINCT ON (checksum) size
                FROM files
                WHERE checksum IS NOT NULL
            ) AS unique_files
        """)
        stats['unique_size'] = cursor.fetchone()[0]
        stats['wasted_space'] = stats['total_size'] - stats['unique_size']

        if stats['total_size'] > 0:
            stats['dedup_ratio'] = stats['unique_size'] / stats['total_size']
            stats['space_saved_percent'] = stats['wasted_space'] / stats['total_size'] * 100
        else:
            stats['dedup_ratio'] = 1.0
            stats['space_saved_percent'] = 0.0

        cursor.close()
        return stats

    def mark_canonical_files(self) -> int:
        """Clear duplicate_of on the first path of each checksum so it is
        marked as the canonical copy. Returns the number of rows updated."""
        self.logger.subsection('Marking Canonical Files')
        conn = self._get_connection()
        cursor = conn.cursor()
        cursor.execute("""
            WITH canonical AS (
                SELECT DISTINCT ON (checksum) path, checksum
                FROM files
                WHERE checksum IS NOT NULL
                ORDER BY checksum, path
            )
            UPDATE files
            SET duplicate_of = NULL
            WHERE path IN (SELECT path FROM canonical)
        """)
        count = cursor.rowcount
        conn.commit()
        cursor.close()
        self.logger.info(f'Marked {count} canonical files')
        return count

    def close(self):
        """Release the hash store and the database connection."""
        self.hash_store.close()
        if self._connection and not self._connection.closed:
            self._connection.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
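
# --- Usage sketch -----------------------------------------------------------
# A minimal driver, assuming DatabaseConfig, ProcessingConfig, and
# ProgressLogger accept the keyword arguments shown; the concrete values below
# are hypothetical placeholders, not the project's real defaults. Because this
# module uses relative imports, run it with ``python -m`` from the package
# root rather than as a standalone script.
if __name__ == '__main__':
    db_config = DatabaseConfig(
        host='localhost', port=5432, database='file_catalog',
        user='dedup', password='')  # hypothetical connection settings
    processing_config = ProcessingConfig(
        parallel_workers=4, commit_interval=100,
        chunk_size=64 * 1024, hash_algorithm='sha256')  # fields the engine reads
    logger = ProgressLogger()  # assumed zero-argument constructor

    # The engine is a context manager, so close() runs even on error.
    with DeduplicationEngine(db_config, processing_config, logger) as engine:
        engine.deduplicate_all(use_chunks=True)
        engine.mark_canonical_files()
        summary = engine.get_deduplication_stats()
        print(f"Space saved: {summary['space_saved_percent']:.1f}%")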