Files
defrag/output.md
2025-12-12 06:27:10 +01:00

183 KiB

===== sql/setup_database.sql =====

-- PostgreSQL Database Setup Script for Disk Reorganizer
-- Database: disk_reorganizer_db
-- User: disk_reorg_user

-- Create the database (run as superuser: auction)
-- CREATE DATABASE has no IF NOT EXISTS form and cannot run inside a DO block,
-- so the statement text is produced only when the database is missing and is
-- then executed by psql's \gexec. This keeps the script re-runnable, in line
-- with the IF NOT EXISTS guards used for the tables and indexes below.
SELECT 'CREATE DATABASE disk_reorganizer_db
    WITH
    ENCODING = ''UTF8''
    LC_COLLATE = ''en_US.UTF-8''
    LC_CTYPE = ''en_US.UTF-8''
    TEMPLATE = template0'
WHERE NOT EXISTS (
    SELECT 1
    FROM pg_catalog.pg_database
    WHERE datname = 'disk_reorganizer_db'
)\gexec

-- Connect to the new database
\c disk_reorganizer_db

-- Create the application role only when it does not exist yet, so a second
-- run does not abort on "role already exists".
-- NOTE(review): the password is stored in plain text in this script; consider
-- supplying it from outside (e.g. a psql variable) and rotating it.
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'disk_reorg_user') THEN
        CREATE USER disk_reorg_user WITH PASSWORD 'heel-goed-wachtwoord';
    END IF;
END
$$;

-- Catalogue of indexed files: one row per file, keyed by its path.
-- NOTE(review): src/classification/engine.py filters and updates
-- files.category, which this table does not define; confirm which schema
-- script the engine is meant to run against.
CREATE TABLE IF NOT EXISTS files (
    path          TEXT,
    size          BIGINT           NOT NULL,
    modified_time DOUBLE PRECISION NOT NULL,
    disk          TEXT             NOT NULL,
    checksum      TEXT,
    status        TEXT             DEFAULT 'indexed',
    created_at    TIMESTAMP        DEFAULT CURRENT_TIMESTAMP,
    updated_at    TIMESTAMP        DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT files_pkey PRIMARY KEY (path)
);

-- Secondary indexes for lookups by disk and by status
CREATE INDEX IF NOT EXISTS idx_files_disk
    ON files (disk);
CREATE INDEX IF NOT EXISTS idx_files_status
    ON files (status);

-- Planned/executed file operations, one row per source -> destination action.
-- executed and verified are integer flags that default to 0.
CREATE TABLE IF NOT EXISTS operations (
    id             SERIAL,
    source_path    TEXT      NOT NULL,
    dest_path      TEXT      NOT NULL,
    operation_type TEXT      NOT NULL,
    executed       INTEGER   DEFAULT 0,
    verified       INTEGER   DEFAULT 0,
    error          TEXT,
    created_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    executed_at    TIMESTAMP,

    CONSTRAINT operations_pkey PRIMARY KEY (id)
);

-- Secondary indexes for lookups by execution flag and by source path
CREATE INDEX IF NOT EXISTS idx_operations_executed
    ON operations (executed);
CREATE INDEX IF NOT EXISTS idx_operations_source
    ON operations (source_path);

-- Privileges for disk_reorg_user: database connect, schema usage, row-level
-- DML on both tables, and access to the sequence behind operations.id.
GRANT CONNECT
    ON DATABASE disk_reorganizer_db
    TO disk_reorg_user;
GRANT USAGE
    ON SCHEMA public
    TO disk_reorg_user;
GRANT SELECT, INSERT, UPDATE, DELETE
    ON TABLE files, operations
    TO disk_reorg_user;
GRANT USAGE, SELECT
    ON SEQUENCE operations_id_seq
    TO disk_reorg_user;

-- Trigger function: overwrite updated_at with CURRENT_TIMESTAMP on the row
-- being written and pass the modified row on.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Create trigger for files table.
-- CREATE TRIGGER has no IF NOT EXISTS form, so the trigger is dropped first;
-- otherwise a second run of this script fails with "trigger already exists".
DROP TRIGGER IF EXISTS update_files_updated_at ON files;
CREATE TRIGGER update_files_updated_at
    BEFORE UPDATE ON files
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

-- Display success message (psql meta-commands; printed by the psql client)
\echo 'Database setup completed successfully!'
\echo 'Database: disk_reorganizer_db'
\echo 'User: disk_reorg_user'
\echo 'Tables created: files, operations'
\echo 'Indexes and triggers created'

===== sql/init.sql =====

-- sql/init.sql
-- Initialize PostgreSQL database for Project Defrag

-- Extensions enabled for this database (each is skipped if already installed)
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Files table: one row per file, keyed by a generated UUID; path is unique
CREATE TABLE IF NOT EXISTS files (
    id            UUID        DEFAULT gen_random_uuid(),
    path          TEXT        NOT NULL,
    size          BIGINT      NOT NULL,
    modified_time TIMESTAMPTZ,
    created_time  TIMESTAMPTZ,
    file_hash     VARCHAR(64),  -- SHA-256 hash
    category      VARCHAR(50),
    disk_label    VARCHAR(50),
    last_verified TIMESTAMPTZ,

    -- Metadata (defaults to an empty JSON object)
    metadata      JSONB       DEFAULT '{}',

    -- Audit fields
    created_at    TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at    TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,

    -- Constraints
    CONSTRAINT files_pkey PRIMARY KEY (id),
    CONSTRAINT unique_file_path UNIQUE (path)
);

-- Operations table (audit log): one row per operation, optionally linked to
-- a files row; the link is set to NULL when that file row is deleted.
CREATE TABLE IF NOT EXISTS operations (
    id              UUID         DEFAULT gen_random_uuid(),
    operation_type  VARCHAR(50)  NOT NULL,
    source_path     TEXT,
    target_path     TEXT,
    status          VARCHAR(20)  NOT NULL,

    -- File reference
    file_id         UUID         REFERENCES files (id) ON DELETE SET NULL,

    -- Performance metrics
    duration_ms     INTEGER,
    bytes_processed BIGINT,

    -- Error information
    error_message   TEXT,
    error_details   JSONB,

    -- Context
    session_id      VARCHAR(100),
    user_agent      TEXT,

    -- Audit fields
    started_at      TIMESTAMPTZ  DEFAULT CURRENT_TIMESTAMP,
    completed_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ  DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT operations_pkey PRIMARY KEY (id)
);

-- Deduplication hash store: one row per content hash, with a canonical path
-- and a reference counter that starts at 1.
CREATE TABLE IF NOT EXISTS deduplication_store (
    hash            VARCHAR(64),
    canonical_path  TEXT        NOT NULL,
    reference_count INTEGER     DEFAULT 1,
    first_seen      TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    last_seen       TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT deduplication_store_pkey PRIMARY KEY (hash)
);

-- Migration plan table: a named plan (stored as JSON) for moving files from
-- a source disk to a target disk, plus summary statistics and lifecycle times.
CREATE TABLE IF NOT EXISTS migration_plans (
    id                 UUID         DEFAULT gen_random_uuid(),
    name               VARCHAR(100) NOT NULL,
    source_disk        VARCHAR(50)  NOT NULL,
    target_disk        VARCHAR(50)  NOT NULL,
    plan_json          JSONB        NOT NULL,

    -- Statistics
    total_files        INTEGER      DEFAULT 0,
    total_size         BIGINT       DEFAULT 0,
    estimated_duration INTEGER,  -- in seconds

    -- Status
    status             VARCHAR(20)  DEFAULT 'draft',

    -- Audit
    created_at         TIMESTAMPTZ  DEFAULT CURRENT_TIMESTAMP,
    executed_at        TIMESTAMPTZ,
    completed_at       TIMESTAMPTZ,

    CONSTRAINT migration_plans_pkey PRIMARY KEY (id)
);

-- Indexes for performance.
-- No separate index on files(path): the unique_file_path constraint already
-- creates a unique index on that column, so a second one would only add
-- write overhead.
-- NOTE(review): sql/setup_database.sql also defines an index named
-- idx_files_disk (on files.disk); with IF NOT EXISTS the second definition
-- is silently skipped if both scripts target the same database.
CREATE INDEX IF NOT EXISTS idx_files_hash ON files(file_hash);
CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk_label);
CREATE INDEX IF NOT EXISTS idx_files_category ON files(category);

CREATE INDEX IF NOT EXISTS idx_operations_status ON operations(status);
CREATE INDEX IF NOT EXISTS idx_operations_created ON operations(created_at);
CREATE INDEX IF NOT EXISTS idx_operations_file_id ON operations(file_id);

CREATE INDEX IF NOT EXISTS idx_dedup_canonical ON deduplication_store(canonical_path);

-- Function for updating timestamps: overwrite updated_at with
-- CURRENT_TIMESTAMP on the row being written and pass the modified row on.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger for automatic updated_at on files.
-- CREATE TRIGGER has no IF NOT EXISTS form, so the trigger is dropped first;
-- otherwise re-running this script fails with "trigger already exists".
DROP TRIGGER IF EXISTS update_files_updated_at ON files;
CREATE TRIGGER update_files_updated_at BEFORE UPDATE ON files
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- View for operational dashboard: per-status totals over operations that
-- started within the last 24 hours.
CREATE OR REPLACE VIEW operational_dashboard AS
SELECT
    ops.status,
    COUNT(*)                 AS operation_count,
    SUM(ops.bytes_processed) AS total_bytes,
    AVG(ops.duration_ms)     AS avg_duration_ms,
    MIN(ops.started_at)      AS earliest_operation,
    MAX(ops.completed_at)    AS latest_operation
FROM operations AS ops
WHERE ops.started_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY ops.status;

-- View for disk usage statistics: file count, size totals/averages and the
-- earliest created_time / latest modified_time per disk_label.
CREATE OR REPLACE VIEW disk_usage_stats AS
SELECT
    f.disk_label,
    COUNT(*)             AS file_count,
    SUM(f.size)          AS total_size,
    AVG(f.size)          AS avg_file_size,
    MIN(f.created_time)  AS oldest_file,
    MAX(f.modified_time) AS newest_file
FROM files AS f
GROUP BY f.disk_label;

-- Insert default configuration.
-- migration_plans has no unique constraint other than its generated UUID
-- key, so ON CONFLICT DO NOTHING could never trigger and every run of this
-- script added another copy of the default plan. Guard on the plan name
-- instead so the row is inserted at most once.
INSERT INTO migration_plans (name, source_disk, target_disk, plan_json, status)
SELECT
    'Default Migration Plan',
    'disk_d',
    'disk_e',
    '{"strategy": "hardlink", "verify_copies": true, "preserve_timestamps": true}'::jsonb,
    'draft'
WHERE NOT EXISTS (
    SELECT 1
    FROM migration_plans
    WHERE name = 'Default Migration Plan'
);

-- Create read-only user for monitoring (skipped when the role already exists).
-- NOTE(review): the password is stored in plain text in this script; consider
-- supplying it from outside and rotating it.
DO $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = 'monitor_user') THEN
        CREATE USER monitor_user WITH PASSWORD 'monitor_password';
    END IF;
END
$$;

-- Read-only access: connect, schema usage, and SELECT on the tables and
-- views that exist at the time this script runs.
GRANT CONNECT ON DATABASE disk_reorganizer_db TO monitor_user;
GRANT USAGE ON SCHEMA public TO monitor_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO monitor_user;
GRANT SELECT ON operational_dashboard TO monitor_user;
GRANT SELECT ON disk_usage_stats TO monitor_user;

===== src/classification/_protocols.py =====

"""Protocol definitions for the classification package"""
from typing import Protocol, Optional
from pathlib import Path
from dataclasses import dataclass


@dataclass
class ClassificationRule:
    """A named rule that associates a list of path patterns with a category."""
    # Identifier of the rule
    name: str
    # Category label attached to the rule
    category: str
    # Path patterns belonging to the rule
    # NOTE(review): the default rules use glob-style patterns such as "**/.m2/**" - confirm matching semantics
    patterns: list[str]
    # Ordering hint; defaults to 0 (NOTE(review): confirm whether higher wins)
    priority: int = 0
    # Optional free-text explanation; empty by default
    description: str = ""


class IClassifier(Protocol):
    """Structural interface for objects that assign categories to paths."""

    def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
        """Return the category for ``path``.

        Args:
            path: The path that should be categorised
            file_type: Hint about the file type; may be omitted

        Returns:
            The matching category name, or None when nothing matches
        """
        ...

    def get_category_rules(self, category: str) -> list[ClassificationRule]:
        """Return every rule registered for ``category``.

        Args:
            category: Name of the category to look up

        Returns:
            The rules that belong to that category
        """
        ...


class IRuleEngine(Protocol):
    """Structural interface for a mutable, rule-driven path matcher."""

    def add_rule(self, rule: ClassificationRule) -> None:
        """Register a classification rule.

        Args:
            rule: The rule to register
        """
        ...

    def remove_rule(self, rule_name: str) -> None:
        """Unregister the rule with the given name.

        Args:
            rule_name: Name of the rule that should be removed
        """
        ...

    def match_path(self, path: Path) -> Optional[str]:
        """Evaluate ``path`` against the registered rules.

        Args:
            path: The path to evaluate

        Returns:
            The matching category name, or None when nothing matches
        """
        ...

===== src/classification/engine.py =====

"""Main classification engine"""
from contextlib import contextmanager
from pathlib import Path
from typing import Optional, Callable

import psycopg2

from .rules import RuleBasedClassifier
from .ml import create_ml_classifier, DummyMLClassifier
from ..shared.models import ProcessingStats
from ..shared.config import DatabaseConfig
from ..shared.logger import ProgressLogger


class ClassificationEngine:
    """Engine for classifying files stored in the database.

    Paths are classified with the rule-based classifier first and, when ML is
    enabled and available, with the ML classifier as a fallback.

    One psycopg2 connection is created lazily and reused. Every database
    operation goes through ``_transaction()``, which commits on success,
    rolls back on failure and always closes the cursor, so the cached
    connection is never left inside an aborted or still-open transaction.
    """

    # Category stored when neither the rules nor the ML classifier match
    DEFAULT_CATEGORY = "temp/processing"

    def __init__(
        self,
        db_config: DatabaseConfig,
        logger: ProgressLogger,
        use_ml: bool = False
    ):
        """Initialize classification engine

        Args:
            db_config: Database configuration
            logger: Progress logger
            use_ml: Whether to use ML classification in addition to rules
        """
        self.db_config = db_config
        self.logger = logger
        self.rule_classifier = RuleBasedClassifier()
        self.ml_classifier = create_ml_classifier() if use_ml else None
        # ML only counts as enabled when a real (non-dummy) classifier exists
        self.use_ml = use_ml and not isinstance(self.ml_classifier, DummyMLClassifier)
        self._connection = None

    def _get_connection(self):
        """Get or create database connection"""
        if self._connection is None or self._connection.closed:
            self._connection = psycopg2.connect(
                host=self.db_config.host,
                port=self.db_config.port,
                database=self.db_config.database,
                user=self.db_config.user,
                password=self.db_config.password
            )
        return self._connection

    @contextmanager
    def _transaction(self):
        """Yield ``(connection, cursor)`` for one unit of database work.

        Commits when the block finishes normally, rolls back when it raises,
        and closes the cursor in both cases. The connection stays open so it
        can be reused by later calls.
        """
        conn = self._get_connection()
        cursor = conn.cursor()
        try:
            yield conn, cursor
            conn.commit()
        except BaseException:
            # Without a rollback the cached connection would stay in a failed
            # transaction and reject every later statement.
            if not conn.closed:
                conn.rollback()
            raise
        finally:
            cursor.close()

    def classify_all(
        self,
        disk: Optional[str] = None,
        batch_size: int = 1000,
        progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
    ) -> ProcessingStats:
        """Classify all files in database

        Selects every file without a category (optionally restricted to one
        disk), classifies it and writes the category back in batches. Each
        batch is committed on its own; if a later batch fails, only the
        uncommitted work is rolled back.

        Args:
            disk: Optional disk filter
            batch_size: Number of files to process per batch (values below 1
                are treated as 1)
            progress_callback: Optional callback for progress updates; called
                after every written batch, including the final partial one

        Returns:
            ProcessingStats with classification statistics
        """
        self.logger.section("Starting Classification")

        # A batch size of 0 would make the progress modulo below divide by zero
        batch_size = max(1, batch_size)

        with self._transaction() as (conn, cursor):
            # Get files without categories
            if disk:
                cursor.execute("""
                    SELECT path, checksum
                    FROM files
                    WHERE disk = %s AND category IS NULL
                """, (disk,))
            else:
                cursor.execute("""
                    SELECT path, checksum
                    FROM files
                    WHERE category IS NULL
                """)

            files_to_classify = cursor.fetchall()
            total_files = len(files_to_classify)

            self.logger.info(f"Found {total_files} files to classify")

            stats = ProcessingStats()
            batch = []

            for path_str, _checksum in files_to_classify:
                # Rules first, then ML when enabled
                category = self.classify_path(Path(path_str))

                # If still no category, assign default
                if category is None:
                    category = self.DEFAULT_CATEGORY

                # Key the UPDATE on the path exactly as stored: str(Path(...))
                # may normalise it (e.g. duplicate or trailing separators) and
                # then the WHERE clause would match no row.
                batch.append((category, path_str))
                stats.files_processed += 1

                # Batch update
                if len(batch) >= batch_size:
                    self._update_categories(cursor, batch)
                    conn.commit()
                    batch.clear()

                    # Progress callback
                    if progress_callback:
                        progress_callback(stats.files_processed, total_files, stats)

                    # Log progress every tenth batch
                    if stats.files_processed % (batch_size * 10) == 0:
                        self.logger.progress(
                            stats.files_processed,
                            total_files,
                            prefix="Files classified",
                            elapsed_seconds=stats.elapsed_seconds
                        )

            # Update remaining batch
            if batch:
                self._update_categories(cursor, batch)
                conn.commit()

                # Report the final partial batch too, so callers see the
                # last progress value (and are called at all for small runs)
                if progress_callback:
                    progress_callback(stats.files_processed, total_files, stats)

            stats.files_succeeded = stats.files_processed

        self.logger.info(
            f"Classification complete: {stats.files_processed} files in {stats.elapsed_seconds:.1f}s"
        )

        return stats

    def _update_categories(self, cursor, batch: list[tuple[str, str]]):
        """Update categories in batch

        Args:
            cursor: Database cursor
            batch: List of (category, path) tuples
        """
        from psycopg2.extras import execute_batch

        query = """
            UPDATE files
            SET category = %s
            WHERE path = %s
        """

        execute_batch(cursor, query, batch)

    def classify_path(self, path: Path) -> Optional[str]:
        """Classify a single path

        Args:
            path: Path to classify

        Returns:
            Category name or None
        """
        # Try rules first
        category = self.rule_classifier.classify(path)

        # Try ML if available
        if category is None and self.use_ml and self.ml_classifier:
            category = self.ml_classifier.classify(path)

        return category

    def get_category_stats(self) -> dict[str, dict]:
        """Get statistics by category

        Returns:
            Dictionary mapping category to a dict with ``file_count`` and
            ``total_size``
        """
        with self._transaction() as (_, cursor):
            cursor.execute("""
                SELECT
                    category,
                    COUNT(*) as file_count,
                    SUM(size) as total_size
                FROM files
                WHERE category IS NOT NULL
                GROUP BY category
                ORDER BY total_size DESC
            """)

            return {
                category: {
                    'file_count': file_count,
                    'total_size': total_size
                }
                for category, file_count, total_size in cursor.fetchall()
            }

    def get_uncategorized_count(self) -> int:
        """Get count of uncategorized files

        Returns:
            Number of files without category
        """
        with self._transaction() as (_, cursor):
            cursor.execute("SELECT COUNT(*) FROM files WHERE category IS NULL")
            return cursor.fetchone()[0]

    def reclassify_category(
        self,
        old_category: str,
        new_category: str
    ) -> int:
        """Reclassify all files in a category

        Args:
            old_category: Current category
            new_category: New category

        Returns:
            Number of files reclassified
        """
        self.logger.info(f"Reclassifying {old_category} -> {new_category}")

        with self._transaction() as (_, cursor):
            cursor.execute("""
                UPDATE files
                SET category = %s
                WHERE category = %s
            """, (new_category, old_category))

            # Read the row count before the cursor is closed
            count = cursor.rowcount

        self.logger.info(f"Reclassified {count} files")

        return count

    def train_ml_classifier(
        self,
        min_samples: int = 10
    ) -> bool:
        """Train ML classifier from existing categorized data

        Args:
            min_samples: Minimum samples per category

        Returns:
            True if training successful, False when ML is unavailable, there
            is not enough data, or training raised an exception
        """
        if not self.use_ml or self.ml_classifier is None:
            self.logger.warning("ML classifier not available")
            return False

        self.logger.subsection("Training ML Classifier")

        # Get categorized files
        with self._transaction() as (_, cursor):
            cursor.execute("""
                SELECT path, category
                FROM files
                WHERE category IS NOT NULL
            """)

            training_data = [(Path(path), category) for path, category in cursor.fetchall()]

        if not training_data:
            self.logger.warning("No training data available")
            return False

        # Count samples per category
        category_counts = {}
        for _, category in training_data:
            category_counts[category] = category_counts.get(category, 0) + 1

        # Filter categories with enough samples
        filtered_data = [
            (path, category)
            for path, category in training_data
            if category_counts[category] >= min_samples
        ]

        if not filtered_data:
            self.logger.warning(f"No categories with >= {min_samples} samples")
            return False

        self.logger.info(f"Training with {len(filtered_data)} samples")

        try:
            self.ml_classifier.train(filtered_data)
            self.logger.info("ML classifier trained successfully")
            return True
        except Exception as e:
            self.logger.error(f"Failed to train ML classifier: {e}")
            return False

    def get_all_categories(self) -> list[str]:
        """Get all categories from database

        Returns:
            List of category names, sorted by the database
        """
        with self._transaction() as (_, cursor):
            cursor.execute("""
                SELECT DISTINCT category
                FROM files
                WHERE category IS NOT NULL
                ORDER BY category
            """)

            return [row[0] for row in cursor.fetchall()]

    def close(self):
        """Close database connection"""
        if self._connection and not self._connection.closed:
            self._connection.close()

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.close()

===== src/classification/ml.py =====

"""ML-based classification (optional, using sklearn if available)"""
from pathlib import Path
from typing import Optional, List, Tuple
import pickle

try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.pipeline import Pipeline
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False


