Compare commits

...

4 Commits

Author  SHA1        Message        Date
mike    1583df8f57  clean up code  2025-12-13 13:57:13 +01:00
mike    f6aa2b7b76  clean up code  2025-12-13 12:38:22 +01:00
mike    78042ff2a2  clean up code  2025-12-13 12:24:43 +01:00
mike    7ce8c8c73d  clean up code  2025-12-13 12:00:34 +01:00
21 changed files with 1095 additions and 1670 deletions

app/analysis/inventory.py (new file, 150 lines)
View File

@@ -0,0 +1,150 @@
import psycopg2
from typing import Dict, Optional
import logging
logger = logging.getLogger(__name__)
class FileTypeInventory:
def __init__(self, db_config: Dict):
self.db_config = db_config
self.parseable_extensions = {
'text': {'txt', 'md', 'log', 'json', 'yaml', 'yml', 'xml', 'csv', 'tsv', 'ini', 'cfg', 'conf'},
'code': {'py', 'js', 'java', 'go', 'rs', 'ts', 'tsx', 'jsx', 'cpp', 'h', 'c', 'cs', 'rb', 'php', 'sh', 'bat', 'ps1', 'sql', 'r', 'scala', 'kt'},
'pdf': {'pdf'},
'document': {'doc', 'docx', 'odt', 'rtf', 'pages'},
'spreadsheet': {'xls', 'xlsx', 'ods', 'numbers'},
'presentation': {'ppt', 'pptx', 'odp', 'key'},
'image': {'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'svg', 'ico'},
'audio': {'mp3', 'wav', 'flac', 'm4a', 'ogg', 'wma', 'aac', 'opus'},
'video': {'mp4', 'avi', 'mkv', 'mov', 'wmv', 'flv', 'webm', 'mpg', 'mpeg'},
'archive': {'zip', 'tar', 'gz', 'bz2', '7z', 'rar', 'xz'},
'executable': {'exe', 'dll', 'so', 'dylib', 'bin', 'app'},
'data': {'db', 'sqlite', 'mdb', 'accdb', 'pkl', 'parquet', 'feather', 'arrow'}
}
self.implemented_parsers = {
'text': True,
'code': True,
'pdf': True,
'document': False,
'spreadsheet': False,
'presentation': False,
'image': False,
'audio': False,
'video': False,
'archive': False,
'executable': False,
'data': False
}
def get_connection(self):
return psycopg2.connect(**self.db_config)
def analyze(self, disk: Optional[str] = None, limit: int = 100):
conn = self.get_connection()
cursor = conn.cursor()
try:
query = '''
SELECT
CASE
WHEN path ~ '\\.([a-zA-Z0-9]+)$' THEN
LOWER(SUBSTRING(path FROM '\\.([a-zA-Z0-9]+)$'))
ELSE 'no_extension'
END as extension,
COUNT(*) as count,
SUM(size)::bigint as total_size,
ROUND(AVG(size)::numeric, 0) as avg_size,
MAX(size) as max_size,
COUNT(CASE WHEN extracted_text IS NOT NULL THEN 1 END) as parsed_count
FROM files
'''
params = []
if disk:
query += ' WHERE disk_label = %s'
params.append(disk)
query += ' GROUP BY extension ORDER BY count DESC'
if limit:
query += f' LIMIT {limit}'
cursor.execute(query, params)
results = cursor.fetchall()
return self._format_results(results)
finally:
cursor.close()
conn.close()
def _format_results(self, results):
total_files = 0
total_size = 0
parseable_files = 0
parsed_files = 0
unparsed_by_type = {}
extension_details = []
for row in results:
ext, count, size, avg, max_sz, parsed = row
total_files += int(count)
total_size += int(size or 0)
parsed_files += int(parsed or 0)
parser_type = self._get_parser_type(ext)
is_parseable = parser_type != 'none' and self.implemented_parsers.get(parser_type, False)
if is_parseable:
parseable_files += int(count)
unparsed_count = int(count) - int(parsed or 0)
if unparsed_count > 0:
if parser_type not in unparsed_by_type:
unparsed_by_type[parser_type] = {'count': 0, 'extensions': set()}
unparsed_by_type[parser_type]['count'] += unparsed_count
unparsed_by_type[parser_type]['extensions'].add(ext)
extension_details.append({
'extension': ext,
'count': int(count),
'total_size': int(size or 0),
'avg_size': int(avg or 0),
'max_size': int(max_sz or 0),
'parsed': int(parsed or 0),
'parser_type': parser_type,
'is_parseable': is_parseable
})
return {
'extensions': extension_details,
'summary': {
'total_files': total_files,
'total_size': total_size,
'parseable_files': parseable_files,
'parsed_files': parsed_files,
'coverage': (parsed_files / parseable_files * 100) if parseable_files > 0 else 0
},
'unparsed_by_type': unparsed_by_type,
'parser_status': self._get_parser_status()
}
def _get_parser_type(self, ext: str) -> str:
for ptype, extensions in self.parseable_extensions.items():
if ext in extensions:
return ptype
return 'none'
def _get_parser_status(self):
return {
ptype: {
'implemented': self.implemented_parsers.get(ptype, False),
'extensions': list(exts)
}
for ptype, exts in self.parseable_extensions.items()
}
def format_size(self, size_bytes: int) -> str:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024:
return f'{size_bytes:.1f}{unit}'
size_bytes /= 1024
return f'{size_bytes:.1f}PB'
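
A quick usage sketch for the new inventory module (not part of the commits above): it assumes the PostgreSQL files table referenced in the query, placeholder connection settings, and an import path of analysis.inventory, mirroring the analysis.folder_analyzer import used elsewhere in this diff.

# Hypothetical driver for FileTypeInventory; db_config values are placeholders.
from analysis.inventory import FileTypeInventory  # import path assumed

db_config = {'host': 'localhost', 'port': 5432, 'dbname': 'disk_reorganizer',
             'user': 'postgres', 'password': 'postgres'}
inventory = FileTypeInventory(db_config)
report = inventory.analyze(disk=None, limit=50)

summary = report['summary']
print(f"files: {summary['total_files']:,}  "
      f"size: {inventory.format_size(summary['total_size'])}  "
      f"coverage: {summary['coverage']:.1f}%")
for row in report['extensions'][:10]:
    print(f"{row['extension']:>12}  {row['count']:>8,}  "
          f"{inventory.format_size(row['total_size'])}  parser={row['parser_type']}")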

View File

@@ -1,21 +1,4 @@
"""Deduplication package exports"""
from .chunker import (
RabinChunker,
SimpleChunker,
hash_chunk,
hash_file,
compute_file_signature
)
from .chunker import RabinChunker, SimpleChunker, hash_chunk, hash_file, compute_file_signature
from .store import HashStore, MemoryHashStore
from .engine import DeduplicationEngine
__all__ = [
'RabinChunker',
'SimpleChunker',
'hash_chunk',
'hash_file',
'compute_file_signature',
'HashStore',
'MemoryHashStore',
'DeduplicationEngine',
]
__all__ = ['RabinChunker', 'SimpleChunker', 'hash_chunk', 'hash_file', 'compute_file_signature', 'HashStore', 'MemoryHashStore', 'DeduplicationEngine']

View File

@@ -0,0 +1 @@

View File

@@ -1,75 +1,29 @@
"""Rabin fingerprint chunker for content-defined chunking"""
import hashlib
from pathlib import Path
from typing import Iterator, Optional
class RabinChunker:
"""Content-defined chunking using Rabin fingerprinting
Uses a rolling hash to identify chunk boundaries based on content,
allowing for efficient deduplication even when data is modified.
"""
def __init__(
self,
avg_chunk_size: int = 8192,
min_chunk_size: Optional[int] = None,
max_chunk_size: Optional[int] = None,
window_size: int = 48
):
"""Initialize Rabin chunker
Args:
avg_chunk_size: Target average chunk size in bytes
min_chunk_size: Minimum chunk size (default: avg_chunk_size // 4)
max_chunk_size: Maximum chunk size (default: avg_chunk_size * 8)
window_size: Rolling hash window size
"""
def __init__(self, avg_chunk_size: int=8192, min_chunk_size: Optional[int]=None, max_chunk_size: Optional[int]=None, window_size: int=48):
self.avg_chunk_size = avg_chunk_size
self.min_chunk_size = min_chunk_size or (avg_chunk_size // 4)
self.max_chunk_size = max_chunk_size or (avg_chunk_size * 8)
self.min_chunk_size = min_chunk_size or avg_chunk_size // 4
self.max_chunk_size = max_chunk_size or avg_chunk_size * 8
self.window_size = window_size
# Calculate mask for boundary detection
# For avg_chunk_size, we want boundaries at 1/avg_chunk_size probability
bits = 0
size = avg_chunk_size
while size > 1:
bits += 1
size >>= 1
self.mask = (1 << bits) - 1
self.poly = 17349423945073011
# Polynomial for rolling hash (prime number)
self.poly = 0x3DA3358B4DC173
def chunk_file(self, file_path: Path, chunk_size: Optional[int] = None) -> Iterator[bytes]:
"""Chunk a file using Rabin fingerprinting
Args:
file_path: Path to file to chunk
chunk_size: If provided, use fixed-size chunking instead
Yields:
Chunk data as bytes
"""
def chunk_file(self, file_path: Path, chunk_size: Optional[int]=None) -> Iterator[bytes]:
if chunk_size:
# Use fixed-size chunking
yield from self._chunk_fixed(file_path, chunk_size)
else:
# Use content-defined chunking
yield from self._chunk_rabin(file_path)
def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]:
"""Fixed-size chunking
Args:
file_path: Path to file
chunk_size: Chunk size in bytes
Yields:
Fixed-size chunks
"""
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
@@ -78,46 +32,22 @@ class RabinChunker:
yield chunk
def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]:
"""Content-defined chunking using Rabin fingerprinting
Args:
file_path: Path to file
Yields:
Variable-size chunks based on content
"""
with open(file_path, 'rb') as f:
chunk_data = bytearray()
window = bytearray()
hash_value = 0
while True:
byte = f.read(1)
if not byte:
# End of file - yield remaining data
if chunk_data:
yield bytes(chunk_data)
break
chunk_data.extend(byte)
window.extend(byte)
# Maintain window size
if len(window) > self.window_size:
window.pop(0)
# Update rolling hash
hash_value = self._rolling_hash(window)
# Check if we should create a boundary
should_break = (
len(chunk_data) >= self.min_chunk_size and
(
(hash_value & self.mask) == 0 or
len(chunk_data) >= self.max_chunk_size
)
)
should_break = len(chunk_data) >= self.min_chunk_size and (hash_value & self.mask == 0 or len(chunk_data) >= self.max_chunk_size)
if should_break:
yield bytes(chunk_data)
chunk_data = bytearray()
@@ -125,40 +55,17 @@ class RabinChunker:
hash_value = 0
def _rolling_hash(self, window: bytearray) -> int:
"""Calculate rolling hash for window
Args:
window: Byte window
Returns:
Hash value
"""
hash_value = 0
for byte in window:
hash_value = ((hash_value << 1) + byte) & 0xFFFFFFFFFFFFFFFF
hash_value = (hash_value << 1) + byte & 18446744073709551615
return hash_value
class SimpleChunker:
"""Simple fixed-size chunker for comparison"""
def __init__(self, chunk_size: int = 8192):
"""Initialize simple chunker
Args:
chunk_size: Fixed chunk size in bytes
"""
def __init__(self, chunk_size: int=8192):
self.chunk_size = chunk_size
def chunk_file(self, file_path: Path) -> Iterator[bytes]:
"""Chunk file into fixed-size pieces
Args:
file_path: Path to file
Yields:
Fixed-size chunks
"""
with open(file_path, 'rb') as f:
while True:
chunk = f.read(self.chunk_size)
@@ -166,76 +73,31 @@ class SimpleChunker:
break
yield chunk
def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str:
"""Hash a chunk of data
Args:
chunk: Chunk data
algorithm: Hash algorithm (default: sha256)
Returns:
Hex digest of hash
"""
def hash_chunk(chunk: bytes, algorithm: str='sha256') -> str:
hasher = hashlib.new(algorithm)
hasher.update(chunk)
return hasher.hexdigest()
def hash_file(file_path: Path, algorithm: str = 'sha256', chunk_size: int = 65536) -> str:
"""Hash entire file
Args:
file_path: Path to file
algorithm: Hash algorithm (default: sha256)
chunk_size: Size of chunks to read
Returns:
Hex digest of file hash
"""
def hash_file(file_path: Path, algorithm: str='sha256', chunk_size: int=65536) -> str:
hasher = hashlib.new(algorithm)
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
hasher.update(chunk)
return hasher.hexdigest()
def compute_file_signature(
file_path: Path,
use_rabin: bool = True,
avg_chunk_size: int = 8192
) -> tuple[str, list[str]]:
"""Compute file signature with chunk hashes
Args:
file_path: Path to file
use_rabin: Whether to use Rabin chunking (vs fixed-size)
avg_chunk_size: Average chunk size for Rabin or fixed size
Returns:
Tuple of (file_hash, list of chunk hashes)
"""
def compute_file_signature(file_path: Path, use_rabin: bool=True, avg_chunk_size: int=8192) -> tuple[str, list[str]]:
if use_rabin:
chunker = RabinChunker(avg_chunk_size=avg_chunk_size)
else:
chunker = SimpleChunker(chunk_size=avg_chunk_size)
chunk_hashes = []
file_hasher = hashlib.sha256()
for chunk in chunker.chunk_file(file_path):
# Hash individual chunk
chunk_hash = hash_chunk(chunk)
chunk_hashes.append(chunk_hash)
# Update file hash
file_hasher.update(chunk)
file_hash = file_hasher.hexdigest()
return file_hash, chunk_hashes
return (file_hash, chunk_hashes)
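
A note on the boundary mask: for avg_chunk_size=8192 the constructor derives bits=13, so mask=0x1FFF and a boundary fires whenever the low 13 bits of the rolling hash are zero, roughly once every 8192 bytes once min_chunk_size has been reached. The sketch below is illustrative only (not part of the diff; the dedup package name is assumed) and exercises both chunkers on a throwaway file.

# Illustrative only: compare content-defined vs fixed-size chunking on random data.
import os
import tempfile
from pathlib import Path
from dedup.chunker import RabinChunker, SimpleChunker, compute_file_signature  # package name assumed

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(os.urandom(64 * 1024))          # 64 KiB of throwaway data
    path = Path(tmp.name)

rabin_chunks = list(RabinChunker(avg_chunk_size=8192).chunk_file(path))
fixed_chunks = list(SimpleChunker(chunk_size=8192).chunk_file(path))
print(f"rabin: {len(rabin_chunks)} chunks, fixed: {len(fixed_chunks)} chunks")

file_hash, chunk_hashes = compute_file_signature(path, use_rabin=True, avg_chunk_size=8192)
print(f"{file_hash[:16]}...  ({len(chunk_hashes)} chunk hashes)")
path.unlink()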

View File

@@ -1,32 +1,16 @@
"""Deduplication engine"""
from pathlib import Path
from typing import Optional, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
import psycopg2
from .chunker import compute_file_signature, hash_file
from .store import HashStore
from ..shared.models import FileRecord, ProcessingStats
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger
class DeduplicationEngine:
"""Engine for deduplicating files"""
def __init__(
self,
db_config: DatabaseConfig,
processing_config: ProcessingConfig,
logger: ProgressLogger
):
"""Initialize deduplication engine
Args:
db_config: Database configuration
processing_config: Processing configuration
logger: Progress logger
"""
def __init__(self, db_config: DatabaseConfig, processing_config: ProcessingConfig, logger: ProgressLogger):
self.db_config = db_config
self.processing_config = processing_config
self.logger = logger
@@ -34,320 +18,130 @@ class DeduplicationEngine:
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
return self._connection
def deduplicate_all(
self,
disk: Optional[str] = None,
use_chunks: bool = True,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Deduplicate all files in database
Args:
disk: Optional disk filter
use_chunks: Whether to use chunk-level deduplication
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with deduplication statistics
"""
self.logger.section("Starting Deduplication")
def deduplicate_all(self, disk: Optional[str]=None, use_chunks: bool=True, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats:
self.logger.section('Starting Deduplication')
conn = self._get_connection()
cursor = conn.cursor()
# Get files without checksums
if disk:
cursor.execute("""
SELECT path, size
FROM files
WHERE disk_label = %s AND checksum IS NULL
ORDER BY size DESC
""", (disk,))
cursor.execute('\n SELECT path, size\n FROM files\n WHERE disk_label = %s AND checksum IS NULL\n ORDER BY size DESC\n ', (disk,))
else:
cursor.execute("""
SELECT path, size
FROM files
WHERE checksum IS NULL
ORDER BY size DESC
""")
cursor.execute('\n SELECT path, size\n FROM files\n WHERE checksum IS NULL\n ORDER BY size DESC\n ')
files_to_process = cursor.fetchall()
total_files = len(files_to_process)
self.logger.info(f"Found {total_files} files to process")
self.logger.info(f'Found {total_files} files to process')
stats = ProcessingStats()
# Process files with thread pool
with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor:
futures = {}
for path_str, size in files_to_process:
path = Path(path_str)
future = executor.submit(self._process_file, path, use_chunks)
futures[future] = (path, size)
# Process completed futures
for future in as_completed(futures):
path, size = futures[future]
try:
checksum, duplicate_of = future.result()
if checksum:
# Update database
cursor.execute("""
UPDATE files
SET checksum = %s, duplicate_of = %s
WHERE path = %s
""", (checksum, duplicate_of, str(path)))
cursor.execute('\n UPDATE files\n SET checksum = %s, duplicate_of = %s\n WHERE path = %s\n ', (checksum, duplicate_of, str(path)))
stats.files_succeeded += 1
stats.bytes_processed += size
stats.files_processed += 1
# Commit periodically
if stats.files_processed % self.processing_config.commit_interval == 0:
conn.commit()
# Progress callback
if progress_callback:
progress_callback(stats.files_processed, total_files, stats)
# Log progress
self.logger.progress(
stats.files_processed,
total_files,
prefix="Files processed",
bytes_processed=stats.bytes_processed,
elapsed_seconds=stats.elapsed_seconds
)
self.logger.progress(stats.files_processed, total_files, prefix='Files processed', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds)
except Exception as e:
self.logger.warning(f"Failed to process {path}: {e}")
self.logger.warning(f'Failed to process {path}: {e}')
stats.files_failed += 1
stats.files_processed += 1
# Final commit
conn.commit()
cursor.close()
self.logger.info(
f"Deduplication complete: {stats.files_succeeded}/{total_files} files, "
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
)
self.logger.info(f'Deduplication complete: {stats.files_succeeded}/{total_files} files, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s')
return stats
def _process_file(
self,
path: Path,
use_chunks: bool
) -> tuple[Optional[str], Optional[str]]:
"""Process a single file for deduplication
Args:
path: Path to file
use_chunks: Whether to use chunk-level deduplication
Returns:
Tuple of (checksum, duplicate_of_path)
"""
def _process_file(self, path: Path, use_chunks: bool) -> tuple[Optional[str], Optional[str]]:
if not path.exists():
return None, None
return (None, None)
try:
if use_chunks:
# Compute file signature with chunks
checksum, chunk_hashes = compute_file_signature(
path,
use_rabin=True,
avg_chunk_size=self.processing_config.chunk_size
)
checksum, chunk_hashes = compute_file_signature(path, use_rabin=True, avg_chunk_size=self.processing_config.chunk_size)
else:
# Just compute file hash
checksum = hash_file(
path,
algorithm=self.processing_config.hash_algorithm
)
checksum = hash_file(path, algorithm=self.processing_config.hash_algorithm)
chunk_hashes = None
# Check if hash exists
if self.hash_store.exists(checksum):
# Duplicate found
canonical_path = self.hash_store.get_canonical(checksum)
return checksum, canonical_path
return (checksum, canonical_path)
else:
# New unique file
size = path.stat().st_size
self.hash_store.store_canonical(
checksum,
path,
size,
chunk_hashes
)
return checksum, None
self.hash_store.store_canonical(checksum, path, size, chunk_hashes)
return (checksum, None)
except Exception as e:
self.logger.debug(f"Error processing {path}: {e}")
self.logger.debug(f'Error processing {path}: {e}')
raise
def find_duplicates(
self,
disk: Optional[str] = None
) -> dict[str, list[str]]:
"""Find all duplicate files
Args:
disk: Optional disk filter
Returns:
Dictionary mapping canonical path to list of duplicate paths
"""
self.logger.subsection("Finding Duplicates")
def find_duplicates(self, disk: Optional[str]=None) -> dict[str, list[str]]:
self.logger.subsection('Finding Duplicates')
conn = self._get_connection()
cursor = conn.cursor()
# Query for duplicates
if disk:
cursor.execute("""
SELECT checksum, array_agg(path ORDER BY path) as paths
FROM files
WHERE disk_label = %s AND checksum IS NOT NULL
GROUP BY checksum
HAVING COUNT(*) > 1
""", (disk,))
cursor.execute('\n SELECT checksum, array_agg(path ORDER BY path) as paths\n FROM files\n WHERE disk_label = %s AND checksum IS NOT NULL\n GROUP BY checksum\n HAVING COUNT(*) > 1\n ', (disk,))
else:
cursor.execute("""
SELECT checksum, array_agg(path ORDER BY path) as paths
FROM files
WHERE checksum IS NOT NULL
GROUP BY checksum
HAVING COUNT(*) > 1
""")
cursor.execute('\n SELECT checksum, array_agg(path ORDER BY path) as paths\n FROM files\n WHERE checksum IS NOT NULL\n GROUP BY checksum\n HAVING COUNT(*) > 1\n ')
duplicates = {}
for checksum, paths in cursor.fetchall():
canonical = paths[0]
duplicates[canonical] = paths[1:]
cursor.close()
self.logger.info(f"Found {len(duplicates)} sets of duplicates")
self.logger.info(f'Found {len(duplicates)} sets of duplicates')
return duplicates
def get_deduplication_stats(self) -> dict:
"""Get deduplication statistics
Returns:
Dictionary with statistics
"""
conn = self._get_connection()
cursor = conn.cursor()
stats = {}
# Total files
cursor.execute("SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL")
cursor.execute('SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL')
stats['total_files'] = cursor.fetchone()[0]
# Unique files
cursor.execute("SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL")
cursor.execute('SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL')
stats['unique_files'] = cursor.fetchone()[0]
# Duplicate files
stats['duplicate_files'] = stats['total_files'] - stats['unique_files']
# Total size
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL")
cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL')
stats['total_size'] = cursor.fetchone()[0]
# Unique size
cursor.execute("""
SELECT COALESCE(SUM(size), 0)
FROM (
SELECT DISTINCT ON (checksum) size
FROM files
WHERE checksum IS NOT NULL
) AS unique_files
""")
cursor.execute('\n SELECT COALESCE(SUM(size), 0)\n FROM (\n SELECT DISTINCT ON (checksum) size\n FROM files\n WHERE checksum IS NOT NULL\n ) AS unique_files\n ')
stats['unique_size'] = cursor.fetchone()[0]
# Wasted space
stats['wasted_space'] = stats['total_size'] - stats['unique_size']
# Deduplication ratio
if stats['total_size'] > 0:
stats['dedup_ratio'] = stats['unique_size'] / stats['total_size']
else:
stats['dedup_ratio'] = 1.0
# Space saved percentage
if stats['total_size'] > 0:
stats['space_saved_percent'] = (stats['wasted_space'] / stats['total_size']) * 100
stats['space_saved_percent'] = stats['wasted_space'] / stats['total_size'] * 100
else:
stats['space_saved_percent'] = 0.0
cursor.close()
return stats
def mark_canonical_files(self) -> int:
"""Mark canonical (first occurrence) files in database
Returns:
Number of canonical files marked
"""
self.logger.subsection("Marking Canonical Files")
self.logger.subsection('Marking Canonical Files')
conn = self._get_connection()
cursor = conn.cursor()
# Find first occurrence of each checksum and mark as canonical
cursor.execute("""
WITH canonical AS (
SELECT DISTINCT ON (checksum) path, checksum
FROM files
WHERE checksum IS NOT NULL
ORDER BY checksum, path
)
UPDATE files
SET duplicate_of = NULL
WHERE path IN (SELECT path FROM canonical)
""")
cursor.execute('\n WITH canonical AS (\n SELECT DISTINCT ON (checksum) path, checksum\n FROM files\n WHERE checksum IS NOT NULL\n ORDER BY checksum, path\n )\n UPDATE files\n SET duplicate_of = NULL\n WHERE path IN (SELECT path FROM canonical)\n ')
count = cursor.rowcount
conn.commit()
cursor.close()
self.logger.info(f"Marked {count} canonical files")
self.logger.info(f'Marked {count} canonical files')
return count
def close(self):
"""Close connections"""
self.hash_store.close()
if self._connection and not self._connection.closed:
if self._connection and (not self._connection.closed):
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
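
A hedged sketch of driving DeduplicationEngine end to end. The DatabaseConfig, ProcessingConfig and ProgressLogger constructors are not shown in this diff, so the keyword arguments below are assumptions based only on the attribute names the engine reads.

# Assumed constructor signatures; only the attribute names are confirmed by the code above.
from dedup.engine import DeduplicationEngine                  # package name assumed
from shared.config import DatabaseConfig, ProcessingConfig    # top-level module layout assumed
from shared.logger import ProgressLogger

db_config = DatabaseConfig(host='localhost', port=5432, database='disk_reorganizer',
                           user='postgres', password='postgres')
processing = ProcessingConfig(parallel_workers=4, commit_interval=500,
                              chunk_size=8192, hash_algorithm='sha256')

with DeduplicationEngine(db_config, processing, ProgressLogger()) as engine:
    stats = engine.deduplicate_all(disk='D', use_chunks=True)
    duplicates = engine.find_duplicates(disk='D')
    report = engine.get_deduplication_stats()
    print(stats.files_succeeded, len(duplicates),
          f"{report['space_saved_percent']:.1f}% reclaimable")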

View File

@@ -1,412 +1,174 @@
"""Hash store for deduplication with optional Redis support"""
from typing import Optional, Dict, Set
from pathlib import Path
import psycopg2
from psycopg2.extras import execute_batch
from ..shared.config import DatabaseConfig
class HashStore:
"""PostgreSQL-based hash store for deduplication"""
def __init__(self, db_config: DatabaseConfig):
"""Initialize hash store
Args:
db_config: Database configuration
"""
self.db_config = db_config
self._connection = None
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
return self._connection
def _ensure_tables(self):
"""Ensure hash store tables exist"""
conn = self._get_connection()
cursor = conn.cursor()
# Create hashes table for file-level deduplication
cursor.execute("""
CREATE TABLE IF NOT EXISTS file_hashes (
checksum TEXT PRIMARY KEY,
canonical_path TEXT NOT NULL,
size BIGINT NOT NULL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ref_count INTEGER DEFAULT 1
)
""")
# Create chunk hashes table for chunk-level deduplication
cursor.execute("""
CREATE TABLE IF NOT EXISTS chunk_hashes (
chunk_hash TEXT PRIMARY KEY,
size INTEGER NOT NULL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ref_count INTEGER DEFAULT 1
)
""")
# Create file-chunk mapping table
cursor.execute("""
CREATE TABLE IF NOT EXISTS file_chunks (
id SERIAL PRIMARY KEY,
file_checksum TEXT NOT NULL,
chunk_hash TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),
FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),
UNIQUE (file_checksum, chunk_index)
)
""")
# Create indexes
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_file_chunks_file
ON file_chunks(file_checksum)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk
ON file_chunks(chunk_hash)
""")
cursor.execute('\n CREATE TABLE IF NOT EXISTS file_hashes (\n checksum TEXT PRIMARY KEY,\n canonical_path TEXT NOT NULL,\n size BIGINT NOT NULL,\n first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n ref_count INTEGER DEFAULT 1\n )\n ')
cursor.execute('\n CREATE TABLE IF NOT EXISTS chunk_hashes (\n chunk_hash TEXT PRIMARY KEY,\n size INTEGER NOT NULL,\n first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n ref_count INTEGER DEFAULT 1\n )\n ')
cursor.execute('\n CREATE TABLE IF NOT EXISTS file_chunks (\n id SERIAL PRIMARY KEY,\n file_checksum TEXT NOT NULL,\n chunk_hash TEXT NOT NULL,\n chunk_index INTEGER NOT NULL,\n FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),\n FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),\n UNIQUE (file_checksum, chunk_index)\n )\n ')
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_file_chunks_file\n ON file_chunks(file_checksum)\n ')
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk\n ON file_chunks(chunk_hash)\n ')
conn.commit()
cursor.close()
def exists(self, checksum: str) -> bool:
"""Check if hash exists in store
Args:
checksum: File hash to check
Returns:
True if hash exists
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1",
(checksum,)
)
cursor.execute('SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1', (checksum,))
exists = cursor.fetchone() is not None
cursor.close()
return exists
def get_canonical(self, checksum: str) -> Optional[str]:
"""Get canonical path for a hash
Args:
checksum: File hash
Returns:
Canonical file path or None if not found
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute(
"SELECT canonical_path FROM file_hashes WHERE checksum = %s",
(checksum,)
)
cursor.execute('SELECT canonical_path FROM file_hashes WHERE checksum = %s', (checksum,))
result = cursor.fetchone()
cursor.close()
return result[0] if result else None
def store_canonical(
self,
checksum: str,
path: Path,
size: int,
chunk_hashes: Optional[list[str]] = None
) -> None:
"""Store canonical reference for a hash
Args:
checksum: File hash
path: Canonical file path
size: File size in bytes
chunk_hashes: Optional list of chunk hashes
"""
def store_canonical(self, checksum: str, path: Path, size: int, chunk_hashes: Optional[list[str]]=None) -> None:
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
try:
# Store file hash
cursor.execute("""
INSERT INTO file_hashes (checksum, canonical_path, size)
VALUES (%s, %s, %s)
ON CONFLICT (checksum) DO UPDATE SET
ref_count = file_hashes.ref_count + 1
""", (checksum, str(path), size))
# Store chunk hashes if provided
cursor.execute('\n INSERT INTO file_hashes (checksum, canonical_path, size)\n VALUES (%s, %s, %s)\n ON CONFLICT (checksum) DO UPDATE SET\n ref_count = file_hashes.ref_count + 1\n ', (checksum, str(path), size))
if chunk_hashes:
# Insert chunk hashes
chunk_data = [(chunk_hash, 0) for chunk_hash in chunk_hashes]
execute_batch(cursor, """
INSERT INTO chunk_hashes (chunk_hash, size)
VALUES (%s, %s)
ON CONFLICT (chunk_hash) DO UPDATE SET
ref_count = chunk_hashes.ref_count + 1
""", chunk_data, page_size=1000)
# Create file-chunk mappings
mapping_data = [
(checksum, chunk_hash, idx)
for idx, chunk_hash in enumerate(chunk_hashes)
]
execute_batch(cursor, """
INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)
VALUES (%s, %s, %s)
ON CONFLICT (file_checksum, chunk_index) DO NOTHING
""", mapping_data, page_size=1000)
execute_batch(cursor, '\n INSERT INTO chunk_hashes (chunk_hash, size)\n VALUES (%s, %s)\n ON CONFLICT (chunk_hash) DO UPDATE SET\n ref_count = chunk_hashes.ref_count + 1\n ', chunk_data, page_size=1000)
mapping_data = [(checksum, chunk_hash, idx) for idx, chunk_hash in enumerate(chunk_hashes)]
execute_batch(cursor, '\n INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)\n VALUES (%s, %s, %s)\n ON CONFLICT (file_checksum, chunk_index) DO NOTHING\n ', mapping_data, page_size=1000)
conn.commit()
except Exception as e:
conn.rollback()
raise
finally:
cursor.close()
def get_chunk_hashes(self, checksum: str) -> list[str]:
"""Get chunk hashes for a file
Args:
checksum: File hash
Returns:
List of chunk hashes in order
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT chunk_hash
FROM file_chunks
WHERE file_checksum = %s
ORDER BY chunk_index
""", (checksum,))
cursor.execute('\n SELECT chunk_hash\n FROM file_chunks\n WHERE file_checksum = %s\n ORDER BY chunk_index\n ', (checksum,))
chunk_hashes = [row[0] for row in cursor.fetchall()]
cursor.close()
return chunk_hashes
def get_duplicates(self) -> Dict[str, list[str]]:
"""Get all duplicate file groups
Returns:
Dictionary mapping canonical path to list of duplicate paths
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
# Get all files with their hashes
cursor.execute("""
SELECT f.path, f.checksum
FROM files f
WHERE f.checksum IS NOT NULL
""")
# Group by checksum
cursor.execute('\n SELECT f.path, f.checksum\n FROM files f\n WHERE f.checksum IS NOT NULL\n ')
hash_to_paths: Dict[str, list[str]] = {}
for path, checksum in cursor.fetchall():
if checksum not in hash_to_paths:
hash_to_paths[checksum] = []
hash_to_paths[checksum].append(path)
cursor.close()
# Filter to only duplicates (more than one file)
duplicates = {
paths[0]: paths[1:]
for checksum, paths in hash_to_paths.items()
if len(paths) > 1
}
duplicates = {paths[0]: paths[1:] for checksum, paths in hash_to_paths.items() if len(paths) > 1}
return duplicates
def get_stats(self) -> Dict[str, int]:
"""Get hash store statistics
Returns:
Dictionary with statistics
"""
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
stats = {}
# Count unique file hashes
cursor.execute("SELECT COUNT(*) FROM file_hashes")
cursor.execute('SELECT COUNT(*) FROM file_hashes')
stats['unique_files'] = cursor.fetchone()[0]
# Count unique chunk hashes
cursor.execute("SELECT COUNT(*) FROM chunk_hashes")
cursor.execute('SELECT COUNT(*) FROM chunk_hashes')
stats['unique_chunks'] = cursor.fetchone()[0]
# Count total references
cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes")
cursor.execute('SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes')
stats['total_file_refs'] = cursor.fetchone()[0]
# Count total chunk references
cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes")
cursor.execute('SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes')
stats['total_chunk_refs'] = cursor.fetchone()[0]
# Calculate deduplication ratio
if stats['total_file_refs'] > 0:
stats['dedup_ratio'] = stats['unique_files'] / stats['total_file_refs']
else:
stats['dedup_ratio'] = 1.0
cursor.close()
return stats
def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]:
"""Find files similar to given hash based on chunk overlap
Args:
checksum: File hash to compare
threshold: Similarity threshold (0.0 to 1.0)
Returns:
List of tuples (other_checksum, similarity_score)
"""
def find_similar_files(self, checksum: str, threshold: float=0.8) -> list[tuple[str, float]]:
self._ensure_tables()
conn = self._get_connection()
cursor = conn.cursor()
# Get chunks for the target file
target_chunks = set(self.get_chunk_hashes(checksum))
if not target_chunks:
cursor.close()
return []
# Find files sharing chunks
cursor.execute("""
SELECT DISTINCT fc.file_checksum
FROM file_chunks fc
WHERE fc.chunk_hash = ANY(%s)
AND fc.file_checksum != %s
""", (list(target_chunks), checksum))
cursor.execute('\n SELECT DISTINCT fc.file_checksum\n FROM file_chunks fc\n WHERE fc.chunk_hash = ANY(%s)\n AND fc.file_checksum != %s\n ', (list(target_chunks), checksum))
similar_files = []
for (other_checksum,) in cursor.fetchall():
for other_checksum, in cursor.fetchall():
other_chunks = set(self.get_chunk_hashes(other_checksum))
# Calculate Jaccard similarity
intersection = len(target_chunks & other_chunks)
union = len(target_chunks | other_chunks)
if union > 0:
similarity = intersection / union
if similarity >= threshold:
similar_files.append((other_checksum, similarity))
cursor.close()
# Sort by similarity descending
similar_files.sort(key=lambda x: x[1], reverse=True)
return similar_files
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
if self._connection and (not self._connection.closed):
self._connection.close()
def __enter__(self):
"""Context manager entry"""
self._ensure_tables()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
class MemoryHashStore:
"""In-memory hash store for testing and small datasets"""
def __init__(self):
"""Initialize in-memory hash store"""
self.hashes: Dict[str, tuple[str, int]] = {}
self.chunks: Dict[str, int] = {}
self.file_chunks: Dict[str, list[str]] = {}
def exists(self, checksum: str) -> bool:
"""Check if hash exists"""
return checksum in self.hashes
def get_canonical(self, checksum: str) -> Optional[str]:
"""Get canonical path"""
return self.hashes.get(checksum, (None, 0))[0]
def store_canonical(
self,
checksum: str,
path: Path,
size: int,
chunk_hashes: Optional[list[str]] = None
) -> None:
"""Store canonical reference"""
def store_canonical(self, checksum: str, path: Path, size: int, chunk_hashes: Optional[list[str]]=None) -> None:
self.hashes[checksum] = (str(path), size)
if chunk_hashes:
self.file_chunks[checksum] = chunk_hashes
for chunk_hash in chunk_hashes:
self.chunks[chunk_hash] = self.chunks.get(chunk_hash, 0) + 1
def get_chunk_hashes(self, checksum: str) -> list[str]:
"""Get chunk hashes"""
return self.file_chunks.get(checksum, [])
def get_stats(self) -> Dict[str, int]:
"""Get statistics"""
return {
'unique_files': len(self.hashes),
'unique_chunks': len(self.chunks),
'total_file_refs': len(self.hashes),
'total_chunk_refs': sum(self.chunks.values()),
'dedup_ratio': 1.0
}
return {'unique_files': len(self.hashes), 'unique_chunks': len(self.chunks), 'total_file_refs': len(self.hashes), 'total_chunk_refs': sum(self.chunks.values()), 'dedup_ratio': 1.0}
def close(self):
"""No-op for compatibility"""
pass
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
pass
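
MemoryHashStore keeps the same exists / store_canonical / get_canonical flow without needing PostgreSQL, and find_similar_files above scores candidates by Jaccard similarity over chunk-hash sets (intersection size divided by union size). A minimal in-memory sketch, not part of the diff, with the package name assumed:

# Minimal sketch using the in-memory store, so no PostgreSQL is required.
from pathlib import Path
from dedup.store import MemoryHashStore           # package name assumed
from dedup.chunker import compute_file_signature

# Two identical throwaway files so the duplicate branch is exercised.
Path('a.bin').write_bytes(b'hello world' * 1000)
Path('a_copy.bin').write_bytes(b'hello world' * 1000)

store = MemoryHashStore()
for path in (Path('a.bin'), Path('a_copy.bin')):
    checksum, chunk_hashes = compute_file_signature(path)
    if store.exists(checksum):
        print(f'{path} duplicates {store.get_canonical(checksum)}')
    else:
        store.store_canonical(checksum, path, path.stat().st_size, chunk_hashes)
print(store.get_stats())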

View File

@@ -1,59 +1,119 @@
from typing import Dict
from typing import Dict, List
import re
class ContentEnricher:
tech_keywords = {'transcribe', 'transcription', 'whisper', 'speech-to-text', 'audio', 'video', 'subtitle', 'caption', 'srt', 'vtt', 'ffmpeg', 'opencv', 'pytorch', 'tensorflow', 'cuda', 'gpu', 'ml', 'nlp', 'llm', 'ollama', 'docker', 'kubernetes', 'postgres', 'database', 'api', 'rest', 'graphql', 'python', 'javascript', 'java', 'rust', 'golang'}
def __init__(self, llm_client=None):
self.llm_client = llm_client
self.pii_patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'api_key': r'(?i)(api[_-]?key|token|secret)["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_\-]{20,})',
'password': r'(?i)(password|passwd|pwd)["\']?\s*[:=]\s*["\']([^"\']{8,})'
}
def enrich(self, text: str, use_llm: bool = False) -> Dict:
enrichment = {
'summary': self._basic_summary(text),
'word_count': len(text.split()),
'has_pii': self._detect_pii(text),
'quality': self._assess_quality(text),
'topics': self._extract_basic_topics(text)
'topics': self._extract_topics(text),
'entities': self._extract_entities(text),
'tech_stack': self._detect_tech(text),
'security': {
'has_pii': bool(self._detect_pii(text)),
'has_credentials': self._detect_credentials(text),
'pii_details': self._detect_pii(text)
},
'quality': self._assess_quality(text)
}
if use_llm and self.llm_client:
llm_result = self.llm_client.classify_content(text)
if llm_result.get('success'):
enrichment['llm_classification'] = llm_result['text']
summary_result = self.llm_client.summarize(text[:3000], max_length=200)
if summary_result.get('success'):
enrichment['llm_summary'] = summary_result['text']
intent_result = self.llm_client.extract_intent(text[:3000])
if intent_result.get('success'):
enrichment['llm_intent'] = intent_result['text']
topics_result = self.llm_client.extract_topics(text[:3000])
if topics_result.get('success') and topics_result.get('topics'):
enrichment['llm_topics'] = topics_result['topics']
return enrichment
def _basic_summary(self, text: str) -> str:
sentences = re.split(r'[.!?]+', text)
return ' '.join(sentences[:3])[:200]
if not text:
return ''
sentences = re.split(r'[.!?\n]+', text)
summary = []
length = 0
for sent in sentences:
sent = sent.strip()
if not sent:
continue
if length + len(sent) > 200:
break
summary.append(sent)
length += len(sent)
return '. '.join(summary) if summary else text[:200]
def _extract_topics(self, text: str) -> List[str]:
text_lower = text.lower()
topics = []
for tech in self.tech_keywords:
if tech in text_lower:
topics.append(tech)
words = re.findall(r'\b[A-Z][a-z]+\b', text)
word_freq = {}
for word in words:
if len(word) > 3 and word.lower() not in {'this', 'that', 'with', 'from', 'have'}:
word_freq[word] = word_freq.get(word, 0) + 1
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
topics.extend([w for (w, _) in sorted_words[:5]])
return list(set(topics))[:15]
def _extract_entities(self, text: str) -> Dict[str, List[str]]:
entities = {'files': [], 'urls': [], 'paths': []}
file_pattern = re.compile(r'\b\w+\.(py|js|java|go|rs|cpp|h|md|txt|json|yaml|yml|xml|sql|sh|bat|ts|tsx|jsx)\b')
entities['files'] = list(set(file_pattern.findall(text)))[:10]
url_pattern = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
entities['urls'] = list(set(url_pattern.findall(text)))[:5]
path_pattern = re.compile(r'(?:/[a-zA-Z0-9_.-]+)+/?')
entities['paths'] = list(set(path_pattern.findall(text)))[:10]
return entities
def _detect_tech(self, text: str) -> List[str]:
text_lower = text.lower()
return [tech for tech in self.tech_keywords if tech in text_lower]
def _detect_pii(self, text: str) -> Dict:
detected = {}
for pii_type, pattern in self.pii_patterns.items():
matches = re.findall(pattern, text)
for pii_type in ['email', 'phone', 'ssn', 'credit_card']:
matches = re.findall(self.pii_patterns[pii_type], text)
if matches:
detected[pii_type] = len(matches)
return detected
def _detect_credentials(self, text: str) -> bool:
for name in ['api_key', 'password']:
if re.search(self.pii_patterns[name], text):
return True
return False
def _assess_quality(self, text: str) -> str:
if len(text.strip()) < 10:
return 'low'
if not text or len(text.strip()) < 10:
return 'empty'
words = text.split()
if not words:
return 'empty'
avg_word_len = sum(len(w) for w in words) / len(words)
if avg_word_len < 2 or avg_word_len > 20:
return 'garbled'
special_char_ratio = sum(1 for c in text if not c.isalnum() and not c.isspace()) / len(text)
if special_char_ratio > 0.3:
return 'low'
return 'high' if len(text.split()) > 50 else 'medium'
def _extract_basic_topics(self, text: str) -> list:
words = re.findall(r'\b[A-Z][a-z]+\b', text)
word_freq = {}
for word in words:
if len(word) > 3:
word_freq[word] = word_freq.get(word, 0) + 1
return sorted(word_freq, key=word_freq.get, reverse=True)[:10]
if special_char_ratio > 0.4:
return 'low_confidence'
return 'good'
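
A rule-based enrichment sketch (no LLM client attached), using the enrichment.enricher import path that the reorganizer itself uses later in this diff. The sample text is made up purely to trigger the PII and tech-stack detectors.

# Rule-based enrichment only; the sample string is illustrative.
from enrichment.enricher import ContentEnricher

enricher = ContentEnricher()
sample = ("Transcription pipeline using whisper and ffmpeg. "
          "Contact: jane.doe@example.com. Results stored in postgres via a REST api.")
result = enricher.enrich(sample, use_llm=False)

print(result['summary'])
print('quality: ', result['quality'])
print('topics:  ', result['topics'])
print('tech:    ', result['tech_stack'])
print('security:', result['security'])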

View File

@@ -1,54 +1,79 @@
import requests
import json
from typing import Dict, Optional
import logging
from typing import Dict, Optional, List
logger = logging.getLogger(__name__)
class LLMClient:
def __init__(self, endpoint: str = 'http://192.168.1.74:1234', model: str = 'local'):
def __init__(self, endpoint: str = 'http://localhost:11434', model: str = 'llama3', use_local: bool = True, lm_studio_host: str = None):
self.endpoint = endpoint
self.model = model
self.local_ollama = 'http://localhost:11434'
self.use_local = use_local
self.lm_studio_endpoints = {
'plato': {'url': 'http://192.168.1.74:1234', 'model': 'openai/gpt-oss-20b'},
'postgres': {'url': 'http://192.168.1.159:1234', 'model': 'mistralai/devstral-small-2507'},
'local': {'url': 'http://localhost:11434', 'model': 'llama3'}
}
self.lm_studio_host = lm_studio_host or 'postgres'
studio_config = self.lm_studio_endpoints.get(self.lm_studio_host, self.lm_studio_endpoints['postgres'])
self.lm_studio_endpoint = studio_config['url']
self.lm_studio_model = studio_config['model']
def summarize(self, text: str, max_length: int = 200) -> Dict:
prompt = f"Summarize the following in {max_length} chars or less:\n\n{text[:2000]}"
prompt = f"Summarize this concisely in under {max_length} characters:\n\n{text[:3000]}"
return self._query(prompt)
def extract_topics(self, text: str) -> Dict:
prompt = f"Extract 5-10 key topics/tags from this text. Return as comma-separated list:\n\n{text[:2000]}"
prompt = f"Extract 5-10 key topics/tags. Return ONLY comma-separated words:\n\n{text[:3000]}"
result = self._query(prompt)
if result.get('success'):
topics = [t.strip() for t in result['text'].split(',')]
result['topics'] = topics[:10]
return result
def extract_intent(self, text: str) -> Dict:
prompt = f"What is the main purpose/intent of this code/document? Answer in 1-2 sentences:\n\n{text[:3000]}"
return self._query(prompt)
def classify_content(self, text: str) -> Dict:
prompt = f"Classify this content. Return: category, topics, has_pii (yes/no), quality (high/medium/low):\n\n{text[:1000]}"
def detect_project_type(self, text: str, file_list: List[str]) -> Dict:
files_str = ', '.join(file_list[:20])
prompt = f"Based on these files: {files_str}\nAnd this content:\n{text[:2000]}\n\nWhat type of project is this? (e.g. web app, ml/ai, transcription, data processing, etc.)"
return self._query(prompt)
def _query(self, prompt: str, use_local: bool = False) -> Dict:
def _query(self, prompt: str, timeout: int = 30) -> Dict:
try:
endpoint = self.local_ollama if use_local else self.endpoint
if use_local:
if self.use_local:
response = requests.post(
f'{endpoint}/api/generate',
json={'model': 'llama3.2', 'prompt': prompt, 'stream': False},
timeout=30
f'{self.endpoint}/api/generate',
json={'model': self.model, 'prompt': prompt, 'stream': False},
timeout=timeout
)
if response.status_code == 200:
data = response.json()
return {'success': True, 'text': data.get('response', '').strip()}
else:
response = requests.post(
f'{endpoint}/v1/chat/completions',
f'{self.lm_studio_endpoint}/v1/chat/completions',
json={
'model': self.model,
'model': self.lm_studio_model,
'messages': [{'role': 'user', 'content': prompt}],
'max_tokens': 500
'max_tokens': 500,
'temperature': 0.7
},
timeout=30
timeout=timeout
)
if response.status_code == 200:
data = response.json()
return {'success': True, 'text': data['choices'][0]['message']['content'].strip()}
if response.status_code == 200:
data = response.json()
if use_local:
return {'success': True, 'text': data.get('response', '')}
else:
return {'success': True, 'text': data['choices'][0]['message']['content']}
else:
return {'success': False, 'error': f'HTTP {response.status_code}'}
return {'success': False, 'error': f'HTTP {response.status_code}'}
except requests.Timeout:
logger.warning(f'LLM request timeout after {timeout}s')
return {'success': False, 'error': 'timeout'}
except Exception as e:
logger.error(f'LLM query failed: {e}')
return {'success': False, 'error': str(e)}
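
A usage sketch for the reworked client, assuming an Ollama server is actually reachable on localhost:11434 and that the LM Studio hosts listed in lm_studio_endpoints are up; the import path matches the one used by the reorganizer below.

# Hypothetical calls; endpoints must be reachable for these to succeed.
from enrichment.llm_client import LLMClient

local = LLMClient(use_local=True)                      # Ollama, default llama3 model
resp = local.summarize("Rabin chunking splits files at content-defined boundaries.")
print(resp['text'] if resp.get('success') else resp.get('error'))

remote = LLMClient(use_local=False, lm_studio_host='plato')   # network LM Studio host
topics = remote.extract_topics("Dockerized postgres with a REST API and pytorch inference.")
if topics.get('success'):
    print(topics['topics'])
else:
    print('LLM unavailable:', topics.get('error'))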

View File

@@ -0,0 +1,196 @@
import hashlib
import psycopg2
from pathlib import Path
from typing import Dict, Optional, List
import logging
import time
import json
logger = logging.getLogger(__name__)
class IncrementalExtractor:
def __init__(self, db_config: Dict):
self.db_config = db_config
def get_connection(self):
return psycopg2.connect(**self.db_config)
def should_extract(self, file_path: str, file_checksum: str) -> bool:
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT file_checksum, status
FROM extraction_log
WHERE file_path = %s
ORDER BY created_at DESC
LIMIT 1
''', (file_path,))
result = cursor.fetchone()
if not result:
return True
last_checksum, status = result
if last_checksum != file_checksum:
logger.info(f'File changed: {file_path}')
return True
if status == 'success':
return False
if status == 'error':
return True
return True
finally:
cursor.close()
conn.close()
def log_extraction(self, node_id: Optional[str], file_path: str, file_checksum: str,
method: str, status: str, error_msg: Optional[str] = None,
extracted_size: Optional[int] = None, processing_time_ms: Optional[int] = None):
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO extraction_log (node_id, file_path, file_checksum, extraction_method,
status, error_message, extracted_size, processing_time_ms)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
''', (node_id, file_path, file_checksum, method, status, error_msg, extracted_size, processing_time_ms))
conn.commit()
finally:
cursor.close()
conn.close()
def create_or_update_node(self, node_type: str, path: str, disk_label: str,
checksum: Optional[str], size: Optional[int],
content_hash: Optional[str], metadata: Optional[Dict]) -> str:
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute('''
INSERT INTO content_nodes (node_type, path, disk_label, checksum, size,
content_hash, extracted_at, metadata)
VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, %s)
ON CONFLICT (node_type, path, disk_label) DO UPDATE SET
checksum = EXCLUDED.checksum,
size = EXCLUDED.size,
content_hash = EXCLUDED.content_hash,
extracted_at = CURRENT_TIMESTAMP,
metadata = EXCLUDED.metadata,
updated_at = CURRENT_TIMESTAMP
RETURNING id
''', (node_type, path, disk_label, checksum, size, content_hash, json.dumps(metadata) if metadata else None))
node_id = cursor.fetchone()[0]
conn.commit()
return str(node_id)
finally:
cursor.close()
conn.close()
def batch_extract(self, file_list: List[Dict], parser_func, parser_name: str,
batch_size: int = 100, skip_existing: bool = True) -> Dict:
stats = {
'processed': 0,
'extracted': 0,
'skipped': 0,
'errors': 0,
'total_time_ms': 0
}
conn = self.get_connection()
cursor = conn.cursor()
try:
for idx, file_info in enumerate(file_list, 1):
path = file_info['path']
checksum = file_info.get('checksum')
disk_label = file_info.get('disk_label')
if skip_existing and not self.should_extract(path, checksum):
stats['skipped'] += 1
continue
start_time = time.time()
try:
result = parser_func(Path(path))
processing_time_ms = int((time.time() - start_time) * 1000)
if 'error' not in result and result.get('text'):
text = result['text']
content_hash = hashlib.sha256(text.encode()).hexdigest()
node_id = self.create_or_update_node(
node_type='file',
path=path,
disk_label=disk_label,
checksum=checksum,
size=file_info.get('size'),
content_hash=content_hash,
metadata={
'extraction': result.get('method', parser_name),
'quality': result.get('quality', 'unknown')
}
)
cursor.execute('''
UPDATE files
SET extracted_text = %s,
text_quality = %s
WHERE path = %s
''', (text[:50000], result.get('quality'), path))
self.log_extraction(
node_id=node_id,
file_path=path,
file_checksum=checksum,
method=parser_name,
status='success',
extracted_size=len(text),
processing_time_ms=processing_time_ms
)
stats['extracted'] += 1
stats['total_time_ms'] += processing_time_ms
else:
error_msg = result.get('error', 'No text extracted')
self.log_extraction(
node_id=None,
file_path=path,
file_checksum=checksum,
method=parser_name,
status='error',
error_msg=error_msg
)
stats['errors'] += 1
except Exception as e:
logger.error(f'Extract failed for {path}: {e}')
self.log_extraction(
node_id=None,
file_path=path,
file_checksum=checksum,
method=parser_name,
status='error',
error_msg=str(e)
)
stats['errors'] += 1
stats['processed'] += 1
if stats['processed'] % batch_size == 0:
conn.commit()
logger.info(f'Batch progress: {stats["processed"]}/{len(file_list)} '
f'({stats["extracted"]} extracted, {stats["skipped"]} skipped, {stats["errors"]} errors)')
conn.commit()
finally:
cursor.close()
conn.close()
return stats
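
A hypothetical wiring of IncrementalExtractor with a trivial parser callback. It assumes the extraction_log and content_nodes tables already exist (their migration is not part of this diff), placeholder connection settings, and an invented module path.

# Illustrative only: db_config values, file paths and the module path are assumptions.
from pathlib import Path
from extraction.incremental import IncrementalExtractor   # module path assumed

def plain_text_parser(path: Path) -> dict:
    # Toy parser: read UTF-8 text and report a fixed quality label.
    try:
        return {'text': path.read_text(encoding='utf-8', errors='replace'),
                'method': 'plain_text', 'quality': 'good'}
    except OSError as e:
        return {'error': str(e)}

db_config = {'host': 'localhost', 'port': 5432, 'dbname': 'disk_reorganizer',
             'user': 'postgres', 'password': 'postgres'}
extractor = IncrementalExtractor(db_config)
file_list = [
    {'path': '/data/notes/readme.txt', 'checksum': 'abc123', 'disk_label': 'D', 'size': 1024},
]
stats = extractor.batch_extract(file_list, plain_text_parser, 'plain_text',
                                batch_size=50, skip_existing=True)
print(stats)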

View File

@@ -535,14 +535,18 @@ class DiskReorganizer:
try:
query = "SELECT path, size, disk_label FROM files WHERE 1=1"
params = []
if kind:
suffix_map = {'text': "('.txt', '.md', '.log', '.json')", 'code': "('.py', '.js', '.java', '.go')", 'pdf': "('.pdf',)"}
suffix_map = {
'text': ['.txt', '.md', '.log', '.json', '.yaml', '.yml'],
'code': ['.py', '.js', '.java', '.go', '.rs', '.ts', '.cpp', '.h'],
'pdf': ['.pdf']
}
if kind in suffix_map:
query += f" AND RIGHT(path, 4) IN {suffix_map[kind]} OR RIGHT(path, 3) IN {suffix_map[kind]}"
conditions = ' OR '.join([f"path LIKE '%{ext}'" for ext in suffix_map[kind]])
query += f" AND ({conditions})"
query += f" LIMIT {limit}"
cursor.execute(query, params)
cursor.execute(query)
files = cursor.fetchall()
print(f"\n=== PARSING FILES ===\nProcessing {len(files)} files\n")
@@ -580,30 +584,63 @@ class DiskReorganizer:
cursor.close()
conn.close()
def enrich_files(self, limit: int = 10, llm_endpoint: str = None, use_local: bool = False):
def enrich_files(self, limit: int = 10, use_llm: bool = False, use_local: bool = True, batch_size: int = 100):
from enrichment.enricher import ContentEnricher
from enrichment.llm_client import LLMClient
llm_client = LLMClient(use_local=use_local) if use_llm else None
enricher = ContentEnricher(llm_client=llm_client)
enricher = ContentEnricher()
conn = self.get_connection()
cursor = conn.cursor()
try:
cursor.execute(f"SELECT path, extracted_text FROM files WHERE extracted_text IS NOT NULL LIMIT {limit}")
cursor.execute(f"SELECT path, extracted_text FROM files WHERE extracted_text IS NOT NULL AND (enrichment IS NULL OR enrichment = '{{}}'::jsonb) LIMIT {limit}")
files = cursor.fetchall()
print(f"\n=== ENRICHING CONTENT ===\nProcessing {len(files)} files\n")
print(f"\n=== ENRICHING CONTENT ===")
print(f"Processing {len(files)} files")
if use_llm:
print(f"Using LLM: {'Local OLLAMA' if use_local else 'Network LM_STUDIO'}\n")
else:
print("Using rule-based enrichment only\n")
for path, text in files:
enrichment = enricher.enrich(text[:5000], use_llm=False)
print(f"{path[:60]}")
enriched_count = 0
batch = []
for idx, (path, text) in enumerate(files, 1):
if not text:
continue
enrichment = enricher.enrich(text[:5000], use_llm=use_llm)
print(f"{idx}/{len(files)} {path[:60]}")
print(f" Quality: {enrichment.get('quality')} | Words: {enrichment.get('word_count'):,}")
print(f" PII: {list(enrichment.get('has_pii', {}).keys())}")
print(f" Topics: {', '.join(enrichment.get('topics', [])[:5])}\n")
if enrichment.get('security', {}).get('has_pii'):
print(f" PII: {list(enrichment.get('security', {}).get('pii_details', {}).keys())}")
if enrichment.get('tech_stack'):
print(f" Tech: {', '.join(enrichment['tech_stack'][:5])}")
if enrichment.get('topics'):
print(f" Topics: {', '.join(enrichment['topics'][:5])}")
if use_llm and enrichment.get('llm_summary'):
print(f" LLM Summary: {enrichment['llm_summary'][:100]}...")
if use_llm and enrichment.get('llm_intent'):
print(f" Intent: {enrichment['llm_intent'][:100]}...")
print()
cursor.execute("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", (json.dumps(enrichment), path))
batch.append((json.dumps(enrichment), path))
enriched_count += 1
conn.commit()
print(f"Enriched {len(files)} files")
if len(batch) >= batch_size:
cursor.executemany("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", batch)
conn.commit()
batch.clear()
print(f" Committed batch ({enriched_count} files so far)")
if batch:
cursor.executemany("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", batch)
conn.commit()
print(f"\nEnriched {enriched_count} files")
finally:
cursor.close()
@@ -695,6 +732,75 @@ class DiskReorganizer:
cursor.close()
conn.close()
def search_content(self, query: str, limit: int=20, search_type: str='text'):
conn = self.get_connection()
cursor = conn.cursor()
try:
if search_type == 'text':
cursor.execute('''
SELECT path, disk_label, size, category,
ts_rank(to_tsvector('english', COALESCE(extracted_text, '')), plainto_tsquery('english', %s)) as rank,
LEFT(extracted_text, 200) as snippet
FROM files
WHERE extracted_text IS NOT NULL
AND to_tsvector('english', extracted_text) @@ plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
''', (query, query, limit))
elif search_type == 'enrichment':
cursor.execute('''
SELECT path, disk_label, size, category, enrichment
FROM files
WHERE enrichment IS NOT NULL
AND enrichment::text ILIKE %s
LIMIT %s
''', (f'%{query}%', limit))
elif search_type == 'path':
cursor.execute('''
SELECT path, disk_label, size, category
FROM files
WHERE path ILIKE %s
LIMIT %s
''', (f'%{query}%', limit))
else:
logger.error(f'Unknown search type: {search_type}')
return
results = cursor.fetchall()
if not results:
print(f'No results found for: {query}')
return
print(f'\n=== SEARCH RESULTS: {len(results)} matches for "{query}" ===\n')
for idx, row in enumerate(results, 1):
if search_type == 'text':
path, disk, size, category, rank, snippet = row
print(f'{idx}. {path}')
print(f' Disk: {disk}, Size: {self.format_size(int(size))}, Category: {category}')
print(f' Rank: {rank:.4f}')
if snippet:
print(f' Snippet: {snippet[:150]}...')
elif search_type == 'enrichment':
path, disk, size, category, enrichment = row
print(f'{idx}. {path}')
print(f' Disk: {disk}, Size: {self.format_size(int(size))}, Category: {category}')
if enrichment:
import json
enrich_data = json.loads(enrichment) if isinstance(enrichment, str) else enrichment
if 'topics' in enrich_data:
print(f' Topics: {", ".join(enrich_data["topics"][:5])}')
if 'tech_stack' in enrich_data:
print(f' Tech: {", ".join(enrich_data["tech_stack"][:5])}')
else:
path, disk, size, category = row
print(f'{idx}. {path}')
print(f' Disk: {disk}, Size: {self.format_size(int(size))}, Category: {category}')
print()
finally:
cursor.close()
conn.close()
def analyze_folders(self, disk: Optional[str]=None, min_files: int=3):
from analysis.folder_analyzer import FolderAnalyzer
analyzer = FolderAnalyzer()
@@ -788,131 +894,3 @@ class DiskReorganizer:
cursor.close()
conn.close()
def review_migration(self, category: Optional[str]=None, show_build: bool=False):
from classification.classifier import FileClassifier
classifier = FileClassifier()
conn = self.get_connection()
cursor = conn.cursor()
try:
query = 'SELECT path, size, category FROM files WHERE 1=1'
params = []
if category:
query += ' AND category = %s'
params.append(category)
if not show_build:
query += " AND (metadata->>'labels' IS NULL OR metadata->>'labels' NOT LIKE '%build-artifact%')"
query += ' ORDER BY category, size DESC LIMIT 100'
cursor.execute(query, params)
files = cursor.fetchall()
if not files:
print('No files found matching criteria')
return
print(f'\n=== MIGRATION PREVIEW ===')
print(f'Showing {len(files)} files\n')
current_category = None
for path, size, cat in files:
if cat != current_category:
current_category = cat
print(f'\n{cat}:')
labels, suggested_cat, is_build = classifier.classify_path(path, int(size))
target = classifier.suggest_target_path(path, suggested_cat, labels)
print(f' {path}')
print(f'{target} ({self.format_size(int(size))})')
finally:
cursor.close()
conn.close()
@staticmethod
def format_size(size: int) -> str:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f'{size:.1f}{unit}'
size /= 1024
return f'{size:.1f}PB'
def main():
parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
subparsers = parser.add_subparsers(dest='command', required=True)
index_parser = subparsers.add_parser('index', help='Index files on a disk')
index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
index_parser.add_argument('disk_name', help='Logical name for the disk')
plan_parser = subparsers.add_parser('plan', help='Create migration plan')
plan_parser.add_argument('target_disk', help='Disk to free up')
plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
exec_parser.add_argument('plan_file', help='Path to plan JSON file')
exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
dedupe_parser = subparsers.add_parser('dedupe', help='Deduplicate files and compute checksums')
dedupe_parser.add_argument('--disk', help='Optional: Only dedupe specific disk')
dedupe_parser.add_argument('--no-chunks', action='store_true', help='Disable chunk-level deduplication')
merge_parser = subparsers.add_parser('merge', help='Plan multi-disk merge with deduplication')
merge_parser.add_argument('--sources', nargs='+', required=True, help='Source disks to merge')
merge_parser.add_argument('--target', required=True, help='Target disk')
merge_parser.add_argument('--output', default='merge_plan.json', help='Output plan file')
merge_parser.add_argument('--filter-system', action='store_true', help='Filter system/build files')
merge_parser.add_argument('--network', help='Network target (e.g., user@host:/path)')
profile_parser = subparsers.add_parser('profile', help='Create content profiles (inventory + triage)')
profile_parser.add_argument('--disk', help='Profile specific disk')
profile_parser.add_argument('--update', action='store_true', help='Update database with profiles')
profile_parser.add_argument('--limit', type=int, help='Limit number of files')
extract_parser = subparsers.add_parser('extract', help='Extract content from files')
extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)')
extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch')
parse_parser = subparsers.add_parser('parse', help='Parse files to extract text')
parse_parser.add_argument('--kind', help='Parse specific kind (text, code, pdf)')
parse_parser.add_argument('--limit', type=int, default=100, help='Limit parse batch')
parse_parser.add_argument('--update', action='store_true', help='Save extracted text to database')
enrich_parser = subparsers.add_parser('enrich', help='Enrich content with LLM analysis')
enrich_parser.add_argument('--limit', type=int, default=10, help='Limit enrichment batch')
enrich_parser.add_argument('--llm-endpoint', default='http://192.168.1.74:1234', help='LLM endpoint')
enrich_parser.add_argument('--local', action='store_true', help='Use local Ollama')
classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization')
classify_parser.add_argument('--disk', help='Classify specific disk')
classify_parser.add_argument('--update', action='store_true', help='Update database with classifications')
classify_parser.add_argument('--no-resume', action='store_true', help='Start from scratch instead of resuming')
folders_parser = subparsers.add_parser('analyze-folders', help='Analyze folder structure and infer project intent')
folders_parser.add_argument('--disk', help='Analyze specific disk')
folders_parser.add_argument('--min-files', type=int, default=3, help='Minimum files per folder')
review_parser = subparsers.add_parser('review', help='Review proposed migration structure')
review_parser.add_argument('--category', help='Review specific category')
review_parser.add_argument('--show-build', action='store_true', help='Include build artifacts')
report_parser = subparsers.add_parser('report', help='Show current status')
report_parser.add_argument('--format', choices=['text', 'json'], default='text', help='Report format')
report_parser.add_argument('--show-duplicates', action='store_true', help='Show duplicate files')
report_parser.add_argument('--preview-merge', help='Preview merge plan from file')
args = parser.parse_args()
tool = DiskReorganizer()
if args.command == 'index':
tool.index_disk(args.disk_root, args.disk_name)
elif args.command == 'dedupe':
tool.run_deduplication(disk=args.disk, use_chunks=not args.no_chunks)
elif args.command == 'merge':
tool.plan_merge(sources=args.sources, target=args.target, output_file=args.output, filter_system=args.filter_system, network_target=args.network)
elif args.command == 'plan':
plan = tool.plan_migration(args.target_disk, args.dest_disks)
if plan:
print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
print(f"Destination disks: {', '.join(plan['destination_disks'])}")
elif args.command == 'execute':
tool.execute_migration(args.plan_file, dry_run=args.dry_run)
elif args.command == 'profile':
tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit)
elif args.command == 'extract':
tool.extract_content(kind=args.kind, limit=args.limit)
elif args.command == 'parse':
tool.parse_files(kind=args.kind, limit=args.limit, update_db=args.update)
elif args.command == 'enrich':
tool.enrich_files(limit=args.limit, llm_endpoint=args.llm_endpoint, use_local=args.local)
elif args.command == 'classify':
tool.classify_files(disk=args.disk, update_db=args.update, resume=not args.no_resume)
elif args.command == 'analyze-folders':
tool.analyze_folders(disk=args.disk, min_files=args.min_files)
elif args.command == 'review':
tool.review_migration(category=args.category, show_build=args.show_build)
elif args.command == 'report':
tool.generate_report(format=args.format, show_duplicates=args.show_duplicates, preview_merge=args.preview_merge)
if __name__ == '__main__':
main()
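For orientation, the CLI above maps one-to-one onto DiskReorganizer methods; a minimal sketch of the same flow driven from Python (the module name, disk labels and plan file name are placeholders, not taken from this repo):

# Hypothetical programmatic equivalent of the CLI (assumes the database is reachable).
from disk_reorganizer import DiskReorganizer  # module name is an assumption

tool = DiskReorganizer()
tool.index_disk('D:\\', 'data_disk')                       # same as: index D:\ data_disk
plan = tool.plan_migration('data_disk', ['archive_disk'])  # same as: plan data_disk archive_disk
if plan:
    # Rehearse first; 'migration_plan.json' is assumed to be where the plan step wrote its output.
    tool.execute_migration('migration_plan.json', dry_run=True)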

View File

@@ -1,27 +1,5 @@
"""Migration package exports"""
from .copy import (
CopyMigrationStrategy,
FastCopyStrategy,
SafeCopyStrategy,
ReferenceCopyStrategy
)
from .hardlink import (
HardlinkMigrationStrategy,
SymlinkMigrationStrategy,
DedupHardlinkStrategy
)
from .copy import CopyMigrationStrategy, FastCopyStrategy, SafeCopyStrategy, ReferenceCopyStrategy
from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy, DedupHardlinkStrategy
from .engine import MigrationEngine
from ._protocols import IMigrationStrategy, IMigrationEngine
__all__ = [
'CopyMigrationStrategy',
'FastCopyStrategy',
'SafeCopyStrategy',
'ReferenceCopyStrategy',
'HardlinkMigrationStrategy',
'SymlinkMigrationStrategy',
'DedupHardlinkStrategy',
'MigrationEngine',
'IMigrationStrategy',
'IMigrationEngine',
]
__all__ = ['CopyMigrationStrategy', 'FastCopyStrategy', 'SafeCopyStrategy', 'ReferenceCopyStrategy', 'HardlinkMigrationStrategy', 'SymlinkMigrationStrategy', 'DedupHardlinkStrategy', 'MigrationEngine', 'IMigrationStrategy', 'IMigrationEngine']
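The flattened re-exports keep call sites to a single import; a small sketch of selecting a strategy through the package (the top-level import path `migration` and the file paths are assumptions):

# Sketch: choose a copy strategy via the package exports.
from pathlib import Path
from migration import FastCopyStrategy, SafeCopyStrategy  # import path assumed

strategy = SafeCopyStrategy()    # checksum-verified copies
# strategy = FastCopyStrategy()  # skip verification for bulk, low-risk moves
ok = strategy.migrate(Path('D:/old/report.pdf'), Path('E:/archive/report.pdf'))
print('migrated' if ok else 'failed')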

View File

@@ -1,107 +1,28 @@
"""Protocol definitions for the migration package"""
from typing import Protocol
from pathlib import Path
from ..shared.models import OperationRecord
class IMigrationStrategy(Protocol):
"""Protocol for migration strategies"""
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate a file from source to destination
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
...
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
...
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds
"""
...
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Args:
source: Source file path
Returns:
True if cleanup successful
"""
...
class IMigrationEngine(Protocol):
"""Protocol for migration engine"""
def plan_migration(
self,
disk: str,
target_base: Path
) -> list[OperationRecord]:
"""Plan migration for a disk
Args:
disk: Disk identifier
target_base: Target base directory
Returns:
List of planned operations
"""
def plan_migration(self, disk: str, target_base: Path) -> list[OperationRecord]:
...
def execute_migration(
self,
operations: list[OperationRecord],
dry_run: bool = False
) -> dict:
"""Execute migration operations
Args:
operations: List of operations to execute
dry_run: Whether to perform a dry run
Returns:
Dictionary with execution statistics
"""
def execute_migration(self, operations: list[OperationRecord], dry_run: bool=False) -> dict:
...
def rollback(self, operation: OperationRecord) -> bool:
"""Rollback a migration operation
Args:
operation: Operation to rollback
Returns:
True if rollback successful
"""
...
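Because these are typing.Protocol classes, conformance is structural: anything exposing these methods can be handed to the engine. A minimal illustrative strategy (not part of the package) that satisfies IMigrationStrategy:

# Sketch: a no-op strategy that structurally satisfies IMigrationStrategy.
from pathlib import Path

class NoOpStrategy:
    """Logs what would happen; handy for dry runs and for testing engine wiring."""

    def migrate(self, source: Path, destination: Path, verify: bool = True) -> bool:
        print(f'would migrate {source} -> {destination}')
        return True

    def can_migrate(self, source: Path, destination: Path) -> bool:
        return source.exists()

    def estimate_time(self, source: Path) -> float:
        return 0.0

    def cleanup(self, source: Path) -> bool:
        return True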

View File

@@ -1,268 +1,129 @@
"""Copy-based migration strategy"""
import shutil
from pathlib import Path
from typing import Optional
import os
from ..shared.logger import ProgressLogger
class CopyMigrationStrategy:
"""Copy files to destination with verification"""
def __init__(
self,
logger: Optional[ProgressLogger] = None,
preserve_metadata: bool = True,
verify_checksums: bool = True
):
"""Initialize copy migration strategy
Args:
logger: Optional progress logger
preserve_metadata: Whether to preserve file metadata
verify_checksums: Whether to verify checksums after copy
"""
def __init__(self, logger: Optional[ProgressLogger]=None, preserve_metadata: bool=True, verify_checksums: bool=True):
self.logger = logger
self.preserve_metadata = preserve_metadata
self.verify_checksums = verify_checksums
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate file by copying
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
self.logger.error(f'Source file does not exist: {source}')
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Copy file
if self.preserve_metadata:
shutil.copy2(source, destination)
else:
shutil.copy(source, destination)
# Verify if requested
if verify and self.verify_checksums:
if not self._verify_copy(source, destination):
if self.logger:
self.logger.error(f"Verification failed: {source} -> {destination}")
self.logger.error(f'Verification failed: {source} -> {destination}')
destination.unlink()
return False
return True
except Exception as e:
if self.logger:
self.logger.error(f"Copy failed: {source} -> {destination}: {e}")
self.logger.error(f'Copy failed: {source} -> {destination}: {e}')
return False
def _verify_copy(self, source: Path, destination: Path) -> bool:
"""Verify copied file
Args:
source: Source file path
destination: Destination file path
Returns:
True if verification successful
"""
# Check size
source_size = source.stat().st_size
dest_size = destination.stat().st_size
if source_size != dest_size:
return False
# Compare checksums for files larger than 1MB
if source_size > 1024 * 1024:
from ..deduplication.chunker import hash_file
source_hash = hash_file(source)
dest_hash = hash_file(destination)
return source_hash == dest_hash
# For small files, compare content directly
with open(source, 'rb') as f1, open(destination, 'rb') as f2:
return f1.read() == f2.read()
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
if not source.exists():
return False
# Check if destination directory is writable
dest_dir = destination.parent
if dest_dir.exists():
return os.access(dest_dir, os.W_OK)
# Check if parent directory exists and is writable
parent = dest_dir.parent
while not parent.exists() and parent != parent.parent:
parent = parent.parent
return parent.exists() and os.access(parent, os.W_OK)
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds
"""
if not source.exists():
return 0.0
size = source.stat().st_size
# Estimate based on typical copy speed (100 MB/s)
typical_speed = 100 * 1024 * 1024 # bytes per second
typical_speed = 100 * 1024 * 1024
return size / typical_speed
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Args:
source: Source file path
Returns:
True if cleanup successful
"""
try:
if source.exists():
source.unlink()
return True
except Exception as e:
if self.logger:
self.logger.warning(f"Failed to cleanup {source}: {e}")
self.logger.warning(f'Failed to cleanup {source}: {e}')
return False
class FastCopyStrategy(CopyMigrationStrategy):
"""Fast copy strategy without verification"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize fast copy strategy"""
super().__init__(
logger=logger,
preserve_metadata=True,
verify_checksums=False
)
def __init__(self, logger: Optional[ProgressLogger]=None):
super().__init__(logger=logger, preserve_metadata=True, verify_checksums=False)
class SafeCopyStrategy(CopyMigrationStrategy):
"""Safe copy strategy with full verification"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize safe copy strategy"""
super().__init__(
logger=logger,
preserve_metadata=True,
verify_checksums=True
)
def __init__(self, logger: Optional[ProgressLogger]=None):
super().__init__(logger=logger, preserve_metadata=True, verify_checksums=True)
class ReferenceCopyStrategy:
"""Create reference copy using reflinks (CoW) if supported"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize reflink copy strategy"""
def __init__(self, logger: Optional[ProgressLogger]=None):
self.logger = logger
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate using reflink (copy-on-write)
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
self.logger.error(f'Source file does not exist: {source}')
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Try reflink copy (works on btrfs, xfs, etc.)
import subprocess
result = subprocess.run(
['cp', '--reflink=auto', str(source), str(destination)],
capture_output=True,
check=False
)
result = subprocess.run(['cp', '--reflink=auto', str(source), str(destination)], capture_output=True, check=False)
if result.returncode != 0:
# Fallback to regular copy
shutil.copy2(source, destination)
return True
except Exception as e:
if self.logger:
self.logger.error(f"Reflink copy failed: {source} -> {destination}: {e}")
self.logger.error(f'Reflink copy failed: {source} -> {destination}: {e}')
return False
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible"""
if not source.exists():
return False
dest_dir = destination.parent
if dest_dir.exists():
return os.access(dest_dir, os.W_OK)
return True
def estimate_time(self, source: Path) -> float:
"""Estimate migration time (reflinks are fast)"""
return 0.1 # Reflinks are nearly instant
return 0.1
def cleanup(self, source: Path) -> bool:
"""Cleanup source file"""
try:
if source.exists():
source.unlink()
return True
except Exception as e:
if self.logger:
self.logger.warning(f"Failed to cleanup {source}: {e}")
self.logger.warning(f'Failed to cleanup {source}: {e}')
return False
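The verification path above checks sizes first, then hashes large files via the deduplication chunker; a standalone sketch of the same idea with hashlib (the digest algorithm here is an assumption, not necessarily what hash_file uses):

# Sketch: size check, then a streaming digest of both files.
import hashlib
from pathlib import Path

def files_match(a: Path, b: Path, chunk_size: int = 1 << 20) -> bool:
    if a.stat().st_size != b.stat().st_size:
        return False
    digests = []
    for p in (a, b):
        h = hashlib.blake2b()
        with open(p, 'rb') as fh:
            for block in iter(lambda: fh.read(chunk_size), b''):
                h.update(block)
        digests.append(h.hexdigest())
    return digests[0] == digests[1]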

View File

@@ -1,254 +1,100 @@
"""Migration engine"""
from pathlib import Path
from typing import Optional, Callable
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_batch
from .copy import CopyMigrationStrategy, SafeCopyStrategy
from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy
from ..shared.models import OperationRecord, ProcessingStats, MigrationPlan
from ..shared.config import DatabaseConfig, ProcessingConfig
from ..shared.logger import ProgressLogger
class MigrationEngine:
"""Engine for migrating files"""
def __init__(
self,
db_config: DatabaseConfig,
processing_config: ProcessingConfig,
logger: ProgressLogger,
target_base: Path
):
"""Initialize migration engine
Args:
db_config: Database configuration
processing_config: Processing configuration
logger: Progress logger
target_base: Target base directory for migrations
"""
def __init__(self, db_config: DatabaseConfig, processing_config: ProcessingConfig, logger: ProgressLogger, target_base: Path):
self.db_config = db_config
self.processing_config = processing_config
self.logger = logger
self.target_base = Path(target_base)
self._connection = None
# Initialize strategies
self.copy_strategy = SafeCopyStrategy(logger=logger)
self.hardlink_strategy = HardlinkMigrationStrategy(logger=logger)
self.symlink_strategy = SymlinkMigrationStrategy(logger=logger)
def _get_connection(self):
"""Get or create database connection"""
if self._connection is None or self._connection.closed:
self._connection = psycopg2.connect(
host=self.db_config.host,
port=self.db_config.port,
database=self.db_config.database,
user=self.db_config.user,
password=self.db_config.password
)
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
return self._connection
def _ensure_tables(self):
"""Ensure migration tables exist"""
conn = self._get_connection()
cursor = conn.cursor()
# Create operations table
cursor.execute("""
CREATE TABLE IF NOT EXISTS operations (
id SERIAL PRIMARY KEY,
source_path TEXT NOT NULL,
target_path TEXT NOT NULL,
operation_type TEXT NOT NULL,
size BIGINT DEFAULT 0,
status TEXT DEFAULT 'pending',
error TEXT,
executed_at TIMESTAMP,
verified BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create index on status
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_operations_status
ON operations(status)
""")
cursor.execute("\n CREATE TABLE IF NOT EXISTS operations (\n id SERIAL PRIMARY KEY,\n source_path TEXT NOT NULL,\n target_path TEXT NOT NULL,\n operation_type TEXT NOT NULL,\n size BIGINT DEFAULT 0,\n status TEXT DEFAULT 'pending',\n error TEXT,\n executed_at TIMESTAMP,\n verified BOOLEAN DEFAULT FALSE,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n ")
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_operations_status\n ON operations(status)\n ')
conn.commit()
cursor.close()
def plan_migration(
self,
disk: Optional[str] = None,
category: Optional[str] = None
) -> MigrationPlan:
"""Plan migration for files
Args:
disk: Optional disk filter
category: Optional category filter
Returns:
MigrationPlan with planned operations
"""
self.logger.section("Planning Migration")
def plan_migration(self, disk: Optional[str]=None, category: Optional[str]=None) -> MigrationPlan:
self.logger.section('Planning Migration')
conn = self._get_connection()
cursor = conn.cursor()
# Build query
conditions = ["category IS NOT NULL"]
conditions = ['category IS NOT NULL']
params = []
if disk:
conditions.append("disk_label = %s")
conditions.append('disk_label = %s')
params.append(disk)
if category:
conditions.append("category = %s")
conditions.append('category = %s')
params.append(category)
query = f"""
SELECT path, size, category, duplicate_of
FROM files
WHERE {' AND '.join(conditions)}
ORDER BY category, path
"""
query = f"\n SELECT path, size, category, duplicate_of\n FROM files\n WHERE {' AND '.join(conditions)}\n ORDER BY category, path\n "
cursor.execute(query, params)
files = cursor.fetchall()
self.logger.info(f"Found {len(files)} files to migrate")
self.logger.info(f'Found {len(files)} files to migrate')
operations = []
total_size = 0
for path_str, size, file_category, duplicate_of in files:
source = Path(path_str)
# Determine destination
target_path = self.target_base / file_category / source.name
# Determine operation type
if duplicate_of:
# Use hardlink for duplicates
operation_type = 'hardlink'
else:
# Use copy for unique files
operation_type = 'copy'
operation = OperationRecord(
source_path=source,
target_path=target_path,
operation_type=operation_type,
size=size
)
operation = OperationRecord(source_path=source, target_path=target_path, operation_type=operation_type, size=size)
operations.append(operation)
total_size += size
cursor.close()
plan = MigrationPlan(
target_disk=str(self.target_base),
destination_disks=[str(self.target_base)],
operations=operations,
total_size=total_size,
file_count=len(operations)
)
self.logger.info(
f"Migration plan created: {plan.file_count} files, "
f"{plan.total_size:,} bytes"
)
plan = MigrationPlan(target_disk=str(self.target_base), destination_disks=[str(self.target_base)], operations=operations, total_size=total_size, file_count=len(operations))
self.logger.info(f'Migration plan created: {plan.file_count} files, {plan.total_size:,} bytes')
return plan
def execute_migration(
self,
operations: list[OperationRecord],
dry_run: bool = False,
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
) -> ProcessingStats:
"""Execute migration operations
Args:
operations: List of operations to execute
dry_run: Whether to perform a dry run
progress_callback: Optional callback for progress updates
Returns:
ProcessingStats with execution statistics
"""
self.logger.section("Executing Migration" + (" (DRY RUN)" if dry_run else ""))
def execute_migration(self, operations: list[OperationRecord], dry_run: bool=False, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats:
self.logger.section('Executing Migration' + (' (DRY RUN)' if dry_run else ''))
self._ensure_tables()
stats = ProcessingStats()
total_ops = len(operations)
for operation in operations:
stats.files_processed += 1
if dry_run:
# In dry run, just log what would happen
self.logger.debug(
f"[DRY RUN] Would {operation.operation_type}: "
f"{operation.source_path} -> {operation.target_path}"
)
self.logger.debug(f'[DRY RUN] Would {operation.operation_type}: {operation.source_path} -> {operation.target_path}')
stats.files_succeeded += 1
else:
# Execute actual migration
success = self._execute_operation(operation)
if success:
stats.files_succeeded += 1
stats.bytes_processed += operation.size
else:
stats.files_failed += 1
# Progress callback
if progress_callback and stats.files_processed % 100 == 0:
progress_callback(stats.files_processed, total_ops, stats)
# Log progress
if stats.files_processed % 1000 == 0:
self.logger.progress(
stats.files_processed,
total_ops,
prefix="Operations executed",
bytes_processed=stats.bytes_processed,
elapsed_seconds=stats.elapsed_seconds
)
self.logger.info(
f"Migration {'dry run' if dry_run else 'execution'} complete: "
f"{stats.files_succeeded}/{total_ops} operations, "
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
)
self.logger.progress(stats.files_processed, total_ops, prefix='Operations executed', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds)
self.logger.info(f"Migration {('dry run' if dry_run else 'execution')} complete: {stats.files_succeeded}/{total_ops} operations, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s")
return stats
def _execute_operation(self, operation: OperationRecord) -> bool:
"""Execute a single migration operation
Args:
operation: Operation to execute
Returns:
True if successful
"""
operation.status = 'in_progress'
operation.executed_at = datetime.now()
try:
# Select strategy based on operation type
if operation.operation_type == 'copy':
strategy = self.copy_strategy
elif operation.operation_type == 'hardlink':
@@ -256,15 +102,8 @@ class MigrationEngine:
elif operation.operation_type == 'symlink':
strategy = self.symlink_strategy
else:
raise ValueError(f"Unknown operation type: {operation.operation_type}")
# Execute migration
success = strategy.migrate(
operation.source_path,
operation.target_path,
verify=self.processing_config.verify_operations
)
raise ValueError(f'Unknown operation type: {operation.operation_type}')
success = strategy.migrate(operation.source_path, operation.target_path, verify=self.processing_config.verify_operations)
if success:
operation.status = 'completed'
operation.verified = True
@@ -272,183 +111,85 @@ class MigrationEngine:
return True
else:
operation.status = 'failed'
operation.error = "Migration failed"
operation.error = 'Migration failed'
self._record_operation(operation)
return False
except Exception as e:
operation.status = 'failed'
operation.error = str(e)
self._record_operation(operation)
self.logger.error(f"Operation failed: {operation.source_path}: {e}")
self.logger.error(f'Operation failed: {operation.source_path}: {e}')
return False
def _record_operation(self, operation: OperationRecord):
"""Record operation in database
Args:
operation: Operation to record
"""
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO operations (
source_path, target_path, operation_type, size,
status, error, executed_at, verified
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
str(operation.source_path),
str(operation.target_path),
operation.operation_type,
operation.size,
operation.status,
operation.error,
operation.executed_at,
operation.verified
))
cursor.execute('\n            INSERT INTO operations (\n                source_path, target_path, operation_type, size,\n                status, error, executed_at, verified\n            )\n            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n        ', (str(operation.source_path), str(operation.target_path), operation.operation_type, operation.size, operation.status, operation.error, operation.executed_at, operation.verified))
conn.commit()
cursor.close()
def rollback(self, operation: OperationRecord) -> bool:
"""Rollback a migration operation
Args:
operation: Operation to rollback
Returns:
True if rollback successful
"""
self.logger.warning(f"Rolling back: {operation.target_path}")
self.logger.warning(f'Rolling back: {operation.target_path}')
try:
# Remove destination
if operation.target_path.exists():
operation.target_path.unlink()
# Update database
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
UPDATE operations
SET status = 'rolled_back'
WHERE source_path = %s AND target_path = %s
""", (str(operation.source_path), str(operation.target_path)))
cursor.execute("\n UPDATE operations\n SET status = 'rolled_back'\n WHERE source_path = %s AND target_path = %s\n ", (str(operation.source_path), str(operation.target_path)))
conn.commit()
cursor.close()
return True
except Exception as e:
self.logger.error(f"Rollback failed: {operation.target_path}: {e}")
self.logger.error(f'Rollback failed: {operation.target_path}: {e}')
return False
def get_migration_stats(self) -> dict:
"""Get migration statistics
Returns:
Dictionary with statistics
"""
conn = self._get_connection()
cursor = conn.cursor()
stats = {}
# Total operations
cursor.execute("SELECT COUNT(*) FROM operations")
cursor.execute('SELECT COUNT(*) FROM operations')
stats['total_operations'] = cursor.fetchone()[0]
# Operations by status
cursor.execute("""
SELECT status, COUNT(*)
FROM operations
GROUP BY status
""")
cursor.execute('\n SELECT status, COUNT(*)\n FROM operations\n GROUP BY status\n ')
for status, count in cursor.fetchall():
stats[f'{status}_operations'] = count
# Total size migrated
cursor.execute("""
SELECT COALESCE(SUM(size), 0)
FROM operations
WHERE status = 'completed'
""")
cursor.execute("\n SELECT COALESCE(SUM(size), 0)\n FROM operations\n WHERE status = 'completed'\n ")
stats['total_size_migrated'] = cursor.fetchone()[0]
cursor.close()
return stats
def verify_migrations(self) -> dict:
"""Verify completed migrations
Returns:
Dictionary with verification results
"""
self.logger.subsection("Verifying Migrations")
self.logger.subsection('Verifying Migrations')
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT source_path, target_path, operation_type
FROM operations
WHERE status = 'completed' AND verified = FALSE
""")
cursor.execute("\n SELECT source_path, target_path, operation_type\n FROM operations\n WHERE status = 'completed' AND verified = FALSE\n ")
operations = cursor.fetchall()
cursor.close()
results = {
'total': len(operations),
'verified': 0,
'failed': 0
}
results = {'total': len(operations), 'verified': 0, 'failed': 0}
for source_str, dest_str, op_type in operations:
source = Path(source_str)
dest = Path(dest_str)
# Verify destination exists
if not dest.exists():
results['failed'] += 1
self.logger.warning(f"Verification failed: {dest} does not exist")
self.logger.warning(f'Verification failed: {dest} does not exist')
continue
# Verify based on operation type
if op_type == 'hardlink':
# Check if hardlinked
if source.exists() and source.stat().st_ino == dest.stat().st_ino:
results['verified'] += 1
else:
results['failed'] += 1
else:
# Check if destination exists and has correct size
if dest.exists():
results['verified'] += 1
else:
results['failed'] += 1
self.logger.info(
f"Verification complete: {results['verified']}/{results['total']} verified"
)
elif dest.exists():
results['verified'] += 1
else:
results['failed'] += 1
self.logger.info(f"Verification complete: {results['verified']}/{results['total']} verified")
return results
def close(self):
"""Close database connection"""
if self._connection and not self._connection.closed:
if self._connection and (not self._connection.closed):
self._connection.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.close()
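A typical call site, using the engine's context-manager support to plan and rehearse a migration; the config constructors and import paths shown here are assumptions:

# Sketch: plan and dry-run a migration (constructor arguments are assumed, not confirmed).
from pathlib import Path
from migration import MigrationEngine
from shared.config import DatabaseConfig, ProcessingConfig  # import paths assumed
from shared.logger import ProgressLogger

db = DatabaseConfig(host='localhost', port=5432, database='files', user='app', password='secret')
with MigrationEngine(db, ProcessingConfig(), ProgressLogger(), target_base=Path('/mnt/archive')) as engine:
    plan = engine.plan_migration(disk='data_disk')
    stats = engine.execute_migration(plan.operations, dry_run=True)
    print(f'{stats.files_succeeded}/{plan.file_count} operations would succeed')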

View File

@@ -1,90 +1,43 @@
"""Hardlink-based migration strategy"""
import os
from pathlib import Path
from typing import Optional
from ..shared.logger import ProgressLogger
class HardlinkMigrationStrategy:
"""Create hardlinks to files instead of copying"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize hardlink migration strategy
Args:
logger: Optional progress logger
"""
def __init__(self, logger: Optional[ProgressLogger]=None):
self.logger = logger
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate file by creating hardlink
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
self.logger.error(f'Source file does not exist: {source}')
return False
# Check if source and destination are on same filesystem
if not self._same_filesystem(source, destination.parent):
if self.logger:
self.logger.warning(
f"Cannot hardlink across filesystems: {source} -> {destination}"
)
self.logger.warning(f'Cannot hardlink across filesystems: {source} -> {destination}')
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Create hardlink
os.link(source, destination)
# Verify if requested
if verify:
if not self._verify_hardlink(source, destination):
if self.logger:
self.logger.error(f"Verification failed: {source} -> {destination}")
self.logger.error(f'Verification failed: {source} -> {destination}')
destination.unlink()
return False
return True
except FileExistsError:
if self.logger:
self.logger.warning(f"Destination already exists: {destination}")
self.logger.warning(f'Destination already exists: {destination}')
return False
except Exception as e:
if self.logger:
self.logger.error(f"Hardlink failed: {source} -> {destination}: {e}")
self.logger.error(f'Hardlink failed: {source} -> {destination}: {e}')
return False
def _same_filesystem(self, path1: Path, path2: Path) -> bool:
"""Check if two paths are on the same filesystem
Args:
path1: First path
path2: Second path
Returns:
True if on same filesystem
"""
try:
# Get device IDs
stat1 = path1.stat()
stat2 = path2.stat()
return stat1.st_dev == stat2.st_dev
@@ -92,286 +45,117 @@ class HardlinkMigrationStrategy:
return False
def _verify_hardlink(self, source: Path, destination: Path) -> bool:
"""Verify hardlink
Args:
source: Source file path
destination: Destination file path
Returns:
True if verification successful
"""
try:
# Check if they have the same inode
source_stat = source.stat()
dest_stat = destination.stat()
return source_stat.st_ino == dest_stat.st_ino
except Exception:
return False
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
if not source.exists():
return False
# Check if on same filesystem
dest_dir = destination.parent
if dest_dir.exists():
return self._same_filesystem(source, dest_dir)
# Check parent directories
parent = dest_dir.parent
while not parent.exists() and parent != parent.parent:
parent = parent.parent
return parent.exists() and self._same_filesystem(source, parent)
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds (hardlinks are instant)
"""
return 0.01 # Hardlinks are nearly instant
return 0.01
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Note: For hardlinks, we typically don't remove the source
immediately as both links point to the same inode.
Args:
source: Source file path
Returns:
True (no cleanup needed for hardlinks)
"""
# For hardlinks, we don't remove the source
# Both source and destination point to the same data
return True
class SymlinkMigrationStrategy:
"""Create symbolic links to files"""
def __init__(
self,
logger: Optional[ProgressLogger] = None,
absolute_links: bool = True
):
"""Initialize symlink migration strategy
Args:
logger: Optional progress logger
absolute_links: Whether to create absolute symlinks
"""
def __init__(self, logger: Optional[ProgressLogger]=None, absolute_links: bool=True):
self.logger = logger
self.absolute_links = absolute_links
def migrate(
self,
source: Path,
destination: Path,
verify: bool = True
) -> bool:
"""Migrate file by creating symlink
Args:
source: Source file path
destination: Destination file path
verify: Whether to verify the operation
Returns:
True if migration successful
"""
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
if not source.exists():
if self.logger:
self.logger.error(f"Source file does not exist: {source}")
self.logger.error(f'Source file does not exist: {source}')
return False
# Create destination directory
destination.parent.mkdir(parents=True, exist_ok=True)
try:
# Determine link target
if self.absolute_links:
target = source.resolve()
else:
# Create relative symlink
target = os.path.relpath(source, destination.parent)
# Create symlink
destination.symlink_to(target)
# Verify if requested
if verify:
if not self._verify_symlink(destination, source):
if self.logger:
self.logger.error(f"Verification failed: {source} -> {destination}")
self.logger.error(f'Verification failed: {source} -> {destination}')
destination.unlink()
return False
return True
except FileExistsError:
if self.logger:
self.logger.warning(f"Destination already exists: {destination}")
self.logger.warning(f'Destination already exists: {destination}')
return False
except Exception as e:
if self.logger:
self.logger.error(f"Symlink failed: {source} -> {destination}: {e}")
self.logger.error(f'Symlink failed: {source} -> {destination}: {e}')
return False
def _verify_symlink(self, symlink: Path, expected_target: Path) -> bool:
"""Verify symlink
Args:
symlink: Symlink path
expected_target: Expected target path
Returns:
True if verification successful
"""
try:
# Check if it's a symlink
if not symlink.is_symlink():
return False
# Resolve and compare
resolved = symlink.resolve()
expected = expected_target.resolve()
return resolved == expected
except Exception:
return False
def can_migrate(self, source: Path, destination: Path) -> bool:
"""Check if migration is possible
Args:
source: Source file path
destination: Destination file path
Returns:
True if migration is possible
"""
if not source.exists():
return False
# Check if destination directory is writable
dest_dir = destination.parent
if dest_dir.exists():
return os.access(dest_dir, os.W_OK)
return True
def estimate_time(self, source: Path) -> float:
"""Estimate migration time in seconds
Args:
source: Source file path
Returns:
Estimated time in seconds (symlinks are instant)
"""
return 0.01 # Symlinks are instant
return 0.01
def cleanup(self, source: Path) -> bool:
"""Cleanup source file after successful migration
Note: For symlinks, we don't remove the source as the
symlink points to it.
Args:
source: Source file path
Returns:
True (no cleanup needed for symlinks)
"""
# For symlinks, we don't remove the source
return True
class DedupHardlinkStrategy(HardlinkMigrationStrategy):
"""Hardlink strategy for deduplication
Creates hardlinks for duplicate files to save space.
"""
def __init__(self, logger: Optional[ProgressLogger] = None):
"""Initialize dedup hardlink strategy"""
def __init__(self, logger: Optional[ProgressLogger]=None):
super().__init__(logger=logger)
def deduplicate(
self,
canonical: Path,
duplicate: Path
) -> bool:
"""Replace duplicate with hardlink to canonical
Args:
canonical: Canonical file path
duplicate: Duplicate file path
Returns:
True if deduplication successful
"""
def deduplicate(self, canonical: Path, duplicate: Path) -> bool:
if not canonical.exists():
if self.logger:
self.logger.error(f"Canonical file does not exist: {canonical}")
self.logger.error(f'Canonical file does not exist: {canonical}')
return False
if not duplicate.exists():
if self.logger:
self.logger.error(f"Duplicate file does not exist: {duplicate}")
self.logger.error(f'Duplicate file does not exist: {duplicate}')
return False
# Check if already hardlinked
if self._verify_hardlink(canonical, duplicate):
return True
# Check if on same filesystem
if not self._same_filesystem(canonical, duplicate):
if self.logger:
self.logger.warning(
f"Cannot hardlink across filesystems: {canonical} -> {duplicate}"
)
self.logger.warning(f'Cannot hardlink across filesystems: {canonical} -> {duplicate}')
return False
try:
# Create temporary backup
backup = duplicate.with_suffix(duplicate.suffix + '.bak')
duplicate.rename(backup)
# Create hardlink
os.link(canonical, duplicate)
# Remove backup
backup.unlink()
return True
except Exception as e:
if self.logger:
self.logger.error(f"Deduplication failed: {duplicate}: {e}")
# Restore from backup
self.logger.error(f'Deduplication failed: {duplicate}: {e}')
if backup.exists():
backup.rename(duplicate)
return False
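A sketch of applying the dedup strategy to a duplicate pair reported by the dedupe step (paths are placeholders):

# Sketch: collapse a known duplicate onto its canonical copy.
from pathlib import Path
from migration import DedupHardlinkStrategy  # import path assumed

dedup = DedupHardlinkStrategy()
canonical = Path('/mnt/data/photos/img_001.jpg')
duplicate = Path('/mnt/data/backup/img_001.jpg')
# Only possible when both paths share a filesystem; the strategy refuses otherwise.
if dedup.deduplicate(canonical, duplicate):
    print('duplicate now hardlinked to canonical copy')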

View File

@@ -0,0 +1,62 @@
from pathlib import Path
from typing import Dict
import logging
logger = logging.getLogger(__name__)
class AudioParser:
def __init__(self, whisper_model: str = 'base'):
self.supported_formats = {'.mp3', '.wav', '.flac', '.m4a', '.ogg', '.wma', '.aac'}
self.whisper_model = whisper_model
def parse(self, file_path: Path) -> Dict:
if file_path.suffix.lower() not in self.supported_formats:
return {'error': f'Unsupported format: {file_path.suffix}'}
try:
return self._transcribe_with_whisper(file_path)
except Exception as e:
logger.error(f'Audio parse failed for {file_path}: {e}')
return {'error': str(e), 'text': ''}
def _transcribe_with_whisper(self, file_path: Path) -> Dict:
try:
import whisper
model = whisper.load_model(self.whisper_model)
result = model.transcribe(str(file_path))
return {
'text': result['text'].strip(),
'quality': 'good',
'method': f'whisper-{self.whisper_model}',
'language': result.get('language', 'unknown'),
'segments': len(result.get('segments', [])),
'metadata': {
'duration': result.get('duration'),
'language': result.get('language')
}
}
except ImportError:
logger.warning('Whisper not installed')
return {'error': 'Whisper not installed', 'text': '', 'needs': 'pip install openai-whisper'}
except Exception as e:
return {'error': str(e), 'text': ''}
def extract_metadata(self, file_path: Path) -> Dict:
try:
import mutagen
audio = mutagen.File(str(file_path))
if audio is None:
return {'error': 'Could not read audio file'}
return {
'duration': audio.info.length if hasattr(audio.info, 'length') else None,
'bitrate': audio.info.bitrate if hasattr(audio.info, 'bitrate') else None,
'sample_rate': audio.info.sample_rate if hasattr(audio.info, 'sample_rate') else None,
'channels': audio.info.channels if hasattr(audio.info, 'channels') else None
}
except ImportError:
return {'error': 'mutagen not installed', 'needs': 'pip install mutagen'}
except Exception as e:
return {'error': str(e)}
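A usage sketch: transcribe one file and fall back to tag metadata when Whisper is unavailable (module path and file path are placeholders):

# Sketch: transcription with a metadata fallback.
from pathlib import Path
from parsers.audio import AudioParser  # module path assumed

parser = AudioParser(whisper_model='base')
audio_file = Path('/mnt/data/podcasts/episode1.mp3')
result = parser.parse(audio_file)
if result.get('error'):
    print('transcription unavailable:', result['error'])
    print('tags:', parser.extract_metadata(audio_file))
else:
    print(result['text'][:200])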

View File

@@ -0,0 +1,60 @@
from pathlib import Path
from typing import Dict
import logging
import subprocess
logger = logging.getLogger(__name__)
class DocumentParser:
def __init__(self):
self.supported_formats = {'.doc', '.docx', '.odt', '.rtf'}
def parse(self, file_path: Path) -> Dict:
if file_path.suffix.lower() not in self.supported_formats:
return {'error': f'Unsupported format: {file_path.suffix}'}
try:
if file_path.suffix.lower() in {'.docx', '.odt'}:
return self._parse_with_python(file_path)
else:
return self._parse_with_external(file_path)
except Exception as e:
logger.error(f'Document parse failed for {file_path}: {e}')
return {'error': str(e), 'text': ''}
def _parse_with_python(self, file_path: Path) -> Dict:
try:
if file_path.suffix.lower() == '.docx':
import docx
doc = docx.Document(str(file_path))
text = '\n'.join([para.text for para in doc.paragraphs if para.text.strip()])
return {'text': text, 'quality': 'good', 'method': 'python-docx'}
elif file_path.suffix.lower() == '.odt':
from odf import text as odf_text, teletype
from odf.opendocument import load
doc = load(str(file_path))
paragraphs = doc.getElementsByType(odf_text.P)
text = '\n'.join([teletype.extractText(p) for p in paragraphs if teletype.extractText(p).strip()])
return {'text': text, 'quality': 'good', 'method': 'odfpy'}
except ImportError as ie:
logger.warning(f'Missing library for {file_path.suffix}: {ie}')
return {'error': f'Missing library: {ie}', 'text': '', 'needs': 'python-docx or odfpy'}
except Exception as e:
return {'error': str(e), 'text': ''}
def _parse_with_external(self, file_path: Path) -> Dict:
try:
result = subprocess.run(
['antiword', str(file_path)],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return {'text': result.stdout, 'quality': 'good', 'method': 'antiword'}
else:
return {'error': 'antiword failed', 'text': '', 'needs': 'antiword tool'}
except FileNotFoundError:
return {'error': 'antiword not installed', 'text': '', 'needs': 'sudo apt install antiword'}
except Exception as e:
return {'error': str(e), 'text': ''}

View File

@@ -0,0 +1,63 @@
from pathlib import Path
from typing import Dict
import logging
logger = logging.getLogger(__name__)
class ImageParser:
def __init__(self):
self.supported_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
def parse(self, file_path: Path) -> Dict:
if file_path.suffix.lower() not in self.supported_formats:
return {'error': f'Unsupported format: {file_path.suffix}'}
try:
return self._parse_with_ocr(file_path)
except Exception as e:
logger.error(f'Image parse failed for {file_path}: {e}')
return {'error': str(e), 'text': ''}
def _parse_with_ocr(self, file_path: Path) -> Dict:
try:
from PIL import Image
import pytesseract
img = Image.open(str(file_path))
text = pytesseract.image_to_string(img)
data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
conf_scores = [int(c) for c in data['conf'] if c != '-1']
avg_confidence = sum(conf_scores) / len(conf_scores) if conf_scores else 0
quality = 'good' if avg_confidence > 80 else 'medium' if avg_confidence > 60 else 'low'
return {
'text': text.strip(),
'quality': quality,
'confidence': avg_confidence,
'method': 'tesseract',
'metadata': {
'width': img.width,
'height': img.height,
'format': img.format
}
}
except ImportError as ie:
logger.warning(f'Missing library for OCR: {ie}')
return {'error': f'Missing library: {ie}', 'text': '', 'needs': 'pytesseract and tesseract-ocr'}
except Exception as e:
return {'error': str(e), 'text': ''}
def extract_metadata(self, file_path: Path) -> Dict:
try:
from PIL import Image
img = Image.open(str(file_path))
return {
'width': img.width,
'height': img.height,
'format': img.format,
'mode': img.mode
}
except Exception as e:
return {'error': str(e)}
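A usage sketch for the OCR parser that keeps only reasonably confident output (the quality cut-off is an assumption):

# Sketch: OCR a scanned page and discard low-confidence results.
from pathlib import Path
from parsers.image import ImageParser  # module path assumed

ocr = ImageParser()
res = ocr.parse(Path('/mnt/data/scans/invoice_2023.png'))
if not res.get('error') and res.get('quality') in {'good', 'medium'}:
    print(res['text'])
else:
    print('skipped:', res.get('error', f"confidence {res.get('confidence')}"))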

View File

@@ -0,0 +1,65 @@
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, Optional
import logging
logger = logging.getLogger(__name__)
class TranscriptionParser:
def __init__(self, model: str = 'base'):
self.model = model
self.whisper_available = self._check_whisper()
def _check_whisper(self) -> bool:
try:
import whisper
return True
except ImportError:
logger.warning('Whisper not installed. Install with: pip install openai-whisper')
return False
def parse(self, file_path: Path) -> Dict:
if not self.whisper_available:
return {'success': False, 'error': 'Whisper not available', 'text': ''}
if not self._is_supported(file_path):
return {'success': False, 'error': 'Unsupported file type', 'text': ''}
try:
import whisper
logger.info(f'Transcribing {file_path} with Whisper model={self.model}')
model = whisper.load_model(self.model)
result = model.transcribe(str(file_path))
return {
'success': True,
'text': result['text'],
'segments': result.get('segments', []),
'language': result.get('language', 'unknown')
}
except Exception as e:
logger.error(f'Transcription failed for {file_path}: {e}')
return {'success': False, 'error': str(e), 'text': ''}
def _is_supported(self, file_path: Path) -> bool:
supported = {'.mp3', '.mp4', '.wav', '.m4a', '.flac', '.ogg', '.avi', '.mkv', '.webm'}
return file_path.suffix.lower() in supported
def parse_with_timestamps(self, file_path: Path) -> Dict:
result = self.parse(file_path)
if not result['success']:
return result
segments = result.get('segments', [])
timestamped_text = []
for seg in segments:
start = seg.get('start', 0)
end = seg.get('end', 0)
text = seg.get('text', '').strip()
timestamped_text.append(f'[{start:.2f}s - {end:.2f}s] {text}')
result['timestamped_text'] = '\n'.join(timestamped_text)
return result
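A sketch of the timestamped variant in use (module path and file path are placeholders):

# Sketch: timestamped transcript for an indexed video.
from pathlib import Path
from parsers.transcription import TranscriptionParser  # module path assumed

tp = TranscriptionParser(model='base')
out = tp.parse_with_timestamps(Path('/mnt/data/talks/keynote.mp4'))
if out['success']:
    print(out['timestamped_text'][:500])
else:
    print('skipped:', out['error'])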

View File

@@ -38,4 +38,7 @@ black>=23.0.0
mypy>=1.0.0
flake8>=6.0.0
chardet
pillow
requests
openai-whisper

View File

@@ -0,0 +1,76 @@
CREATE TABLE IF NOT EXISTS content_nodes (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
node_type VARCHAR(50) NOT NULL,
path TEXT NOT NULL,
disk_label VARCHAR(50),
parent_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,
checksum VARCHAR(64),
size BIGINT,
modified_time TIMESTAMP,
content_hash VARCHAR(64),
extracted_at TIMESTAMP,
extraction_method VARCHAR(100),
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(node_type, path, disk_label)
);
CREATE INDEX IF NOT EXISTS idx_content_nodes_type ON content_nodes(node_type);
CREATE INDEX IF NOT EXISTS idx_content_nodes_path ON content_nodes(path);
CREATE INDEX IF NOT EXISTS idx_content_nodes_parent ON content_nodes(parent_id);
CREATE INDEX IF NOT EXISTS idx_content_nodes_checksum ON content_nodes(checksum);
CREATE INDEX IF NOT EXISTS idx_content_nodes_content_hash ON content_nodes(content_hash);
CREATE TABLE IF NOT EXISTS content_edges (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
source_id UUID NOT NULL REFERENCES content_nodes(id) ON DELETE CASCADE,
target_id UUID NOT NULL REFERENCES content_nodes(id) ON DELETE CASCADE,
edge_type VARCHAR(50) NOT NULL,
metadata JSONB,
confidence FLOAT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(source_id, target_id, edge_type)
);
CREATE INDEX IF NOT EXISTS idx_content_edges_source ON content_edges(source_id);
CREATE INDEX IF NOT EXISTS idx_content_edges_target ON content_edges(target_id);
CREATE INDEX IF NOT EXISTS idx_content_edges_type ON content_edges(edge_type);
CREATE TABLE IF NOT EXISTS extraction_log (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
node_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,
file_path TEXT NOT NULL,
file_checksum VARCHAR(64),
extraction_method VARCHAR(100),
status VARCHAR(50),
error_message TEXT,
extracted_size BIGINT,
processing_time_ms INT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_extraction_log_node ON extraction_log(node_id);
CREATE INDEX IF NOT EXISTS idx_extraction_log_file ON extraction_log(file_path);
CREATE INDEX IF NOT EXISTS idx_extraction_log_checksum ON extraction_log(file_checksum);
CREATE INDEX IF NOT EXISTS idx_extraction_log_status ON extraction_log(status);
CREATE INDEX IF NOT EXISTS idx_extraction_log_created ON extraction_log(created_at DESC);
COMMENT ON TABLE content_nodes IS 'Content graph nodes: directories, files, chunks';
COMMENT ON TABLE content_edges IS 'Content graph edges: contains, derived_from, references, duplicates';
COMMENT ON TABLE extraction_log IS 'Tracks extraction history for incremental updates';
COMMENT ON COLUMN content_nodes.node_type IS 'directory, file, chunk, embedding';
COMMENT ON COLUMN content_nodes.content_hash IS 'Hash of extracted content (not file bytes)';
COMMENT ON COLUMN content_edges.edge_type IS 'contains, derived_from, references, duplicates, similar_to';
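To show how the graph tables compose, a sketch that lists the direct children of a directory node; it assumes 'contains' edges point from parent to child, and the connection settings are placeholders:

# Sketch: walk one level of 'contains' edges for a directory node.
import psycopg2

conn = psycopg2.connect(host='localhost', dbname='files', user='app', password='secret')
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT child.node_type, child.path, e.edge_type
        FROM content_nodes parent
        JOIN content_edges e ON e.source_id = parent.id AND e.edge_type = 'contains'
        JOIN content_nodes child ON child.id = e.target_id
        WHERE parent.node_type = 'directory' AND parent.path = %s
        """,
        ('/mnt/data/projects',),
    )
    for node_type, path, edge_type in cur.fetchall():
        print(edge_type, node_type, path)
conn.close()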