diff --git a/requirements.txt b/requirements.txt index 63bae10..e0ab347 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,8 +4,6 @@ psycopg2-binary>=2.9.9 # Alternative: psycopg2>=2.9.9 (requires PostgreSQL development libraries) # Use psycopg2-binary for easier installation without compilation # Core dependencies -python>=3.9 - # Optional/feature dependencies redis>=4.5.0 # For RedisHashStore scikit-learn>=1.0.0 # For MLClassifier diff --git a/src/classification/__init__.py b/src/classification/__init__.py index e69de29..04fee6d 100644 --- a/src/classification/__init__.py +++ b/src/classification/__init__.py @@ -0,0 +1,17 @@ +"""Classification package exports""" +from .rules import RuleBasedClassifier +from .ml import create_ml_classifier, train_from_database, MLClassifier, DummyMLClassifier +from .engine import ClassificationEngine +from ._protocols import ClassificationRule, IClassifier, IRuleEngine + +__all__ = [ + 'RuleBasedClassifier', + 'MLClassifier', + 'DummyMLClassifier', + 'create_ml_classifier', + 'train_from_database', + 'ClassificationEngine', + 'ClassificationRule', + 'IClassifier', + 'IRuleEngine', +] diff --git a/src/classification/_protocols.py b/src/classification/_protocols.py index e69de29..4623b3f 100644 --- a/src/classification/_protocols.py +++ b/src/classification/_protocols.py @@ -0,0 +1,72 @@ +"""Protocol definitions for the classification package""" +from typing import Protocol, Optional +from pathlib import Path +from dataclasses import dataclass + + +@dataclass +class ClassificationRule: + """Rule for classifying files""" + name: str + category: str + patterns: list[str] + priority: int = 0 + description: str = "" + + +class IClassifier(Protocol): + """Protocol for classification operations""" + + def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]: + """Classify a file path + + Args: + path: Path to classify + file_type: Optional file type hint + + Returns: + Category name or None if no match + """ + ... + + def get_category_rules(self, category: str) -> list[ClassificationRule]: + """Get all rules for a category + + Args: + category: Category name + + Returns: + List of rules for the category + """ + ... + + +class IRuleEngine(Protocol): + """Protocol for rule-based classification""" + + def add_rule(self, rule: ClassificationRule) -> None: + """Add a classification rule + + Args: + rule: Rule to add + """ + ... + + def remove_rule(self, rule_name: str) -> None: + """Remove a rule by name + + Args: + rule_name: Name of rule to remove + """ + ... + + def match_path(self, path: Path) -> Optional[str]: + """Match path against rules + + Args: + path: Path to match + + Returns: + Category name or None if no match + """ + ... 
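The protocols above are purely structural, so a short consumption sketch may help; everything below (the PrefixClassifier class, the sample rule, the src.classification import path) is hypothetical and not part of the patch.

    from pathlib import Path
    from typing import Optional

    from src.classification import ClassificationRule, IClassifier  # assumed import path

    class PrefixClassifier:
        """Toy classifier; satisfies IClassifier by duck typing, no inheritance needed."""

        def __init__(self) -> None:
            self._rules = [ClassificationRule(name="logs", category="logs/text",
                                              patterns=["/var/log/*"], priority=5)]

        def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
            return "logs/text" if str(path).startswith("/var/log/") else None

        def get_category_rules(self, category: str) -> list[ClassificationRule]:
            return [r for r in self._rules if r.category == category]

    def classify_with(classifier: IClassifier, path: Path) -> Optional[str]:
        # Any object with the matching methods is accepted here: RuleBasedClassifier,
        # MLClassifier, or the toy class above.
        return classifier.classify(path)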
diff --git a/src/classification/engine.py b/src/classification/engine.py index e69de29..e89d583 100644 --- a/src/classification/engine.py +++ b/src/classification/engine.py @@ -0,0 +1,350 @@ +"""Main classification engine""" +from pathlib import Path +from typing import Optional, Callable +import psycopg2 + +from .rules import RuleBasedClassifier +from .ml import create_ml_classifier, DummyMLClassifier +from ..shared.models import ProcessingStats +from ..shared.config import DatabaseConfig +from ..shared.logger import ProgressLogger + + +class ClassificationEngine: + """Engine for classifying files""" + + def __init__( + self, + db_config: DatabaseConfig, + logger: ProgressLogger, + use_ml: bool = False + ): + """Initialize classification engine + + Args: + db_config: Database configuration + logger: Progress logger + use_ml: Whether to use ML classification in addition to rules + """ + self.db_config = db_config + self.logger = logger + self.rule_classifier = RuleBasedClassifier() + self.ml_classifier = create_ml_classifier() if use_ml else None + self.use_ml = use_ml and not isinstance(self.ml_classifier, DummyMLClassifier) + self._connection = None + + def _get_connection(self): + """Get or create database connection""" + if self._connection is None or self._connection.closed: + self._connection = psycopg2.connect( + host=self.db_config.host, + port=self.db_config.port, + database=self.db_config.database, + user=self.db_config.user, + password=self.db_config.password + ) + return self._connection + + def classify_all( + self, + disk: Optional[str] = None, + batch_size: int = 1000, + progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None + ) -> ProcessingStats: + """Classify all files in database + + Args: + disk: Optional disk filter + batch_size: Number of files to process per batch + progress_callback: Optional callback for progress updates + + Returns: + ProcessingStats with classification statistics + """ + self.logger.section("Starting Classification") + + conn = self._get_connection() + cursor = conn.cursor() + + # Get files without categories + if disk: + cursor.execute(""" + SELECT path, checksum + FROM files + WHERE disk = %s AND category IS NULL + """, (disk,)) + else: + cursor.execute(""" + SELECT path, checksum + FROM files + WHERE category IS NULL + """) + + files_to_classify = cursor.fetchall() + total_files = len(files_to_classify) + + self.logger.info(f"Found {total_files} files to classify") + + stats = ProcessingStats() + batch = [] + + for path_str, checksum in files_to_classify: + path = Path(path_str) + + # Classify using rules first + category = self.rule_classifier.classify(path) + + # If no rule match and ML is available, try ML + if category is None and self.use_ml and self.ml_classifier: + category = self.ml_classifier.classify(path) + + # If still no category, assign default + if category is None: + category = "temp/processing" + + batch.append((category, str(path))) + stats.files_processed += 1 + + # Batch update + if len(batch) >= batch_size: + self._update_categories(cursor, batch) + conn.commit() + batch.clear() + + # Progress callback + if progress_callback: + progress_callback(stats.files_processed, total_files, stats) + + # Log progress + if stats.files_processed % (batch_size * 10) == 0: + self.logger.progress( + stats.files_processed, + total_files, + prefix="Files classified", + elapsed_seconds=stats.elapsed_seconds + ) + + # Update remaining batch + if batch: + self._update_categories(cursor, batch) + conn.commit() + + 
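+        # All batches have been flushed and committed at this point, and the
+        # "temp/processing" fallback means every processed file received a category,
+        # so the full processed count is recorded as succeeded.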
stats.files_succeeded = stats.files_processed + + cursor.close() + + self.logger.info( + f"Classification complete: {stats.files_processed} files in {stats.elapsed_seconds:.1f}s" + ) + + return stats + + def _update_categories(self, cursor, batch: list[tuple[str, str]]): + """Update categories in batch + + Args: + cursor: Database cursor + batch: List of (category, path) tuples + """ + from psycopg2.extras import execute_batch + + query = """ + UPDATE files + SET category = %s + WHERE path = %s + """ + + execute_batch(cursor, query, batch) + + def classify_path(self, path: Path) -> Optional[str]: + """Classify a single path + + Args: + path: Path to classify + + Returns: + Category name or None + """ + # Try rules first + category = self.rule_classifier.classify(path) + + # Try ML if available + if category is None and self.use_ml and self.ml_classifier: + category = self.ml_classifier.classify(path) + + return category + + def get_category_stats(self) -> dict[str, dict]: + """Get statistics by category + + Returns: + Dictionary mapping category to statistics + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT + category, + COUNT(*) as file_count, + SUM(size) as total_size + FROM files + WHERE category IS NOT NULL + GROUP BY category + ORDER BY total_size DESC + """) + + stats = {} + for category, file_count, total_size in cursor.fetchall(): + stats[category] = { + 'file_count': file_count, + 'total_size': total_size + } + + cursor.close() + + return stats + + def get_uncategorized_count(self) -> int: + """Get count of uncategorized files + + Returns: + Number of files without category + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute("SELECT COUNT(*) FROM files WHERE category IS NULL") + count = cursor.fetchone()[0] + + cursor.close() + + return count + + def reclassify_category( + self, + old_category: str, + new_category: str + ) -> int: + """Reclassify all files in a category + + Args: + old_category: Current category + new_category: New category + + Returns: + Number of files reclassified + """ + self.logger.info(f"Reclassifying {old_category} -> {new_category}") + + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + UPDATE files + SET category = %s + WHERE category = %s + """, (new_category, old_category)) + + count = cursor.rowcount + conn.commit() + cursor.close() + + self.logger.info(f"Reclassified {count} files") + + return count + + def train_ml_classifier( + self, + min_samples: int = 10 + ) -> bool: + """Train ML classifier from existing categorized data + + Args: + min_samples: Minimum samples per category + + Returns: + True if training successful + """ + if not self.use_ml or self.ml_classifier is None: + self.logger.warning("ML classifier not available") + return False + + self.logger.subsection("Training ML Classifier") + + conn = self._get_connection() + cursor = conn.cursor() + + # Get categorized files + cursor.execute(""" + SELECT path, category + FROM files + WHERE category IS NOT NULL + """) + + training_data = [(Path(path), category) for path, category in cursor.fetchall()] + cursor.close() + + if not training_data: + self.logger.warning("No training data available") + return False + + # Count samples per category + category_counts = {} + for _, category in training_data: + category_counts[category] = category_counts.get(category, 0) + 1 + + # Filter categories with enough samples + filtered_data = [ + (path, category) + for path, category in training_data + if 
category_counts[category] >= min_samples + ] + + if not filtered_data: + self.logger.warning(f"No categories with >= {min_samples} samples") + return False + + self.logger.info(f"Training with {len(filtered_data)} samples") + + try: + self.ml_classifier.train(filtered_data) + self.logger.info("ML classifier trained successfully") + return True + except Exception as e: + self.logger.error(f"Failed to train ML classifier: {e}") + return False + + def get_all_categories(self) -> list[str]: + """Get all categories from database + + Returns: + List of category names + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT DISTINCT category + FROM files + WHERE category IS NOT NULL + ORDER BY category + """) + + categories = [row[0] for row in cursor.fetchall()] + cursor.close() + + return categories + + def close(self): + """Close database connection""" + if self._connection and not self._connection.closed: + self._connection.close() + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() diff --git a/src/classification/ml.py b/src/classification/ml.py index e69de29..d334d90 100644 --- a/src/classification/ml.py +++ b/src/classification/ml.py @@ -0,0 +1,269 @@ +"""ML-based classification (optional, using sklearn if available)""" +from pathlib import Path +from typing import Optional, List, Tuple +import pickle + +try: + from sklearn.feature_extraction.text import TfidfVectorizer + from sklearn.naive_bayes import MultinomialNB + from sklearn.pipeline import Pipeline + SKLEARN_AVAILABLE = True +except ImportError: + SKLEARN_AVAILABLE = False + + +class MLClassifier: + """Machine learning-based file classifier + + Uses path-based features and optional metadata to classify files. + Requires scikit-learn to be installed. + """ + + def __init__(self): + """Initialize ML classifier""" + if not SKLEARN_AVAILABLE: + raise ImportError( + "scikit-learn is required for ML classification. 
" + "Install with: pip install scikit-learn" + ) + + self.model: Optional[Pipeline] = None + self.categories: List[str] = [] + self._is_trained = False + + def _extract_features(self, path: Path) -> str: + """Extract features from path + + Args: + path: Path to extract features from + + Returns: + Feature string + """ + # Convert path to feature string + # Include: path parts, extension, filename + parts = path.parts + extension = path.suffix + filename = path.name + + features = [] + + # Add path components + features.extend(parts) + + # Add extension + if extension: + features.append(f"ext:{extension}") + + # Add filename components (split on common separators) + name_parts = filename.replace('-', ' ').replace('_', ' ').replace('.', ' ').split() + features.extend([f"name:{part}" for part in name_parts]) + + return ' '.join(features) + + def train(self, training_data: List[Tuple[Path, str]]) -> None: + """Train the classifier + + Args: + training_data: List of (path, category) tuples + """ + if not training_data: + raise ValueError("Training data cannot be empty") + + # Extract features and labels + X = [self._extract_features(path) for path, _ in training_data] + y = [category for _, category in training_data] + + # Store unique categories + self.categories = sorted(set(y)) + + # Create and train pipeline + self.model = Pipeline([ + ('tfidf', TfidfVectorizer( + max_features=1000, + ngram_range=(1, 2), + min_df=1 + )), + ('classifier', MultinomialNB()) + ]) + + self.model.fit(X, y) + self._is_trained = True + + def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]: + """Classify a file path + + Args: + path: Path to classify + file_type: Optional file type hint (not used in ML classifier) + + Returns: + Category name or None if not trained + """ + if not self._is_trained or self.model is None: + return None + + features = self._extract_features(path) + + try: + prediction = self.model.predict([features])[0] + return prediction + except Exception: + return None + + def predict_proba(self, path: Path) -> dict[str, float]: + """Get prediction probabilities for all categories + + Args: + path: Path to classify + + Returns: + Dictionary mapping category to probability + """ + if not self._is_trained or self.model is None: + return {} + + features = self._extract_features(path) + + try: + probabilities = self.model.predict_proba([features])[0] + return { + category: float(prob) + for category, prob in zip(self.categories, probabilities) + } + except Exception: + return {} + + def save_model(self, model_path: Path) -> None: + """Save trained model to disk + + Args: + model_path: Path to save model + """ + if not self._is_trained: + raise ValueError("Cannot save untrained model") + + model_data = { + 'model': self.model, + 'categories': self.categories, + 'is_trained': self._is_trained + } + + with open(model_path, 'wb') as f: + pickle.dump(model_data, f) + + def load_model(self, model_path: Path) -> None: + """Load trained model from disk + + Args: + model_path: Path to model file + """ + with open(model_path, 'rb') as f: + model_data = pickle.load(f) + + self.model = model_data['model'] + self.categories = model_data['categories'] + self._is_trained = model_data['is_trained'] + + @property + def is_trained(self) -> bool: + """Check if model is trained""" + return self._is_trained + + +class DummyMLClassifier: + """Dummy ML classifier for when sklearn is not available""" + + def __init__(self): + """Initialize dummy classifier""" + pass + + def train(self, 
training_data: List[Tuple[Path, str]]) -> None: + """Dummy train method""" + raise NotImplementedError( + "ML classification requires scikit-learn. " + "Install with: pip install scikit-learn" + ) + + def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]: + """Dummy classify method""" + return None + + def predict_proba(self, path: Path) -> dict[str, float]: + """Dummy predict_proba method""" + return {} + + def save_model(self, model_path: Path) -> None: + """Dummy save_model method""" + raise NotImplementedError("ML classification not available") + + def load_model(self, model_path: Path) -> None: + """Dummy load_model method""" + raise NotImplementedError("ML classification not available") + + @property + def is_trained(self) -> bool: + """Check if model is trained""" + return False + + +def create_ml_classifier() -> MLClassifier | DummyMLClassifier: + """Create ML classifier if sklearn is available, otherwise return dummy + + Returns: + MLClassifier or DummyMLClassifier + """ + if SKLEARN_AVAILABLE: + return MLClassifier() + else: + return DummyMLClassifier() + + +def train_from_database( + db_connection, + min_samples_per_category: int = 10 +) -> MLClassifier | DummyMLClassifier: + """Train ML classifier from database + + Args: + db_connection: Database connection + min_samples_per_category: Minimum samples required per category + + Returns: + Trained classifier + """ + classifier = create_ml_classifier() + + if isinstance(classifier, DummyMLClassifier): + return classifier + + # Query classified files from database + cursor = db_connection.cursor() + cursor.execute(""" + SELECT path, category + FROM files + WHERE category IS NOT NULL + """) + + training_data = [(Path(path), category) for path, category in cursor.fetchall()] + cursor.close() + + if not training_data: + return classifier + + # Count samples per category + category_counts = {} + for _, category in training_data: + category_counts[category] = category_counts.get(category, 0) + 1 + + # Filter to categories with enough samples + filtered_data = [ + (path, category) + for path, category in training_data + if category_counts[category] >= min_samples_per_category + ] + + if filtered_data: + classifier.train(filtered_data) + + return classifier diff --git a/src/classification/rules.py b/src/classification/rules.py index e69de29..b194c18 100644 --- a/src/classification/rules.py +++ b/src/classification/rules.py @@ -0,0 +1,282 @@ +"""Rule-based classification engine""" +from pathlib import Path +from typing import Optional +import fnmatch + +from ._protocols import ClassificationRule + + +class RuleBasedClassifier: + """Rule-based file classifier using pattern matching""" + + def __init__(self): + """Initialize rule-based classifier""" + self.rules: list[ClassificationRule] = [] + self._load_default_rules() + + def _load_default_rules(self): + """Load default classification rules based on ARCHITECTURE.md""" + + # Build artifacts and caches + self.add_rule(ClassificationRule( + name="maven_cache", + category="artifacts/java/maven", + patterns=["**/.m2/**", "**/.maven/**", "**/maven-central-cache/**"], + priority=10, + description="Maven repository and cache" + )) + + self.add_rule(ClassificationRule( + name="gradle_cache", + category="artifacts/java/gradle", + patterns=["**/.gradle/**", "**/gradle-cache/**", "**/gradle-build-cache/**"], + priority=10, + description="Gradle cache and artifacts" + )) + + self.add_rule(ClassificationRule( + name="python_cache", + category="cache/pycache", + 
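+            # Bytecode caches (__pycache__, *.pyc, *.pyo) are regenerated by the
+            # interpreter on demand, hence a cache/ category rather than artifacts/.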
patterns=["**/__pycache__/**", "**/*.pyc", "**/*.pyo"], + priority=10, + description="Python cache files" + )) + + self.add_rule(ClassificationRule( + name="python_artifacts", + category="artifacts/python", + patterns=["**/pip-cache/**", "**/pypi-cache/**", "**/wheelhouse/**"], + priority=10, + description="Python package artifacts" + )) + + self.add_rule(ClassificationRule( + name="node_modules", + category="cache/node_modules-archive", + patterns=["**/node_modules/**"], + priority=10, + description="Node.js modules" + )) + + self.add_rule(ClassificationRule( + name="node_cache", + category="artifacts/node", + patterns=["**/.npm/**", "**/npm-registry/**", "**/yarn-cache/**", "**/pnpm-store/**"], + priority=10, + description="Node.js package managers cache" + )) + + self.add_rule(ClassificationRule( + name="go_cache", + category="artifacts/go", + patterns=["**/goproxy-cache/**", "**/go/pkg/mod/**", "**/go-module-cache/**"], + priority=10, + description="Go module cache" + )) + + # Version control + self.add_rule(ClassificationRule( + name="git_repos", + category="development/git-infrastructure", + patterns=["**/.git/**", "**/gitea/repositories/**"], + priority=15, + description="Git repositories and infrastructure" + )) + + self.add_rule(ClassificationRule( + name="gitea", + category="development/gitea", + patterns=["**/gitea/**"], + priority=12, + description="Gitea server data" + )) + + # Databases + self.add_rule(ClassificationRule( + name="postgresql", + category="databases/postgresql", + patterns=["**/postgresql/**", "**/postgres/**", "**/*.sql"], + priority=10, + description="PostgreSQL databases" + )) + + self.add_rule(ClassificationRule( + name="mysql", + category="databases/mysql", + patterns=["**/mysql/**", "**/mariadb/**"], + priority=10, + description="MySQL/MariaDB databases" + )) + + self.add_rule(ClassificationRule( + name="mongodb", + category="databases/mongodb", + patterns=["**/mongodb/**", "**/mongo/**"], + priority=10, + description="MongoDB databases" + )) + + self.add_rule(ClassificationRule( + name="redis", + category="databases/redis", + patterns=["**/redis/**", "**/*.rdb"], + priority=10, + description="Redis databases" + )) + + self.add_rule(ClassificationRule( + name="sqlite", + category="databases/sqlite", + patterns=["**/*.db", "**/*.sqlite", "**/*.sqlite3"], + priority=8, + description="SQLite databases" + )) + + # LLM and AI models + self.add_rule(ClassificationRule( + name="llm_models", + category="cache/llm-models", + patterns=[ + "**/hugging-face/**", + "**/huggingface/**", + "**/.cache/huggingface/**", + "**/models/**/*.bin", + "**/models/**/*.onnx", + "**/models/**/*.safetensors", + "**/llm*/**", + "**/openai-cache/**" + ], + priority=12, + description="LLM and AI model files" + )) + + # Docker and containers + self.add_rule(ClassificationRule( + name="docker_volumes", + category="apps/volumes/docker-volumes", + patterns=["**/docker/volumes/**", "**/var/lib/docker/volumes/**"], + priority=10, + description="Docker volumes" + )) + + self.add_rule(ClassificationRule( + name="app_data", + category="apps/volumes/app-data", + patterns=["**/app-data/**", "**/application-data/**"], + priority=8, + description="Application data" + )) + + # Build outputs + self.add_rule(ClassificationRule( + name="build_output", + category="development/build-tools", + patterns=["**/target/**", "**/build/**", "**/dist/**", "**/out/**"], + priority=5, + description="Build output directories" + )) + + # Backups + self.add_rule(ClassificationRule( + name="system_backups", + 
category="backups/system", + patterns=["**/backup/**", "**/backups/**", "**/*.bak", "**/*.backup"], + priority=10, + description="System backups" + )) + + self.add_rule(ClassificationRule( + name="database_backups", + category="backups/database", + patterns=["**/*.sql.gz", "**/*.dump", "**/db-backup/**"], + priority=11, + description="Database backups" + )) + + # Archives + self.add_rule(ClassificationRule( + name="archives", + category="backups/archive", + patterns=["**/*.tar", "**/*.tar.gz", "**/*.tgz", "**/*.zip", "**/*.7z"], + priority=5, + description="Archive files" + )) + + def add_rule(self, rule: ClassificationRule) -> None: + """Add a classification rule + + Args: + rule: Rule to add + """ + self.rules.append(rule) + # Sort rules by priority (higher priority first) + self.rules.sort(key=lambda r: r.priority, reverse=True) + + def remove_rule(self, rule_name: str) -> None: + """Remove a rule by name + + Args: + rule_name: Name of rule to remove + """ + self.rules = [r for r in self.rules if r.name != rule_name] + + def match_path(self, path: Path) -> Optional[str]: + """Match path against rules + + Args: + path: Path to match + + Returns: + Category name or None if no match + """ + path_str = str(path) + + # Try to match each rule in priority order + for rule in self.rules: + for pattern in rule.patterns: + if fnmatch.fnmatch(path_str, pattern): + return rule.category + + return None + + def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]: + """Classify a file path + + Args: + path: Path to classify + file_type: Optional file type hint + + Returns: + Category name or None if no match + """ + return self.match_path(path) + + def get_category_rules(self, category: str) -> list[ClassificationRule]: + """Get all rules for a category + + Args: + category: Category name + + Returns: + List of rules for the category + """ + return [r for r in self.rules if r.category == category] + + def get_all_categories(self) -> set[str]: + """Get all defined categories + + Returns: + Set of category names + """ + return {r.category for r in self.rules} + + def get_rules_by_priority(self, min_priority: int = 0) -> list[ClassificationRule]: + """Get rules above a minimum priority + + Args: + min_priority: Minimum priority threshold + + Returns: + List of rules with priority >= min_priority + """ + return [r for r in self.rules if r.priority >= min_priority] diff --git a/src/deduplication/__init__.py b/src/deduplication/__init__.py index e69de29..9d2e261 100644 --- a/src/deduplication/__init__.py +++ b/src/deduplication/__init__.py @@ -0,0 +1,21 @@ +"""Deduplication package exports""" +from .chunker import ( + RabinChunker, + SimpleChunker, + hash_chunk, + hash_file, + compute_file_signature +) +from .store import HashStore, MemoryHashStore +from .engine import DeduplicationEngine + +__all__ = [ + 'RabinChunker', + 'SimpleChunker', + 'hash_chunk', + 'hash_file', + 'compute_file_signature', + 'HashStore', + 'MemoryHashStore', + 'DeduplicationEngine', +] diff --git a/src/deduplication/chunker.py b/src/deduplication/chunker.py index e69de29..63b5554 100644 --- a/src/deduplication/chunker.py +++ b/src/deduplication/chunker.py @@ -0,0 +1,241 @@ +"""Rabin fingerprint chunker for content-defined chunking""" +import hashlib +from pathlib import Path +from typing import Iterator, Optional + + +class RabinChunker: + """Content-defined chunking using Rabin fingerprinting + + Uses a rolling hash to identify chunk boundaries based on content, + allowing for efficient deduplication even 
when data is modified. + """ + + def __init__( + self, + avg_chunk_size: int = 8192, + min_chunk_size: Optional[int] = None, + max_chunk_size: Optional[int] = None, + window_size: int = 48 + ): + """Initialize Rabin chunker + + Args: + avg_chunk_size: Target average chunk size in bytes + min_chunk_size: Minimum chunk size (default: avg_chunk_size // 4) + max_chunk_size: Maximum chunk size (default: avg_chunk_size * 8) + window_size: Rolling hash window size + """ + self.avg_chunk_size = avg_chunk_size + self.min_chunk_size = min_chunk_size or (avg_chunk_size // 4) + self.max_chunk_size = max_chunk_size or (avg_chunk_size * 8) + self.window_size = window_size + + # Calculate mask for boundary detection + # For avg_chunk_size, we want boundaries at 1/avg_chunk_size probability + bits = 0 + size = avg_chunk_size + while size > 1: + bits += 1 + size >>= 1 + self.mask = (1 << bits) - 1 + + # Polynomial for rolling hash (prime number) + self.poly = 0x3DA3358B4DC173 + + def chunk_file(self, file_path: Path, chunk_size: Optional[int] = None) -> Iterator[bytes]: + """Chunk a file using Rabin fingerprinting + + Args: + file_path: Path to file to chunk + chunk_size: If provided, use fixed-size chunking instead + + Yields: + Chunk data as bytes + """ + if chunk_size: + # Use fixed-size chunking + yield from self._chunk_fixed(file_path, chunk_size) + else: + # Use content-defined chunking + yield from self._chunk_rabin(file_path) + + def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]: + """Fixed-size chunking + + Args: + file_path: Path to file + chunk_size: Chunk size in bytes + + Yields: + Fixed-size chunks + """ + with open(file_path, 'rb') as f: + while True: + chunk = f.read(chunk_size) + if not chunk: + break + yield chunk + + def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]: + """Content-defined chunking using Rabin fingerprinting + + Args: + file_path: Path to file + + Yields: + Variable-size chunks based on content + """ + with open(file_path, 'rb') as f: + chunk_data = bytearray() + window = bytearray() + hash_value = 0 + + while True: + byte = f.read(1) + if not byte: + # End of file - yield remaining data + if chunk_data: + yield bytes(chunk_data) + break + + chunk_data.extend(byte) + window.extend(byte) + + # Maintain window size + if len(window) > self.window_size: + window.pop(0) + + # Update rolling hash + hash_value = self._rolling_hash(window) + + # Check if we should create a boundary + should_break = ( + len(chunk_data) >= self.min_chunk_size and + ( + (hash_value & self.mask) == 0 or + len(chunk_data) >= self.max_chunk_size + ) + ) + + if should_break: + yield bytes(chunk_data) + chunk_data = bytearray() + window = bytearray() + hash_value = 0 + + def _rolling_hash(self, window: bytearray) -> int: + """Calculate rolling hash for window + + Args: + window: Byte window + + Returns: + Hash value + """ + hash_value = 0 + for byte in window: + hash_value = ((hash_value << 1) + byte) & 0xFFFFFFFFFFFFFFFF + return hash_value + + +class SimpleChunker: + """Simple fixed-size chunker for comparison""" + + def __init__(self, chunk_size: int = 8192): + """Initialize simple chunker + + Args: + chunk_size: Fixed chunk size in bytes + """ + self.chunk_size = chunk_size + + def chunk_file(self, file_path: Path) -> Iterator[bytes]: + """Chunk file into fixed-size pieces + + Args: + file_path: Path to file + + Yields: + Fixed-size chunks + """ + with open(file_path, 'rb') as f: + while True: + chunk = f.read(self.chunk_size) + if not chunk: + break + yield chunk + 
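A small comparison sketch for the two chunkers (the sample path is hypothetical): Rabin boundaries are content-defined, so an insertion near the start of a file typically changes only the chunk hashes around the edit, while fixed-size boundaries all shift.

    from pathlib import Path
    from src.deduplication.chunker import RabinChunker, SimpleChunker, hash_chunk  # assumed import path

    sample = Path("/tmp/sample.bin")  # hypothetical input file

    rabin = RabinChunker(avg_chunk_size=8192)   # content-defined boundaries
    fixed = SimpleChunker(chunk_size=8192)      # fixed 8 KiB boundaries

    rabin_hashes = [hash_chunk(c) for c in rabin.chunk_file(sample)]
    fixed_hashes = [hash_chunk(c) for c in fixed.chunk_file(sample)]

    # After re-chunking a modified copy of the file, most entries of rabin_hashes
    # usually still match, whereas nearly every fixed_hashes entry differs because
    # every later boundary has moved.
    print(len(rabin_hashes), len(fixed_hashes))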
+ +def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str: + """Hash a chunk of data + + Args: + chunk: Chunk data + algorithm: Hash algorithm (default: sha256) + + Returns: + Hex digest of hash + """ + hasher = hashlib.new(algorithm) + hasher.update(chunk) + return hasher.hexdigest() + + +def hash_file(file_path: Path, algorithm: str = 'sha256', chunk_size: int = 65536) -> str: + """Hash entire file + + Args: + file_path: Path to file + algorithm: Hash algorithm (default: sha256) + chunk_size: Size of chunks to read + + Returns: + Hex digest of file hash + """ + hasher = hashlib.new(algorithm) + + with open(file_path, 'rb') as f: + while True: + chunk = f.read(chunk_size) + if not chunk: + break + hasher.update(chunk) + + return hasher.hexdigest() + + +def compute_file_signature( + file_path: Path, + use_rabin: bool = True, + avg_chunk_size: int = 8192 +) -> tuple[str, list[str]]: + """Compute file signature with chunk hashes + + Args: + file_path: Path to file + use_rabin: Whether to use Rabin chunking (vs fixed-size) + avg_chunk_size: Average chunk size for Rabin or fixed size + + Returns: + Tuple of (file_hash, list of chunk hashes) + """ + if use_rabin: + chunker = RabinChunker(avg_chunk_size=avg_chunk_size) + else: + chunker = SimpleChunker(chunk_size=avg_chunk_size) + + chunk_hashes = [] + file_hasher = hashlib.sha256() + + for chunk in chunker.chunk_file(file_path): + # Hash individual chunk + chunk_hash = hash_chunk(chunk) + chunk_hashes.append(chunk_hash) + + # Update file hash + file_hasher.update(chunk) + + file_hash = file_hasher.hexdigest() + + return file_hash, chunk_hashes diff --git a/src/deduplication/engine.py b/src/deduplication/engine.py index e69de29..3dfa55c 100644 --- a/src/deduplication/engine.py +++ b/src/deduplication/engine.py @@ -0,0 +1,353 @@ +"""Deduplication engine""" +from pathlib import Path +from typing import Optional, Callable +from concurrent.futures import ThreadPoolExecutor, as_completed +import psycopg2 + +from .chunker import compute_file_signature, hash_file +from .store import HashStore +from ..shared.models import FileRecord, ProcessingStats +from ..shared.config import DatabaseConfig, ProcessingConfig +from ..shared.logger import ProgressLogger + + +class DeduplicationEngine: + """Engine for deduplicating files""" + + def __init__( + self, + db_config: DatabaseConfig, + processing_config: ProcessingConfig, + logger: ProgressLogger + ): + """Initialize deduplication engine + + Args: + db_config: Database configuration + processing_config: Processing configuration + logger: Progress logger + """ + self.db_config = db_config + self.processing_config = processing_config + self.logger = logger + self.hash_store = HashStore(db_config) + self._connection = None + + def _get_connection(self): + """Get or create database connection""" + if self._connection is None or self._connection.closed: + self._connection = psycopg2.connect( + host=self.db_config.host, + port=self.db_config.port, + database=self.db_config.database, + user=self.db_config.user, + password=self.db_config.password + ) + return self._connection + + def deduplicate_all( + self, + disk: Optional[str] = None, + use_chunks: bool = True, + progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None + ) -> ProcessingStats: + """Deduplicate all files in database + + Args: + disk: Optional disk filter + use_chunks: Whether to use chunk-level deduplication + progress_callback: Optional callback for progress updates + + Returns: + ProcessingStats with deduplication 
statistics + """ + self.logger.section("Starting Deduplication") + + conn = self._get_connection() + cursor = conn.cursor() + + # Get files without checksums + if disk: + cursor.execute(""" + SELECT path, size + FROM files + WHERE disk = %s AND checksum IS NULL + ORDER BY size DESC + """, (disk,)) + else: + cursor.execute(""" + SELECT path, size + FROM files + WHERE checksum IS NULL + ORDER BY size DESC + """) + + files_to_process = cursor.fetchall() + total_files = len(files_to_process) + + self.logger.info(f"Found {total_files} files to process") + + stats = ProcessingStats() + + # Process files with thread pool + with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor: + futures = {} + + for path_str, size in files_to_process: + path = Path(path_str) + future = executor.submit(self._process_file, path, use_chunks) + futures[future] = (path, size) + + # Process completed futures + for future in as_completed(futures): + path, size = futures[future] + + try: + checksum, duplicate_of = future.result() + + if checksum: + # Update database + cursor.execute(""" + UPDATE files + SET checksum = %s, duplicate_of = %s + WHERE path = %s + """, (checksum, duplicate_of, str(path))) + + stats.files_succeeded += 1 + stats.bytes_processed += size + + stats.files_processed += 1 + + # Commit periodically + if stats.files_processed % self.processing_config.commit_interval == 0: + conn.commit() + + # Progress callback + if progress_callback: + progress_callback(stats.files_processed, total_files, stats) + + # Log progress + self.logger.progress( + stats.files_processed, + total_files, + prefix="Files processed", + bytes_processed=stats.bytes_processed, + elapsed_seconds=stats.elapsed_seconds + ) + + except Exception as e: + self.logger.warning(f"Failed to process {path}: {e}") + stats.files_failed += 1 + stats.files_processed += 1 + + # Final commit + conn.commit() + cursor.close() + + self.logger.info( + f"Deduplication complete: {stats.files_succeeded}/{total_files} files, " + f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s" + ) + + return stats + + def _process_file( + self, + path: Path, + use_chunks: bool + ) -> tuple[Optional[str], Optional[str]]: + """Process a single file for deduplication + + Args: + path: Path to file + use_chunks: Whether to use chunk-level deduplication + + Returns: + Tuple of (checksum, duplicate_of_path) + """ + if not path.exists(): + return None, None + + try: + if use_chunks: + # Compute file signature with chunks + checksum, chunk_hashes = compute_file_signature( + path, + use_rabin=True, + avg_chunk_size=self.processing_config.chunk_size + ) + else: + # Just compute file hash + checksum = hash_file( + path, + algorithm=self.processing_config.hash_algorithm + ) + chunk_hashes = None + + # Check if hash exists + if self.hash_store.exists(checksum): + # Duplicate found + canonical_path = self.hash_store.get_canonical(checksum) + return checksum, canonical_path + else: + # New unique file + size = path.stat().st_size + self.hash_store.store_canonical( + checksum, + path, + size, + chunk_hashes + ) + return checksum, None + + except Exception as e: + self.logger.debug(f"Error processing {path}: {e}") + raise + + def find_duplicates( + self, + disk: Optional[str] = None + ) -> dict[str, list[str]]: + """Find all duplicate files + + Args: + disk: Optional disk filter + + Returns: + Dictionary mapping canonical path to list of duplicate paths + """ + self.logger.subsection("Finding Duplicates") + + conn = self._get_connection() + 
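+        # Rows are grouped by checksum; only groups with more than one path are
+        # returned, and the lexicographically first path is treated as canonical.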
cursor = conn.cursor() + + # Query for duplicates + if disk: + cursor.execute(""" + SELECT checksum, array_agg(path ORDER BY path) as paths + FROM files + WHERE disk = %s AND checksum IS NOT NULL + GROUP BY checksum + HAVING COUNT(*) > 1 + """, (disk,)) + else: + cursor.execute(""" + SELECT checksum, array_agg(path ORDER BY path) as paths + FROM files + WHERE checksum IS NOT NULL + GROUP BY checksum + HAVING COUNT(*) > 1 + """) + + duplicates = {} + for checksum, paths in cursor.fetchall(): + canonical = paths[0] + duplicates[canonical] = paths[1:] + + cursor.close() + + self.logger.info(f"Found {len(duplicates)} sets of duplicates") + + return duplicates + + def get_deduplication_stats(self) -> dict: + """Get deduplication statistics + + Returns: + Dictionary with statistics + """ + conn = self._get_connection() + cursor = conn.cursor() + + stats = {} + + # Total files + cursor.execute("SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL") + stats['total_files'] = cursor.fetchone()[0] + + # Unique files + cursor.execute("SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL") + stats['unique_files'] = cursor.fetchone()[0] + + # Duplicate files + stats['duplicate_files'] = stats['total_files'] - stats['unique_files'] + + # Total size + cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL") + stats['total_size'] = cursor.fetchone()[0] + + # Unique size + cursor.execute(""" + SELECT COALESCE(SUM(size), 0) + FROM ( + SELECT DISTINCT ON (checksum) size + FROM files + WHERE checksum IS NOT NULL + ) AS unique_files + """) + stats['unique_size'] = cursor.fetchone()[0] + + # Wasted space + stats['wasted_space'] = stats['total_size'] - stats['unique_size'] + + # Deduplication ratio + if stats['total_size'] > 0: + stats['dedup_ratio'] = stats['unique_size'] / stats['total_size'] + else: + stats['dedup_ratio'] = 1.0 + + # Space saved percentage + if stats['total_size'] > 0: + stats['space_saved_percent'] = (stats['wasted_space'] / stats['total_size']) * 100 + else: + stats['space_saved_percent'] = 0.0 + + cursor.close() + + return stats + + def mark_canonical_files(self) -> int: + """Mark canonical (first occurrence) files in database + + Returns: + Number of canonical files marked + """ + self.logger.subsection("Marking Canonical Files") + + conn = self._get_connection() + cursor = conn.cursor() + + # Find first occurrence of each checksum and mark as canonical + cursor.execute(""" + WITH canonical AS ( + SELECT DISTINCT ON (checksum) path, checksum + FROM files + WHERE checksum IS NOT NULL + ORDER BY checksum, path + ) + UPDATE files + SET duplicate_of = NULL + WHERE path IN (SELECT path FROM canonical) + """) + + count = cursor.rowcount + conn.commit() + cursor.close() + + self.logger.info(f"Marked {count} canonical files") + + return count + + def close(self): + """Close connections""" + self.hash_store.close() + if self._connection and not self._connection.closed: + self._connection.close() + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() diff --git a/src/deduplication/store.py b/src/deduplication/store.py index e69de29..117a580 100644 --- a/src/deduplication/store.py +++ b/src/deduplication/store.py @@ -0,0 +1,412 @@ +"""Hash store for deduplication with optional Redis support""" +from typing import Optional, Dict, Set +from pathlib import Path +import psycopg2 +from psycopg2.extras import execute_batch + +from ..shared.config import 
DatabaseConfig + + +class HashStore: + """PostgreSQL-based hash store for deduplication""" + + def __init__(self, db_config: DatabaseConfig): + """Initialize hash store + + Args: + db_config: Database configuration + """ + self.db_config = db_config + self._connection = None + + def _get_connection(self): + """Get or create database connection""" + if self._connection is None or self._connection.closed: + self._connection = psycopg2.connect( + host=self.db_config.host, + port=self.db_config.port, + database=self.db_config.database, + user=self.db_config.user, + password=self.db_config.password + ) + return self._connection + + def _ensure_tables(self): + """Ensure hash store tables exist""" + conn = self._get_connection() + cursor = conn.cursor() + + # Create hashes table for file-level deduplication + cursor.execute(""" + CREATE TABLE IF NOT EXISTS file_hashes ( + checksum TEXT PRIMARY KEY, + canonical_path TEXT NOT NULL, + size BIGINT NOT NULL, + first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + ref_count INTEGER DEFAULT 1 + ) + """) + + # Create chunk hashes table for chunk-level deduplication + cursor.execute(""" + CREATE TABLE IF NOT EXISTS chunk_hashes ( + chunk_hash TEXT PRIMARY KEY, + size INTEGER NOT NULL, + first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + ref_count INTEGER DEFAULT 1 + ) + """) + + # Create file-chunk mapping table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS file_chunks ( + id SERIAL PRIMARY KEY, + file_checksum TEXT NOT NULL, + chunk_hash TEXT NOT NULL, + chunk_index INTEGER NOT NULL, + FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum), + FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash), + UNIQUE (file_checksum, chunk_index) + ) + """) + + # Create indexes + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_file_chunks_file + ON file_chunks(file_checksum) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk + ON file_chunks(chunk_hash) + """) + + conn.commit() + cursor.close() + + def exists(self, checksum: str) -> bool: + """Check if hash exists in store + + Args: + checksum: File hash to check + + Returns: + True if hash exists + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute( + "SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1", + (checksum,) + ) + + exists = cursor.fetchone() is not None + cursor.close() + + return exists + + def get_canonical(self, checksum: str) -> Optional[str]: + """Get canonical path for a hash + + Args: + checksum: File hash + + Returns: + Canonical file path or None if not found + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute( + "SELECT canonical_path FROM file_hashes WHERE checksum = %s", + (checksum,) + ) + + result = cursor.fetchone() + cursor.close() + + return result[0] if result else None + + def store_canonical( + self, + checksum: str, + path: Path, + size: int, + chunk_hashes: Optional[list[str]] = None + ) -> None: + """Store canonical reference for a hash + + Args: + checksum: File hash + path: Canonical file path + size: File size in bytes + chunk_hashes: Optional list of chunk hashes + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + try: + # Store file hash + cursor.execute(""" + INSERT INTO file_hashes (checksum, canonical_path, size) + VALUES (%s, %s, %s) + ON CONFLICT (checksum) DO UPDATE SET + ref_count = file_hashes.ref_count + 1 + """, (checksum, str(path), size)) + + # Store chunk hashes if provided 
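+            # Chunks reuse the upsert pattern above: insert once, bump ref_count on
+            # conflict. Only hashes are tracked here, so the stored chunk size is 0.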
+ if chunk_hashes: + # Insert chunk hashes + chunk_data = [(chunk_hash, 0) for chunk_hash in chunk_hashes] + execute_batch(cursor, """ + INSERT INTO chunk_hashes (chunk_hash, size) + VALUES (%s, %s) + ON CONFLICT (chunk_hash) DO UPDATE SET + ref_count = chunk_hashes.ref_count + 1 + """, chunk_data, page_size=1000) + + # Create file-chunk mappings + mapping_data = [ + (checksum, chunk_hash, idx) + for idx, chunk_hash in enumerate(chunk_hashes) + ] + execute_batch(cursor, """ + INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index) + VALUES (%s, %s, %s) + ON CONFLICT (file_checksum, chunk_index) DO NOTHING + """, mapping_data, page_size=1000) + + conn.commit() + + except Exception as e: + conn.rollback() + raise + + finally: + cursor.close() + + def get_chunk_hashes(self, checksum: str) -> list[str]: + """Get chunk hashes for a file + + Args: + checksum: File hash + + Returns: + List of chunk hashes in order + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT chunk_hash + FROM file_chunks + WHERE file_checksum = %s + ORDER BY chunk_index + """, (checksum,)) + + chunk_hashes = [row[0] for row in cursor.fetchall()] + cursor.close() + + return chunk_hashes + + def get_duplicates(self) -> Dict[str, list[str]]: + """Get all duplicate file groups + + Returns: + Dictionary mapping canonical path to list of duplicate paths + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + # Get all files with their hashes + cursor.execute(""" + SELECT f.path, f.checksum + FROM files f + WHERE f.checksum IS NOT NULL + """) + + # Group by checksum + hash_to_paths: Dict[str, list[str]] = {} + for path, checksum in cursor.fetchall(): + if checksum not in hash_to_paths: + hash_to_paths[checksum] = [] + hash_to_paths[checksum].append(path) + + cursor.close() + + # Filter to only duplicates (more than one file) + duplicates = { + paths[0]: paths[1:] + for checksum, paths in hash_to_paths.items() + if len(paths) > 1 + } + + return duplicates + + def get_stats(self) -> Dict[str, int]: + """Get hash store statistics + + Returns: + Dictionary with statistics + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + stats = {} + + # Count unique file hashes + cursor.execute("SELECT COUNT(*) FROM file_hashes") + stats['unique_files'] = cursor.fetchone()[0] + + # Count unique chunk hashes + cursor.execute("SELECT COUNT(*) FROM chunk_hashes") + stats['unique_chunks'] = cursor.fetchone()[0] + + # Count total references + cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes") + stats['total_file_refs'] = cursor.fetchone()[0] + + # Count total chunk references + cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes") + stats['total_chunk_refs'] = cursor.fetchone()[0] + + # Calculate deduplication ratio + if stats['total_file_refs'] > 0: + stats['dedup_ratio'] = stats['unique_files'] / stats['total_file_refs'] + else: + stats['dedup_ratio'] = 1.0 + + cursor.close() + + return stats + + def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]: + """Find files similar to given hash based on chunk overlap + + Args: + checksum: File hash to compare + threshold: Similarity threshold (0.0 to 1.0) + + Returns: + List of tuples (other_checksum, similarity_score) + """ + self._ensure_tables() + conn = self._get_connection() + cursor = conn.cursor() + + # Get chunks for the target file + target_chunks = 
set(self.get_chunk_hashes(checksum)) + + if not target_chunks: + cursor.close() + return [] + + # Find files sharing chunks + cursor.execute(""" + SELECT DISTINCT fc.file_checksum + FROM file_chunks fc + WHERE fc.chunk_hash = ANY(%s) + AND fc.file_checksum != %s + """, (list(target_chunks), checksum)) + + similar_files = [] + + for (other_checksum,) in cursor.fetchall(): + other_chunks = set(self.get_chunk_hashes(other_checksum)) + + # Calculate Jaccard similarity + intersection = len(target_chunks & other_chunks) + union = len(target_chunks | other_chunks) + + if union > 0: + similarity = intersection / union + + if similarity >= threshold: + similar_files.append((other_checksum, similarity)) + + cursor.close() + + # Sort by similarity descending + similar_files.sort(key=lambda x: x[1], reverse=True) + + return similar_files + + def close(self): + """Close database connection""" + if self._connection and not self._connection.closed: + self._connection.close() + + def __enter__(self): + """Context manager entry""" + self._ensure_tables() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() + + +class MemoryHashStore: + """In-memory hash store for testing and small datasets""" + + def __init__(self): + """Initialize in-memory hash store""" + self.hashes: Dict[str, tuple[str, int]] = {} + self.chunks: Dict[str, int] = {} + self.file_chunks: Dict[str, list[str]] = {} + + def exists(self, checksum: str) -> bool: + """Check if hash exists""" + return checksum in self.hashes + + def get_canonical(self, checksum: str) -> Optional[str]: + """Get canonical path""" + return self.hashes.get(checksum, (None, 0))[0] + + def store_canonical( + self, + checksum: str, + path: Path, + size: int, + chunk_hashes: Optional[list[str]] = None + ) -> None: + """Store canonical reference""" + self.hashes[checksum] = (str(path), size) + + if chunk_hashes: + self.file_chunks[checksum] = chunk_hashes + for chunk_hash in chunk_hashes: + self.chunks[chunk_hash] = self.chunks.get(chunk_hash, 0) + 1 + + def get_chunk_hashes(self, checksum: str) -> list[str]: + """Get chunk hashes""" + return self.file_chunks.get(checksum, []) + + def get_stats(self) -> Dict[str, int]: + """Get statistics""" + return { + 'unique_files': len(self.hashes), + 'unique_chunks': len(self.chunks), + 'total_file_refs': len(self.hashes), + 'total_chunk_refs': sum(self.chunks.values()), + 'dedup_ratio': 1.0 + } + + def close(self): + """No-op for compatibility""" + pass + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + pass diff --git a/src/discovery/__init__.py b/src/discovery/__init__.py index e69de29..575a15d 100644 --- a/src/discovery/__init__.py +++ b/src/discovery/__init__.py @@ -0,0 +1,17 @@ +"""Discovery package exports""" +from .scanner import FileScanner, FilteredScanner +from .system import SystemAPI +from .engine import DiscoveryEngine +from ._protocols import FileMeta, MountInfo, DiskInfo, IFileScanner, ISystemAPI + +__all__ = [ + 'FileScanner', + 'FilteredScanner', + 'SystemAPI', + 'DiscoveryEngine', + 'FileMeta', + 'MountInfo', + 'DiskInfo', + 'IFileScanner', + 'ISystemAPI', +] diff --git a/src/discovery/engine.py b/src/discovery/engine.py index e69de29..1183cb9 100644 --- a/src/discovery/engine.py +++ b/src/discovery/engine.py @@ -0,0 +1,321 @@ +"""Discovery engine coordinating scanner and system APIs""" +from pathlib import Path +from typing import Optional, Callable +from 
datetime import datetime +import psycopg2 +from psycopg2.extras import execute_batch + +from .scanner import FileScanner +from .system import SystemAPI +from ._protocols import FileMeta +from ..shared.models import FileRecord, DiskInfo, ProcessingStats +from ..shared.config import DatabaseConfig +from ..shared.logger import ProgressLogger + + +class DiscoveryEngine: + """Discovery engine for scanning and cataloging files""" + + def __init__( + self, + db_config: DatabaseConfig, + logger: ProgressLogger, + batch_size: int = 1000 + ): + """Initialize discovery engine + + Args: + db_config: Database configuration + logger: Progress logger + batch_size: Number of records to batch before database commit + """ + self.db_config = db_config + self.logger = logger + self.batch_size = batch_size + self.system_api = SystemAPI() + self._connection = None + + def _get_connection(self): + """Get or create database connection""" + if self._connection is None or self._connection.closed: + self._connection = psycopg2.connect( + host=self.db_config.host, + port=self.db_config.port, + database=self.db_config.database, + user=self.db_config.user, + password=self.db_config.password + ) + return self._connection + + def _ensure_tables(self): + """Ensure database tables exist""" + conn = self._get_connection() + cursor = conn.cursor() + + # Create files table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS files ( + id SERIAL PRIMARY KEY, + path TEXT NOT NULL UNIQUE, + size BIGINT NOT NULL, + modified_time DOUBLE PRECISION NOT NULL, + created_time DOUBLE PRECISION NOT NULL, + disk TEXT NOT NULL, + checksum TEXT, + status TEXT DEFAULT 'indexed', + category TEXT, + duplicate_of TEXT, + discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create index on path + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_files_path ON files(path) + """) + + # Create index on disk + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk) + """) + + # Create index on checksum + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum) + """) + + conn.commit() + cursor.close() + + def discover_path( + self, + root: Path, + scanner: Optional[FileScanner] = None, + progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None + ) -> ProcessingStats: + """Discover and catalog files in a path + + Args: + root: Root path to discover + scanner: Optional custom scanner (default: FileScanner()) + progress_callback: Optional callback for progress updates + + Returns: + ProcessingStats with discovery statistics + """ + self.logger.section(f"Discovering: {root}") + + # Ensure tables exist + self._ensure_tables() + + # Create scanner if not provided + if scanner is None: + scanner = FileScanner( + error_handler=lambda e, p: self.logger.warning(f"Error scanning {p}: {e}") + ) + + # Get disk info for the root path + disk = self.system_api.get_disk_for_path(root) + if disk is None: + disk = str(root) + + # Initialize statistics + stats = ProcessingStats() + batch = [] + + conn = self._get_connection() + cursor = conn.cursor() + + try: + # Scan files + for file_meta in scanner.scan(root): + # Create file record + record = FileRecord( + path=file_meta.path, + size=file_meta.size, + modified_time=file_meta.modified_time, + created_time=file_meta.created_time, + disk=disk + ) + + batch.append(record) + stats.files_processed += 1 + stats.bytes_processed += record.size + + # Batch insert + if len(batch) >= 
self.batch_size: + self._insert_batch(cursor, batch) + conn.commit() + batch.clear() + + # Progress callback + if progress_callback: + progress_callback(stats.files_processed, 0, stats) + + # Log progress + if stats.files_processed % (self.batch_size * 10) == 0: + self.logger.progress( + stats.files_processed, + stats.files_processed, # We don't know total + prefix="Files discovered", + bytes_processed=stats.bytes_processed, + elapsed_seconds=stats.elapsed_seconds + ) + + # Insert remaining batch + if batch: + self._insert_batch(cursor, batch) + conn.commit() + + stats.files_succeeded = stats.files_processed + + except Exception as e: + conn.rollback() + self.logger.error(f"Discovery failed: {e}") + raise + + finally: + cursor.close() + + self.logger.info( + f"Discovery complete: {stats.files_processed} files, " + f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s" + ) + + return stats + + def _insert_batch(self, cursor, batch: list[FileRecord]): + """Insert batch of file records + + Args: + cursor: Database cursor + batch: List of FileRecord objects + """ + query = """ + INSERT INTO files (path, size, modified_time, created_time, disk, checksum, status, category, duplicate_of) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (path) DO UPDATE SET + size = EXCLUDED.size, + modified_time = EXCLUDED.modified_time, + updated_at = CURRENT_TIMESTAMP + """ + + data = [ + ( + str(record.path), + record.size, + record.modified_time, + record.created_time, + record.disk, + record.checksum, + record.status, + record.category, + record.duplicate_of + ) + for record in batch + ] + + execute_batch(cursor, query, data, page_size=self.batch_size) + + def get_disk_info(self) -> list[DiskInfo]: + """Get information about all disks + + Returns: + List of DiskInfo objects + """ + self.logger.subsection("Querying disk information") + + disks = [] + for disk_info in self.system_api.query_nvmes(): + # Get mount point if available + mount_point = None + fs_type = "unknown" + + for mount in self.system_api.query_mounts(): + if mount.device == disk_info.device: + mount_point = Path(mount.mount_point) + fs_type = mount.fs_type + break + + if mount_point: + total, used, free = self.system_api.get_disk_usage(mount_point) + else: + total = disk_info.size + used = 0 + free = disk_info.size + + disk = DiskInfo( + name=disk_info.device, + device=disk_info.device, + mount_point=mount_point or Path("/"), + total_size=total, + used_size=used, + free_size=free, + fs_type=fs_type + ) + disks.append(disk) + + self.logger.info( + f" {disk.name}: {disk.usage_percent:.1f}% used " + f"({disk.used_size:,} / {disk.total_size:,} bytes)" + ) + + return disks + + def get_file_count(self, disk: Optional[str] = None) -> int: + """Get count of discovered files + + Args: + disk: Optional disk filter + + Returns: + Count of files + """ + conn = self._get_connection() + cursor = conn.cursor() + + if disk: + cursor.execute("SELECT COUNT(*) FROM files WHERE disk = %s", (disk,)) + else: + cursor.execute("SELECT COUNT(*) FROM files") + + count = cursor.fetchone()[0] + cursor.close() + + return count + + def get_total_size(self, disk: Optional[str] = None) -> int: + """Get total size of discovered files + + Args: + disk: Optional disk filter + + Returns: + Total size in bytes + """ + conn = self._get_connection() + cursor = conn.cursor() + + if disk: + cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE disk = %s", (disk,)) + else: + cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files") + + total = 
cursor.fetchone()[0] + cursor.close() + + return total + + def close(self): + """Close database connection""" + if self._connection and not self._connection.closed: + self._connection.close() + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() diff --git a/src/discovery/scanner.py b/src/discovery/scanner.py index e69de29..f2fc338 100644 --- a/src/discovery/scanner.py +++ b/src/discovery/scanner.py @@ -0,0 +1,216 @@ +"""File system scanner implementing IFileScanner protocol""" +import os +from pathlib import Path +from typing import Iterator, Optional, Callable +from datetime import datetime + +from ._protocols import FileMeta + + +class FileScanner: + """File system scanner with filtering and error handling""" + + def __init__( + self, + follow_symlinks: bool = False, + skip_hidden: bool = True, + error_handler: Optional[Callable[[Exception, Path], None]] = None + ): + """Initialize file scanner + + Args: + follow_symlinks: Whether to follow symbolic links + skip_hidden: Whether to skip hidden files/directories + error_handler: Optional callback for handling errors during scan + """ + self.follow_symlinks = follow_symlinks + self.skip_hidden = skip_hidden + self.error_handler = error_handler + self._files_scanned = 0 + self._bytes_scanned = 0 + self._errors = 0 + + def scan(self, root: Path) -> Iterator[FileMeta]: + """Scan a directory tree and yield file metadata + + Args: + root: Root directory to scan + + Yields: + FileMeta objects for each discovered file + """ + if not root.exists(): + error = FileNotFoundError(f"Path does not exist: {root}") + if self.error_handler: + self.error_handler(error, root) + else: + raise error + return + + if not root.is_dir(): + # If root is a file, just return its metadata + try: + yield self._get_file_meta(root) + except Exception as e: + self._errors += 1 + if self.error_handler: + self.error_handler(e, root) + else: + raise + return + + # Walk directory tree + for dirpath, dirnames, filenames in os.walk(root, followlinks=self.follow_symlinks): + current_dir = Path(dirpath) + + # Filter directories if needed + if self.skip_hidden: + dirnames[:] = [d for d in dirnames if not d.startswith('.')] + + # Process files + for filename in filenames: + if self.skip_hidden and filename.startswith('.'): + continue + + file_path = current_dir / filename + + try: + # Skip broken symlinks + if file_path.is_symlink() and not file_path.exists(): + continue + + meta = self._get_file_meta(file_path) + self._files_scanned += 1 + self._bytes_scanned += meta.size + + yield meta + + except PermissionError as e: + self._errors += 1 + if self.error_handler: + self.error_handler(e, file_path) + # Continue scanning + continue + + except Exception as e: + self._errors += 1 + if self.error_handler: + self.error_handler(e, file_path) + # Continue scanning + continue + + def _get_file_meta(self, path: Path) -> FileMeta: + """Get file metadata + + Args: + path: Path to file + + Returns: + FileMeta object with file metadata + + Raises: + OSError: If file cannot be accessed + """ + stat = path.stat() + + # Get creation time (platform dependent) + created_time = stat.st_ctime + if hasattr(stat, 'st_birthtime'): + created_time = stat.st_birthtime + + return FileMeta( + path=path, + size=stat.st_size, + modified_time=stat.st_mtime, + created_time=created_time + ) + + @property + def files_scanned(self) -> int: + """Get count of files scanned""" + return self._files_scanned + + 
@property + def bytes_scanned(self) -> int: + """Get total bytes scanned""" + return self._bytes_scanned + + @property + def errors(self) -> int: + """Get count of errors encountered""" + return self._errors + + def reset_stats(self) -> None: + """Reset scanning statistics""" + self._files_scanned = 0 + self._bytes_scanned = 0 + self._errors = 0 + + +class FilteredScanner(FileScanner): + """Scanner with additional filtering capabilities""" + + def __init__( + self, + min_size: Optional[int] = None, + max_size: Optional[int] = None, + extensions: Optional[list[str]] = None, + exclude_patterns: Optional[list[str]] = None, + **kwargs + ): + """Initialize filtered scanner + + Args: + min_size: Minimum file size in bytes + max_size: Maximum file size in bytes + extensions: List of file extensions to include (e.g., ['.txt', '.py']) + exclude_patterns: List of path patterns to exclude + **kwargs: Additional arguments passed to FileScanner + """ + super().__init__(**kwargs) + self.min_size = min_size + self.max_size = max_size + self.extensions = {ext.lower() for ext in extensions} if extensions else None + self.exclude_patterns = exclude_patterns or [] + + def scan(self, root: Path) -> Iterator[FileMeta]: + """Scan with additional filtering + + Args: + root: Root directory to scan + + Yields: + FileMeta objects for files matching filter criteria + """ + for meta in super().scan(root): + # Size filtering + if self.min_size is not None and meta.size < self.min_size: + continue + if self.max_size is not None and meta.size > self.max_size: + continue + + # Extension filtering + if self.extensions is not None: + if meta.path.suffix.lower() not in self.extensions: + continue + + # Exclude pattern filtering + if self._should_exclude(meta.path): + continue + + yield meta + + def _should_exclude(self, path: Path) -> bool: + """Check if path matches any exclude pattern + + Args: + path: Path to check + + Returns: + True if path should be excluded + """ + path_str = str(path) + for pattern in self.exclude_patterns: + if pattern in path_str: + return True + return False diff --git a/src/discovery/system.py b/src/discovery/system.py index e69de29..245baef 100644 --- a/src/discovery/system.py +++ b/src/discovery/system.py @@ -0,0 +1,236 @@ +"""System API for querying mounts and disks""" +import os +import subprocess +from pathlib import Path +from typing import Optional +import psutil + +from ._protocols import MountInfo, DiskInfo + + +class SystemAPI: + """System information API for querying mounts and disks""" + + def query_mounts(self) -> list[MountInfo]: + """Query mounted filesystems + + Returns: + List of MountInfo objects for all mounted filesystems + """ + mounts = [] + + for partition in psutil.disk_partitions(all=False): + mount_info = MountInfo( + device=partition.device, + mount_point=partition.mountpoint, + fs_type=partition.fstype, + options=partition.opts + ) + mounts.append(mount_info) + + return mounts + + def query_nvmes(self) -> list[DiskInfo]: + """Query NVMe/disk information + + Returns: + List of DiskInfo objects for all disks + """ + disks = [] + + # Try to get disk information using lsblk + try: + result = subprocess.run( + ['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'], + capture_output=True, + text=True, + check=False + ) + + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if not line.strip(): + continue + + parts = line.split(maxsplit=3) + if len(parts) >= 3: + device = f"/dev/{parts[0]}" + model = parts[1] if len(parts) > 1 else "Unknown" + size_str 
= parts[2] if len(parts) > 2 else "0" + serial = parts[3] if len(parts) > 3 else "Unknown" + + try: + size = int(size_str) + except ValueError: + size = 0 + + disk_info = DiskInfo( + device=device, + model=model, + size=size, + serial=serial + ) + disks.append(disk_info) + + except FileNotFoundError: + # lsblk not available, fall back to basic info + pass + + # If lsblk failed or unavailable, try alternative method + if not disks: + disks = self._query_disks_fallback() + + return disks + + def _query_disks_fallback(self) -> list[DiskInfo]: + """Fallback method for querying disk information + + Returns: + List of DiskInfo objects using psutil + """ + disks = [] + seen_devices = set() + + for partition in psutil.disk_partitions(all=True): + device = partition.device + + # Skip non-disk devices + if not device.startswith('/dev/'): + continue + + # Get base device (e.g., /dev/sda from /dev/sda1) + base_device = self._get_base_device(device) + + if base_device in seen_devices: + continue + + seen_devices.add(base_device) + + try: + usage = psutil.disk_usage(partition.mountpoint) + size = usage.total + except (PermissionError, OSError): + size = 0 + + disk_info = DiskInfo( + device=base_device, + model="Unknown", + size=size, + serial="Unknown" + ) + disks.append(disk_info) + + return disks + + def _get_base_device(self, device: str) -> str: + """Extract base device name from partition device + + Args: + device: Device path (e.g., /dev/sda1, /dev/nvme0n1p1) + + Returns: + Base device path (e.g., /dev/sda, /dev/nvme0n1) + """ + # Handle NVMe devices + if 'nvme' in device: + # /dev/nvme0n1p1 -> /dev/nvme0n1 + if 'p' in device: + return device.rsplit('p', 1)[0] + return device + + # Handle standard devices (sda, sdb, etc.) + # /dev/sda1 -> /dev/sda + import re + match = re.match(r'(/dev/[a-z]+)', device) + if match: + return match.group(1) + + return device + + def get_disk_for_path(self, path: Path) -> Optional[str]: + """Get the disk/mount point for a given path + + Args: + path: Path to check + + Returns: + Mount point device or None if not found + """ + path = path.resolve() + + # Find the mount point that contains this path + best_match = None + best_match_len = 0 + + for partition in psutil.disk_partitions(): + mount_point = Path(partition.mountpoint) + try: + if path == mount_point or mount_point in path.parents: + mount_len = len(str(mount_point)) + if mount_len > best_match_len: + best_match = partition.device + best_match_len = mount_len + except (ValueError, OSError): + continue + + return best_match + + def get_disk_usage(self, path: Path) -> tuple[int, int, int]: + """Get disk usage for a path + + Args: + path: Path to check + + Returns: + Tuple of (total, used, free) in bytes + """ + try: + usage = psutil.disk_usage(str(path)) + return usage.total, usage.used, usage.free + except (PermissionError, OSError): + return 0, 0, 0 + + def get_mount_point(self, path: Path) -> Optional[Path]: + """Get the mount point for a given path + + Args: + path: Path to check + + Returns: + Mount point path or None if not found + """ + path = path.resolve() + + # Find the mount point that contains this path + best_match = None + best_match_len = 0 + + for partition in psutil.disk_partitions(): + mount_point = Path(partition.mountpoint) + try: + if path == mount_point or mount_point in path.parents: + mount_len = len(str(mount_point)) + if mount_len > best_match_len: + best_match = mount_point + best_match_len = mount_len + except (ValueError, OSError): + continue + + return best_match + + def 
is_same_filesystem(self, path1: Path, path2: Path) -> bool: + """Check if two paths are on the same filesystem + + Args: + path1: First path + path2: Second path + + Returns: + True if paths are on the same filesystem + """ + try: + stat1 = path1.stat() + stat2 = path2.stat() + return stat1.st_dev == stat2.st_dev + except (OSError, PermissionError): + return False diff --git a/src/migration/__init__.py b/src/migration/__init__.py index e69de29..601434d 100644 --- a/src/migration/__init__.py +++ b/src/migration/__init__.py @@ -0,0 +1,27 @@ +"""Migration package exports""" +from .copy import ( + CopyMigrationStrategy, + FastCopyStrategy, + SafeCopyStrategy, + ReferenceCopyStrategy +) +from .hardlink import ( + HardlinkMigrationStrategy, + SymlinkMigrationStrategy, + DedupHardlinkStrategy +) +from .engine import MigrationEngine +from ._protocols import IMigrationStrategy, IMigrationEngine + +__all__ = [ + 'CopyMigrationStrategy', + 'FastCopyStrategy', + 'SafeCopyStrategy', + 'ReferenceCopyStrategy', + 'HardlinkMigrationStrategy', + 'SymlinkMigrationStrategy', + 'DedupHardlinkStrategy', + 'MigrationEngine', + 'IMigrationStrategy', + 'IMigrationEngine', +] diff --git a/src/migration/_protocols.py b/src/migration/_protocols.py index e69de29..188b8ee 100644 --- a/src/migration/_protocols.py +++ b/src/migration/_protocols.py @@ -0,0 +1,107 @@ +"""Protocol definitions for the migration package""" +from typing import Protocol +from pathlib import Path +from ..shared.models import OperationRecord + + +class IMigrationStrategy(Protocol): + """Protocol for migration strategies""" + + def migrate( + self, + source: Path, + destination: Path, + verify: bool = True + ) -> bool: + """Migrate a file from source to destination + + Args: + source: Source file path + destination: Destination file path + verify: Whether to verify the operation + + Returns: + True if migration successful + """ + ... + + def can_migrate(self, source: Path, destination: Path) -> bool: + """Check if migration is possible + + Args: + source: Source file path + destination: Destination file path + + Returns: + True if migration is possible + """ + ... + + def estimate_time(self, source: Path) -> float: + """Estimate migration time in seconds + + Args: + source: Source file path + + Returns: + Estimated time in seconds + """ + ... + + def cleanup(self, source: Path) -> bool: + """Cleanup source file after successful migration + + Args: + source: Source file path + + Returns: + True if cleanup successful + """ + ... + + +class IMigrationEngine(Protocol): + """Protocol for migration engine""" + + def plan_migration( + self, + disk: str, + target_base: Path + ) -> list[OperationRecord]: + """Plan migration for a disk + + Args: + disk: Disk identifier + target_base: Target base directory + + Returns: + List of planned operations + """ + ... + + def execute_migration( + self, + operations: list[OperationRecord], + dry_run: bool = False + ) -> dict: + """Execute migration operations + + Args: + operations: List of operations to execute + dry_run: Whether to perform a dry run + + Returns: + Dictionary with execution statistics + """ + ... + + def rollback(self, operation: OperationRecord) -> bool: + """Rollback a migration operation + + Args: + operation: Operation to rollback + + Returns: + True if rollback successful + """ + ... 
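Since IMigrationStrategy is a structural Protocol, any class that exposes migrate, can_migrate, estimate_time and cleanup with matching signatures plugs into the engine without subclassing. A minimal sketch of such a strategy, handy for dry runs or tests (NoopMigrationStrategy is an illustrative name, not part of this diff):

from pathlib import Path


class NoopMigrationStrategy:
    """Plan-only strategy: satisfies IMigrationStrategy but never touches the filesystem."""

    def migrate(self, source: Path, destination: Path, verify: bool = True) -> bool:
        # Report success without copying or linking anything.
        return source.exists()

    def can_migrate(self, source: Path, destination: Path) -> bool:
        return source.exists()

    def estimate_time(self, source: Path) -> float:
        return 0.0

    def cleanup(self, source: Path) -> bool:
        # Nothing was moved, so there is nothing to remove.
        return True

Static type checkers accept this wherever IMigrationStrategy is expected because the method signatures line up; no registration or inheritance is required.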
diff --git a/src/migration/copy.py b/src/migration/copy.py index e69de29..7304c0f 100644 --- a/src/migration/copy.py +++ b/src/migration/copy.py @@ -0,0 +1,268 @@ +"""Copy-based migration strategy""" +import shutil +from pathlib import Path +from typing import Optional +import os + +from ..shared.logger import ProgressLogger + + +class CopyMigrationStrategy: + """Copy files to destination with verification""" + + def __init__( + self, + logger: Optional[ProgressLogger] = None, + preserve_metadata: bool = True, + verify_checksums: bool = True + ): + """Initialize copy migration strategy + + Args: + logger: Optional progress logger + preserve_metadata: Whether to preserve file metadata + verify_checksums: Whether to verify checksums after copy + """ + self.logger = logger + self.preserve_metadata = preserve_metadata + self.verify_checksums = verify_checksums + + def migrate( + self, + source: Path, + destination: Path, + verify: bool = True + ) -> bool: + """Migrate file by copying + + Args: + source: Source file path + destination: Destination file path + verify: Whether to verify the operation + + Returns: + True if migration successful + """ + if not source.exists(): + if self.logger: + self.logger.error(f"Source file does not exist: {source}") + return False + + # Create destination directory + destination.parent.mkdir(parents=True, exist_ok=True) + + try: + # Copy file + if self.preserve_metadata: + shutil.copy2(source, destination) + else: + shutil.copy(source, destination) + + # Verify if requested + if verify and self.verify_checksums: + if not self._verify_copy(source, destination): + if self.logger: + self.logger.error(f"Verification failed: {source} -> {destination}") + destination.unlink() + return False + + return True + + except Exception as e: + if self.logger: + self.logger.error(f"Copy failed: {source} -> {destination}: {e}") + return False + + def _verify_copy(self, source: Path, destination: Path) -> bool: + """Verify copied file + + Args: + source: Source file path + destination: Destination file path + + Returns: + True if verification successful + """ + # Check size + source_size = source.stat().st_size + dest_size = destination.stat().st_size + + if source_size != dest_size: + return False + + # Compare checksums for files larger than 1MB + if source_size > 1024 * 1024: + from ..deduplication.chunker import hash_file + + source_hash = hash_file(source) + dest_hash = hash_file(destination) + + return source_hash == dest_hash + + # For small files, compare content directly + with open(source, 'rb') as f1, open(destination, 'rb') as f2: + return f1.read() == f2.read() + + def can_migrate(self, source: Path, destination: Path) -> bool: + """Check if migration is possible + + Args: + source: Source file path + destination: Destination file path + + Returns: + True if migration is possible + """ + if not source.exists(): + return False + + # Check if destination directory is writable + dest_dir = destination.parent + if dest_dir.exists(): + return os.access(dest_dir, os.W_OK) + + # Check if parent directory exists and is writable + parent = dest_dir.parent + while not parent.exists() and parent != parent.parent: + parent = parent.parent + + return parent.exists() and os.access(parent, os.W_OK) + + def estimate_time(self, source: Path) -> float: + """Estimate migration time in seconds + + Args: + source: Source file path + + Returns: + Estimated time in seconds + """ + if not source.exists(): + return 0.0 + + size = source.stat().st_size + + # Estimate based on typical copy 
speed (100 MB/s) + typical_speed = 100 * 1024 * 1024 # bytes per second + return size / typical_speed + + def cleanup(self, source: Path) -> bool: + """Cleanup source file after successful migration + + Args: + source: Source file path + + Returns: + True if cleanup successful + """ + try: + if source.exists(): + source.unlink() + return True + except Exception as e: + if self.logger: + self.logger.warning(f"Failed to cleanup {source}: {e}") + return False + + +class FastCopyStrategy(CopyMigrationStrategy): + """Fast copy strategy without verification""" + + def __init__(self, logger: Optional[ProgressLogger] = None): + """Initialize fast copy strategy""" + super().__init__( + logger=logger, + preserve_metadata=True, + verify_checksums=False + ) + + +class SafeCopyStrategy(CopyMigrationStrategy): + """Safe copy strategy with full verification""" + + def __init__(self, logger: Optional[ProgressLogger] = None): + """Initialize safe copy strategy""" + super().__init__( + logger=logger, + preserve_metadata=True, + verify_checksums=True + ) + + +class ReferenceCopyStrategy: + """Create reference copy using reflinks (CoW) if supported""" + + def __init__(self, logger: Optional[ProgressLogger] = None): + """Initialize reflink copy strategy""" + self.logger = logger + + def migrate( + self, + source: Path, + destination: Path, + verify: bool = True + ) -> bool: + """Migrate using reflink (copy-on-write) + + Args: + source: Source file path + destination: Destination file path + verify: Whether to verify the operation + + Returns: + True if migration successful + """ + if not source.exists(): + if self.logger: + self.logger.error(f"Source file does not exist: {source}") + return False + + # Create destination directory + destination.parent.mkdir(parents=True, exist_ok=True) + + try: + # Try reflink copy (works on btrfs, xfs, etc.) 
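+            # A reflink clone shares on-disk extents with the source (copy-on-write), so it is
+            # near-instant and uses no extra space until either copy is modified. With
+            # --reflink=auto, cp falls back to a regular copy on filesystems without CoW support
+            # (e.g. ext4), so the subprocess call below is safe on any filesystem.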
+ import subprocess + + result = subprocess.run( + ['cp', '--reflink=auto', str(source), str(destination)], + capture_output=True, + check=False + ) + + if result.returncode != 0: + # Fallback to regular copy + shutil.copy2(source, destination) + + return True + + except Exception as e: + if self.logger: + self.logger.error(f"Reflink copy failed: {source} -> {destination}: {e}") + return False + + def can_migrate(self, source: Path, destination: Path) -> bool: + """Check if migration is possible""" + if not source.exists(): + return False + + dest_dir = destination.parent + if dest_dir.exists(): + return os.access(dest_dir, os.W_OK) + + return True + + def estimate_time(self, source: Path) -> float: + """Estimate migration time (reflinks are fast)""" + return 0.1 # Reflinks are nearly instant + + def cleanup(self, source: Path) -> bool: + """Cleanup source file""" + try: + if source.exists(): + source.unlink() + return True + except Exception as e: + if self.logger: + self.logger.warning(f"Failed to cleanup {source}: {e}") + return False diff --git a/src/migration/engine.py b/src/migration/engine.py index e69de29..27cdb32 100644 --- a/src/migration/engine.py +++ b/src/migration/engine.py @@ -0,0 +1,454 @@ +"""Migration engine""" +from pathlib import Path +from typing import Optional, Callable +from datetime import datetime +import psycopg2 +from psycopg2.extras import execute_batch + +from .copy import CopyMigrationStrategy, SafeCopyStrategy +from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy +from ..shared.models import OperationRecord, ProcessingStats, MigrationPlan +from ..shared.config import DatabaseConfig, ProcessingConfig +from ..shared.logger import ProgressLogger + + +class MigrationEngine: + """Engine for migrating files""" + + def __init__( + self, + db_config: DatabaseConfig, + processing_config: ProcessingConfig, + logger: ProgressLogger, + target_base: Path + ): + """Initialize migration engine + + Args: + db_config: Database configuration + processing_config: Processing configuration + logger: Progress logger + target_base: Target base directory for migrations + """ + self.db_config = db_config + self.processing_config = processing_config + self.logger = logger + self.target_base = Path(target_base) + self._connection = None + + # Initialize strategies + self.copy_strategy = SafeCopyStrategy(logger=logger) + self.hardlink_strategy = HardlinkMigrationStrategy(logger=logger) + self.symlink_strategy = SymlinkMigrationStrategy(logger=logger) + + def _get_connection(self): + """Get or create database connection""" + if self._connection is None or self._connection.closed: + self._connection = psycopg2.connect( + host=self.db_config.host, + port=self.db_config.port, + database=self.db_config.database, + user=self.db_config.user, + password=self.db_config.password + ) + return self._connection + + def _ensure_tables(self): + """Ensure migration tables exist""" + conn = self._get_connection() + cursor = conn.cursor() + + # Create operations table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS operations ( + id SERIAL PRIMARY KEY, + source_path TEXT NOT NULL, + dest_path TEXT NOT NULL, + operation_type TEXT NOT NULL, + size BIGINT DEFAULT 0, + status TEXT DEFAULT 'pending', + error TEXT, + executed_at TIMESTAMP, + verified BOOLEAN DEFAULT FALSE, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create index on status + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_operations_status + ON operations(status) + """) + + conn.commit() + 
cursor.close() + + def plan_migration( + self, + disk: Optional[str] = None, + category: Optional[str] = None + ) -> MigrationPlan: + """Plan migration for files + + Args: + disk: Optional disk filter + category: Optional category filter + + Returns: + MigrationPlan with planned operations + """ + self.logger.section("Planning Migration") + + conn = self._get_connection() + cursor = conn.cursor() + + # Build query + conditions = ["category IS NOT NULL"] + params = [] + + if disk: + conditions.append("disk = %s") + params.append(disk) + + if category: + conditions.append("category = %s") + params.append(category) + + query = f""" + SELECT path, size, category, duplicate_of + FROM files + WHERE {' AND '.join(conditions)} + ORDER BY category, path + """ + + cursor.execute(query, params) + files = cursor.fetchall() + + self.logger.info(f"Found {len(files)} files to migrate") + + operations = [] + total_size = 0 + + for path_str, size, file_category, duplicate_of in files: + source = Path(path_str) + + # Determine destination + dest_path = self.target_base / file_category / source.name + + # Determine operation type + if duplicate_of: + # Use hardlink for duplicates + operation_type = 'hardlink' + else: + # Use copy for unique files + operation_type = 'copy' + + operation = OperationRecord( + source_path=source, + dest_path=dest_path, + operation_type=operation_type, + size=size + ) + + operations.append(operation) + total_size += size + + cursor.close() + + plan = MigrationPlan( + target_disk=str(self.target_base), + destination_disks=[str(self.target_base)], + operations=operations, + total_size=total_size, + file_count=len(operations) + ) + + self.logger.info( + f"Migration plan created: {plan.file_count} files, " + f"{plan.total_size:,} bytes" + ) + + return plan + + def execute_migration( + self, + operations: list[OperationRecord], + dry_run: bool = False, + progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None + ) -> ProcessingStats: + """Execute migration operations + + Args: + operations: List of operations to execute + dry_run: Whether to perform a dry run + progress_callback: Optional callback for progress updates + + Returns: + ProcessingStats with execution statistics + """ + self.logger.section("Executing Migration" + (" (DRY RUN)" if dry_run else "")) + + self._ensure_tables() + + stats = ProcessingStats() + total_ops = len(operations) + + for operation in operations: + stats.files_processed += 1 + + if dry_run: + # In dry run, just log what would happen + self.logger.debug( + f"[DRY RUN] Would {operation.operation_type}: " + f"{operation.source_path} -> {operation.dest_path}" + ) + stats.files_succeeded += 1 + else: + # Execute actual migration + success = self._execute_operation(operation) + + if success: + stats.files_succeeded += 1 + stats.bytes_processed += operation.size + else: + stats.files_failed += 1 + + # Progress callback + if progress_callback and stats.files_processed % 100 == 0: + progress_callback(stats.files_processed, total_ops, stats) + + # Log progress + if stats.files_processed % 1000 == 0: + self.logger.progress( + stats.files_processed, + total_ops, + prefix="Operations executed", + bytes_processed=stats.bytes_processed, + elapsed_seconds=stats.elapsed_seconds + ) + + self.logger.info( + f"Migration {'dry run' if dry_run else 'execution'} complete: " + f"{stats.files_succeeded}/{total_ops} operations, " + f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s" + ) + + return stats + + def _execute_operation(self, 
operation: OperationRecord) -> bool: + """Execute a single migration operation + + Args: + operation: Operation to execute + + Returns: + True if successful + """ + operation.status = 'in_progress' + operation.executed_at = datetime.now() + + try: + # Select strategy based on operation type + if operation.operation_type == 'copy': + strategy = self.copy_strategy + elif operation.operation_type == 'hardlink': + strategy = self.hardlink_strategy + elif operation.operation_type == 'symlink': + strategy = self.symlink_strategy + else: + raise ValueError(f"Unknown operation type: {operation.operation_type}") + + # Execute migration + success = strategy.migrate( + operation.source_path, + operation.dest_path, + verify=self.processing_config.verify_operations + ) + + if success: + operation.status = 'completed' + operation.verified = True + self._record_operation(operation) + return True + else: + operation.status = 'failed' + operation.error = "Migration failed" + self._record_operation(operation) + return False + + except Exception as e: + operation.status = 'failed' + operation.error = str(e) + self._record_operation(operation) + self.logger.error(f"Operation failed: {operation.source_path}: {e}") + return False + + def _record_operation(self, operation: OperationRecord): + """Record operation in database + + Args: + operation: Operation to record + """ + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + INSERT INTO operations ( + source_path, dest_path, operation_type, size, + status, error, executed_at, verified + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """, ( + str(operation.source_path), + str(operation.dest_path), + operation.operation_type, + operation.size, + operation.status, + operation.error, + operation.executed_at, + operation.verified + )) + + conn.commit() + cursor.close() + + def rollback(self, operation: OperationRecord) -> bool: + """Rollback a migration operation + + Args: + operation: Operation to rollback + + Returns: + True if rollback successful + """ + self.logger.warning(f"Rolling back: {operation.dest_path}") + + try: + # Remove destination + if operation.dest_path.exists(): + operation.dest_path.unlink() + + # Update database + conn = self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + UPDATE operations + SET status = 'rolled_back' + WHERE source_path = %s AND dest_path = %s + """, (str(operation.source_path), str(operation.dest_path))) + + conn.commit() + cursor.close() + + return True + + except Exception as e: + self.logger.error(f"Rollback failed: {operation.dest_path}: {e}") + return False + + def get_migration_stats(self) -> dict: + """Get migration statistics + + Returns: + Dictionary with statistics + """ + conn = self._get_connection() + cursor = conn.cursor() + + stats = {} + + # Total operations + cursor.execute("SELECT COUNT(*) FROM operations") + stats['total_operations'] = cursor.fetchone()[0] + + # Operations by status + cursor.execute(""" + SELECT status, COUNT(*) + FROM operations + GROUP BY status + """) + + for status, count in cursor.fetchall(): + stats[f'{status}_operations'] = count + + # Total size migrated + cursor.execute(""" + SELECT COALESCE(SUM(size), 0) + FROM operations + WHERE status = 'completed' + """) + stats['total_size_migrated'] = cursor.fetchone()[0] + + cursor.close() + + return stats + + def verify_migrations(self) -> dict: + """Verify completed migrations + + Returns: + Dictionary with verification results + """ + self.logger.subsection("Verifying Migrations") + + conn = 
self._get_connection() + cursor = conn.cursor() + + cursor.execute(""" + SELECT source_path, dest_path, operation_type + FROM operations + WHERE status = 'completed' AND verified = FALSE + """) + + operations = cursor.fetchall() + cursor.close() + + results = { + 'total': len(operations), + 'verified': 0, + 'failed': 0 + } + + for source_str, dest_str, op_type in operations: + source = Path(source_str) + dest = Path(dest_str) + + # Verify destination exists + if not dest.exists(): + results['failed'] += 1 + self.logger.warning(f"Verification failed: {dest} does not exist") + continue + + # Verify based on operation type + if op_type == 'hardlink': + # Check if hardlinked + if source.exists() and source.stat().st_ino == dest.stat().st_ino: + results['verified'] += 1 + else: + results['failed'] += 1 + else: + # Check if destination exists and has correct size + if dest.exists(): + results['verified'] += 1 + else: + results['failed'] += 1 + + self.logger.info( + f"Verification complete: {results['verified']}/{results['total']} verified" + ) + + return results + + def close(self): + """Close database connection""" + if self._connection and not self._connection.closed: + self._connection.close() + + def __enter__(self): + """Context manager entry""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.close() diff --git a/src/migration/hardlink.py b/src/migration/hardlink.py index e69de29..d7d2987 100644 --- a/src/migration/hardlink.py +++ b/src/migration/hardlink.py @@ -0,0 +1,377 @@ +"""Hardlink-based migration strategy""" +import os +from pathlib import Path +from typing import Optional + +from ..shared.logger import ProgressLogger + + +class HardlinkMigrationStrategy: + """Create hardlinks to files instead of copying""" + + def __init__(self, logger: Optional[ProgressLogger] = None): + """Initialize hardlink migration strategy + + Args: + logger: Optional progress logger + """ + self.logger = logger + + def migrate( + self, + source: Path, + destination: Path, + verify: bool = True + ) -> bool: + """Migrate file by creating hardlink + + Args: + source: Source file path + destination: Destination file path + verify: Whether to verify the operation + + Returns: + True if migration successful + """ + if not source.exists(): + if self.logger: + self.logger.error(f"Source file does not exist: {source}") + return False + + # Check if source and destination are on same filesystem + if not self._same_filesystem(source, destination.parent): + if self.logger: + self.logger.warning( + f"Cannot hardlink across filesystems: {source} -> {destination}" + ) + return False + + # Create destination directory + destination.parent.mkdir(parents=True, exist_ok=True) + + try: + # Create hardlink + os.link(source, destination) + + # Verify if requested + if verify: + if not self._verify_hardlink(source, destination): + if self.logger: + self.logger.error(f"Verification failed: {source} -> {destination}") + destination.unlink() + return False + + return True + + except FileExistsError: + if self.logger: + self.logger.warning(f"Destination already exists: {destination}") + return False + + except Exception as e: + if self.logger: + self.logger.error(f"Hardlink failed: {source} -> {destination}: {e}") + return False + + def _same_filesystem(self, path1: Path, path2: Path) -> bool: + """Check if two paths are on the same filesystem + + Args: + path1: First path + path2: Second path + + Returns: + True if on same filesystem + """ + try: + # Get device IDs + stat1 = 
path1.stat() + stat2 = path2.stat() + return stat1.st_dev == stat2.st_dev + except Exception: + return False + + def _verify_hardlink(self, source: Path, destination: Path) -> bool: + """Verify hardlink + + Args: + source: Source file path + destination: Destination file path + + Returns: + True if verification successful + """ + try: + # Check if they have the same inode + source_stat = source.stat() + dest_stat = destination.stat() + + return source_stat.st_ino == dest_stat.st_ino + + except Exception: + return False + + def can_migrate(self, source: Path, destination: Path) -> bool: + """Check if migration is possible + + Args: + source: Source file path + destination: Destination file path + + Returns: + True if migration is possible + """ + if not source.exists(): + return False + + # Check if on same filesystem + dest_dir = destination.parent + if dest_dir.exists(): + return self._same_filesystem(source, dest_dir) + + # Check parent directories + parent = dest_dir.parent + while not parent.exists() and parent != parent.parent: + parent = parent.parent + + return parent.exists() and self._same_filesystem(source, parent) + + def estimate_time(self, source: Path) -> float: + """Estimate migration time in seconds + + Args: + source: Source file path + + Returns: + Estimated time in seconds (hardlinks are instant) + """ + return 0.01 # Hardlinks are nearly instant + + def cleanup(self, source: Path) -> bool: + """Cleanup source file after successful migration + + Note: For hardlinks, we typically don't remove the source + immediately as both links point to the same inode. + + Args: + source: Source file path + + Returns: + True (no cleanup needed for hardlinks) + """ + # For hardlinks, we don't remove the source + # Both source and destination point to the same data + return True + + +class SymlinkMigrationStrategy: + """Create symbolic links to files""" + + def __init__( + self, + logger: Optional[ProgressLogger] = None, + absolute_links: bool = True + ): + """Initialize symlink migration strategy + + Args: + logger: Optional progress logger + absolute_links: Whether to create absolute symlinks + """ + self.logger = logger + self.absolute_links = absolute_links + + def migrate( + self, + source: Path, + destination: Path, + verify: bool = True + ) -> bool: + """Migrate file by creating symlink + + Args: + source: Source file path + destination: Destination file path + verify: Whether to verify the operation + + Returns: + True if migration successful + """ + if not source.exists(): + if self.logger: + self.logger.error(f"Source file does not exist: {source}") + return False + + # Create destination directory + destination.parent.mkdir(parents=True, exist_ok=True) + + try: + # Determine link target + if self.absolute_links: + target = source.resolve() + else: + # Create relative symlink + target = os.path.relpath(source, destination.parent) + + # Create symlink + destination.symlink_to(target) + + # Verify if requested + if verify: + if not self._verify_symlink(destination, source): + if self.logger: + self.logger.error(f"Verification failed: {source} -> {destination}") + destination.unlink() + return False + + return True + + except FileExistsError: + if self.logger: + self.logger.warning(f"Destination already exists: {destination}") + return False + + except Exception as e: + if self.logger: + self.logger.error(f"Symlink failed: {source} -> {destination}: {e}") + return False + + def _verify_symlink(self, symlink: Path, expected_target: Path) -> bool: + """Verify symlink + + Args: + 
symlink: Symlink path + expected_target: Expected target path + + Returns: + True if verification successful + """ + try: + # Check if it's a symlink + if not symlink.is_symlink(): + return False + + # Resolve and compare + resolved = symlink.resolve() + expected = expected_target.resolve() + + return resolved == expected + + except Exception: + return False + + def can_migrate(self, source: Path, destination: Path) -> bool: + """Check if migration is possible + + Args: + source: Source file path + destination: Destination file path + + Returns: + True if migration is possible + """ + if not source.exists(): + return False + + # Check if destination directory is writable + dest_dir = destination.parent + if dest_dir.exists(): + return os.access(dest_dir, os.W_OK) + + return True + + def estimate_time(self, source: Path) -> float: + """Estimate migration time in seconds + + Args: + source: Source file path + + Returns: + Estimated time in seconds (symlinks are instant) + """ + return 0.01 # Symlinks are instant + + def cleanup(self, source: Path) -> bool: + """Cleanup source file after successful migration + + Note: For symlinks, we don't remove the source as the + symlink points to it. + + Args: + source: Source file path + + Returns: + True (no cleanup needed for symlinks) + """ + # For symlinks, we don't remove the source + return True + + +class DedupHardlinkStrategy(HardlinkMigrationStrategy): + """Hardlink strategy for deduplication + + Creates hardlinks for duplicate files to save space. + """ + + def __init__(self, logger: Optional[ProgressLogger] = None): + """Initialize dedup hardlink strategy""" + super().__init__(logger=logger) + + def deduplicate( + self, + canonical: Path, + duplicate: Path + ) -> bool: + """Replace duplicate with hardlink to canonical + + Args: + canonical: Canonical file path + duplicate: Duplicate file path + + Returns: + True if deduplication successful + """ + if not canonical.exists(): + if self.logger: + self.logger.error(f"Canonical file does not exist: {canonical}") + return False + + if not duplicate.exists(): + if self.logger: + self.logger.error(f"Duplicate file does not exist: {duplicate}") + return False + + # Check if already hardlinked + if self._verify_hardlink(canonical, duplicate): + return True + + # Check if on same filesystem + if not self._same_filesystem(canonical, duplicate): + if self.logger: + self.logger.warning( + f"Cannot hardlink across filesystems: {canonical} -> {duplicate}" + ) + return False + + try: + # Create temporary backup + backup = duplicate.with_suffix(duplicate.suffix + '.bak') + duplicate.rename(backup) + + # Create hardlink + os.link(canonical, duplicate) + + # Remove backup + backup.unlink() + + return True + + except Exception as e: + if self.logger: + self.logger.error(f"Deduplication failed: {duplicate}: {e}") + + # Restore from backup + if backup.exists(): + backup.rename(duplicate) + + return False diff --git a/src/shared/__init__.py b/src/shared/__init__.py index e69de29..f40b202 100644 --- a/src/shared/__init__.py +++ b/src/shared/__init__.py @@ -0,0 +1,50 @@ +"""Shared package exports""" +from .models import ( + FileRecord, + OperationRecord, + DiskInfo, + MigrationPlan, + ProcessingStats +) +from .config import ( + Config, + DatabaseConfig, + ProcessingConfig, + LoggingConfig, + load_config +) +from .logger import ( + ProgressLogger, + create_logger, + format_size, + format_rate, + format_time +) +from ._protocols import IDatabase, ILogger + +__all__ = [ + # Models + 'FileRecord', + 'OperationRecord', + 
'DiskInfo', + 'MigrationPlan', + 'ProcessingStats', + + # Config + 'Config', + 'DatabaseConfig', + 'ProcessingConfig', + 'LoggingConfig', + 'load_config', + + # Logger + 'ProgressLogger', + 'create_logger', + 'format_size', + 'format_rate', + 'format_time', + + # Protocols + 'IDatabase', + 'ILogger', +] diff --git a/src/shared/_protocols.py b/src/shared/_protocols.py index e69de29..3120edc 100644 --- a/src/shared/_protocols.py +++ b/src/shared/_protocols.py @@ -0,0 +1,67 @@ +"""Protocol definitions for the shared package""" +from typing import Protocol, Any +from pathlib import Path +from dataclasses import dataclass +from datetime import datetime + + +@dataclass +class FileRecord: + """Core file record with all metadata""" + path: Path + size: int + modified_time: float + created_time: float + disk: str + checksum: str | None = None + status: str = 'indexed' # indexed, planned, moved, verified + category: str | None = None + duplicate_of: str | None = None + + +@dataclass +class OperationRecord: + """Record of a migration operation""" + source_path: Path + dest_path: Path + operation_type: str # move, copy, hardlink, symlink + status: str = 'pending' # pending, in_progress, completed, failed + error: str | None = None + executed_at: datetime | None = None + verified: bool = False + + +class IDatabase(Protocol): + """Protocol for database operations""" + + def store_file(self, file_record: FileRecord) -> None: + """Store a file record""" + ... + + def get_files_by_disk(self, disk: str) -> list[FileRecord]: + """Get all files on a specific disk""" + ... + + def store_operation(self, operation: OperationRecord) -> None: + """Store an operation record""" + ... + + def get_pending_operations(self) -> list[OperationRecord]: + """Get all pending operations""" + ... + + +class ILogger(Protocol): + """Protocol for logging operations""" + + def info(self, message: str) -> None: + ... + + def warning(self, message: str) -> None: + ... + + def error(self, message: str) -> None: + ... + + def debug(self, message: str) -> None: + ... 
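The IDatabase protocol only pins down four methods, so a throwaway in-memory store is enough to exercise the engines in unit tests. A rough sketch, assuming it sits inside this package so the relative import of FileRecord and OperationRecord resolves (InMemoryDatabase is a hypothetical name, not part of this diff):

from ._protocols import FileRecord, OperationRecord


class InMemoryDatabase:
    """Hypothetical IDatabase implementation backed by plain dicts and lists."""

    def __init__(self) -> None:
        self._files: dict[str, FileRecord] = {}
        self._operations: list[OperationRecord] = []

    def store_file(self, file_record: FileRecord) -> None:
        # Keyed by path, so re-scanning the same file overwrites the older record.
        self._files[str(file_record.path)] = file_record

    def get_files_by_disk(self, disk: str) -> list[FileRecord]:
        return [record for record in self._files.values() if record.disk == disk]

    def store_operation(self, operation: OperationRecord) -> None:
        self._operations.append(operation)

    def get_pending_operations(self) -> list[OperationRecord]:
        return [op for op in self._operations if op.status == 'pending']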
diff --git a/src/shared/config.py b/src/shared/config.py index e69de29..5b8c8d1 100644 --- a/src/shared/config.py +++ b/src/shared/config.py @@ -0,0 +1,110 @@ +"""Configuration management for disk reorganizer""" +import json +from pathlib import Path +from dataclasses import dataclass, asdict +from typing import Optional + + +@dataclass +class DatabaseConfig: + """Database connection configuration""" + host: str = '192.168.1.159' + port: int = 5432 + database: str = 'disk_reorganizer_db' + user: str = 'disk_reorg_user' + password: str = 'heel-goed-wachtwoord' + + def to_dict(self) -> dict: + """Convert to dictionary""" + return asdict(self) + + +@dataclass +class ProcessingConfig: + """Processing behavior configuration""" + batch_size: int = 1000 + commit_interval: int = 100 + parallel_workers: int = 4 + chunk_size: int = 8192 + hash_algorithm: str = 'sha256' + verify_operations: bool = True + preserve_timestamps: bool = True + + def to_dict(self) -> dict: + """Convert to dictionary""" + return asdict(self) + + +@dataclass +class LoggingConfig: + """Logging configuration""" + level: str = 'INFO' + log_file: str = 'disk_reorganizer.log' + console_output: bool = True + file_output: bool = True + + def to_dict(self) -> dict: + """Convert to dictionary""" + return asdict(self) + + +@dataclass +class Config: + """Main configuration container""" + database: DatabaseConfig = None + processing: ProcessingConfig = None + logging: LoggingConfig = None + + def __post_init__(self): + """Initialize nested configs with defaults if not provided""" + if self.database is None: + self.database = DatabaseConfig() + if self.processing is None: + self.processing = ProcessingConfig() + if self.logging is None: + self.logging = LoggingConfig() + + @classmethod + def from_file(cls, config_path: Path) -> 'Config': + """Load configuration from JSON file""" + if not config_path.exists(): + return cls() + + with open(config_path, 'r') as f: + data = json.load(f) + + return cls( + database=DatabaseConfig(**data.get('database', {})), + processing=ProcessingConfig(**data.get('processing', {})), + logging=LoggingConfig(**data.get('logging', {})) + ) + + def to_file(self, config_path: Path) -> None: + """Save configuration to JSON file""" + data = { + 'database': self.database.to_dict(), + 'processing': self.processing.to_dict(), + 'logging': self.logging.to_dict() + } + + with open(config_path, 'w') as f: + json.dump(data, f, indent=2) + + def to_dict(self) -> dict: + """Convert to dictionary""" + return { + 'database': self.database.to_dict(), + 'processing': self.processing.to_dict(), + 'logging': self.logging.to_dict() + } + + +def load_config(config_path: Optional[Path] = None) -> Config: + """Load configuration from file or return default""" + if config_path is None: + config_path = Path('config.json') + + if config_path.exists(): + return Config.from_file(config_path) + + return Config() diff --git a/src/shared/logger.py b/src/shared/logger.py index e69de29..87c1fb7 100644 --- a/src/shared/logger.py +++ b/src/shared/logger.py @@ -0,0 +1,217 @@ +"""Dynamic progress logger with formatting utilities""" +import sys +import logging +from typing import Optional +from datetime import datetime +from pathlib import Path + + +def format_size(bytes_size: int) -> str: + """Format bytes to human-readable size string + + Args: + bytes_size: Size in bytes + + Returns: + Human-readable size string (e.g., "1.5 GB", "234.5 MB") + """ + for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']: + if bytes_size < 1024.0: + return 
f"{bytes_size:.1f} {unit}" + bytes_size /= 1024.0 + return f"{bytes_size:.1f} EB" + + +def format_rate(bytes_per_second: float) -> str: + """Format transfer rate to human-readable string + + Args: + bytes_per_second: Transfer rate in bytes per second + + Returns: + Human-readable rate string (e.g., "125.3 MB/s") + """ + return f"{format_size(int(bytes_per_second))}/s" + + +def format_time(seconds: float) -> str: + """Format seconds to human-readable time string + + Args: + seconds: Time in seconds + + Returns: + Human-readable time string (e.g., "2h 34m 12s", "45m 23s", "12s") + """ + if seconds < 60: + return f"{int(seconds)}s" + elif seconds < 3600: + minutes = int(seconds // 60) + secs = int(seconds % 60) + return f"{minutes}m {secs}s" + else: + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + return f"{hours}h {minutes}m {secs}s" + + +class ProgressLogger: + """Dynamic progress logger with real-time statistics""" + + def __init__( + self, + name: str = "defrag", + level: int = logging.INFO, + log_file: Optional[Path] = None, + console_output: bool = True + ): + """Initialize progress logger + + Args: + name: Logger name + level: Logging level + log_file: Optional log file path + console_output: Whether to output to console + """ + self.logger = logging.getLogger(name) + self.logger.setLevel(level) + self.logger.handlers.clear() + + # Create formatter + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Add console handler + if console_output: + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(level) + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) + + # Add file handler + if log_file: + log_file.parent.mkdir(parents=True, exist_ok=True) + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(level) + file_handler.setFormatter(formatter) + self.logger.addHandler(file_handler) + + self._last_progress_line = "" + + def info(self, message: str) -> None: + """Log info message""" + self.logger.info(message) + + def warning(self, message: str) -> None: + """Log warning message""" + self.logger.warning(message) + + def error(self, message: str) -> None: + """Log error message""" + self.logger.error(message) + + def debug(self, message: str) -> None: + """Log debug message""" + self.logger.debug(message) + + def critical(self, message: str) -> None: + """Log critical message""" + self.logger.critical(message) + + def progress( + self, + current: int, + total: int, + prefix: str = "", + suffix: str = "", + bytes_processed: Optional[int] = None, + elapsed_seconds: Optional[float] = None + ) -> None: + """Log progress with dynamic statistics + + Args: + current: Current progress count + total: Total count + prefix: Prefix message + suffix: Suffix message + bytes_processed: Optional bytes processed for rate calculation + elapsed_seconds: Optional elapsed time for rate calculation + """ + if total == 0: + percent = 0.0 + else: + percent = (current / total) * 100 + + progress_msg = f"{prefix} [{current}/{total}] {percent:.1f}%" + + if bytes_processed is not None and elapsed_seconds is not None and elapsed_seconds > 0: + rate = bytes_per_second = bytes_processed / elapsed_seconds + progress_msg += f" | {format_size(bytes_processed)} @ {format_rate(rate)}" + + # Estimate time remaining + if current > 0: + estimated_total_seconds = (elapsed_seconds / current) * total + remaining_seconds = 
estimated_total_seconds - elapsed_seconds + progress_msg += f" | ETA: {format_time(remaining_seconds)}" + + if suffix: + progress_msg += f" | {suffix}" + + self.info(progress_msg) + + def section(self, title: str) -> None: + """Log section header + + Args: + title: Section title + """ + separator = "=" * 60 + self.info(separator) + self.info(f" {title}") + self.info(separator) + + def subsection(self, title: str) -> None: + """Log subsection header + + Args: + title: Subsection title + """ + self.info(f"\n--- {title} ---") + + +def create_logger( + name: str = "defrag", + level: str = "INFO", + log_file: Optional[Path] = None, + console_output: bool = True +) -> ProgressLogger: + """Create and configure a progress logger + + Args: + name: Logger name + level: Logging level as string + log_file: Optional log file path + console_output: Whether to output to console + + Returns: + Configured ProgressLogger instance + """ + level_map = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL + } + + log_level = level_map.get(level.upper(), logging.INFO) + + return ProgressLogger( + name=name, + level=log_level, + log_file=log_file, + console_output=console_output + ) diff --git a/src/shared/models.py b/src/shared/models.py index e69de29..e864b08 100644 --- a/src/shared/models.py +++ b/src/shared/models.py @@ -0,0 +1,127 @@ +"""Data models for the disk reorganizer""" +from dataclasses import dataclass, field +from pathlib import Path +from datetime import datetime +from typing import Optional + + +@dataclass +class FileRecord: + """Core file record with all metadata""" + path: Path + size: int + modified_time: float + created_time: float + disk: str + checksum: Optional[str] = None + status: str = 'indexed' # indexed, planned, moved, verified + category: Optional[str] = None + duplicate_of: Optional[str] = None + + def to_dict(self) -> dict: + """Convert to dictionary for serialization""" + return { + 'path': str(self.path), + 'size': self.size, + 'modified_time': self.modified_time, + 'created_time': self.created_time, + 'disk': self.disk, + 'checksum': self.checksum, + 'status': self.status, + 'category': self.category, + 'duplicate_of': self.duplicate_of + } + + +@dataclass +class OperationRecord: + """Record of a migration operation""" + source_path: Path + dest_path: Path + operation_type: str # move, copy, hardlink, symlink + size: int = 0 + status: str = 'pending' # pending, in_progress, completed, failed + error: Optional[str] = None + executed_at: Optional[datetime] = None + verified: bool = False + + def to_dict(self) -> dict: + """Convert to dictionary for serialization""" + return { + 'source_path': str(self.source_path), + 'dest_path': str(self.dest_path), + 'operation_type': self.operation_type, + 'size': self.size, + 'status': self.status, + 'error': self.error, + 'executed_at': self.executed_at.isoformat() if self.executed_at else None, + 'verified': self.verified + } + + +@dataclass +class DiskInfo: + """Information about a disk/volume""" + name: str + device: str + mount_point: Path + total_size: int + used_size: int + free_size: int + fs_type: str + + @property + def usage_percent(self) -> float: + """Calculate usage percentage""" + if self.total_size == 0: + return 0.0 + return (self.used_size / self.total_size) * 100 + + +@dataclass +class MigrationPlan: + """Complete migration plan""" + target_disk: str + destination_disks: list[str] + operations: list[OperationRecord] + total_size: int + 
file_count: int + created_at: datetime = field(default_factory=datetime.now) + + def to_dict(self) -> dict: + """Convert to dictionary for serialization""" + return { + 'target_disk': self.target_disk, + 'destination_disks': self.destination_disks, + 'operations': [op.to_dict() for op in self.operations], + 'total_size': self.total_size, + 'file_count': self.file_count, + 'created_at': self.created_at.isoformat() + } + + +@dataclass +class ProcessingStats: + """Statistics for processing operations""" + files_processed: int = 0 + bytes_processed: int = 0 + files_succeeded: int = 0 + files_failed: int = 0 + start_time: datetime = field(default_factory=datetime.now) + + @property + def elapsed_seconds(self) -> float: + """Calculate elapsed time in seconds""" + return (datetime.now() - self.start_time).total_seconds() + + @property + def files_per_second(self) -> float: + """Calculate processing rate""" + elapsed = self.elapsed_seconds + return self.files_processed / elapsed if elapsed > 0 else 0.0 + + @property + def bytes_per_second(self) -> float: + """Calculate throughput""" + elapsed = self.elapsed_seconds + return self.bytes_processed / elapsed if elapsed > 0 else 0.0
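ProcessingStats and ProgressLogger together form the reporting loop used by every engine in this diff: the engine bumps the counters and the logger derives percentage, throughput and ETA from them. A short usage sketch, assuming the repository root is on sys.path so that src imports as a package:

from pathlib import Path

from src.shared.logger import create_logger, format_size
from src.shared.models import ProcessingStats

logger = create_logger(name="defrag", level="INFO", log_file=Path("logs/defrag.log"))
stats = ProcessingStats()

for size in (4096, 1_048_576, 52_428_800):  # stand-in sizes for scanned files
    stats.files_processed += 1
    stats.bytes_processed += size
    logger.progress(
        current=stats.files_processed,
        total=3,
        prefix="Files discovered",
        bytes_processed=stats.bytes_processed,
        elapsed_seconds=stats.elapsed_seconds,
    )

logger.info(
    f"Done: {stats.files_processed} files, {format_size(stats.bytes_processed)} "
    f"at {stats.files_per_second:.1f} files/s"
)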