class MLClassifier:
    """Machine learning-based file classifier

    Turns a path into a whitespace-separated feature string (path parts,
    extension, filename tokens) and feeds it to a TF-IDF + multinomial
    naive Bayes pipeline. Requires scikit-learn to be installed.
    """

    def __init__(self):
        """Initialize ML classifier

        Raises:
            ImportError: If scikit-learn is not installed
        """
        if not SKLEARN_AVAILABLE:
            raise ImportError(
                "scikit-learn is required for ML classification. "
                "Install with: pip install scikit-learn"
            )

        self.model: Optional[Pipeline] = None
        self.categories: List[str] = []
        self._is_trained = False

    def _extract_features(self, path: Path) -> str:
        """Extract features from path

        Args:
            path: Path to extract features from

        Returns:
            Space-separated feature string: every path part, then
            ``ext:<suffix>`` when there is a suffix, then one ``name:<token>``
            per filename token
        """
        features = list(path.parts)

        # Add extension
        extension = path.suffix
        if extension:
            features.append(f"ext:{extension}")

        # Add filename components (split on '-', '_' and '.')
        name_parts = path.name.replace('-', ' ').replace('_', ' ').replace('.', ' ').split()
        features.extend(f"name:{part}" for part in name_parts)

        return ' '.join(features)

    def train(self, training_data: List[Tuple[Path, str]]) -> None:
        """Train the classifier

        Args:
            training_data: List of (path, category) tuples

        Raises:
            ValueError: If ``training_data`` is empty
        """
        if not training_data:
            raise ValueError("Training data cannot be empty")

        # Extract features and labels
        X = [self._extract_features(path) for path, _ in training_data]
        y = [category for _, category in training_data]

        # Store unique categories
        self.categories = sorted(set(y))

        # Create and train pipeline
        self.model = Pipeline([
            ('tfidf', TfidfVectorizer(
                max_features=1000,
                ngram_range=(1, 2),
                min_df=1
            )),
            ('classifier', MultinomialNB())
        ])

        self.model.fit(X, y)
        self._is_trained = True

    def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
        """Classify a file path

        Args:
            path: Path to classify
            file_type: Optional file type hint (not used in ML classifier)

        Returns:
            Category name as a plain ``str``, or None if the model is not
            trained or prediction fails
        """
        if not self._is_trained or self.model is None:
            return None

        features = self._extract_features(path)

        try:
            prediction = self.model.predict([features])[0]
        except Exception:
            # Deliberate best effort: a failed prediction means "no category"
            return None

        # The estimator may return its own string type; hand callers the
        # plain str promised by the signature.
        return str(prediction)

    def predict_proba(self, path: Path) -> dict[str, float]:
        """Get prediction probabilities for all categories

        Args:
            path: Path to classify

        Returns:
            Dictionary mapping category to probability; empty if the model is
            not trained or prediction fails
        """
        if not self._is_trained or self.model is None:
            return {}

        features = self._extract_features(path)

        try:
            probabilities = self.model.predict_proba([features])[0]
            # Pair the probabilities with the model's own class order when it
            # exposes one, instead of relying on the separately stored list
            # staying in sync with it.
            labels = getattr(self.model, 'classes_', self.categories)
            return {
                str(label): float(prob)
                for label, prob in zip(labels, probabilities)
            }
        except Exception:
            return {}

    def save_model(self, model_path: Path) -> None:
        """Save trained model to disk

        Args:
            model_path: Path to save model

        Raises:
            ValueError: If the model has not been trained
        """
        if not self._is_trained:
            raise ValueError("Cannot save untrained model")

        model_data = {
            'model': self.model,
            'categories': self.categories,
            'is_trained': self._is_trained
        }

        with open(model_path, 'wb') as f:
            pickle.dump(model_data, f)

    def load_model(self, model_path: Path) -> None:
        """Load trained model from disk

        Args:
            model_path: Path to model file
        """
        # SECURITY: pickle.load executes arbitrary code embedded in the file.
        # Only load model files that this application wrote itself.
        with open(model_path, 'rb') as f:
            model_data = pickle.load(f)

        self.model = model_data['model']
        self.categories = model_data['categories']
        self._is_trained = model_data['is_trained']

    @property
    def is_trained(self) -> bool:
        """Check if model is trained"""
        return self._is_trained


class DummyMLClassifier:
    """Stand-in classifier used when scikit-learn cannot be imported.

    Query methods return "nothing" (None / empty dict / False); methods that
    would need a real model raise NotImplementedError.
    """

    def __init__(self):
        """Create the stand-in; there is no state to set up."""

    def train(self, training_data: List[Tuple[Path, str]]) -> None:
        """Always fails: training needs scikit-learn."""
        message = (
            "ML classification requires scikit-learn. "
            "Install with: pip install scikit-learn"
        )
        raise NotImplementedError(message)

    def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
        """Never produces a category; always returns None."""
        return None

    def predict_proba(self, path: Path) -> dict[str, float]:
        """Never produces probabilities; always returns an empty dict."""
        return dict()

    def save_model(self, model_path: Path) -> None:
        """Always fails: there is no model to save."""
        raise NotImplementedError("ML classification not available")

    def load_model(self, model_path: Path) -> None:
        """Always fails: a model cannot be loaded without scikit-learn."""
        raise NotImplementedError("ML classification not available")

    @property
    def is_trained(self) -> bool:
        """The stand-in is never trained."""
        return False


def create_ml_classifier() -> MLClassifier | DummyMLClassifier:
    """Build the classifier that matches the current environment.

    Returns:
        An MLClassifier when scikit-learn was imported successfully,
        otherwise a DummyMLClassifier
    """
    classifier_cls = MLClassifier if SKLEARN_AVAILABLE else DummyMLClassifier
    return classifier_cls()


def train_from_database(
    db_connection,
    min_samples_per_category: int = 10
) -> MLClassifier | DummyMLClassifier:
    """Build a classifier and train it on the categorised rows of ``files``.

    Args:
        db_connection: Open database connection used to read training rows
        min_samples_per_category: Categories with fewer rows than this are
            left out of the training set

    Returns:
        The classifier; it is returned untrained when scikit-learn is
        unavailable, no categorised rows exist, or no category has enough rows
    """
    classifier = create_ml_classifier()

    # Nothing to train without a real model
    if isinstance(classifier, DummyMLClassifier):
        return classifier

    # Read every (path, category) pair that already has a category
    cursor = db_connection.cursor()
    cursor.execute("""
        SELECT path, category
        FROM files
        WHERE category IS NOT NULL
    """)

    samples = [(Path(raw_path), label) for raw_path, label in cursor.fetchall()]
    cursor.close()

    if len(samples) == 0:
        return classifier

    # Tally how many samples each category has
    per_category = {}
    for _, label in samples:
        per_category[label] = per_category.get(label, 0) + 1

    # Keep only samples whose category is large enough
    eligible = []
    for sample_path, label in samples:
        if per_category[label] >= min_samples_per_category:
            eligible.append((sample_path, label))

    if eligible:
        classifier.train(eligible)

    return classifier

===== src/classification/__init__.py =====

"""Classification package exports"""
from .rules import RuleBasedClassifier
from .ml import create_ml_classifier, train_from_database, MLClassifier, DummyMLClassifier
from .engine import ClassificationEngine
from ._protocols import ClassificationRule, IClassifier, IRuleEngine

# Public names of the classification package, grouped by defining module
__all__ = [
    # .rules
    'RuleBasedClassifier',
    # .ml
    'MLClassifier',
    'DummyMLClassifier',
    'create_ml_classifier',
    'train_from_database',
    # .engine
    'ClassificationEngine',
    # ._protocols
    'ClassificationRule',
    'IClassifier',
    'IRuleEngine',
]

===== src/classification/rules.py =====

"""Rule-based classification engine"""
from pathlib import Path
from typing import Optional
import fnmatch

from ._protocols import ClassificationRule


class RuleBasedClassifier:
    """Rule-based file classifier using pattern matching

    Rules are kept in ``self.rules`` sorted by descending priority; the first
    rule with a matching pattern decides the category of a path.
    """

    def __init__(self):
        """Initialize rule-based classifier with the default rule set"""
        self.rules: list[ClassificationRule] = []
        self._load_default_rules()

    def _load_default_rules(self):
        """Load default classification rules based on ARCHITECTURE.md"""
        # (name, category, patterns, priority, description), listed in
        # registration order. Rules of equal priority keep this order because
        # add_rule() sorts with a stable sort.
        defaults = (
            # Build artifacts and caches
            ("maven_cache", "artifacts/java/maven",
             ["**/.m2/**", "**/.maven/**", "**/maven-central-cache/**"],
             10, "Maven repository and cache"),
            ("gradle_cache", "artifacts/java/gradle",
             ["**/.gradle/**", "**/gradle-cache/**", "**/gradle-build-cache/**"],
             10, "Gradle cache and artifacts"),
            ("python_cache", "cache/pycache",
             ["**/__pycache__/**", "**/*.pyc", "**/*.pyo"],
             10, "Python cache files"),
            ("python_artifacts", "artifacts/python",
             ["**/pip-cache/**", "**/pypi-cache/**", "**/wheelhouse/**"],
             10, "Python package artifacts"),
            ("node_modules", "cache/node_modules-archive",
             ["**/node_modules/**"],
             10, "Node.js modules"),
            ("node_cache", "artifacts/node",
             ["**/.npm/**", "**/npm-registry/**", "**/yarn-cache/**", "**/pnpm-store/**"],
             10, "Node.js package managers cache"),
            ("go_cache", "artifacts/go",
             ["**/goproxy-cache/**", "**/go/pkg/mod/**", "**/go-module-cache/**"],
             10, "Go module cache"),
            # Version control
            ("git_repos", "development/git-infrastructure",
             ["**/.git/**", "**/gitea/repositories/**"],
             15, "Git repositories and infrastructure"),
            ("gitea", "development/gitea",
             ["**/gitea/**"],
             12, "Gitea server data"),
            # Databases
            ("postgresql", "databases/postgresql",
             ["**/postgresql/**", "**/postgres/**", "**/*.sql"],
             10, "PostgreSQL databases"),
            ("mysql", "databases/mysql",
             ["**/mysql/**", "**/mariadb/**"],
             10, "MySQL/MariaDB databases"),
            ("mongodb", "databases/mongodb",
             ["**/mongodb/**", "**/mongo/**"],
             10, "MongoDB databases"),
            ("redis", "databases/redis",
             ["**/redis/**", "**/*.rdb"],
             10, "Redis databases"),
            ("sqlite", "databases/sqlite",
             ["**/*.db", "**/*.sqlite", "**/*.sqlite3"],
             8, "SQLite databases"),
            # LLM and AI models
            ("llm_models", "cache/llm-models",
             [
                 "**/hugging-face/**",
                 "**/huggingface/**",
                 "**/.cache/huggingface/**",
                 "**/models/**/*.bin",
                 "**/models/**/*.onnx",
                 "**/models/**/*.safetensors",
                 "**/llm*/**",
                 "**/openai-cache/**"
             ],
             12, "LLM and AI model files"),
            # Docker and containers
            ("docker_volumes", "apps/volumes/docker-volumes",
             ["**/docker/volumes/**", "**/var/lib/docker/volumes/**"],
             10, "Docker volumes"),
            ("app_data", "apps/volumes/app-data",
             ["**/app-data/**", "**/application-data/**"],
             8, "Application data"),
            # Build outputs
            ("build_output", "development/build-tools",
             ["**/target/**", "**/build/**", "**/dist/**", "**/out/**"],
             5, "Build output directories"),
            # Backups
            ("system_backups", "backups/system",
             ["**/backup/**", "**/backups/**", "**/*.bak", "**/*.backup"],
             10, "System backups"),
            ("database_backups", "backups/database",
             ["**/*.sql.gz", "**/*.dump", "**/db-backup/**"],
             11, "Database backups"),
            # Archives
            ("archives", "backups/archive",
             ["**/*.tar", "**/*.tar.gz", "**/*.tgz", "**/*.zip", "**/*.7z"],
             5, "Archive files"),
        )

        for name, category, patterns, priority, description in defaults:
            self.add_rule(ClassificationRule(
                name=name,
                category=category,
                patterns=patterns,
                priority=priority,
                description=description
            ))

    def add_rule(self, rule: ClassificationRule) -> None:
        """Add a classification rule

        Args:
            rule: Rule to add
        """
        self.rules.append(rule)
        # Sort rules by priority (higher priority first). list.sort is stable,
        # so rules with equal priority stay in the order they were added.
        self.rules.sort(key=lambda r: r.priority, reverse=True)

    def remove_rule(self, rule_name: str) -> None:
        """Remove a rule by name

        Every rule carrying that name is removed; an unknown name is a no-op.

        Args:
            rule_name: Name of rule to remove
        """
        self.rules = [r for r in self.rules if r.name != rule_name]

    def match_path(self, path: Path) -> Optional[str]:
        """Match path against rules

        Patterns are evaluated with ``fnmatch``, where ``*`` also matches
        path separators. A leading ``**/`` is additionally allowed to match
        zero directories, so ``**/*.sql`` matches ``dump.sql`` and
        ``**/node_modules/**`` matches ``node_modules/pkg/index.js``; plain
        ``fnmatch`` would demand a ``/`` in front of the component and miss
        such relative paths.

        Args:
            path: Path to match

        Returns:
            Category name or None if no match
        """
        path_str = str(path)

        # Try to match each rule in priority order
        for rule in self.rules:
            for pattern in rule.patterns:
                if fnmatch.fnmatch(path_str, pattern):
                    return rule.category
                # Glob semantics: "**/" may stand for no directory at all.
                if pattern.startswith("**/") and fnmatch.fnmatch(path_str, pattern[3:]):
                    return rule.category

        return None

    def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
        """Classify a file path

        Args:
            path: Path to classify
            file_type: Optional file type hint (accepted but not used by this
                classifier)

        Returns:
            Category name or None if no match
        """
        return self.match_path(path)

    def get_category_rules(self, category: str) -> list[ClassificationRule]:
        """Get all rules for a category

        Args:
            category: Category name

        Returns:
            List of rules for the category, in priority order
        """
        return [r for r in self.rules if r.category == category]

    def get_all_categories(self) -> set[str]:
        """Get all defined categories

        Returns:
            Set of category names
        """
        return {r.category for r in self.rules}

    def get_rules_by_priority(self, min_priority: int = 0) -> list[ClassificationRule]:
        """Get rules at or above a minimum priority

        Args:
            min_priority: Minimum priority threshold

        Returns:
            List of rules with priority >= min_priority
        """
        return [r for r in self.rules if r.priority >= min_priority]

===== src/setup.py =====

#!/usr/bin/env python3
"""Setup script for defrag disk reorganizer"""
from setuptools import setup, find_packages
from pathlib import Path

# Read requirements: one requirement per line, skipping blank lines and
# comment lines. The comment check runs on the stripped text so that indented
# "#" lines are not passed on to install_requires.
requirements_path = Path(__file__).parent / 'requirements.txt'
with open(requirements_path, encoding='utf-8') as f:
    requirements = [
        requirement
        for requirement in (line.strip() for line in f)
        if requirement and not requirement.startswith('#')
    ]

# Read long description from README (optional; left empty when the file is
# missing). The encoding is explicit so a non-ASCII README does not depend on
# the platform default.
readme_path = Path(__file__).parent / 'README.md'
long_description = ""
if readme_path.exists():
    with open(readme_path, encoding='utf-8') as f:
        long_description = f.read()

setup(
    name='defrag',
    version='1.0.0',
    description='Intelligent disk reorganization system for 20TB+ data with deduplication and classification',
    long_description=long_description,
    long_description_content_type='text/markdown',
    author='Project Defrag',
    author_email='defrag@example.com',
    url='https://github.com/yourusername/defrag',
    packages=find_packages(),
    install_requires=requirements,
    python_requires='>=3.9',
    entry_points={
        'console_scripts': [
            'defrag=main:main',
        ],
    },
    classifiers=[
        'Development Status :: 4 - Beta',
        'Intended Audience :: System Administrators',
        'Topic :: System :: Filesystems',
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.9',
        'Programming Language :: Python :: 3.10',
        'Programming Language :: Python :: 3.11',
        'Programming Language :: Python :: 3.12',
    ],
    keywords='disk management storage deduplication classification migration',
)

===== src/deduplication/engine.py =====

"""Deduplication engine"""
from pathlib import Path
from typing import Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
import psycopg2

from .chunker import compute_file_signature, hash_file
from .store import HashStore
from ..shared.models import FileRecord, ProcessingStats
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger


class DeduplicationEngine:
    """Engine for deduplicating files

    Hashes files listed in the ``files`` table, registers the first file seen
    for each checksum in the hash store as canonical, and records later files
    with the same checksum as duplicates of it.
    """

    def __init__(
        self,
        db_config: DatabaseConfig,
        processing_config: ProcessingConfig,
        logger: ProgressLogger
    ):
        """Initialize deduplication engine

        Args:
            db_config: Database configuration
            processing_config: Processing configuration
            logger: Progress logger
        """
        # Local import: only this class needs it.
        import threading

        self.db_config = db_config
        self.processing_config = processing_config
        self.logger = logger
        self.hash_store = HashStore(db_config)
        self._connection = None
        # Serializes the "does this checksum exist? / store it" sequence in
        # _process_file, which runs on several worker threads at once.
        self._store_lock = threading.Lock()

    def _get_connection(self):
        """Get or create database connection

        Returns:
            The cached connection, reconnecting when none exists yet or the
            cached one has been closed.
        """
        if self._connection is None or self._connection.closed:
            self._connection = psycopg2.connect(
                host=self.db_config.host,
                port=self.db_config.port,
                database=self.db_config.database,
                user=self.db_config.user,
                password=self.db_config.password
            )
        return self._connection

    def deduplicate_all(
        self,
        disk: Optional[str] = None,
        use_chunks: bool = True,
        progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
    ) -> ProcessingStats:
        """Deduplicate all files in database

        Files without a checksum are hashed on a thread pool, largest first.
        Results are written back from the calling thread and committed every
        ``processing_config.commit_interval`` processed files; progress is
        reported at those same points.

        Args:
            disk: Optional disk filter
            use_chunks: Whether to use chunk-level deduplication
            progress_callback: Optional callback for progress updates, called
                with (files_processed, total_files, stats)

        Returns:
            ProcessingStats with deduplication statistics
        """
        self.logger.section("Starting Deduplication")

        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Get files without checksums
            if disk:
                cursor.execute("""
                    SELECT path, size
                    FROM files
                    WHERE disk = %s AND checksum IS NULL
                    ORDER BY size DESC
                """, (disk,))
            else:
                cursor.execute("""
                    SELECT path, size
                    FROM files
                    WHERE checksum IS NULL
                    ORDER BY size DESC
                """)

            files_to_process = cursor.fetchall()
            total_files = len(files_to_process)

            self.logger.info(f"Found {total_files} files to process")

            stats = ProcessingStats()

            # Process files with thread pool
            with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor:
                futures = {}

                for path_str, size in files_to_process:
                    path = Path(path_str)
                    future = executor.submit(self._process_file, path, use_chunks)
                    futures[future] = (path, size)

                # Process completed futures
                for future in as_completed(futures):
                    path, size = futures[future]

                    try:
                        checksum, duplicate_of = future.result()

                        if checksum:
                            # Update database
                            cursor.execute("""
                                UPDATE files
                                SET checksum = %s, duplicate_of = %s
                                WHERE path = %s
                            """, (checksum, duplicate_of, str(path)))

                            stats.files_succeeded += 1
                            stats.bytes_processed += size

                        stats.files_processed += 1

                        # Commit periodically
                        if stats.files_processed % self.processing_config.commit_interval == 0:
                            conn.commit()

                            # Progress callback
                            if progress_callback:
                                progress_callback(stats.files_processed, total_files, stats)

                            # Log progress
                            self.logger.progress(
                                stats.files_processed,
                                total_files,
                                prefix="Files processed",
                                bytes_processed=stats.bytes_processed,
                                elapsed_seconds=stats.elapsed_seconds
                            )

                    except Exception as e:
                        if isinstance(e, psycopg2.Error):
                            # A failed statement leaves the transaction
                            # aborted, and every later UPDATE would then be
                            # rejected until a rollback. Rolling back limits
                            # the loss to the batch since the last commit.
                            try:
                                conn.rollback()
                            except psycopg2.Error as rollback_error:
                                self.logger.warning(f"Rollback failed: {rollback_error}")

                        self.logger.warning(f"Failed to process {path}: {e}")
                        stats.files_failed += 1
                        stats.files_processed += 1

            # Final commit
            conn.commit()
        finally:
            # Release the cursor on failure as well as on success.
            cursor.close()

        self.logger.info(
            f"Deduplication complete: {stats.files_succeeded}/{total_files} files, "
            f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
        )

        return stats

    def _process_file(
        self,
        path: Path,
        use_chunks: bool
    ) -> tuple[Optional[str], Optional[str]]:
        """Process a single file for deduplication

        Runs on worker threads. Hashing happens without any lock; only the
        check-then-store step on the hash store is serialized. Without that,
        two identical files hashed at the same time could both find the
        checksum missing and both be recorded as canonical.

        Args:
            path: Path to file
            use_chunks: Whether to use chunk-level deduplication

        Returns:
            Tuple of (checksum, duplicate_of_path). ``(None, None)`` when the
            file no longer exists; ``duplicate_of_path`` is None when this
            file is the canonical one for its checksum.

        Raises:
            Exception: Any hashing or hash-store error is logged at debug
                level and re-raised.
        """
        if not path.exists():
            return None, None

        try:
            if use_chunks:
                # Compute file signature with chunks
                checksum, chunk_hashes = compute_file_signature(
                    path,
                    use_rabin=True,
                    avg_chunk_size=self.processing_config.chunk_size
                )
            else:
                # Just compute file hash
                checksum = hash_file(
                    path,
                    algorithm=self.processing_config.hash_algorithm
                )
                chunk_hashes = None

            with self._store_lock:
                # Check if hash exists
                if self.hash_store.exists(checksum):
                    canonical_path = self.hash_store.get_canonical(checksum)
                    # A file is never a duplicate of itself. This can come up
                    # when it was registered as canonical earlier but its
                    # checksum never reached the files table.
                    if canonical_path is not None and str(canonical_path) == str(path):
                        return checksum, None
                    # Duplicate found
                    return checksum, canonical_path

                # New unique file
                size = path.stat().st_size
                self.hash_store.store_canonical(
                    checksum,
                    path,
                    size,
                    chunk_hashes
                )
                return checksum, None

        except Exception as e:
            self.logger.debug(f"Error processing {path}: {e}")
            raise

    def find_duplicates(
        self,
        disk: Optional[str] = None
    ) -> dict[str, list[str]]:
        """Find all duplicate files

        Files are grouped by checksum. Within a group the alphabetically
        first path is treated as canonical and the rest as its duplicates.

        Args:
            disk: Optional disk filter

        Returns:
            Dictionary mapping canonical path to list of duplicate paths
        """
        self.logger.subsection("Finding Duplicates")

        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Query for duplicates
            if disk:
                cursor.execute("""
                    SELECT checksum, array_agg(path ORDER BY path) as paths
                    FROM files
                    WHERE disk = %s AND checksum IS NOT NULL
                    GROUP BY checksum
                    HAVING COUNT(*) > 1
                """, (disk,))
            else:
                cursor.execute("""
                    SELECT checksum, array_agg(path ORDER BY path) as paths
                    FROM files
                    WHERE checksum IS NOT NULL
                    GROUP BY checksum
                    HAVING COUNT(*) > 1
                """)

            duplicates = {}
            for _checksum, paths in cursor.fetchall():
                canonical = paths[0]
                duplicates[canonical] = paths[1:]
        finally:
            cursor.close()

        self.logger.info(f"Found {len(duplicates)} sets of duplicates")

        return duplicates

    def get_deduplication_stats(self) -> dict:
        """Get deduplication statistics

        Only files that already have a checksum are counted.

        Returns:
            Dictionary with the keys ``total_files``, ``unique_files``,
            ``duplicate_files``, ``total_size``, ``unique_size``,
            ``wasted_space``, ``dedup_ratio`` (unique_size / total_size, 1.0
            when there is no data) and ``space_saved_percent``.
        """
        conn = self._get_connection()
        cursor = conn.cursor()

        stats = {}

        try:
            # Total files
            cursor.execute("SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL")
            stats['total_files'] = cursor.fetchone()[0]

            # Unique files
            cursor.execute("SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL")
            stats['unique_files'] = cursor.fetchone()[0]

            # Duplicate files
            stats['duplicate_files'] = stats['total_files'] - stats['unique_files']

            # Total size
            cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL")
            stats['total_size'] = cursor.fetchone()[0]

            # Unique size: one (arbitrary) row per checksum
            cursor.execute("""
                SELECT COALESCE(SUM(size), 0)
                FROM (
                    SELECT DISTINCT ON (checksum) size
                    FROM files
                    WHERE checksum IS NOT NULL
                ) AS unique_files
            """)
            stats['unique_size'] = cursor.fetchone()[0]
        finally:
            cursor.close()

        # Wasted space
        stats['wasted_space'] = stats['total_size'] - stats['unique_size']

        # Deduplication ratio and space saved percentage
        if stats['total_size'] > 0:
            stats['dedup_ratio'] = stats['unique_size'] / stats['total_size']
            stats['space_saved_percent'] = (stats['wasted_space'] / stats['total_size']) * 100
        else:
            stats['dedup_ratio'] = 1.0
            stats['space_saved_percent'] = 0.0

        return stats

    def mark_canonical_files(self) -> int:
        """Mark canonical (first occurrence) files in database

        For every checksum the alphabetically first path gets its
        ``duplicate_of`` column reset to NULL.

        Returns:
            Number of canonical files marked
        """
        self.logger.subsection("Marking Canonical Files")

        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Find first occurrence of each checksum and mark as canonical
            cursor.execute("""
                WITH canonical AS (
                    SELECT DISTINCT ON (checksum) path, checksum
                    FROM files
                    WHERE checksum IS NOT NULL
                    ORDER BY checksum, path
                )
                UPDATE files
                SET duplicate_of = NULL
                WHERE path IN (SELECT path FROM canonical)
            """)

            count = cursor.rowcount
            conn.commit()
        finally:
            cursor.close()

        self.logger.info(f"Marked {count} canonical files")

        return count

    def close(self):
        """Close connections

        The engine's own connection is closed even when closing the hash
        store raises.
        """
        try:
            self.hash_store.close()
        finally:
            if self._connection and not self._connection.closed:
                self._connection.close()

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.close()

