first commit

commit 0ad3c0063a by wsl, 2025-12-09 10:35:33 +01:00

src/main.py (new file, 437 lines)

@@ -0,0 +1,437 @@
#!/usr/bin/env python3
"""
Disk Reorganizer - Safely restructure files across disks to free up one entire disk.
Three modes: index, plan, execute
"""
import os
import sys
import sqlite3
import shutil
import hashlib
import argparse
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple
from datetime import datetime
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('disk_reorganizer.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@dataclass
class FileRecord:
"""Represents a file in the index"""
path: str
size: int
modified_time: float
disk: str
checksum: Optional[str] = None
status: str = 'indexed' # indexed, planned, moved, verified
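# FileRecord is not yet consumed by the queries below; a minimal sketch of
# hydrating one from a SELECT * row (column order matches the files table):
#   record = FileRecord(*row)   # row = (path, size, modified_time, disk, checksum, status)
#   logger.debug(asdict(record))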
class DiskReorganizer:
def __init__(self, db_path: str = "file_index.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Initialize SQLite database"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
                CREATE TABLE IF NOT EXISTS files (
                    path TEXT,
                    size INTEGER,
                    modified_time REAL,
                    disk TEXT,
                    checksum TEXT,
                    status TEXT DEFAULT 'indexed',
                    -- the same relative path may exist on several disks
                    PRIMARY KEY (path, disk)
                )
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_path TEXT,
dest_path TEXT,
operation_type TEXT,
executed INTEGER DEFAULT 0,
verified INTEGER DEFAULT 0,
error TEXT
)
""")
conn.commit()
def index_disk(self, disk_root: str, disk_name: str):
"""
Index all files on a disk/partition
:param disk_root: Root path of disk (e.g., 'D:\\')
:param disk_name: Logical name for the disk
"""
logger.info(f"Indexing disk: {disk_name} at {disk_root}")
disk_path = Path(disk_root)
if not disk_path.exists():
logger.error(f"Disk path {disk_root} does not exist!")
return
files_count = 0
total_size = 0
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Walk through all files
for root, dirs, files in os.walk(disk_path):
# Skip system directories
dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))]
for file in files:
try:
file_path = Path(root) / file
if not file_path.is_file():
continue
stat = file_path.stat()
size = stat.st_size
mtime = stat.st_mtime
# Calculate relative path for portability
rel_path = str(file_path.relative_to(disk_path))
cursor.execute(
"INSERT OR REPLACE INTO files VALUES (?, ?, ?, ?, ?, ?)",
(rel_path, size, mtime, disk_name, None, 'indexed')
)
files_count += 1
total_size += size
if files_count % 1000 == 0:
logger.info(f"Indexed {files_count} files, {self.format_size(total_size)}...")
except Exception as e:
logger.warning(f"Skipping {file_path}: {e}")
continue
conn.commit()
logger.info(f"Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}")
def calculate_disk_usage(self) -> Dict[str, Dict]:
"""Calculate current usage per disk"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT disk, SUM(size) as total_size, COUNT(*) as file_count
FROM files
GROUP BY disk
""")
usage = {}
for row in cursor:
disk = row[0]
size = row[1] or 0
count = row[2]
usage[disk] = {
'size': size,
'count': count,
'formatted_size': self.format_size(size)
}
return usage
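    # Example return value (numbers illustrative):
    #   {'DATA': {'size': 1234567890, 'count': 4200, 'formatted_size': '1.1GB'}}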
def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict:
"""
Create a migration plan to free up target_disk
:param target_disk: Disk to free up (e.g., 'D:')
:param destination_disks: List of disks to move files to
:return: Migration plan dictionary
"""
logger.info(f"Planning migration to free up {target_disk}")
usage = self.calculate_disk_usage()
if target_disk not in usage:
logger.error(f"Target disk {target_disk} not found in index!")
return {}
# Get files on target disk
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"SELECT path, size, modified_time FROM files WHERE disk = ? ORDER BY size DESC",
(target_disk,)
)
files_to_move = cursor.fetchall()
target_disk_usage = usage[target_disk]['size']
logger.info(f"Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}")
# Calculate available space on destination disks
dest_availability = []
for disk in destination_disks:
if disk not in usage:
# Assume empty disk
available = float('inf')
else:
                # Placeholder: a real run should query the filesystem for free
                # space (see the sketch after this loop); capacity is not yet
                # enforced when assigning files below.
                available = float('inf')
dest_availability.append({
'disk': disk,
'available': available,
'planned_usage': 0
})
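        # A minimal sketch of querying real free space, assuming each logical
        # disk name is also a mountable root path (e.g. 'D:\\' or '/mnt/backup'):
        #   available = shutil.disk_usage(disk).free
        # Note: dest_availability is gathered but not yet consulted by the
        # round-robin assignment below.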
# Generate move plan
plan = {
'target_disk': target_disk,
'total_size': target_disk_usage,
'file_count': len(files_to_move),
'operations': [],
'destination_disks': destination_disks
}
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
for file_info in files_to_move:
rel_path, size, mtime = file_info
                # Pick a destination round-robin over the operation count;
                # this balances file counts, not bytes, across disks
dest_disk = destination_disks[len(plan['operations']) % len(destination_disks)]
# Record operation
op = {
'source_disk': target_disk,
'source_path': rel_path,
'dest_disk': dest_disk,
'dest_path': rel_path, # Keep same relative path
'size': size
}
plan['operations'].append(op)
# Store in database
cursor.execute(
"INSERT INTO operations (source_path, dest_path, operation_type) VALUES (?, ?, ?)",
(f"{target_disk}:{rel_path}", f"{dest_disk}:{rel_path}", 'move')
)
conn.commit()
# Save plan to JSON
plan_file = f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(plan_file, 'w') as f:
json.dump(plan, f, indent=2)
logger.info(f"Plan created with {len(plan['operations'])} operations")
logger.info(f"Plan saved to {plan_file}")
return plan
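    # The saved plan JSON mirrors the dict built above, e.g. (values illustrative):
    #   {"target_disk": "DATA", "total_size": 1048576, "file_count": 1,
    #    "operations": [{"source_disk": "DATA", "source_path": "docs/a.txt",
    #                    "dest_disk": "BACKUP1", "dest_path": "docs/a.txt",
    #                    "size": 1048576}],
    #    "destination_disks": ["BACKUP1"]}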
def verify_operation(self, source: Path, dest: Path) -> bool:
"""Verify file was copied correctly (size + optional checksum)"""
if not dest.exists():
return False
try:
source_stat = source.stat()
dest_stat = dest.stat()
if source_stat.st_size != dest_stat.st_size:
return False
# Optional: checksum verification for critical files
# if source_stat.st_size < 100*1024*1024: # Only for files < 100MB
# return self.file_checksum(source) == self.file_checksum(dest)
return True
except Exception as e:
logger.error(f"Verification error: {e}")
return False
@staticmethod
def file_checksum(path: Path) -> str:
"""Calculate MD5 checksum of file"""
hash_md5 = hashlib.md5()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
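    # On Python 3.11+, hashlib.file_digest(f, "md5").hexdigest() performs the
    # same chunked read; MD5 is used here for speed, not cryptographic integrity.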
def execute_migration(self, plan_file: str, dry_run: bool = True):
"""
Execute migration plan
:param plan_file: Path to plan JSON file
:param dry_run: If True, only simulate operations
"""
logger.info(f"{'DRY RUN' if dry_run else 'EXECUTING'} migration from {plan_file}")
with open(plan_file, 'r') as f:
plan = json.load(f)
operations = plan['operations']
logger.info(f"Processing {len(operations)} operations...")
success_count = 0
error_count = 0
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
for i, op in enumerate(operations, 1):
source_disk = op['source_disk']
source_path = op['source_path']
dest_disk = op['dest_disk']
dest_path = op['dest_path']
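                # Note: this assumes the logical disk names recorded at index
                # time are also usable filesystem roots (e.g. 'D:\\'); if not,
                # the joined paths below will not resolve.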
source_full = Path(source_disk) / source_path
dest_full = Path(dest_disk) / dest_path
logger.info(f"[{i}/{len(operations)}] {source_full} -> {dest_full}")
if dry_run:
# Simulate
if source_full.exists():
logger.info(f" Would move {self.format_size(op['size'])}")
success_count += 1
else:
logger.warning(f" Source does not exist!")
error_count += 1
continue
try:
# Create destination directory
dest_full.parent.mkdir(parents=True, exist_ok=True)
# Move file (copy + verify + delete)
if source_full.exists():
# Copy with metadata
shutil.copy2(source_full, dest_full)
# Verify
if self.verify_operation(source_full, dest_full):
# Update database
cursor.execute(
"UPDATE files SET disk = ?, status = 'moved' WHERE path = ? AND disk = ?",
(dest_disk, source_path, source_disk)
)
# Safe delete (could be made optional)
# source_full.unlink()
# Log operation as executed
cursor.execute(
"UPDATE operations SET executed = 1 WHERE source_path = ?",
(f"{source_disk}:{source_path}",)
)
logger.info(f" ✓ Moved and verified")
success_count += 1
else:
                            raise RuntimeError("Verification failed: destination missing or size mismatch")
else:
logger.warning(f" Source missing, skipping")
error_count += 1
except Exception as e:
logger.error(f" ✗ Error: {e}")
cursor.execute(
"UPDATE operations SET error = ? WHERE source_path = ?",
(str(e), f"{source_disk}:{source_path}")
)
error_count += 1
# Commit every 10 operations
if i % 10 == 0:
conn.commit()
conn.commit()
logger.info(f"Migration complete: {success_count} success, {error_count} errors")
if not dry_run and error_count == 0:
logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!")
logger.info(f" Remember to safely delete original files from {plan['target_disk']}")
def generate_report(self):
"""Generate status report"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status
""")
print("\n=== FILE MIGRATION REPORT ===")
for row in cursor:
status, count, size = row
print(f"{status:15}: {count:6} files, {self.format_size(size or 0)}")
cursor = conn.execute("""
SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified
""")
print("\n=== OPERATIONS REPORT ===")
for row in cursor:
op_type, executed, verified, count = row
status = "EXECUTED" if executed else "PENDING"
if verified:
status += "+VERIFIED"
print(f"{op_type:10} {status:15}: {count} operations")
@staticmethod
    def format_size(size: float) -> str:
"""Format bytes to human readable string"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f"{size:.1f}{unit}"
size /= 1024
return f"{size:.1f}PB"
def main():
parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
subparsers = parser.add_subparsers(dest='command', required=True)
# Index command
index_parser = subparsers.add_parser('index', help='Index files on a disk')
index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
index_parser.add_argument('disk_name', help='Logical name for the disk')
# Plan command
plan_parser = subparsers.add_parser('plan', help='Create migration plan')
plan_parser.add_argument('target_disk', help='Disk to free up')
plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
# Execute command
exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
exec_parser.add_argument('plan_file', help='Path to plan JSON file')
exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
# Report command
    subparsers.add_parser('report', help='Show current status')
args = parser.parse_args()
tool = DiskReorganizer()
if args.command == 'index':
tool.index_disk(args.disk_root, args.disk_name)
elif args.command == 'plan':
plan = tool.plan_migration(args.target_disk, args.dest_disks)
if plan:
print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
print(f"Destination disks: {', '.join(plan['destination_disks'])}")
elif args.command == 'execute':
tool.execute_migration(args.plan_file, dry_run=args.dry_run)
elif args.command == 'report':
tool.generate_report()
if __name__ == '__main__':
main()