#!/usr/bin/env python3
"""
Disk Reorganizer - Safely restructure files across disks to free up one entire disk.

Four subcommands:
    index    - catalogue every file on a disk into SQLite
    plan     - build a migration plan that empties one disk onto others
    execute  - run (or dry-run) a previously saved plan, copy + verify
    report   - summarise index and operation status
"""

import argparse
import hashlib
import json
import logging
import os
import shutil
import sqlite3
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

# Log to a file *and* stdout so long-running migrations can be audited later.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('disk_reorganizer.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class FileRecord:
    """One file in the index (mirrors a row of the `files` table)."""
    path: str                      # path relative to its disk root
    size: int                      # size in bytes
    modified_time: float           # st_mtime at index time
    disk: str                      # logical disk name
    checksum: Optional[str] = None
    status: str = 'indexed'        # indexed, planned, moved, verified


class DiskReorganizer:
    """Index disks into SQLite, plan a migration that frees one disk,
    and execute that plan with copy-then-verify semantics."""

    def __init__(self, db_path: str = "file_index.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the SQLite schema if it does not exist yet."""
        with sqlite3.connect(self.db_path) as conn:
            # NOTE: (path, disk) is the key -- path alone would let two disks
            # holding the same relative path overwrite each other's rows.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS files (
                    path TEXT,
                    size INTEGER,
                    modified_time REAL,
                    disk TEXT,
                    checksum TEXT,
                    status TEXT DEFAULT 'indexed',
                    PRIMARY KEY (path, disk)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS operations (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source_path TEXT,
                    dest_path TEXT,
                    operation_type TEXT,
                    executed INTEGER DEFAULT 0,
                    verified INTEGER DEFAULT 0,
                    error TEXT
                )
            """)
            # Maps a logical disk name to its filesystem root so that
            # execute_migration can turn "name:rel_path" into a real path.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS disks (
                    name TEXT PRIMARY KEY,
                    root TEXT
                )
            """)
            conn.commit()

    def _disk_root(self, disk_name: str) -> str:
        """Resolve a logical disk name to its filesystem root.

        Falls back to the name itself for indexes created before the
        `disks` table existed (old behaviour: name was used as a path).
        """
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT root FROM disks WHERE name = ?", (disk_name,)
            ).fetchone()
        return row[0] if row else disk_name

    def index_disk(self, disk_root: str, disk_name: str):
        """
        Index all files on a disk/partition.
        :param disk_root: Root path of disk (e.g., 'D:\\')
        :param disk_name: Logical name for the disk
        """
        logger.info(f"Indexing disk: {disk_name} at {disk_root}")
        disk_path = Path(disk_root)

        if not disk_path.exists():
            logger.error(f"Disk path {disk_root} does not exist!")
            return

        files_count = 0
        total_size = 0

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Remember the root so execute_migration can rebuild absolute paths.
            cursor.execute(
                "INSERT OR REPLACE INTO disks (name, root) VALUES (?, ?)",
                (disk_name, str(disk_path))
            )

            for root, dirs, files in os.walk(disk_path):
                # Prune system directories in place so os.walk never descends.
                dirs[:] = [d for d in dirs
                           if not d.startswith(('$', 'System Volume Information', 'Recovery'))]

                for file in files:
                    # Bound before try so the except handler can always log it.
                    file_path = Path(root) / file
                    try:
                        if not file_path.is_file():
                            continue

                        stat = file_path.stat()
                        # Relative path keeps the index portable across mounts.
                        rel_path = str(file_path.relative_to(disk_path))

                        cursor.execute(
                            "INSERT OR REPLACE INTO files VALUES (?, ?, ?, ?, ?, ?)",
                            (rel_path, stat.st_size, stat.st_mtime, disk_name, None, 'indexed')
                        )

                        files_count += 1
                        total_size += stat.st_size

                        if files_count % 1000 == 0:
                            logger.info(f"Indexed {files_count} files, {self.format_size(total_size)}...")

                    except Exception as e:
                        # Best effort: permission errors, vanished files, etc.
                        logger.warning(f"Skipping {file_path}: {e}")
                        continue

            conn.commit()

        logger.info(f"Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}")

    def calculate_disk_usage(self) -> Dict[str, Dict]:
        """Return per-disk totals from the index: {disk: {size, count, formatted_size}}."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT disk, SUM(size) as total_size, COUNT(*) as file_count
                FROM files
                GROUP BY disk
            """)

            usage = {}
            for disk, size, count in cursor:
                size = size or 0
                usage[disk] = {
                    'size': size,
                    'count': count,
                    'formatted_size': self.format_size(size)
                }

        return usage

    def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict:
        """
        Create a migration plan to free up target_disk.
        :param target_disk: Disk to free up (e.g., 'D:')
        :param destination_disks: List of disks to move files to
        :return: Migration plan dictionary (also saved as JSON)
        """
        logger.info(f"Planning migration to free up {target_disk}")

        usage = self.calculate_disk_usage()

        if target_disk not in usage:
            logger.error(f"Target disk {target_disk} not found in index!")
            return {}

        # Largest files first so capacity problems surface early.
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT path, size, modified_time FROM files WHERE disk = ? ORDER BY size DESC",
                (target_disk,)
            )
            files_to_move = cursor.fetchall()

        target_disk_usage = usage[target_disk]['size']
        logger.info(f"Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}")

        # Query real free space per destination (was an unused inf placeholder).
        dest_state = []
        for disk in destination_disks:
            root = self._disk_root(disk)
            try:
                available = shutil.disk_usage(root).free
            except OSError:
                logger.warning(f"Cannot stat free space of {disk} ({root}); assuming unlimited")
                available = float('inf')
            dest_state.append({'disk': disk, 'available': available, 'planned': 0})

        plan = {
            'target_disk': target_disk,
            'total_size': target_disk_usage,
            'file_count': len(files_to_move),
            'operations': [],
            'destination_disks': destination_disks
        }

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Re-planning must not accumulate duplicate pending operations.
            cursor.execute(
                "DELETE FROM operations WHERE executed = 0 AND source_path LIKE ?",
                (f"{target_disk}:%",)
            )

            for idx, (rel_path, size, _mtime) in enumerate(files_to_move):
                # Round-robin across destinations, skipping any that would overflow.
                chosen = None
                for offset in range(len(dest_state)):
                    cand = dest_state[(idx + offset) % len(dest_state)]
                    if cand['available'] - cand['planned'] >= size:
                        chosen = cand
                        break
                if chosen is None:
                    logger.error(f"No destination has room for {rel_path} "
                                 f"({self.format_size(size)}); skipping")
                    continue
                chosen['planned'] += size
                dest_disk = chosen['disk']

                plan['operations'].append({
                    'source_disk': target_disk,
                    'source_path': rel_path,
                    'dest_disk': dest_disk,
                    'dest_path': rel_path,  # keep the same relative path
                    'size': size
                })

                cursor.execute(
                    "INSERT INTO operations (source_path, dest_path, operation_type) VALUES (?, ?, ?)",
                    (f"{target_disk}:{rel_path}", f"{dest_disk}:{rel_path}", 'move')
                )

            conn.commit()

        plan_file = f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(plan_file, 'w') as f:
            json.dump(plan, f, indent=2)

        logger.info(f"Plan created with {len(plan['operations'])} operations")
        logger.info(f"Plan saved to {plan_file}")

        return plan

    def verify_operation(self, source: Path, dest: Path) -> bool:
        """Verify a copy: destination exists and sizes match.

        Checksum comparison is intentionally left optional (commented) --
        it dominates runtime for large migrations.
        """
        if not dest.exists():
            return False

        try:
            if source.stat().st_size != dest.stat().st_size:
                return False

            # Optional: checksum verification for critical files
            # if source.stat().st_size < 100*1024*1024:  # only files < 100MB
            #     return self.file_checksum(source) == self.file_checksum(dest)

            return True
        except Exception as e:
            logger.error(f"Verification error: {e}")
            return False

    @staticmethod
    def file_checksum(path: Path) -> str:
        """Return the MD5 hex digest of a file (integrity check, not security)."""
        hash_md5 = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def execute_migration(self, plan_file: str, dry_run: bool = True):
        """
        Execute a migration plan: copy each file, verify, record in the DB.
        Originals are NOT deleted automatically (see commented unlink).
        :param plan_file: Path to plan JSON file
        :param dry_run: If True, only simulate operations
        """
        logger.info(f"{'DRY RUN' if dry_run else 'EXECUTING'} migration from {plan_file}")

        with open(plan_file, 'r') as f:
            plan = json.load(f)

        operations = plan['operations']
        logger.info(f"Processing {len(operations)} operations...")

        success_count = 0
        error_count = 0
        # Logical-name -> filesystem-root, resolved once per disk.
        root_cache: Dict[str, str] = {}

        def root_of(name: str) -> str:
            if name not in root_cache:
                root_cache[name] = self._disk_root(name)
            return root_cache[name]

        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()

            for i, op in enumerate(operations, 1):
                source_disk = op['source_disk']
                source_path = op['source_path']
                dest_disk = op['dest_disk']
                dest_path = op['dest_path']

                # Resolve logical disk names to real roots (old code used the
                # name itself as a path, which only worked by accident).
                source_full = Path(root_of(source_disk)) / source_path
                dest_full = Path(root_of(dest_disk)) / dest_path

                logger.info(f"[{i}/{len(operations)}] {source_full} -> {dest_full}")

                if dry_run:
                    if source_full.exists():
                        logger.info(f"  Would move {self.format_size(op['size'])}")
                        success_count += 1
                    else:
                        logger.warning(f"  Source does not exist!")
                        error_count += 1
                    continue

                try:
                    dest_full.parent.mkdir(parents=True, exist_ok=True)

                    if source_full.exists():
                        # Copy with metadata, then verify before recording.
                        shutil.copy2(source_full, dest_full)

                        if self.verify_operation(source_full, dest_full):
                            cursor.execute(
                                "UPDATE files SET disk = ?, status = 'moved' WHERE path = ? AND disk = ?",
                                (dest_disk, source_path, source_disk)
                            )

                            # Safe delete (could be made optional)
                            # source_full.unlink()

                            cursor.execute(
                                "UPDATE operations SET executed = 1 WHERE source_path = ?",
                                (f"{source_disk}:{source_path}",)
                            )

                            logger.info(f"  ✓ Moved and verified")
                            success_count += 1
                        else:
                            raise Exception("Verification failed")
                    else:
                        logger.warning(f"  Source missing, skipping")
                        error_count += 1

                except Exception as e:
                    logger.error(f"  ✗ Error: {e}")
                    cursor.execute(
                        "UPDATE operations SET error = ? WHERE source_path = ?",
                        (str(e), f"{source_disk}:{source_path}")
                    )
                    error_count += 1

                # Commit every 10 operations so progress survives a crash.
                if i % 10 == 0:
                    conn.commit()

            conn.commit()

        logger.info(f"Migration complete: {success_count} success, {error_count} errors")

        if not dry_run and error_count == 0:
            logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!")
            logger.info(f"  Remember to safely delete original files from {plan['target_disk']}")

    def generate_report(self):
        """Print a summary of file statuses and operation progress."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status
            """)

            print("\n=== FILE MIGRATION REPORT ===")
            for status, count, size in cursor:
                print(f"{status:15}: {count:6} files, {self.format_size(size or 0)}")

            cursor = conn.execute("""
                SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified
            """)

            print("\n=== OPERATIONS REPORT ===")
            for op_type, executed, verified, count in cursor:
                status = "EXECUTED" if executed else "PENDING"
                if verified:
                    status += "+VERIFIED"
                print(f"{op_type:10} {status:15}: {count} operations")

    @staticmethod
    def format_size(size: float) -> str:
        """Format a byte count as a human readable string (e.g. '1.5KB')."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024:
                return f"{size:.1f}{unit}"
            size /= 1024
        return f"{size:.1f}PB"


def main():
    parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
    subparsers = parser.add_subparsers(dest='command', required=True)

    # Index command
    index_parser = subparsers.add_parser('index', help='Index files on a disk')
    index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
    index_parser.add_argument('disk_name', help='Logical name for the disk')

    # Plan command
    plan_parser = subparsers.add_parser('plan', help='Create migration plan')
    plan_parser.add_argument('target_disk', help='Disk to free up')
    plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')

    # Execute command
    exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
    exec_parser.add_argument('plan_file', help='Path to plan JSON file')
    exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')

    # Report command
    subparsers.add_parser('report', help='Show current status')

    args = parser.parse_args()
    tool = DiskReorganizer()

    if args.command == 'index':
        tool.index_disk(args.disk_root, args.disk_name)

    elif args.command == 'plan':
        plan = tool.plan_migration(args.target_disk, args.dest_disks)
        if plan:
            print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
            print(f"Destination disks: {', '.join(plan['destination_disks'])}")

    elif args.command == 'execute':
        tool.execute_migration(args.plan_file, dry_run=args.dry_run)

    elif args.command == 'report':
        tool.generate_report()


if __name__ == '__main__':
    main()