===== src/deduplication/__init__.py =====

"""Deduplication package exports"""
from .chunker import (
    RabinChunker,
    SimpleChunker,
    hash_chunk,
    hash_file,
    compute_file_signature
)
from .store import HashStore, MemoryHashStore
from .engine import DeduplicationEngine

# Public API of the deduplication package: exactly the names imported above.
__all__ = [
    'RabinChunker',
    'SimpleChunker',
    'hash_chunk',
    'hash_file',
    'compute_file_signature',
    'HashStore',
    'MemoryHashStore',
    'DeduplicationEngine',
]

===== src/deduplication/chunker.py =====

"""Rabin fingerprint chunker for content-defined chunking"""
import hashlib
from pathlib import Path
from typing import Iterator, Optional


class RabinChunker:
    """Content-defined chunking using Rabin fingerprinting

    Uses a rolling hash to identify chunk boundaries based on content,
    allowing for efficient deduplication even when data is modified.
    """

    def __init__(
        self,
        avg_chunk_size: int = 8192,
        min_chunk_size: Optional[int] = None,
        max_chunk_size: Optional[int] = None,
        window_size: int = 48
    ):
        """Initialize Rabin chunker

        Args:
            avg_chunk_size: Target average chunk size in bytes
            min_chunk_size: Minimum chunk size (default: avg_chunk_size // 4)
            max_chunk_size: Maximum chunk size (default: avg_chunk_size * 8)
            window_size: Rolling hash window size
        """
        self.avg_chunk_size = avg_chunk_size
        self.min_chunk_size = min_chunk_size or (avg_chunk_size // 4)
        self.max_chunk_size = max_chunk_size or (avg_chunk_size * 8)
        self.window_size = window_size

        # Calculate mask for boundary detection: the low floor(log2(avg))
        # bits of the hash must all be zero at a boundary.
        bits = 0
        size = avg_chunk_size
        while size > 1:
            bits += 1
            size >>= 1
        self.mask = (1 << bits) - 1

        # Polynomial for rolling hash (prime number). Not referenced by any
        # method of this class; kept as a public attribute.
        self.poly = 0x3DA3358B4DC173

    def chunk_file(self, file_path: Path, chunk_size: Optional[int] = None) -> Iterator[bytes]:
        """Chunk a file using Rabin fingerprinting

        Args:
            file_path: Path to file to chunk
            chunk_size: If provided, use fixed-size chunking instead

        Yields:
            Chunk data as bytes
        """
        if chunk_size:
            # Use fixed-size chunking
            yield from self._chunk_fixed(file_path, chunk_size)
        else:
            # Use content-defined chunking
            yield from self._chunk_rabin(file_path)

    def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]:
        """Fixed-size chunking

        Args:
            file_path: Path to file
            chunk_size: Chunk size in bytes

        Yields:
            Fixed-size chunks (the last one may be shorter)
        """
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]:
        """Content-defined chunking using Rabin fingerprinting

        A boundary is placed once the current chunk holds at least
        ``min_chunk_size`` bytes and either the masked window hash is zero or
        the chunk has reached ``max_chunk_size``. The window and hash restart
        with every new chunk.

        The hash is updated in O(1) per byte and always equals
        ``_rolling_hash`` applied to the last ``window_size`` bytes of the
        current chunk, so the boundaries match a from-scratch recomputation.

        Args:
            file_path: Path to file

        Yields:
            Variable-size chunks based on content
        """
        word_mask = 0xFFFFFFFFFFFFFFFF
        window_size = self.window_size
        min_size = self.min_chunk_size
        max_size = self.max_chunk_size
        boundary_mask = self.mask
        # Weight of the oldest byte of a full window inside the hash.
        drop_shift = window_size - 1

        with open(file_path, 'rb') as f:
            chunk_data = bytearray()
            hash_value = 0

            while True:
                block = f.read(65536)
                if not block:
                    # End of file - yield remaining data
                    if chunk_data:
                        yield bytes(chunk_data)
                    break

                for byte in block:
                    chunk_data.append(byte)
                    length = len(chunk_data)

                    # The window is always the tail of chunk_data because
                    # both restart together at each boundary.
                    if window_size <= 0:
                        # Empty window: hash stays 0
                        hash_value = 0
                    elif length > window_size:
                        # Window full: drop the oldest byte, then shift in
                        # the new one (arithmetic modulo 2**64).
                        oldest = chunk_data[length - window_size - 1]
                        hash_value = (
                            ((hash_value - (oldest << drop_shift)) << 1) + byte
                        ) & word_mask
                    else:
                        hash_value = ((hash_value << 1) + byte) & word_mask

                    # Check if we should create a boundary
                    if length >= min_size and (
                        (hash_value & boundary_mask) == 0 or length >= max_size
                    ):
                        yield bytes(chunk_data)
                        chunk_data = bytearray()
                        hash_value = 0

    def _rolling_hash(self, window: bytearray) -> int:
        """Calculate the window hash from scratch

        Reference definition of the hash that ``_chunk_rabin`` maintains
        incrementally: each byte is added after shifting the running value
        left by one bit, truncated to 64 bits.

        Args:
            window: Byte window

        Returns:
            Hash value
        """
        hash_value = 0
        for byte in window:
            hash_value = ((hash_value << 1) + byte) & 0xFFFFFFFFFFFFFFFF
        return hash_value


class SimpleChunker:
    """Fixed-size chunker, the plain counterpart to content-defined chunking"""

    def __init__(self, chunk_size: int = 8192):
        """Create a chunker that cuts files into equally sized pieces

        Args:
            chunk_size: Number of bytes per chunk
        """
        self.chunk_size = chunk_size

    def chunk_file(self, file_path: Path) -> Iterator[bytes]:
        """Read a file and hand it out piece by piece

        Args:
            file_path: File to read (opened in binary mode)

        Yields:
            Consecutive chunks of ``chunk_size`` bytes; the final chunk may
            be shorter. Nothing is yielded for an empty file.
        """
        with open(file_path, 'rb') as stream:
            # Keep reading until read() reports end of file with b''.
            yield from iter(lambda: stream.read(self.chunk_size), b'')


def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str:
    """Compute the digest of one in-memory chunk

    Args:
        chunk: Bytes to hash
        algorithm: Name of a hashlib algorithm (default: sha256)

    Returns:
        Hexadecimal digest string
    """
    # hashlib.new accepts the initial data directly.
    return hashlib.new(algorithm, chunk).hexdigest()


def hash_file(file_path: Path, algorithm: str = 'sha256', chunk_size: int = 65536) -> str:
    """Compute the digest of a file's full contents

    The file is streamed in pieces, so it never has to fit in memory.

    Args:
        file_path: File to hash
        algorithm: Name of a hashlib algorithm (default: sha256)
        chunk_size: Number of bytes read per step

    Returns:
        Hexadecimal digest string
    """
    digest = hashlib.new(algorithm)

    with open(file_path, 'rb') as stream:
        # read() returns b'' at end of file, which ends the iteration.
        for block in iter(lambda: stream.read(chunk_size), b''):
            digest.update(block)

    return digest.hexdigest()


def compute_file_signature(
    file_path: Path,
    use_rabin: bool = True,
    avg_chunk_size: int = 8192
) -> tuple[str, list[str]]:
    """Hash a file as a whole and chunk by chunk in a single pass

    Args:
        file_path: File to process
        use_rabin: True for content-defined (Rabin) chunks, False for
            fixed-size chunks
        avg_chunk_size: Average chunk size for Rabin chunking, or the exact
            chunk size for fixed-size chunking

    Returns:
        Tuple of (SHA-256 hex digest of the whole file, hex digests of the
        individual chunks in file order)
    """
    chunker = (
        RabinChunker(avg_chunk_size=avg_chunk_size)
        if use_rabin
        else SimpleChunker(chunk_size=avg_chunk_size)
    )

    whole_file = hashlib.sha256()
    per_chunk = []

    for piece in chunker.chunk_file(file_path):
        # Each chunk feeds both its own digest and the running file digest.
        per_chunk.append(hash_chunk(piece))
        whole_file.update(piece)

    return whole_file.hexdigest(), per_chunk

===== src/deduplication/store.py =====

"""Hash store for deduplication with optional Redis support"""
from typing import Optional, Dict, Set
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_batch

from ..shared.config import DatabaseConfig


class HashStore:
    """PostgreSQL-based hash store for deduplication.

    File-level hashes are kept in ``file_hashes``, chunk-level hashes in
    ``chunk_hashes`` and the ordered file-to-chunk mapping in ``file_chunks``.
    One psycopg2 connection is opened lazily and reused across calls; every
    public method leaves the connection outside of a transaction (committed
    on success, rolled back on failure) so one failed query cannot poison
    later calls.
    """

    # Quoted annotation: a forward reference, so the project type is not
    # evaluated when the class body is executed.
    def __init__(self, db_config: "DatabaseConfig"):
        """Initialize hash store

        Args:
            db_config: Database configuration (host, port, database, user,
                password attributes are read when connecting)
        """
        self.db_config = db_config
        self._connection = None
        # Flipped to True once the CREATE TABLE/INDEX statements have been
        # committed, so lookups do not re-run DDL on every call.
        self._tables_ready = False

    def _get_connection(self):
        """Get or create database connection"""
        if self._connection is None or self._connection.closed:
            self._connection = psycopg2.connect(
                host=self.db_config.host,
                port=self.db_config.port,
                database=self.db_config.database,
                user=self.db_config.user,
                password=self.db_config.password
            )
        return self._connection

    def _ensure_tables(self):
        """Create the hash store tables and indexes if they do not exist.

        The DDL is idempotent (IF NOT EXISTS) and is executed at most once
        per HashStore instance.
        """
        if getattr(self, '_tables_ready', False):
            return

        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Create hashes table for file-level deduplication
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS file_hashes (
                    checksum TEXT PRIMARY KEY,
                    canonical_path TEXT NOT NULL,
                    size BIGINT NOT NULL,
                    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ref_count INTEGER DEFAULT 1
                )
            """)

            # Create chunk hashes table for chunk-level deduplication
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS chunk_hashes (
                    chunk_hash TEXT PRIMARY KEY,
                    size INTEGER NOT NULL,
                    first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ref_count INTEGER DEFAULT 1
                )
            """)

            # Create file-chunk mapping table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS file_chunks (
                    id SERIAL PRIMARY KEY,
                    file_checksum TEXT NOT NULL,
                    chunk_hash TEXT NOT NULL,
                    chunk_index INTEGER NOT NULL,
                    FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),
                    FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),
                    UNIQUE (file_checksum, chunk_index)
                )
            """)

            # Create indexes
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_file_chunks_file
                ON file_chunks(file_checksum)
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk
                ON file_chunks(chunk_hash)
            """)

            conn.commit()
        except Exception:
            # Leave the connection usable instead of stuck in an aborted transaction.
            conn.rollback()
            raise
        finally:
            cursor.close()

        self._tables_ready = True

    def _fetch_rows(self, query: str, params=None) -> list:
        """Run a read-only query and return all rows.

        The cursor is always closed. The implicit transaction psycopg2 opens
        is ended with commit on success and rollback on failure.
        """
        self._ensure_tables()
        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(query, params)
            rows = cursor.fetchall()
            conn.commit()
            return rows
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

    def exists(self, checksum: str) -> bool:
        """Check if hash exists in store

        Args:
            checksum: File hash to check

        Returns:
            True if hash exists
        """
        rows = self._fetch_rows(
            "SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1",
            (checksum,)
        )
        return len(rows) > 0

    def get_canonical(self, checksum: str) -> Optional[str]:
        """Get canonical path for a hash

        Args:
            checksum: File hash

        Returns:
            Canonical file path or None if not found
        """
        rows = self._fetch_rows(
            "SELECT canonical_path FROM file_hashes WHERE checksum = %s",
            (checksum,)
        )
        return rows[0][0] if rows else None

    def store_canonical(
        self,
        checksum: str,
        path: Path,
        size: int,
        chunk_hashes: Optional[list[str]] = None
    ) -> None:
        """Store canonical reference for a hash

        The first path stored for a checksum stays canonical; storing the
        same checksum again only increments its reference count. All writes
        of one call are committed together or rolled back together.

        Args:
            checksum: File hash
            path: Canonical file path
            size: File size in bytes
            chunk_hashes: Optional list of chunk hashes

        Raises:
            Exception: Whatever the database layer raised, after rollback
        """
        self._ensure_tables()
        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Store file hash
            cursor.execute("""
                INSERT INTO file_hashes (checksum, canonical_path, size)
                VALUES (%s, %s, %s)
                ON CONFLICT (checksum) DO UPDATE SET
                    ref_count = file_hashes.ref_count + 1
            """, (checksum, str(path), size))

            # Store chunk hashes if provided
            if chunk_hashes:
                # Chunk sizes are not part of this method's inputs, so the
                # NOT NULL size column is filled with 0 as a placeholder.
                chunk_data = [(chunk_hash, 0) for chunk_hash in chunk_hashes]
                execute_batch(cursor, """
                    INSERT INTO chunk_hashes (chunk_hash, size)
                    VALUES (%s, %s)
                    ON CONFLICT (chunk_hash) DO UPDATE SET
                        ref_count = chunk_hashes.ref_count + 1
                """, chunk_data, page_size=1000)

                # Create file-chunk mappings
                mapping_data = [
                    (checksum, chunk_hash, idx)
                    for idx, chunk_hash in enumerate(chunk_hashes)
                ]
                execute_batch(cursor, """
                    INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (file_checksum, chunk_index) DO NOTHING
                """, mapping_data, page_size=1000)

            conn.commit()

        except Exception:
            conn.rollback()
            raise

        finally:
            cursor.close()

    def get_chunk_hashes(self, checksum: str) -> list[str]:
        """Get chunk hashes for a file

        Args:
            checksum: File hash

        Returns:
            List of chunk hashes in order
        """
        rows = self._fetch_rows("""
            SELECT chunk_hash
            FROM file_chunks
            WHERE file_checksum = %s
            ORDER BY chunk_index
        """, (checksum,))

        return [row[0] for row in rows]

    def get_duplicates(self) -> Dict[str, list[str]]:
        """Get all duplicate file groups

        Reads the ``files`` table (not created by this class) and groups
        paths that share a checksum. Within each group the paths are sorted
        and the first one is used as the key, so the result is deterministic
        regardless of row order.

        Returns:
            Dictionary mapping canonical path to list of duplicate paths
        """
        rows = self._fetch_rows("""
            SELECT f.path, f.checksum
            FROM files f
            WHERE f.checksum IS NOT NULL
        """)

        # Group by checksum
        hash_to_paths: Dict[str, list[str]] = {}
        for path, checksum in rows:
            hash_to_paths.setdefault(checksum, []).append(path)

        # Keep only groups with more than one file
        duplicates: Dict[str, list[str]] = {}
        for paths in hash_to_paths.values():
            if len(paths) > 1:
                paths.sort()
                duplicates[paths[0]] = paths[1:]

        return duplicates

    def get_stats(self) -> Dict[str, float]:
        """Get hash store statistics

        Returns:
            Dictionary with the integer counters ``unique_files``,
            ``unique_chunks``, ``total_file_refs``, ``total_chunk_refs`` and
            the float ``dedup_ratio`` (unique files / total file references,
            1.0 when nothing is stored)
        """
        # One round trip per table: row count and reference total together.
        unique_files, total_file_refs = self._fetch_rows(
            "SELECT COUNT(*), COALESCE(SUM(ref_count), 0) FROM file_hashes"
        )[0]
        unique_chunks, total_chunk_refs = self._fetch_rows(
            "SELECT COUNT(*), COALESCE(SUM(ref_count), 0) FROM chunk_hashes"
        )[0]

        stats = {
            'unique_files': unique_files,
            'unique_chunks': unique_chunks,
            'total_file_refs': total_file_refs,
            'total_chunk_refs': total_chunk_refs,
        }

        # Calculate deduplication ratio
        if total_file_refs > 0:
            stats['dedup_ratio'] = unique_files / total_file_refs
        else:
            stats['dedup_ratio'] = 1.0

        return stats

    def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]:
        """Find files similar to given hash based on chunk overlap

        Similarity is the Jaccard index of the two files' chunk-hash sets.

        Args:
            checksum: File hash to compare
            threshold: Similarity threshold (0.0 to 1.0)

        Returns:
            List of tuples (other_checksum, similarity_score), highest
            similarity first; ties are ordered by checksum
        """
        # Get chunks for the target file
        target_chunks = set(self.get_chunk_hashes(checksum))

        if not target_chunks:
            return []

        # Fetch the complete chunk lists of every other file that shares at
        # least one chunk with the target, in a single query.
        rows = self._fetch_rows("""
            SELECT fc.file_checksum, fc.chunk_hash
            FROM file_chunks fc
            WHERE fc.file_checksum != %s
              AND fc.file_checksum IN (
                  SELECT shared.file_checksum
                  FROM file_chunks shared
                  WHERE shared.chunk_hash = ANY(%s)
              )
        """, (checksum, list(target_chunks)))

        chunks_by_file: Dict[str, set] = {}
        for other_checksum, chunk_hash in rows:
            chunks_by_file.setdefault(other_checksum, set()).add(chunk_hash)

        similar_files = []
        for other_checksum, other_chunks in chunks_by_file.items():
            # Jaccard similarity; the union is never empty because
            # target_chunks is non-empty here.
            intersection = len(target_chunks & other_chunks)
            union = len(target_chunks | other_chunks)
            similarity = intersection / union

            if similarity >= threshold:
                similar_files.append((other_checksum, similarity))

        # Sort by similarity descending, then checksum for a stable order
        similar_files.sort(key=lambda item: (-item[1], item[0]))

        return similar_files

    def close(self):
        """Close database connection"""
        if self._connection and not self._connection.closed:
            self._connection.close()

    def __enter__(self):
        """Context manager entry"""
        self._ensure_tables()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.close()


