Compare commits

...

10 Commits

| Author | SHA1 | Message | Date |
|--------|------|---------|------|
| mike | 196f646f95 | flyway | 2025-12-13 11:35:46 +01:00 |
| mike | e9eb7ea5d9 | fly wa | 2025-12-13 11:35:33 +01:00 |
| mike | 9759001f4c | remove_doc | 2025-12-13 04:23:04 +01:00 |
| mike | 75034d5e51 | base | 2025-12-13 03:56:14 +01:00 |
| mike | 7c5df059df | base | 2025-12-13 02:05:17 +01:00 |
| mike | 2ec65e059c | base | 2025-12-13 01:12:59 +01:00 |
| mike | 942e87d439 | base | 2025-12-13 00:29:09 +01:00 |
| mike | 6449765890 | base | 2025-12-12 23:53:56 +01:00 |
| mike | 87550e426a | base | 2025-12-12 23:04:51 +01:00 |
| mike | 56b2db82fc | base | 2025-12-12 19:25:16 +01:00 |
58 changed files with 2160 additions and 7854 deletions

View File

@@ -35,4 +35,4 @@ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD python -c "import psycopg2; psycopg2.connect(dbname='${POSTGRES_DB:-disk_reorganizer_db}', user='${POSTGRES_USER:-disk_reorg_user}', password='${POSTGRES_PASSWORD}', host='${DB_HOST:-db}', port='${DB_PORT:-5432}')" || exit 1
# Default command (can be overridden in docker-compose)
CMD ["python", "main.py", "--help"]
CMD ["python", "app/main.py", "--help"]

View File

@@ -59,25 +59,25 @@ System is now **network-capable**, **auditable**, **scalable**, and offers **rea
2. **Index**
```bash
python src/main.py index "D:\\" disk_d
python app/main.py index "D:\\" disk_d
```
3. **Plan**
```bash
python src/main.py plan disk_d disk_e
python app/main.py plan disk_d disk_e
```
4. **Dry-Run**
```bash
python src/main.py execute plan.json --dry-run
python app/main.py execute plan.json --dry-run
```
5. **Execute**
```bash
python src/main.py execute plan.json
python app/main.py execute plan.json
```
6. **Report**

View File

@@ -0,0 +1,110 @@
from pathlib import Path
from typing import Dict, Set, List
from collections import Counter
class FolderAnalyzer:
def __init__(self):
self.manifest_files = {
'java': ['pom.xml', 'build.gradle', 'build.gradle.kts'],
'javascript': ['package.json', 'yarn.lock', 'package-lock.json'],
'python': ['pyproject.toml', 'setup.py', 'requirements.txt', 'Pipfile'],
'go': ['go.mod', 'go.sum'],
'rust': ['Cargo.toml', 'Cargo.lock'],
'docker': ['Dockerfile', 'docker-compose.yml', 'docker-compose.yaml'],
'k8s': ['helm', 'kustomization.yaml', 'deployment.yaml']
}
self.intent_keywords = {
'infrastructure': ['infra', 'deploy', 'k8s', 'docker', 'terraform', 'ansible'],
'application': ['app', 'service', 'api', 'server', 'client'],
'data': ['data', 'dataset', 'models', 'training', 'ml'],
'documentation': ['docs', 'documentation', 'wiki', 'readme'],
'testing': ['test', 'tests', 'spec', 'e2e', 'integration'],
'build': ['build', 'dist', 'target', 'out', 'bin'],
'config': ['config', 'conf', 'settings', 'env']
}
def analyze_folder(self, folder_path: Path, files: List[Dict]) -> Dict:
files_list = [Path(f['path']) for f in files]
has_readme = any('readme' in f.name.lower() for f in files_list)
has_git = any('.git' in str(f) for f in files_list)
manifest_types = self._detect_manifests(files_list)
has_manifest = len(manifest_types) > 0
file_types = Counter(f.suffix.lower() for f in files_list if f.suffix)
dominant_types = dict(file_types.most_common(10))
intent = self._infer_intent(folder_path.name.lower(), files_list)
project_type = self._infer_project_type(manifest_types, dominant_types)
structure = {
'depth': len(folder_path.parts),
'has_src': any('src' in str(f) for f in files_list[:20]),
'has_tests': any('test' in str(f) for f in files_list[:20]),
'has_docs': any('doc' in str(f) for f in files_list[:20])
}
return {
'has_readme': has_readme,
'has_git': has_git,
'has_manifest': has_manifest,
'manifest_types': manifest_types,
'dominant_file_types': dominant_types,
'project_type': project_type,
'intent': intent,
'structure': structure
}
def _detect_manifests(self, files: List[Path]) -> List[str]:
detected = []
file_names = {f.name for f in files}
for tech, manifests in self.manifest_files.items():
if any(m in file_names for m in manifests):
detected.append(tech)
return detected
def _infer_intent(self, folder_name: str, files: List[Path]) -> str:
file_str = ' '.join(str(f) for f in files[:50])
for intent, keywords in self.intent_keywords.items():
if any(kw in folder_name or kw in file_str.lower() for kw in keywords):
return intent
return 'unknown'
def _infer_project_type(self, manifests: List[str], file_types: Dict) -> str:
if manifests:
return manifests[0]
if '.py' in file_types and file_types.get('.py', 0) > 5:
return 'python'
if '.js' in file_types or '.ts' in file_types:
return 'javascript'
if '.java' in file_types:
return 'java'
if '.go' in file_types:
return 'go'
return 'mixed'
def generate_summary(self, folder_analysis: Dict, readme_text: str = None) -> str:
parts = []
if folder_analysis.get('project_type'):
parts.append(f"{folder_analysis['project_type']} project")
if folder_analysis.get('intent'):
parts.append(f"for {folder_analysis['intent']}")
if folder_analysis.get('manifest_types'):
parts.append(f"using {', '.join(folder_analysis['manifest_types'])}")
if readme_text:
first_para = readme_text.split('\n\n')[0][:200]
parts.append(f"Description: {first_para}")
return ' '.join(parts) if parts else 'Mixed content folder'
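
A minimal usage sketch for the new `FolderAnalyzer`. The import path and the example file entries are assumptions (this file's header is not captured in the diff); `analyze_folder` only needs dicts with a `'path'` key, as the code above reads.

```python
from pathlib import Path
# Import path assumed; the diff does not show where FolderAnalyzer lives.
from app.analysis.folder_analyzer import FolderAnalyzer

analyzer = FolderAnalyzer()
files = [
    {'path': 'myproj/pyproject.toml'},
    {'path': 'myproj/src/main.py'},
    {'path': 'myproj/tests/test_main.py'},
    {'path': 'myproj/README.md'},
]
analysis = analyzer.analyze_folder(Path('myproj'), files)
print(analysis['project_type'])   # 'python' (pyproject.toml is a recognized python manifest)
print(analysis['has_readme'])     # True
print(analysis['intent'])         # keyword guess; note substring matching ('ml' matches inside '.toml')
print(analyzer.generate_summary(analysis))
```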

View File

@@ -0,0 +1,3 @@
from .classifier import FileClassifier
__all__ = ['FileClassifier']

View File

@@ -0,0 +1,124 @@
from pathlib import Path
from typing import List, Set, Dict, Tuple
import re
class FileClassifier:
def __init__(self):
self.build_patterns = {
'node_modules', '__pycache__', '.pytest_cache', 'target', 'build', 'dist',
'.gradle', 'bin', 'obj', '.next', '.nuxt', 'vendor', '.venv', 'venv',
'site-packages', 'bower_components', 'jspm_packages'
}
self.artifact_patterns = {
'java': {'.jar', '.war', '.ear', '.class'},
'python': {'.pyc', '.pyo', '.whl', '.egg'},
'node': {'node_modules'},
'go': {'vendor', 'pkg'},
'rust': {'target'},
'docker': {'.dockerignore', 'Dockerfile'}
}
self.category_keywords = {
'apps': {'app', 'application', 'service', 'api', 'server', 'client'},
'infra': {'infrastructure', 'devops', 'docker', 'kubernetes', 'terraform', 'ansible', 'gitea', 'jenkins'},
'dev': {'project', 'workspace', 'repo', 'src', 'code', 'dev'},
'cache': {'cache', 'temp', 'tmp', '.cache'},
'databases': {'postgres', 'mysql', 'redis', 'mongo', 'db', 'database'},
'backups': {'backup', 'bak', 'snapshot', 'archive'},
'user': {'documents', 'pictures', 'videos', 'downloads', 'desktop', 'music'},
'artifacts': {'build', 'dist', 'release', 'output'},
'temp': {'tmp', 'temp', 'staging', 'processing'}
}
self.media_extensions = {
'video': {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm'},
'audio': {'.mp3', '.flac', '.wav', '.ogg', '.m4a', '.aac'},
'image': {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'},
'document': {'.pdf', '.doc', '.docx', '.txt', '.md', '.odt'},
'spreadsheet': {'.xls', '.xlsx', '.csv', '.ods'},
'presentation': {'.ppt', '.pptx', '.odp'}
}
self.code_extensions = {
'.py', '.js', '.ts', '.java', '.go', '.rs', '.c', '.cpp', '.h',
'.cs', '.rb', '.php', '.swift', '.kt', '.scala', '.clj', '.r'
}
def classify_path(self, path: str, size: int = 0) -> Tuple[Set[str], str, bool]:
p = Path(path)
labels = set()
primary_category = 'misc'
is_build_artifact = False
parts = p.parts
name_lower = p.name.lower()
for part in parts:
part_lower = part.lower()
if part_lower in self.build_patterns:
is_build_artifact = True
labels.add('build-artifact')
break
if is_build_artifact:
for artifact_type, patterns in self.artifact_patterns.items():
if any(part.lower() in patterns for part in parts) or p.suffix in patterns:
primary_category = f'artifacts/{artifact_type}'
labels.add('artifact')
return labels, primary_category, is_build_artifact
if '.git' in parts:
labels.add('vcs')
primary_category = 'infra/git-infrastructure'
return labels, primary_category, False
for category, keywords in self.category_keywords.items():
if any(kw in name_lower or any(kw in part.lower() for part in parts) for kw in keywords):
labels.add(category)
primary_category = category
break
for media_type, extensions in self.media_extensions.items():
if p.suffix.lower() in extensions:
labels.add(media_type)
labels.add('media')
primary_category = f'user/{media_type}'
break
if p.suffix.lower() in self.code_extensions:
labels.add('code')
if primary_category == 'misc':
primary_category = 'dev'
if size > 100 * 1024 * 1024:
labels.add('large-file')
if any(kw in name_lower for kw in ['test', 'spec', 'mock']):
labels.add('test')
if any(kw in name_lower for kw in ['config', 'settings', 'env']):
labels.add('config')
return labels, primary_category, is_build_artifact
def suggest_target_path(self, source_path: str, category: str, labels: Set[str]) -> str:
p = Path(source_path)
if 'build-artifact' in labels:
return f'trash/build-artifacts/{source_path}'
if category.startswith('artifacts/'):
artifact_type = category.split('/')[-1]
return f'artifacts/{artifact_type}/{p.name}'
if category.startswith('user/'):
media_type = category.split('/')[-1]
return f'user/{media_type}/{p.name}'
parts = [part for part in p.parts if part not in self.build_patterns]
if len(parts) > 3:
project_name = parts[0] if parts else 'misc'
return f'{category}/{project_name}/{"/".join(parts[1:])}'
return f'{category}/{source_path}'
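
A short sketch of how the new `FileClassifier` behaves on a couple of made-up paths. The import path is assumed, since this file's header is not shown in the diff.

```python
from pathlib import Path
# Import path assumed (file header not captured above).
from app.classification.classifier import FileClassifier

clf = FileClassifier()

# node_modules is in build_patterns, so this is flagged as a build artifact of type 'node'
labels, category, is_artifact = clf.classify_path('projects/myapi/node_modules/lodash/index.js', size=2048)
print(labels, category, is_artifact)   # {'build-artifact', 'artifact'} 'artifacts/node' True

# user media categories map straight to user/<media_type>/<filename>
target = clf.suggest_target_path('home/mike/Pictures/trip/photo1.jpg',
                                 category='user/image', labels={'image', 'media'})
print(target)   # user/image/photo1.jpg
```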

View File

@@ -71,7 +71,7 @@ class ClassificationEngine:
cursor.execute("""
SELECT path, checksum
FROM files
WHERE disk = %s AND category IS NULL
WHERE disk_label = %s AND category IS NULL
""", (disk,))
else:
cursor.execute("""

app/content/__init__.py (new file, +3)

@@ -0,0 +1,3 @@
from .profiler import ContentProfiler
from .extractors import ContentExtractor
__all__ = ['ContentProfiler', 'ContentExtractor']

app/content/extractors.py (new file, +62)

@@ -0,0 +1,62 @@
from pathlib import Path
from typing import Dict, Optional
import json
class ContentExtractor:
def __init__(self):
self.extractors = {'pdf_text': self._extract_pdf, 'ocr+caption': self._extract_image, 'transcribe': self._extract_audio, 'transcribe+scenes': self._extract_video, 'office_text': self._extract_document, 'read': self._extract_text, 'read+syntax': self._extract_code}
def extract(self, file_path: Path, extractor_type: str) -> Dict:
extractor = self.extractors.get(extractor_type)
if not extractor:
return {'error': f'Unknown extractor: {extractor_type}'}
try:
return extractor(file_path)
except Exception as e:
return {'error': str(e)}
def _extract_text(self, file_path: Path) -> Dict:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read(1024 * 1024)
return {'text': content, 'char_count': len(content), 'needs_llm': False}
except Exception as e:
return {'error': str(e)}
def _extract_code(self, file_path: Path) -> Dict:
result = self._extract_text(file_path)
if 'error' not in result:
result['type'] = 'code'
result['needs_llm'] = True
return result
def _extract_pdf(self, file_path: Path) -> Dict:
try:
import PyPDF2
text_parts = []
with open(file_path, 'rb') as f:
pdf = PyPDF2.PdfReader(f)
for page in pdf.pages[:10]:
text_parts.append(page.extract_text())
text = '\n'.join(text_parts)
return {'text': text, 'pages_extracted': len(text_parts), 'needs_llm': len(text.strip()) > 100, 'type': 'document'}
except Exception as e:
return {'error': str(e), 'needs_ocr': True}
def _extract_image(self, file_path: Path) -> Dict:
return {'type': 'image', 'needs_ocr': True, 'needs_caption': True, 'needs_llm': True, 'pipeline': ['ocr', 'caption', 'embedding'], 'status': 'pending'}
def _extract_audio(self, file_path: Path) -> Dict:
return {'type': 'audio', 'needs_transcription': True, 'needs_llm': True, 'pipeline': ['transcribe', 'summarize'], 'status': 'pending'}
def _extract_video(self, file_path: Path) -> Dict:
return {'type': 'video', 'needs_transcription': True, 'needs_scene_detection': True, 'needs_llm': True, 'pipeline': ['transcribe', 'scenes', 'summarize'], 'status': 'pending'}
def _extract_document(self, file_path: Path) -> Dict:
try:
import textract
text = textract.process(str(file_path)).decode('utf-8')
return {'text': text, 'type': 'document', 'needs_llm': len(text.strip()) > 100}
except:
return {'error': 'textract failed', 'needs_llm': True}
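
A usage sketch for `ContentExtractor`. The example paths are placeholders; PDF and office extraction additionally need `PyPDF2` / `textract` installed, and failures come back as error dicts rather than exceptions.

```python
from pathlib import Path
from app.content.extractors import ContentExtractor

extractor = ContentExtractor()

# Plain-text read: caps at 1 MiB; a missing file yields {'error': ...} instead of raising
result = extractor.extract(Path('notes/todo.md'), 'read')
if 'error' not in result:
    print(result['char_count'], result['needs_llm'])

# Media types return a pending pipeline description instead of extracted text
print(extractor.extract(Path('clips/demo.mp4'), 'transcribe+scenes')['pipeline'])
# ['transcribe', 'scenes', 'summarize']
```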

app/content/profiler.py (new file, +108)

@@ -0,0 +1,108 @@
from pathlib import Path
from typing import Dict, Optional, Tuple
import mimetypes
import magic
import json
from datetime import datetime
class ContentProfiler:
def __init__(self):
self.mime_detector = magic.Magic(mime=True)
self.kind_mapping = {'text': ['text/plain', 'text/html', 'text/css', 'text/javascript', 'text/markdown'], 'code': ['application/x-python', 'application/javascript', 'text/x-java', 'text/x-c'], 'pdf': ['application/pdf'], 'image': ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/svg+xml'], 'audio': ['audio/mpeg', 'audio/wav', 'audio/ogg', 'audio/flac'], 'video': ['video/mp4', 'video/x-matroska', 'video/avi', 'video/webm'], 'archive': ['application/zip', 'application/x-tar', 'application/gzip', 'application/x-7z-compressed'], 'document': ['application/msword', 'application/vnd.openxmlformats-officedocument'], 'spreadsheet': ['application/vnd.ms-excel', 'text/csv']}
self.text_exts = {'.txt', '.md', '.rst', '.log', '.json', '.xml', '.yaml', '.yml', '.toml', '.ini', '.cfg'}
self.code_exts = {'.py', '.js', '.ts', '.java', '.go', '.rs', '.c', '.cpp', '.h', '.cs', '.rb', '.php'}
self.processable_kinds = {'text', 'code', 'pdf', 'image', 'audio', 'video', 'document'}
def profile_file(self, file_path: Path) -> Dict:
try:
stat = file_path.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime)
mime_type = self._detect_mime(file_path)
kind = self._determine_kind(file_path, mime_type)
profile = {'path': str(file_path), 'size': size, 'mtime': mtime.isoformat(), 'mime': mime_type, 'kind': kind, 'processable': kind in self.processable_kinds, 'extractor': self._suggest_extractor(kind, mime_type), 'hints': self._extract_hints(file_path, kind, mime_type, size)}
return profile
except Exception as e:
return {'path': str(file_path), 'error': str(e), 'processable': False}
def _detect_mime(self, file_path: Path) -> str:
try:
return self.mime_detector.from_file(str(file_path))
except:
guess = mimetypes.guess_type(str(file_path))[0]
return guess or 'application/octet-stream'
def _determine_kind(self, file_path: Path, mime_type: str) -> str:
for kind, mimes in self.kind_mapping.items():
if any((mime in mime_type for mime in mimes)):
return kind
suffix = file_path.suffix.lower()
if suffix in self.text_exts:
return 'text'
if suffix in self.code_exts:
return 'code'
return 'unknown'
def _suggest_extractor(self, kind: str, mime_type: str) -> Optional[str]:
extractors = {'pdf': 'pdf_text', 'image': 'ocr+caption', 'audio': 'transcribe', 'video': 'transcribe+scenes', 'document': 'office_text', 'text': 'read', 'code': 'read+syntax'}
return extractors.get(kind)
def _extract_hints(self, file_path: Path, kind: str, mime_type: str, size: int) -> Dict:
hints = {}
if kind == 'text' or kind == 'code':
hints['language'] = self._guess_language(file_path)
if size < 1024 * 1024:
hints['lines'] = self._count_lines(file_path)
if kind == 'pdf':
hints['page_count'] = self._get_pdf_pages(file_path)
if kind in ['audio', 'video']:
hints['duration'] = self._get_media_duration(file_path)
if kind == 'image':
hints['has_exif'] = self._has_exif(file_path)
hints['dimensions'] = self._get_image_dimensions(file_path)
return hints
def _guess_language(self, file_path: Path) -> Optional[str]:
lang_map = {'.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.java': 'java', '.go': 'go', '.rs': 'rust', '.c': 'c', '.cpp': 'cpp', '.cs': 'csharp', '.rb': 'ruby', '.php': 'php'}
return lang_map.get(file_path.suffix.lower())
def _count_lines(self, file_path: Path) -> Optional[int]:
try:
with open(file_path, 'rb') as f:
return sum((1 for _ in f))
except:
return None
def _get_pdf_pages(self, file_path: Path) -> Optional[int]:
try:
import PyPDF2
with open(file_path, 'rb') as f:
pdf = PyPDF2.PdfReader(f)
return len(pdf.pages)
except:
return None
def _get_media_duration(self, file_path: Path) -> Optional[float]:
try:
import ffmpeg
probe = ffmpeg.probe(str(file_path))
return float(probe['format']['duration'])
except:
return None
def _has_exif(self, file_path: Path) -> bool:
try:
from PIL import Image
img = Image.open(file_path)
return hasattr(img, '_getexif') and img._getexif() is not None
except:
return False
def _get_image_dimensions(self, file_path: Path) -> Optional[Tuple[int, int]]:
try:
from PIL import Image
with Image.open(file_path) as img:
return img.size
except:
return None
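
A minimal sketch of profiling a single file with the new `ContentProfiler`; it assumes the `python-magic` dependency imported above is installed and the path exists.

```python
from pathlib import Path
from app.content.profiler import ContentProfiler  # needs python-magic for MIME detection

profiler = ContentProfiler()
profile = profiler.profile_file(Path('app/main.py'))
if profile.get('processable'):
    # e.g. kind='code', extractor='read+syntax', hints with language and line count
    print(profile['kind'], profile['mime'], profile['extractor'], profile['hints'])
else:
    print('skipped:', profile.get('error'))
```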

View File

@@ -71,16 +71,16 @@ class DeduplicationEngine:
cursor.execute("""
SELECT path, size
FROM files
WHERE disk = %s AND checksum IS NULL
WHERE disk_label = %s AND checksum IS NULL
ORDER BY size DESC
""", (disk,))
""", (disk,))
else:
cursor.execute("""
SELECT path, size
FROM files
WHERE checksum IS NULL
ORDER BY size DESC
""")
""")
files_to_process = cursor.fetchall()
total_files = len(files_to_process)
@@ -111,7 +111,7 @@ class DeduplicationEngine:
UPDATE files
SET checksum = %s, duplicate_of = %s
WHERE path = %s
""", (checksum, duplicate_of, str(path)))
""", (checksum, duplicate_of, str(path)))
stats.files_succeeded += 1
stats.bytes_processed += size
@@ -226,10 +226,10 @@ class DeduplicationEngine:
cursor.execute("""
SELECT checksum, array_agg(path ORDER BY path) as paths
FROM files
WHERE disk = %s AND checksum IS NOT NULL
WHERE disk_label = %s AND checksum IS NOT NULL
GROUP BY checksum
HAVING COUNT(*) > 1
""", (disk,))
""", (disk,))
else:
cursor.execute("""
SELECT checksum, array_agg(path ORDER BY path) as paths
@@ -237,7 +237,7 @@ class DeduplicationEngine:
WHERE checksum IS NOT NULL
GROUP BY checksum
HAVING COUNT(*) > 1
""")
""")
duplicates = {}
for checksum, paths in cursor.fetchall():
@@ -284,7 +284,7 @@ class DeduplicationEngine:
FROM files
WHERE checksum IS NOT NULL
) AS unique_files
""")
""")
stats['unique_size'] = cursor.fetchone()[0]
# Wasted space
@@ -328,7 +328,7 @@ class DeduplicationEngine:
UPDATE files
SET duplicate_of = NULL
WHERE path IN (SELECT path FROM canonical)
""")
""")
count = cursor.rowcount
conn.commit()

View File

@@ -0,0 +1,5 @@
from .scanner import FileScanner, FilteredScanner
from .system import SystemAPI
from .engine import DiscoveryEngine
from ._protocols import FileMeta, MountInfo, DiskInfo, IFileScanner, ISystemAPI
__all__ = ['FileScanner', 'FilteredScanner', 'SystemAPI', 'DiscoveryEngine', 'FileMeta', 'MountInfo', 'DiskInfo', 'IFileScanner', 'ISystemAPI']

View File

@@ -1,54 +1,37 @@
"""Protocol definitions for the discovery package"""
from typing import Iterator, Protocol, Any
from pathlib import Path
from dataclasses import dataclass
@dataclass
class FileMeta:
"""Metadata for a discovered file"""
path: Path
size: int
modified_time: float
created_time: float
# Add other metadata fields as needed
@dataclass
class MountInfo:
"""Information about a mounted filesystem"""
device: str
mount_point: str
fs_type: str
options: str
# Add other mount info fields as needed
@dataclass
class DiskInfo:
"""Information about a disk/NVMe device"""
device: str
model: str
size: int
serial: str
# Add other disk info fields as needed
class IFileScanner(Protocol):
"""Protocol for file scanning operations"""
def scan(self, root: Path) -> Iterator[FileMeta]:
"""Scan a directory tree and yield file metadata"""
...
class ISystemAPI(Protocol):
"""Protocol for system information queries"""
def query_mounts(self) -> list[MountInfo]:
"""Query mounted filesystems"""
...
def query_nvmes(self) -> list[DiskInfo]:
"""Query NVMe/disk information"""
...
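
Because `IFileScanner` and `ISystemAPI` are `typing.Protocol`s, any object with matching methods satisfies them structurally, with no inheritance. A toy sketch (the names here are invented for illustration):

```python
from pathlib import Path
from typing import Iterator
from app.discovery._protocols import FileMeta, IFileScanner

class InMemoryScanner:
    """Hypothetical test double; satisfies IFileScanner structurally."""
    def __init__(self, entries: list[tuple[str, int]]):
        self.entries = entries

    def scan(self, root: Path) -> Iterator[FileMeta]:
        for name, size in self.entries:
            yield FileMeta(path=root / name, size=size, modified_time=0.0, created_time=0.0)

def total_bytes(scanner: IFileScanner, root: Path) -> int:
    return sum(meta.size for meta in scanner.scan(root))

print(total_bytes(InMemoryScanner([('a.txt', 10), ('b.txt', 32)]), Path('/tmp')))  # 42
```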

app/discovery/engine.py (new file, +133)

@@ -0,0 +1,133 @@
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch
from .scanner import FileScanner
from .system import SystemAPI
from ._protocols import FileMeta
from ..shared.models import FileRecord, DiskInfo, ProcessingStats
from ..shared.config import DatabaseConfig
from ..shared.logger import ProgressLogger
class DiscoveryEngine:
def __init__(self, db_config: DatabaseConfig, logger: ProgressLogger, batch_size: int=1000):
self.db_config = db_config
self.logger = logger
self.batch_size = batch_size
self.system_api = SystemAPI()
self._connection = None
def _get_connection(self):
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
return self._connection
def _ensure_tables(self):
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("\n CREATE TABLE IF NOT EXISTS files (\n id SERIAL PRIMARY KEY,\n path TEXT NOT NULL UNIQUE,\n size BIGINT NOT NULL,\n modified_time DOUBLE PRECISION NOT NULL,\n created_time DOUBLE PRECISION NOT NULL,\n disk_label TEXT NOT NULL,\n checksum TEXT,\n status TEXT DEFAULT 'indexed',\n category TEXT,\n duplicate_of TEXT,\n discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n ")
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_files_path ON files(path)\n ')
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk_label)\n ')
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum)\n ')
conn.commit()
cursor.close()
def discover_path(self, root: Path, scanner: Optional[FileScanner]=None, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats:
self.logger.section(f'Discovering: {root}')
self._ensure_tables()
if scanner is None:
scanner = FileScanner(error_handler=lambda e, p: self.logger.warning(f'Error scanning {p}: {e}'))
disk = self.system_api.get_disk_for_path(root)
if disk is None:
disk = str(root)
stats = ProcessingStats()
batch = []
conn = self._get_connection()
cursor = conn.cursor()
try:
for file_meta in scanner.scan(root):
record = FileRecord(path=file_meta.path, size=file_meta.size, modified_time=file_meta.modified_time, created_time=file_meta.created_time, disk_label=disk)
batch.append(record)
stats.files_processed += 1
stats.bytes_processed += record.size
if len(batch) >= self.batch_size:
self._insert_batch(cursor, batch)
conn.commit()
batch.clear()
if progress_callback:
progress_callback(stats.files_processed, 0, stats)
if stats.files_processed % (self.batch_size * 10) == 0:
self.logger.progress(stats.files_processed, stats.files_processed, prefix='Files discovered', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds)
if batch:
self._insert_batch(cursor, batch)
conn.commit()
stats.files_succeeded = stats.files_processed
except Exception as e:
conn.rollback()
self.logger.error(f'Discovery failed: {e}')
raise
finally:
cursor.close()
self.logger.info(f'Discovery complete: {stats.files_processed} files, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s')
return stats
def _insert_batch(self, cursor, batch: list[FileRecord]):
query = '\n INSERT INTO files (path, size, modified_time, created_time, disk_label, checksum, status, category, duplicate_of)\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)\n ON CONFLICT (path) DO UPDATE SET\n size = EXCLUDED.size,\n modified_time = EXCLUDED.modified_time,\n updated_at = CURRENT_TIMESTAMP\n '
data = [(str(record.path), record.size, record.modified_time, record.created_time, record.disk_label, record.checksum, record.status, record.category, record.duplicate_of) for record in batch]
execute_batch(cursor, query, data, page_size=self.batch_size)
def get_disk_info(self) -> list[DiskInfo]:
self.logger.subsection('Querying disk information')
disks = []
for disk_info in self.system_api.query_nvmes():
mount_point = None
fs_type = 'unknown'
for mount in self.system_api.query_mounts():
if mount.device == disk_info.device:
mount_point = Path(mount.mount_point)
fs_type = mount.fs_type
break
if mount_point:
total, used, free = self.system_api.get_disk_usage(mount_point)
else:
total = disk_info.size
used = 0
free = disk_info.size
disk = DiskInfo(name=disk_info.device, device=disk_info.device, mount_point=mount_point or Path('/'), total_size=total, used_size=used, free_size=free, fs_type=fs_type)
disks.append(disk)
self.logger.info(f' {disk.name}: {disk.usage_percent:.1f}% used ({disk.used_size:,} / {disk.total_size:,} bytes)')
return disks
def get_file_count(self, disk: Optional[str]=None) -> int:
conn = self._get_connection()
cursor = conn.cursor()
if disk:
cursor.execute('SELECT COUNT(*) FROM files WHERE disk_label = %s', (disk,))
else:
cursor.execute('SELECT COUNT(*) FROM files')
count = cursor.fetchone()[0]
cursor.close()
return count
def get_total_size(self, disk: Optional[str]=None) -> int:
conn = self._get_connection()
cursor = conn.cursor()
if disk:
cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files WHERE disk_label = %s', (disk,))
else:
cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files')
total = cursor.fetchone()[0]
cursor.close()
return total
def close(self):
if self._connection and (not self._connection.closed):
self._connection.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
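
A hedged end-to-end sketch of the new `DiscoveryEngine`. `DatabaseConfig` and `ProgressLogger` live in `app.shared`, which this diff does not show, so their constructors below are assumptions based only on the attributes the engine reads.

```python
from pathlib import Path
from app.discovery import DiscoveryEngine
from app.shared.config import DatabaseConfig   # field names grounded in engine code; kwargs assumed
from app.shared.logger import ProgressLogger   # constructor signature not shown in this diff

db = DatabaseConfig(host='localhost', port=5432, database='disk_reorganizer_db',
                    user='disk_reorg_user', password='...')
# Context manager closes the psycopg2 connection on exit
with DiscoveryEngine(db, ProgressLogger(), batch_size=1000) as engine:
    stats = engine.discover_path(Path('/media/mike/DISK1'))
    print(stats.files_processed, engine.get_file_count(), engine.get_total_size())
```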

View File

@@ -1,28 +1,12 @@
"""File system scanner implementing IFileScanner protocol"""
import os
from pathlib import Path
from typing import Iterator, Optional, Callable
from datetime import datetime
from ._protocols import FileMeta
class FileScanner:
"""File system scanner with filtering and error handling"""
def __init__(
self,
follow_symlinks: bool = False,
skip_hidden: bool = True,
error_handler: Optional[Callable[[Exception, Path], None]] = None
):
"""Initialize file scanner
Args:
follow_symlinks: Whether to follow symbolic links
skip_hidden: Whether to skip hidden files/directories
error_handler: Optional callback for handling errors during scan
"""
def __init__(self, follow_symlinks: bool=False, skip_hidden: bool=True, error_handler: Optional[Callable[[Exception, Path], None]]=None):
self.follow_symlinks = follow_symlinks
self.skip_hidden = skip_hidden
self.error_handler = error_handler
@@ -31,24 +15,14 @@ class FileScanner:
self._errors = 0
def scan(self, root: Path) -> Iterator[FileMeta]:
"""Scan a directory tree and yield file metadata
Args:
root: Root directory to scan
Yields:
FileMeta objects for each discovered file
"""
if not root.exists():
error = FileNotFoundError(f"Path does not exist: {root}")
error = FileNotFoundError(f'Path does not exist: {root}')
if self.error_handler:
self.error_handler(error, root)
else:
raise error
return
if not root.is_dir():
# If root is a file, just return its metadata
try:
yield self._get_file_meta(root)
except Exception as e:
@@ -58,115 +32,59 @@ class FileScanner:
else:
raise
return
# Walk directory tree
for dirpath, dirnames, filenames in os.walk(root, followlinks=self.follow_symlinks):
current_dir = Path(dirpath)
# Filter directories if needed
if self.skip_hidden:
dirnames[:] = [d for d in dirnames if not d.startswith('.')]
# Process files
for filename in filenames:
if self.skip_hidden and filename.startswith('.'):
continue
file_path = current_dir / filename
try:
# Skip broken symlinks
if file_path.is_symlink() and not file_path.exists():
if file_path.is_symlink() and (not file_path.exists()):
continue
meta = self._get_file_meta(file_path)
self._files_scanned += 1
self._bytes_scanned += meta.size
yield meta
except PermissionError as e:
self._errors += 1
if self.error_handler:
self.error_handler(e, file_path)
# Continue scanning
continue
except Exception as e:
self._errors += 1
if self.error_handler:
self.error_handler(e, file_path)
# Continue scanning
continue
def _get_file_meta(self, path: Path) -> FileMeta:
"""Get file metadata
Args:
path: Path to file
Returns:
FileMeta object with file metadata
Raises:
OSError: If file cannot be accessed
"""
stat = path.stat()
# Get creation time (platform dependent)
created_time = stat.st_ctime
if hasattr(stat, 'st_birthtime'):
created_time = stat.st_birthtime
return FileMeta(
path=path,
size=stat.st_size,
modified_time=stat.st_mtime,
created_time=created_time
)
return FileMeta(path=path, size=stat.st_size, modified_time=stat.st_mtime, created_time=created_time)
@property
def files_scanned(self) -> int:
"""Get count of files scanned"""
return self._files_scanned
@property
def bytes_scanned(self) -> int:
"""Get total bytes scanned"""
return self._bytes_scanned
@property
def errors(self) -> int:
"""Get count of errors encountered"""
return self._errors
def reset_stats(self) -> None:
"""Reset scanning statistics"""
self._files_scanned = 0
self._bytes_scanned = 0
self._errors = 0
class FilteredScanner(FileScanner):
"""Scanner with additional filtering capabilities"""
def __init__(
self,
min_size: Optional[int] = None,
max_size: Optional[int] = None,
extensions: Optional[list[str]] = None,
exclude_patterns: Optional[list[str]] = None,
**kwargs
):
"""Initialize filtered scanner
Args:
min_size: Minimum file size in bytes
max_size: Maximum file size in bytes
extensions: List of file extensions to include (e.g., ['.txt', '.py'])
exclude_patterns: List of path patterns to exclude
**kwargs: Additional arguments passed to FileScanner
"""
def __init__(self, min_size: Optional[int]=None, max_size: Optional[int]=None, extensions: Optional[list[str]]=None, exclude_patterns: Optional[list[str]]=None, **kwargs):
super().__init__(**kwargs)
self.min_size = min_size
self.max_size = max_size
@@ -174,41 +92,19 @@ class FilteredScanner(FileScanner):
self.exclude_patterns = exclude_patterns or []
def scan(self, root: Path) -> Iterator[FileMeta]:
"""Scan with additional filtering
Args:
root: Root directory to scan
Yields:
FileMeta objects for files matching filter criteria
"""
for meta in super().scan(root):
# Size filtering
if self.min_size is not None and meta.size < self.min_size:
continue
if self.max_size is not None and meta.size > self.max_size:
continue
# Extension filtering
if self.extensions is not None:
if meta.path.suffix.lower() not in self.extensions:
continue
# Exclude pattern filtering
if self._should_exclude(meta.path):
continue
yield meta
def _should_exclude(self, path: Path) -> bool:
"""Check if path matches any exclude pattern
Args:
path: Path to check
Returns:
True if path should be excluded
"""
path_str = str(path)
for pattern in self.exclude_patterns:
if pattern in path_str:

View File

@@ -1,167 +1,80 @@
"""System API for querying mounts and disks"""
import os
import subprocess
from pathlib import Path
from typing import Optional
import psutil
from ._protocols import MountInfo, DiskInfo
class SystemAPI:
"""System information API for querying mounts and disks"""
def query_mounts(self) -> list[MountInfo]:
"""Query mounted filesystems
Returns:
List of MountInfo objects for all mounted filesystems
"""
mounts = []
for partition in psutil.disk_partitions(all=False):
mount_info = MountInfo(
device=partition.device,
mount_point=partition.mountpoint,
fs_type=partition.fstype,
options=partition.opts
)
mount_info = MountInfo(device=partition.device, mount_point=partition.mountpoint, fs_type=partition.fstype, options=partition.opts)
mounts.append(mount_info)
return mounts
def query_nvmes(self) -> list[DiskInfo]:
"""Query NVMe/disk information
Returns:
List of DiskInfo objects for all disks
"""
disks = []
# Try to get disk information using lsblk
try:
result = subprocess.run(
['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'],
capture_output=True,
text=True,
check=False
)
result = subprocess.run(['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'], capture_output=True, text=True, check=False)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
parts = line.split(maxsplit=3)
if len(parts) >= 3:
device = f"/dev/{parts[0]}"
model = parts[1] if len(parts) > 1 else "Unknown"
size_str = parts[2] if len(parts) > 2 else "0"
serial = parts[3] if len(parts) > 3 else "Unknown"
device = f'/dev/{parts[0]}'
model = parts[1] if len(parts) > 1 else 'Unknown'
size_str = parts[2] if len(parts) > 2 else '0'
serial = parts[3] if len(parts) > 3 else 'Unknown'
try:
size = int(size_str)
except ValueError:
size = 0
disk_info = DiskInfo(
device=device,
model=model,
size=size,
serial=serial
)
disk_info = DiskInfo(device=device, model=model, size=size, serial=serial)
disks.append(disk_info)
except FileNotFoundError:
# lsblk not available, fall back to basic info
pass
# If lsblk failed or unavailable, try alternative method
if not disks:
disks = self._query_disks_fallback()
return disks
def _query_disks_fallback(self) -> list[DiskInfo]:
"""Fallback method for querying disk information
Returns:
List of DiskInfo objects using psutil
"""
disks = []
seen_devices = set()
for partition in psutil.disk_partitions(all=True):
device = partition.device
# Skip non-disk devices
if not device.startswith('/dev/'):
continue
# Get base device (e.g., /dev/sda from /dev/sda1)
base_device = self._get_base_device(device)
if base_device in seen_devices:
continue
seen_devices.add(base_device)
try:
usage = psutil.disk_usage(partition.mountpoint)
size = usage.total
except (PermissionError, OSError):
size = 0
disk_info = DiskInfo(
device=base_device,
model="Unknown",
size=size,
serial="Unknown"
)
disk_info = DiskInfo(device=base_device, model='Unknown', size=size, serial='Unknown')
disks.append(disk_info)
return disks
def _get_base_device(self, device: str) -> str:
"""Extract base device name from partition device
Args:
device: Device path (e.g., /dev/sda1, /dev/nvme0n1p1)
Returns:
Base device path (e.g., /dev/sda, /dev/nvme0n1)
"""
# Handle NVMe devices
if 'nvme' in device:
# /dev/nvme0n1p1 -> /dev/nvme0n1
if 'p' in device:
return device.rsplit('p', 1)[0]
return device
# Handle standard devices (sda, sdb, etc.)
# /dev/sda1 -> /dev/sda
import re
match = re.match(r'(/dev/[a-z]+)', device)
match = re.match('(/dev/[a-z]+)', device)
if match:
return match.group(1)
return device
def get_disk_for_path(self, path: Path) -> Optional[str]:
"""Get the disk/mount point for a given path
Args:
path: Path to check
Returns:
Mount point device or None if not found
"""
path = path.resolve()
# Find the mount point that contains this path
best_match = None
best_match_len = 0
for partition in psutil.disk_partitions():
mount_point = Path(partition.mountpoint)
try:
@@ -172,39 +85,19 @@ class SystemAPI:
best_match_len = mount_len
except (ValueError, OSError):
continue
return best_match
def get_disk_usage(self, path: Path) -> tuple[int, int, int]:
"""Get disk usage for a path
Args:
path: Path to check
Returns:
Tuple of (total, used, free) in bytes
"""
try:
usage = psutil.disk_usage(str(path))
return usage.total, usage.used, usage.free
return (usage.total, usage.used, usage.free)
except (PermissionError, OSError):
return 0, 0, 0
return (0, 0, 0)
def get_mount_point(self, path: Path) -> Optional[Path]:
"""Get the mount point for a given path
Args:
path: Path to check
Returns:
Mount point path or None if not found
"""
path = path.resolve()
# Find the mount point that contains this path
best_match = None
best_match_len = 0
for partition in psutil.disk_partitions():
mount_point = Path(partition.mountpoint)
try:
@@ -215,19 +108,9 @@ class SystemAPI:
best_match_len = mount_len
except (ValueError, OSError):
continue
return best_match
def is_same_filesystem(self, path1: Path, path2: Path) -> bool:
"""Check if two paths are on the same filesystem
Args:
path1: First path
path2: Second path
Returns:
True if paths are on the same filesystem
"""
try:
stat1 = path1.stat()
stat2 = path2.stat()

View File

@@ -0,0 +1,59 @@
from typing import Dict
import re
class ContentEnricher:
def __init__(self, llm_client=None):
self.llm_client = llm_client
self.pii_patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
}
def enrich(self, text: str, use_llm: bool = False) -> Dict:
enrichment = {
'summary': self._basic_summary(text),
'word_count': len(text.split()),
'has_pii': self._detect_pii(text),
'quality': self._assess_quality(text),
'topics': self._extract_basic_topics(text)
}
if use_llm and self.llm_client:
llm_result = self.llm_client.classify_content(text)
if llm_result.get('success'):
enrichment['llm_classification'] = llm_result['text']
return enrichment
def _basic_summary(self, text: str) -> str:
sentences = re.split(r'[.!?]+', text)
return ' '.join(sentences[:3])[:200]
def _detect_pii(self, text: str) -> Dict:
detected = {}
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, text)
if matches:
detected[pii_type] = len(matches)
return detected
def _assess_quality(self, text: str) -> str:
if len(text.strip()) < 10:
return 'low'
special_char_ratio = sum(1 for c in text if not c.isalnum() and not c.isspace()) / len(text)
if special_char_ratio > 0.3:
return 'low'
return 'high' if len(text.split()) > 50 else 'medium'
def _extract_basic_topics(self, text: str) -> list:
words = re.findall(r'\b[A-Z][a-z]+\b', text)
word_freq = {}
for word in words:
if len(word) > 3:
word_freq[word] = word_freq.get(word, 0) + 1
return sorted(word_freq, key=word_freq.get, reverse=True)[:10]
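
A quick sketch of the regex-only path through `ContentEnricher` (no LLM client wired in). The import path is assumed, since this file's header is not shown in the diff.

```python
# Import path assumed (file header not captured above).
from app.enrichment.enricher import ContentEnricher

enricher = ContentEnricher()   # llm_client=None: regex-only enrichment
text = 'Contact Alice Smith at alice@example.com about the Budget Review. ' * 20
report = enricher.enrich(text)
print(report['word_count'], report['quality'])   # 180 'high' (>50 words, few special chars)
print(report['has_pii'])                         # {'email': 20}
print(report['topics'][:3])                      # most frequent capitalized words, e.g. 'Contact'
```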

View File

@@ -0,0 +1,54 @@
import requests
import json
from typing import Dict, Optional
class LLMClient:
def __init__(self, endpoint: str = 'http://192.168.1.74:1234', model: str = 'local'):
self.endpoint = endpoint
self.model = model
self.local_ollama = 'http://localhost:11434'
def summarize(self, text: str, max_length: int = 200) -> Dict:
prompt = f"Summarize the following in {max_length} chars or less:\n\n{text[:2000]}"
return self._query(prompt)
def extract_topics(self, text: str) -> Dict:
prompt = f"Extract 5-10 key topics/tags from this text. Return as comma-separated list:\n\n{text[:2000]}"
return self._query(prompt)
def classify_content(self, text: str) -> Dict:
prompt = f"Classify this content. Return: category, topics, has_pii (yes/no), quality (high/medium/low):\n\n{text[:1000]}"
return self._query(prompt)
def _query(self, prompt: str, use_local: bool = False) -> Dict:
try:
endpoint = self.local_ollama if use_local else self.endpoint
if use_local:
response = requests.post(
f'{endpoint}/api/generate',
json={'model': 'llama3.2', 'prompt': prompt, 'stream': False},
timeout=30
)
else:
response = requests.post(
f'{endpoint}/v1/chat/completions',
json={
'model': self.model,
'messages': [{'role': 'user', 'content': prompt}],
'max_tokens': 500
},
timeout=30
)
if response.status_code == 200:
data = response.json()
if use_local:
return {'success': True, 'text': data.get('response', '')}
else:
return {'success': True, 'text': data['choices'][0]['message']['content']}
else:
return {'success': False, 'error': f'HTTP {response.status_code}'}
except Exception as e:
return {'success': False, 'error': str(e)}
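
A usage sketch for `LLMClient` against the OpenAI-compatible endpoint defaulted above. The import path is assumed; network failures are returned as dicts, not raised.

```python
# Import path assumed (file header not captured above).
from app.enrichment.llm_client import LLMClient

client = LLMClient(endpoint='http://192.168.1.74:1234')
result = client.summarize("Long meeting transcript text goes here ...")
if result['success']:
    print(result['text'])
else:
    print('LLM unavailable:', result['error'])
```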

app/filters/__init__.py (new file, +3)

@@ -0,0 +1,3 @@
from .gitignore import GitignoreFilter, DEFAULT_PATTERNS
__all__ = ['GitignoreFilter', 'DEFAULT_PATTERNS']

app/filters/gitignore.py (new file, +30)

@@ -0,0 +1,30 @@
from pathlib import Path
from typing import Set
import fnmatch
DEFAULT_PATTERNS = {
'node_modules/**', '__pycache__/**', '.git/**', 'build/**', 'dist/**',
'.cache/**', 'target/**', 'vendor/**', '.venv/**', 'venv/**',
'*.pyc', '*.pyo', '*.so', '*.dll', '*.dylib', '*.o', '*.a',
'.DS_Store', 'Thumbs.db', '.pytest_cache/**', '.tox/**',
'*.egg-info/**', '.mypy_cache/**', '.coverage', 'htmlcov/**',
'.gradle/**', 'bin/**', 'obj/**', '.vs/**', '.idea/**'
}
class GitignoreFilter:
def __init__(self, patterns: Set[str] = None):
self.patterns = patterns or DEFAULT_PATTERNS
def should_exclude(self, path: str) -> bool:
path_obj = Path(path)
for pattern in self.patterns:
if '**' in pattern:
clean_pattern = pattern.replace('/**', '').replace('**/', '')
if clean_pattern in path_obj.parts:
return True
elif fnmatch.fnmatch(path, pattern) or fnmatch.fnmatch(path_obj.name, pattern):
return True
return False
def filter_files(self, files: list) -> list:
return [f for f in files if not self.should_exclude(f)]
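
A small sketch of `GitignoreFilter` with the default pattern set; the paths are made up.

```python
from app.filters import GitignoreFilter

f = GitignoreFilter()   # falls back to DEFAULT_PATTERNS
paths = [
    'myproj/src/app.py',
    'myproj/node_modules/react/index.js',
    'myproj/__pycache__/app.cpython-311.pyc',
    'photos/holiday.jpg',
]
# node_modules/** and __pycache__/** match on path parts; *.pyc matches on the filename
print(f.filter_files(paths))   # ['myproj/src/app.py', 'photos/holiday.jpg']
```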

app/main.py (new file, +918)

@@ -0,0 +1,918 @@
import os
import sys
from dataclasses import dataclass
import psycopg2
import shutil
import hashlib
import argparse
import json
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
import logging
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.FileHandler('disk_reorganizer.log'), logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)
@dataclass
class FileRecord:
path: str
size: int
modified_time: float
disk_label: str
checksum: Optional[str] = None
status: str = 'indexed'
class DiskReorganizer:
def __init__(self, db_config: Dict=None):
if db_config is None:
db_config = {'host': os.getenv('DB_HOST', '192.168.1.159'), 'port': int(os.getenv('DB_PORT', 5432)), 'database': os.getenv('DB_NAME', 'disk_reorganizer_db'), 'user': os.getenv('DB_USER', 'auction'), 'password': os.getenv('DB_PASSWORD', 'heel-goed-wachtwoord')}
self.db_config = db_config
self.init_database()
def get_connection(self):
return psycopg2.connect(**self.db_config)
def init_database(self):
try:
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute("\n SELECT table_name FROM information_schema.tables\n WHERE table_schema = 'public' AND table_name IN ('files', 'operations')\n ")
tables = cursor.fetchall()
if len(tables) < 2:
logger.error('Database tables not found! Please run setup_database.sh first.')
raise Exception('Database not properly initialized. Run setup_database.sh')
cursor.close()
conn.close()
logger.info('Database connection verified successfully')
except psycopg2.Error as e:
logger.error(f'Database connection failed: {e}')
raise
def index_disk(self, disk_root: str, disk_name: str):
logger.info(f'Indexing disk: {disk_name} at {disk_root}')
disk_path = Path(disk_root)
if not disk_path.exists():
logger.error(f'Disk path {disk_root} does not exist!')
return
files_count = 0
total_size = 0
start_time = time.time()
conn = self.get_connection()
cursor = conn.cursor()
try:
for root, dirs, files in os.walk(disk_path):
dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))]
for file in files:
try:
file_path = Path(root) / file
if not file_path.is_file():
continue
stat = file_path.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime)
rel_path = str(file_path.relative_to(disk_path))
cursor.execute('\n INSERT INTO files (path, size, modified_time, disk_label, checksum, status)\n VALUES (%s, %s, %s, %s, %s, %s)\n ON CONFLICT (path) DO UPDATE SET\n size = EXCLUDED.size,\n modified_time = EXCLUDED.modified_time,\n disk_label = EXCLUDED.disk_label,\n status = EXCLUDED.status\n ', (rel_path, size, mtime, disk_name, None, 'indexed'))
files_count += 1
total_size += size
if files_count % 100 == 0:
elapsed = time.time() - start_time
rate = files_count / elapsed if elapsed > 0 else 0
display_path = str(file_path)
if len(display_path) > 60:
display_path = '...' + display_path[-57:]
print(f'\rIndexing: {files_count:,} files | {self.format_size(total_size)} | {rate:.0f} files/s | {display_path}', end='', flush=True)
if files_count % 1000 == 0:
conn.commit()
except Exception as e:
conn.rollback()
logger.warning(f'\nSkipping {file_path}: {e}')
continue
conn.commit()
print()
logger.info(f'Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}')
finally:
cursor.close()
conn.close()
def calculate_disk_usage(self) -> Dict[str, Dict]:
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('\n SELECT disk_label, SUM(size) as total_size, COUNT(*) as file_count\n FROM files\n GROUP BY disk_label\n ')
usage = {}
for row in cursor.fetchall():
disk = row[0]
size = int(row[1] or 0)
count = int(row[2])
usage[disk] = {'size': size, 'count': count, 'formatted_size': self.format_size(size)}
return usage
finally:
cursor.close()
conn.close()
def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict:
logger.info(f'Planning migration to free up {target_disk}')
usage = self.calculate_disk_usage()
if target_disk not in usage:
logger.error(f'Target disk {target_disk} not found in index!')
return {}
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT path, size, modified_time FROM files WHERE disk_label = %s ORDER BY size DESC', (target_disk,))
files_to_move = cursor.fetchall()
cursor.close()
conn.close()
target_disk_usage = usage[target_disk]['size']
logger.info(f'Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}')
dest_availability = []
for disk in destination_disks:
if disk not in usage:
available = float('inf')
else:
available = float('inf')
dest_availability.append({'disk': disk, 'available': available, 'planned_usage': 0})
plan = {'target_disk': target_disk, 'total_size': target_disk_usage, 'file_count': len(files_to_move), 'operations': [], 'destination_disks': destination_disks}
conn = self.get_connection()
cursor = conn.cursor()
try:
for file_info in files_to_move:
rel_path, size, mtime = file_info
dest_disk = destination_disks[len(plan['operations']) % len(destination_disks)]
op = {'source_disk': target_disk, 'source_path': rel_path, 'dest_disk': dest_disk, 'target_path': rel_path, 'size': int(size)}
plan['operations'].append(op)
cursor.execute('INSERT INTO operations (source_path, target_path, operation_type, status) VALUES (%s, %s, %s, %s)', (f'{target_disk}:{rel_path}', f'{dest_disk}:{rel_path}', 'move', 'pending'))
conn.commit()
finally:
cursor.close()
conn.close()
plan_file = f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(plan_file, 'w') as f:
json.dump(plan, f, indent=2)
logger.info(f"Plan created with {len(plan['operations'])} operations")
logger.info(f'Plan saved to {plan_file}')
return plan
def verify_operation(self, source: Path, dest: Path) -> bool:
if not dest.exists():
return False
try:
source_stat = source.stat()
dest_stat = dest.stat()
if source_stat.st_size != dest_stat.st_size:
return False
return True
except Exception as e:
logger.error(f'Verification error: {e}')
return False
@staticmethod
def file_checksum(path: Path) -> str:
hash_md5 = hashlib.md5()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def execute_migration(self, plan_file: str, dry_run: bool=True):
logger.info(f"{('DRY RUN' if dry_run else 'EXECUTING')} migration from {plan_file}")
with open(plan_file, 'r') as f:
plan = json.load(f)
operations = plan['operations']
logger.info(f'Processing {len(operations)} operations...')
success_count = 0
error_count = 0
start_time = time.time()
conn = self.get_connection()
cursor = conn.cursor()
try:
for i, op in enumerate(operations, 1):
source_disk = op['source_disk']
source_path = op['source_path']
dest_disk = op['dest_disk']
target_path = op['target_path']
source_full = Path(source_disk) / source_path
dest_full = Path(dest_disk) / target_path
elapsed = time.time() - start_time
rate = i / elapsed if elapsed > 0 else 0
eta = (len(operations) - i) / rate if rate > 0 else 0
display_path = str(source_path)
if len(display_path) > 50:
display_path = '...' + display_path[-47:]
print(f'\r[{i}/{len(operations)}] {success_count} OK, {error_count} ERR | {rate:.1f} files/s | ETA: {int(eta)}s | {display_path}', end='', flush=True)
if dry_run:
if source_full.exists():
success_count += 1
else:
logger.warning(f'\n Source does not exist: {source_full}')
error_count += 1
continue
try:
dest_full.parent.mkdir(parents=True, exist_ok=True)
if source_full.exists():
shutil.copy2(source_full, dest_full)
if self.verify_operation(source_full, dest_full):
cursor.execute("UPDATE files SET disk_label = %s, status = 'moved' WHERE path = %s AND disk_label = %s", (dest_disk, source_path, source_disk))
cursor.execute('UPDATE operations SET executed = 1, executed_at = CURRENT_TIMESTAMP WHERE source_path = %s', (f'{source_disk}:{source_path}',))
success_count += 1
else:
raise Exception('Verification failed')
else:
logger.warning(f'\n Source missing: {source_full}')
error_count += 1
except Exception as e:
logger.error(f'\n Error processing {source_path}: {e}')
cursor.execute('UPDATE operations SET error = %s WHERE source_path = %s', (str(e), f'{source_disk}:{source_path}'))
error_count += 1
if i % 10 == 0:
conn.commit()
conn.commit()
print()
finally:
cursor.close()
conn.close()
logger.info(f'Migration complete: {success_count} success, {error_count} errors')
if not dry_run and error_count == 0:
logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!")
logger.info(f" Remember to safely delete original files from {plan['target_disk']}")
def run_deduplication(self, disk: Optional[str]=None, use_chunks: bool=True):
logger.info(f"Starting deduplication{(' for disk ' + disk if disk else '')}")
disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'}
conn = self.get_connection()
cursor = conn.cursor()
def hash_file_local(file_path: Path) -> str:
hasher = hashlib.sha256()
with open(file_path, 'rb') as f:
while (chunk := f.read(65536)):
hasher.update(chunk)
return hasher.hexdigest()
try:
if disk:
cursor.execute('SELECT path, size, disk_label FROM files WHERE disk_label = %s AND checksum IS NULL ORDER BY size DESC', (disk,))
else:
cursor.execute('SELECT path, size, disk_label FROM files WHERE checksum IS NULL ORDER BY size DESC')
files_to_process = cursor.fetchall()
total = len(files_to_process)
logger.info(f'Found {total} files to hash')
processed = 0
skipped = 0
start_time = time.time()
batch = []
print(f'Phase 1: Computing checksums...')
for idx, (path_str, size, disk_label) in enumerate(files_to_process, 1):
try:
mount_point = disk_mount_map.get(disk_label, disk_label)
full_path = Path(mount_point) / path_str if not Path(path_str).is_absolute() else Path(path_str)
if not full_path.exists():
skipped += 1
if idx % 100 == 0:
elapsed = time.time() - start_time
rate = (processed + skipped) / elapsed if elapsed > 0 else 0
remaining = (total - idx) / rate if rate > 0 else 0
pct = 100 * idx / total
print(f'\r[{pct:5.1f}%] {processed:,}/{total:,} | {rate:.0f}/s | ETA: {int(remaining / 60)}m{int(remaining % 60):02d}s | Skip: {skipped:,}', end='', flush=True)
continue
checksum = hash_file_local(full_path)
batch.append((checksum, path_str))
processed += 1
if len(batch) >= 1000:
try:
cursor.executemany('UPDATE files SET checksum = %s WHERE path = %s', batch)
conn.commit()
batch.clear()
except Exception as e:
conn.rollback()
batch.clear()
print(f'\nBatch update failed: {e}')
if idx % 100 == 0:
elapsed = time.time() - start_time
rate = (processed + skipped) / elapsed if elapsed > 0 else 0
remaining = (total - idx) / rate if rate > 0 else 0
pct = 100 * idx / total
print(f'\r[{pct:5.1f}%] {processed:,}/{total:,} | {rate:.0f}/s | ETA: {int(remaining / 60)}m{int(remaining % 60):02d}s | Skip: {skipped:,}', end='', flush=True)
except Exception as e:
skipped += 1
if idx <= 5:
print(f'\nDebug: {full_path} - {e}')
if batch:
try:
cursor.executemany('UPDATE files SET checksum = %s WHERE path = %s', batch)
conn.commit()
except Exception as e:
conn.rollback()
print(f'\nFinal batch failed: {e}')
print()
elapsed = time.time() - start_time
logger.info(f'Phase 1 done: {processed:,} files in {int(elapsed / 60)}m{int(elapsed % 60):02d}s ({skipped:,} skipped)')
print('Phase 2: Finding duplicates...')
cursor.execute('\n UPDATE files f1 SET duplicate_of = (\n SELECT MIN(path) FROM files f2\n WHERE f2.checksum = f1.checksum AND f2.path < f1.path\n )\n WHERE checksum IS NOT NULL\n ')
conn.commit()
cursor.execute('SELECT COUNT(*) FROM files WHERE duplicate_of IS NOT NULL')
dup_count = cursor.fetchone()[0]
logger.info(f'Phase 2 done: Found {dup_count:,} duplicates')
finally:
cursor.close()
conn.close()
def plan_merge(self, sources: List[str], target: str, output_file: str, filter_system: bool=False, network_target: str=None):
logger.info(f"Planning merge: {', '.join(sources)}{target or network_target}")
if filter_system:
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from filters import GitignoreFilter
file_filter = GitignoreFilter()
logger.info('System/build file filtering enabled')
conn = self.get_connection()
cursor = conn.cursor()
try:
placeholders = ','.join(['%s'] * len(sources))
cursor.execute(f'\n SELECT path, size, checksum, disk_label, duplicate_of\n FROM files\n WHERE disk_label IN ({placeholders})\n ORDER BY size DESC\n ', tuple(sources))
files = cursor.fetchall()
total_files = len(files)
total_size = sum((int(f[1]) for f in files))
unique_files = {}
duplicate_count = 0
duplicate_size = 0
filtered_count = 0
filtered_size = 0
for path, size, checksum, disk_label, duplicate_of in files:
if filter_system and file_filter.should_exclude(path):
filtered_count += 1
filtered_size += int(size)
continue
if checksum and checksum in unique_files:
duplicate_count += 1
duplicate_size += int(size)
elif checksum:
unique_files[checksum] = (path, int(size), disk_label)
unique_count = len(unique_files)
unique_size = sum((f[1] for f in unique_files.values()))
plan = {'sources': sources, 'target': target or network_target, 'network': network_target is not None, 'total_files': total_files, 'total_size': total_size, 'unique_files': unique_count, 'unique_size': unique_size, 'duplicate_files': duplicate_count, 'duplicate_size': duplicate_size, 'filtered_files': filtered_count if filter_system else 0, 'filtered_size': filtered_size if filter_system else 0, 'space_saved': duplicate_size + (filtered_size if filter_system else 0), 'operations': []}
for checksum, (path, size, disk_label) in unique_files.items():
plan['operations'].append({'source_disk': disk_label, 'source_path': path, 'target_disk': target or network_target, 'target_path': path, 'size': size, 'checksum': checksum})
with open(output_file, 'w') as f:
json.dump(plan, f, indent=2)
logger.info(f'Merge plan saved to {output_file}')
print(f'\n=== MERGE PLAN SUMMARY ===')
print(f"Sources: {', '.join(sources)}")
print(f'Target: {target or network_target}')
print(f'Total files: {total_files:,} ({self.format_size(total_size)})')
if filter_system:
print(f'Filtered (system/build): {filtered_count:,} ({self.format_size(filtered_size)})')
print(f'Unique files: {unique_count:,} ({self.format_size(unique_size)})')
print(f'Duplicates: {duplicate_count:,} ({self.format_size(duplicate_size)})')
print(f"Total space saved: {self.format_size(plan['space_saved'])}")
print(f'Space needed on target: {self.format_size(unique_size)}')
finally:
cursor.close()
conn.close()
def generate_report(self, format='text', show_duplicates=False, preview_merge=None):
conn = self.get_connection()
cursor = conn.cursor()
try:
if preview_merge:
with open(preview_merge, 'r') as f:
plan = json.load(f)
print('\n=== MERGE PLAN PREVIEW ===')
print(f"Sources: {', '.join(plan['sources'])}")
print(f"Target: {plan['target']}")
print(f"Total files: {plan['total_files']:,} ({self.format_size(plan['total_size'])})")
print(f"Unique files: {plan['unique_files']:,} ({self.format_size(plan['unique_size'])})")
print(f"Duplicates: {plan['duplicate_files']:,} ({self.format_size(plan['duplicate_size'])})")
print(f"Space saved: {self.format_size(plan['space_saved'])}")
print(f"Space needed on target: {self.format_size(plan['unique_size'])}")
return
cursor.execute('\n SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status\n ')
print('\n=== FILE MIGRATION REPORT ===')
for row in cursor.fetchall():
status, count, size = row
print(f'{status:15}: {count:6} files, {self.format_size(int(size or 0))}')
cursor.execute('\n SELECT disk_label, COUNT(*), SUM(size) FROM files GROUP BY disk_label\n ')
print('\n=== DISK USAGE ===')
for row in cursor.fetchall():
disk, count, size = row
print(f'{disk:20}: {count:6} files, {self.format_size(int(size or 0))}')
cursor.execute('\n SELECT COUNT(*), SUM(size) FROM files WHERE checksum IS NOT NULL\n ')
hashed_count, hashed_size = cursor.fetchone()
cursor.execute('\n SELECT COUNT(*), SUM(size) FROM files WHERE duplicate_of IS NOT NULL\n ')
dup_count, dup_size = cursor.fetchone()
print('\n=== DEDUPLICATION STATS ===')
print(f'Files with checksums: {hashed_count or 0:6}')
print(f'Duplicate files: {dup_count or 0:6} ({self.format_size(int(dup_size or 0))})')
if show_duplicates and dup_count:
print('\n=== DUPLICATE FILES ===')
cursor.execute('\n SELECT path, size, duplicate_of FROM files\n WHERE duplicate_of IS NOT NULL\n ORDER BY size DESC\n LIMIT 20\n ')
for path, size, dup_of in cursor.fetchall():
print(f' {path} ({self.format_size(int(size))}) → {dup_of}')
cursor.execute('\n SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified\n ')
print('\n=== OPERATIONS REPORT ===')
for row in cursor.fetchall():
op_type, executed, verified, count = row
status = 'EXECUTED' if executed else 'PENDING'
if verified:
status += '+VERIFIED'
print(f'{op_type:10} {status:15}: {count} operations')
finally:
cursor.close()
conn.close()
def profile_content(self, disk: Optional[str]=None, update_db: bool=False, limit: Optional[int]=None):
from content.profiler import ContentProfiler
profiler = ContentProfiler()
disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'}
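# Logical disk labels map to local mount points; relative DB paths are joined onto these, so adjust the map if the disks are mounted elsewhere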
conn = self.get_connection()
cursor = conn.cursor()
try:
query = 'SELECT path, size, disk_label FROM files WHERE 1=1'
params = []
if disk:
query += ' AND disk_label = %s'
params.append(disk)
if limit:
query += f' LIMIT {limit}'
cursor.execute(query, params)
files = cursor.fetchall()
total = len(files)
logger.info(f'Profiling {total:,} files...')
kind_stats = {}
processable = 0
batch = []
for idx, (path, size, disk_label) in enumerate(files, 1):
mount_point = disk_mount_map.get(disk_label, disk_label)
full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path)
if not full_path.exists():
continue
profile = profiler.profile_file(full_path)
if 'error' not in profile:
kind = profile['kind']
if kind not in kind_stats:
kind_stats[kind] = {'count': 0, 'processable': 0}
kind_stats[kind]['count'] += 1
if profile['processable']:
kind_stats[kind]['processable'] += 1
processable += 1
if update_db:
profile_json = json.dumps(profile)
batch.append((kind, profile_json, path))
if len(batch) >= 500:
cursor.executemany("UPDATE files SET metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{profile}', %s::jsonb) WHERE path = %s", [(pj, p) for k, pj, p in batch])
conn.commit()
batch.clear()
if idx % 100 == 0:
print(f'\rProfiled: {idx:,}/{total:,}', end='', flush=True)
if update_db and batch:
cursor.executemany("UPDATE files SET metadata = jsonb_set(COALESCE(metadata, '{}'::jsonb), '{profile}', %s::jsonb) WHERE path = %s", [(pj, p) for k, pj, p in batch])
conn.commit()
print()
print(f'\n=== CONTENT PROFILE SUMMARY ===')
print(f'Total files: {total:,}')
print(f'Processable: {processable:,}\n')
print(f"{'Kind':<15} {'Total':<10} {'Processable':<12} {'Extractor'}")
print('-' * 60)
for kind in sorted(kind_stats.keys()):
stats = kind_stats[kind]
extractor = profiler._suggest_extractor(kind, '')
print(f"{kind:<15} {stats['count']:<10,} {stats['processable']:<12,} {extractor or 'none'}")
finally:
cursor.close()
conn.close()
def extract_content(self, kind: Optional[str]=None, limit: int=10):
from content.profiler import ContentProfiler
from content.extractors import ContentExtractor
profiler = ContentProfiler()
extractor = ContentExtractor()
disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'}
conn = self.get_connection()
cursor = conn.cursor()
try:
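# Only files whose stored profile marks them processable (metadata->'profile'->>'processable' = 'true') are candidates for extraction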
query = "SELECT path, size, disk_label, metadata FROM files WHERE metadata->'profile'->>'processable' = 'true'"
params = []
if kind:
query += " AND metadata->'profile'->>'kind' = %s"
params.append(kind)
query += f' LIMIT {limit}'
cursor.execute(query, params)
files = cursor.fetchall()
print(f'\n=== EXTRACTING CONTENT ===')
print(f'Processing {len(files)} files\n')
for path, size, disk_label, metadata in files:
mount_point = disk_mount_map.get(disk_label, disk_label)
full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path)
if not full_path.exists():
continue
profile = metadata.get('profile', {}) if metadata else {}
extractor_type = profile.get('extractor')
if not extractor_type:
continue
print(f'Extracting: {path}')
print(f" Type: {profile.get('kind')} | Extractor: {extractor_type}")
result = extractor.extract(full_path, extractor_type)
if 'text' in result:
preview = result['text'][:200]
print(f' Preview: {preview}...')
elif 'pipeline' in result:
print(f" Pipeline: {''.join(result['pipeline'])}")
print(f" Status: {result.get('status', 'pending')}")
print()
finally:
cursor.close()
conn.close()
def parse_files(self, kind: Optional[str] = None, limit: int = 100, update_db: bool = False):
from parsers.text_parser import TextParser
from parsers.code_parser import CodeParser
from parsers.pdf_parser import PDFParser
parsers = {'text': TextParser(), 'code': CodeParser(), 'pdf': PDFParser()}
disk_mount_map = {'SMT': '/media/mike/SMT', 'DISK1': '/media/mike/DISK1', 'LLM': '/media/mike/LLM'}
conn = self.get_connection()
cursor = conn.cursor()
try:
query = "SELECT path, size, disk_label FROM files WHERE 1=1"
params = []
if kind:
suffix_map = {'text': ['.txt', '.md', '.log', '.json'], 'code': ['.py', '.js', '.java', '.go'], 'pdf': ['.pdf']}
if kind in suffix_map:
# RIGHT(path, n) cannot match suffixes of mixed length (e.g. '.json'), so match each suffix with LIKE instead
like_clause = ' OR '.join(['path LIKE %s'] * len(suffix_map[kind]))
query += f' AND ({like_clause})'
params.extend(f'%{ext}' for ext in suffix_map[kind])
query += f" LIMIT {limit}"
cursor.execute(query, params)
files = cursor.fetchall()
print(f"\n=== PARSING FILES ===\nProcessing {len(files)} files\n")
parsed_count = 0
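# Skip files missing on disk or larger than 10 MB; the parser is picked from the file suffix (pdf, code, text)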
for path, size, disk_label in files:
mount_point = disk_mount_map.get(disk_label, disk_label)
full_path = Path(mount_point) / path if not Path(path).is_absolute() else Path(path)
if not full_path.exists() or int(size) > 10 * 1024 * 1024:
continue
file_kind = 'pdf' if path.endswith('.pdf') else 'code' if any(path.endswith(e) for e in ['.py', '.js', '.java', '.go']) else 'text'
parser = parsers.get(file_kind)
if not parser:
continue
result = parser.parse(full_path)
if 'error' not in result:
text = result.get('text', '')
quality = result.get('quality', 'unknown')
print(f"{path[:60]} | {file_kind} | {len(text):,} chars")
if update_db and text:
cursor.execute("UPDATE files SET extracted_text = %s, text_quality = %s WHERE path = %s", (text[:50000], quality, path))
parsed_count += 1
if parsed_count % 10 == 0:
conn.commit()
if update_db:
conn.commit()
print(f"\nParsed {parsed_count} files")
finally:
cursor.close()
conn.close()
def enrich_files(self, limit: int = 10, llm_endpoint: str = None, use_local: bool = False):
from enrichment.enricher import ContentEnricher
enricher = ContentEnricher()
conn = self.get_connection()
cursor = conn.cursor()
try:
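# Enrich only rows that already have extracted_text, using the first 5,000 characters; llm_endpoint/use_local are accepted but not yet forwarded (enrich runs with use_llm=False)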
cursor.execute(f"SELECT path, extracted_text FROM files WHERE extracted_text IS NOT NULL LIMIT {limit}")
files = cursor.fetchall()
print(f"\n=== ENRICHING CONTENT ===\nProcessing {len(files)} files\n")
for path, text in files:
enrichment = enricher.enrich(text[:5000], use_llm=False)
print(f"{path[:60]}")
print(f" Quality: {enrichment.get('quality')} | Words: {enrichment.get('word_count'):,}")
print(f" PII: {list(enrichment.get('has_pii', {}).keys())}")
print(f" Topics: {', '.join(enrichment.get('topics', [])[:5])}\n")
cursor.execute("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", (json.dumps(enrichment), path))
conn.commit()
print(f"Enriched {len(files)} files")
finally:
cursor.close()
conn.close()
def classify_files(self, disk: Optional[str]=None, update_db: bool=False, resume: bool=True):
from classification.classifier import FileClassifier
classifier = FileClassifier()
conn = self.get_connection()
cursor = conn.cursor()
try:
task_name = f"classify_{disk or 'all'}"
skip_count = 0
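# When resuming, the checkpoint row records how many files were already classified; those are skipped by index in the loop below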
if resume and update_db:
cursor.execute('SELECT last_processed_path, processed_count FROM processing_checkpoints WHERE task_name = %s', (task_name,))
checkpoint = cursor.fetchone()
if checkpoint:
last_path, skip_count = checkpoint
logger.info(f'Resuming from checkpoint: {skip_count:,} files already processed')
if disk:
cursor.execute('SELECT path, size, disk_label FROM files WHERE disk_label = %s ORDER BY path', (disk,))
else:
cursor.execute('SELECT path, size, disk_label FROM files ORDER BY path')
files = cursor.fetchall()
total = len(files)
logger.info(f'Classifying {total:,} files...')
categories = {}
build_artifacts = 0
batch = []
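# Classifications are written in batches of 1000, each followed by a checkpoint upsert so an interrupted run can resume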
processed = 0
for idx, (path, size, disk_label) in enumerate(files, 1):
if idx <= skip_count:
continue
labels, category, is_build = classifier.classify_path(path, int(size))
if is_build:
build_artifacts += 1
if category not in categories:
categories[category] = {'count': 0, 'size': 0}
categories[category]['count'] += 1
categories[category]['size'] += int(size)
if update_db:
labels_str = ','.join(labels)
batch.append((category, labels_str, path))
if len(batch) >= 1000:
cursor.executemany('UPDATE files SET category = %s WHERE path = %s', [(cat, p) for cat, lbl, p in batch])
cursor.execute('''
INSERT INTO processing_checkpoints (task_name, last_processed_path, processed_count, updated_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (task_name) DO UPDATE SET
last_processed_path = EXCLUDED.last_processed_path,
processed_count = EXCLUDED.processed_count,
updated_at = CURRENT_TIMESTAMP
''', (task_name, path, idx))
conn.commit()
batch.clear()
processed += 1
if idx % 1000 == 0:
print(f'\rClassified: {idx:,}/{total:,} ({100*idx/total:.1f}%)', end='', flush=True)
if update_db and batch:
cursor.executemany('UPDATE files SET category = %s WHERE path = %s', [(cat, p) for cat, lbl, p in batch])
cursor.execute('''
INSERT INTO processing_checkpoints (task_name, last_processed_path, processed_count, updated_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
ON CONFLICT (task_name) DO UPDATE SET
last_processed_path = EXCLUDED.last_processed_path,
processed_count = EXCLUDED.processed_count,
updated_at = CURRENT_TIMESTAMP
''', (task_name, files[-1][0] if files else '', total))
conn.commit()
print()
print(f'\n=== CLASSIFICATION SUMMARY ===')
print(f'Total files: {total:,}')
print(f'Build artifacts: {build_artifacts:,}')
print(f'\nCategories:')
for category in sorted(categories.keys()):
info = categories[category]
print(f" {category:30}: {info['count']:8,} files, {self.format_size(info['size'])}")
finally:
cursor.close()
conn.close()
def analyze_folders(self, disk: Optional[str]=None, min_files: int=3):
from analysis.folder_analyzer import FolderAnalyzer
analyzer = FolderAnalyzer()
conn = self.get_connection()
cursor = conn.cursor()
try:
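# Derive candidate top-level folders from the first path segment; appending '/' guards against paths with no separator at all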
query = '''
SELECT DISTINCT SUBSTRING(path FROM 1 FOR POSITION('/' IN path || '/') - 1) as folder, disk_label
FROM files
WHERE 1=1
'''
params = []
if disk:
query += ' AND disk_label = %s'
params.append(disk)
cursor.execute(query, params)
potential_folders = cursor.fetchall()
logger.info(f'Found {len(potential_folders)} potential folders to analyze')
processed = 0
for folder_name, disk_label in potential_folders:
cursor.execute('''
SELECT path, size FROM files
WHERE disk_label = %s AND path LIKE %s
''', (disk_label, f'{folder_name}%'))
files = cursor.fetchall()
if len(files) < min_files:
continue
files_list = [{'path': f[0], 'size': int(f[1])} for f in files]
folder_path = Path(folder_name)
analysis = analyzer.analyze_folder(folder_path, files_list)
readme_text = None
for file_dict in files_list:
if 'readme' in file_dict['path'].lower():
readme_text = f"Found README at {file_dict['path']}"
break
summary = analyzer.generate_summary(analysis, readme_text)
cursor.execute('''
INSERT INTO folders (path, disk_label, file_count, total_size, project_type, intent, summary,
has_readme, has_git, has_manifest, manifest_types, dominant_file_types, structure)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (path) DO UPDATE SET
file_count = EXCLUDED.file_count,
total_size = EXCLUDED.total_size,
project_type = EXCLUDED.project_type,
intent = EXCLUDED.intent,
summary = EXCLUDED.summary,
has_readme = EXCLUDED.has_readme,
has_git = EXCLUDED.has_git,
has_manifest = EXCLUDED.has_manifest,
manifest_types = EXCLUDED.manifest_types,
dominant_file_types = EXCLUDED.dominant_file_types,
structure = EXCLUDED.structure,
updated_at = CURRENT_TIMESTAMP
''', (
str(folder_path), disk_label, len(files_list), sum(f['size'] for f in files_list),
analysis.get('project_type'), analysis.get('intent'), summary,
analysis.get('has_readme'), analysis.get('has_git'), analysis.get('has_manifest'),
analysis.get('manifest_types'), json.dumps(analysis.get('dominant_file_types', {})),
json.dumps(analysis.get('structure', {}))
))
processed += 1
if processed % 100 == 0:
conn.commit()
print(f'\rAnalyzed: {processed} folders', end='', flush=True)
conn.commit()
print()
logger.info(f'Completed folder analysis: {processed} folders')
cursor.execute('''
SELECT project_type, COUNT(*), SUM(file_count), SUM(total_size)
FROM folders
GROUP BY project_type
''')
print(f'\n=== FOLDER ANALYSIS SUMMARY ===')
for row in cursor.fetchall():
proj_type, count, files, size = row
print(f'{proj_type:20}: {count:6,} folders, {files:8,} files, {self.format_size(int(size or 0))}')
finally:
cursor.close()
conn.close()
def review_migration(self, category: Optional[str]=None, show_build: bool=False):
from classification.classifier import FileClassifier
classifier = FileClassifier()
conn = self.get_connection()
cursor = conn.cursor()
try:
query = 'SELECT path, size, category FROM files WHERE 1=1'
params = []
if category:
query += ' AND category = %s'
params.append(category)
if not show_build:
query += " AND (metadata->>'labels' IS NULL OR metadata->>'labels' NOT LIKE '%build-artifact%')"
query += ' ORDER BY category, size DESC LIMIT 100'
cursor.execute(query, params)
files = cursor.fetchall()
if not files:
print('No files found matching criteria')
return
print(f'\n=== MIGRATION PREVIEW ===')
print(f'Showing {len(files)} files\n')
current_category = None
for path, size, cat in files:
if cat != current_category:
current_category = cat
print(f'\n{cat}:')
labels, suggested_cat, is_build = classifier.classify_path(path, int(size))
target = classifier.suggest_target_path(path, suggested_cat, labels)
print(f' {path}')
print(f'  → {target} ({self.format_size(int(size))})')
finally:
cursor.close()
conn.close()
@staticmethod
def format_size(size: int) -> str:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f'{size:.1f}{unit}'
size /= 1024
return f'{size:.1f}PB'
def main():
parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
subparsers = parser.add_subparsers(dest='command', required=True)
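# One subcommand per pipeline stage: index, dedupe, merge/plan, execute, then the content stages (profile, extract, parse, enrich, classify, analyze-folders, review) and report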
index_parser = subparsers.add_parser('index', help='Index files on a disk')
index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
index_parser.add_argument('disk_name', help='Logical name for the disk')
plan_parser = subparsers.add_parser('plan', help='Create migration plan')
plan_parser.add_argument('target_disk', help='Disk to free up')
plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
exec_parser.add_argument('plan_file', help='Path to plan JSON file')
exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
dedupe_parser = subparsers.add_parser('dedupe', help='Deduplicate files and compute checksums')
dedupe_parser.add_argument('--disk', help='Optional: Only dedupe specific disk')
dedupe_parser.add_argument('--no-chunks', action='store_true', help='Disable chunk-level deduplication')
merge_parser = subparsers.add_parser('merge', help='Plan multi-disk merge with deduplication')
merge_parser.add_argument('--sources', nargs='+', required=True, help='Source disks to merge')
merge_parser.add_argument('--target', required=True, help='Target disk')
merge_parser.add_argument('--output', default='merge_plan.json', help='Output plan file')
merge_parser.add_argument('--filter-system', action='store_true', help='Filter system/build files')
merge_parser.add_argument('--network', help='Network target (e.g., user@host:/path)')
profile_parser = subparsers.add_parser('profile', help='Create content profiles (inventory + triage)')
profile_parser.add_argument('--disk', help='Profile specific disk')
profile_parser.add_argument('--update', action='store_true', help='Update database with profiles')
profile_parser.add_argument('--limit', type=int, help='Limit number of files')
extract_parser = subparsers.add_parser('extract', help='Extract content from files')
extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)')
extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch')
parse_parser = subparsers.add_parser('parse', help='Parse files to extract text')
parse_parser.add_argument('--kind', help='Parse specific kind (text, code, pdf)')
parse_parser.add_argument('--limit', type=int, default=100, help='Limit parse batch')
parse_parser.add_argument('--update', action='store_true', help='Save extracted text to database')
enrich_parser = subparsers.add_parser('enrich', help='Enrich content with LLM analysis')
enrich_parser.add_argument('--limit', type=int, default=10, help='Limit enrichment batch')
enrich_parser.add_argument('--llm-endpoint', default='http://192.168.1.74:1234', help='LLM endpoint')
enrich_parser.add_argument('--local', action='store_true', help='Use local Ollama')
classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization')
classify_parser.add_argument('--disk', help='Classify specific disk')
classify_parser.add_argument('--update', action='store_true', help='Update database with classifications')
classify_parser.add_argument('--no-resume', action='store_true', help='Start from scratch instead of resuming')
folders_parser = subparsers.add_parser('analyze-folders', help='Analyze folder structure and infer project intent')
folders_parser.add_argument('--disk', help='Analyze specific disk')
folders_parser.add_argument('--min-files', type=int, default=3, help='Minimum files per folder')
review_parser = subparsers.add_parser('review', help='Review proposed migration structure')
review_parser.add_argument('--category', help='Review specific category')
review_parser.add_argument('--show-build', action='store_true', help='Include build artifacts')
report_parser = subparsers.add_parser('report', help='Show current status')
report_parser.add_argument('--format', choices=['text', 'json'], default='text', help='Report format')
report_parser.add_argument('--show-duplicates', action='store_true', help='Show duplicate files')
report_parser.add_argument('--preview-merge', help='Preview merge plan from file')
args = parser.parse_args()
tool = DiskReorganizer()
if args.command == 'index':
tool.index_disk(args.disk_root, args.disk_name)
elif args.command == 'dedupe':
tool.run_deduplication(disk=args.disk, use_chunks=not args.no_chunks)
elif args.command == 'merge':
tool.plan_merge(sources=args.sources, target=args.target, output_file=args.output, filter_system=args.filter_system, network_target=args.network)
elif args.command == 'plan':
plan = tool.plan_migration(args.target_disk, args.dest_disks)
if plan:
print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
print(f"Destination disks: {', '.join(plan['destination_disks'])}")
elif args.command == 'execute':
tool.execute_migration(args.plan_file, dry_run=args.dry_run)
elif args.command == 'profile':
tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit)
elif args.command == 'extract':
tool.extract_content(kind=args.kind, limit=args.limit)
elif args.command == 'parse':
tool.parse_files(kind=args.kind, limit=args.limit, update_db=args.update)
elif args.command == 'enrich':
tool.enrich_files(limit=args.limit, llm_endpoint=args.llm_endpoint, use_local=args.local)
elif args.command == 'classify':
tool.classify_files(disk=args.disk, update_db=args.update, resume=not args.no_resume)
elif args.command == 'analyze-folders':
tool.analyze_folders(disk=args.disk, min_files=args.min_files)
elif args.command == 'review':
tool.review_migration(category=args.category, show_build=args.show_build)
elif args.command == 'report':
tool.generate_report(format=args.format, show_duplicates=args.show_duplicates, preview_merge=args.preview_merge)
if __name__ == '__main__':
main()

View File

@@ -63,7 +63,7 @@ class MigrationEngine:
CREATE TABLE IF NOT EXISTS operations (
id SERIAL PRIMARY KEY,
source_path TEXT NOT NULL,
dest_path TEXT NOT NULL,
target_path TEXT NOT NULL,
operation_type TEXT NOT NULL,
size BIGINT DEFAULT 0,
status TEXT DEFAULT 'pending',
@@ -107,7 +107,7 @@ class MigrationEngine:
params = []
if disk:
conditions.append("disk = %s")
conditions.append("disk_label = %s")
params.append(disk)
if category:
@@ -133,7 +133,7 @@ class MigrationEngine:
source = Path(path_str)
# Determine destination
dest_path = self.target_base / file_category / source.name
target_path = self.target_base / file_category / source.name
# Determine operation type
if duplicate_of:
@@ -145,7 +145,7 @@ class MigrationEngine:
operation = OperationRecord(
source_path=source,
dest_path=dest_path,
target_path=target_path,
operation_type=operation_type,
size=size
)
@@ -200,7 +200,7 @@ class MigrationEngine:
# In dry run, just log what would happen
self.logger.debug(
f"[DRY RUN] Would {operation.operation_type}: "
f"{operation.source_path} -> {operation.dest_path}"
f"{operation.source_path} -> {operation.target_path}"
)
stats.files_succeeded += 1
else:
@@ -261,7 +261,7 @@ class MigrationEngine:
# Execute migration
success = strategy.migrate(
operation.source_path,
operation.dest_path,
operation.target_path,
verify=self.processing_config.verify_operations
)
@@ -294,13 +294,13 @@ class MigrationEngine:
cursor.execute("""
INSERT INTO operations (
source_path, dest_path, operation_type, size,
source_path, target_path, operation_type, bytes_processed,
status, error, executed_at, verified
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
str(operation.source_path),
str(operation.dest_path),
str(operation.target_path),
operation.operation_type,
operation.size,
operation.status,
@@ -321,12 +321,12 @@ class MigrationEngine:
Returns:
True if rollback successful
"""
self.logger.warning(f"Rolling back: {operation.dest_path}")
self.logger.warning(f"Rolling back: {operation.target_path}")
try:
# Remove destination
if operation.dest_path.exists():
operation.dest_path.unlink()
if operation.target_path.exists():
operation.target_path.unlink()
# Update database
conn = self._get_connection()
@@ -335,8 +335,8 @@ class MigrationEngine:
cursor.execute("""
UPDATE operations
SET status = 'rolled_back'
WHERE source_path = %s AND dest_path = %s
""", (str(operation.source_path), str(operation.dest_path)))
WHERE source_path = %s AND target_path = %s
""", (str(operation.source_path), str(operation.target_path)))
conn.commit()
cursor.close()
@@ -344,7 +344,7 @@ class MigrationEngine:
return True
except Exception as e:
self.logger.error(f"Rollback failed: {operation.dest_path}: {e}")
self.logger.error(f"Rollback failed: {operation.target_path}: {e}")
return False
def get_migration_stats(self) -> dict:
@@ -396,7 +396,7 @@ class MigrationEngine:
cursor = conn.cursor()
cursor.execute("""
SELECT source_path, dest_path, operation_type
SELECT source_path, target_path, operation_type
FROM operations
WHERE status = 'completed' AND verified = FALSE
""")

View File

@@ -0,0 +1,44 @@
from pathlib import Path
from typing import Dict
import re
class CodeParser:
def __init__(self):
self.patterns = {
'python': {'imports': r'^import |^from .+ import', 'class': r'^class \w+', 'function': r'^def \w+'},
'javascript': {'imports': r'^import |^require\(', 'class': r'^class \w+', 'function': r'^function \w+|^const \w+ = '},
'java': {'package': r'^package ', 'imports': r'^import ', 'class': r'^public class \w+'},
'go': {'package': r'^package ', 'imports': r'^import ', 'function': r'^func \w+'}
}
def parse(self, file_path: Path) -> Dict:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read()
language = self._detect_language(file_path, text)
structure = self._extract_structure(text, language)
return {
'text': text,
'language': language,
'line_count': len(text.split('\n')),
'structure': structure,
'quality': 'high'
}
except Exception as e:
return {'error': str(e)}
def _detect_language(self, file_path: Path, text: str) -> str:
lang_map = {'.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.java': 'java', '.go': 'go'}
return lang_map.get(file_path.suffix.lower(), 'unknown')
def _extract_structure(self, text: str, language: str) -> Dict:
patterns = self.patterns.get(language, {})
structure = {'type': 'code', 'language': language}
for key, pattern in patterns.items():
matches = re.findall(pattern, text, re.MULTILINE)
structure[key] = len(matches)
return structure

View File

@@ -0,0 +1,42 @@
from pathlib import Path
from typing import Dict
class MediaParser:
def parse_audio(self, file_path: Path) -> Dict:
return {
'text': '[Audio transcription pending]',
'needs_transcription': True,
'transcription_service': 'whisper',
'structure': {'type': 'audio'},
'quality': 'pending'
}
def parse_video(self, file_path: Path) -> Dict:
return {
'text': '[Video transcription pending]',
'needs_transcription': True,
'needs_scene_detection': True,
'transcription_service': 'whisper',
'structure': {'type': 'video'},
'quality': 'pending'
}
def parse_image(self, file_path: Path) -> Dict:
try:
from PIL import Image
with Image.open(file_path) as img:
width, height = img.size
mode = img.mode
return {
'text': '[Image caption/OCR pending]',
'needs_ocr': True,
'needs_caption': True,
'dimensions': f'{width}x{height}',
'mode': mode,
'structure': {'type': 'image', 'width': width, 'height': height},
'quality': 'pending'
}
except Exception as e:
return {'error': str(e)}

31
app/parsers/pdf_parser.py Normal file
View File

@@ -0,0 +1,31 @@
from pathlib import Path
from typing import Dict, List
class PDFParser:
def parse(self, file_path: Path) -> Dict:
try:
import PyPDF2
pages = []
with open(file_path, 'rb') as f:
pdf = PyPDF2.PdfReader(f)
page_count = len(pdf.pages)
for i, page in enumerate(pdf.pages[:50]):
text = page.extract_text()
pages.append({'page': i + 1, 'text': text, 'char_count': len(text)})
full_text = '\n\n'.join([p['text'] for p in pages])
has_text_layer = sum(p['char_count'] for p in pages) > 100
return {
'text': full_text,
'page_count': page_count,
'pages_extracted': len(pages),
'has_text_layer': has_text_layer,
'needs_ocr': not has_text_layer,
'structure': {'type': 'document', 'pages': pages[:5]},
'quality': 'high' if has_text_layer else 'needs_ocr'
}
except Exception as e:
return {'error': str(e), 'needs_ocr': True}

View File

@@ -0,0 +1,26 @@
from pathlib import Path
from typing import Dict, Optional
import chardet
class TextParser:
def parse(self, file_path: Path) -> Dict:
try:
with open(file_path, 'rb') as f:
raw_data = f.read(1024 * 1024)
encoding = chardet.detect(raw_data)['encoding'] or 'utf-8'
text = raw_data.decode(encoding, errors='ignore')
lines = text.split('\n')
return {
'text': text,
'encoding': encoding,
'line_count': len(lines),
'char_count': len(text),
'word_count': len(text.split()),
'structure': {'type': 'plain_text'},
'quality': 'high' if encoding == 'utf-8' else 'medium'
}
except Exception as e:
return {'error': str(e)}

View File

@@ -12,7 +12,7 @@ class FileRecord:
size: int
modified_time: float
created_time: float
disk: str
disk_label: str
checksum: str | None = None
status: str = 'indexed' # indexed, planned, moved, verified
category: str | None = None
@@ -23,7 +23,7 @@ class FileRecord:
class OperationRecord:
"""Record of a migration operation"""
source_path: Path
dest_path: Path
target_path: Path
operation_type: str # move, copy, hardlink, symlink
status: str = 'pending' # pending, in_progress, completed, failed
error: str | None = None

View File

@@ -12,7 +12,7 @@ class FileRecord:
size: int
modified_time: float
created_time: float
disk: str
disk_label: str
checksum: Optional[str] = None
status: str = 'indexed' # indexed, planned, moved, verified
category: Optional[str] = None
@@ -25,7 +25,7 @@ class FileRecord:
'size': self.size,
'modified_time': self.modified_time,
'created_time': self.created_time,
'disk': self.disk,
'disk_label': self.disk_label,
'checksum': self.checksum,
'status': self.status,
'category': self.category,
@@ -37,7 +37,7 @@ class FileRecord:
class OperationRecord:
"""Record of a migration operation"""
source_path: Path
dest_path: Path
target_path: Path
operation_type: str # move, copy, hardlink, symlink
size: int = 0
status: str = 'pending' # pending, in_progress, completed, failed
@@ -49,7 +49,7 @@ class OperationRecord:
"""Convert to dictionary for serialization"""
return {
'source_path': str(self.source_path),
'dest_path': str(self.dest_path),
'target_path': str(self.target_path),
'operation_type': self.operation_type,
'size': self.size,
'status': self.status,

Binary file not shown.

View File

@@ -86,7 +86,7 @@ services:
- full-cycle
- development
# Uncomment for development with hot reload
# command: watchmedo auto-restart --pattern="*.py" --recursive -- python main.py
# command: watchmedo auto-restart --pattern="*.py" --recursive -- python app/main.py
# Single command services for specific operations
index:
@@ -105,7 +105,7 @@ services:
- ${HOST_SOURCE_PATH:-/mnt/source}:/mnt/source:ro
- ./config:/app/config
- ./logs:/app/logs
command: ["python", "main.py", "index", "/mnt/source", "disk_d"]
command: ["python", "app/main.py", "index", "/media/mike/SMT", "SMT"]
profiles:
- index-only
networks:
@@ -127,7 +127,7 @@ services:
- ./config:/app/config
- ./plans:/app/plans
- ./logs:/app/logs
command: ["python", "main.py", "plan", "disk_d", "disk_e"]
command: ["python", "app/main.py", "plan", "/media/mike/SMT", "SMT"]
profiles:
- plan-only
networks:
@@ -151,7 +151,7 @@ services:
- ./plans:/app/plans
- ./config:/app/config
- ./logs:/app/logs
command: ["python", "main.py", "execute", "/app/plans/plan.json"]
command: ["python", "app/main.py", "execute", "/app/plans/plan.json"]
profiles:
- execute-only
networks:
@@ -173,7 +173,7 @@ services:
- ./plans:/app/plans
- ./config:/app/config
- ./logs:/app/logs
command: ["python", "main.py", "execute", "/app/plans/plan.json", "--dry-run"]
command: ["python", "app/main.py", "execute", "/app/plans/plan.json", "--dry-run"]
profiles:
- dry-run-only
networks:
@@ -194,7 +194,7 @@ services:
volumes:
- ./reports:/app/reports
- ./logs:/app/logs
command: ["python", "main.py", "report", "--format", "html"]
command: ["python", "app/main.py", "report"]
profiles:
- report-only
networks:
@@ -232,6 +232,37 @@ services:
networks:
- defrag-network
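# One-shot migration runner: applies the SQL mounted from ./sql/migration to the remote Postgres, retrying the connection up to 60 times before giving up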
flyway:
image: flyway/flyway:latest
container_name: flyway
volumes:
- ./sql/migration:/flyway/sql:ro
environment:
FLYWAY_URL: jdbc:postgresql://192.168.1.159:5432/disk_reorganizer_db
FLYWAY_USER: disk_reorg_user
FLYWAY_PASSWORD: heel-goed-wachtwoord
FLYWAY_SCHEMAS: public
FLYWAY_LOCATIONS: filesystem:./sql
FLYWAY_CONNECT_RETRIES: "60"
command: migrate
restart: "no"
pg_backup:
image: postgres:16
container_name: pg_backup
environment:
PGPASSWORD: heel-goed-wachtwoord
volumes:
- ./:/backup
command:
- bash
- -lc
- >
pg_dump -h 192.168.1.159 -p 5432 -U disk_reorg_user -d disk_reorganizer_db
--format=custom --no-owner --no-privileges
-f /backup/backup_$(date +%F_%H%M)_disk_reorganizer_db.dump
restart: "no"
networks:
defrag-network:
driver: bridge

7
flyway.conf Normal file
View File

@@ -0,0 +1,7 @@
flyway.url=jdbc:postgresql://192.168.1.159:5432/disk_reorganizer_db
flyway.user=disk_reorg_user
flyway.password=heel-goed-wachtwoord
flyway.locations=filesystem:sql/migration
flyway.schemas=public

6492
output.md

File diff suppressed because it is too large Load Diff

View File

@@ -37,3 +37,5 @@ pytest-cov>=4.0.0
black>=23.0.0
mypy>=1.0.0
flake8>=6.0.0
chardet

51
setup.sh Normal file
View File

@@ -0,0 +1,51 @@
#!/bin/bash
# setup.sh - Complete Docker setup for Project Defrag
set -e
echo "🚀 Setting up Project Defrag with Docker..."
# 1. Create necessary directories
echo "📁 Creating directories..."
mkdir -p {config,plans,logs,reports,sql/migration}
# 2. Copy environment file
if [ ! -f .env ]; then
echo "⚙️ Creating .env file from template..."
cp .env.example .env
echo "⚠️ Please edit .env file with your configuration!"
fi
# 3. Build the Docker image
echo "🐳 Building Docker image..."
docker compose build app
# 4. Start the database
#echo "🗄️ Starting PostgreSQL database..."
#docker-compose up -d postgres
# 5. Wait for database to be ready
#echo "⏳ Waiting for database to be ready..."
#sleep 10
# 6. Run database initialization
#echo "📊 Initializing database..."
#docker-compose exec -T postgres psql -U disk_reorg_user -d disk_reorganizer_db -f /docker-entrypoint-initdb.d/init.sql
# 7. Start optional services
echo "🔧 Starting monitoring services..."
docker compose --profile monitoring up -d
echo "✅ Setup complete!"
echo ""
echo "📋 Available commands:"
echo " docker compose up -d # Start all services"
echo " docker compose --profile index-only up index # Run index only"
echo " docker compose --profile plan-only up plan # Generate plan"
echo " docker compose --profile dry-run-only up dry-run # Dry run"
echo " docker compose --profile execute-only up execute # Execute migration"
echo " docker compose --profile report-only up report # Generate report"
echo ""
echo "🌐 Access monitoring:"
echo " - PostgreSQL Admin: http://localhost:5050"
echo " - Redis Commander: http://localhost:8081"

View File

@@ -1,164 +0,0 @@
-- sql/init.sql
-- Initialize PostgreSQL database for Project Defrag
-- Enable useful extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- Files table
CREATE TABLE IF NOT EXISTS files (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
path TEXT NOT NULL,
size BIGINT NOT NULL,
modified_time TIMESTAMP WITH TIME ZONE,
created_time TIMESTAMP WITH TIME ZONE,
file_hash VARCHAR(64), -- SHA-256 hash
category VARCHAR(50),
disk_label VARCHAR(50),
last_verified TIMESTAMP WITH TIME ZONE,
-- Metadata
metadata JSONB DEFAULT '{}',
-- Audit fields
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
-- Constraints
CONSTRAINT unique_file_path UNIQUE(path)
);
-- Operations table (audit log)
CREATE TABLE IF NOT EXISTS operations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
operation_type VARCHAR(50) NOT NULL,
source_path TEXT,
target_path TEXT,
status VARCHAR(20) NOT NULL,
-- File reference
file_id UUID REFERENCES files(id) ON DELETE SET NULL,
-- Performance metrics
duration_ms INTEGER,
bytes_processed BIGINT,
-- Error information
error_message TEXT,
error_details JSONB,
-- Context
session_id VARCHAR(100),
user_agent TEXT,
-- Audit fields
started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Deduplication hash store
CREATE TABLE IF NOT EXISTS deduplication_store (
hash VARCHAR(64) PRIMARY KEY,
canonical_path TEXT NOT NULL,
reference_count INTEGER DEFAULT 1,
first_seen TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Migration plan table
CREATE TABLE IF NOT EXISTS migration_plans (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
source_disk VARCHAR(50) NOT NULL,
target_disk VARCHAR(50) NOT NULL,
plan_json JSONB NOT NULL,
-- Statistics
total_files INTEGER DEFAULT 0,
total_size BIGINT DEFAULT 0,
estimated_duration INTEGER, -- in seconds
-- Status
status VARCHAR(20) DEFAULT 'draft',
-- Audit
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
executed_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_files_path ON files(path);
CREATE INDEX IF NOT EXISTS idx_files_hash ON files(file_hash);
CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk_label);
CREATE INDEX IF NOT EXISTS idx_files_category ON files(category);
CREATE INDEX IF NOT EXISTS idx_operations_status ON operations(status);
CREATE INDEX IF NOT EXISTS idx_operations_created ON operations(created_at);
CREATE INDEX IF NOT EXISTS idx_operations_file_id ON operations(file_id);
CREATE INDEX IF NOT EXISTS idx_dedup_canonical ON deduplication_store(canonical_path);
-- Functions for updating timestamps
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
-- Triggers for automatic updated_at
CREATE TRIGGER update_files_updated_at BEFORE UPDATE ON files
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
-- View for operational dashboard
CREATE OR REPLACE VIEW operational_dashboard AS
SELECT
o.status,
COUNT(*) as operation_count,
SUM(o.bytes_processed) as total_bytes,
AVG(o.duration_ms) as avg_duration_ms,
MIN(o.started_at) as earliest_operation,
MAX(o.completed_at) as latest_operation
FROM operations o
WHERE o.started_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY o.status;
-- View for disk usage statistics
CREATE OR REPLACE VIEW disk_usage_stats AS
SELECT
disk_label,
COUNT(*) as file_count,
SUM(size) as total_size,
AVG(size) as avg_file_size,
MIN(created_time) as oldest_file,
MAX(modified_time) as newest_file
FROM files
GROUP BY disk_label;
-- Insert default configuration
INSERT INTO migration_plans (name, source_disk, target_disk, plan_json, status)
VALUES (
'Default Migration Plan',
'disk_d',
'disk_e',
'{"strategy": "hardlink", "verify_copies": true, "preserve_timestamps": true}'::jsonb,
'draft'
) ON CONFLICT DO NOTHING;
-- Create read-only user for monitoring
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'monitor_user') THEN
CREATE USER monitor_user WITH PASSWORD 'monitor_password';
END IF;
END
$$;
GRANT CONNECT ON DATABASE disk_reorganizer_db TO monitor_user;
GRANT USAGE ON SCHEMA public TO monitor_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO monitor_user;
GRANT SELECT ON operational_dashboard TO monitor_user;
GRANT SELECT ON disk_usage_stats TO monitor_user;

View File

@@ -0,0 +1,188 @@
-- sql/init.sql
-- Initialize PostgreSQL database for Project Defrag
-- Enable useful extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- future tables/sequences created by your owner role (pick the role that creates them)
ALTER DEFAULT PRIVILEGES FOR ROLE auction IN SCHEMA public
GRANT ALL PRIVILEGES ON TABLES TO disk_reorg_user;
ALTER DEFAULT PRIVILEGES FOR ROLE auction IN SCHEMA public
GRANT ALL PRIVILEGES ON SEQUENCES TO disk_reorg_user;
ALTER DATABASE disk_reorganizer_db OWNER TO disk_reorg_user;
-- Files table
CREATE TABLE IF NOT EXISTS files
(
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
path TEXT NOT NULL,
size BIGINT NOT NULL,
modified_time TIMESTAMP WITH TIME ZONE,
created_time TIMESTAMP WITH TIME ZONE,
file_hash VARCHAR(64), -- SHA-256 hash
checksum VARCHAR(64), -- Alias for file_hash (legacy compatibility)
category VARCHAR(50),
disk_label VARCHAR(50),
last_verified TIMESTAMP WITH TIME ZONE,
status VARCHAR(20) DEFAULT 'indexed',
duplicate_of TEXT, -- Path to canonical file if this is a duplicate
-- Metadata
metadata JSONB DEFAULT '{}',
-- Audit fields
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
-- Constraints
CONSTRAINT unique_file_path UNIQUE (path)
);
-- Operations table (audit log)
CREATE TABLE IF NOT EXISTS operations
(
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
operation_type VARCHAR(50) NOT NULL,
source_path TEXT,
target_path TEXT,
status VARCHAR(20) NOT NULL,
-- Legacy compatibility fields
executed INTEGER DEFAULT 0,
verified INTEGER DEFAULT 0,
error TEXT,
-- File reference
file_id UUID REFERENCES files (id) ON DELETE SET NULL,
-- Performance metrics
duration_ms INTEGER,
bytes_processed BIGINT,
-- Error information
error_message TEXT,
error_details JSONB,
-- Context
session_id VARCHAR(100),
user_agent TEXT,
-- Audit fields
started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP WITH TIME ZONE,
executed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Deduplication hash store
CREATE TABLE IF NOT EXISTS deduplication_store
(
hash VARCHAR(64) PRIMARY KEY,
canonical_path TEXT NOT NULL,
reference_count INTEGER DEFAULT 1,
first_seen TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Migration plan table
CREATE TABLE IF NOT EXISTS migration_plans
(
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(100) NOT NULL,
source_disk VARCHAR(50) NOT NULL,
target_disk VARCHAR(50) NOT NULL,
plan_json JSONB NOT NULL,
-- Statistics
total_files INTEGER DEFAULT 0,
total_size BIGINT DEFAULT 0,
estimated_duration INTEGER, -- in seconds
-- Status
status VARCHAR(20) DEFAULT 'draft',
-- Audit
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
executed_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_files_path ON files (path);
CREATE INDEX IF NOT EXISTS idx_files_hash ON files (file_hash);
CREATE INDEX IF NOT EXISTS idx_files_disk ON files (disk_label);
CREATE INDEX IF NOT EXISTS idx_files_category ON files (category);
CREATE INDEX IF NOT EXISTS idx_files_status ON files (status);
CREATE INDEX IF NOT EXISTS idx_files_checksum ON files (checksum);
CREATE INDEX IF NOT EXISTS idx_files_checksum_path ON files (checksum, path);
CREATE INDEX IF NOT EXISTS idx_operations_status ON operations (status);
CREATE INDEX IF NOT EXISTS idx_operations_created ON operations (created_at);
CREATE INDEX IF NOT EXISTS idx_operations_file_id ON operations (file_id);
CREATE INDEX IF NOT EXISTS idx_dedup_canonical ON deduplication_store (canonical_path);
-- Functions for updating timestamps
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS
$$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
-- Triggers for automatic updated_at
CREATE TRIGGER update_files_updated_at
BEFORE UPDATE
ON files
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
-- View for operational dashboard
CREATE OR REPLACE VIEW operational_dashboard AS
SELECT o.status,
COUNT(*) as operation_count,
SUM(o.bytes_processed) as total_bytes,
AVG(o.duration_ms) as avg_duration_ms,
MIN(o.started_at) as earliest_operation,
MAX(o.completed_at) as latest_operation
FROM operations o
WHERE o.started_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY o.status;
-- View for disk usage statistics
CREATE OR REPLACE VIEW disk_usage_stats AS
SELECT disk_label,
COUNT(*) as file_count,
SUM(size) as total_size,
AVG(size) as avg_file_size,
MIN(created_time) as oldest_file,
MAX(modified_time) as newest_file
FROM files
GROUP BY disk_label;
-- Insert default configuration
INSERT INTO migration_plans (name, source_disk, target_disk, plan_json, status)
VALUES ('Default Migration Plan',
'disk_d',
'disk_e',
'{"strategy": "hardlink", "verify_copies": true, "preserve_timestamps": true}'::jsonb,
'draft')
ON CONFLICT DO NOTHING;
-- Create read-only user for monitoring
DO
$$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = 'monitor_user') THEN
CREATE USER monitor_user WITH PASSWORD 'monitor_password';
END IF;
END
$$;
GRANT CONNECT ON DATABASE disk_reorganizer_db TO monitor_user;
GRANT USAGE ON SCHEMA public TO monitor_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO monitor_user;
GRANT SELECT ON operational_dashboard TO monitor_user;
GRANT SELECT ON disk_usage_stats TO monitor_user;

View File

@@ -0,0 +1,11 @@
-- Add extracted text and enrichment columns
ALTER TABLE files ADD COLUMN IF NOT EXISTS extracted_text TEXT;
ALTER TABLE files ADD COLUMN IF NOT EXISTS text_quality VARCHAR(20);
ALTER TABLE files ADD COLUMN IF NOT EXISTS enrichment JSONB;
-- Add indexes for text search
CREATE INDEX IF NOT EXISTS idx_files_extracted_text ON files USING gin(to_tsvector('english', extracted_text));
CREATE INDEX IF NOT EXISTS idx_files_enrichment ON files USING gin(enrichment);
-- Add full text search capability
CREATE INDEX IF NOT EXISTS idx_files_fts ON files USING gin(to_tsvector('english', COALESCE(extracted_text, '')));

View File

@@ -0,0 +1,41 @@
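-- Folder-level analysis results populated by the analyze-folders command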
CREATE TABLE IF NOT EXISTS folders
(
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
path TEXT NOT NULL UNIQUE,
parent_path TEXT,
disk_label VARCHAR(50),
file_count INT DEFAULT 0,
total_size BIGINT DEFAULT 0,
project_type VARCHAR(50),
intent TEXT,
summary TEXT,
has_readme BOOLEAN DEFAULT FALSE,
has_git BOOLEAN DEFAULT FALSE,
has_manifest BOOLEAN DEFAULT FALSE,
manifest_types TEXT[],
dominant_file_types JSONB,
structure JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_folders_path ON folders (path);
CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders (parent_path);
CREATE INDEX IF NOT EXISTS idx_folders_disk ON folders (disk_label);
CREATE INDEX IF NOT EXISTS idx_folders_project_type ON folders (project_type);
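-- Checkpoints for resumable batch tasks (used by classify to skip already-processed files)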
CREATE TABLE IF NOT EXISTS processing_checkpoints
(
task_name VARCHAR(100) PRIMARY KEY,
last_processed_id TEXT,
last_processed_path TEXT,
processed_count INT DEFAULT 0,
total_count INT,
started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

View File

@@ -1,6 +1,8 @@
-- PostgreSQL Database Setup Script for Disk Reorganizer
-- Database: disk_reorganizer_db
-- User: disk_reorg_user
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
-- Create the database (run as superuser: auction)
CREATE DATABASE disk_reorganizer_db
@@ -17,48 +19,27 @@ CREATE DATABASE disk_reorganizer_db
CREATE USER disk_reorg_user WITH PASSWORD 'heel-goed-wachtwoord';
-- Create files table
CREATE TABLE IF NOT EXISTS files (
path TEXT PRIMARY KEY,
size BIGINT NOT NULL,
modified_time DOUBLE PRECISION NOT NULL,
disk TEXT NOT NULL,
checksum TEXT,
status TEXT DEFAULT 'indexed',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create index on disk column for faster queries
CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk);
CREATE INDEX IF NOT EXISTS idx_files_status ON files(status);
-- Create operations table
CREATE TABLE IF NOT EXISTS operations (
id SERIAL PRIMARY KEY,
source_path TEXT NOT NULL,
dest_path TEXT NOT NULL,
operation_type TEXT NOT NULL,
executed INTEGER DEFAULT 0,
verified INTEGER DEFAULT 0,
error TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
executed_at TIMESTAMP
);
-- Create index on operations for faster lookups
CREATE INDEX IF NOT EXISTS idx_operations_executed ON operations(executed);
CREATE INDEX IF NOT EXISTS idx_operations_source ON operations(source_path);
-- Grant privileges to disk_reorg_user
GRANT CONNECT ON DATABASE disk_reorganizer_db TO disk_reorg_user;
GRANT USAGE ON SCHEMA public TO disk_reorg_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE files TO disk_reorg_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE operations TO disk_reorg_user;
GRANT USAGE, SELECT ON SEQUENCE operations_id_seq TO disk_reorg_user;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO disk_reorg_user;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO disk_reorg_user;
-- future tables/sequences created by your owner role (pick the role that creates them)
ALTER DEFAULT PRIVILEGES FOR ROLE auction IN SCHEMA public
GRANT ALL PRIVILEGES ON TABLES TO disk_reorg_user;
ALTER DEFAULT PRIVILEGES FOR ROLE auction IN SCHEMA public
GRANT ALL PRIVILEGES ON SEQUENCES TO disk_reorg_user;
-- Create function to update updated_at timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
RETURNS TRIGGER AS
$$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
@@ -67,9 +48,10 @@ $$ LANGUAGE plpgsql;
-- Create trigger for files table
CREATE TRIGGER update_files_updated_at
BEFORE UPDATE ON files
BEFORE UPDATE
ON files
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
EXECUTE FUNCTION update_updated_at_column();
-- Display success message
\echo 'Database setup completed successfully!'

View File

@@ -1,17 +0,0 @@
"""Classification package exports"""
from .rules import RuleBasedClassifier
from .ml import create_ml_classifier, train_from_database, MLClassifier, DummyMLClassifier
from .engine import ClassificationEngine
from ._protocols import ClassificationRule, IClassifier, IRuleEngine
__all__ = [
'RuleBasedClassifier',
'MLClassifier',
'DummyMLClassifier',
'create_ml_classifier',
'train_from_database',
'ClassificationEngine',
'ClassificationRule',
'IClassifier',
'IRuleEngine',
]

View File

@@ -1,17 +0,0 @@
"""Discovery package exports"""
from .scanner import FileScanner, FilteredScanner
from .system import SystemAPI
from .engine import DiscoveryEngine
from ._protocols import FileMeta, MountInfo, DiskInfo, IFileScanner, ISystemAPI
__all__ = [
'FileScanner',
'FilteredScanner',
'SystemAPI',
'DiscoveryEngine',
'FileMeta',
'MountInfo',
'DiskInfo',
'IFileScanner',
'ISystemAPI',
]

View File

@@ -1,321 +0,0 @@
"""Discovery engine coordinating scanner and system APIs"""
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch
from .scanner import FileScanner
from .system import SystemAPI
from ._protocols import FileMeta
from ..shared.models import FileRecord, DiskInfo, ProcessingStats
from ..shared.config import DatabaseConfig
from ..shared.logger import ProgressLogger
class DiscoveryEngine:
"""Discovery engine for scanning and cataloging files"""
def __init__(
self,
db_config: DatabaseConfig,
logger: ProgressLogger,
batch_size: int = 1000
):
"""Initialize discovery engine
Args:
db_config: Database configuration
logger: Progress logger
batch_size: Number of records to batch before database commit
"""
self.db_config = db_config
self.logger = logger
self.batch_size = batch_size
self.system_api = SystemAPI()
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
return self._connection
def _ensure_tables(self):
"""Ensure database tables exist"""
conn = self._get_connection()
cursor = conn.cursor()
# Create files table
cursor.execute("""
CREATE TABLE IF NOT EXISTS files (
id SERIAL PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
size BIGINT NOT NULL,
modified_time DOUBLE PRECISION NOT NULL,
created_time DOUBLE PRECISION NOT NULL,
disk TEXT NOT NULL,
checksum TEXT,
status TEXT DEFAULT 'indexed',
category TEXT,
duplicate_of TEXT,
discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create index on path
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_files_path ON files(path)
""")
# Create index on disk
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_files_disk ON files(disk)
""")
# Create index on checksum
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_files_checksum ON files(checksum)
""")
conn.commit()
cursor.close()
def discover_path(
self,
root: Path,
scanner: Optional[FileScanner] = None,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Discover and catalog files in a path
Args:
root: Root path to discover
scanner: Optional custom scanner (default: FileScanner())
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with discovery statistics
"""
self.logger.section(f"Discovering: {root}")
# Ensure tables exist
self._ensure_tables()
# Create scanner if not provided
if scanner is None:
scanner = FileScanner(
error_handler=lambda e, p: self.logger.warning(f"Error scanning {p}: {e}")
)
# Get disk info for the root path
disk = self.system_api.get_disk_for_path(root)
if disk is None:
disk = str(root)
# Initialize statistics
stats = ProcessingStats()
batch = []
conn = self._get_connection()
cursor = conn.cursor()
try:
# Scan files
for file_meta in scanner.scan(root):
# Create file record
record = FileRecord(
path=file_meta.path,
size=file_meta.size,
modified_time=file_meta.modified_time,
created_time=file_meta.created_time,
disk=disk
)
batch.append(record)
stats.files_processed += 1
stats.bytes_processed += record.size
# Batch insert
if len(batch) >= self.batch_size:
self._insert_batch(cursor, batch)
conn.commit()
batch.clear()
# Progress callback
if progress_callback:
progress_callback(stats.files_processed, 0, stats)
# Log progress
if stats.files_processed % (self.batch_size * 10) == 0:
self.logger.progress(
stats.files_processed,
stats.files_processed, # We don't know total
prefix="Files discovered",
bytes_processed=stats.bytes_processed,
elapsed_seconds=stats.elapsed_seconds
)
# Insert remaining batch
if batch:
self._insert_batch(cursor, batch)
conn.commit()
stats.files_succeeded = stats.files_processed
except Exception as e:
conn.rollback()
self.logger.error(f"Discovery failed: {e}")
raise
finally:
cursor.close()
self.logger.info(
f"Discovery complete: {stats.files_processed} files, "
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
)
return stats
def _insert_batch(self, cursor, batch: list[FileRecord]):
"""Insert batch of file records
Args:
cursor: Database cursor
batch: List of FileRecord objects
"""
query = """
INSERT INTO files (path, size, modified_time, created_time, disk, checksum, status, category, duplicate_of)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (path) DO UPDATE SET
size = EXCLUDED.size,
modified_time = EXCLUDED.modified_time,
updated_at = CURRENT_TIMESTAMP
"""
data = [
(
str(record.path),
record.size,
record.modified_time,
record.created_time,
record.disk,
record.checksum,
record.status,
record.category,
record.duplicate_of
)
for record in batch
]
execute_batch(cursor, query, data, page_size=self.batch_size)
def get_disk_info(self) -> list[DiskInfo]:
"""Get information about all disks
Returns:
List of DiskInfo objects
"""
self.logger.subsection("Querying disk information")
disks = []
for disk_info in self.system_api.query_nvmes():
# Get mount point if available
mount_point = None
fs_type = "unknown"
for mount in self.system_api.query_mounts():
if mount.device == disk_info.device:
mount_point = Path(mount.mount_point)
fs_type = mount.fs_type
break
if mount_point:
total, used, free = self.system_api.get_disk_usage(mount_point)
else:
total = disk_info.size
used = 0
free = disk_info.size
disk = DiskInfo(
name=disk_info.device,
device=disk_info.device,
mount_point=mount_point or Path("/"),
total_size=total,
used_size=used,
free_size=free,
fs_type=fs_type
)
disks.append(disk)
self.logger.info(
f" {disk.name}: {disk.usage_percent:.1f}% used "
f"({disk.used_size:,} / {disk.total_size:,} bytes)"
)
return disks
def get_file_count(self, disk: Optional[str] = None) -> int:
"""Get count of discovered files
Args:
disk: Optional disk filter
Returns:
Count of files
"""
conn = self._get_connection()
cursor = conn.cursor()
if disk:
cursor.execute("SELECT COUNT(*) FROM files WHERE disk = %s", (disk,))
else:
cursor.execute("SELECT COUNT(*) FROM files")
count = cursor.fetchone()[0]
cursor.close()
return count
def get_total_size(self, disk: Optional[str] = None) -> int:
"""Get total size of discovered files
Args:
disk: Optional disk filter
Returns:
Total size in bytes
"""
conn = self._get_connection()
cursor = conn.cursor()
if disk:
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE disk = %s", (disk,))
else:
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files")
total = cursor.fetchone()[0]
cursor.close()
return total
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()

View File

@@ -1,513 +0,0 @@
#!/usr/bin/env python3
"""
Disk Reorganizer - Safely restructure files across disks to free up one entire disk.
Three modes: index, plan, execute
"""
import os
import sys
import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor
import shutil
import hashlib
import argparse
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple
from datetime import datetime
import logging
import time
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('disk_reorganizer.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
@dataclass
class FileRecord:
"""Represents a file in the index"""
path: str
size: int
modified_time: float
disk: str
checksum: Optional[str] = None
status: str = 'indexed' # indexed, planned, moved, verified
class DiskReorganizer:
def __init__(self, db_config: Dict = None):
"""
Initialize DiskReorganizer with PostgreSQL connection
:param db_config: Database configuration dict with host, port, database, user, password
"""
if db_config is None:
db_config = {
'host': '192.168.1.159',
'port': 5432,
'database': 'disk_reorganizer_db',
'user': 'disk_reorg_user',
'password': 'heel-goed-wachtwoord'
}
self.db_config = db_config
self.init_database()
def get_connection(self):
"""Get PostgreSQL database connection"""
return psycopg2.connect(**self.db_config)
def init_database(self):
"""Verify PostgreSQL database connection and tables exist"""
try:
conn = self.get_connection()
cursor = conn.cursor()
# Test connection and verify tables exist
cursor.execute("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_name IN ('files', 'operations')
""")
tables = cursor.fetchall()
if len(tables) < 2:
logger.error("Database tables not found! Please run setup_database.sh first.")
raise Exception("Database not properly initialized. Run setup_database.sh")
cursor.close()
conn.close()
logger.info("Database connection verified successfully")
except psycopg2.Error as e:
logger.error(f"Database connection failed: {e}")
raise
def index_disk(self, disk_root: str, disk_name: str):
"""
Index all files on a disk/partition with dynamic progress display
:param disk_root: Root path of disk (e.g., 'D:\\')
:param disk_name: Logical name for the disk
"""
logger.info(f"Indexing disk: {disk_name} at {disk_root}")
disk_path = Path(disk_root)
if not disk_path.exists():
logger.error(f"Disk path {disk_root} does not exist!")
return
files_count = 0
total_size = 0
start_time = time.time()
conn = self.get_connection()
cursor = conn.cursor()
try:
# Walk through all files
for root, dirs, files in os.walk(disk_path):
# Skip system directories
dirs[:] = [d for d in dirs if not d.startswith(('$', 'System Volume Information', 'Recovery'))]
for file in files:
try:
file_path = Path(root) / file
if not file_path.is_file():
continue
stat = file_path.stat()
size = stat.st_size
mtime = stat.st_mtime
# Calculate relative path for portability
rel_path = str(file_path.relative_to(disk_path))
# PostgreSQL INSERT ... ON CONFLICT for upsert
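                        # Note: ON CONFLICT (path) relies on a UNIQUE constraint on path, so the same
                        # relative path indexed from a second disk re-assigns the existing row's disk.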
cursor.execute("""
INSERT INTO files (path, size, modified_time, disk, checksum, status)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (path) DO UPDATE SET
size = EXCLUDED.size,
modified_time = EXCLUDED.modified_time,
disk = EXCLUDED.disk,
status = EXCLUDED.status
""", (rel_path, size, mtime, disk_name, None, 'indexed'))
files_count += 1
total_size += size
# Dynamic progress display - update every 100 files
if files_count % 100 == 0:
elapsed = time.time() - start_time
rate = files_count / elapsed if elapsed > 0 else 0
# Truncate path for display
display_path = str(file_path)
if len(display_path) > 60:
display_path = '...' + display_path[-57:]
# Use \r to overwrite the line
print(f"\rIndexing: {files_count:,} files | {self.format_size(total_size)} | {rate:.0f} files/s | {display_path}", end='', flush=True)
# Commit every 1000 files for performance
if files_count % 1000 == 0:
conn.commit()
except Exception as e:
logger.warning(f"\nSkipping {file_path}: {e}")
continue
conn.commit()
print() # New line after progress display
logger.info(f"Completed indexing {disk_name}: {files_count} files, {self.format_size(total_size)}")
finally:
cursor.close()
conn.close()
def calculate_disk_usage(self) -> Dict[str, Dict]:
"""Calculate current usage per disk"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT disk, SUM(size) as total_size, COUNT(*) as file_count
FROM files
GROUP BY disk
""")
usage = {}
for row in cursor.fetchall():
disk = row[0]
size = row[1] or 0
count = row[2]
usage[disk] = {
'size': size,
'count': count,
'formatted_size': self.format_size(size)
}
return usage
finally:
cursor.close()
conn.close()
def plan_migration(self, target_disk: str, destination_disks: List[str]) -> Dict:
"""
Create a migration plan to free up target_disk
:param target_disk: Disk to free up (e.g., 'D:')
:param destination_disks: List of disks to move files to
:return: Migration plan dictionary
"""
logger.info(f"Planning migration to free up {target_disk}")
usage = self.calculate_disk_usage()
if target_disk not in usage:
logger.error(f"Target disk {target_disk} not found in index!")
return {}
# Get files on target disk
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT path, size, modified_time FROM files WHERE disk = %s ORDER BY size DESC",
(target_disk,)
)
files_to_move = cursor.fetchall()
cursor.close()
conn.close()
target_disk_usage = usage[target_disk]['size']
logger.info(f"Need to move {len(files_to_move)} files, {self.format_size(target_disk_usage)}")
# Calculate available space on destination disks
dest_availability = []
for disk in destination_disks:
if disk not in usage:
# Assume empty disk
available = float('inf')
else:
                # Placeholder: a real run would query the destination disk's actual free space
                available = float('inf')
dest_availability.append({
'disk': disk,
'available': available,
'planned_usage': 0
})
# Generate move plan
plan = {
'target_disk': target_disk,
'total_size': target_disk_usage,
'file_count': len(files_to_move),
'operations': [],
'destination_disks': destination_disks
}
conn = self.get_connection()
cursor = conn.cursor()
try:
for file_info in files_to_move:
rel_path, size, mtime = file_info
# Find best destination (simple round-robin for balance)
dest_disk = destination_disks[len(plan['operations']) % len(destination_disks)]
# Record operation
op = {
'source_disk': target_disk,
'source_path': rel_path,
'dest_disk': dest_disk,
'dest_path': rel_path, # Keep same relative path
'size': size
}
plan['operations'].append(op)
# Store in database
cursor.execute(
"INSERT INTO operations (source_path, dest_path, operation_type) VALUES (%s, %s, %s)",
(f"{target_disk}:{rel_path}", f"{dest_disk}:{rel_path}", 'move')
)
conn.commit()
finally:
cursor.close()
conn.close()
# Save plan to JSON
plan_file = f"migration_plan_{target_disk}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(plan_file, 'w') as f:
json.dump(plan, f, indent=2)
logger.info(f"Plan created with {len(plan['operations'])} operations")
logger.info(f"Plan saved to {plan_file}")
return plan
def verify_operation(self, source: Path, dest: Path) -> bool:
"""Verify file was copied correctly (size + optional checksum)"""
if not dest.exists():
return False
try:
source_stat = source.stat()
dest_stat = dest.stat()
if source_stat.st_size != dest_stat.st_size:
return False
# Optional: checksum verification for critical files
# if source_stat.st_size < 100*1024*1024: # Only for files < 100MB
# return self.file_checksum(source) == self.file_checksum(dest)
return True
except Exception as e:
logger.error(f"Verification error: {e}")
return False
@staticmethod
def file_checksum(path: Path) -> str:
"""Calculate MD5 checksum of file"""
hash_md5 = hashlib.md5()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def execute_migration(self, plan_file: str, dry_run: bool = True):
"""
Execute migration plan
:param plan_file: Path to plan JSON file
:param dry_run: If True, only simulate operations
"""
logger.info(f"{'DRY RUN' if dry_run else 'EXECUTING'} migration from {plan_file}")
with open(plan_file, 'r') as f:
plan = json.load(f)
operations = plan['operations']
logger.info(f"Processing {len(operations)} operations...")
success_count = 0
error_count = 0
start_time = time.time()
conn = self.get_connection()
cursor = conn.cursor()
try:
for i, op in enumerate(operations, 1):
source_disk = op['source_disk']
source_path = op['source_path']
dest_disk = op['dest_disk']
dest_path = op['dest_path']
source_full = Path(source_disk) / source_path
dest_full = Path(dest_disk) / dest_path
# Dynamic progress display
elapsed = time.time() - start_time
rate = i / elapsed if elapsed > 0 else 0
eta = (len(operations) - i) / rate if rate > 0 else 0
display_path = str(source_path)
if len(display_path) > 50:
display_path = '...' + display_path[-47:]
print(f"\r[{i}/{len(operations)}] {success_count} OK, {error_count} ERR | {rate:.1f} files/s | ETA: {int(eta)}s | {display_path}", end='', flush=True)
if dry_run:
# Simulate
if source_full.exists():
success_count += 1
else:
logger.warning(f"\n Source does not exist: {source_full}")
error_count += 1
continue
try:
# Create destination directory
dest_full.parent.mkdir(parents=True, exist_ok=True)
                    # Move file: copy + verify; deleting the source is intentionally left as a manual step
if source_full.exists():
# Copy with metadata
shutil.copy2(source_full, dest_full)
# Verify
if self.verify_operation(source_full, dest_full):
# Update database
cursor.execute(
"UPDATE files SET disk = %s, status = 'moved' WHERE path = %s AND disk = %s",
(dest_disk, source_path, source_disk)
)
                            # Source is NOT removed automatically; uncomment once verification is trusted
                            # source_full.unlink()
# Log operation as executed
cursor.execute(
"UPDATE operations SET executed = 1, executed_at = CURRENT_TIMESTAMP WHERE source_path = %s",
(f"{source_disk}:{source_path}",)
)
success_count += 1
else:
raise Exception("Verification failed")
else:
logger.warning(f"\n Source missing: {source_full}")
error_count += 1
except Exception as e:
logger.error(f"\n Error processing {source_path}: {e}")
cursor.execute(
"UPDATE operations SET error = %s WHERE source_path = %s",
(str(e), f"{source_disk}:{source_path}")
)
error_count += 1
# Commit every 10 operations
if i % 10 == 0:
conn.commit()
conn.commit()
print() # New line after progress display
finally:
cursor.close()
conn.close()
logger.info(f"Migration complete: {success_count} success, {error_count} errors")
if not dry_run and error_count == 0:
logger.info(f"✓ Disk {plan['target_disk']} is ready for Linux installation!")
logger.info(f" Remember to safely delete original files from {plan['target_disk']}")
def generate_report(self):
"""Generate status report"""
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT status, COUNT(*), SUM(size) FROM files GROUP BY status
""")
print("\n=== FILE MIGRATION REPORT ===")
for row in cursor.fetchall():
status, count, size = row
print(f"{status:15}: {count:6} files, {self.format_size(size or 0)}")
cursor.execute("""
SELECT operation_type, executed, verified, COUNT(*) FROM operations GROUP BY operation_type, executed, verified
""")
print("\n=== OPERATIONS REPORT ===")
for row in cursor.fetchall():
op_type, executed, verified, count = row
status = "EXECUTED" if executed else "PENDING"
if verified:
status += "+VERIFIED"
print(f"{op_type:10} {status:15}: {count} operations")
finally:
cursor.close()
conn.close()
@staticmethod
def format_size(size: int) -> str:
"""Format bytes to human readable string"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f"{size:.1f}{unit}"
size /= 1024
return f"{size:.1f}PB"
def main():
parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
subparsers = parser.add_subparsers(dest='command', required=True)
# Index command
index_parser = subparsers.add_parser('index', help='Index files on a disk')
index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
index_parser.add_argument('disk_name', help='Logical name for the disk')
# Plan command
plan_parser = subparsers.add_parser('plan', help='Create migration plan')
plan_parser.add_argument('target_disk', help='Disk to free up')
plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
# Execute command
exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
exec_parser.add_argument('plan_file', help='Path to plan JSON file')
exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
# Report command
report_parser = subparsers.add_parser('report', help='Show current status')
args = parser.parse_args()
tool = DiskReorganizer()
if args.command == 'index':
tool.index_disk(args.disk_root, args.disk_name)
elif args.command == 'plan':
plan = tool.plan_migration(args.target_disk, args.dest_disks)
if plan:
print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
print(f"Destination disks: {', '.join(plan['destination_disks'])}")
elif args.command == 'execute':
tool.execute_migration(args.plan_file, dry_run=args.dry_run)
elif args.command == 'report':
tool.generate_report()
if __name__ == '__main__':
main()
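For reference, a hedged sketch of inspecting the JSON plan that `plan_migration()` above writes and `execute_migration()` consumes; the file name below is illustrative, since the real one is timestamped at plan time:

```python
# Sketch: inspect a migration plan before executing it. The file name is made up;
# plan_migration() writes migration_plan_<target>_<timestamp>.json in the working directory.
import json

with open("migration_plan_disk_d_20251213_113546.json") as f:
    plan = json.load(f)

print(f"target {plan['target_disk']}: {plan['file_count']} files, {plan['total_size']:,} bytes")
print("destinations:", ", ".join(plan["destination_disks"]))
for op in plan["operations"][:5]:  # peek at the first few move operations
    print(f"  {op['source_disk']}:{op['source_path']} -> {op['dest_disk']}:{op['dest_path']} ({op['size']} bytes)")
```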