Compare commits

...

2 Commits

Author | SHA1 | Message | Date
mike | 0eba1619cf | initial commit | 2025-12-12 03:46:30 +01:00
mike | bad0d82447 | base | 2025-12-12 03:28:42 +01:00
38 changed files with 5164 additions and 999 deletions

60
.gitignore vendored
View File

@@ -1,13 +1,7 @@
### PythonVanilla template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
@@ -21,48 +15,28 @@ parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
*.sqlite3
*.db
*.log
coverage.xml
*.coverage
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
pytest.xml
htmlcov/
.tox/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
.mypy_cache/
.pyre/

10
.idea/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,10 @@
# Default ignored files
/shelf/
/workspace.xml
# Ignored default folder with query files
/queries/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/

6
.idea/copilot.data.migration.agent.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

14
.idea/defrag.iml generated Normal file
View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.13 (.venv)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

17
.idea/material_theme_project_new.xml generated Normal file
View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="MaterialThemeProjectNewConfig">
<option name="metadata">
<MTProjectMetadataState>
<option name="migrated" value="true" />
<option name="pristineConfig" value="false" />
<option name="userId" value="-d8205b3:197aab68e6f:-7ffe" />
</MTProjectMetadataState>
</option>
<option name="titleBarState">
<MTProjectTitleBarConfigState>
<option name="overrideColor" value="false" />
</MTProjectTitleBarConfigState>
</option>
</component>
</project>

7
.idea/misc.xml generated Normal file
View File

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.13 (.venv)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 (.venv)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/defrag.iml" filepath="$PROJECT_DIR$/.idea/defrag.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

340
ARCHITECTURE.md Normal file
View File

@@ -0,0 +1,340 @@
# Data Reorganization Architecture: "Project Defrag"
## Executive Summary
This document outlines the architecture for reorganizing 20TB of backup data across multiple NVMe drives and servers. The solution implements intelligent deduplication, systematic categorization, and optimized storage patterns for enhanced performance and maintainability.
## System Architecture Overview
```mermaid
graph TB
subgraph "Source Environment"
A["Local Machine<br/>8x NVMe + 1 HDD<br/>~10TB"]
B["Server Machine<br/>Mixed Storage<br/>~10TB"]
end
subgraph "Processing Layer"
C["Discovery Engine"]
D["Classification Engine"]
E["Deduplication Engine"]
F["Migration Engine"]
end
subgraph "Target Architecture"
G["App Volumes"]
H["Gitea Repository"]
I["Build Cache (.maven, pycache)"]
J["Artifactories"]
K["Databases"]
L["Backups"]
M["LLM Model Cache"]
N["Git Infrastructure"]
end
A --> C
B --> C
C --> D
D --> E
E --> F
F --> G
F --> H
F --> I
F --> J
F --> K
F --> L
F --> M
F --> N
```
## Data Flow Architecture
### Phase 1: Discovery & Assessment
```mermaid
sequenceDiagram
participant D as Discovery Engine
participant FS as File System Scanner
participant DB as Metadata Database
participant API as System APIs
D->>FS: Scan directory structures
FS->>FS: Identify file types, sizes, dates
FS->>DB: Store file metadata
D->>API: Query system information
API->>DB: Store system context
DB->>D: Return analysis summary
```
### Phase 2: Classification & Deduplication
```mermaid
sequenceDiagram
participant C as Classifier
participant DH as Deduplication Hash
participant CDB as Canonical DB
participant MAP as Mapping Store
C->>C: Analyze file signatures
C->>DH: Generate content hashes
DH->>CDB: Check for duplicates
CDB->>DH: Return canonical reference
DH->>MAP: Store deduplication map
C->>C: Apply categorization rules
```
## Target Directory Structure
```
/mnt/organized/
├── apps/
│ ├── volumes/
│ │ ├── docker-volumes/
│ │ ├── app-data/
│ │ └── user-profiles/
│ └── runtime/
├── development/
│ ├── gitea/
│ │ ├── repositories/
│ │ ├── lfs-objects/
│ │ └── avatars/
│ ├── git-infrastructure/
│ │ ├── hooks/
│ │ ├── templates/
│ │ └── config/
│ └── build-tools/
│ ├── .maven/repository/
│ ├── gradle-cache/
│ └── sbt-cache/
├── artifacts/
│ ├── java/
│ │ ├── maven-central-cache/
│ │ ├── jfrog-artifactory/
│ │ └── gradle-build-cache/
│ ├── python/
│ │ ├── pypi-cache/
│ │ ├── wheelhouse/
│ │ └── pip-cache/
│ ├── node/
│ │ ├── npm-registry/
│ │ ├── yarn-cache/
│ │ └── pnpm-store/
│ └── go/
│ ├── goproxy-cache/
│ ├── module-cache/
│ └── sumdb-cache/
├── cache/
│ ├── llm-models/
│ │ ├── hugging-face/
│ │ ├── openai-cache/
│ │ └── local-llm/
│ ├── pycache/
│ ├── node_modules-archive/
│ └── browser-cache/
├── databases/
│ ├── postgresql/
│ ├── mysql/
│ ├── mongodb/
│ └── redis/
├── backups/
│ ├── system/
│ ├── application/
│ ├── database/
│ └── archive/
└── temp/
├── processing/
├── staging/
└── cleanup/
```
## Technology Stack Recommendation
### Primary Language: **Python 3.11+**
**Rationale:**
- Excellent file system handling capabilities
- Rich ecosystem for data processing (pandas, pyarrow)
- Built-in concurrency (`multiprocessing`, `threading`, `asyncio`) for parallel I/O and hashing
- Mature standard-library hashing (`hashlib`) for deduplication
- Cross-platform compatibility
### Key Libraries:
```python
# Core processing
import asyncio
import hashlib
import multiprocessing as mp
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Data handling
import pandas as pd
import pyarrow as pa
import sqlite3
import json
# File analysis
import magic # python-magic
import mimetypes
import filetype
# System integration
import psutil
import shutil
import os
```
## Deduplication Strategy
### Algorithm Selection: **Variable-Size Chunking with Rabin Fingerprinting**
```python
class AdvancedDeduplication:
def __init__(self, avg_chunk_size=8192):
self.chunker = RabinChunker(avg_chunk_size)
self.hash_store = HashStore()
def deduplicate_file(self, file_path):
chunks = self.chunker.chunk_file(file_path)
file_hash = self.compute_file_hash(chunks)
if self.hash_store.exists(file_hash):
return self.create_reference(file_hash)
else:
self.store_canonical(file_path, file_hash)
return file_hash
```
### Performance Optimization:
- **Parallel Processing**: Utilize all CPU cores for hashing
- **Memory Mapping**: For large files (>100MB)
- **Incremental Hashing**: Process files in streams
- **Cache Layer**: Redis for frequently accessed hashes
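A minimal sketch of the streaming and memory-mapped hashing described above (illustrative only; the 100 MB threshold and SHA-256 choice are assumptions):
```python
import hashlib
import mmap
from pathlib import Path
MMAP_THRESHOLD = 100 * 1024 * 1024  # switch to memory mapping above ~100 MB
STREAM_CHUNK = 1 << 20              # 1 MiB reads for incremental hashing
def hash_file_incremental(path: Path, algorithm: str = "sha256") -> str:
    """Hash a file without ever loading it fully into memory."""
    hasher = hashlib.new(algorithm)
    size = path.stat().st_size
    with open(path, "rb") as f:
        if size > MMAP_THRESHOLD:
            # Memory-map large files; the mapping exposes the buffer protocol.
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                hasher.update(mm)
        else:
            # Stream smaller files in fixed-size chunks.
            for chunk in iter(lambda: f.read(STREAM_CHUNK), b""):
                hasher.update(chunk)
    return hasher.hexdigest()
```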
## Classification Engine
### Rule-Based Classification System:
```yaml
classification_rules:
build_artifacts:
patterns:
- "**/target/**"
- "**/build/**"
- "**/dist/**"
- "**/node_modules/**"
action: categorize_as_build_cache
development_tools:
patterns:
- "**/.maven/**"
- "**/.gradle/**"
- "**/.npm/**"
- "**/.cache/**"
action: categorize_as_tool_cache
repositories:
patterns:
- "**/.git/**"
- "**/repositories/**"
- "**/gitea/**"
action: categorize_as_vcs
database_files:
patterns:
- "**/*.db"
- "**/*.sqlite"
- "**/postgresql/**"
- "**/mysql/**"
action: categorize_as_database
model_files:
patterns:
- "**/*.bin"
- "**/*.onnx"
- "**/models/**"
- "**/llm*/**"
action: categorize_as_ai_model
```
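How these rules translate into code, as a simplified sketch: each category keeps its glob patterns and the first match wins (the rule table below is abbreviated from the YAML above):
```python
import fnmatch
from pathlib import Path
# Abbreviated rule table mirroring the YAML above; first matching category wins.
CLASSIFICATION_RULES = {
    "build_cache": ["**/target/**", "**/build/**", "**/dist/**", "**/node_modules/**"],
    "tool_cache": ["**/.maven/**", "**/.gradle/**", "**/.npm/**", "**/.cache/**"],
    "vcs": ["**/.git/**", "**/repositories/**", "**/gitea/**"],
    "database": ["**/*.db", "**/*.sqlite", "**/postgresql/**", "**/mysql/**"],
    "ai_model": ["**/*.bin", "**/*.onnx", "**/models/**", "**/llm*/**"],
}
def classify(path: Path) -> str | None:
    """Return the first category whose pattern matches the path, or None."""
    path_str = str(path)
    for category, patterns in CLASSIFICATION_RULES.items():
        if any(fnmatch.fnmatch(path_str, pattern) for pattern in patterns):
            return category
    return None
print(classify(Path("/data/projects/app/node_modules/react/index.js")))  # build_cache
```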
## Performance Considerations
### NVMe Optimization Strategies:
1. **Parallel I/O Operations**
- Queue depth optimization (32-64 operations)
- Async I/O with io_uring where available
- Multi-threaded directory traversal
2. **Memory Management**
- Streaming processing for large files
- Memory-mapped file access
- Buffer pool for frequent operations
3. **CPU Optimization**
- SIMD instructions for hashing (AVX2/NEON)
- Process pool for parallel processing
- NUMA-aware memory allocation
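A sketch of the multi-threaded traversal point (io_uring and SIMD hashing are platform- and library-specific and not shown here):
```python
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
def _scan_dir(directory: str) -> list[tuple[str, int, bool]]:
    """List one directory, returning (path, size, is_dir) entries."""
    out: list[tuple[str, int, bool]] = []
    try:
        with os.scandir(directory) as it:
            for entry in it:
                if entry.is_dir(follow_symlinks=False):
                    out.append((entry.path, 0, True))
                elif entry.is_file(follow_symlinks=False):
                    out.append((entry.path, entry.stat(follow_symlinks=False).st_size, False))
    except (PermissionError, FileNotFoundError):
        pass  # skip unreadable or vanished directories
    return out
def scan_tree(root: Path, max_workers: int = 16) -> list[tuple[str, int]]:
    """Breadth-first traversal; each level's directories are scanned in parallel.
    scandir/stat calls block on I/O and release the GIL, so threads help on NVMe."""
    files: list[tuple[str, int]] = []
    level = [str(root)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while level:
            next_level: list[str] = []
            for entries in pool.map(_scan_dir, level):
                for path, size, is_dir in entries:
                    if is_dir:
                        next_level.append(path)
                    else:
                        files.append((path, size))
            level = next_level
    return files
```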
## Migration Strategy
### Three-Phase Approach:
```mermaid
graph LR
A[Phase 1: Analysis] --> B[Phase 2: Staging]
B --> C[Phase 3: Migration]
A --> A1[Discovery Scan]
A --> A2[Deduplication Analysis]
A --> A3[Space Calculation]
B --> B1[Create Target Structure]
B --> B2[Hard Link Staging]
B --> B3[Validation Check]
C --> C1[Atomic Move Operations]
C --> C2[Symlink Updates]
C --> C3[Cleanup Verification]
```
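A sketch of the hard-link staging and atomic move steps; note that both `os.link` and an atomic rename require source and target to sit on the same filesystem:
```python
import os
from pathlib import Path
def stage_hardlink(source: Path, staging_root: Path, relative_target: str) -> Path:
    """Phase 2: expose the file under its new layout without copying data.
    A hard link shares the inode, so the source stays untouched until validation."""
    staged = staging_root / relative_target
    staged.parent.mkdir(parents=True, exist_ok=True)
    if not staged.exists():
        os.link(source, staged)
    return staged
def promote(staged: Path, final_root: Path, relative_target: str) -> Path:
    """Phase 3: atomically move the validated entry into the organized tree."""
    final = final_root / relative_target
    final.parent.mkdir(parents=True, exist_ok=True)
    os.replace(staged, final)  # atomic within a single filesystem
    return final
```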
## Monitoring & Validation
### Key Metrics:
- **Processing Rate**: Files/second, GB/hour
- **Deduplication Ratio**: Original vs. Final size
- **Error Rate**: Failed operations percentage
- **Resource Usage**: CPU, Memory, I/O utilization
### Validation Checks:
- File integrity verification (hash comparison)
- Directory structure validation
- Symlink resolution testing
- Permission preservation audit
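A sketch of the integrity and symlink checks, assuming the source SHA-256 was recorded during discovery:
```python
import hashlib
from pathlib import Path
def verify_copy(target: Path, expected_sha256: str) -> bool:
    """Re-hash the migrated file and compare against the recorded checksum."""
    hasher = hashlib.sha256()
    with open(target, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            hasher.update(chunk)
    return hasher.hexdigest() == expected_sha256
def verify_symlink(link: Path) -> bool:
    """Check that a rewritten symlink still resolves to an existing target."""
    return link.is_symlink() and link.resolve(strict=False).exists()
```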
## Risk Mitigation
### Safety Measures:
1. **Read-First Approach**: Never modify source until validation
2. **Incremental Processing**: Process in small batches
3. **Backup Verification**: Ensure backup integrity before operations
4. **Rollback Capability**: Maintain reverse mapping for recovery
5. **Dry-Run Mode**: Preview all operations before execution
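A sketch of how dry-run mode and the reverse mapping can share one plan format (names here are illustrative):
```python
import json
import shutil
from dataclasses import dataclass, asdict
from pathlib import Path
@dataclass
class PlannedMove:
    source: str
    target: str
def execute(plan: list[PlannedMove], journal: Path, dry_run: bool = True) -> None:
    """Write the journal first; it doubles as the rollback mapping."""
    journal.write_text(json.dumps([asdict(m) for m in plan], indent=2))
    for move in plan:
        if dry_run:
            print(f"[dry-run] {move.source} -> {move.target}")
            continue
        Path(move.target).parent.mkdir(parents=True, exist_ok=True)
        shutil.move(move.source, move.target)
def rollback(journal: Path) -> None:
    """Undo completed moves by replaying the journal in reverse order."""
    for entry in reversed(json.loads(journal.read_text())):
        shutil.move(entry["target"], entry["source"])
```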
## Implementation Timeline
### Phase 1: Tool Development (2-3 weeks)
- Core discovery engine
- Classification system
- Basic deduplication
- Testing framework
### Phase 2: Staging & Validation (1-2 weeks)
- Target structure creation
- Sample data processing
- Performance optimization
- Safety verification
### Phase 3: Production Migration (2-4 weeks)
- Full data processing
- Continuous monitoring
- Issue resolution
- Final validation
This architecture provides a robust, scalable solution for your data reorganization needs while maintaining data integrity and optimizing for your NVMe storage infrastructure.

1012
README.md

File diff suppressed because it is too large

View File

@@ -3,3 +3,15 @@ psycopg2-binary>=2.9.9
# Alternative: psycopg2>=2.9.9 (requires PostgreSQL development libraries)
# Use psycopg2-binary for easier installation without compilation
# Core dependencies
# Optional/feature dependencies
redis>=4.5.0 # For RedisHashStore
scikit-learn>=1.0.0 # For MLClassifier
numpy>=1.21.0 # For MLClassifier
# Development dependencies
pytest>=7.0.0
pytest-cov>=4.0.0
black>=22.0.0
mypy>=0.950
flake8>=5.0.0

View File

@@ -0,0 +1,17 @@
"""Classification package exports"""
from .rules import RuleBasedClassifier
from .ml import create_ml_classifier, train_from_database, MLClassifier, DummyMLClassifier
from .engine import ClassificationEngine
from ._protocols import ClassificationRule, IClassifier, IRuleEngine
__all__ = [
'RuleBasedClassifier',
'MLClassifier',
'DummyMLClassifier',
'create_ml_classifier',
'train_from_database',
'ClassificationEngine',
'ClassificationRule',
'IClassifier',
'IRuleEngine',
]

View File

@@ -0,0 +1,72 @@
"""Protocol definitions for the classification package"""
from typing import Protocol, Optional
from pathlib import Path
from dataclasses import dataclass
@dataclass
class ClassificationRule:
"""Rule for classifying files"""
name: str
category: str
patterns: list[str]
priority: int = 0
description: str = ""
class IClassifier(Protocol):
"""Protocol for classification operations"""
def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
"""Classify a file path
Args:
path: Path to classify
file_type: Optional file type hint
Returns:
Category name or None if no match
"""
...
def get_category_rules(self, category: str) -> list[ClassificationRule]:
"""Get all rules for a category
Args:
category: Category name
Returns:
List of rules for the category
"""
...
class IRuleEngine(Protocol):
"""Protocol for rule-based classification"""
def add_rule(self, rule: ClassificationRule) -> None:
"""Add a classification rule
Args:
rule: Rule to add
"""
...
def remove_rule(self, rule_name: str) -> None:
"""Remove a rule by name
Args:
rule_name: Name of rule to remove
"""
...
def match_path(self, path: Path) -> Optional[str]:
"""Match path against rules
Args:
path: Path to match
Returns:
Category name or None if no match
"""
...

View File

@@ -0,0 +1,350 @@
"""Main classification engine"""
from pathlib import Path
from typing import Optional, Callable
import psycopg2
from .rules import RuleBasedClassifier
from .ml import create_ml_classifier, DummyMLClassifier
from ..shared.models import ProcessingStats
from ..shared.config import DatabaseConfig
from ..shared.logger import ProgressLogger
class ClassificationEngine:
"""Engine for classifying files"""
def __init__(
self,
db_config: DatabaseConfig,
logger: ProgressLogger,
use_ml: bool = False
):
"""Initialize classification engine
Args:
db_config: Database configuration
logger: Progress logger
use_ml: Whether to use ML classification in addition to rules
"""
self.db_config = db_config
self.logger = logger
self.rule_classifier = RuleBasedClassifier()
self.ml_classifier = create_ml_classifier() if use_ml else None
self.use_ml = use_ml and not isinstance(self.ml_classifier, DummyMLClassifier)
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
return self._connection
def classify_all(
self,
disk: Optional[str] = None,
batch_size: int = 1000,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Classify all files in database
Args:
disk: Optional disk filter
batch_size: Number of files to process per batch
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with classification statistics
"""
self.logger.section("Starting Classification")
conn = self._get_connection()
cursor = conn.cursor()
# Get files without categories
if disk:
cursor.execute("""
SELECT path, checksum
FROM files
WHERE disk = %s AND category IS NULL
""", (disk,))
else:
cursor.execute("""
SELECT path, checksum
FROM files
WHERE category IS NULL
""")
files_to_classify = cursor.fetchall()
total_files = len(files_to_classify)
self.logger.info(f"Found {total_files} files to classify")
stats = ProcessingStats()
batch = []
for path_str, checksum in files_to_classify:
path = Path(path_str)
# Classify using rules first
category = self.rule_classifier.classify(path)
# If no rule match and ML is available, try ML
if category is None and self.use_ml and self.ml_classifier:
category = self.ml_classifier.classify(path)
# If still no category, assign default
if category is None:
category = "temp/processing"
batch.append((category, str(path)))
stats.files_processed += 1
# Batch update
if len(batch) >= batch_size:
self._update_categories(cursor, batch)
conn.commit()
batch.clear()
# Progress callback
if progress_callback:
progress_callback(stats.files_processed, total_files, stats)
# Log progress
if stats.files_processed % (batch_size * 10) == 0:
self.logger.progress(
stats.files_processed,
total_files,
prefix="Files classified",
elapsed_seconds=stats.elapsed_seconds
)
# Update remaining batch
if batch:
self._update_categories(cursor, batch)
conn.commit()
stats.files_succeeded = stats.files_processed
cursor.close()
self.logger.info(
f"Classification complete: {stats.files_processed} files in {stats.elapsed_seconds:.1f}s"
)
return stats
def _update_categories(self, cursor, batch: list[tuple[str, str]]):
"""Update categories in batch
Args:
cursor: Database cursor
batch: List of (category, path) tuples
"""
from psycopg2.extras import execute_batch
query = """
UPDATE files
SET category = %s
WHERE path = %s
"""
execute_batch(cursor, query, batch)
def classify_path(self, path: Path) -> Optional[str]:
"""Classify a single path
Args:
path: Path to classify
Returns:
Category name or None
"""
# Try rules first
category = self.rule_classifier.classify(path)
# Try ML if available
if category is None and self.use_ml and self.ml_classifier:
category = self.ml_classifier.classify(path)
return category
def get_category_stats(self) -> dict[str, dict]:
"""Get statistics by category
Returns:
Dictionary mapping category to statistics
"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT
category,
COUNT(*) as file_count,
SUM(size) as total_size
FROM files
WHERE category IS NOT NULL
GROUP BY category
ORDER BY total_size DESC
""")
stats = {}
for category, file_count, total_size in cursor.fetchall():
stats[category] = {
'file_count': file_count,
'total_size': total_size
}
cursor.close()
return stats
def get_uncategorized_count(self) -> int:
"""Get count of uncategorized files
Returns:
Number of files without category
"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM files WHERE category IS NULL")
count = cursor.fetchone()[0]
cursor.close()
return count
def reclassify_category(
self,
old_category: str,
new_category: str
) -> int:
"""Reclassify all files in a category
Args:
old_category: Current category
new_category: New category
Returns:
Number of files reclassified
"""
self.logger.info(f"Reclassifying {old_category} -> {new_category}")
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE files
SET category = %s
WHERE category = %s
""", (new_category, old_category))
count = cursor.rowcount
conn.commit()
cursor.close()
self.logger.info(f"Reclassified {count} files")
return count
def train_ml_classifier(
self,
min_samples: int = 10
) -> bool:
"""Train ML classifier from existing categorized data
Args:
min_samples: Minimum samples per category
Returns:
True if training successful
"""
if not self.use_ml or self.ml_classifier is None:
self.logger.warning("ML classifier not available")
return False
self.logger.subsection("Training ML Classifier")
conn = self._get_connection()
cursor = conn.cursor()
# Get categorized files
cursor.execute("""
SELECT path, category
FROM files
WHERE category IS NOT NULL
""")
training_data = [(Path(path), category) for path, category in cursor.fetchall()]
cursor.close()
if not training_data:
self.logger.warning("No training data available")
return False
# Count samples per category
category_counts = {}
for _, category in training_data:
category_counts[category] = category_counts.get(category, 0) + 1
# Filter categories with enough samples
filtered_data = [
(path, category)
for path, category in training_data
if category_counts[category] >= min_samples
]
if not filtered_data:
self.logger.warning(f"No categories with >= {min_samples} samples")
return False
self.logger.info(f"Training with {len(filtered_data)} samples")
try:
self.ml_classifier.train(filtered_data)
self.logger.info("ML classifier trained successfully")
return True
except Exception as e:
self.logger.error(f"Failed to train ML classifier: {e}")
return False
def get_all_categories(self) -> list[str]:
"""Get all categories from database
Returns:
List of category names
"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT DISTINCT category
FROM files
WHERE category IS NOT NULL
ORDER BY category
""")
categories = [row[0] for row in cursor.fetchall()]
cursor.close()
return categories
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()

269
src/classification/ml.py Normal file
View File

@@ -0,0 +1,269 @@
"""ML-based classification (optional, using sklearn if available)"""
from pathlib import Path
from typing import Optional, List, Tuple
import pickle
try:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
SKLEARN_AVAILABLE = True
except ImportError:
SKLEARN_AVAILABLE = False
class MLClassifier:
"""Machine learning-based file classifier
Uses path-based features and optional metadata to classify files.
Requires scikit-learn to be installed.
"""
def __init__(self):
"""Initialize ML classifier"""
if not SKLEARN_AVAILABLE:
raise ImportError(
"scikit-learn is required for ML classification. "
"Install with: pip install scikit-learn"
)
self.model: Optional[Pipeline] = None
self.categories: List[str] = []
self._is_trained = False
def _extract_features(self, path: Path) -> str:
"""Extract features from path
Args:
path: Path to extract features from
Returns:
Feature string
"""
# Convert path to feature string
# Include: path parts, extension, filename
parts = path.parts
extension = path.suffix
filename = path.name
features = []
# Add path components
features.extend(parts)
# Add extension
if extension:
features.append(f"ext:{extension}")
# Add filename components (split on common separators)
name_parts = filename.replace('-', ' ').replace('_', ' ').replace('.', ' ').split()
features.extend([f"name:{part}" for part in name_parts])
return ' '.join(features)
def train(self, training_data: List[Tuple[Path, str]]) -> None:
"""Train the classifier
Args:
training_data: List of (path, category) tuples
"""
if not training_data:
raise ValueError("Training data cannot be empty")
# Extract features and labels
X = [self._extract_features(path) for path, _ in training_data]
y = [category for _, category in training_data]
# Store unique categories
self.categories = sorted(set(y))
# Create and train pipeline
self.model = Pipeline([
('tfidf', TfidfVectorizer(
max_features=1000,
ngram_range=(1, 2),
min_df=1
)),
('classifier', MultinomialNB())
])
self.model.fit(X, y)
self._is_trained = True
def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
"""Classify a file path
Args:
path: Path to classify
file_type: Optional file type hint (not used in ML classifier)
Returns:
Category name or None if not trained
"""
if not self._is_trained or self.model is None:
return None
features = self._extract_features(path)
try:
prediction = self.model.predict([features])[0]
return prediction
except Exception:
return None
def predict_proba(self, path: Path) -> dict[str, float]:
"""Get prediction probabilities for all categories
Args:
path: Path to classify
Returns:
Dictionary mapping category to probability
"""
if not self._is_trained or self.model is None:
return {}
features = self._extract_features(path)
try:
probabilities = self.model.predict_proba([features])[0]
return {
category: float(prob)
for category, prob in zip(self.categories, probabilities)
}
except Exception:
return {}
def save_model(self, model_path: Path) -> None:
"""Save trained model to disk
Args:
model_path: Path to save model
"""
if not self._is_trained:
raise ValueError("Cannot save untrained model")
model_data = {
'model': self.model,
'categories': self.categories,
'is_trained': self._is_trained
}
with open(model_path, 'wb') as f:
pickle.dump(model_data, f)
def load_model(self, model_path: Path) -> None:
"""Load trained model from disk
Args:
model_path: Path to model file
"""
with open(model_path, 'rb') as f:
model_data = pickle.load(f)
self.model = model_data['model']
self.categories = model_data['categories']
self._is_trained = model_data['is_trained']
@property
def is_trained(self) -> bool:
"""Check if model is trained"""
return self._is_trained
class DummyMLClassifier:
"""Dummy ML classifier for when sklearn is not available"""
def __init__(self):
"""Initialize dummy classifier"""
pass
def train(self, training_data: List[Tuple[Path, str]]) -> None:
"""Dummy train method"""
raise NotImplementedError(
"ML classification requires scikit-learn. "
"Install with: pip install scikit-learn"
)
def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
"""Dummy classify method"""
return None
def predict_proba(self, path: Path) -> dict[str, float]:
"""Dummy predict_proba method"""
return {}
def save_model(self, model_path: Path) -> None:
"""Dummy save_model method"""
raise NotImplementedError("ML classification not available")
def load_model(self, model_path: Path) -> None:
"""Dummy load_model method"""
raise NotImplementedError("ML classification not available")
@property
def is_trained(self) -> bool:
"""Check if model is trained"""
return False
def create_ml_classifier() -> MLClassifier | DummyMLClassifier:
"""Create ML classifier if sklearn is available, otherwise return dummy
Returns:
MLClassifier or DummyMLClassifier
"""
if SKLEARN_AVAILABLE:
return MLClassifier()
else:
return DummyMLClassifier()
def train_from_database(
db_connection,
min_samples_per_category: int = 10
) -> MLClassifier | DummyMLClassifier:
"""Train ML classifier from database
Args:
db_connection: Database connection
min_samples_per_category: Minimum samples required per category
Returns:
Trained classifier
"""
classifier = create_ml_classifier()
if isinstance(classifier, DummyMLClassifier):
return classifier
# Query classified files from database
cursor = db_connection.cursor()
cursor.execute("""
SELECT path, category
FROM files
WHERE category IS NOT NULL
""")
training_data = [(Path(path), category) for path, category in cursor.fetchall()]
cursor.close()
if not training_data:
return classifier
# Count samples per category
category_counts = {}
for _, category in training_data:
category_counts[category] = category_counts.get(category, 0) + 1
# Filter to categories with enough samples
filtered_data = [
(path, category)
for path, category in training_data
if category_counts[category] >= min_samples_per_category
]
if filtered_data:
classifier.train(filtered_data)
return classifier

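A minimal usage sketch for the ML classifier above; the training pairs are invented, the import assumes the project root is on PYTHONPATH, and scikit-learn must be installed for `create_ml_classifier()` to return a real `MLClassifier`.
```python
from pathlib import Path
from src.classification.ml import create_ml_classifier, DummyMLClassifier
classifier = create_ml_classifier()
if not isinstance(classifier, DummyMLClassifier):
    training_data = [  # hypothetical (path, category) pairs
        (Path("/data/.m2/repository/junit/junit-4.13.jar"), "artifacts/java/maven"),
        (Path("/data/models/llama/model.safetensors"), "cache/llm-models"),
        (Path("/data/app/__pycache__/views.cpython-311.pyc"), "cache/pycache"),
    ]
    classifier.train(training_data)
    print(classifier.classify(Path("/backup/.m2/repository/org/slf4j/slf4j-api.jar")))
    print(classifier.predict_proba(Path("/backup/models/mistral/weights.bin")))
```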
282
src/classification/rules.py Normal file
View File

@@ -0,0 +1,282 @@
"""Rule-based classification engine"""
from pathlib import Path
from typing import Optional
import fnmatch
from ._protocols import ClassificationRule
class RuleBasedClassifier:
"""Rule-based file classifier using pattern matching"""
def __init__(self):
"""Initialize rule-based classifier"""
self.rules: list[ClassificationRule] = []
self._load_default_rules()
def _load_default_rules(self):
"""Load default classification rules based on ARCHITECTURE.md"""
# Build artifacts and caches
self.add_rule(ClassificationRule(
name="maven_cache",
category="artifacts/java/maven",
patterns=["**/.m2/**", "**/.maven/**", "**/maven-central-cache/**"],
priority=10,
description="Maven repository and cache"
))
self.add_rule(ClassificationRule(
name="gradle_cache",
category="artifacts/java/gradle",
patterns=["**/.gradle/**", "**/gradle-cache/**", "**/gradle-build-cache/**"],
priority=10,
description="Gradle cache and artifacts"
))
self.add_rule(ClassificationRule(
name="python_cache",
category="cache/pycache",
patterns=["**/__pycache__/**", "**/*.pyc", "**/*.pyo"],
priority=10,
description="Python cache files"
))
self.add_rule(ClassificationRule(
name="python_artifacts",
category="artifacts/python",
patterns=["**/pip-cache/**", "**/pypi-cache/**", "**/wheelhouse/**"],
priority=10,
description="Python package artifacts"
))
self.add_rule(ClassificationRule(
name="node_modules",
category="cache/node_modules-archive",
patterns=["**/node_modules/**"],
priority=10,
description="Node.js modules"
))
self.add_rule(ClassificationRule(
name="node_cache",
category="artifacts/node",
patterns=["**/.npm/**", "**/npm-registry/**", "**/yarn-cache/**", "**/pnpm-store/**"],
priority=10,
description="Node.js package managers cache"
))
self.add_rule(ClassificationRule(
name="go_cache",
category="artifacts/go",
patterns=["**/goproxy-cache/**", "**/go/pkg/mod/**", "**/go-module-cache/**"],
priority=10,
description="Go module cache"
))
# Version control
self.add_rule(ClassificationRule(
name="git_repos",
category="development/git-infrastructure",
patterns=["**/.git/**", "**/gitea/repositories/**"],
priority=15,
description="Git repositories and infrastructure"
))
self.add_rule(ClassificationRule(
name="gitea",
category="development/gitea",
patterns=["**/gitea/**"],
priority=12,
description="Gitea server data"
))
# Databases
self.add_rule(ClassificationRule(
name="postgresql",
category="databases/postgresql",
patterns=["**/postgresql/**", "**/postgres/**", "**/*.sql"],
priority=10,
description="PostgreSQL databases"
))
self.add_rule(ClassificationRule(
name="mysql",
category="databases/mysql",
patterns=["**/mysql/**", "**/mariadb/**"],
priority=10,
description="MySQL/MariaDB databases"
))
self.add_rule(ClassificationRule(
name="mongodb",
category="databases/mongodb",
patterns=["**/mongodb/**", "**/mongo/**"],
priority=10,
description="MongoDB databases"
))
self.add_rule(ClassificationRule(
name="redis",
category="databases/redis",
patterns=["**/redis/**", "**/*.rdb"],
priority=10,
description="Redis databases"
))
self.add_rule(ClassificationRule(
name="sqlite",
category="databases/sqlite",
patterns=["**/*.db", "**/*.sqlite", "**/*.sqlite3"],
priority=8,
description="SQLite databases"
))
# LLM and AI models
self.add_rule(ClassificationRule(
name="llm_models",
category="cache/llm-models",
patterns=[
"**/hugging-face/**",
"**/huggingface/**",
"**/.cache/huggingface/**",
"**/models/**/*.bin",
"**/models/**/*.onnx",
"**/models/**/*.safetensors",
"**/llm*/**",
"**/openai-cache/**"
],
priority=12,
description="LLM and AI model files"
))
# Docker and containers
self.add_rule(ClassificationRule(
name="docker_volumes",
category="apps/volumes/docker-volumes",
patterns=["**/docker/volumes/**", "**/var/lib/docker/volumes/**"],
priority=10,
description="Docker volumes"
))
self.add_rule(ClassificationRule(
name="app_data",
category="apps/volumes/app-data",
patterns=["**/app-data/**", "**/application-data/**"],
priority=8,
description="Application data"
))
# Build outputs
self.add_rule(ClassificationRule(
name="build_output",
category="development/build-tools",
patterns=["**/target/**", "**/build/**", "**/dist/**", "**/out/**"],
priority=5,
description="Build output directories"
))
# Backups
self.add_rule(ClassificationRule(
name="system_backups",
category="backups/system",
patterns=["**/backup/**", "**/backups/**", "**/*.bak", "**/*.backup"],
priority=10,
description="System backups"
))
self.add_rule(ClassificationRule(
name="database_backups",
category="backups/database",
patterns=["**/*.sql.gz", "**/*.dump", "**/db-backup/**"],
priority=11,
description="Database backups"
))
# Archives
self.add_rule(ClassificationRule(
name="archives",
category="backups/archive",
patterns=["**/*.tar", "**/*.tar.gz", "**/*.tgz", "**/*.zip", "**/*.7z"],
priority=5,
description="Archive files"
))
def add_rule(self, rule: ClassificationRule) -> None:
"""Add a classification rule
Args:
rule: Rule to add
"""
self.rules.append(rule)
# Sort rules by priority (higher priority first)
self.rules.sort(key=lambda r: r.priority, reverse=True)
def remove_rule(self, rule_name: str) -> None:
"""Remove a rule by name
Args:
rule_name: Name of rule to remove
"""
self.rules = [r for r in self.rules if r.name != rule_name]
def match_path(self, path: Path) -> Optional[str]:
"""Match path against rules
Args:
path: Path to match
Returns:
Category name or None if no match
"""
path_str = str(path)
# Try to match each rule in priority order
for rule in self.rules:
for pattern in rule.patterns:
if fnmatch.fnmatch(path_str, pattern):
return rule.category
return None
def classify(self, path: Path, file_type: Optional[str] = None) -> Optional[str]:
"""Classify a file path
Args:
path: Path to classify
file_type: Optional file type hint
Returns:
Category name or None if no match
"""
return self.match_path(path)
def get_category_rules(self, category: str) -> list[ClassificationRule]:
"""Get all rules for a category
Args:
category: Category name
Returns:
List of rules for the category
"""
return [r for r in self.rules if r.category == category]
def get_all_categories(self) -> set[str]:
"""Get all defined categories
Returns:
Set of category names
"""
return {r.category for r in self.rules}
def get_rules_by_priority(self, min_priority: int = 0) -> list[ClassificationRule]:
"""Get rules above a minimum priority
Args:
min_priority: Minimum priority threshold
Returns:
List of rules with priority >= min_priority
"""
return [r for r in self.rules if r.priority >= min_priority]

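A quick usage sketch for the rule engine above (example paths are invented; the import assumes the project root is on PYTHONPATH):
```python
from pathlib import Path
from src.classification.rules import RuleBasedClassifier
classifier = RuleBasedClassifier()
for p in [
    "/mnt/backup/projects/api/.git/objects/ab/cdef",
    "/home/mike/.m2/repository/org/apache/commons/commons-lang3.jar",
    "/var/lib/docker/volumes/gitea_data/_data/somefile",
    "/mnt/backup/dumps/site-2024.sql.gz",
]:
    print(p, "->", classifier.classify(Path(p)))
# Rules are evaluated in descending priority, so the .git path maps to
# development/git-infrastructure even though **/backup/** would also match.
```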
View File

@@ -0,0 +1,21 @@
"""Deduplication package exports"""
from .chunker import (
RabinChunker,
SimpleChunker,
hash_chunk,
hash_file,
compute_file_signature
)
from .store import HashStore, MemoryHashStore
from .engine import DeduplicationEngine
__all__ = [
'RabinChunker',
'SimpleChunker',
'hash_chunk',
'hash_file',
'compute_file_signature',
'HashStore',
'MemoryHashStore',
'DeduplicationEngine',
]

View File

@@ -0,0 +1,241 @@
"""Rabin fingerprint chunker for content-defined chunking"""
import hashlib
from pathlib import Path
from typing import Iterator, Optional
class RabinChunker:
"""Content-defined chunking using Rabin fingerprinting
Uses a rolling hash to identify chunk boundaries based on content,
allowing for efficient deduplication even when data is modified.
"""
def __init__(
self,
avg_chunk_size: int = 8192,
min_chunk_size: Optional[int] = None,
max_chunk_size: Optional[int] = None,
window_size: int = 48
):
"""Initialize Rabin chunker
Args:
avg_chunk_size: Target average chunk size in bytes
min_chunk_size: Minimum chunk size (default: avg_chunk_size // 4)
max_chunk_size: Maximum chunk size (default: avg_chunk_size * 8)
window_size: Rolling hash window size
"""
self.avg_chunk_size = avg_chunk_size
self.min_chunk_size = min_chunk_size or (avg_chunk_size // 4)
self.max_chunk_size = max_chunk_size or (avg_chunk_size * 8)
self.window_size = window_size
# Calculate mask for boundary detection
# For avg_chunk_size, we want boundaries at 1/avg_chunk_size probability
bits = 0
size = avg_chunk_size
while size > 1:
bits += 1
size >>= 1
self.mask = (1 << bits) - 1
# Polynomial for rolling hash (prime number)
self.poly = 0x3DA3358B4DC173
def chunk_file(self, file_path: Path, chunk_size: Optional[int] = None) -> Iterator[bytes]:
"""Chunk a file using Rabin fingerprinting
Args:
file_path: Path to file to chunk
chunk_size: If provided, use fixed-size chunking instead
Yields:
Chunk data as bytes
"""
if chunk_size:
# Use fixed-size chunking
yield from self._chunk_fixed(file_path, chunk_size)
else:
# Use content-defined chunking
yield from self._chunk_rabin(file_path)
def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]:
"""Fixed-size chunking
Args:
file_path: Path to file
chunk_size: Chunk size in bytes
Yields:
Fixed-size chunks
"""
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]:
"""Content-defined chunking using Rabin fingerprinting
Args:
file_path: Path to file
Yields:
Variable-size chunks based on content
"""
with open(file_path, 'rb') as f:
chunk_data = bytearray()
window = bytearray()
hash_value = 0
while True:
byte = f.read(1)
if not byte:
# End of file - yield remaining data
if chunk_data:
yield bytes(chunk_data)
break
chunk_data.extend(byte)
window.extend(byte)
# Maintain window size
if len(window) > self.window_size:
window.pop(0)
# Update rolling hash
hash_value = self._rolling_hash(window)
# Check if we should create a boundary
should_break = (
len(chunk_data) >= self.min_chunk_size and
(
(hash_value & self.mask) == 0 or
len(chunk_data) >= self.max_chunk_size
)
)
if should_break:
yield bytes(chunk_data)
chunk_data = bytearray()
window = bytearray()
hash_value = 0
def _rolling_hash(self, window: bytearray) -> int:
"""Calculate rolling hash for window
Args:
window: Byte window
Returns:
Hash value
"""
hash_value = 0
for byte in window:
hash_value = ((hash_value << 1) + byte) & 0xFFFFFFFFFFFFFFFF
return hash_value
class SimpleChunker:
"""Simple fixed-size chunker for comparison"""
def __init__(self, chunk_size: int = 8192):
"""Initialize simple chunker
Args:
chunk_size: Fixed chunk size in bytes
"""
self.chunk_size = chunk_size
def chunk_file(self, file_path: Path) -> Iterator[bytes]:
"""Chunk file into fixed-size pieces
Args:
file_path: Path to file
Yields:
Fixed-size chunks
"""
with open(file_path, 'rb') as f:
while True:
chunk = f.read(self.chunk_size)
if not chunk:
break
yield chunk
def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str:
"""Hash a chunk of data
Args:
chunk: Chunk data
algorithm: Hash algorithm (default: sha256)
Returns:
Hex digest of hash
"""
hasher = hashlib.new(algorithm)
hasher.update(chunk)
return hasher.hexdigest()
def hash_file(file_path: Path, algorithm: str = 'sha256', chunk_size: int = 65536) -> str:
"""Hash entire file
Args:
file_path: Path to file
algorithm: Hash algorithm (default: sha256)
chunk_size: Size of chunks to read
Returns:
Hex digest of file hash
"""
hasher = hashlib.new(algorithm)
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
hasher.update(chunk)
return hasher.hexdigest()
def compute_file_signature(
file_path: Path,
use_rabin: bool = True,
avg_chunk_size: int = 8192
) -> tuple[str, list[str]]:
"""Compute file signature with chunk hashes
Args:
file_path: Path to file
use_rabin: Whether to use Rabin chunking (vs fixed-size)
avg_chunk_size: Average chunk size for Rabin or fixed size
Returns:
Tuple of (file_hash, list of chunk hashes)
"""
if use_rabin:
chunker = RabinChunker(avg_chunk_size=avg_chunk_size)
else:
chunker = SimpleChunker(chunk_size=avg_chunk_size)
chunk_hashes = []
file_hasher = hashlib.sha256()
for chunk in chunker.chunk_file(file_path):
# Hash individual chunk
chunk_hash = hash_chunk(chunk)
chunk_hashes.append(chunk_hash)
# Update file hash
file_hasher.update(chunk)
file_hash = file_hasher.hexdigest()
return file_hash, chunk_hashes

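A small sketch exercising the chunker helpers above on a throwaway file (the import path assumes the project root is on PYTHONPATH):
```python
import tempfile
from pathlib import Path
from src.deduplication.chunker import compute_file_signature, hash_file
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp:
    tmp.write(b"defrag" * 50_000)  # repetitive sample data
    sample = Path(tmp.name)
print(hash_file(sample))  # whole-file hash, streamed with constant memory
# Content-defined chunking returns the same file hash plus per-chunk hashes,
# which is what the deduplication engine stores for similarity queries.
file_hash, chunk_hashes = compute_file_signature(sample, use_rabin=True)
print(file_hash == hash_file(sample), len(chunk_hashes))
sample.unlink()
```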
353
src/deduplication/engine.py Normal file
View File

@@ -0,0 +1,353 @@
"""Deduplication engine"""
from pathlib import Path
from typing import Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
import psycopg2
from .chunker import compute_file_signature, hash_file
from .store import HashStore
from ..shared.models import FileRecord, ProcessingStats
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger
class DeduplicationEngine:
"""Engine for deduplicating files"""
def __init__(
self,
db_config: DatabaseConfig,
processing_config: ProcessingConfig,
logger: ProgressLogger
):
"""Initialize deduplication engine
Args:
db_config: Database configuration
processing_config: Processing configuration
logger: Progress logger
"""
self.db_config = db_config
self.processing_config = processing_config
self.logger = logger
self.hash_store = HashStore(db_config)
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
return self._connection
def deduplicate_all(
self,
disk: Optional[str] = None,
use_chunks: bool = True,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Deduplicate all files in database
Args:
disk: Optional disk filter
use_chunks: Whether to use chunk-level deduplication
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with deduplication statistics
"""
self.logger.section("Starting Deduplication")
conn = self._get_connection()
cursor = conn.cursor()
# Get files without checksums
if disk:
cursor.execute("""
SELECT path, size
FROM files
WHERE disk = %s AND checksum IS NULL
ORDER BY size DESC
""", (disk,))
else:
cursor.execute("""
SELECT path, size
FROM files
WHERE checksum IS NULL
ORDER BY size DESC
""")
files_to_process = cursor.fetchall()
total_files = len(files_to_process)
self.logger.info(f"Found {total_files} files to process")
stats = ProcessingStats()
# Process files with thread pool
with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor:
futures = {}
for path_str, size in files_to_process:
path = Path(path_str)
future = executor.submit(self._process_file, path, use_chunks)
futures[future] = (path, size)
# Process completed futures
for future in as_completed(futures):
path, size = futures[future]
try:
checksum, duplicate_of = future.result()
if checksum:
# Update database
cursor.execute("""
UPDATE files
SET checksum = %s, duplicate_of = %s
WHERE path = %s
""", (checksum, duplicate_of, str(path)))
stats.files_succeeded += 1
stats.bytes_processed += size
stats.files_processed += 1
# Commit periodically
if stats.files_processed % self.processing_config.commit_interval == 0:
conn.commit()
# Progress callback
if progress_callback:
progress_callback(stats.files_processed, total_files, stats)
# Log progress
self.logger.progress(
stats.files_processed,
total_files,
prefix="Files processed",
bytes_processed=stats.bytes_processed,
elapsed_seconds=stats.elapsed_seconds
)
except Exception as e:
self.logger.warning(f"Failed to process {path}: {e}")
stats.files_failed += 1
stats.files_processed += 1
# Final commit
conn.commit()
cursor.close()
self.logger.info(
f"Deduplication complete: {stats.files_succeeded}/{total_files} files, "
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
)
return stats
def _process_file(
self,
path: Path,
use_chunks: bool
) -> tuple[Optional[str], Optional[str]]:
"""Process a single file for deduplication
Args:
path: Path to file
use_chunks: Whether to use chunk-level deduplication
Returns:
Tuple of (checksum, duplicate_of_path)
"""
if not path.exists():
return None, None
try:
if use_chunks:
# Compute file signature with chunks
checksum, chunk_hashes = compute_file_signature(
path,
use_rabin=True,
avg_chunk_size=self.processing_config.chunk_size
)
else:
# Just compute file hash
checksum = hash_file(
path,
algorithm=self.processing_config.hash_algorithm
)
chunk_hashes = None
# Check if hash exists
if self.hash_store.exists(checksum):
# Duplicate found
canonical_path = self.hash_store.get_canonical(checksum)
return checksum, canonical_path
else:
# New unique file
size = path.stat().st_size
self.hash_store.store_canonical(
checksum,
path,
size,
chunk_hashes
)
return checksum, None
except Exception as e:
self.logger.debug(f"Error processing {path}: {e}")
raise
def find_duplicates(
self,
disk: Optional[str] = None
) -> dict[str, list[str]]:
"""Find all duplicate files
Args:
disk: Optional disk filter
Returns:
Dictionary mapping canonical path to list of duplicate paths
"""
self.logger.subsection("Finding Duplicates")
conn = self._get_connection()
cursor = conn.cursor()
# Query for duplicates
if disk:
cursor.execute("""
SELECT checksum, array_agg(path ORDER BY path) as paths
FROM files
WHERE disk = %s AND checksum IS NOT NULL
GROUP BY checksum
HAVING COUNT(*) > 1
""", (disk,))
else:
cursor.execute("""
SELECT checksum, array_agg(path ORDER BY path) as paths
FROM files
WHERE checksum IS NOT NULL
GROUP BY checksum
HAVING COUNT(*) > 1
""")
duplicates = {}
for checksum, paths in cursor.fetchall():
canonical = paths[0]
duplicates[canonical] = paths[1:]
cursor.close()
self.logger.info(f"Found {len(duplicates)} sets of duplicates")
return duplicates
def get_deduplication_stats(self) -> dict:
"""Get deduplication statistics
Returns:
Dictionary with statistics
"""
conn = self._get_connection()
cursor = conn.cursor()
stats = {}
# Total files
cursor.execute("SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL")
stats['total_files'] = cursor.fetchone()[0]
# Unique files
cursor.execute("SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL")
stats['unique_files'] = cursor.fetchone()[0]
# Duplicate files
stats['duplicate_files'] = stats['total_files'] - stats['unique_files']
# Total size
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL")
stats['total_size'] = cursor.fetchone()[0]
# Unique size
cursor.execute("""
SELECT COALESCE(SUM(size), 0)
FROM (
SELECT DISTINCT ON (checksum) size
FROM files
WHERE checksum IS NOT NULL
) AS unique_files
""")
stats['unique_size'] = cursor.fetchone()[0]
# Wasted space
stats['wasted_space'] = stats['total_size'] - stats['unique_size']
# Deduplication ratio
if stats['total_size'] > 0:
stats['dedup_ratio'] = stats['unique_size'] / stats['total_size']
else:
stats['dedup_ratio'] = 1.0
# Space saved percentage
if stats['total_size'] > 0:
stats['space_saved_percent'] = (stats['wasted_space'] / stats['total_size']) * 100
else:
stats['space_saved_percent'] = 0.0
cursor.close()
return stats
def mark_canonical_files(self) -> int:
"""Mark canonical (first occurrence) files in database
Returns:
Number of canonical files marked
"""
self.logger.subsection("Marking Canonical Files")
conn = self._get_connection()
cursor = conn.cursor()
# Find first occurrence of each checksum and mark as canonical
cursor.execute("""
WITH canonical AS (
SELECT DISTINCT ON (checksum) path, checksum
FROM files
WHERE checksum IS NOT NULL
ORDER BY checksum, path
)
UPDATE files
SET duplicate_of = NULL
WHERE path IN (SELECT path FROM canonical)
""")
count = cursor.rowcount
conn.commit()
cursor.close()
self.logger.info(f"Marked {count} canonical files")
return count
def close(self):
"""Close connections"""
self.hash_store.close()
if self._connection and not self._connection.closed:
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()

412
src/deduplication/store.py Normal file
View File

@@ -0,0 +1,412 @@
"""Hash store for deduplication with optional Redis support"""
from typing import Optional, Dict, Set
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_batch
from ..shared.config import DatabaseConfig
class HashStore:
"""PostgreSQL-based hash store for deduplication"""
def __init__(self, db_config: DatabaseConfig):
"""Initialize hash store
Args:
db_config: Database configuration
"""
self.db_config = db_config
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
return self._connection
def _ensure_tables(self):
"""Ensure hash store tables exist"""
conn = self._get_connection()
cursor = conn.cursor()
# Create hashes table for file-level deduplication
cursor.execute("""
CREATE TABLE IF NOT EXISTS file_hashes (
checksum TEXT PRIMARY KEY,
canonical_path TEXT NOT NULL,
size BIGINT NOT NULL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ref_count INTEGER DEFAULT 1
)
""")
# Create chunk hashes table for chunk-level deduplication
cursor.execute("""
CREATE TABLE IF NOT EXISTS chunk_hashes (
chunk_hash TEXT PRIMARY KEY,
size INTEGER NOT NULL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ref_count INTEGER DEFAULT 1
)
""")
# Create file-chunk mapping table
cursor.execute("""
CREATE TABLE IF NOT EXISTS file_chunks (
id SERIAL PRIMARY KEY,
file_checksum TEXT NOT NULL,
chunk_hash TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),
FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),
UNIQUE (file_checksum, chunk_index)
)
""")
# Create indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_file_chunks_file
ON file_chunks(file_checksum)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk
ON file_chunks(chunk_hash)
""")
conn.commit()
cursor.close()
def exists(self, checksum: str) -> bool:
"""Check if hash exists in store
Args:
checksum: File hash to check
Returns:
True if hash exists
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1",
(checksum,)
)
exists = cursor.fetchone() is not None
cursor.close()
return exists
def get_canonical(self, checksum: str) -> Optional[str]:
"""Get canonical path for a hash
Args:
checksum: File hash
Returns:
Canonical file path or None if not found
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT canonical_path FROM file_hashes WHERE checksum = %s",
(checksum,)
)
result = cursor.fetchone()
cursor.close()
return result[0] if result else None
def store_canonical(
self,
checksum: str,
path: Path,
size: int,
chunk_hashes: Optional[list[str]] = None
) -> None:
"""Store canonical reference for a hash
Args:
checksum: File hash
path: Canonical file path
size: File size in bytes
chunk_hashes: Optional list of chunk hashes
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
try:
# Store file hash
cursor.execute("""
INSERT INTO file_hashes (checksum, canonical_path, size)
VALUES (%s, %s, %s)
ON CONFLICT (checksum) DO UPDATE SET
ref_count = file_hashes.ref_count + 1
""", (checksum, str(path), size))
# Store chunk hashes if provided
if chunk_hashes:
# Insert chunk hashes
chunk_data = [(chunk_hash, 0) for chunk_hash in chunk_hashes]
execute_batch(cursor, """
INSERT INTO chunk_hashes (chunk_hash, size)
VALUES (%s, %s)
ON CONFLICT (chunk_hash) DO UPDATE SET
ref_count = chunk_hashes.ref_count + 1
""", chunk_data, page_size=1000)
# Create file-chunk mappings
mapping_data = [
(checksum, chunk_hash, idx)
for idx, chunk_hash in enumerate(chunk_hashes)
]
execute_batch(cursor, """
INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)
VALUES (%s, %s, %s)
ON CONFLICT (file_checksum, chunk_index) DO NOTHING
""", mapping_data, page_size=1000)
conn.commit()
except Exception as e:
conn.rollback()
raise
finally:
cursor.close()
def get_chunk_hashes(self, checksum: str) -> list[str]:
"""Get chunk hashes for a file
Args:
checksum: File hash
Returns:
List of chunk hashes in order
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT chunk_hash
FROM file_chunks
WHERE file_checksum = %s
ORDER BY chunk_index
""", (checksum,))
chunk_hashes = [row[0] for row in cursor.fetchall()]
cursor.close()
return chunk_hashes
def get_duplicates(self) -> Dict[str, list[str]]:
"""Get all duplicate file groups
Returns:
Dictionary mapping canonical path to list of duplicate paths
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
# Get all files with their hashes
cursor.execute("""
SELECT f.path, f.checksum
FROM files f
WHERE f.checksum IS NOT NULL
""")
# Group by checksum
hash_to_paths: Dict[str, list[str]] = {}
for path, checksum in cursor.fetchall():
if checksum not in hash_to_paths:
hash_to_paths[checksum] = []
hash_to_paths[checksum].append(path)
cursor.close()
# Filter to only duplicates (more than one file)
duplicates = {
paths[0]: paths[1:]
for checksum, paths in hash_to_paths.items()
if len(paths) > 1
}
return duplicates
def get_stats(self) -> Dict[str, int]:
"""Get hash store statistics
Returns:
Dictionary with statistics
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
stats = {}
# Count unique file hashes
cursor.execute("SELECT COUNT(*) FROM file_hashes")
stats['unique_files'] = cursor.fetchone()[0]
# Count unique chunk hashes
cursor.execute("SELECT COUNT(*) FROM chunk_hashes")
stats['unique_chunks'] = cursor.fetchone()[0]
# Count total references
cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes")
stats['total_file_refs'] = cursor.fetchone()[0]
# Count total chunk references
cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes")
stats['total_chunk_refs'] = cursor.fetchone()[0]
# Calculate deduplication ratio
if stats['total_file_refs'] > 0:
stats['dedup_ratio'] = stats['unique_files'] / stats['total_file_refs']
else:
stats['dedup_ratio'] = 1.0
cursor.close()
return stats
def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]:
"""Find files similar to given hash based on chunk overlap
Args:
checksum: File hash to compare
threshold: Similarity threshold (0.0 to 1.0)
Returns:
List of tuples (other_checksum, similarity_score)
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
# Get chunks for the target file
target_chunks = set(self.get_chunk_hashes(checksum))
if not target_chunks:
cursor.close()
return []
# Find files sharing chunks
cursor.execute("""
SELECT DISTINCT fc.file_checksum
FROM file_chunks fc
WHERE fc.chunk_hash = ANY(%s)
AND fc.file_checksum != %s
""", (list(target_chunks), checksum))
similar_files = []
for (other_checksum,) in cursor.fetchall():
other_chunks = set(self.get_chunk_hashes(other_checksum))
# Calculate Jaccard similarity
intersection = len(target_chunks & other_chunks)
union = len(target_chunks | other_chunks)
if union > 0:
similarity = intersection / union
if similarity >= threshold:
similar_files.append((other_checksum, similarity))
cursor.close()
# Sort by similarity descending
similar_files.sort(key=lambda x: x[1], reverse=True)
return similar_files
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
self._connection.close()
def __enter__(self):
"""Context manager entry"""
self._ensure_tables()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
class MemoryHashStore:
"""In-memory hash store for testing and small datasets"""
def __init__(self):
"""Initialize in-memory hash store"""
self.hashes: Dict[str, tuple[str, int]] = {}
self.chunks: Dict[str, int] = {}
self.file_chunks: Dict[str, list[str]] = {}
def exists(self, checksum: str) -> bool:
"""Check if hash exists"""
return checksum in self.hashes
def get_canonical(self, checksum: str) -> Optional[str]:
"""Get canonical path"""
return self.hashes.get(checksum, (None, 0))[0]
def store_canonical(
self,
checksum: str,
path: Path,
size: int,
chunk_hashes: Optional[list[str]] = None
) -> None:
"""Store canonical reference"""
self.hashes[checksum] = (str(path), size)
if chunk_hashes:
self.file_chunks[checksum] = chunk_hashes
for chunk_hash in chunk_hashes:
self.chunks[chunk_hash] = self.chunks.get(chunk_hash, 0) + 1
def get_chunk_hashes(self, checksum: str) -> list[str]:
"""Get chunk hashes"""
return self.file_chunks.get(checksum, [])
def get_stats(self) -> Dict[str, int]:
"""Get statistics"""
return {
'unique_files': len(self.hashes),
'unique_chunks': len(self.chunks),
'total_file_refs': len(self.hashes),
'total_chunk_refs': sum(self.chunks.values()),
'dedup_ratio': 1.0
}
def close(self):
"""No-op for compatibility"""
pass
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
pass

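A short sketch pairing the in-memory store above with the chunker, e.g. in a unit test where no PostgreSQL is available (the example file names in the comments are hypothetical):
```python
from pathlib import Path
from src.deduplication.chunker import compute_file_signature
from src.deduplication.store import MemoryHashStore
store = MemoryHashStore()
def register(path: Path) -> str | None:
    """Return the canonical path if this file is a duplicate, else record it."""
    checksum, chunk_hashes = compute_file_signature(path)
    if store.exists(checksum):
        return store.get_canonical(checksum)
    store.store_canonical(checksum, path, path.stat().st_size, chunk_hashes)
    return None
# register(Path("a.bin"))          -> None     (first occurrence becomes canonical)
# register(Path("copy_of_a.bin"))  -> "a.bin"  (same content reported as duplicate)
print(store.get_stats())
```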
17
src/discovery/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
"""Discovery package exports"""
from .scanner import FileScanner, FilteredScanner
from .system import SystemAPI
from .engine import DiscoveryEngine
from ._protocols import FileMeta, MountInfo, DiskInfo, IFileScanner, ISystemAPI
__all__ = [
'FileScanner',
'FilteredScanner',
'SystemAPI',
'DiscoveryEngine',
'FileMeta',
'MountInfo',
'DiskInfo',
'IFileScanner',
'ISystemAPI',
]

54
src/discovery/_protocols.py Normal file
View File

@@ -0,0 +1,54 @@
"""Protocol definitions for the discovery package"""
from typing import Iterator, Protocol, Any
from pathlib import Path
from dataclasses import dataclass
@dataclass
class FileMeta:
"""Metadata for a discovered file"""
path: Path
size: int
modified_time: float
created_time: float
# Add other metadata fields as needed
@dataclass
class MountInfo:
"""Information about a mounted filesystem"""
device: str
mount_point: str
fs_type: str
options: str
# Add other mount info fields as needed
@dataclass
class DiskInfo:
"""Information about a disk/NVMe device"""
device: str
model: str
size: int
serial: str
# Add other disk info fields as needed
class IFileScanner(Protocol):
"""Protocol for file scanning operations"""
def scan(self, root: Path) -> Iterator[FileMeta]:
"""Scan a directory tree and yield file metadata"""
...
class ISystemAPI(Protocol):
"""Protocol for system information queries"""
def query_mounts(self) -> list[MountInfo]:
"""Query mounted filesystems"""
...
def query_nvmes(self) -> list[DiskInfo]:
"""Query NVMe/disk information"""
...
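Because IFileScanner is a typing.Protocol, any object with a matching scan method satisfies it structurally; below is a minimal sketch of a fake scanner for tests. The class name and sample entries are invented for illustration.

from pathlib import Path
from typing import Iterator

class InMemoryScanner:
    """Fake scanner yielding predefined FileMeta entries; satisfies IFileScanner structurally."""

    def __init__(self, entries: list[FileMeta]):
        self._entries = entries

    def scan(self, root: Path) -> Iterator[FileMeta]:
        # Yield only entries located at or under the requested root
        for meta in self._entries:
            if meta.path == root or root in meta.path.parents:
                yield meta

fake: IFileScanner = InMemoryScanner([
    FileMeta(path=Path("/tmp/demo/a.txt"), size=12, modified_time=0.0, created_time=0.0),
])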

321
src/discovery/engine.py Normal file
View File

@@ -0,0 +1,321 @@
"""Discovery engine coordinating scanner and system APIs"""
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch
from .scanner import FileScanner
from .system import SystemAPI
from ._protocols import FileMeta
from ..shared.models import FileRecord, DiskInfo, ProcessingStats
from ..shared.config import DatabaseConfig
from ..shared.logger import ProgressLogger
class DiscoveryEngine:
"""Discovery engine for scanning and cataloging files"""
def __init__(
self,
db_config: DatabaseConfig,
logger: ProgressLogger,
batch_size: int = 1000
):
"""Initialize discovery engine
Args:
db_config: Database configuration
logger: Progress logger
batch_size: Number of records to batch before database commit
"""
self.db_config = db_config
self.logger = logger
self.batch_size = batch_size
self.system_api = SystemAPI()
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
return self._connection
def _ensure_tables(self):
"""Ensure database tables exist"""
conn = self._get_connection()
cursor = conn.cursor()
# Create files table
cursor.execute("""
CREATE TABLE IF NOT EXISTS files (
id SERIAL PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
size BIGINT NOT NULL,
modified_time DOUBLE PRECISION NOT NULL,
created_time DOUBLE PRECISION NOT NULL,
disk TEXT NOT NULL,
checksum TEXT,
status TEXT DEFAULT 'indexed',
category TEXT,
duplicate_of TEXT,
discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create index on path
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_files_path ON files(path)
""")
# Create index on disk
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk)
""")
# Create index on checksum
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum)
""")
conn.commit()
cursor.close()
def discover_path(
self,
root: Path,
scanner: Optional[FileScanner] = None,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Discover and catalog files in a path
Args:
root: Root path to discover
scanner: Optional custom scanner (default: FileScanner())
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with discovery statistics
"""
self.logger.section(f"Discovering: {root}")
# Ensure tables exist
self._ensure_tables()
# Create scanner if not provided
if scanner is None:
scanner = FileScanner(
error_handler=lambda e, p: self.logger.warning(f"Error scanning {p}: {e}")
)
# Get disk info for the root path
disk = self.system_api.get_disk_for_path(root)
if disk is None:
disk = str(root)
# Initialize statistics
stats = ProcessingStats()
batch = []
conn = self._get_connection()
cursor = conn.cursor()
try:
# Scan files
for file_meta in scanner.scan(root):
# Create file record
record = FileRecord(
path=file_meta.path,
size=file_meta.size,
modified_time=file_meta.modified_time,
created_time=file_meta.created_time,
disk=disk
)
batch.append(record)
stats.files_processed += 1
stats.bytes_processed += record.size
# Batch insert
if len(batch) >= self.batch_size:
self._insert_batch(cursor, batch)
conn.commit()
batch.clear()
# Progress callback
if progress_callback:
progress_callback(stats.files_processed, 0, stats)
# Log progress
if stats.files_processed % (self.batch_size * 10) == 0:
self.logger.progress(
stats.files_processed,
stats.files_processed, # We don't know total
prefix="Files discovered",
bytes_processed=stats.bytes_processed,
elapsed_seconds=stats.elapsed_seconds
)
# Insert remaining batch
if batch:
self._insert_batch(cursor, batch)
conn.commit()
stats.files_succeeded = stats.files_processed
except Exception as e:
conn.rollback()
self.logger.error(f"Discovery failed: {e}")
raise
finally:
cursor.close()
self.logger.info(
f"Discovery complete: {stats.files_processed} files, "
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
)
return stats
def _insert_batch(self, cursor, batch: list[FileRecord]):
"""Insert batch of file records
Args:
cursor: Database cursor
batch: List of FileRecord objects
"""
query = """
INSERT INTO files (path, size, modified_time, created_time, disk, checksum, status, category, duplicate_of)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (path) DO UPDATE SET
size = EXCLUDED.size,
modified_time = EXCLUDED.modified_time,
updated_at = CURRENT_TIMESTAMP
"""
data = [
(
str(record.path),
record.size,
record.modified_time,
record.created_time,
record.disk,
record.checksum,
record.status,
record.category,
record.duplicate_of
)
for record in batch
]
execute_batch(cursor, query, data, page_size=self.batch_size)
def get_disk_info(self) -> list[DiskInfo]:
"""Get information about all disks
Returns:
List of DiskInfo objects
"""
self.logger.subsection("Querying disk information")
disks = []
for disk_info in self.system_api.query_nvmes():
# Get mount point if available
mount_point = None
fs_type = "unknown"
for mount in self.system_api.query_mounts():
if mount.device == disk_info.device:
mount_point = Path(mount.mount_point)
fs_type = mount.fs_type
break
if mount_point:
total, used, free = self.system_api.get_disk_usage(mount_point)
else:
total = disk_info.size
used = 0
free = disk_info.size
disk = DiskInfo(
name=disk_info.device,
device=disk_info.device,
mount_point=mount_point or Path("/"),
total_size=total,
used_size=used,
free_size=free,
fs_type=fs_type
)
disks.append(disk)
self.logger.info(
f" {disk.name}: {disk.usage_percent:.1f}% used "
f"({disk.used_size:,} / {disk.total_size:,} bytes)"
)
return disks
def get_file_count(self, disk: Optional[str] = None) -> int:
"""Get count of discovered files
Args:
disk: Optional disk filter
Returns:
Count of files
"""
conn = self._get_connection()
cursor = conn.cursor()
if disk:
cursor.execute("SELECT COUNT(*) FROM files WHERE disk = %s", (disk,))
else:
cursor.execute("SELECT COUNT(*) FROM files")
count = cursor.fetchone()[0]
cursor.close()
return count
def get_total_size(self, disk: Optional[str] = None) -> int:
"""Get total size of discovered files
Args:
disk: Optional disk filter
Returns:
Total size in bytes
"""
conn = self._get_connection()
cursor = conn.cursor()
if disk:
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE disk = %s", (disk,))
else:
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files")
total = cursor.fetchone()[0]
cursor.close()
return total
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
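A hedged sketch of driving DiscoveryEngine end to end, wiring in the shared config and logger defined later in this diff; the scanned root is illustrative and the src.* module paths are assumed from the layout shown here.

from pathlib import Path
from src.shared.config import load_config      # assumed module paths
from src.shared.logger import create_logger
from src.discovery.engine import DiscoveryEngine

config = load_config(Path("config.json"))
logger = create_logger(level=config.logging.level, log_file=Path(config.logging.log_file))

with DiscoveryEngine(config.database, logger, batch_size=config.processing.batch_size) as engine:
    stats = engine.discover_path(Path("/mnt/old_disk"))   # illustrative root
    logger.info(f"cataloged {engine.get_file_count()} files, {engine.get_total_size():,} bytes total")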

216
src/discovery/scanner.py Normal file
View File

@@ -0,0 +1,216 @@
"""File system scanner implementing IFileScanner protocol"""
import os
from pathlib import Path
from typing import Iterator, Optional, Callable
from datetime import datetime
from ._protocols import FileMeta
class FileScanner:
"""File system scanner with filtering and error handling"""
def __init__(
self,
follow_symlinks: bool = False,
skip_hidden: bool = True,
error_handler: Optional[Callable[[Exception, Path], None]] = None
):
"""Initialize file scanner
Args:
follow_symlinks: Whether to follow symbolic links
skip_hidden: Whether to skip hidden files/directories
error_handler: Optional callback for handling errors during scan
"""
self.follow_symlinks = follow_symlinks
self.skip_hidden = skip_hidden
self.error_handler = error_handler
self._files_scanned = 0
self._bytes_scanned = 0
self._errors = 0
def scan(self, root: Path) -> Iterator[FileMeta]:
"""Scan a directory tree and yield file metadata
Args:
root: Root directory to scan
Yields:
FileMeta objects for each discovered file
"""
if not root.exists():
error = FileNotFoundError(f"Path does not exist: {root}")
if self.error_handler:
self.error_handler(error, root)
else:
raise error
return
if not root.is_dir():
# If root is a file, just return its metadata
try:
yield self._get_file_meta(root)
except Exception as e:
self._errors += 1
if self.error_handler:
self.error_handler(e, root)
else:
raise
return
# Walk directory tree
for dirpath, dirnames, filenames in os.walk(root, followlinks=self.follow_symlinks):
current_dir = Path(dirpath)
# Filter directories if needed
if self.skip_hidden:
dirnames[:] = [d for d in dirnames if not d.startswith('.')]
# Process files
for filename in filenames:
if self.skip_hidden and filename.startswith('.'):
continue
file_path = current_dir / filename
try:
# Skip broken symlinks
if file_path.is_symlink() and not file_path.exists():
continue
meta = self._get_file_meta(file_path)
self._files_scanned += 1
self._bytes_scanned += meta.size
yield meta
except PermissionError as e:
self._errors += 1
if self.error_handler:
self.error_handler(e, file_path)
# Continue scanning
continue
except Exception as e:
self._errors += 1
if self.error_handler:
self.error_handler(e, file_path)
# Continue scanning
continue
def _get_file_meta(self, path: Path) -> FileMeta:
"""Get file metadata
Args:
path: Path to file
Returns:
FileMeta object with file metadata
Raises:
OSError: If file cannot be accessed
"""
stat = path.stat()
# Get creation time (platform dependent)
created_time = stat.st_ctime
if hasattr(stat, 'st_birthtime'):
created_time = stat.st_birthtime
return FileMeta(
path=path,
size=stat.st_size,
modified_time=stat.st_mtime,
created_time=created_time
)
@property
def files_scanned(self) -> int:
"""Get count of files scanned"""
return self._files_scanned
@property
def bytes_scanned(self) -> int:
"""Get total bytes scanned"""
return self._bytes_scanned
@property
def errors(self) -> int:
"""Get count of errors encountered"""
return self._errors
def reset_stats(self) -> None:
"""Reset scanning statistics"""
self._files_scanned = 0
self._bytes_scanned = 0
self._errors = 0
class FilteredScanner(FileScanner):
"""Scanner with additional filtering capabilities"""
def __init__(
self,
min_size: Optional[int] = None,
max_size: Optional[int] = None,
extensions: Optional[list[str]] = None,
exclude_patterns: Optional[list[str]] = None,
**kwargs
):
"""Initialize filtered scanner
Args:
min_size: Minimum file size in bytes
max_size: Maximum file size in bytes
extensions: List of file extensions to include (e.g., ['.txt', '.py'])
exclude_patterns: List of path patterns to exclude
**kwargs: Additional arguments passed to FileScanner
"""
super().__init__(**kwargs)
self.min_size = min_size
self.max_size = max_size
self.extensions = {ext.lower() for ext in extensions} if extensions else None
self.exclude_patterns = exclude_patterns or []
def scan(self, root: Path) -> Iterator[FileMeta]:
"""Scan with additional filtering
Args:
root: Root directory to scan
Yields:
FileMeta objects for files matching filter criteria
"""
for meta in super().scan(root):
# Size filtering
if self.min_size is not None and meta.size < self.min_size:
continue
if self.max_size is not None and meta.size > self.max_size:
continue
# Extension filtering
if self.extensions is not None:
if meta.path.suffix.lower() not in self.extensions:
continue
# Exclude pattern filtering
if self._should_exclude(meta.path):
continue
yield meta
def _should_exclude(self, path: Path) -> bool:
"""Check if path matches any exclude pattern
Args:
path: Path to check
Returns:
True if path should be excluded
"""
path_str = str(path)
for pattern in self.exclude_patterns:
if pattern in path_str:
return True
return False
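A short sketch of FilteredScanner keeping only sizeable media files and skipping cache directories; the extensions, patterns, and root path are illustrative.

from pathlib import Path

scanner = FilteredScanner(
    min_size=1024 * 1024,                    # skip files under 1 MiB
    extensions=['.mp4', '.mkv'],             # keep only these suffixes
    exclude_patterns=['/cache/', '/tmp/'],
    error_handler=lambda exc, path: print(f"skipped {path}: {exc}"),
)

total_bytes = 0
for meta in scanner.scan(Path("/mnt/media")):   # illustrative root
    total_bytes += meta.size

print(f"{scanner.files_scanned} files scanned, {total_bytes:,} bytes kept, {scanner.errors} errors")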

236
src/discovery/system.py Normal file
View File

@@ -0,0 +1,236 @@
"""System API for querying mounts and disks"""
import os
import subprocess
from pathlib import Path
from typing import Optional
import psutil
from ._protocols import MountInfo, DiskInfo
class SystemAPI:
"""System information API for querying mounts and disks"""
def query_mounts(self) -> list[MountInfo]:
"""Query mounted filesystems
Returns:
List of MountInfo objects for all mounted filesystems
"""
mounts = []
for partition in psutil.disk_partitions(all=False):
mount_info = MountInfo(
device=partition.device,
mount_point=partition.mountpoint,
fs_type=partition.fstype,
options=partition.opts
)
mounts.append(mount_info)
return mounts
def query_nvmes(self) -> list[DiskInfo]:
"""Query NVMe/disk information
Returns:
List of DiskInfo objects for all disks
"""
disks = []
# Try to get disk information using lsblk
try:
result = subprocess.run(
['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'],
capture_output=True,
text=True,
check=False
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split(maxsplit=3)
if len(parts) >= 3:
device = f"/dev/{parts[0]}"
model = parts[1] if len(parts) > 1 else "Unknown"
size_str = parts[2] if len(parts) > 2 else "0"
serial = parts[3] if len(parts) > 3 else "Unknown"
try:
size = int(size_str)
except ValueError:
size = 0
disk_info = DiskInfo(
device=device,
model=model,
size=size,
serial=serial
)
disks.append(disk_info)
except FileNotFoundError:
# lsblk not available, fall back to basic info
pass
# If lsblk failed or unavailable, try alternative method
if not disks:
disks = self._query_disks_fallback()
return disks
def _query_disks_fallback(self) -> list[DiskInfo]:
"""Fallback method for querying disk information
Returns:
List of DiskInfo objects using psutil
"""
disks = []
seen_devices = set()
for partition in psutil.disk_partitions(all=True):
device = partition.device
# Skip non-disk devices
if not device.startswith('/dev/'):
continue
# Get base device (e.g., /dev/sda from /dev/sda1)
base_device = self._get_base_device(device)
if base_device in seen_devices:
continue
seen_devices.add(base_device)
try:
usage = psutil.disk_usage(partition.mountpoint)
size = usage.total
except (PermissionError, OSError):
size = 0
disk_info = DiskInfo(
device=base_device,
model="Unknown",
size=size,
serial="Unknown"
)
disks.append(disk_info)
return disks
def _get_base_device(self, device: str) -> str:
"""Extract base device name from partition device
Args:
device: Device path (e.g., /dev/sda1, /dev/nvme0n1p1)
Returns:
Base device path (e.g., /dev/sda, /dev/nvme0n1)
"""
# Handle NVMe devices
if 'nvme' in device:
# /dev/nvme0n1p1 -> /dev/nvme0n1
if 'p' in device:
return device.rsplit('p', 1)[0]
return device
# Handle standard devices (sda, sdb, etc.)
# /dev/sda1 -> /dev/sda
import re
match = re.match(r'(/dev/[a-z]+)', device)
if match:
return match.group(1)
return device
def get_disk_for_path(self, path: Path) -> Optional[str]:
"""Get the disk/mount point for a given path
Args:
path: Path to check
Returns:
Mount point device or None if not found
"""
path = path.resolve()
# Find the mount point that contains this path
best_match = None
best_match_len = 0
for partition in psutil.disk_partitions():
mount_point = Path(partition.mountpoint)
try:
if path == mount_point or mount_point in path.parents:
mount_len = len(str(mount_point))
if mount_len > best_match_len:
best_match = partition.device
best_match_len = mount_len
except (ValueError, OSError):
continue
return best_match
def get_disk_usage(self, path: Path) -> tuple[int, int, int]:
"""Get disk usage for a path
Args:
path: Path to check
Returns:
Tuple of (total, used, free) in bytes
"""
try:
usage = psutil.disk_usage(str(path))
return usage.total, usage.used, usage.free
except (PermissionError, OSError):
return 0, 0, 0
def get_mount_point(self, path: Path) -> Optional[Path]:
"""Get the mount point for a given path
Args:
path: Path to check
Returns:
Mount point path or None if not found
"""
path = path.resolve()
# Find the mount point that contains this path
best_match = None
best_match_len = 0
for partition in psutil.disk_partitions():
mount_point = Path(partition.mountpoint)
try:
if path == mount_point or mount_point in path.parents:
mount_len = len(str(mount_point))
if mount_len > best_match_len:
best_match = mount_point
best_match_len = mount_len
except (ValueError, OSError):
continue
return best_match
def is_same_filesystem(self, path1: Path, path2: Path) -> bool:
"""Check if two paths are on the same filesystem
Args:
path1: First path
path2: Second path
Returns:
True if paths are on the same filesystem
"""
try:
stat1 = path1.stat()
stat2 = path2.stat()
return stat1.st_dev == stat2.st_dev
except (OSError, PermissionError):
return False
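A quick sketch of the SystemAPI queries; the output depends entirely on the host, and query_nvmes silently falls back to psutil when lsblk is unavailable.

from pathlib import Path

api = SystemAPI()

for mount in api.query_mounts():
    print(mount.device, mount.mount_point, mount.fs_type)

for disk in api.query_nvmes():
    print(disk.device, disk.model, f"{disk.size:,} bytes")

home = Path.home()
print("device backing home:", api.get_disk_for_path(home))
print("usage (total, used, free):", api.get_disk_usage(home))
print("home and /tmp on same fs:", api.is_same_filesystem(home, Path("/tmp")))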

27
src/migration/__init__.py Normal file
View File

@@ -0,0 +1,27 @@
"""Migration package exports"""
from .copy import (
CopyMigrationStrategy,
FastCopyStrategy,
SafeCopyStrategy,
ReferenceCopyStrategy
)
from .hardlink import (
HardlinkMigrationStrategy,
SymlinkMigrationStrategy,
DedupHardlinkStrategy
)
from .engine import MigrationEngine
from ._protocols import IMigrationStrategy, IMigrationEngine
__all__ = [
'CopyMigrationStrategy',
'FastCopyStrategy',
'SafeCopyStrategy',
'ReferenceCopyStrategy',
'HardlinkMigrationStrategy',
'SymlinkMigrationStrategy',
'DedupHardlinkStrategy',
'MigrationEngine',
'IMigrationStrategy',
'IMigrationEngine',
]

107
src/migration/_protocols.py Normal file
View File

@@ -0,0 +1,107 @@
"""Protocol definitions for the migration package"""
from typing import Protocol
from pathlib import Path
from ..shared.models import OperationRecord
class IMigrationStrategy(Protocol):
"""Protocol for migration strategies"""
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate a file from source to destination
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
...
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
...
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds
"""
...
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Args:
source: Source file path
Returns:
True if cleanup successful
"""
...
class IMigrationEngine(Protocol):
"""Protocol for migration engine"""
def plan_migration(
self,
disk: str,
target_base: Path
) -> list[OperationRecord]:
"""Plan migration for a disk
Args:
disk: Disk identifier
target_base: Target base directory
Returns:
List of planned operations
"""
...
def execute_migration(
self,
operations: list[OperationRecord],
dry_run: bool = False
) -> dict:
"""Execute migration operations
Args:
operations: List of operations to execute
dry_run: Whether to perform a dry run
Returns:
Dictionary with execution statistics
"""
...
def rollback(self, operation: OperationRecord) -> bool:
"""Rollback a migration operation
Args:
operation: Operation to rollback
Returns:
True if rollback successful
"""
...

268
src/migration/copy.py Normal file
View File

@@ -0,0 +1,268 @@
"""Copy-based migration strategy"""
import shutil
from pathlib import Path
from typing import Optional
import os
from ..shared.logger import ProgressLogger
class CopyMigrationStrategy:
"""Copy files to destination with verification"""
def __init__(
self,
logger: Optional[ProgressLogger] = None,
preserve_metadata: bool = True,
verify_checksums: bool = True
):
"""Initialize copy migration strategy
Args:
logger: Optional progress logger
preserve_metadata: Whether to preserve file metadata
verify_checksums: Whether to verify checksums after copy
"""
self.logger = logger
self.preserve_metadata = preserve_metadata
self.verify_checksums = verify_checksums
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate file by copying
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Copy file
if self.preserve_metadata:
shutil.copy2(source, destination)
else:
shutil.copy(source, destination)
# Verify if requested
if verify and self.verify_checksums:
if not self._verify_copy(source, destination):
if self.logger:
self.logger.error(f"Verification failed: {source} -> {destination}")
destination.unlink()
return False
return True
except Exception as e:
if self.logger:
self.logger.error(f"Copy failed: {source} -> {destination}: {e}")
return False
def _verify_copy(self, source: Path, destination: Path) -> bool:
"""Verify copied file
Args:
source: Source file path
destination: Destination file path
Returns:
True if verification successful
"""
# Check size
source_size = source.stat().st_size
dest_size = destination.stat().st_size
if source_size != dest_size:
return False
# Compare checksums for files larger than 1MB
if source_size > 1024 * 1024:
from ..deduplication.chunker import hash_file
source_hash = hash_file(source)
dest_hash = hash_file(destination)
return source_hash == dest_hash
# For small files, compare content directly
with open(source, 'rb') as f1, open(destination, 'rb') as f2:
return f1.read() == f2.read()
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
if not source.exists():
return False
# Check if destination directory is writable
dest_dir = destination.parent
if dest_dir.exists():
return os.access(dest_dir, os.W_OK)
# Check if parent directory exists and is writable
parent = dest_dir.parent
while not parent.exists() and parent != parent.parent:
parent = parent.parent
return parent.exists() and os.access(parent, os.W_OK)
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds
"""
if not source.exists():
return 0.0
size = source.stat().st_size
# Estimate based on typical copy speed (100 MB/s)
typical_speed = 100 * 1024 * 1024 # bytes per second
return size / typical_speed
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Args:
source: Source file path
Returns:
True if cleanup successful
"""
try:
if source.exists():
source.unlink()
return True
except Exception as e:
if self.logger:
self.logger.warning(f"Failed to cleanup {source}: {e}")
return False
class FastCopyStrategy(CopyMigrationStrategy):
"""Fast copy strategy without verification"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize fast copy strategy"""
super().__init__(
logger=logger,
preserve_metadata=True,
verify_checksums=False
)
class SafeCopyStrategy(CopyMigrationStrategy):
"""Safe copy strategy with full verification"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize safe copy strategy"""
super().__init__(
logger=logger,
preserve_metadata=True,
verify_checksums=True
)
class ReferenceCopyStrategy:
"""Create reference copy using reflinks (CoW) if supported"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize reflink copy strategy"""
self.logger = logger
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate using reflink (copy-on-write)
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Try reflink copy (works on btrfs, xfs, etc.)
import subprocess
result = subprocess.run(
['cp', '--reflink=auto', str(source), str(destination)],
capture_output=True,
check=False
)
if result.returncode != 0:
# Fallback to regular copy
shutil.copy2(source, destination)
return True
except Exception as e:
if self.logger:
self.logger.error(f"Reflink copy failed: {source} -> {destination}: {e}")
return False
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible"""
if not source.exists():
return False
dest_dir = destination.parent
if dest_dir.exists():
return os.access(dest_dir, os.W_OK)
return True
def estimate_time(self, source: Path) -> float:
"""Estimate migration time (reflinks are fast)"""
return 0.1 # Reflinks are nearly instant
def cleanup(self, source: Path) -> bool:
"""Cleanup source file"""
try:
if source.exists():
source.unlink()
return True
except Exception as e:
if self.logger:
self.logger.warning(f"Failed to cleanup {source}: {e}")
return False
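A sketch of the copy strategies in use; source and destination paths are illustrative. SafeCopyStrategy verifies sizes and checksums before the original is removed, while ReferenceCopyStrategy only gains anything on copy-on-write filesystems such as btrfs or XFS.

from pathlib import Path

source = Path("/mnt/old/video.mkv")          # illustrative paths
destination = Path("/mnt/new/video.mkv")

strategy = SafeCopyStrategy()
if strategy.can_migrate(source, destination):
    print(f"estimated copy time: {strategy.estimate_time(source):.1f}s")
    if strategy.migrate(source, destination, verify=True):
        strategy.cleanup(source)             # remove the original only after a verified copy

# Alternative on CoW filesystems: ReferenceCopyStrategy clones via `cp --reflink=auto`
# and silently falls back to a regular copy elsewhere.
# ReferenceCopyStrategy().migrate(source, destination)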

454
src/migration/engine.py Normal file
View File

@@ -0,0 +1,454 @@
"""Migration engine"""
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch
from .copy import CopyMigrationStrategy, SafeCopyStrategy
from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy
from ..shared.models import OperationRecord, ProcessingStats, MigrationPlan
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger
class MigrationEngine:
"""Engine for migrating files"""
def __init__(
self,
db_config: DatabaseConfig,
processing_config: ProcessingConfig,
logger: ProgressLogger,
target_base: Path
):
"""Initialize migration engine
Args:
db_config: Database configuration
processing_config: Processing configuration
logger: Progress logger
target_base: Target base directory for migrations
"""
self.db_config = db_config
self.processing_config = processing_config
self.logger = logger
self.target_base = Path(target_base)
self._connection = None
# Initialize strategies
self.copy_strategy = SafeCopyStrategy(logger=logger)
self.hardlink_strategy = HardlinkMigrationStrategy(logger=logger)
self.symlink_strategy = SymlinkMigrationStrategy(logger=logger)
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
return self._connection
def _ensure_tables(self):
"""Ensure migration tables exist"""
conn = self._get_connection()
cursor = conn.cursor()
# Create operations table
cursor.execute("""
CREATE TABLE IF NOT EXISTS operations (
id SERIAL PRIMARY KEY,
source_path TEXT NOT NULL,
dest_path TEXT NOT NULL,
operation_type TEXT NOT NULL,
size BIGINT DEFAULT 0,
status TEXT DEFAULT 'pending',
error TEXT,
executed_at TIMESTAMP,
verified BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create index on status
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_operations_status
ON operations(status)
""")
conn.commit()
cursor.close()
def plan_migration(
self,
disk: Optional[str] = None,
category: Optional[str] = None
) -> MigrationPlan:
"""Plan migration for files
Args:
disk: Optional disk filter
category: Optional category filter
Returns:
MigrationPlan with planned operations
"""
self.logger.section("Planning Migration")
conn = self._get_connection()
cursor = conn.cursor()
# Build query
conditions = ["category IS NOT NULL"]
params = []
if disk:
conditions.append("disk = %s")
params.append(disk)
if category:
conditions.append("category = %s")
params.append(category)
query = f"""
SELECT path, size, category, duplicate_of
FROM files
WHERE {' AND '.join(conditions)}
ORDER BY category, path
"""
cursor.execute(query, params)
files = cursor.fetchall()
self.logger.info(f"Found {len(files)} files to migrate")
operations = []
total_size = 0
for path_str, size, file_category, duplicate_of in files:
source = Path(path_str)
# Determine destination
dest_path = self.target_base / file_category / source.name
# Determine operation type
if duplicate_of:
# Use hardlink for duplicates
operation_type = 'hardlink'
else:
# Use copy for unique files
operation_type = 'copy'
operation = OperationRecord(
source_path=source,
dest_path=dest_path,
operation_type=operation_type,
size=size
)
operations.append(operation)
total_size += size
cursor.close()
plan = MigrationPlan(
target_disk=str(self.target_base),
destination_disks=[str(self.target_base)],
operations=operations,
total_size=total_size,
file_count=len(operations)
)
self.logger.info(
f"Migration plan created: {plan.file_count} files, "
f"{plan.total_size:,} bytes"
)
return plan
def execute_migration(
self,
operations: list[OperationRecord],
dry_run: bool = False,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Execute migration operations
Args:
operations: List of operations to execute
dry_run: Whether to perform a dry run
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with execution statistics
"""
self.logger.section("Executing Migration" + (" (DRY RUN)" if dry_run else ""))
self._ensure_tables()
stats = ProcessingStats()
total_ops = len(operations)
for operation in operations:
stats.files_processed += 1
if dry_run:
# In dry run, just log what would happen
self.logger.debug(
f"[DRY RUN] Would {operation.operation_type}: "
f"{operation.source_path} -> {operation.dest_path}"
)
stats.files_succeeded += 1
else:
# Execute actual migration
success = self._execute_operation(operation)
if success:
stats.files_succeeded += 1
stats.bytes_processed += operation.size
else:
stats.files_failed += 1
# Progress callback
if progress_callback and stats.files_processed % 100 == 0:
progress_callback(stats.files_processed, total_ops, stats)
# Log progress
if stats.files_processed % 1000 == 0:
self.logger.progress(
stats.files_processed,
total_ops,
prefix="Operations executed",
bytes_processed=stats.bytes_processed,
elapsed_seconds=stats.elapsed_seconds
)
self.logger.info(
f"Migration {'dry run' if dry_run else 'execution'} complete: "
f"{stats.files_succeeded}/{total_ops} operations, "
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
)
return stats
def _execute_operation(self, operation: OperationRecord) -> bool:
"""Execute a single migration operation
Args:
operation: Operation to execute
Returns:
True if successful
"""
operation.status = 'in_progress'
operation.executed_at = datetime.now()
try:
# Select strategy based on operation type
if operation.operation_type == 'copy':
strategy = self.copy_strategy
elif operation.operation_type == 'hardlink':
strategy = self.hardlink_strategy
elif operation.operation_type == 'symlink':
strategy = self.symlink_strategy
else:
raise ValueError(f"Unknown operation type: {operation.operation_type}")
# Execute migration
success = strategy.migrate(
operation.source_path,
operation.dest_path,
verify=self.processing_config.verify_operations
)
if success:
operation.status = 'completed'
operation.verified = True
self._record_operation(operation)
return True
else:
operation.status = 'failed'
operation.error = "Migration failed"
self._record_operation(operation)
return False
except Exception as e:
operation.status = 'failed'
operation.error = str(e)
self._record_operation(operation)
self.logger.error(f"Operation failed: {operation.source_path}: {e}")
return False
def _record_operation(self, operation: OperationRecord):
"""Record operation in database
Args:
operation: Operation to record
"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO operations (
source_path, dest_path, operation_type, size,
status, error, executed_at, verified
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
str(operation.source_path),
str(operation.dest_path),
operation.operation_type,
operation.size,
operation.status,
operation.error,
operation.executed_at,
operation.verified
))
conn.commit()
cursor.close()
def rollback(self, operation: OperationRecord) -> bool:
"""Rollback a migration operation
Args:
operation: Operation to rollback
Returns:
True if rollback successful
"""
self.logger.warning(f"Rolling back: {operation.dest_path}")
try:
# Remove destination
if operation.dest_path.exists():
operation.dest_path.unlink()
# Update database
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE operations
SET status = 'rolled_back'
WHERE source_path = %s AND dest_path = %s
""", (str(operation.source_path), str(operation.dest_path)))
conn.commit()
cursor.close()
return True
except Exception as e:
self.logger.error(f"Rollback failed: {operation.dest_path}: {e}")
return False
def get_migration_stats(self) -> dict:
"""Get migration statistics
Returns:
Dictionary with statistics
"""
conn = self._get_connection()
cursor = conn.cursor()
stats = {}
# Total operations
cursor.execute("SELECT COUNT(*) FROM operations")
stats['total_operations'] = cursor.fetchone()[0]
# Operations by status
cursor.execute("""
SELECT status, COUNT(*)
FROM operations
GROUP BY status
""")
for status, count in cursor.fetchall():
stats[f'{status}_operations'] = count
# Total size migrated
cursor.execute("""
SELECT COALESCE(SUM(size), 0)
FROM operations
WHERE status = 'completed'
""")
stats['total_size_migrated'] = cursor.fetchone()[0]
cursor.close()
return stats
def verify_migrations(self) -> dict:
"""Verify completed migrations
Returns:
Dictionary with verification results
"""
self.logger.subsection("Verifying Migrations")
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT source_path, dest_path, operation_type
FROM operations
WHERE status = 'completed' AND verified = FALSE
""")
operations = cursor.fetchall()
cursor.close()
results = {
'total': len(operations),
'verified': 0,
'failed': 0
}
for source_str, dest_str, op_type in operations:
source = Path(source_str)
dest = Path(dest_str)
# Verify destination exists
if not dest.exists():
results['failed'] += 1
self.logger.warning(f"Verification failed: {dest} does not exist")
continue
# Verify based on operation type
if op_type == 'hardlink':
# Check if hardlinked
if source.exists() and source.stat().st_ino == dest.stat().st_ino:
results['verified'] += 1
else:
results['failed'] += 1
            else:
                # Destination exists (checked above); also compare sizes while the source is still present
                if not source.exists() or source.stat().st_size == dest.stat().st_size:
                    results['verified'] += 1
                else:
                    results['failed'] += 1
self.logger.info(
f"Verification complete: {results['verified']}/{results['total']} verified"
)
return results
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
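A sketch of planning a migration and rehearsing it with dry_run=True before anything is written; the disk and category filters, target path, and src.* imports are assumptions for illustration.

from pathlib import Path
from src.shared.config import load_config      # assumed module paths
from src.shared.logger import create_logger
from src.migration.engine import MigrationEngine

config = load_config()
logger = create_logger(level=config.logging.level)

with MigrationEngine(config.database, config.processing, logger, Path("/mnt/new_disk")) as engine:
    plan = engine.plan_migration(disk="/dev/sda1", category="video")   # illustrative filters
    stats = engine.execute_migration(plan.operations, dry_run=True)    # rehearsal: nothing is copied
    logger.info(f"dry run: {stats.files_succeeded}/{plan.file_count} operations, "
                f"{plan.total_size:,} bytes planned")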

377
src/migration/hardlink.py Normal file
View File

@@ -0,0 +1,377 @@
"""Hardlink-based migration strategy"""
import os
from pathlib import Path
from typing import Optional
from ..shared.logger import ProgressLogger
class HardlinkMigrationStrategy:
"""Create hardlinks to files instead of copying"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize hardlink migration strategy
Args:
logger: Optional progress logger
"""
self.logger = logger
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate file by creating hardlink
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
return False
# Check if source and destination are on same filesystem
if not self._same_filesystem(source, destination.parent):
if self.logger:
self.logger.warning(
f"Cannot hardlink across filesystems: {source} -> {destination}"
)
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Create hardlink
os.link(source, destination)
# Verify if requested
if verify:
if not self._verify_hardlink(source, destination):
if self.logger:
self.logger.error(f"Verification failed: {source} -> {destination}")
destination.unlink()
return False
return True
except FileExistsError:
if self.logger:
self.logger.warning(f"Destination already exists: {destination}")
return False
except Exception as e:
if self.logger:
self.logger.error(f"Hardlink failed: {source} -> {destination}: {e}")
return False
def _same_filesystem(self, path1: Path, path2: Path) -> bool:
"""Check if two paths are on the same filesystem
Args:
path1: First path
path2: Second path
Returns:
True if on same filesystem
"""
try:
# Get device IDs
stat1 = path1.stat()
stat2 = path2.stat()
return stat1.st_dev == stat2.st_dev
except Exception:
return False
def _verify_hardlink(self, source: Path, destination: Path) -> bool:
"""Verify hardlink
Args:
source: Source file path
destination: Destination file path
Returns:
True if verification successful
"""
try:
# Check if they have the same inode
source_stat = source.stat()
dest_stat = destination.stat()
return source_stat.st_ino == dest_stat.st_ino
except Exception:
return False
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
if not source.exists():
return False
# Check if on same filesystem
dest_dir = destination.parent
if dest_dir.exists():
return self._same_filesystem(source, dest_dir)
# Check parent directories
parent = dest_dir.parent
while not parent.exists() and parent != parent.parent:
parent = parent.parent
return parent.exists() and self._same_filesystem(source, parent)
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds (hardlinks are instant)
"""
return 0.01 # Hardlinks are nearly instant
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Note: For hardlinks, we typically don't remove the source
immediately as both links point to the same inode.
Args:
source: Source file path
Returns:
True (no cleanup needed for hardlinks)
"""
# For hardlinks, we don't remove the source
# Both source and destination point to the same data
return True
class SymlinkMigrationStrategy:
"""Create symbolic links to files"""
def __init__(
self,
logger: Optional[ProgressLogger] = None,
absolute_links: bool = True
):
"""Initialize symlink migration strategy
Args:
logger: Optional progress logger
absolute_links: Whether to create absolute symlinks
"""
self.logger = logger
self.absolute_links = absolute_links
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate file by creating symlink
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Determine link target
if self.absolute_links:
target = source.resolve()
else:
# Create relative symlink
target = os.path.relpath(source, destination.parent)
# Create symlink
destination.symlink_to(target)
# Verify if requested
if verify:
if not self._verify_symlink(destination, source):
if self.logger:
self.logger.error(f"Verification failed: {source} -> {destination}")
destination.unlink()
return False
return True
except FileExistsError:
if self.logger:
self.logger.warning(f"Destination already exists: {destination}")
return False
except Exception as e:
if self.logger:
self.logger.error(f"Symlink failed: {source} -> {destination}: {e}")
return False
def _verify_symlink(self, symlink: Path, expected_target: Path) -> bool:
"""Verify symlink
Args:
symlink: Symlink path
expected_target: Expected target path
Returns:
True if verification successful
"""
try:
# Check if it's a symlink
if not symlink.is_symlink():
return False
# Resolve and compare
resolved = symlink.resolve()
expected = expected_target.resolve()
return resolved == expected
except Exception:
return False
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
if not source.exists():
return False
# Check if destination directory is writable
dest_dir = destination.parent
if dest_dir.exists():
return os.access(dest_dir, os.W_OK)
return True
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds (symlinks are instant)
"""
return 0.01 # Symlinks are instant
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Note: For symlinks, we don't remove the source as the
symlink points to it.
Args:
source: Source file path
Returns:
True (no cleanup needed for symlinks)
"""
# For symlinks, we don't remove the source
return True
class DedupHardlinkStrategy(HardlinkMigrationStrategy):
"""Hardlink strategy for deduplication
Creates hardlinks for duplicate files to save space.
"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize dedup hardlink strategy"""
super().__init__(logger=logger)
def deduplicate(
self,
canonical: Path,
duplicate: Path
) -> bool:
"""Replace duplicate with hardlink to canonical
Args:
canonical: Canonical file path
duplicate: Duplicate file path
Returns:
True if deduplication successful
"""
if not canonical.exists():
if self.logger:
self.logger.error(f"Canonical file does not exist: {canonical}")
return False
if not duplicate.exists():
if self.logger:
self.logger.error(f"Duplicate file does not exist: {duplicate}")
return False
# Check if already hardlinked
if self._verify_hardlink(canonical, duplicate):
return True
# Check if on same filesystem
if not self._same_filesystem(canonical, duplicate):
if self.logger:
self.logger.warning(
f"Cannot hardlink across filesystems: {canonical} -> {duplicate}"
)
return False
        # Stage the duplicate as a backup first so it can be restored on any failure
        backup = duplicate.with_suffix(duplicate.suffix + '.bak')
        try:
            duplicate.rename(backup)
            # Create hardlink
            os.link(canonical, duplicate)
            # Remove backup
            backup.unlink()
            return True
        except Exception as e:
            if self.logger:
                self.logger.error(f"Deduplication failed: {duplicate}: {e}")
            # Restore the original duplicate from the backup
            if backup.exists():
                backup.rename(duplicate)
            return False
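A sketch of collapsing a known duplicate into a hardlink with DedupHardlinkStrategy; the two files must hold identical content and sit on the same filesystem, and both paths here are invented for illustration.

from pathlib import Path

canonical = Path("/mnt/data/photos/2024/img_0001.jpg")   # illustrative paths
duplicate = Path("/mnt/data/backup/img_0001.jpg")

dedup = DedupHardlinkStrategy()
if dedup.deduplicate(canonical, duplicate):
    # Both names now point at the same inode, so the bytes are stored once
    assert canonical.stat().st_ino == duplicate.stat().st_ino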

50
src/shared/__init__.py Normal file
View File

@@ -0,0 +1,50 @@
"""Shared package exports"""
from .models import (
FileRecord,
OperationRecord,
DiskInfo,
MigrationPlan,
ProcessingStats
)
from .config import (
Config,
DatabaseConfig,
ProcessingConfig,
LoggingConfig,
load_config
)
from .logger import (
ProgressLogger,
create_logger,
format_size,
format_rate,
format_time
)
from ._protocols import IDatabase, ILogger
__all__ = [
# Models
'FileRecord',
'OperationRecord',
'DiskInfo',
'MigrationPlan',
'ProcessingStats',
# Config
'Config',
'DatabaseConfig',
'ProcessingConfig',
'LoggingConfig',
'load_config',
# Logger
'ProgressLogger',
'create_logger',
'format_size',
'format_rate',
'format_time',
# Protocols
'IDatabase',
'ILogger',
]

67
src/shared/_protocols.py Normal file
View File

@@ -0,0 +1,67 @@
"""Protocol definitions for the shared package"""
from typing import Protocol, Any
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
@dataclass
class FileRecord:
"""Core file record with all metadata"""
path: Path
size: int
modified_time: float
created_time: float
disk: str
checksum: str | None = None
status: str = 'indexed' # indexed, planned, moved, verified
category: str | None = None
duplicate_of: str | None = None
@dataclass
class OperationRecord:
"""Record of a migration operation"""
source_path: Path
dest_path: Path
operation_type: str # move, copy, hardlink, symlink
status: str = 'pending' # pending, in_progress, completed, failed
error: str | None = None
executed_at: datetime | None = None
verified: bool = False
class IDatabase(Protocol):
"""Protocol for database operations"""
def store_file(self, file_record: FileRecord) -> None:
"""Store a file record"""
...
def get_files_by_disk(self, disk: str) -> list[FileRecord]:
"""Get all files on a specific disk"""
...
def store_operation(self, operation: OperationRecord) -> None:
"""Store an operation record"""
...
def get_pending_operations(self) -> list[OperationRecord]:
"""Get all pending operations"""
...
class ILogger(Protocol):
"""Protocol for logging operations"""
def info(self, message: str) -> None:
...
def warning(self, message: str) -> None:
...
def error(self, message: str) -> None:
...
def debug(self, message: str) -> None:
...

110
src/shared/config.py Normal file
View File

@@ -0,0 +1,110 @@
"""Configuration management for disk reorganizer"""
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional
@dataclass
class DatabaseConfig:
"""Database connection configuration"""
host: str = '192.168.1.159'
port: int = 5432
database: str = 'disk_reorganizer_db'
user: str = 'disk_reorg_user'
password: str = 'heel-goed-wachtwoord'
def to_dict(self) -> dict:
"""Convert to dictionary"""
return asdict(self)
@dataclass
class ProcessingConfig:
"""Processing behavior configuration"""
batch_size: int = 1000
commit_interval: int = 100
parallel_workers: int = 4
chunk_size: int = 8192
hash_algorithm: str = 'sha256'
verify_operations: bool = True
preserve_timestamps: bool = True
def to_dict(self) -> dict:
"""Convert to dictionary"""
return asdict(self)
@dataclass
class LoggingConfig:
"""Logging configuration"""
level: str = 'INFO'
log_file: str = 'disk_reorganizer.log'
console_output: bool = True
file_output: bool = True
def to_dict(self) -> dict:
"""Convert to dictionary"""
return asdict(self)
@dataclass
class Config:
"""Main configuration container"""
    database: Optional[DatabaseConfig] = None
    processing: Optional[ProcessingConfig] = None
    logging: Optional[LoggingConfig] = None
def __post_init__(self):
"""Initialize nested configs with defaults if not provided"""
if self.database is None:
self.database = DatabaseConfig()
if self.processing is None:
self.processing = ProcessingConfig()
if self.logging is None:
self.logging = LoggingConfig()
@classmethod
def from_file(cls, config_path: Path) -> 'Config':
"""Load configuration from JSON file"""
if not config_path.exists():
return cls()
with open(config_path, 'r') as f:
data = json.load(f)
return cls(
database=DatabaseConfig(**data.get('database', {})),
processing=ProcessingConfig(**data.get('processing', {})),
logging=LoggingConfig(**data.get('logging', {}))
)
def to_file(self, config_path: Path) -> None:
"""Save configuration to JSON file"""
data = {
'database': self.database.to_dict(),
'processing': self.processing.to_dict(),
'logging': self.logging.to_dict()
}
with open(config_path, 'w') as f:
json.dump(data, f, indent=2)
def to_dict(self) -> dict:
"""Convert to dictionary"""
return {
'database': self.database.to_dict(),
'processing': self.processing.to_dict(),
'logging': self.logging.to_dict()
}
def load_config(config_path: Optional[Path] = None) -> Config:
"""Load configuration from file or return default"""
if config_path is None:
config_path = Path('config.json')
if config_path.exists():
return Config.from_file(config_path)
return Config()
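A sketch of building, saving, and reloading a Config; the values are illustrative, and omitted sections fall back to their dataclass defaults via __post_init__.

from pathlib import Path

config = Config(
    database=DatabaseConfig(host="127.0.0.1", password="change-me"),   # illustrative credentials
    processing=ProcessingConfig(parallel_workers=8),
)
config.to_file(Path("config.json"))

reloaded = load_config(Path("config.json"))
print(reloaded.database.host)                  # '127.0.0.1'
print(reloaded.processing.parallel_workers)    # 8
print(reloaded.logging.level)                  # 'INFO' (default LoggingConfig)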

217
src/shared/logger.py Normal file
View File

@@ -0,0 +1,217 @@
"""Dynamic progress logger with formatting utilities"""
import sys
import logging
from typing import Optional
from datetime import datetime
from pathlib import Path
def format_size(bytes_size: int) -> str:
"""Format bytes to human-readable size string
Args:
bytes_size: Size in bytes
Returns:
Human-readable size string (e.g., "1.5 GB", "234.5 MB")
"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} EB"
def format_rate(bytes_per_second: float) -> str:
"""Format transfer rate to human-readable string
Args:
bytes_per_second: Transfer rate in bytes per second
Returns:
Human-readable rate string (e.g., "125.3 MB/s")
"""
return f"{format_size(int(bytes_per_second))}/s"
def format_time(seconds: float) -> str:
"""Format seconds to human-readable time string
Args:
seconds: Time in seconds
Returns:
Human-readable time string (e.g., "2h 34m 12s", "45m 23s", "12s")
"""
if seconds < 60:
return f"{int(seconds)}s"
elif seconds < 3600:
minutes = int(seconds // 60)
secs = int(seconds % 60)
return f"{minutes}m {secs}s"
else:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours}h {minutes}m {secs}s"
class ProgressLogger:
"""Dynamic progress logger with real-time statistics"""
def __init__(
self,
name: str = "defrag",
level: int = logging.INFO,
log_file: Optional[Path] = None,
console_output: bool = True
):
"""Initialize progress logger
Args:
name: Logger name
level: Logging level
log_file: Optional log file path
console_output: Whether to output to console
"""
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
self.logger.handlers.clear()
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Add console handler
if console_output:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
# Add file handler
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(level)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self._last_progress_line = ""
def info(self, message: str) -> None:
"""Log info message"""
self.logger.info(message)
def warning(self, message: str) -> None:
"""Log warning message"""
self.logger.warning(message)
def error(self, message: str) -> None:
"""Log error message"""
self.logger.error(message)
def debug(self, message: str) -> None:
"""Log debug message"""
self.logger.debug(message)
def critical(self, message: str) -> None:
"""Log critical message"""
self.logger.critical(message)
def progress(
self,
current: int,
total: int,
prefix: str = "",
suffix: str = "",
bytes_processed: Optional[int] = None,
elapsed_seconds: Optional[float] = None
) -> None:
"""Log progress with dynamic statistics
Args:
current: Current progress count
total: Total count
prefix: Prefix message
suffix: Suffix message
bytes_processed: Optional bytes processed for rate calculation
elapsed_seconds: Optional elapsed time for rate calculation
"""
if total == 0:
percent = 0.0
else:
percent = (current / total) * 100
progress_msg = f"{prefix} [{current}/{total}] {percent:.1f}%"
if bytes_processed is not None and elapsed_seconds is not None and elapsed_seconds > 0:
            rate = bytes_processed / elapsed_seconds
progress_msg += f" | {format_size(bytes_processed)} @ {format_rate(rate)}"
# Estimate time remaining
if current > 0:
estimated_total_seconds = (elapsed_seconds / current) * total
remaining_seconds = estimated_total_seconds - elapsed_seconds
progress_msg += f" | ETA: {format_time(remaining_seconds)}"
if suffix:
progress_msg += f" | {suffix}"
self.info(progress_msg)
def section(self, title: str) -> None:
"""Log section header
Args:
title: Section title
"""
separator = "=" * 60
self.info(separator)
self.info(f" {title}")
self.info(separator)
def subsection(self, title: str) -> None:
"""Log subsection header
Args:
title: Subsection title
"""
self.info(f"\n--- {title} ---")
def create_logger(
name: str = "defrag",
level: str = "INFO",
log_file: Optional[Path] = None,
console_output: bool = True
) -> ProgressLogger:
"""Create and configure a progress logger
Args:
name: Logger name
level: Logging level as string
log_file: Optional log file path
console_output: Whether to output to console
Returns:
Configured ProgressLogger instance
"""
level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
log_level = level_map.get(level.upper(), logging.INFO)
return ProgressLogger(
name=name,
level=log_level,
log_file=log_file,
console_output=console_output
)
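A sketch of the logger and its formatting helpers; the loop merely simulates work and all numbers are illustrative.

import time
from pathlib import Path

logger = create_logger(name="defrag", level="INFO", log_file=Path("logs/demo.log"))
logger.section("Demo run")

start = time.monotonic()
processed_bytes = 0
for i in range(1, 101):
    processed_bytes += 10 * 1024 * 1024        # pretend each step handles 10 MiB
    if i % 25 == 0:
        logger.progress(i, 100, prefix="Files",
                        bytes_processed=processed_bytes,
                        elapsed_seconds=time.monotonic() - start)

print(format_size(processed_bytes), format_rate(processed_bytes / 5), format_time(3725))
# e.g. '1000.0 MB', '200.0 MB/s', '1h 2m 5s'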

127
src/shared/models.py Normal file
View File

@@ -0,0 +1,127 @@
"""Data models for the disk reorganizer"""
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime
from typing import Optional
@dataclass
class FileRecord:
"""Core file record with all metadata"""
path: Path
size: int
modified_time: float
created_time: float
disk: str
checksum: Optional[str] = None
status: str = 'indexed' # indexed, planned, moved, verified
category: Optional[str] = None
duplicate_of: Optional[str] = None
def to_dict(self) -> dict:
"""Convert to dictionary for serialization"""
return {
'path': str(self.path),
'size': self.size,
'modified_time': self.modified_time,
'created_time': self.created_time,
'disk': self.disk,
'checksum': self.checksum,
'status': self.status,
'category': self.category,
'duplicate_of': self.duplicate_of
}
@dataclass
class OperationRecord:
"""Record of a migration operation"""
source_path: Path
dest_path: Path
operation_type: str # move, copy, hardlink, symlink
size: int = 0
status: str = 'pending' # pending, in_progress, completed, failed
error: Optional[str] = None
executed_at: Optional[datetime] = None
verified: bool = False
def to_dict(self) -> dict:
"""Convert to dictionary for serialization"""
return {
'source_path': str(self.source_path),
'dest_path': str(self.dest_path),
'operation_type': self.operation_type,
'size': self.size,
'status': self.status,
'error': self.error,
'executed_at': self.executed_at.isoformat() if self.executed_at else None,
'verified': self.verified
}
@dataclass
class DiskInfo:
"""Information about a disk/volume"""
name: str
device: str
mount_point: Path
total_size: int
used_size: int
free_size: int
fs_type: str
@property
def usage_percent(self) -> float:
"""Calculate usage percentage"""
if self.total_size == 0:
return 0.0
return (self.used_size / self.total_size) * 100
@dataclass
class MigrationPlan:
"""Complete migration plan"""
target_disk: str
destination_disks: list[str]
operations: list[OperationRecord]
total_size: int
file_count: int
created_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict:
"""Convert to dictionary for serialization"""
return {
'target_disk': self.target_disk,
'destination_disks': self.destination_disks,
'operations': [op.to_dict() for op in self.operations],
'total_size': self.total_size,
'file_count': self.file_count,
'created_at': self.created_at.isoformat()
}
@dataclass
class ProcessingStats:
"""Statistics for processing operations"""
files_processed: int = 0
bytes_processed: int = 0
files_succeeded: int = 0
files_failed: int = 0
start_time: datetime = field(default_factory=datetime.now)
@property
def elapsed_seconds(self) -> float:
"""Calculate elapsed time in seconds"""
return (datetime.now() - self.start_time).total_seconds()
@property
def files_per_second(self) -> float:
"""Calculate processing rate"""
elapsed = self.elapsed_seconds
return self.files_processed / elapsed if elapsed > 0 else 0.0
@property
def bytes_per_second(self) -> float:
"""Calculate throughput"""
elapsed = self.elapsed_seconds
return self.bytes_processed / elapsed if elapsed > 0 else 0.0
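A sketch of ProcessingStats collecting throughput figures; the sleep stands in for real work and the sizes are illustrative.

import time

stats = ProcessingStats()
for size in (4_096, 8_192, 1_048_576):         # illustrative file sizes
    time.sleep(0.01)                           # stand-in for real processing work
    stats.files_processed += 1
    stats.files_succeeded += 1
    stats.bytes_processed += size

print(f"{stats.files_processed} files in {stats.elapsed_seconds:.2f}s "
      f"({stats.files_per_second:.0f} files/s, {stats.bytes_per_second:,.0f} B/s)")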

0
src/tests/__init__.py Normal file
View File