class MemoryHashStore:
    """In-memory hash store for testing and small datasets.

    Mirrors the observable behavior of the PostgreSQL-backed store: the
    first path stored for a checksum stays canonical, and storing the same
    checksum again only increases its reference count.
    """

    def __init__(self):
        """Initialize in-memory hash store"""
        # checksum -> (canonical path, size in bytes)
        self.hashes: Dict[str, tuple[str, int]] = {}
        # chunk hash -> number of times it was stored
        self.chunks: Dict[str, int] = {}
        # checksum -> ordered chunk hashes of that file
        self.file_chunks: Dict[str, list[str]] = {}
        # checksum -> number of store_canonical() calls for it
        self.ref_counts: Dict[str, int] = {}

    def exists(self, checksum: str) -> bool:
        """Check if hash exists"""
        return checksum in self.hashes

    def get_canonical(self, checksum: str) -> Optional[str]:
        """Get canonical path, or None when the checksum is unknown"""
        entry = self.hashes.get(checksum)
        return entry[0] if entry else None

    def store_canonical(
        self,
        checksum: str,
        path: Path,
        size: int,
        chunk_hashes: Optional[list[str]] = None
    ) -> None:
        """Store canonical reference

        Args:
            checksum: File hash
            path: Canonical file path; ignored if the checksum is already
                stored, so the first path stays canonical
            size: File size in bytes
            chunk_hashes: Optional list of chunk hashes
        """
        if checksum not in self.hashes:
            self.hashes[checksum] = (str(path), size)
        self.ref_counts[checksum] = self.ref_counts.get(checksum, 0) + 1

        if chunk_hashes:
            # Copy so later mutation of the caller's list cannot alter the store.
            self.file_chunks.setdefault(checksum, list(chunk_hashes))
            for chunk_hash in chunk_hashes:
                self.chunks[chunk_hash] = self.chunks.get(chunk_hash, 0) + 1

    def get_chunk_hashes(self, checksum: str) -> list[str]:
        """Get a copy of the ordered chunk hashes (empty list if unknown)"""
        return list(self.file_chunks.get(checksum, []))

    def get_stats(self) -> Dict[str, float]:
        """Get statistics

        Returns:
            Dictionary with the integer counters ``unique_files``,
            ``unique_chunks``, ``total_file_refs``, ``total_chunk_refs`` and
            the float ``dedup_ratio`` (unique files / total file references,
            1.0 when nothing is stored)
        """
        unique_files = len(self.hashes)
        # Entries placed into self.hashes directly count as one reference.
        total_file_refs = sum(self.ref_counts.get(key, 1) for key in self.hashes)

        return {
            'unique_files': unique_files,
            'unique_chunks': len(self.chunks),
            'total_file_refs': total_file_refs,
            'total_chunk_refs': sum(self.chunks.values()),
            'dedup_ratio': unique_files / total_file_refs if total_file_refs > 0 else 1.0
        }

    def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]:
        """Find files similar to given hash based on chunk overlap

        Args:
            checksum: File hash to compare
            threshold: Jaccard similarity threshold (0.0 to 1.0)

        Returns:
            List of tuples (other_checksum, similarity_score), highest
            similarity first; ties are ordered by checksum
        """
        target_chunks = set(self.file_chunks.get(checksum, []))
        if not target_chunks:
            return []

        similar_files = []
        for other_checksum, other_list in self.file_chunks.items():
            if other_checksum == checksum:
                continue
            other_chunks = set(other_list)
            shared = len(target_chunks & other_chunks)
            if shared == 0:
                continue
            similarity = shared / len(target_chunks | other_chunks)
            if similarity >= threshold:
                similar_files.append((other_checksum, similarity))

        similar_files.sort(key=lambda item: (-item[1], item[0]))
        return similar_files

    def close(self):
        """No-op for compatibility"""
        pass

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        pass

===== src/main.py =====

#!/usr/bin/env python3
"""
Disk Reorganizer - Safely restructure files across disks to free up one entire disk.
Three modes: index, plan, execute
"""

import os
import sys
import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor
import shutil
import hashlib
import argparse
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple
from datetime import datetime
import logging
import time

# Setup logging
# Runs at import time. Every record at INFO level or above is sent to two
# handlers: the file 'disk_reorganizer.log' (a relative path, so it resolves
# against the current working directory) and standard output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('disk_reorganizer.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
# Module-level logger used by the classes and functions below.
logger = logging.getLogger(__name__)

@dataclass
class FileRecord:
    """One entry of the file index: where a file is and how far along it is."""
    # Required fields: location, byte size, modification time and owning disk.
    path: str
    size: int
    modified_time: float
    disk: str
    # Content checksum; None until one has been recorded.
    checksum: Optional[str] = None
    # Lifecycle marker, one of: indexed, planned, moved, verified.
    status: str = 'indexed'

class DiskReorganizer:
    """Index files per disk, plan moving them off one disk, and run the plan.

    State is kept in the PostgreSQL tables ``files`` and ``operations``;
    each method opens its own short-lived connection.
    """

    def __init__(self, db_config: Dict = None):
        """
        Initialize DiskReorganizer with PostgreSQL connection
        :param db_config: Database configuration dict with host, port, database, user, password.
            When omitted, each value is taken from the environment variable
            DISK_REORG_DB_HOST / _PORT / _NAME / _USER / _PASSWORD if set,
            otherwise from the built-in default.
        """
        if db_config is None:
            db_config = {
                'host': os.environ.get('DISK_REORG_DB_HOST', '192.168.1.159'),
                'port': int(os.environ.get('DISK_REORG_DB_PORT', 5432)),
                'database': os.environ.get('DISK_REORG_DB_NAME', 'disk_reorganizer_db'),
                'user': os.environ.get('DISK_REORG_DB_USER', 'disk_reorg_user'),
                'password': os.environ.get('DISK_REORG_DB_PASSWORD', 'heel-goed-wachtwoord')
            }
        self.db_config = db_config
        self.init_database()

    def get_connection(self):
        """Get PostgreSQL database connection"""
        return psycopg2.connect(**self.db_config)

    def init_database(self):
        """
        Verify PostgreSQL database connection and tables exist
        :raises psycopg2.Error: if connecting or querying fails
        :raises Exception: if the 'files' or 'operations' table is missing
        """
        conn = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            try:
                # Test connection and verify tables exist
                cursor.execute("""
                    SELECT table_name FROM information_schema.tables
                    WHERE table_schema = 'public' AND table_name IN ('files', 'operations')
                """)
                tables = cursor.fetchall()
            finally:
                cursor.close()
        except psycopg2.Error as e:
            logger.error(f"Database connection failed: {e}")
            raise
        finally:
            # Close on every path, including the missing-tables error below.
            if conn is not None:
                conn.close()

        if len(tables) < 2:
            logger.error("Database tables not found! Please run setup_database.sh first.")
            raise Exception("Database not properly initialized. Run setup_database.sh")

        logger.info("Database connection verified successfully")

    def index_disk(self, disk_root: str, disk_name: str):
        """
        Index all files on a disk/partition with dynamic progress display
        :param disk_root: Root path of disk (e.g., 'D:\\')
        :param disk_name: Logical name for the disk
        :raises psycopg2.Error: if the database rejects a statement; the
            uncommitted batch is rolled back first
        """
        logger.info(f"Indexing disk: {disk_name} at {disk_root}")
        disk_path = Path(disk_root)

        if not disk_path.exists():
            logger.error(f"Disk path {disk_root} does not exist!")
            return

        files_count = 0
        total_size = 0
        start_time = time.time()

        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            # Walk through all files
            for root, dirs, files in os.walk(disk_path):
                # Skip system directories (pruned in place so os.walk does not descend)
                dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))]

                for file in files:
                    file_path = Path(root) / file
                    try:
                        if not file_path.is_file():
                            continue

                        stat = file_path.stat()
                        size = stat.st_size
                        mtime = stat.st_mtime

                        # Calculate relative path for portability
                        rel_path = str(file_path.relative_to(disk_path))

                        # PostgreSQL INSERT ... ON CONFLICT for upsert
                        cursor.execute("""
                            INSERT INTO files (path, size, modified_time, disk, checksum, status)
                            VALUES (%s, %s, %s, %s, %s, %s)
                            ON CONFLICT (path) DO UPDATE SET
                                size = EXCLUDED.size,
                                modified_time = EXCLUDED.modified_time,
                                disk = EXCLUDED.disk,
                                status = EXCLUDED.status
                        """, (rel_path, size, mtime, disk_name, None, 'indexed'))

                        files_count += 1
                        total_size += size

                        # Dynamic progress display - update every 100 files
                        if files_count % 100 == 0:
                            elapsed = time.time() - start_time
                            rate = files_count / elapsed if elapsed > 0 else 0
                            # Truncate path for display
                            display_path = str(file_path)
                            if len(display_path) > 60:
                                display_path = '...' + display_path[-57:]

                            # Use \r to overwrite the line
                            print(f"\rIndexing: {files_count:,} files | {self.format_size(total_size)} | {rate:.0f} files/s | {display_path}", end='', flush=True)

                        # Commit every 1000 files for performance
                        if files_count % 1000 == 0:
                            conn.commit()

                    except psycopg2.Error as e:
                        # A failed statement aborts the open transaction, so every
                        # later INSERT would fail too. Stop instead of "skipping"
                        # the rest of the disk one file at a time.
                        logger.error(f"\nDatabase error while indexing {file_path}: {e}")
                        conn.rollback()
                        raise

                    except Exception as e:
                        # Filesystem-level problems affect only this file.
                        logger.warning(f"\nSkipping {file_path}: {e}")
                        continue

            conn.commit()
            print()  # New line after progress display
            logger.info(f"Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}")

        finally:
            cursor.close()
            conn.close()

    def calculate_disk_usage(self) -> Dict[str, Dict]:
        """
        Calculate current usage per disk
        :return: dict keyed by disk name with 'size' (int bytes), 'count' and 'formatted_size'
        """
        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            cursor.execute("""
                SELECT disk, SUM(size) as total_size, COUNT(*) as file_count
                FROM files
                GROUP BY disk
            """)

            usage = {}
            for disk, total_size, file_count in cursor.fetchall():
                # SUM over a BIGINT column comes back as NUMERIC (Decimal in
                # Python), which json.dump cannot serialize; normalize to int.
                size = int(total_size or 0)
                usage[disk] = {
                    'size': size,
                    'count': file_count,
                    'formatted_size': self.format_size(size)
                }

            return usage
        finally:
            cursor.close()
            conn.close()

    def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict:
        """
        Create a migration plan to free up target_disk
        :param target_disk: Disk to free up (e.g., 'D:')
        :param destination_disks: List of disks to move files to
        :return: Migration plan dictionary, or {} when the request is invalid
            (no destinations, target listed as destination, target not indexed)
        """
        logger.info(f"Planning migration to free up {target_disk}")

        if not destination_disks:
            logger.error("No destination disks given!")
            return {}

        if target_disk in destination_disks:
            logger.error(f"Target disk {target_disk} cannot also be a destination!")
            return {}

        usage = self.calculate_disk_usage()

        if target_disk not in usage:
            logger.error(f"Target disk {target_disk} not found in index!")
            return {}

        target_disk_usage = usage[target_disk]['size']

        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            # Get files on target disk, largest first
            cursor.execute(
                "SELECT path, size, modified_time FROM files WHERE disk = %s ORDER BY size DESC",
                (target_disk,)
            )
            files_to_move = cursor.fetchall()

            logger.info(f"Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}")

            # Generate move plan
            plan = {
                'target_disk': target_disk,
                'total_size': target_disk_usage,
                'file_count': len(files_to_move),
                'operations': [],
                'destination_disks': list(destination_disks)
            }

            for index, (rel_path, size, _mtime) in enumerate(files_to_move):
                # Simple round-robin for balance; free space on the
                # destinations is not checked.
                dest_disk = destination_disks[index % len(destination_disks)]

                # Record operation
                plan['operations'].append({
                    'source_disk': target_disk,
                    'source_path': rel_path,
                    'dest_disk': dest_disk,
                    'dest_path': rel_path,  # Keep same relative path
                    'size': size
                })

                # Store in database
                cursor.execute(
                    "INSERT INTO operations (source_path, dest_path, operation_type) VALUES (%s, %s, %s)",
                    (f"{target_disk}:{rel_path}", f"{dest_disk}:{rel_path}", 'move')
                )

            conn.commit()
        finally:
            cursor.close()
            conn.close()

        # Save plan to JSON. Disk names such as 'D:' contain characters that
        # are not valid in file names, so anything unusual becomes '_'.
        safe_disk = ''.join(ch if ch.isalnum() or ch in '-_.' else '_' for ch in target_disk)
        plan_file = f"migration_plan_{safe_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(plan_file, 'w') as f:
            json.dump(plan, f, indent=2)

        logger.info(f"Plan created with {len(plan['operations'])} operations")
        logger.info(f"Plan saved to {plan_file}")

        return plan

    def verify_operation(self, source: Path, dest: Path) -> bool:
        """
        Verify file was copied correctly by comparing file sizes
        :param source: Original file
        :param dest: Copied file
        :return: True when dest exists and both sizes match
        """
        if not dest.exists():
            return False

        try:
            return source.stat().st_size == dest.stat().st_size
        except Exception as e:
            logger.error(f"Verification error: {e}")
            return False

    @staticmethod
    def file_checksum(path: Path) -> str:
        """Calculate MD5 checksum of file, reading it in 4096-byte pieces"""
        hash_md5 = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def execute_migration(self, plan_file: str, dry_run: bool = True):
        """
        Execute migration plan
        :param plan_file: Path to plan JSON file
        :param dry_run: If True, only simulate operations

        Files are copied, not deleted: the source stays in place after a
        successful copy.
        """
        logger.info(f"{'DRY RUN' if dry_run else 'EXECUTING'} migration from {plan_file}")

        with open(plan_file, 'r') as f:
            plan = json.load(f)

        operations = plan['operations']
        logger.info(f"Processing {len(operations)} operations...")

        success_count = 0
        error_count = 0
        start_time = time.time()

        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            for i, op in enumerate(operations, 1):
                source_disk = op['source_disk']
                source_path = op['source_path']
                dest_disk = op['dest_disk']
                dest_path = op['dest_path']

                # NOTE(review): the plan stores the disk value given to
                # plan_migration; this assumes it is usable as a filesystem
                # root - confirm against how disks are named when indexing.
                source_full = Path(source_disk) / source_path
                dest_full = Path(dest_disk) / dest_path

                # Dynamic progress display
                elapsed = time.time() - start_time
                rate = i / elapsed if elapsed > 0 else 0
                eta = (len(operations) - i) / rate if rate > 0 else 0
                display_path = str(source_path)
                if len(display_path) > 50:
                    display_path = '...' + display_path[-47:]

                print(f"\r[{i}/{len(operations)}] {success_count} OK, {error_count} ERR | {rate:.1f} files/s | ETA: {int(eta)}s | {display_path}", end='', flush=True)

                if dry_run:
                    # Simulate
                    if source_full.exists():
                        success_count += 1
                    else:
                        logger.warning(f"\n  Source does not exist: {source_full}")
                        error_count += 1
                    continue

                try:
                    # Create destination directory
                    dest_full.parent.mkdir(parents=True, exist_ok=True)

                    if source_full.exists():
                        # Copy with metadata.
                        # NOTE(review): copy2 overwrites an existing destination file.
                        shutil.copy2(source_full, dest_full)

                        # Verify
                        if self.verify_operation(source_full, dest_full):
                            # Update database
                            cursor.execute(
                                "UPDATE files SET disk = %s, status = 'moved' WHERE path = %s AND disk = %s",
                                (dest_disk, source_path, source_disk)
                            )

                            # Safe delete (could be made optional)
                            # source_full.unlink()

                            # Log operation as executed and verified
                            cursor.execute(
                                "UPDATE operations SET executed = 1, verified = 1, executed_at = CURRENT_TIMESTAMP WHERE source_path = %s",
                                (f"{source_disk}:{source_path}",)
                            )

                            success_count += 1
                        else:
                            raise Exception("Verification failed")
                    else:
                        logger.warning(f"\n  Source missing: {source_full}")
                        error_count += 1

                except Exception as e:
                    logger.error(f"\n  Error processing {source_path}: {e}")
                    if isinstance(e, psycopg2.Error):
                        # The failed statement aborted the transaction; clear
                        # it so the error can be recorded below.
                        conn.rollback()
                    cursor.execute(
                        "UPDATE operations SET error = %s WHERE source_path = %s",
                        (str(e), f"{source_disk}:{source_path}")
                    )
                    error_count += 1

                # Commit per operation: the copy dominates the cost, and this
                # keeps the database in step with what is on disk.
                conn.commit()

            conn.commit()
            print()  # New line after progress display

        finally:
            cursor.close()
            conn.close()

        logger.info(f"Migration complete: {success_count} success, {error_count} errors")

        if not dry_run and error_count == 0:
            logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!")
            logger.info(f"  Remember to safely delete original files from {plan['target_disk']}")

    def generate_report(self):
        """Print per-status file totals and per-state operation counts to stdout"""
        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            cursor.execute("""
                SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status
            """)

            print("\n=== FILE MIGRATION REPORT ===")
            for status, count, size in cursor.fetchall():
                # status is nullable in the schema; str() keeps the width
                # format from failing on None.
                print(f"{str(status):15}: {count:6} files, {self.format_size(size or 0)}")

            cursor.execute("""
                SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified
            """)

            print("\n=== OPERATIONS REPORT ===")
            for op_type, executed, verified, count in cursor.fetchall():
                status = "EXECUTED" if executed else "PENDING"
                if verified:
                    status += "+VERIFIED"
                print(f"{op_type:10} {status:15}: {count} operations")

        finally:
            cursor.close()
            conn.close()

    @staticmethod
    def format_size(size: int) -> str:
        """Format bytes to human readable string using 1024-based units (e.g. '1.5KB')"""
        value = size
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if value < 1024:
                return f"{value:.1f}{unit}"
            value /= 1024
        return f"{value:.1f}PB"

def main():
    """Command-line entry point: parse arguments and run the chosen sub-command.

    Sub-commands are ``index``, ``plan``, ``execute`` and ``report``; one of
    them is required.
    """
    parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
    commands = parser.add_subparsers(dest='command', required=True)

    # index <disk_root> <disk_name>
    index_cmd = commands.add_parser('index', help='Index files on a disk')
    index_cmd.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
    index_cmd.add_argument('disk_name', help='Logical name for the disk')

    # plan <target_disk> <dest_disk>...
    plan_cmd = commands.add_parser('plan', help='Create migration plan')
    plan_cmd.add_argument('target_disk', help='Disk to free up')
    plan_cmd.add_argument('dest_disks', nargs='+', help='Destination disks')

    # execute <plan_file> [--dry-run]
    execute_cmd = commands.add_parser('execute', help='Execute migration plan')
    execute_cmd.add_argument('plan_file', help='Path to plan JSON file')
    execute_cmd.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')

    # report (no arguments)
    commands.add_parser('report', help='Show current status')

    args = parser.parse_args()
    tool = DiskReorganizer()
    chosen = args.command

    if chosen == 'index':
        tool.index_disk(args.disk_root, args.disk_name)
        return

    if chosen == 'plan':
        plan = tool.plan_migration(args.target_disk, args.dest_disks)
        # An empty plan means planning was refused; nothing to summarize.
        if plan:
            print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
            print(f"Destination disks: {', '.join(plan['destination_disks'])}")
        return

    if chosen == 'execute':
        tool.execute_migration(args.plan_file, dry_run=args.dry_run)
        return

    if chosen == 'report':
        tool.generate_report()

if __name__ == '__main__':
    main()```

===== src/shared/logger.py =====
```python
"""Dynamic progress logger with formatting utilities"""
import sys
import logging
from typing import Optional
from datetime import datetime
from pathlib import Path


def format_size(bytes_size: int) -> str:
    """Render a byte count with a 1024-based unit and one decimal place

    Args:
        bytes_size: Size in bytes

    Returns:
        Human-readable size string (e.g., "1.5 GB", "234.5 MB"); values of
        1024 PB and above are expressed in EB
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    value = bytes_size
    position = 0

    # Step up one unit at a time until the value drops below 1024.
    while position < len(units):
        if value < 1024.0:
            return f"{value:.1f} {units[position]}"
        value = value / 1024.0
        position += 1

    return f"{value:.1f} EB"


def format_rate(bytes_per_second: float) -> str:
    """Render a throughput value as a size per second

    Args:
        bytes_per_second: Transfer rate in bytes per second

    Returns:
        Human-readable rate string (e.g., "125.3 MB/s")
    """
    # The fractional part of the rate is dropped before formatting.
    whole_bytes = int(bytes_per_second)
    return format_size(whole_bytes) + "/s"


def format_time(seconds: float) -> str:
    """Render a duration as "Ns", "Nm Ns" or "Nh Nm Ns".

    Fractions of a second are truncated. Which of the three forms is used
    depends only on whether the value is below one minute or one hour.

    Args:
        seconds: Time in seconds

    Returns:
        Human-readable time string (e.g., "2h 34m 12s", "45m 23s", "12s")
    """
    # Under a minute: seconds only
    if seconds < 60:
        return f"{int(seconds)}s"

    # The seconds component is the same for both remaining forms
    secs = int(seconds % 60)

    # Under an hour: minutes and seconds
    if seconds < 3600:
        return f"{int(seconds // 60)}m {secs}s"

    # An hour or more: split off whole hours, then minutes from the remainder
    hours, remainder = divmod(seconds, 3600)
    return f"{int(hours)}h {int(remainder // 60)}m {secs}s"


class ProgressLogger:
    """Wrapper around a named `logging.Logger` with progress-report helpers.

    The wrapper owns the handler configuration of the named logger: every
    construction replaces whatever handlers the logger had before with a
    console handler and/or a file handler sharing one timestamped format.
    """

    def __init__(
        self,
        name: str = "defrag",
        level: int = logging.INFO,
        log_file: Optional[Path] = None,
        console_output: bool = True
    ):
        """Initialize progress logger

        Args:
            name: Logger name (passed to logging.getLogger)
            level: Logging level applied to the logger and to each handler
            log_file: Optional log file path; a plain path string is accepted
                as well. Missing parent directories are created.
            console_output: Whether to attach a handler writing to sys.stdout
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # logging.getLogger() hands back the same object for the same name, so
        # an earlier ProgressLogger may have left handlers attached. Detach AND
        # close them: just clearing the list would leak open log-file handles.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        # Create formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        # Add console handler
        if console_output:
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setLevel(level)
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

        # Add file handler
        if log_file:
            # Normalize so callers holding a str path do not fail on `.parent`
            log_path = Path(log_file)
            log_path.parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(log_path)
            file_handler.setLevel(level)
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

        # Not read anywhere in this class; kept so existing attribute access
        # from outside keeps working.
        self._last_progress_line = ""

    def info(self, message: str) -> None:
        """Log info message"""
        self.logger.info(message)

    def warning(self, message: str) -> None:
        """Log warning message"""
        self.logger.warning(message)

    def error(self, message: str) -> None:
        """Log error message"""
        self.logger.error(message)

    def debug(self, message: str) -> None:
        """Log debug message"""
        self.logger.debug(message)

    def critical(self, message: str) -> None:
        """Log critical message"""
        self.logger.critical(message)

    def progress(
        self,
        current: int,
        total: int,
        prefix: str = "",
        suffix: str = "",
        bytes_processed: Optional[int] = None,
        elapsed_seconds: Optional[float] = None
    ) -> None:
        """Log one progress line at INFO level

        The line has the form "<prefix> [current/total] NN.N%" and, when both
        bytes_processed and a positive elapsed_seconds are given, is extended
        with the processed size, the average rate and an ETA.

        Args:
            current: Current progress count
            total: Total count (0 is reported as 0.0%)
            prefix: Prefix message
            suffix: Suffix message, appended after " | " when non-empty
            bytes_processed: Optional bytes processed for rate calculation
            elapsed_seconds: Optional elapsed time for rate calculation
        """
        if total == 0:
            percent = 0.0
        else:
            percent = (current / total) * 100

        progress_msg = f"{prefix} [{current}/{total}] {percent:.1f}%"

        if bytes_processed is not None and elapsed_seconds is not None and elapsed_seconds > 0:
            rate = bytes_processed / elapsed_seconds
            progress_msg += f" | {format_size(bytes_processed)} @ {format_rate(rate)}"

            # Estimate time remaining from the average time per item so far
            if current > 0:
                estimated_total_seconds = (elapsed_seconds / current) * total
                # Clamp: when current exceeds total the estimate would go negative
                remaining_seconds = max(0.0, estimated_total_seconds - elapsed_seconds)
                progress_msg += f" | ETA: {format_time(remaining_seconds)}"

        if suffix:
            progress_msg += f" | {suffix}"

        self.info(progress_msg)

    def section(self, title: str) -> None:
        """Log section header

        Emits three INFO lines: a 60-character "=" rule, the indented title,
        and the rule again.

        Args:
            title: Section title
        """
        separator = "=" * 60
        self.info(separator)
        self.info(f"  {title}")
        self.info(separator)

    def subsection(self, title: str) -> None:
        """Log subsection header

        Args:
            title: Subsection title
        """
        self.info(f"\n--- {title} ---")


