defrag/app/main.py

#!/usr/bin/env python3
"""
Disk Reorganizer - Safely restructure files across disks to free up one entire disk.
Four commands: index, plan, execute, report
"""
import os
import sys
import psycopg2
import shutil
import hashlib
import argparse
import json
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime
import logging
import time
# Set up logging to both a file and stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('disk_reorganizer.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class FileRecord:
    """Represents a file in the index"""
    path: str
    size: int
    modified_time: float
    disk_label: str
    checksum: Optional[str] = None
    status: str = 'indexed'  # indexed, planned, moved, verified
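
# FileRecord mirrors a row of the files table; the indexing code below writes
# rows with raw SQL, so the dataclass currently serves as schema documentation.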


class DiskReorganizer:
    def __init__(self, db_config: Dict = None):
        """
        Initialize DiskReorganizer with a PostgreSQL connection.
        :param db_config: Database configuration dict with host, port, database, user, password
        """
        if db_config is None:
            db_config = {
                'host': os.getenv('DB_HOST', '192.168.1.159'),
                'port': int(os.getenv('DB_PORT', 5432)),
                'database': os.getenv('DB_NAME', 'disk_reorganizer_db'),
                'user': os.getenv('DB_USER', 'disk_reorg_user'),
                'password': os.getenv('DB_PASSWORD', 'heel-goed-wachtwoord')
            }
        self.db_config = db_config
        self.init_database()

    def get_connection(self):
        """Get a new PostgreSQL database connection"""
        return psycopg2.connect(**self.db_config)
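
    # Note: every public method opens and closes its own short-lived connection
    # through get_connection(). If that ever becomes a bottleneck, psycopg2's
    # pool.SimpleConnectionPool is the usual drop-in alternative (a design note
    # only; pooling is not used here).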

    def init_database(self):
        """Verify the PostgreSQL connection and that the expected tables exist"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            # Confirm the schema created by setup_database.sh is present
            cursor.execute("""
                SELECT table_name FROM information_schema.tables
                WHERE table_schema = 'public' AND table_name IN ('files', 'operations')
            """)
            tables = cursor.fetchall()
            if len(tables) < 2:
                logger.error("Database tables not found! Please run setup_database.sh first.")
                raise RuntimeError("Database not properly initialized. Run setup_database.sh")
            cursor.close()
            conn.close()
            logger.info("Database connection verified successfully")
        except psycopg2.Error as e:
            logger.error(f"Database connection failed: {e}")
            raise

    def index_disk(self, disk_root: str, disk_name: str):
        """
        Index all files on a disk/partition with a dynamic progress display.
        :param disk_root: Root path of disk (e.g., 'D:\\')
        :param disk_name: Logical name for the disk
        """
        logger.info(f"Indexing disk: {disk_name} at {disk_root}")
        disk_path = Path(disk_root)
        if not disk_path.exists():
            logger.error(f"Disk path {disk_root} does not exist!")
            return
        files_count = 0
        total_size = 0
        start_time = time.time()
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            # Walk the tree, pruning system directories in place
            for root, dirs, files in os.walk(disk_path):
                dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))]
                for file in files:
                    try:
                        file_path = Path(root) / file
                        if not file_path.is_file():
                            continue
                        stat = file_path.stat()
                        size = stat.st_size
                        mtime = datetime.fromtimestamp(stat.st_mtime)
                        # Store the path relative to the disk root for portability
                        rel_path = str(file_path.relative_to(disk_path))
                        # Upsert via PostgreSQL INSERT ... ON CONFLICT
                        cursor.execute("""
                            INSERT INTO files (path, size, modified_time, disk_label, checksum, status)
                            VALUES (%s, %s, %s, %s, %s, %s)
                            ON CONFLICT (path) DO UPDATE SET
                                size = EXCLUDED.size,
                                modified_time = EXCLUDED.modified_time,
                                disk_label = EXCLUDED.disk_label,
                                status = EXCLUDED.status
                        """, (rel_path, size, mtime, disk_name, None, 'indexed'))
                        files_count += 1
                        total_size += size
                        # Dynamic progress display - update every 100 files
                        if files_count % 100 == 0:
                            elapsed = time.time() - start_time
                            rate = files_count / elapsed if elapsed > 0 else 0
                            # Truncate long paths for display
                            display_path = str(file_path)
                            if len(display_path) > 60:
                                display_path = '...' + display_path[-57:]
                            # Use \r to overwrite the progress line in place
                            print(f"\rIndexing: {files_count:,} files | {self.format_size(total_size)} | {rate:.0f} files/s | {display_path}", end='', flush=True)
                        # Commit every 1000 files for performance
                        if files_count % 1000 == 0:
                            conn.commit()
                    except Exception as e:
                        # Roll back to clear the failed transaction; note this
                        # also discards any rows inserted since the last commit
                        conn.rollback()
                        logger.warning(f"\nSkipping {file_path}: {e}")
                        continue
            conn.commit()
            print()  # Newline after the \r progress display
            logger.info(f"Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}")
        finally:
            cursor.close()
            conn.close()

    def calculate_disk_usage(self) -> Dict[str, Dict]:
        """Calculate current usage per disk"""
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT disk_label, SUM(size) as total_size, COUNT(*) as file_count
                FROM files
                GROUP BY disk_label
            """)
            usage = {}
            for row in cursor.fetchall():
                disk = row[0]
                size = int(row[1] or 0)
                count = int(row[2])
                usage[disk] = {
                    'size': size,
                    'count': count,
                    'formatted_size': self.format_size(size)
                }
            return usage
        finally:
            cursor.close()
            conn.close()

    def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict:
        """
        Create a migration plan to free up target_disk.
        :param target_disk: Disk to free up (e.g., 'D:')
        :param destination_disks: List of disks to move files to
        :return: Migration plan dictionary
        """
        logger.info(f"Planning migration to free up {target_disk}")
        usage = self.calculate_disk_usage()
        if target_disk not in usage:
            logger.error(f"Target disk {target_disk} not found in index!")
            return {}
        # Get files on the target disk, largest first
        conn = self.get_connection()
        cursor = conn.cursor()
        cursor.execute(
            "SELECT path, size, modified_time FROM files WHERE disk_label = %s ORDER BY size DESC",
            (target_disk,)
        )
        files_to_move = cursor.fetchall()
        cursor.close()
        conn.close()
        target_disk_usage = usage[target_disk]['size']
        logger.info(f"Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}")
        # Capacity bookkeeping is a placeholder for now: both branches assume
        # unlimited space, and dest_availability is not yet consulted by the
        # round-robin assignment below. A real implementation would query
        # actual free space per destination.
        dest_availability = []
        for disk in destination_disks:
            if disk not in usage:
                # Disk not in the index yet; assume it is empty
                available = float('inf')
            else:
                available = float('inf')  # Placeholder
            dest_availability.append({
                'disk': disk,
                'available': available,
                'planned_usage': 0
            })
        # Build the move plan
        plan = {
            'target_disk': target_disk,
            'total_size': target_disk_usage,
            'file_count': len(files_to_move),
            'operations': [],
            'destination_disks': destination_disks
        }
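        # Illustrative shape of one entry in plan['operations'] (values invented):
        #   {'source_disk': 'disk_d', 'source_path': 'photos/2024/img_001.jpg',
        #    'dest_disk': 'disk_e', 'target_path': 'photos/2024/img_001.jpg',
        #    'size': 4194304}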
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            for file_info in files_to_move:
                rel_path, size, mtime = file_info
                # Choose a destination: simple round-robin for balance
                dest_disk = destination_disks[len(plan['operations']) % len(destination_disks)]
                op = {
                    'source_disk': target_disk,
                    'source_path': rel_path,
                    'dest_disk': dest_disk,
                    'target_path': rel_path,  # Keep the same relative path
                    'size': int(size)
                }
                plan['operations'].append(op)
                # Record the pending operation in the database
                cursor.execute(
                    "INSERT INTO operations (source_path, target_path, operation_type, status) VALUES (%s, %s, %s, %s)",
                    (f"{target_disk}:{rel_path}", f"{dest_disk}:{rel_path}", 'move', 'pending')
                )
            conn.commit()
        finally:
            cursor.close()
            conn.close()
        # Save the plan to JSON for later execution
        plan_file = f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(plan_file, 'w') as f:
            json.dump(plan, f, indent=2)
        logger.info(f"Plan created with {len(plan['operations'])} operations")
        logger.info(f"Plan saved to {plan_file}")
        return plan

    def verify_operation(self, source: Path, dest: Path) -> bool:
        """Verify that a file was copied correctly (size check, optional checksum)"""
        if not dest.exists():
            return False
        try:
            source_stat = source.stat()
            dest_stat = dest.stat()
            if source_stat.st_size != dest_stat.st_size:
                return False
            # Optional: checksum verification for critical files
            # if source_stat.st_size < 100*1024*1024:  # Only for files < 100MB
            #     return self.file_checksum(source) == self.file_checksum(dest)
            return True
        except Exception as e:
            logger.error(f"Verification error: {e}")
            return False

    @staticmethod
    def file_checksum(path: Path) -> str:
        """Calculate the MD5 checksum of a file (integrity check, not security)"""
        hash_md5 = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
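
    # Reading in 4 KiB chunks keeps memory use constant regardless of file
    # size; MD5 is adequate for corruption detection, though sha256 would be
    # the choice if tamper resistance mattered.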

    def execute_migration(self, plan_file: str, dry_run: bool = True):
        """
        Execute a migration plan.
        :param plan_file: Path to plan JSON file
        :param dry_run: If True, only simulate operations
        """
        logger.info(f"{'DRY RUN' if dry_run else 'EXECUTING'} migration from {plan_file}")
        with open(plan_file, 'r') as f:
            plan = json.load(f)
        operations = plan['operations']
        logger.info(f"Processing {len(operations)} operations...")
        success_count = 0
        error_count = 0
        start_time = time.time()
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            for i, op in enumerate(operations, 1):
                source_disk = op['source_disk']
                source_path = op['source_path']
                dest_disk = op['dest_disk']
                target_path = op['target_path']
                source_full = Path(source_disk) / source_path
                dest_full = Path(dest_disk) / target_path
                # Dynamic progress display with throughput and ETA
                elapsed = time.time() - start_time
                rate = i / elapsed if elapsed > 0 else 0
                eta = (len(operations) - i) / rate if rate > 0 else 0
                display_path = str(source_path)
                if len(display_path) > 50:
                    display_path = '...' + display_path[-47:]
                print(f"\r[{i}/{len(operations)}] {success_count} OK, {error_count} ERR | {rate:.1f} files/s | ETA: {int(eta)}s | {display_path}", end='', flush=True)
                if dry_run:
                    # Simulate: only check that the source exists
                    if source_full.exists():
                        success_count += 1
                    else:
                        logger.warning(f"\n Source does not exist: {source_full}")
                        error_count += 1
                    continue
                try:
                    # Create the destination directory
                    dest_full.parent.mkdir(parents=True, exist_ok=True)
                    # Move file (copy + verify + delete)
                    if source_full.exists():
                        # Copy with metadata preserved
                        shutil.copy2(source_full, dest_full)
                        # Verify the copy, then update the index
                        if self.verify_operation(source_full, dest_full):
                            cursor.execute(
                                "UPDATE files SET disk_label = %s, status = 'moved' WHERE path = %s AND disk_label = %s",
                                (dest_disk, source_path, source_disk)
                            )
                            # Safe delete (could be made optional)
                            # source_full.unlink()
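                            # With unlink() commented out, execution copies
                            # rather than moves: originals stay on the source
                            # disk and must be removed manually once the run is
                            # verified (see the reminder logged at the end of
                            # this method).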
                            # Mark the operation as executed
                            cursor.execute(
                                "UPDATE operations SET executed = 1, executed_at = CURRENT_TIMESTAMP WHERE source_path = %s",
                                (f"{source_disk}:{source_path}",)
                            )
                            success_count += 1
                        else:
                            raise RuntimeError("Verification failed")
                    else:
                        logger.warning(f"\n Source missing: {source_full}")
                        error_count += 1
                except Exception as e:
                    logger.error(f"\n Error processing {source_path}: {e}")
                    cursor.execute(
                        "UPDATE operations SET error = %s WHERE source_path = %s",
                        (str(e), f"{source_disk}:{source_path}")
                    )
                    error_count += 1
                # Commit every 10 operations
                if i % 10 == 0:
                    conn.commit()
            conn.commit()
            print()  # Newline after the \r progress display
        finally:
            cursor.close()
            conn.close()
        logger.info(f"Migration complete: {success_count} success, {error_count} errors")
        if not dry_run and error_count == 0:
            logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!")
            logger.info(f" Remember to safely delete original files from {plan['target_disk']}")

    def generate_report(self):
        """Generate a status report"""
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status
            """)
            print("\n=== FILE MIGRATION REPORT ===")
            for row in cursor.fetchall():
                status, count, size = row
                print(f"{status:15}: {count:6} files, {self.format_size(size or 0)}")
            cursor.execute("""
                SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified
            """)
            print("\n=== OPERATIONS REPORT ===")
            for row in cursor.fetchall():
                op_type, executed, verified, count = row
                status = "EXECUTED" if executed else "PENDING"
                if verified:
                    status += "+VERIFIED"
                print(f"{op_type:10} {status:15}: {count} operations")
        finally:
            cursor.close()
            conn.close()

    @staticmethod
    def format_size(size: int) -> str:
        """Format bytes into a human-readable string"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024:
                return f"{size:.1f}{unit}"
            size /= 1024
        return f"{size:.1f}PB"


def main():
    parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
    subparsers = parser.add_subparsers(dest='command', required=True)
    # Index command
    index_parser = subparsers.add_parser('index', help='Index files on a disk')
    index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
    index_parser.add_argument('disk_name', help='Logical name for the disk')
    # Plan command
    plan_parser = subparsers.add_parser('plan', help='Create migration plan')
    plan_parser.add_argument('target_disk', help='Disk to free up')
    plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
    # Execute command
    exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
    exec_parser.add_argument('plan_file', help='Path to plan JSON file')
    exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
    # Report command
    subparsers.add_parser('report', help='Show current status')
    args = parser.parse_args()
    tool = DiskReorganizer()
    if args.command == 'index':
        tool.index_disk(args.disk_root, args.disk_name)
    elif args.command == 'plan':
        plan = tool.plan_migration(args.target_disk, args.dest_disks)
        if plan:
            print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
            print(f"Destination disks: {', '.join(plan['destination_disks'])}")
    elif args.command == 'execute':
        tool.execute_migration(args.plan_file, dry_run=args.dry_run)
    elif args.command == 'report':
        tool.generate_report()


if __name__ == '__main__':
    main()