def create_logger(
    name: str = "defrag",
    level: str = "INFO",
    log_file: Optional[Path] = None,
    console_output: bool = True
) -> ProgressLogger:
    """Build a ProgressLogger from a textual level name

    The level string is matched case-insensitively against DEBUG, INFO,
    WARNING, ERROR and CRITICAL; any other value falls back to INFO.

    Args:
        name: Logger name
        level: Logging level as string
        log_file: Optional log file path
        console_output: Whether to output to console

    Returns:
        Configured ProgressLogger instance
    """
    known_levels = {
        label: getattr(logging, label)
        for label in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    }
    resolved_level = known_levels.get(level.upper(), logging.INFO)

    return ProgressLogger(
        name=name,
        level=resolved_level,
        log_file=log_file,
        console_output=console_output
    )

===== src/shared/_protocols.py =====

"""Protocol definitions for the shared package"""
from typing import Protocol, Any
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FileRecord:
    """Core file record with all metadata"""
    path: Path
    size: int
    modified_time: float
    created_time: float
    disk: str
    checksum: str | None = None
    status: str = 'indexed'  # indexed, planned, moved, verified
    category: str | None = None
    duplicate_of: str | None = None


@dataclass
class OperationRecord:
    """Record of a migration operation"""
    source_path: Path
    dest_path: Path
    operation_type: str  # move, copy, hardlink, symlink
    status: str = 'pending'  # pending, in_progress, completed, failed
    error: str | None = None
    executed_at: datetime | None = None
    verified: bool = False


class IDatabase(Protocol):
    """Structural interface for the storage backend.

    Any object providing these four methods with compatible signatures
    satisfies the protocol; the bodies here are placeholders only.
    """

    def store_file(self, file_record: FileRecord) -> None:
        """Persist a single file record."""
        ...

    def get_files_by_disk(self, disk: str) -> list[FileRecord]:
        """Return the file records stored for the given disk."""
        ...

    def store_operation(self, operation: OperationRecord) -> None:
        """Persist a single operation record."""
        ...

    def get_pending_operations(self) -> list[OperationRecord]:
        """Return the operation records that are still pending."""
        ...


class ILogger(Protocol):
    """Structural interface for a logger with four severity methods.

    Each method takes the message text and returns nothing; the bodies here
    are placeholders only.
    """

    def info(self, message: str) -> None:
        """Log a message at info severity."""
        ...

    def warning(self, message: str) -> None:
        """Log a message at warning severity."""
        ...

    def error(self, message: str) -> None:
        """Log a message at error severity."""
        ...

    def debug(self, message: str) -> None:
        """Log a message at debug severity."""
        ...

===== src/shared/models.py =====

"""Data models for the disk reorganizer"""
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime
from typing import Optional


@dataclass
class FileRecord:
    """Metadata describing one indexed file, with dict serialization."""
    path: Path
    size: int
    modified_time: float
    created_time: float
    disk: str
    checksum: Optional[str] = None
    # One of: indexed, planned, moved, verified
    status: str = 'indexed'
    category: Optional[str] = None
    duplicate_of: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the record as a plain dict, with `path` rendered as a string.

        Keys appear in field declaration order; every value other than
        `path` is copied through unchanged.
        """
        payload = {'path': str(self.path)}
        for attr in (
            'size',
            'modified_time',
            'created_time',
            'disk',
            'checksum',
            'status',
            'category',
            'duplicate_of',
        ):
            payload[attr] = getattr(self, attr)
        return payload


@dataclass
class OperationRecord:
    """One planned or executed migration step, with dict serialization."""
    source_path: Path
    dest_path: Path
    # One of: move, copy, hardlink, symlink
    operation_type: str
    size: int = 0
    # One of: pending, in_progress, completed, failed
    status: str = 'pending'
    error: Optional[str] = None
    executed_at: Optional[datetime] = None
    verified: bool = False

    def to_dict(self) -> dict:
        """Return the record as a plain dict.

        Both paths are rendered as strings and `executed_at` as an ISO-8601
        string (or None when it is unset).
        """
        executed_text = None
        if self.executed_at:
            executed_text = self.executed_at.isoformat()

        return dict(
            source_path=str(self.source_path),
            dest_path=str(self.dest_path),
            operation_type=self.operation_type,
            size=self.size,
            status=self.status,
            error=self.error,
            executed_at=executed_text,
            verified=self.verified,
        )


@dataclass
class DiskInfo:
    """Descriptive and capacity data for one disk/volume"""
    name: str
    device: str
    mount_point: Path
    total_size: int
    used_size: int
    free_size: int
    fs_type: str

    @property
    def usage_percent(self) -> float:
        """Used share of the total size as a percentage.

        Returns 0.0 when total_size is 0, which avoids dividing by zero.
        """
        total = self.total_size
        return 0.0 if total == 0 else (self.used_size / total) * 100


@dataclass
class MigrationPlan:
    """A set of operations plus summary figures for one migration"""
    target_disk: str
    destination_disks: list[str]
    operations: list[OperationRecord]
    total_size: int
    file_count: int
    # Defaults to the moment the plan object is constructed
    created_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> dict:
        """Return the plan as a plain dict.

        Each operation is serialized through its own to_dict() and
        `created_at` is rendered as an ISO-8601 string.
        """
        serialized_ops = []
        for op in self.operations:
            serialized_ops.append(op.to_dict())

        return dict(
            target_disk=self.target_disk,
            destination_disks=self.destination_disks,
            operations=serialized_ops,
            total_size=self.total_size,
            file_count=self.file_count,
            created_at=self.created_at.isoformat(),
        )


@dataclass
class ProcessingStats:
    """Running counters for a processing run, with derived rates"""
    files_processed: int = 0
    bytes_processed: int = 0
    files_succeeded: int = 0
    files_failed: int = 0
    # Defaults to the moment the stats object is constructed
    start_time: datetime = field(default_factory=datetime.now)

    @property
    def elapsed_seconds(self) -> float:
        """Seconds between start_time and the current datetime.now()"""
        delta = datetime.now() - self.start_time
        return delta.total_seconds()

    @property
    def files_per_second(self) -> float:
        """Files processed per elapsed second; 0.0 if no time has elapsed"""
        elapsed = self.elapsed_seconds
        if elapsed > 0:
            return self.files_processed / elapsed
        return 0.0

    @property
    def bytes_per_second(self) -> float:
        """Bytes processed per elapsed second; 0.0 if no time has elapsed"""
        elapsed = self.elapsed_seconds
        if elapsed > 0:
            return self.bytes_processed / elapsed
        return 0.0

===== src/shared/config.py =====

"""Configuration management for disk reorganizer"""
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class DatabaseConfig:
    """Connection settings for the database.

    NOTE(review): the defaults embed a concrete host address and a plaintext
    password in source, and to_dict() includes the password in its output —
    confirm whether these should come from the environment or a local,
    uncommitted config file instead.
    """
    host: str = '192.168.1.159'
    port: int = 5432
    database: str = 'disk_reorganizer_db'
    user: str = 'disk_reorg_user'
    password: str = 'heel-goed-wachtwoord'

    def to_dict(self) -> dict:
        """Return all fields, including the password, as a plain dict"""
        as_mapping = asdict(self)
        return as_mapping


@dataclass
class ProcessingConfig:
    """Tunable settings for processing behavior"""
    batch_size: int = 1000
    commit_interval: int = 100
    parallel_workers: int = 4
    chunk_size: int = 8192  # presumably bytes per read — TODO confirm
    hash_algorithm: str = 'sha256'
    # Passed as `verify=` to the migration strategies by MigrationEngine
    verify_operations: bool = True
    preserve_timestamps: bool = True

    def to_dict(self) -> dict:
        """Return all fields as a plain dict"""
        as_mapping = asdict(self)
        return as_mapping


@dataclass
class LoggingConfig:
    """Settings controlling log level and log destinations"""
    level: str = 'INFO'
    # NOTE(review): a str here, while ProgressLogger annotates its log_file
    # parameter as Optional[Path] — confirm the caller converts it.
    log_file: str = 'disk_reorganizer.log'
    console_output: bool = True
    file_output: bool = True

    def to_dict(self) -> dict:
        """Return all fields as a plain dict"""
        as_mapping = asdict(self)
        return as_mapping


@dataclass
class Config:
    """Main configuration container

    Groups the database, processing and logging sections. Any section left
    as None is replaced by that section's defaults in __post_init__, so the
    attributes are never None after construction.
    """
    database: Optional[DatabaseConfig] = None
    processing: Optional[ProcessingConfig] = None
    logging: Optional[LoggingConfig] = None

    def __post_init__(self):
        """Initialize nested configs with defaults if not provided"""
        if self.database is None:
            self.database = DatabaseConfig()
        if self.processing is None:
            self.processing = ProcessingConfig()
        if self.logging is None:
            self.logging = LoggingConfig()

    @classmethod
    def from_file(cls, config_path: Path) -> 'Config':
        """Load configuration from JSON file

        Args:
            config_path: Path of the JSON file. A missing file yields an
                all-defaults Config.

        Returns:
            Config built from the "database", "processing" and "logging"
            objects of the file; a section that is absent or null falls back
            to that section's defaults.

        Raises:
            TypeError: If a section contains a key that is not a field of
                the corresponding config class.
        """
        if not config_path.exists():
            return cls()

        # Explicit encoding so the result does not depend on the platform locale
        with open(config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # `or {}` also covers an explicit JSON null, which `.get(key, {})`
        # would pass through as None and crash the ** expansion.
        return cls(
            database=DatabaseConfig(**(data.get('database') or {})),
            processing=ProcessingConfig(**(data.get('processing') or {})),
            logging=LoggingConfig(**(data.get('logging') or {}))
        )

    def to_file(self, config_path: Path) -> None:
        """Save configuration to JSON file

        Writes the same structure that to_dict() returns, indented by two
        spaces.

        Args:
            config_path: Path of the file to create or overwrite
        """
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(self.to_dict(), f, indent=2)

    def to_dict(self) -> dict:
        """Convert to dictionary

        Returns:
            Dict with the keys "database", "processing" and "logging", each
            holding the corresponding section's to_dict() result.
        """
        return {
            'database': self.database.to_dict(),
            'processing': self.processing.to_dict(),
            'logging': self.logging.to_dict()
        }


def load_config(config_path: Optional[Path] = None) -> Config:
    """Return the configuration stored at config_path, or defaults.

    When no path is given, 'config.json' relative to the current working
    directory is used. If the resolved path does not exist, an all-defaults
    Config is returned.
    """
    resolved_path = Path('config.json') if config_path is None else config_path

    if not resolved_path.exists():
        return Config()

    return Config.from_file(resolved_path)

===== src/shared/__init__.py =====

"""Shared package exports"""
from .models import (
    FileRecord,
    OperationRecord,
    DiskInfo,
    MigrationPlan,
    ProcessingStats
)
from .config import (
    Config,
    DatabaseConfig,
    ProcessingConfig,
    LoggingConfig,
    load_config
)
from .logger import (
    ProgressLogger,
    create_logger,
    format_size,
    format_rate,
    format_time
)
from ._protocols import IDatabase, ILogger

# Names exported by a star-import of this package, grouped by the submodule
# they are imported from above.
__all__ = [
    # Models
    'FileRecord',
    'OperationRecord',
    'DiskInfo',
    'MigrationPlan',
    'ProcessingStats',

    # Config
    'Config',
    'DatabaseConfig',
    'ProcessingConfig',
    'LoggingConfig',
    'load_config',

    # Logger
    'ProgressLogger',
    'create_logger',
    'format_size',
    'format_rate',
    'format_time',

    # Protocols
    'IDatabase',
    'ILogger',
]

===== src/migration/_protocols.py =====

"""Protocol definitions for the migration package"""
from typing import Protocol
from pathlib import Path
from ..shared.models import OperationRecord


class IMigrationStrategy(Protocol):
    """Structural interface for one way of migrating a single file.

    The method bodies here are placeholders; implementers supply the
    behavior.
    """

    def migrate(self, source: Path, destination: Path, verify: bool = True) -> bool:
        """Migrate one file from source to destination.

        Args:
            source: Source file path
            destination: Destination file path
            verify: Whether to verify the operation

        Returns:
            True if the migration succeeded
        """
        ...

    def can_migrate(self, source: Path, destination: Path) -> bool:
        """Report whether this strategy could migrate the given file.

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if migration is possible
        """
        ...

    def estimate_time(self, source: Path) -> float:
        """Estimate how long migrating the file would take.

        Args:
            source: Source file path

        Returns:
            Estimated time in seconds
        """
        ...

    def cleanup(self, source: Path) -> bool:
        """Clean up the source file after a successful migration.

        Args:
            source: Source file path

        Returns:
            True if the cleanup succeeded
        """
        ...


class IMigrationEngine(Protocol):
    """Structural interface for an engine that plans and runs migrations.

    NOTE(review): MigrationEngine in src/migration/engine.py declares
    plan_migration(disk, category) returning a MigrationPlan and an
    execute_migration that returns ProcessingStats, so it does not match the
    signatures below — confirm which side is authoritative.
    """

    def plan_migration(self, disk: str, target_base: Path) -> list[OperationRecord]:
        """Plan the migration of one disk.

        Args:
            disk: Disk identifier
            target_base: Target base directory

        Returns:
            List of planned operations
        """
        ...

    def execute_migration(self, operations: list[OperationRecord], dry_run: bool = False) -> dict:
        """Execute the given operations.

        Args:
            operations: List of operations to execute
            dry_run: Whether to perform a dry run

        Returns:
            Dictionary with execution statistics
        """
        ...

    def rollback(self, operation: OperationRecord) -> bool:
        """Undo a single migration operation.

        Args:
            operation: Operation to rollback

        Returns:
            True if the rollback succeeded
        """
        ...

===== src/migration/engine.py =====

"""Migration engine"""
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch

from .copy import CopyMigrationStrategy, SafeCopyStrategy
from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy
from ..shared.models import OperationRecord, ProcessingStats, MigrationPlan
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger


class MigrationEngine:
    """Engine for migrating files"""

    def __init__(
        self,
        db_config: DatabaseConfig,
        processing_config: ProcessingConfig,
        logger: ProgressLogger,
        target_base: Path
    ):
        """Initialize migration engine

        Args:
            db_config: Database configuration
            processing_config: Processing configuration
            logger: Progress logger
            target_base: Target base directory for migrations
        """
        self.db_config = db_config
        self.processing_config = processing_config
        self.logger = logger
        self.target_base = Path(target_base)
        self._connection = None

        # Initialize strategies
        self.copy_strategy = SafeCopyStrategy(logger=logger)
        self.hardlink_strategy = HardlinkMigrationStrategy(logger=logger)
        self.symlink_strategy = SymlinkMigrationStrategy(logger=logger)

    def _get_connection(self):
        """Get or create database connection"""
        if self._connection is None or self._connection.closed:
            self._connection = psycopg2.connect(
                host=self.db_config.host,
                port=self.db_config.port,
                database=self.db_config.database,
                user=self.db_config.user,
                password=self.db_config.password
            )
        return self._connection

    def _ensure_tables(self):
        """Ensure migration tables exist"""
        conn = self._get_connection()
        cursor = conn.cursor()

        # Create operations table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS operations (
                id SERIAL PRIMARY KEY,
                source_path TEXT NOT NULL,
                dest_path TEXT NOT NULL,
                operation_type TEXT NOT NULL,
                size BIGINT DEFAULT 0,
                status TEXT DEFAULT 'pending',
                error TEXT,
                executed_at TIMESTAMP,
                verified BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Create index on status
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_operations_status
            ON operations(status)
        """)

        conn.commit()
        cursor.close()

    def plan_migration(
        self,
        disk: Optional[str] = None,
        category: Optional[str] = None
    ) -> MigrationPlan:
        """Plan migration for files

        Args:
            disk: Optional disk filter
            category: Optional category filter

        Returns:
            MigrationPlan with planned operations
        """
        self.logger.section("Planning Migration")

        conn = self._get_connection()
        cursor = conn.cursor()

        # Build query
        conditions = ["category IS NOT NULL"]
        params = []

        if disk:
            conditions.append("disk = %s")
            params.append(disk)

        if category:
            conditions.append("category = %s")
            params.append(category)

        query = f"""
            SELECT path, size, category, duplicate_of
            FROM files
            WHERE {' AND '.join(conditions)}
            ORDER BY category, path
        """

        cursor.execute(query, params)
        files = cursor.fetchall()

        self.logger.info(f"Found {len(files)} files to migrate")

        operations = []
        total_size = 0

        for path_str, size, file_category, duplicate_of in files:
            source = Path(path_str)

            # Determine destination
            dest_path = self.target_base / file_category / source.name

            # Determine operation type
            if duplicate_of:
                # Use hardlink for duplicates
                operation_type = 'hardlink'
            else:
                # Use copy for unique files
                operation_type = 'copy'

            operation = OperationRecord(
                source_path=source,
                dest_path=dest_path,
                operation_type=operation_type,
                size=size
            )

            operations.append(operation)
            total_size += size

        cursor.close()

        plan = MigrationPlan(
            target_disk=str(self.target_base),
            destination_disks=[str(self.target_base)],
            operations=operations,
            total_size=total_size,
            file_count=len(operations)
        )

        self.logger.info(
            f"Migration plan created: {plan.file_count} files, "
            f"{plan.total_size:,} bytes"
        )

        return plan

    def execute_migration(
        self,
        operations: list[OperationRecord],
        dry_run: bool = False,
        progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
    ) -> ProcessingStats:
        """Execute migration operations

        Args:
            operations: List of operations to execute
            dry_run: Whether to perform a dry run
            progress_callback: Optional callback for progress updates

        Returns:
            ProcessingStats with execution statistics
        """
        self.logger.section("Executing Migration" + (" (DRY RUN)" if dry_run else ""))

        self._ensure_tables()

        stats = ProcessingStats()
        total_ops = len(operations)

        for operation in operations:
            stats.files_processed += 1

            if dry_run:
                # In dry run, just log what would happen
                self.logger.debug(
                    f"[DRY RUN] Would {operation.operation_type}: "
                    f"{operation.source_path} -> {operation.dest_path}"
                )
                stats.files_succeeded += 1
            else:
                # Execute actual migration
                success = self._execute_operation(operation)

                if success:
                    stats.files_succeeded += 1
                    stats.bytes_processed += operation.size
                else:
                    stats.files_failed += 1

            # Progress callback
            if progress_callback and stats.files_processed % 100 == 0:
                progress_callback(stats.files_processed, total_ops, stats)

            # Log progress
            if stats.files_processed % 1000 == 0:
                self.logger.progress(
                    stats.files_processed,
                    total_ops,
                    prefix="Operations executed",
                    bytes_processed=stats.bytes_processed,
                    elapsed_seconds=stats.elapsed_seconds
                )

        self.logger.info(
            f"Migration {'dry run' if dry_run else 'execution'} complete: "
            f"{stats.files_succeeded}/{total_ops} operations, "
            f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
        )

        return stats

    def _execute_operation(self, operation: OperationRecord) -> bool:
        """Execute a single migration operation

        Args:
            operation: Operation to execute

        Returns:
            True if successful
        """
        operation.status = 'in_progress'
        operation.executed_at = datetime.now()

        try:
            # Select strategy based on operation type
            if operation.operation_type == 'copy':
                strategy = self.copy_strategy
            elif operation.operation_type == 'hardlink':
                strategy = self.hardlink_strategy
            elif operation.operation_type == 'symlink':
                strategy = self.symlink_strategy
            else:
                raise ValueError(f"Unknown operation type: {operation.operation_type}")

            # Execute migration
            success = strategy.migrate(
                operation.source_path,
                operation.dest_path,
                verify=self.processing_config.verify_operations
            )

            if success:
                operation.status = 'completed'
                operation.verified = True
                self._record_operation(operation)
                return True
            else:
                operation.status = 'failed'
                operation.error = "Migration failed"
                self._record_operation(operation)
                return False

        except Exception as e:
            operation.status = 'failed'
            operation.error = str(e)
            self._record_operation(operation)
            self.logger.error(f"Operation failed: {operation.source_path}: {e}")
            return False

    def _record_operation(self, operation: OperationRecord):
        """Record operation in database

        Args:
            operation: Operation to record
        """
        conn = self._get_connection()
        cursor = conn.cursor()

        cursor.execute("""
            INSERT INTO operations (
                source_path, dest_path, operation_type, size,
                status, error, executed_at, verified
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            str(operation.source_path),
            str(operation.dest_path),
            operation.operation_type,
            operation.size,
            operation.status,
            operation.error,
            operation.executed_at,
            operation.verified
        ))

        conn.commit()
        cursor.close()

    def rollback(self, operation: OperationRecord) -> bool:
        """Rollback a migration operation

        Args:
            operation: Operation to rollback

        Returns:
            True if rollback successful
        """
        self.logger.warning(f"Rolling back: {operation.dest_path}")

        try:
            # Remove destination
            if operation.dest_path.exists():
                operation.dest_path.unlink()

            # Update database
            conn = self._get_connection()
            cursor = conn.cursor()

            cursor.execute("""
                UPDATE operations
                SET status = 'rolled_back'
                WHERE source_path = %s AND dest_path = %s
            """, (str(operation.source_path), str(operation.dest_path)))

            conn.commit()
            cursor.close()

            return True

        except Exception as e:
            self.logger.error(f"Rollback failed: {operation.dest_path}: {e}")
            return False

    def get_migration_stats(self) -> dict:
        """Get migration statistics

        Returns:
            Dictionary with statistics
        """
        conn = self._get_connection()
        cursor = conn.cursor()

        stats = {}

        # Total operations
        cursor.execute("SELECT COUNT(*) FROM operations")
        stats['total_operations'] = cursor.fetchone()[0]

        # Operations by status
        cursor.execute("""
            SELECT status, COUNT(*)
            FROM operations
            GROUP BY status
        """)

        for status, count in cursor.fetchall():
            stats[f'{status}_operations'] = count

        # Total size migrated
        cursor.execute("""
            SELECT COALESCE(SUM(size), 0)
            FROM operations
            WHERE status = 'completed'
        """)
        stats['total_size_migrated'] = cursor.fetchone()[0]

        cursor.close()

        return stats

    def verify_migrations(self) -> dict:
        """Verify completed migrations

        Returns:
            Dictionary with verification results
        """
        self.logger.subsection("Verifying Migrations")

        conn = self._get_connection()
        cursor = conn.cursor()

        cursor.execute("""
            SELECT source_path, dest_path, operation_type
            FROM operations
            WHERE status = 'completed' AND verified = FALSE
        """)

        operations = cursor.fetchall()
        cursor.close()

        results = {
            'total': len(operations),
            'verified': 0,
            'failed': 0
        }

        for source_str, dest_str, op_type in operations:
            source = Path(source_str)
            dest = Path(dest_str)

            # Verify destination exists
            if not dest.exists():
                results['failed'] += 1
                self.logger.warning(f"Verification failed: {dest} does not exist")
                continue

            # Verify based on operation type
            if op_type == 'hardlink':
                # Check if hardlinked
                if source.exists() and source.stat().st_ino == dest.stat().st_ino:
                    results['verified'] += 1
                else:
                    results['failed'] += 1
            else:
                # Check if destination exists and has correct size
                if dest.exists():
                    results['verified'] += 1
                else:
                    results['failed'] += 1

        self.logger.info(
            f"Verification complete: {results['verified']}/{results['total']} verified"
        )

        return results

    def close(self):
        """Close database connection"""
        if self._connection and not self._connection.closed:
            self._connection.close()

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.close()

===== src/migration/hardlink.py =====

"""Hardlink-based migration strategy"""
import os
from pathlib import Path
from typing import Optional

from ..shared.logger import ProgressLogger


class HardlinkMigrationStrategy:
    """Migrate files by hard-linking them at the destination instead of copying

    A hard link only adds a second directory entry for the same inode, so no
    file data is moved and both paths must live on the same filesystem.
    """

    def __init__(self, logger: Optional[ProgressLogger] = None):
        """Initialize hardlink migration strategy

        Args:
            logger: Optional progress logger; when omitted nothing is logged
        """
        self.logger = logger

    def migrate(
        self,
        source: Path,
        destination: Path,
        verify: bool = True
    ) -> bool:
        """Migrate file by creating hardlink

        Args:
            source: Source file path
            destination: Destination file path
            verify: Whether to verify the operation

        Returns:
            True if the link was created (and verified, when requested).
            False if the source is missing, the paths are on different
            filesystems, the destination already exists, or linking fails.
        """
        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        # The destination directory is only created further down, so it may
        # not exist yet. Compare the source against the closest ancestor that
        # does exist: directories created beneath it land on its filesystem.
        anchor = self._nearest_existing_ancestor(destination.parent)
        if anchor is None or not self._same_filesystem(source, anchor):
            if self.logger:
                self.logger.warning(
                    f"Cannot hardlink across filesystems: {source} -> {destination}"
                )
            return False

        # Create destination directory
        destination.parent.mkdir(parents=True, exist_ok=True)

        try:
            os.link(source, destination)

            if verify and not self._verify_hardlink(source, destination):
                if self.logger:
                    self.logger.error(f"Verification failed: {source} -> {destination}")
                # Drop the link that could not be confirmed
                destination.unlink()
                return False

            return True

        except FileExistsError:
            if self.logger:
                self.logger.warning(f"Destination already exists: {destination}")
            return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"Hardlink failed: {source} -> {destination}: {e}")
            return False

    @staticmethod
    def _nearest_existing_ancestor(path: Path) -> Optional[Path]:
        """Return ``path`` itself or its closest ancestor that exists

        Args:
            path: Directory path that may not exist yet

        Returns:
            The first existing path walking upwards, or None if none exists
        """
        for candidate in (path, *path.parents):
            if candidate.exists():
                return candidate
        return None

    def _same_filesystem(self, path1: Path, path2: Path) -> bool:
        """Check if two paths are on the same filesystem

        Args:
            path1: First path
            path2: Second path

        Returns:
            True if both paths report the same device ID; False if they
            differ or either path cannot be stat'ed
        """
        try:
            return path1.stat().st_dev == path2.stat().st_dev
        except (OSError, ValueError):
            return False

    def _verify_hardlink(self, source: Path, destination: Path) -> bool:
        """Verify that two paths are links to the same file

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if both paths refer to the same inode on the same device
        """
        try:
            # Inode numbers are only unique per device, so the device must
            # match as well; samestat compares both fields.
            return os.path.samestat(source.stat(), destination.stat())
        except (OSError, ValueError):
            return False

    def can_migrate(self, source: Path, destination: Path) -> bool:
        """Check if migration is possible

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if the source exists and shares a filesystem with the
            destination directory (or its nearest existing ancestor)
        """
        if not source.exists():
            return False

        anchor = self._nearest_existing_ancestor(destination.parent)
        return anchor is not None and self._same_filesystem(source, anchor)

    def estimate_time(self, source: Path) -> float:
        """Estimate migration time in seconds

        Args:
            source: Source file path (not inspected)

        Returns:
            A fixed 0.01 seconds, since creating a link copies no data
        """
        return 0.01  # Hardlinks are nearly instant

    def cleanup(self, source: Path) -> bool:
        """Cleanup source file after successful migration

        Note: For hardlinks, we typically don't remove the source
        immediately as both links point to the same inode.

        Args:
            source: Source file path (left untouched)

        Returns:
            True (no cleanup needed for hardlinks)
        """
        # Both source and destination point to the same data, so nothing
        # is removed here.
        return True


class SymlinkMigrationStrategy:
    """Migrate files by placing a symbolic link at the destination"""

    def __init__(
        self,
        logger: Optional[ProgressLogger] = None,
        absolute_links: bool = True
    ):
        """Initialize symlink migration strategy

        Args:
            logger: Optional progress logger; when omitted nothing is logged
            absolute_links: Whether to create absolute symlinks; when False
                the link target is written relative to the link's directory
        """
        self.logger = logger
        self.absolute_links = absolute_links

    def migrate(
        self,
        source: Path,
        destination: Path,
        verify: bool = True
    ) -> bool:
        """Migrate file by creating symlink

        Args:
            source: Source file path
            destination: Destination file path
            verify: Whether to verify the operation

        Returns:
            True if the link was created (and verified, when requested).
            False if the source is missing, the destination already exists,
            or link creation or verification fails.
        """
        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        # Create destination directory
        destination.parent.mkdir(parents=True, exist_ok=True)

        try:
            if self.absolute_links:
                target = source.resolve()
            else:
                # A relative target is interpreted from the directory that
                # physically holds the link. Measure from the resolved
                # destination directory so the link stays valid even when
                # that directory is itself reached through a symlink.
                target = os.path.relpath(
                    source, os.path.realpath(destination.parent)
                )

            destination.symlink_to(target)

            if verify and not self._verify_symlink(destination, source):
                if self.logger:
                    self.logger.error(f"Verification failed: {source} -> {destination}")
                # Drop the link that does not lead back to the source
                destination.unlink()
                return False

            return True

        except FileExistsError:
            if self.logger:
                self.logger.warning(f"Destination already exists: {destination}")
            return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"Symlink failed: {source} -> {destination}: {e}")
            return False

    def _verify_symlink(self, symlink: Path, expected_target: Path) -> bool:
        """Verify symlink

        Args:
            symlink: Symlink path
            expected_target: Expected target path

        Returns:
            True if ``symlink`` is a symbolic link and both paths resolve to
            the same location
        """
        try:
            if not symlink.is_symlink():
                return False

            return symlink.resolve() == expected_target.resolve()

        except Exception:
            return False

    def can_migrate(self, source: Path, destination: Path) -> bool:
        """Check if migration is possible

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if the source exists and the destination directory, or its
            nearest existing ancestor when it has yet to be created, is
            writable
        """
        if not source.exists():
            return False

        # Walk up to the first directory that exists; that is where missing
        # directories would have to be created.
        dest_dir = destination.parent
        for candidate in (dest_dir, *dest_dir.parents):
            if candidate.exists():
                return os.access(candidate, os.W_OK)

        return False

    def estimate_time(self, source: Path) -> float:
        """Estimate migration time in seconds

        Args:
            source: Source file path (not inspected)

        Returns:
            A fixed 0.01 seconds, since creating a link copies no data
        """
        return 0.01  # Symlinks are instant

    def cleanup(self, source: Path) -> bool:
        """Cleanup source file after successful migration

        Note: For symlinks, we don't remove the source as the
        symlink points to it.

        Args:
            source: Source file path (left untouched)

        Returns:
            True (no cleanup needed for symlinks)
        """
        # Removing the source would leave the new link dangling
        return True


class DedupHardlinkStrategy(HardlinkMigrationStrategy):
    """Hardlink strategy for deduplication

    Replaces duplicate files with hard links to one canonical copy so the
    data is stored only once.
    """

    # Upper bound on staging names tried before giving up
    _MAX_STAGING_ATTEMPTS = 1000

    def __init__(self, logger: Optional[ProgressLogger] = None):
        """Initialize dedup hardlink strategy

        Args:
            logger: Optional progress logger; when omitted nothing is logged
        """
        super().__init__(logger=logger)

    def deduplicate(
        self,
        canonical: Path,
        duplicate: Path
    ) -> bool:
        """Replace duplicate with hardlink to canonical

        The link is first created under a temporary name next to the
        duplicate and then renamed over it. The duplicate path is therefore
        never missing, and no unrelated file (such as an existing
        ``<name>.bak``) is touched.

        Args:
            canonical: Canonical file path
            duplicate: Duplicate file path

        Returns:
            True if the duplicate is (or already was) a hard link to the
            canonical file; False if either file is missing, the files are
            on different filesystems, or the replacement fails
        """
        if not canonical.exists():
            if self.logger:
                self.logger.error(f"Canonical file does not exist: {canonical}")
            return False

        if not duplicate.exists():
            if self.logger:
                self.logger.error(f"Duplicate file does not exist: {duplicate}")
            return False

        # Check the filesystem first: inode numbers are only comparable
        # between files on the same device, so the "already linked" test
        # below would otherwise be fooled by a cross-device collision.
        if not self._same_filesystem(canonical, duplicate):
            if self.logger:
                self.logger.warning(
                    f"Cannot hardlink across filesystems: {canonical} -> {duplicate}"
                )
            return False

        # Check if already hardlinked
        if self._verify_hardlink(canonical, duplicate):
            return True

        staging = None
        try:
            staging = self._link_to_staging(canonical, duplicate)
            # Atomically swap the new link into place of the duplicate
            os.replace(staging, duplicate)
            return True

        except Exception as e:
            if self.logger:
                self.logger.error(f"Deduplication failed: {duplicate}: {e}")

            # The duplicate itself was never moved; only the staging link
            # may be left behind and needs removing.
            if staging is not None:
                try:
                    staging.unlink()
                except FileNotFoundError:
                    pass
                except OSError as cleanup_error:
                    if self.logger:
                        self.logger.warning(
                            f"Failed to cleanup {staging}: {cleanup_error}"
                        )

            return False

    def _link_to_staging(self, canonical: Path, duplicate: Path) -> Path:
        """Hard-link ``canonical`` under an unused temporary name beside ``duplicate``

        Args:
            canonical: File to link to
            duplicate: File whose directory receives the staging link

        Returns:
            Path of the newly created staging link

        Raises:
            FileExistsError: If no unused staging name could be found
            OSError: If the link cannot be created for another reason
        """
        for attempt in range(self._MAX_STAGING_ATTEMPTS):
            staging = duplicate.with_name(f".dedup-{os.getpid()}-{attempt}.tmp")
            try:
                # os.link refuses to overwrite, so a taken name is detected
                # without ever clobbering an existing file.
                os.link(canonical, staging)
            except FileExistsError:
                continue
            return staging

        raise FileExistsError(f"No free staging name next to {duplicate}")

===== src/migration/__init__.py =====

"""Migration package exports"""
from .copy import (
    CopyMigrationStrategy,
    FastCopyStrategy,
    SafeCopyStrategy,
    ReferenceCopyStrategy
)
from .hardlink import (
    HardlinkMigrationStrategy,
    SymlinkMigrationStrategy,
    DedupHardlinkStrategy
)
from .engine import MigrationEngine
from ._protocols import IMigrationStrategy, IMigrationEngine

# Names re-exported by this package: the copy-based strategies, the
# link-based strategies, the engine, and the protocol types imported above.
__all__ = [
    'CopyMigrationStrategy',
    'FastCopyStrategy',
    'SafeCopyStrategy',
    'ReferenceCopyStrategy',
    'HardlinkMigrationStrategy',
    'SymlinkMigrationStrategy',
    'DedupHardlinkStrategy',
    'MigrationEngine',
    'IMigrationStrategy',
    'IMigrationEngine',
]

===== src/migration/copy.py =====

"""Copy-based migration strategy"""
import shutil
from pathlib import Path
from typing import Optional
import os

from ..shared.logger import ProgressLogger


class CopyMigrationStrategy:
    """Copy files to destination with verification

    The copy is written to a temporary file next to the destination and only
    renamed into place once it is complete (and verified, when enabled). A
    failed or rejected copy therefore never leaves a truncated destination
    and never removes a destination file that existed before.
    """

    def __init__(
        self,
        logger: Optional[ProgressLogger] = None,
        preserve_metadata: bool = True,
        verify_checksums: bool = True
    ):
        """Initialize copy migration strategy

        Args:
            logger: Optional progress logger; when omitted nothing is logged
            preserve_metadata: Whether to preserve file metadata
            verify_checksums: Whether to verify checksums after copy
        """
        self.logger = logger
        self.preserve_metadata = preserve_metadata
        self.verify_checksums = verify_checksums

    def migrate(
        self,
        source: Path,
        destination: Path,
        verify: bool = True
    ) -> bool:
        """Migrate file by copying

        Args:
            source: Source file path
            destination: Destination file path
            verify: Whether to verify the operation; verification only runs
                when ``verify_checksums`` was also enabled at construction

        Returns:
            True if the copy was completed and moved into place; False if
            the source is missing, source and destination are the same file,
            or copying or verification fails
        """
        import tempfile

        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        # Create destination directory
        destination.parent.mkdir(parents=True, exist_ok=True)

        staging = None
        try:
            # Copying a file onto itself must keep failing (shutil raises the
            # same error); otherwise a later cleanup() of the source would
            # delete the only copy.
            if destination.exists() and os.path.samefile(source, destination):
                raise shutil.SameFileError(
                    f"{source!r} and {destination!r} are the same file"
                )

            # Stage in the destination directory so the final rename stays
            # on one filesystem.
            handle, staging_name = tempfile.mkstemp(
                prefix='.migrate-', suffix='.tmp', dir=destination.parent
            )
            os.close(handle)
            staging = Path(staging_name)

            if self.preserve_metadata:
                shutil.copy2(source, staging)
            else:
                shutil.copy(source, staging)

            if verify and self.verify_checksums:
                if not self._verify_copy(source, staging):
                    if self.logger:
                        self.logger.error(f"Verification failed: {source} -> {destination}")
                    return False

            os.replace(staging, destination)
            staging = None  # moved into place, nothing left to clean up
            return True

        except Exception as e:
            if self.logger:
                self.logger.error(f"Copy failed: {source} -> {destination}: {e}")
            return False

        finally:
            # Remove the staging file after any failure or rejected copy
            if staging is not None:
                try:
                    staging.unlink()
                except FileNotFoundError:
                    pass
                except OSError as cleanup_error:
                    if self.logger:
                        self.logger.warning(
                            f"Failed to cleanup {staging}: {cleanup_error}"
                        )

    def _verify_copy(self, source: Path, destination: Path) -> bool:
        """Verify copied file

        Args:
            source: Source file path
            destination: Path of the copy to check

        Returns:
            True if sizes match and the contents compare equal (by hash for
            files over 1 MiB, byte-for-byte otherwise)
        """
        # Check size
        source_size = source.stat().st_size
        dest_size = destination.stat().st_size

        if source_size != dest_size:
            return False

        # Compare checksums for files larger than 1MB
        if source_size > 1024 * 1024:
            from ..deduplication.chunker import hash_file

            return hash_file(source) == hash_file(destination)

        # For small files, compare content directly
        with open(source, 'rb') as f1, open(destination, 'rb') as f2:
            return f1.read() == f2.read()

    def can_migrate(self, source: Path, destination: Path) -> bool:
        """Check if migration is possible

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if the source exists and the destination directory, or its
            nearest existing ancestor when it has yet to be created, is
            writable
        """
        if not source.exists():
            return False

        # Check if destination directory is writable
        dest_dir = destination.parent
        if dest_dir.exists():
            return os.access(dest_dir, os.W_OK)

        # Otherwise climb to the first ancestor that exists
        parent = dest_dir.parent
        while not parent.exists() and parent != parent.parent:
            parent = parent.parent

        return parent.exists() and os.access(parent, os.W_OK)

    def estimate_time(self, source: Path) -> float:
        """Estimate migration time in seconds

        Args:
            source: Source file path

        Returns:
            File size divided by an assumed 100 MB/s copy speed, or 0.0 if
            the source does not exist
        """
        if not source.exists():
            return 0.0

        size = source.stat().st_size

        # Estimate based on typical copy speed (100 MB/s)
        typical_speed = 100 * 1024 * 1024  # bytes per second
        return size / typical_speed

    def cleanup(self, source: Path) -> bool:
        """Cleanup source file after successful migration

        Args:
            source: Source file path

        Returns:
            True if the source was removed or was already gone; False if
            removal failed
        """
        try:
            if source.exists():
                source.unlink()
            return True
        except Exception as e:
            if self.logger:
                self.logger.warning(f"Failed to cleanup {source}: {e}")
            return False


class FastCopyStrategy(CopyMigrationStrategy):
    """Copy strategy tuned for speed: metadata is kept, verification is skipped"""

    def __init__(self, logger: Optional[ProgressLogger] = None):
        """Initialize fast copy strategy

        Args:
            logger: Optional progress logger
        """
        options = {'preserve_metadata': True, 'verify_checksums': False}
        super().__init__(logger=logger, **options)


class SafeCopyStrategy(CopyMigrationStrategy):
    """Copy strategy tuned for safety: metadata is kept and every copy is verified"""

    def __init__(self, logger: Optional[ProgressLogger] = None):
        """Initialize safe copy strategy

        Args:
            logger: Optional progress logger
        """
        options = {'preserve_metadata': True, 'verify_checksums': True}
        super().__init__(logger=logger, **options)


class ReferenceCopyStrategy:
    """Create reference copy using reflinks (CoW) if supported

    The copy is delegated to ``cp --reflink=auto``; whenever that command
    cannot be run or reports failure, a regular ``shutil.copy2`` is used.
    """

    def __init__(self, logger: Optional[ProgressLogger] = None):
        """Initialize reflink copy strategy

        Args:
            logger: Optional progress logger; when omitted nothing is logged
        """
        self.logger = logger

    def migrate(
        self,
        source: Path,
        destination: Path,
        verify: bool = True
    ) -> bool:
        """Migrate using reflink (copy-on-write)

        Args:
            source: Source file path
            destination: Destination file path
            verify: Whether to verify the operation (the copy must have the
                same size as the source)

        Returns:
            True if the copy was made (and verified, when requested); False
            if the source is missing or copying or verification fails
        """
        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        # Create destination directory
        destination.parent.mkdir(parents=True, exist_ok=True)

        try:
            if not self._reflink_copy(source, destination):
                # Fallback to regular copy
                shutil.copy2(source, destination)

            if verify and source.stat().st_size != destination.stat().st_size:
                if self.logger:
                    self.logger.error(f"Verification failed: {source} -> {destination}")
                destination.unlink()
                return False

            return True

        except Exception as e:
            if self.logger:
                self.logger.error(f"Reflink copy failed: {source} -> {destination}: {e}")
            return False

    @staticmethod
    def _reflink_copy(source: Path, destination: Path) -> bool:
        """Try to copy with ``cp --reflink=auto``

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if ``cp`` ran and exited with status 0; False if it could
            not be executed or reported an error
        """
        import subprocess

        try:
            result = subprocess.run(
                # "--" stops option parsing so a path that starts with "-"
                # is not mistaken for a command-line switch.
                ['cp', '--reflink=auto', '--', str(source), str(destination)],
                capture_output=True,
                check=False
            )
        except OSError:
            # cp is missing or not executable on this system
            return False

        return result.returncode == 0

    def can_migrate(self, source: Path, destination: Path) -> bool:
        """Check if migration is possible

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if the source exists and the destination directory, or its
            nearest existing ancestor when it has yet to be created, is
            writable
        """
        if not source.exists():
            return False

        dest_dir = destination.parent
        for candidate in (dest_dir, *dest_dir.parents):
            if candidate.exists():
                return os.access(candidate, os.W_OK)

        return False

    def estimate_time(self, source: Path) -> float:
        """Estimate migration time (reflinks are fast)

        Args:
            source: Source file path (not inspected)

        Returns:
            A fixed 0.1 seconds
        """
        return 0.1  # Reflinks are nearly instant

    def cleanup(self, source: Path) -> bool:
        """Cleanup source file

        Args:
            source: Source file path

        Returns:
            True if the source was removed or was already gone; False if
            removal failed
        """
        try:
            if source.exists():
                source.unlink()
            return True
        except Exception as e:
            if self.logger:
                self.logger.warning(f"Failed to cleanup {source}: {e}")
            return False

===== src/discovery/_protocols.py =====

"""Protocol definitions for the discovery package"""
from typing import Iterator, Protocol, Any
from pathlib import Path
from dataclasses import dataclass


@dataclass
class FileMeta:
    """Per-file record produced while scanning

    Attributes:
        path: Location of the file
        size: File size
        modified_time: Modification timestamp
        created_time: Creation timestamp
    """
    path: Path
    size: int
    modified_time: float
    created_time: float
    # Further metadata fields can be appended here


@dataclass
class MountInfo:
    """Description of one mounted filesystem

    Attributes:
        device: Device backing the mount
        mount_point: Where the filesystem is mounted
        fs_type: Filesystem type
        options: Mount options, kept as a single string
    """
    device: str
    mount_point: str
    fs_type: str
    options: str
    # Further mount details can be appended here


@dataclass
class DiskInfo:
    """Description of one disk/NVMe device

    Attributes:
        device: Device identifier
        model: Model name
        size: Device capacity (unit not fixed here — TODO confirm with producer)
        serial: Serial number
    """
    device: str
    model: str
    size: int
    serial: str
    # Further disk details can be appended here


class IFileScanner(Protocol):
    """Structural interface for objects that can scan a directory tree"""

    def scan(self, root: Path) -> Iterator[FileMeta]:
        """Walk the tree under ``root`` and yield a FileMeta per file found"""
        ...


class ISystemAPI(Protocol):
    """Structural interface for objects that answer system information queries"""

    def query_mounts(self) -> list[MountInfo]:
        """Return the mounted filesystems as MountInfo entries"""
        ...

    def query_nvmes(self) -> list[DiskInfo]:
        """Return NVMe/disk information as DiskInfo entries"""
        ...

===== src/discovery/scanner.py =====

"""File system scanner implementing IFileScanner protocol"""
import os
from pathlib import Path
from typing import Iterator, Optional, Callable
from datetime import datetime

from ._protocols import FileMeta


class FileScanner:
    """File system scanner with filtering and error handling

    Running totals (files, bytes, errors) accumulate across calls to
    ``scan`` until ``reset_stats`` is called.
    """

    def __init__(
        self,
        follow_symlinks: bool = False,
        skip_hidden: bool = True,
        error_handler: Optional[Callable[[Exception, Path], None]] = None
    ):
        """Initialize file scanner

        Args:
            follow_symlinks: Whether to follow symbolic links to directories
                while walking
            skip_hidden: Whether to skip files/directories whose name starts
                with a dot
            error_handler: Optional callback for handling errors during scan;
                it receives the exception and the path it relates to
        """
        self.follow_symlinks = follow_symlinks
        self.skip_hidden = skip_hidden
        self.error_handler = error_handler
        self._files_scanned = 0
        self._bytes_scanned = 0
        self._errors = 0

    def scan(self, root: Path) -> Iterator[FileMeta]:
        """Scan a directory tree and yield file metadata

        Args:
            root: Root directory to scan; a single file is also accepted

        Yields:
            FileMeta objects for each discovered file

        Raises:
            FileNotFoundError: If ``root`` does not exist and no error
                handler was supplied
            OSError: If ``root`` is a single file that cannot be read and no
                error handler was supplied
        """
        if not root.exists():
            error = FileNotFoundError(f"Path does not exist: {root}")
            self._errors += 1
            if self.error_handler:
                self.error_handler(error, root)
            else:
                raise error
            return

        if not root.is_dir():
            # If root is a file, just return its metadata
            try:
                meta = self._get_file_meta(root)
            except Exception as e:
                self._errors += 1
                if self.error_handler:
                    self.error_handler(e, root)
                else:
                    raise
                return

            # Count the file just like files found while walking a tree
            self._files_scanned += 1
            self._bytes_scanned += meta.size
            yield meta
            return

        def on_walk_error(err: OSError) -> None:
            # os.walk silently skips directories it cannot list unless an
            # onerror callback is given; record and report them instead.
            self._errors += 1
            if self.error_handler:
                failed = Path(err.filename) if err.filename else root
                self.error_handler(err, failed)

        # Walk directory tree
        for dirpath, dirnames, filenames in os.walk(
            root, onerror=on_walk_error, followlinks=self.follow_symlinks
        ):
            current_dir = Path(dirpath)

            # Pruning dirnames in place stops os.walk from descending
            if self.skip_hidden:
                dirnames[:] = [d for d in dirnames if not d.startswith('.')]

            for filename in filenames:
                if self.skip_hidden and filename.startswith('.'):
                    continue

                file_path = current_dir / filename

                try:
                    # Skip broken symlinks
                    if file_path.is_symlink() and not file_path.exists():
                        continue

                    meta = self._get_file_meta(file_path)

                except Exception as e:
                    # One unreadable file must not abort the whole scan
                    self._errors += 1
                    if self.error_handler:
                        self.error_handler(e, file_path)
                    continue

                self._files_scanned += 1
                self._bytes_scanned += meta.size

                yield meta

    def _get_file_meta(self, path: Path) -> FileMeta:
        """Get file metadata

        Args:
            path: Path to file

        Returns:
            FileMeta object with file metadata

        Raises:
            OSError: If file cannot be accessed
        """
        stat = path.stat()

        # Prefer the birth time where the stat result offers it; otherwise
        # fall back to st_ctime (platform dependent)
        created_time = getattr(stat, 'st_birthtime', stat.st_ctime)

        return FileMeta(
            path=path,
            size=stat.st_size,
            modified_time=stat.st_mtime,
            created_time=created_time
        )

    @property
    def files_scanned(self) -> int:
        """Get count of files scanned"""
        return self._files_scanned

    @property
    def bytes_scanned(self) -> int:
        """Get total bytes scanned"""
        return self._bytes_scanned

    @property
    def errors(self) -> int:
        """Get count of errors encountered"""
        return self._errors

    def reset_stats(self) -> None:
        """Reset scanning statistics"""
        self._files_scanned = 0
        self._bytes_scanned = 0
        self._errors = 0


class FilteredScanner(FileScanner):
    """FileScanner variant that drops files by size, extension or path substring"""

    def __init__(
        self,
        min_size: Optional[int] = None,
        max_size: Optional[int] = None,
        extensions: Optional[list[str]] = None,
        exclude_patterns: Optional[list[str]] = None,
        **kwargs
    ):
        """Initialize filtered scanner

        Args:
            min_size: Smallest file size to keep; None disables the check
            max_size: Largest file size to keep; None disables the check
            extensions: File extensions to keep (e.g., ['.txt', '.py']),
                compared case-insensitively; None or empty keeps every
                extension
            exclude_patterns: Substrings that exclude a file when found
                anywhere in its path
            **kwargs: Forwarded unchanged to FileScanner
        """
        super().__init__(**kwargs)
        self.min_size = min_size
        self.max_size = max_size
        if extensions:
            self.extensions = set(ext.lower() for ext in extensions)
        else:
            self.extensions = None
        self.exclude_patterns = exclude_patterns if exclude_patterns else []

    def scan(self, root: Path) -> Iterator[FileMeta]:
        """Scan like FileScanner, yielding only files that pass every filter

        Args:
            root: Root directory to scan

        Yields:
            FileMeta objects for files matching filter criteria
        """
        for meta in super().scan(root):
            below_minimum = self.min_size is not None and meta.size < self.min_size
            if below_minimum:
                continue

            above_maximum = self.max_size is not None and meta.size > self.max_size
            if above_maximum:
                continue

            wanted = self.extensions
            if wanted is not None and meta.path.suffix.lower() not in wanted:
                continue

            if not self._should_exclude(meta.path):
                yield meta

    def _should_exclude(self, path: Path) -> bool:
        """Tell whether any exclude pattern occurs in the path

        Args:
            path: Path to check

        Returns:
            True if at least one pattern is a substring of ``str(path)``
        """
        text = str(path)
        return any(pattern in text for pattern in self.exclude_patterns)

===== src/discovery/engine.py =====

"""Discovery engine coordinating scanner and system APIs"""
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch

from .scanner import FileScanner
from .system import SystemAPI
from ._protocols import FileMeta
from ..shared.models import FileRecord, DiskInfo, ProcessingStats
from ..shared.config import DatabaseConfig
from ..shared.logger import ProgressLogger


class DiscoveryEngine:
    """Discovery engine for scanning and cataloging files

    Scans a directory tree, turns every scanner result into a ``FileRecord``
    and upserts the records into the PostgreSQL ``files`` table in batches.
    The database connection is opened lazily on first use and is released by
    :meth:`close` or by using the engine as a context manager.
    """

    def __init__(
        self,
        db_config: DatabaseConfig,
        logger: ProgressLogger,
        batch_size: int = 1000
    ):
        """Initialize discovery engine

        Args:
            db_config: Database configuration
            logger: Progress logger
            batch_size: Number of records to batch before database commit

        Raises:
            ValueError: If batch_size is smaller than 1
        """
        # A batch size below 1 would only fail later (modulo by zero in the
        # progress logging), so reject it up front with a clear message.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        self.db_config = db_config
        self.logger = logger
        self.batch_size = batch_size
        self.system_api = SystemAPI()
        self._connection = None

    def _get_connection(self):
        """Get or create database connection

        Returns:
            The cached connection, (re)opened when missing or closed
        """
        if self._connection is None or self._connection.closed:
            self._connection = psycopg2.connect(
                host=self.db_config.host,
                port=self.db_config.port,
                database=self.db_config.database,
                user=self.db_config.user,
                password=self.db_config.password
            )
        return self._connection

    def _fetch_scalar(self, query: str, params: Optional[tuple] = None):
        """Run a query and return the first column of its first row

        Args:
            query: SQL text, using ``%s`` placeholders for values
            params: Values bound to the placeholders, or None

        Returns:
            The single value produced by the query
        """
        cursor = self._get_connection().cursor()
        try:
            cursor.execute(query, params)
            return cursor.fetchone()[0]
        finally:
            # Close the cursor even when the query raises
            cursor.close()

    def _ensure_tables(self):
        """Ensure database tables exist

        Creates the ``files`` table and its indexes when they are missing and
        commits. On failure the transaction is rolled back and the error is
        re-raised.
        """
        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Create files table. IF NOT EXISTS makes this a no-op when a
            # table named "files" is already present; an existing table with
            # a different column set is not altered.
            cursor.execute("""
            CREATE TABLE IF NOT EXISTS files (
                id SERIAL PRIMARY KEY,
                path TEXT NOT NULL UNIQUE,
                size BIGINT NOT NULL,
                modified_time DOUBLE PRECISION NOT NULL,
                created_time DOUBLE PRECISION NOT NULL,
                disk TEXT NOT NULL,
                checksum TEXT,
                status TEXT DEFAULT 'indexed',
                category TEXT,
                duplicate_of TEXT,
                discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

            # Create index on path
            cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_files_path ON files(path)
        """)

            # Create index on disk
            cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk)
        """)

            # Create index on checksum
            cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum)
        """)

            conn.commit()
        except Exception:
            # Leave the connection usable for later calls
            conn.rollback()
            raise
        finally:
            cursor.close()

    def discover_path(
        self,
        root: Path,
        scanner: Optional[FileScanner] = None,
        progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
    ) -> ProcessingStats:
        """Discover and catalog files in a path

        Every scanned file is upserted into the ``files`` table. Records are
        written and committed in batches of ``batch_size``; a failure rolls
        back the batch in flight and re-raises.

        Args:
            root: Root path to discover
            scanner: Optional custom scanner (default: FileScanner())
            progress_callback: Optional callback for progress updates, called
                after every committed batch (including the final partial one)
                with (files_processed, 0, stats); the total is not known
                while scanning, hence the 0

        Returns:
            ProcessingStats with discovery statistics
        """
        self.logger.section(f"Discovering: {root}")

        # Ensure tables exist
        self._ensure_tables()

        # Create scanner if not provided
        if scanner is None:
            scanner = FileScanner(
                error_handler=lambda e, p: self.logger.warning(f"Error scanning {p}: {e}")
            )

        # Label records with the device backing the root path, falling back
        # to the path itself when no mounted partition matches
        disk = self.system_api.get_disk_for_path(root)
        if disk is None:
            disk = str(root)

        # Initialize statistics
        stats = ProcessingStats()
        batch = []

        conn = self._get_connection()
        cursor = conn.cursor()

        try:
            # Scan files
            for file_meta in scanner.scan(root):
                # Create file record
                record = FileRecord(
                    path=file_meta.path,
                    size=file_meta.size,
                    modified_time=file_meta.modified_time,
                    created_time=file_meta.created_time,
                    disk=disk
                )

                batch.append(record)
                stats.files_processed += 1
                stats.bytes_processed += record.size

                # Batch insert
                if len(batch) >= self.batch_size:
                    self._insert_batch(cursor, batch)
                    conn.commit()
                    batch.clear()

                    # Progress callback
                    if progress_callback:
                        progress_callback(stats.files_processed, 0, stats)

                    # Log progress every ten batches
                    if stats.files_processed % (self.batch_size * 10) == 0:
                        self.logger.progress(
                            stats.files_processed,
                            stats.files_processed,  # We don't know total
                            prefix="Files discovered",
                            bytes_processed=stats.bytes_processed,
                            elapsed_seconds=stats.elapsed_seconds
                        )

            # Insert remaining batch
            flushed_remainder = bool(batch)
            if flushed_remainder:
                self._insert_batch(cursor, batch)
                conn.commit()
                batch.clear()

            stats.files_succeeded = stats.files_processed

            # Report the final partial batch too, so the callback always ends
            # on the real totals
            if flushed_remainder and progress_callback:
                progress_callback(stats.files_processed, 0, stats)

        except Exception as e:
            conn.rollback()
            self.logger.error(f"Discovery failed: {e}")
            raise

        finally:
            cursor.close()

        self.logger.info(
            f"Discovery complete: {stats.files_processed} files, "
            f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
        )

        return stats

    def _insert_batch(self, cursor, batch: list[FileRecord]):
        """Insert batch of file records

        Rows whose path already exists are updated in place (size,
        modified_time and updated_at only). The caller commits.

        Args:
            cursor: Database cursor
            batch: List of FileRecord objects
        """
        if not batch:
            return

        query = """
            INSERT INTO files (path, size, modified_time, created_time, disk, checksum, status, category, duplicate_of)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (path) DO UPDATE SET
                size = EXCLUDED.size,
                modified_time = EXCLUDED.modified_time,
                updated_at = CURRENT_TIMESTAMP
        """

        data = [
            (
                str(record.path),
                record.size,
                record.modified_time,
                record.created_time,
                record.disk,
                record.checksum,
                record.status,
                record.category,
                record.duplicate_of
            )
            for record in batch
        ]

        execute_batch(cursor, query, data, page_size=self.batch_size)

    def get_disk_info(self) -> list[DiskInfo]:
        """Get information about all disks

        Combines the disks reported by the system API with the mounted
        filesystem (if any) whose device name equals the disk's device name.
        Disks without such a mount are reported with ``/`` as mount point,
        zero used bytes and their full size as free space.

        Returns:
            List of DiskInfo objects
        """
        self.logger.subsection("Querying disk information")

        devices = self.system_api.query_nvmes()

        # Query the mount table once instead of once per disk. setdefault
        # keeps the first mount per device, i.e. the first match in list order.
        # NOTE(review): disks come from `lsblk -d` (whole devices) while
        # mounts carry partition device names, so an exact match may be rare
        # on partitioned disks — confirm whether partitions should be mapped
        # to their base device here.
        mounts_by_device = {}
        for mount in self.system_api.query_mounts():
            mounts_by_device.setdefault(mount.device, mount)

        disks = []
        for disk_info in devices:
            mount = mounts_by_device.get(disk_info.device)

            if mount is not None:
                mount_point = Path(mount.mount_point)
                fs_type = mount.fs_type
                total, used, free = self.system_api.get_disk_usage(mount_point)
            else:
                mount_point = None
                fs_type = "unknown"
                total = disk_info.size
                used = 0
                free = disk_info.size

            disk = DiskInfo(
                name=disk_info.device,
                device=disk_info.device,
                mount_point=mount_point or Path("/"),
                total_size=total,
                used_size=used,
                free_size=free,
                fs_type=fs_type
            )
            disks.append(disk)

            self.logger.info(
                f"  {disk.name}: {disk.usage_percent:.1f}% used "
                f"({disk.used_size:,} / {disk.total_size:,} bytes)"
            )

        return disks

    def get_file_count(self, disk: Optional[str] = None) -> int:
        """Get count of discovered files

        Args:
            disk: Optional disk filter; None or an empty string counts all files

        Returns:
            Count of files
        """
        if disk:
            return self._fetch_scalar("SELECT COUNT(*) FROM files WHERE disk = %s", (disk,))
        return self._fetch_scalar("SELECT COUNT(*) FROM files")

    def get_total_size(self, disk: Optional[str] = None) -> int:
        """Get total size of discovered files

        Args:
            disk: Optional disk filter; None or an empty string sums all files

        Returns:
            Total size in bytes
        """
        if disk:
            total = self._fetch_scalar(
                "SELECT COALESCE(SUM(size), 0) FROM files WHERE disk = %s", (disk,)
            )
        else:
            total = self._fetch_scalar("SELECT COALESCE(SUM(size), 0) FROM files")

        # SUM over a BIGINT column is a NUMERIC in PostgreSQL, which the
        # driver hands back as a non-int number; normalise to the documented int.
        return int(total)

    def close(self):
        """Close database connection"""
        if self._connection and not self._connection.closed:
            self._connection.close()

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; closes the connection, never suppresses errors"""
        self.close()

===== src/discovery/system.py =====

"""System API for querying mounts and disks"""
import os
import subprocess
from pathlib import Path
from typing import Optional
import psutil

from ._protocols import MountInfo, DiskInfo


class SystemAPI:
    """System information API for querying mounts and disks

    Wraps ``psutil`` and the ``lsblk`` command behind a small set of queries
    used by the discovery engine.
    """

    def query_mounts(self) -> list[MountInfo]:
        """Query mounted filesystems

        Returns:
            List of MountInfo objects, one per partition reported by
            ``psutil.disk_partitions(all=False)``
        """
        return [
            MountInfo(
                device=partition.device,
                mount_point=partition.mountpoint,
                fs_type=partition.fstype,
                options=partition.opts
            )
            for partition in psutil.disk_partitions(all=False)
        ]

    def query_nvmes(self) -> list[DiskInfo]:
        """Query NVMe/disk information

        Asks ``lsblk`` for whole-disk entries first and falls back to
        ``psutil`` when ``lsblk`` cannot be started, fails, or lists nothing.

        Returns:
            List of DiskInfo objects for all disks
        """
        disks = []

        # -d: whole disks only, -b: sizes in bytes, -P: KEY="value" pairs.
        # The pairs format keeps columns apart even when a value (typically
        # MODEL) contains spaces or is empty, which whitespace-splitting the
        # plain column output cannot do.
        try:
            result = subprocess.run(
                ['lsblk', '-d', '-b', '-P', '-o', 'NAME,MODEL,SIZE,SERIAL'],
                capture_output=True,
                text=True,
                check=False
            )
        except OSError:
            # lsblk not available or not executable, fall back to basic info
            result = None

        if result is not None and result.returncode == 0:
            for line in result.stdout.splitlines():
                fields = self._parse_lsblk_line(line)
                if fields is None:
                    continue

                try:
                    size = int(fields.get('SIZE', ''))
                except ValueError:
                    size = 0

                disks.append(DiskInfo(
                    device=f"/dev/{fields['NAME']}",
                    model=fields.get('MODEL') or "Unknown",
                    size=size,
                    serial=fields.get('SERIAL') or "Unknown"
                ))

        # If lsblk failed or unavailable, try alternative method
        if not disks:
            disks = self._query_disks_fallback()

        return disks

    @staticmethod
    def _parse_lsblk_line(line: str) -> Optional[dict]:
        """Parse one line of ``lsblk -P`` output into a field dictionary

        Args:
            line: A line such as ``NAME="sda" MODEL="Some Disk" SIZE="1" SERIAL=""``

        Returns:
            Mapping of column name to stripped value, or None when the line
            is blank, malformed, or has no NAME
        """
        import shlex

        try:
            tokens = shlex.split(line)
        except ValueError:
            # Unbalanced quotes: not a line we can trust
            return None

        fields = {}
        for token in tokens:
            key, sep, value = token.partition('=')
            if sep:
                fields[key] = value.strip()

        return fields if fields.get('NAME') else None

    def _query_disks_fallback(self) -> list[DiskInfo]:
        """Fallback method for querying disk information

        Builds one entry per distinct base device found among the partitions
        reported by ``psutil``. The size is the capacity of the first
        partition seen for that device (0 when it cannot be read).

        Returns:
            List of DiskInfo objects using psutil
        """
        disks = []
        seen_devices = set()

        for partition in psutil.disk_partitions(all=True):
            device = partition.device

            # Skip non-disk devices
            if not device.startswith('/dev/'):
                continue

            # Get base device (e.g., /dev/sda from /dev/sda1)
            base_device = self._get_base_device(device)

            if base_device in seen_devices:
                continue

            seen_devices.add(base_device)

            try:
                size = psutil.disk_usage(partition.mountpoint).total
            except OSError:
                size = 0

            disks.append(DiskInfo(
                device=base_device,
                model="Unknown",
                size=size,
                serial="Unknown"
            ))

        return disks

    def _get_base_device(self, device: str) -> str:
        """Extract base device name from partition device

        Args:
            device: Device path (e.g., /dev/sda1, /dev/nvme0n1p1)

        Returns:
            Base device path (e.g., /dev/sda, /dev/nvme0n1). Device paths
            that follow none of the known naming schemes (for example
            /dev/mapper/...) are returned unchanged.
        """
        import re

        patterns = (
            # Disks whose own name ends in a digit use a "p<N>" partition
            # suffix: /dev/nvme0n1p1 -> /dev/nvme0n1, /dev/mmcblk0p2 -> /dev/mmcblk0
            r'(/dev/(?:nvme\d+n\d+|mmcblk\d+))(?:p\d+)?',
            # Letter-named disks take a bare number: /dev/sda1 -> /dev/sda
            r'(/dev/(?:sd|hd|vd|xvd)[a-z]+)\d*',
            # macOS slices: /dev/disk1s1 -> /dev/disk1
            r'(/dev/disk\d+)(?:s\d+)*',
        )
        for pattern in patterns:
            match = re.fullmatch(pattern, device)
            if match:
                return match.group(1)

        return device

    def _find_partition_for_path(self, path: Path):
        """Find the mounted partition with the longest mount point containing path

        Args:
            path: Path to look up (resolved before matching)

        Returns:
            The matching psutil partition entry, or None if nothing matches
        """
        path = path.resolve()

        best_match = None
        best_match_len = 0

        for partition in psutil.disk_partitions():
            mount_point = Path(partition.mountpoint)
            try:
                if path == mount_point or mount_point in path.parents:
                    # Longest mount point wins, so nested mounts beat "/"
                    mount_len = len(str(mount_point))
                    if mount_len > best_match_len:
                        best_match = partition
                        best_match_len = mount_len
            except (ValueError, OSError):
                continue

        return best_match

    def get_disk_for_path(self, path: Path) -> Optional[str]:
        """Get the disk/mount point for a given path

        Args:
            path: Path to check

        Returns:
            Device of the partition containing the path, or None if not found
        """
        partition = self._find_partition_for_path(path)
        return partition.device if partition is not None else None

    def get_disk_usage(self, path: Path) -> tuple[int, int, int]:
        """Get disk usage for a path

        Args:
            path: Path to check

        Returns:
            Tuple of (total, used, free) in bytes; (0, 0, 0) when the usage
            cannot be read
        """
        try:
            usage = psutil.disk_usage(str(path))
            return usage.total, usage.used, usage.free
        except OSError:
            return 0, 0, 0

    def get_mount_point(self, path: Path) -> Optional[Path]:
        """Get the mount point for a given path

        Args:
            path: Path to check

        Returns:
            Mount point path or None if not found
        """
        partition = self._find_partition_for_path(path)
        return Path(partition.mountpoint) if partition is not None else None

    def is_same_filesystem(self, path1: Path, path2: Path) -> bool:
        """Check if two paths are on the same filesystem

        Args:
            path1: First path
            path2: Second path

        Returns:
            True if both paths report the same ``st_dev``; False when they
            differ or either path cannot be stat'ed
        """
        try:
            return path1.stat().st_dev == path2.stat().st_dev
        except OSError:
            return False

===== src/discovery/init.py =====

"""Discovery package exports"""
from .scanner import FileScanner, FilteredScanner
from .system import SystemAPI
from .engine import DiscoveryEngine
from ._protocols import FileMeta, MountInfo, DiskInfo, IFileScanner, ISystemAPI

# Public API of the discovery package: the names re-exported by
# `from <package> import *`. Each entry is imported at the top of this module.
__all__ = [
    # Scanners (from .scanner)
    'FileScanner',
    'FilteredScanner',
    # System queries (from .system)
    'SystemAPI',
    # Scan-and-catalog orchestration (from .engine)
    'DiscoveryEngine',
    # Data types and interfaces (from ._protocols)
    'FileMeta',
    'MountInfo',
    'DiskInfo',
    'IFileScanner',
    'ISystemAPI',
]

===== README.md =====

Here is your **extremely short, sharp, architectural** version — fully compressed, professional, and clear.
Source: processed from your file.

---

# Disk Reorganizer — Architectural Summary

## Core Outcome

Migration from **SQLite → PostgreSQL** completed.
System is now **network-capable**, **auditable**, **scalable**, and offers **real-time operational telemetry**.

---

## Architecture

### Database Layer (PostgreSQL)

* Central DB: `disk_reorganizer_db`
* User: `disk_reorg_user`
* Tables: `files`, `operations`
* Features: indexes, triggers, conflict-upserts, audit fields
* Deployment: SQL + Windows/Linux setup scripts

### Application Layer

* Python driver migrated to **psycopg2**
* Unified DB config + connection pooling
* Refactored CRUD + batch commits
* Robust error handling + transactional execution

### Operational Layer

* **Dynamic in-screen logging** during indexing + migration

  * File/sec, GB processed, ETA, success/error counters
  * Clean single-line, non-spamming UI updates

---

## Workflow

1. **Setup**

   ```json
   {
     "host": "192.168.1.159",
     "port": 5432,
     "database": "disk_reorganizer_db",
     "user": "disk_reorg_user",
     "password": "heel-goed-wachtwoord"
   }
   ```

   ```bash
   ./setup_database.sh      # or setup_database.bat
   pip install -r requirements.txt
   ```

2. **Index**

   ```bash
   python src/main.py index "D:\\" disk_d
   ```

3. **Plan**

   ```bash
   python src/main.py plan disk_d disk_e
   ```

4. **Dry-Run**

   ```bash
   python src/main.py execute plan.json --dry-run
   ```

5. **Execute**

   ```bash
   python src/main.py execute plan.json
   ```

6. **Report**

   ```bash
   python src/main.py report
   ```

## Guarantees

* No destructive actions by default
* Originals preserved
* Every action logged in DB
* Error-resilient, continues safely
* Suitable for millions of file records

## Failure Points to Check

* PostgreSQL reachable on 5432
* Correct credentials
* Disk permissions
* Python + psycopg2 installed

## Essence

A lean, safe, high-visibility disk migration tool running on a proper relational backbone, engineered for clarity, scale, and operational certainty.

Would you also like an ultra-short executive one-pager or a diagram version?


===== defrag.iml =====
<?xml version="1.0" encoding="UTF-8"?> ```

===== docker-compose.yml =====

services:
  # PostgreSQL Database
  postgres:
    image: postgres:15-alpine
    container_name: project_defrag_db
    environment:
      POSTGRES_USER: disk_reorg_user
      POSTGRES_PASSWORD: heel-goed-wachtwoord
      POSTGRES_DB: disk_reorganizer_db
      POSTGRES_INITDB_ARGS: "--encoding=UTF8 --locale=C"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
      - ./sql/migrations:/docker-entrypoint-initdb.d/migrations
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U disk_reorg_user -d disk_reorganizer_db"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - defrag-network

  # Redis for deduplication hash store (optional)
  redis:
    image: redis:7-alpine
    container_name: project_defrag_redis
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - defrag-network

  # Application Service
  app:
    build: .
    container_name: project_defrag_app
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    environment:
      # Database Configuration
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: disk_reorganizer_db
      DB_USER: disk_reorg_user
      DB_PASSWORD: heel-goed-wachtwoord

      # Redis Configuration
      REDIS_HOST: redis
      REDIS_PORT: 6379

      # Application Configuration
      LOG_LEVEL: INFO
      MAX_WORKERS: 4
      CHUNK_SIZE_KB: 64

      # Mount points (set these when running specific commands)
      SOURCE_MOUNT: /mnt/source
      TARGET_MOUNT: /mnt/target
    volumes:
      # Mount host directories for file operations
      - ${HOST_SOURCE_PATH:-/mnt/source}:/mnt/source:ro
      - ${HOST_TARGET_PATH:-/mnt/target}:/mnt/target

      # Mount for configuration and plans
      - ./config:/app/config
      - ./plans:/app/plans
      - ./logs:/app/logs

      # Bind mount for development (optional)
      - .:/app
    networks:
      - defrag-network
    profiles:
      - full-cycle
      - development
    # Uncomment for development with hot reload
    # command: watchmedo auto-restart --pattern="*.py" --recursive -- python main.py

  # Single command services for specific operations
  index:
    build: .
    container_name: defrag_index
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: disk_reorganizer_db
      DB_USER: disk_reorg_user
      DB_PASSWORD: heel-goed-wachtwoord
    volumes:
      - ${HOST_SOURCE_PATH:-/mnt/source}:/mnt/source:ro
      - ./config:/app/config
      - ./logs:/app/logs
    command: ["python", "main.py", "index", "/mnt/source", "disk_d"]
    profiles:
      - index-only
    networks:
      - defrag-network

  plan:
    build: .
    container_name: defrag_plan
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: disk_reorganizer_db
      DB_USER: disk_reorg_user
      DB_PASSWORD: heel-goed-wachtwoord
    volumes:
      - ./config:/app/config
      - ./plans:/app/plans
      - ./logs:/app/logs
    command: ["python", "main.py", "plan", "disk_d", "disk_e"]
    profiles:
      - plan-only
    networks:
      - defrag-network

  execute:
    build: .
    container_name: defrag_execute
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: disk_reorganizer_db
      DB_USER: disk_reorg_user
      DB_PASSWORD: heel-goed-wachtwoord
    volumes:
      - ${HOST_SOURCE_PATH:-/mnt/source}:/mnt/source
      - ${HOST_TARGET_PATH:-/mnt/target}:/mnt/target
      - ./plans:/app/plans
      - ./config:/app/config
      - ./logs:/app/logs
    command: ["python", "main.py", "execute", "/app/plans/plan.json"]
    profiles:
      - execute-only
    networks:
      - defrag-network

  dry-run:
    build: .
    container_name: defrag_dry_run
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: disk_reorganizer_db
      DB_USER: disk_reorg_user
      DB_PASSWORD: heel-goed-wachtwoord
    volumes:
      - ./plans:/app/plans
      - ./config:/app/config
      - ./logs:/app/logs
    command: ["python", "main.py", "execute", "/app/plans/plan.json", "--dry-run"]
    profiles:
      - dry-run-only
    networks:
      - defrag-network

  report:
    build: .
    container_name: defrag_report
    depends_on:
      postgres:
        condition: service_healthy
    environment:
      DB_HOST: postgres
      DB_PORT: 5432
      DB_NAME: disk_reorganizer_db
      DB_USER: disk_reorg_user
      DB_PASSWORD: heel-goed-wachtwoord
    volumes:
      - ./reports:/app/reports
      - ./logs:/app/logs
    command: ["python", "main.py", "report", "--format", "html"]
    profiles:
      - report-only
    networks:
      - defrag-network

  # Monitoring and Admin Services
  pgadmin:
    image: dpage/pgadmin4:latest
    container_name: defrag_pgadmin
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@defrag.local
      PGADMIN_DEFAULT_PASSWORD: admin123
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    ports:
      - "5050:80"
    depends_on:
      - postgres
    profiles:
      - monitoring
    networks:
      - defrag-network

  redis-commander:
    image: rediscommander/redis-commander:latest
    container_name: defrag_redis_commander
    environment:
      REDIS_HOSTS: local:redis:6379
    ports:
      - "8081:8081"
    depends_on:
      - redis
    profiles:
      - monitoring
    networks:
      - defrag-network

networks:
  defrag-network:
    driver: bridge

volumes:
  postgres_data:
    driver: local
  redis_data:
    driver: local
  pgadmin_data:
    driver: local```

===== pyproject.toml =====
```toml
[build-system]
requires = ["setuptools>=65.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "defrag"
version = "1.0.0"
description = "Intelligent disk reorganization system for 20TB+ data"
readme = "README.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [
    {name = "Project Defrag"}
]
keywords = ["disk", "storage", "deduplication", "classification", "migration"]
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: System Administrators",
    "Topic :: System :: Filesystems",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
]

dependencies = [
    "psycopg2-binary>=2.9.0",
    "psutil>=5.9.0",
    "pandas>=1.5.0",
    "pyarrow>=10.0.0",
    "python-magic>=0.4.27",
]

[project.optional-dependencies]
redis = ["redis>=4.5.0"]
ml = ["scikit-learn>=1.2.0", "numpy>=1.24.0"]
dev = [
    "pytest>=7.2.0",
    "pytest-cov>=4.0.0",
    "black>=23.0.0",
    "mypy>=1.0.0",
    "flake8>=6.0.0",
]
all = [
    "redis>=4.5.0",
    "scikit-learn>=1.2.0",
    "numpy>=1.24.0",
]

[project.scripts]
defrag = "main:main"

[tool.black]
line-length = 100
target-version = ['py39', 'py310', 'py311', 'py312']
include = '\.pyi?$'

[tool.mypy]
python_version = "3.9"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false
disallow_incomplete_defs = false
check_untyped_defs = true
no_implicit_optional = true

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --cov=. --cov-report=html --cov-report=term"

===== docker-compose.override.yml =====

services:
  app:
    environment:
      - LOG_LEVEL=DEBUG
      - PYTHONPATH=/app
    volumes:
      - .:/app
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - "8000:8000"
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

  postgres:
    # The official postgres image does not read a POSTGRES_LOG_STATEMENT
    # environment variable; server settings are passed to the server process
    # as `-c name=value` flags instead.
    command: ["postgres", "-c", "log_statement=all"]
    ports:
      # NOTE(review): Compose appends override `ports` to the base file's
      # list, so 5432 from docker-compose.yml stays published as well —
      # confirm that is intended.
      - "5433:5432"  # Different port to avoid conflict with host PostgreSQL

  redis:
    command: redis-server --appendonly yes --loglevel verbose

===== requirements.txt =====

# Each package is listed once. Where two different minimum versions were
# given for the same package, the higher one is kept (it satisfies both).

# Core dependencies
# PostgreSQL database adapter for Python
# Alternative: psycopg2>=2.9.9 (requires PostgreSQL development libraries)
# Use psycopg2-binary for easier installation without compilation
psycopg2-binary>=2.9.9
psutil>=5.9.0

# Data processing
pandas>=1.5.0
pyarrow>=10.0.0

# File type detection
python-magic>=0.4.27

# Optional/feature dependencies
redis>=4.5.0  # For RedisHashStore (optional)
scikit-learn>=1.2.0  # For MLClassifier (optional)
numpy>=1.24.0  # For MLClassifier (optional)

# Development dependencies
pytest>=7.2.0
pytest-cov>=4.0.0
black>=23.0.0
mypy>=1.0.0
flake8>=6.0.0

===== ARCHITECTURE.md =====

# Data Reorganization Architecture: "Project Defrag"

## Executive Summary

This document outlines the architecture for reorganizing 20TB of backup data across multiple NVMe drives and servers. The solution implements intelligent deduplication, systematic categorization, and optimized storage patterns for enhanced performance and maintainability.

## System Architecture Overview

```mermaid
graph TB
    subgraph "Source Environment"
        A["Local Machine<br/>8x NVMe + 1 HDD<br/>~10TB"]
        B["Server Machine<br/>Mixed Storage<br/>~10TB"]
    end
    
    subgraph "Processing Layer"
        C["Discovery Engine"]
        D["Classification Engine"]
        E["Deduplication Engine"]
        F["Migration Engine"]
    end
    
    subgraph "Target Architecture"
        G["App Volumes"]
        H["Gitea Repository"]
        I["Build Cache (.maven, pycache)"]
        J["Artifactories"]
        K["Databases"]
        L["Backups"]
        M["LLM Model Cache"]
        N["Git Infrastructure"]
    end
    
    A --> C
    B --> C
    C --> D
    D --> E
    E --> F
    F --> G
    F --> H
    F --> I
    F --> J
    F --> K
    F --> L
    F --> M
    F --> N
```

## Data Flow Architecture

### Phase 1: Discovery & Assessment

```mermaid
sequenceDiagram
    participant D as Discovery Engine
    participant FS as File System Scanner
    participant DB as Metadata Database
    participant API as System APIs
    
    D->>FS: Scan directory structures
    FS->>FS: Identify file types, sizes, dates
    FS->>DB: Store file metadata
    D->>API: Query system information
    API->>DB: Store system context
    DB->>D: Return analysis summary
```

### Phase 2: Classification & Deduplication

```mermaid
sequenceDiagram
    participant C as Classifier
    participant DH as Deduplication Hash
    participant CDB as Canonical DB
    participant MAP as Mapping Store
    
    C->>C: Analyze file signatures
    C->>DH: Generate content hashes
    DH->>CDB: Check for duplicates
    CDB->>DH: Return canonical reference
    DH->>MAP: Store deduplication map
    C->>C: Apply categorization rules
```

## Target Directory Structure

/mnt/organized/
├── apps/
│   ├── volumes/
│   │   ├── docker-volumes/
│   │   ├── app-data/
│   │   └── user-profiles/
│   └── runtime/
├── development/
│   ├── gitea/
│   │   ├── repositories/
│   │   ├── lfs-objects/
│   │   └── avatars/
│   ├── git-infrastructure/
│   │   ├── hooks/
│   │   ├── templates/
│   │   └── config/
│   └── build-tools/
│       ├── .maven/repository/
│       ├── gradle-cache/
│       └── sbt-cache/
├── artifacts/
│   ├── java/
│   │   ├── maven-central-cache/
│   │   ├── jfrog-artifactory/
│   │   └── gradle-build-cache/
│   ├── python/
│   │   ├── pypi-cache/
│   │   ├── wheelhouse/
│   │   └── pip-cache/
│   ├── node/
│   │   ├── npm-registry/
│   │   ├── yarn-cache/
│   │   └── pnpm-store/
│   └── go/
│       ├── goproxy-cache/
│       ├── module-cache/
│       └── sumdb-cache/
├── cache/
│   ├── llm-models/
│   │   ├── hugging-face/
│   │   ├── openai-cache/
│   │   └── local-llm/
│   ├── pycache/
│   ├── node_modules-archive/
│   └── browser-cache/
├── databases/
│   ├── postgresql/
│   ├── mysql/
│   ├── mongodb/
│   └── redis/
├── backups/
│   ├── system/
│   ├── application/
│   ├── database/
│   └── archive/
└── temp/
    ├── processing/
    ├── staging/
    └── cleanup/

## Technology Stack Recommendation

### Primary Language: Python 3.11+

**Rationale:**

- Excellent file system handling capabilities
- Rich ecosystem for data processing (pandas, pyarrow)
- Built-in multiprocessing for I/O operations
- Superior hash library support for deduplication
- Cross-platform compatibility

Key Libraries:

# Core processing
import asyncio
import hashlib
import multiprocessing as mp
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Data handling
import pandas as pd
import pyarrow as pa
import sqlite3
import json

# File analysis
import magic  # python-magic
import mimetypes
import filetype

# System integration
import psutil
import shutil
import os

Deduplication Strategy

Algorithm Selection: Variable-Size Chunking with Rabin Fingerprinting

class AdvancedDeduplication:
    """File-level deduplication driven by a chunker and a hash store.

    This is a design sketch: ``RabinChunker``, ``HashStore`` and the helper
    methods ``compute_file_hash``, ``create_reference`` and
    ``store_canonical`` are referenced here but defined elsewhere.
    """

    def __init__(self, avg_chunk_size=8192):
        """Create the chunker and the hash store.

        Args:
            avg_chunk_size: Value forwarded to ``RabinChunker``
                (presumably the target average chunk size in bytes;
                confirm against the chunker implementation).
        """
        self.hash_store = HashStore()
        self.chunker = RabinChunker(avg_chunk_size)

    def deduplicate_file(self, file_path):
        """Deduplicate one file and return its hash or a reference.

        Args:
            file_path: Path of the file to process.

        Returns:
            The result of ``create_reference(file_hash)`` when the hash is
            already in the store; otherwise the newly computed file hash,
            after the file has been stored as the canonical copy.
        """
        pieces = self.chunker.chunk_file(file_path)
        digest = self.compute_file_hash(pieces)

        # Unknown content: register this file as canonical and report its hash.
        if not self.hash_store.exists(digest):
            self.store_canonical(file_path, digest)
            return digest

        # Known content: hand back a reference to the existing canonical copy.
        return self.create_reference(digest)

Performance Optimization:

  • Parallel Processing: Utilize all CPU cores for hashing
  • Memory Mapping: For large files (>100MB)
  • Incremental Hashing: Process files in streams
  • Cache Layer: Redis for frequently accessed hashes

Classification Engine

Rule-Based Classification System:

# Rule-based classification: each rule group lists glob patterns and the
# action applied to paths that match any of them.
classification_rules:
  # Build output and installed dependency directories.
  build_artifacts:
    patterns:
      - "**/target/**"
      - "**/build/**"
      - "**/dist/**"
      - "**/node_modules/**"
    action: categorize_as_build_cache

  # Per-tool hidden cache/config directories.
  development_tools:
    patterns:
      - "**/.maven/**"
      # Maven's default local repository lives under ~/.m2, not ~/.maven.
      - "**/.m2/**"
      - "**/.gradle/**"
      - "**/.npm/**"
      - "**/.cache/**"
    action: categorize_as_tool_cache

  # Version-control metadata and hosted repository storage.
  repositories:
    patterns:
      - "**/.git/**"
      - "**/repositories/**"
      - "**/gitea/**"
    action: categorize_as_vcs

  # Database files by extension, plus database server data directories.
  database_files:
    patterns:
      - "**/*.db"
      - "**/*.sqlite"
      - "**/postgresql/**"
      - "**/mysql/**"
    action: categorize_as_database

  # Model weight files by extension, plus model directories.
  # NOTE(review): "**/*.bin" is broad and may match non-model binaries.
  model_files:
    patterns:
      - "**/*.bin"
      - "**/*.onnx"
      - "**/models/**"
      - "**/llm*/**"
    action: categorize_as_ai_model

Performance Considerations

NVMe Optimization Strategies:

  1. Parallel I/O Operations

    • Queue depth optimization (32-64 operations)
    • Async I/O with io_uring where available
    • Multi-threaded directory traversal
  2. Memory Management

    • Streaming processing for large files
    • Memory-mapped file access
    • Buffer pool for frequent operations
  3. CPU Optimization

    • SIMD instructions for hashing (AVX2/NEON)
    • Process pool for parallel processing
    • NUMA-aware memory allocation

Migration Strategy

Three-Phase Approach:

graph LR
    %% Left-to-right flowchart of the three migration phases.
    A[Phase 1: Analysis] --> B[Phase 2: Staging]
    B --> C[Phase 3: Migration]
    
    %% Phase 1 sub-steps.
    A --> A1[Discovery Scan]
    A --> A2[Deduplication Analysis]
    A --> A3[Space Calculation]
    
    %% Phase 2 sub-steps.
    B --> B1[Create Target Structure]
    B --> B2[Hard Link Staging]
    B --> B3[Validation Check]
    
    %% Phase 3 sub-steps.
    C --> C1[Atomic Move Operations]
    C --> C2[Symlink Updates]
    C --> C3[Cleanup Verification]

Monitoring & Validation

Key Metrics:

  • Processing Rate: Files/second, GB/hour
  • Deduplication Ratio: Original vs. Final size
  • Error Rate: Failed operations percentage
  • Resource Usage: CPU, Memory, I/O utilization

Validation Checks:

  • File integrity verification (hash comparison)
  • Directory structure validation
  • Symlink resolution testing
  • Permission preservation audit

Risk Mitigation

Safety Measures:

  1. Read-First Approach: Never modify source files until validation has passed
  2. Incremental Processing: Process in small batches
  3. Backup Verification: Ensure backup integrity before operations
  4. Rollback Capability: Maintain reverse mapping for recovery
  5. Dry-Run Mode: Preview all operations before execution

Implementation Timeline

Phase 1: Tool Development (2-3 weeks)

  • Core discovery engine
  • Classification system
  • Basic deduplication
  • Testing framework

Phase 2: Staging & Validation (1-2 weeks)

  • Target structure creation
  • Sample data processing
  • Performance optimization
  • Safety verification

Phase 3: Production Migration (2-4 weeks)

  • Full data processing
  • Continuous monitoring
  • Issue resolution
  • Final validation

This architecture provides a robust, scalable solution for your data reorganization needs while maintaining data integrity and optimizing for your NVMe storage infrastructure.```

===== Dockerfile =====

# Dockerfile for Project Defrag with PostgreSQL integration
#
# Builds a Python 3.11 image that installs the packages listed in
# requirements.txt, copies the application into /app and runs it as the
# unprivileged user "appuser" (uid 1000).
FROM python:3.11-slim

# Install system dependencies: C/C++ compilers, libpq headers and the
# PostgreSQL command-line client (presumably needed to build/run psycopg2;
# confirm against requirements.txt). The apt package lists are removed in the
# same layer so they do not end up in the image.
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Set environment variables:
#   PYTHONDONTWRITEBYTECODE - do not write .pyc files
#   PYTHONUNBUFFERED        - unbuffered stdout/stderr so logs appear immediately
#   PYTHONPATH              - make /app importable
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app

# Install Python dependencies. requirements.txt is copied on its own first so
# this layer is reused from cache when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user and give it ownership of /app, then drop privileges
# for everything that follows (health check and default command included).
RUN useradd -m -u 1000 appuser && \
    chown -R appuser:appuser /app
USER appuser

# Health check: succeed only if a PostgreSQL connection can be opened.
# Connection settings are read from the environment inside Python rather than
# interpolated into the Python source by the shell, so a password containing
# quotes, "$" or backticks cannot break (or be evaluated by) the probe.
# Defaults apply when a variable is unset or empty, matching ${VAR:-default}.
# connect_timeout keeps a hung connection attempt well inside --timeout, and
# the connection is closed explicitly once it has been established.
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD python -c "import os, psycopg2; psycopg2.connect(dbname=os.environ.get('POSTGRES_DB') or 'disk_reorganizer_db', user=os.environ.get('POSTGRES_USER') or 'disk_reorg_user', password=os.environ.get('POSTGRES_PASSWORD', ''), host=os.environ.get('DB_HOST') or 'db', port=os.environ.get('DB_PORT') or '5432', connect_timeout=10).close()" || exit 1

# Default command (can be overridden in docker-compose)
CMD ["python", "main.py", "--help"]```

===== DIRECTORY TREE =====
```text