Compare commits
4 Commits: 2b2c575385 ... 1583df8f57

| Author | SHA1 | Date |
|---|---|---|
| | 1583df8f57 | |
| | f6aa2b7b76 | |
| | 78042ff2a2 | |
| | 7ce8c8c73d | |
app/analysis/inventory.py (new file, 150 lines)
@@ -0,0 +1,150 @@
import psycopg2
from typing import Dict, Optional
import logging

logger = logging.getLogger(__name__)

class FileTypeInventory:
    def __init__(self, db_config: Dict):
        self.db_config = db_config

        self.parseable_extensions = {
            'text': {'txt', 'md', 'log', 'json', 'yaml', 'yml', 'xml', 'csv', 'tsv', 'ini', 'cfg', 'conf'},
            'code': {'py', 'js', 'java', 'go', 'rs', 'ts', 'tsx', 'jsx', 'cpp', 'h', 'c', 'cs', 'rb', 'php', 'sh', 'bat', 'ps1', 'sql', 'r', 'scala', 'kt'},
            'pdf': {'pdf'},
            'document': {'doc', 'docx', 'odt', 'rtf', 'pages'},
            'spreadsheet': {'xls', 'xlsx', 'ods', 'numbers'},
            'presentation': {'ppt', 'pptx', 'odp', 'key'},
            'image': {'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'svg', 'ico'},
            'audio': {'mp3', 'wav', 'flac', 'm4a', 'ogg', 'wma', 'aac', 'opus'},
            'video': {'mp4', 'avi', 'mkv', 'mov', 'wmv', 'flv', 'webm', 'mpg', 'mpeg'},
            'archive': {'zip', 'tar', 'gz', 'bz2', '7z', 'rar', 'xz'},
            'executable': {'exe', 'dll', 'so', 'dylib', 'bin', 'app'},
            'data': {'db', 'sqlite', 'mdb', 'accdb', 'pkl', 'parquet', 'feather', 'arrow'}
        }

        self.implemented_parsers = {
            'text': True,
            'code': True,
            'pdf': True,
            'document': False,
            'spreadsheet': False,
            'presentation': False,
            'image': False,
            'audio': False,
            'video': False,
            'archive': False,
            'executable': False,
            'data': False
        }

    def get_connection(self):
        return psycopg2.connect(**self.db_config)

    def analyze(self, disk: Optional[str] = None, limit: int = 100):
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            query = '''
                SELECT
                    CASE
                        WHEN path ~ '\\.([a-zA-Z0-9]+)$' THEN
                            LOWER(SUBSTRING(path FROM '\\.([a-zA-Z0-9]+)$'))
                        ELSE 'no_extension'
                    END as extension,
                    COUNT(*) as count,
                    SUM(size)::bigint as total_size,
                    ROUND(AVG(size)::numeric, 0) as avg_size,
                    MAX(size) as max_size,
                    COUNT(CASE WHEN extracted_text IS NOT NULL THEN 1 END) as parsed_count
                FROM files
            '''
            params = []
            if disk:
                query += ' WHERE disk_label = %s'
                params.append(disk)

            query += ' GROUP BY extension ORDER BY count DESC'
            if limit:
                query += f' LIMIT {limit}'

            cursor.execute(query, params)
            results = cursor.fetchall()

            return self._format_results(results)

        finally:
            cursor.close()
            conn.close()

    def _format_results(self, results):
        total_files = 0
        total_size = 0
        parseable_files = 0
        parsed_files = 0
        unparsed_by_type = {}

        extension_details = []
        for row in results:
            ext, count, size, avg, max_sz, parsed = row
            total_files += int(count)
            total_size += int(size or 0)
            parsed_files += int(parsed or 0)

            parser_type = self._get_parser_type(ext)
            is_parseable = parser_type != 'none' and self.implemented_parsers.get(parser_type, False)

            if is_parseable:
                parseable_files += int(count)
                unparsed_count = int(count) - int(parsed or 0)
                if unparsed_count > 0:
                    if parser_type not in unparsed_by_type:
                        unparsed_by_type[parser_type] = {'count': 0, 'extensions': set()}
                    unparsed_by_type[parser_type]['count'] += unparsed_count
                    unparsed_by_type[parser_type]['extensions'].add(ext)

            extension_details.append({
                'extension': ext,
                'count': int(count),
                'total_size': int(size or 0),
                'avg_size': int(avg or 0),
                'max_size': int(max_sz or 0),
                'parsed': int(parsed or 0),
                'parser_type': parser_type,
                'is_parseable': is_parseable
            })

        return {
            'extensions': extension_details,
            'summary': {
                'total_files': total_files,
                'total_size': total_size,
                'parseable_files': parseable_files,
                'parsed_files': parsed_files,
                'coverage': (parsed_files / parseable_files * 100) if parseable_files > 0 else 0
            },
            'unparsed_by_type': unparsed_by_type,
            'parser_status': self._get_parser_status()
        }

    def _get_parser_type(self, ext: str) -> str:
        for ptype, extensions in self.parseable_extensions.items():
            if ext in extensions:
                return ptype
        return 'none'

    def _get_parser_status(self):
        return {
            ptype: {
                'implemented': self.implemented_parsers.get(ptype, False),
                'extensions': list(exts)
            }
            for ptype, exts in self.parseable_extensions.items()
        }

    def format_size(self, size_bytes: int) -> str:
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size_bytes < 1024:
                return f'{size_bytes:.1f}{unit}'
            size_bytes /= 1024
        return f'{size_bytes:.1f}PB'
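
A minimal usage sketch for the FileTypeInventory class added above; the db_config keys shown here are assumptions about the psycopg2 connection settings, and the import path follows the file location in this commit.

from app.analysis.inventory import FileTypeInventory

db_config = {'host': 'localhost', 'dbname': 'files', 'user': 'app', 'password': 'secret'}  # hypothetical values
inventory = FileTypeInventory(db_config)
report = inventory.analyze(disk=None, limit=50)
summary = report['summary']
print(f"{summary['parsed_files']}/{summary['parseable_files']} parseable files parsed "
      f"({summary['coverage']:.1f}% coverage)")
print(inventory.format_size(summary['total_size']))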
@@ -1,21 +1,4 @@
"""Deduplication package exports"""
from .chunker import (
    RabinChunker,
    SimpleChunker,
    hash_chunk,
    hash_file,
    compute_file_signature
)
from .chunker import RabinChunker, SimpleChunker, hash_chunk, hash_file, compute_file_signature
from .store import HashStore, MemoryHashStore
from .engine import DeduplicationEngine

__all__ = [
    'RabinChunker',
    'SimpleChunker',
    'hash_chunk',
    'hash_file',
    'compute_file_signature',
    'HashStore',
    'MemoryHashStore',
    'DeduplicationEngine',
]
__all__ = ['RabinChunker', 'SimpleChunker', 'hash_chunk', 'hash_file', 'compute_file_signature', 'HashStore', 'MemoryHashStore', 'DeduplicationEngine']
@@ -0,0 +1 @@
@@ -1,75 +1,29 @@
"""Rabin fingerprint chunker for content-defined chunking"""
import hashlib
from pathlib import Path
from typing import Iterator, Optional


class RabinChunker:
    """Content-defined chunking using Rabin fingerprinting

    Uses a rolling hash to identify chunk boundaries based on content,
    allowing for efficient deduplication even when data is modified.
    """

    def __init__(
        self,
        avg_chunk_size: int = 8192,
        min_chunk_size: Optional[int] = None,
        max_chunk_size: Optional[int] = None,
        window_size: int = 48
    ):
        """Initialize Rabin chunker

        Args:
            avg_chunk_size: Target average chunk size in bytes
            min_chunk_size: Minimum chunk size (default: avg_chunk_size // 4)
            max_chunk_size: Maximum chunk size (default: avg_chunk_size * 8)
            window_size: Rolling hash window size
        """
    def __init__(self, avg_chunk_size: int=8192, min_chunk_size: Optional[int]=None, max_chunk_size: Optional[int]=None, window_size: int=48):
        self.avg_chunk_size = avg_chunk_size
        self.min_chunk_size = min_chunk_size or (avg_chunk_size // 4)
        self.max_chunk_size = max_chunk_size or (avg_chunk_size * 8)
        self.min_chunk_size = min_chunk_size or avg_chunk_size // 4
        self.max_chunk_size = max_chunk_size or avg_chunk_size * 8
        self.window_size = window_size

        # Calculate mask for boundary detection
        # For avg_chunk_size, we want boundaries at 1/avg_chunk_size probability
        bits = 0
        size = avg_chunk_size
        while size > 1:
            bits += 1
            size >>= 1
        self.mask = (1 << bits) - 1
        self.poly = 17349423945073011

        # Polynomial for rolling hash (prime number)
        self.poly = 0x3DA3358B4DC173

    def chunk_file(self, file_path: Path, chunk_size: Optional[int] = None) -> Iterator[bytes]:
        """Chunk a file using Rabin fingerprinting

        Args:
            file_path: Path to file to chunk
            chunk_size: If provided, use fixed-size chunking instead

        Yields:
            Chunk data as bytes
        """
    def chunk_file(self, file_path: Path, chunk_size: Optional[int]=None) -> Iterator[bytes]:
        if chunk_size:
            # Use fixed-size chunking
            yield from self._chunk_fixed(file_path, chunk_size)
        else:
            # Use content-defined chunking
            yield from self._chunk_rabin(file_path)

    def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]:
        """Fixed-size chunking

        Args:
            file_path: Path to file
            chunk_size: Chunk size in bytes

        Yields:
            Fixed-size chunks
        """
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
@@ -78,46 +32,22 @@ class RabinChunker:
                yield chunk

    def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]:
        """Content-defined chunking using Rabin fingerprinting

        Args:
            file_path: Path to file

        Yields:
            Variable-size chunks based on content
        """
        with open(file_path, 'rb') as f:
            chunk_data = bytearray()
            window = bytearray()
            hash_value = 0

            while True:
                byte = f.read(1)
                if not byte:
                    # End of file - yield remaining data
                    if chunk_data:
                        yield bytes(chunk_data)
                    break

                chunk_data.extend(byte)
                window.extend(byte)

                # Maintain window size
                if len(window) > self.window_size:
                    window.pop(0)

                # Update rolling hash
                hash_value = self._rolling_hash(window)

                # Check if we should create a boundary
                should_break = (
                    len(chunk_data) >= self.min_chunk_size and
                    (
                        (hash_value & self.mask) == 0 or
                        len(chunk_data) >= self.max_chunk_size
                    )
                )

                should_break = len(chunk_data) >= self.min_chunk_size and (hash_value & self.mask == 0 or len(chunk_data) >= self.max_chunk_size)
                if should_break:
                    yield bytes(chunk_data)
                    chunk_data = bytearray()
@@ -125,40 +55,17 @@ class RabinChunker:
                    hash_value = 0

    def _rolling_hash(self, window: bytearray) -> int:
        """Calculate rolling hash for window

        Args:
            window: Byte window

        Returns:
            Hash value
        """
        hash_value = 0
        for byte in window:
            hash_value = ((hash_value << 1) + byte) & 0xFFFFFFFFFFFFFFFF
            hash_value = (hash_value << 1) + byte & 18446744073709551615
        return hash_value


class SimpleChunker:
    """Simple fixed-size chunker for comparison"""

    def __init__(self, chunk_size: int = 8192):
        """Initialize simple chunker

        Args:
            chunk_size: Fixed chunk size in bytes
        """
    def __init__(self, chunk_size: int=8192):
        self.chunk_size = chunk_size

    def chunk_file(self, file_path: Path) -> Iterator[bytes]:
        """Chunk file into fixed-size pieces

        Args:
            file_path: Path to file

        Yields:
            Fixed-size chunks
        """
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size)
@@ -166,76 +73,31 @@ class SimpleChunker:
                break
            yield chunk


def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str:
    """Hash a chunk of data

    Args:
        chunk: Chunk data
        algorithm: Hash algorithm (default: sha256)

    Returns:
        Hex digest of hash
    """
def hash_chunk(chunk: bytes, algorithm: str='sha256') -> str:
    hasher = hashlib.new(algorithm)
    hasher.update(chunk)
    return hasher.hexdigest()


def hash_file(file_path: Path, algorithm: str = 'sha256', chunk_size: int = 65536) -> str:
    """Hash entire file

    Args:
        file_path: Path to file
        algorithm: Hash algorithm (default: sha256)
        chunk_size: Size of chunks to read

    Returns:
        Hex digest of file hash
    """
def hash_file(file_path: Path, algorithm: str='sha256', chunk_size: int=65536) -> str:
    hasher = hashlib.new(algorithm)

    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            hasher.update(chunk)

    return hasher.hexdigest()


def compute_file_signature(
    file_path: Path,
    use_rabin: bool = True,
    avg_chunk_size: int = 8192
) -> tuple[str, list[str]]:
    """Compute file signature with chunk hashes

    Args:
        file_path: Path to file
        use_rabin: Whether to use Rabin chunking (vs fixed-size)
        avg_chunk_size: Average chunk size for Rabin or fixed size

    Returns:
        Tuple of (file_hash, list of chunk hashes)
    """
def compute_file_signature(file_path: Path, use_rabin: bool=True, avg_chunk_size: int=8192) -> tuple[str, list[str]]:
    if use_rabin:
        chunker = RabinChunker(avg_chunk_size=avg_chunk_size)
    else:
        chunker = SimpleChunker(chunk_size=avg_chunk_size)

    chunk_hashes = []
    file_hasher = hashlib.sha256()

    for chunk in chunker.chunk_file(file_path):
        # Hash individual chunk
        chunk_hash = hash_chunk(chunk)
        chunk_hashes.append(chunk_hash)

        # Update file hash
        file_hasher.update(chunk)

    file_hash = file_hasher.hexdigest()

    return file_hash, chunk_hashes
    return (file_hash, chunk_hashes)
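
A minimal sketch of exercising the chunker API above; the sample path and the app.dedup.chunker import path are assumptions, inferred from the relative imports elsewhere in this diff.

from pathlib import Path
from app.dedup.chunker import compute_file_signature

sample = Path('/tmp/example.bin')  # hypothetical input file

# Content-defined chunking: boundaries depend on the bytes themselves, so an
# insertion near the start of the file only shifts nearby chunk boundaries.
file_hash, chunk_hashes = compute_file_signature(sample, use_rabin=True, avg_chunk_size=8192)
print(file_hash, len(chunk_hashes))

# Fixed-size chunking for comparison; a single inserted byte shifts every later chunk.
fixed_hash, fixed_chunks = compute_file_signature(sample, use_rabin=False, avg_chunk_size=8192)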
@@ -1,32 +1,16 @@
|
||||
"""Deduplication engine"""
|
||||
from pathlib import Path
|
||||
from typing import Optional, Callable
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import psycopg2
|
||||
|
||||
from .chunker import compute_file_signature, hash_file
|
||||
from .store import HashStore
|
||||
from ..shared.models import FileRecord, ProcessingStats
|
||||
from ..shared.config import DatabaseConfig, ProcessingConfig
|
||||
from ..shared.logger import ProgressLogger
|
||||
|
||||
|
||||
class DeduplicationEngine:
|
||||
"""Engine for deduplicating files"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db_config: DatabaseConfig,
|
||||
processing_config: ProcessingConfig,
|
||||
logger: ProgressLogger
|
||||
):
|
||||
"""Initialize deduplication engine
|
||||
|
||||
Args:
|
||||
db_config: Database configuration
|
||||
processing_config: Processing configuration
|
||||
logger: Progress logger
|
||||
"""
|
||||
def __init__(self, db_config: DatabaseConfig, processing_config: ProcessingConfig, logger: ProgressLogger):
|
||||
self.db_config = db_config
|
||||
self.processing_config = processing_config
|
||||
self.logger = logger
|
||||
@@ -34,320 +18,130 @@ class DeduplicationEngine:
|
||||
self._connection = None
|
||||
|
||||
def _get_connection(self):
|
||||
"""Get or create database connection"""
|
||||
if self._connection is None or self._connection.closed:
|
||||
self._connection = psycopg2.connect(
|
||||
host=self.db_config.host,
|
||||
port=self.db_config.port,
|
||||
database=self.db_config.database,
|
||||
user=self.db_config.user,
|
||||
password=self.db_config.password
|
||||
)
|
||||
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
|
||||
return self._connection
|
||||
|
||||
def deduplicate_all(
|
||||
self,
|
||||
disk: Optional[str] = None,
|
||||
use_chunks: bool = True,
|
||||
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
|
||||
) -> ProcessingStats:
|
||||
"""Deduplicate all files in database
|
||||
|
||||
Args:
|
||||
disk: Optional disk filter
|
||||
use_chunks: Whether to use chunk-level deduplication
|
||||
progress_callback: Optional callback for progress updates
|
||||
|
||||
Returns:
|
||||
ProcessingStats with deduplication statistics
|
||||
"""
|
||||
self.logger.section("Starting Deduplication")
|
||||
|
||||
def deduplicate_all(self, disk: Optional[str]=None, use_chunks: bool=True, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats:
|
||||
self.logger.section('Starting Deduplication')
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get files without checksums
|
||||
if disk:
|
||||
cursor.execute("""
|
||||
SELECT path, size
|
||||
FROM files
|
||||
WHERE disk_label = %s AND checksum IS NULL
|
||||
ORDER BY size DESC
|
||||
""", (disk,))
|
||||
cursor.execute('\n SELECT path, size\n FROM files\n WHERE disk_label = %s AND checksum IS NULL\n ORDER BY size DESC\n ', (disk,))
|
||||
else:
|
||||
cursor.execute("""
|
||||
SELECT path, size
|
||||
FROM files
|
||||
WHERE checksum IS NULL
|
||||
ORDER BY size DESC
|
||||
""")
|
||||
|
||||
cursor.execute('\n SELECT path, size\n FROM files\n WHERE checksum IS NULL\n ORDER BY size DESC\n ')
|
||||
files_to_process = cursor.fetchall()
|
||||
total_files = len(files_to_process)
|
||||
|
||||
self.logger.info(f"Found {total_files} files to process")
|
||||
|
||||
self.logger.info(f'Found {total_files} files to process')
|
||||
stats = ProcessingStats()
|
||||
|
||||
# Process files with thread pool
|
||||
with ThreadPoolExecutor(max_workers=self.processing_config.parallel_workers) as executor:
|
||||
futures = {}
|
||||
|
||||
for path_str, size in files_to_process:
|
||||
path = Path(path_str)
|
||||
future = executor.submit(self._process_file, path, use_chunks)
|
||||
futures[future] = (path, size)
|
||||
|
||||
# Process completed futures
|
||||
for future in as_completed(futures):
|
||||
path, size = futures[future]
|
||||
|
||||
try:
|
||||
checksum, duplicate_of = future.result()
|
||||
|
||||
if checksum:
|
||||
# Update database
|
||||
cursor.execute("""
|
||||
UPDATE files
|
||||
SET checksum = %s, duplicate_of = %s
|
||||
WHERE path = %s
|
||||
""", (checksum, duplicate_of, str(path)))
|
||||
|
||||
cursor.execute('\n UPDATE files\n SET checksum = %s, duplicate_of = %s\n WHERE path = %s\n ', (checksum, duplicate_of, str(path)))
|
||||
stats.files_succeeded += 1
|
||||
stats.bytes_processed += size
|
||||
|
||||
stats.files_processed += 1
|
||||
|
||||
# Commit periodically
|
||||
if stats.files_processed % self.processing_config.commit_interval == 0:
|
||||
conn.commit()
|
||||
|
||||
# Progress callback
|
||||
if progress_callback:
|
||||
progress_callback(stats.files_processed, total_files, stats)
|
||||
|
||||
# Log progress
|
||||
self.logger.progress(
|
||||
stats.files_processed,
|
||||
total_files,
|
||||
prefix="Files processed",
|
||||
bytes_processed=stats.bytes_processed,
|
||||
elapsed_seconds=stats.elapsed_seconds
|
||||
)
|
||||
|
||||
self.logger.progress(stats.files_processed, total_files, prefix='Files processed', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds)
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Failed to process {path}: {e}")
|
||||
self.logger.warning(f'Failed to process {path}: {e}')
|
||||
stats.files_failed += 1
|
||||
stats.files_processed += 1
|
||||
|
||||
# Final commit
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
self.logger.info(
|
||||
f"Deduplication complete: {stats.files_succeeded}/{total_files} files, "
|
||||
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
|
||||
)
|
||||
|
||||
self.logger.info(f'Deduplication complete: {stats.files_succeeded}/{total_files} files, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s')
|
||||
return stats
|
||||
|
||||
def _process_file(
|
||||
self,
|
||||
path: Path,
|
||||
use_chunks: bool
|
||||
) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Process a single file for deduplication
|
||||
|
||||
Args:
|
||||
path: Path to file
|
||||
use_chunks: Whether to use chunk-level deduplication
|
||||
|
||||
Returns:
|
||||
Tuple of (checksum, duplicate_of_path)
|
||||
"""
|
||||
def _process_file(self, path: Path, use_chunks: bool) -> tuple[Optional[str], Optional[str]]:
|
||||
if not path.exists():
|
||||
return None, None
|
||||
|
||||
return (None, None)
|
||||
try:
|
||||
if use_chunks:
|
||||
# Compute file signature with chunks
|
||||
checksum, chunk_hashes = compute_file_signature(
|
||||
path,
|
||||
use_rabin=True,
|
||||
avg_chunk_size=self.processing_config.chunk_size
|
||||
)
|
||||
checksum, chunk_hashes = compute_file_signature(path, use_rabin=True, avg_chunk_size=self.processing_config.chunk_size)
|
||||
else:
|
||||
# Just compute file hash
|
||||
checksum = hash_file(
|
||||
path,
|
||||
algorithm=self.processing_config.hash_algorithm
|
||||
)
|
||||
checksum = hash_file(path, algorithm=self.processing_config.hash_algorithm)
|
||||
chunk_hashes = None
|
||||
|
||||
# Check if hash exists
|
||||
if self.hash_store.exists(checksum):
|
||||
# Duplicate found
|
||||
canonical_path = self.hash_store.get_canonical(checksum)
|
||||
return checksum, canonical_path
|
||||
return (checksum, canonical_path)
|
||||
else:
|
||||
# New unique file
|
||||
size = path.stat().st_size
|
||||
self.hash_store.store_canonical(
|
||||
checksum,
|
||||
path,
|
||||
size,
|
||||
chunk_hashes
|
||||
)
|
||||
return checksum, None
|
||||
|
||||
self.hash_store.store_canonical(checksum, path, size, chunk_hashes)
|
||||
return (checksum, None)
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Error processing {path}: {e}")
|
||||
self.logger.debug(f'Error processing {path}: {e}')
|
||||
raise
|
||||
|
||||
def find_duplicates(
|
||||
self,
|
||||
disk: Optional[str] = None
|
||||
) -> dict[str, list[str]]:
|
||||
"""Find all duplicate files
|
||||
|
||||
Args:
|
||||
disk: Optional disk filter
|
||||
|
||||
Returns:
|
||||
Dictionary mapping canonical path to list of duplicate paths
|
||||
"""
|
||||
self.logger.subsection("Finding Duplicates")
|
||||
|
||||
def find_duplicates(self, disk: Optional[str]=None) -> dict[str, list[str]]:
|
||||
self.logger.subsection('Finding Duplicates')
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Query for duplicates
|
||||
if disk:
|
||||
cursor.execute("""
|
||||
SELECT checksum, array_agg(path ORDER BY path) as paths
|
||||
FROM files
|
||||
WHERE disk_label = %s AND checksum IS NOT NULL
|
||||
GROUP BY checksum
|
||||
HAVING COUNT(*) > 1
|
||||
""", (disk,))
|
||||
cursor.execute('\n SELECT checksum, array_agg(path ORDER BY path) as paths\n FROM files\n WHERE disk_label = %s AND checksum IS NOT NULL\n GROUP BY checksum\n HAVING COUNT(*) > 1\n ', (disk,))
|
||||
else:
|
||||
cursor.execute("""
|
||||
SELECT checksum, array_agg(path ORDER BY path) as paths
|
||||
FROM files
|
||||
WHERE checksum IS NOT NULL
|
||||
GROUP BY checksum
|
||||
HAVING COUNT(*) > 1
|
||||
""")
|
||||
|
||||
cursor.execute('\n SELECT checksum, array_agg(path ORDER BY path) as paths\n FROM files\n WHERE checksum IS NOT NULL\n GROUP BY checksum\n HAVING COUNT(*) > 1\n ')
|
||||
duplicates = {}
|
||||
for checksum, paths in cursor.fetchall():
|
||||
canonical = paths[0]
|
||||
duplicates[canonical] = paths[1:]
|
||||
|
||||
cursor.close()
|
||||
|
||||
self.logger.info(f"Found {len(duplicates)} sets of duplicates")
|
||||
|
||||
self.logger.info(f'Found {len(duplicates)} sets of duplicates')
|
||||
return duplicates
|
||||
|
||||
def get_deduplication_stats(self) -> dict:
|
||||
"""Get deduplication statistics
|
||||
|
||||
Returns:
|
||||
Dictionary with statistics
|
||||
"""
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
stats = {}
|
||||
|
||||
# Total files
|
||||
cursor.execute("SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL")
|
||||
cursor.execute('SELECT COUNT(*) FROM files WHERE checksum IS NOT NULL')
|
||||
stats['total_files'] = cursor.fetchone()[0]
|
||||
|
||||
# Unique files
|
||||
cursor.execute("SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL")
|
||||
cursor.execute('SELECT COUNT(DISTINCT checksum) FROM files WHERE checksum IS NOT NULL')
|
||||
stats['unique_files'] = cursor.fetchone()[0]
|
||||
|
||||
# Duplicate files
|
||||
stats['duplicate_files'] = stats['total_files'] - stats['unique_files']
|
||||
|
||||
# Total size
|
||||
cursor.execute("SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL")
|
||||
cursor.execute('SELECT COALESCE(SUM(size), 0) FROM files WHERE checksum IS NOT NULL')
|
||||
stats['total_size'] = cursor.fetchone()[0]
|
||||
|
||||
# Unique size
|
||||
cursor.execute("""
|
||||
SELECT COALESCE(SUM(size), 0)
|
||||
FROM (
|
||||
SELECT DISTINCT ON (checksum) size
|
||||
FROM files
|
||||
WHERE checksum IS NOT NULL
|
||||
) AS unique_files
|
||||
""")
|
||||
cursor.execute('\n SELECT COALESCE(SUM(size), 0)\n FROM (\n SELECT DISTINCT ON (checksum) size\n FROM files\n WHERE checksum IS NOT NULL\n ) AS unique_files\n ')
|
||||
stats['unique_size'] = cursor.fetchone()[0]
|
||||
|
||||
# Wasted space
|
||||
stats['wasted_space'] = stats['total_size'] - stats['unique_size']
|
||||
|
||||
# Deduplication ratio
|
||||
if stats['total_size'] > 0:
|
||||
stats['dedup_ratio'] = stats['unique_size'] / stats['total_size']
|
||||
else:
|
||||
stats['dedup_ratio'] = 1.0
|
||||
|
||||
# Space saved percentage
|
||||
if stats['total_size'] > 0:
|
||||
stats['space_saved_percent'] = (stats['wasted_space'] / stats['total_size']) * 100
|
||||
stats['space_saved_percent'] = stats['wasted_space'] / stats['total_size'] * 100
|
||||
else:
|
||||
stats['space_saved_percent'] = 0.0
|
||||
|
||||
cursor.close()
|
||||
|
||||
return stats
|
||||
|
||||
def mark_canonical_files(self) -> int:
|
||||
"""Mark canonical (first occurrence) files in database
|
||||
|
||||
Returns:
|
||||
Number of canonical files marked
|
||||
"""
|
||||
self.logger.subsection("Marking Canonical Files")
|
||||
|
||||
self.logger.subsection('Marking Canonical Files')
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Find first occurrence of each checksum and mark as canonical
|
||||
cursor.execute("""
|
||||
WITH canonical AS (
|
||||
SELECT DISTINCT ON (checksum) path, checksum
|
||||
FROM files
|
||||
WHERE checksum IS NOT NULL
|
||||
ORDER BY checksum, path
|
||||
)
|
||||
UPDATE files
|
||||
SET duplicate_of = NULL
|
||||
WHERE path IN (SELECT path FROM canonical)
|
||||
""")
|
||||
|
||||
cursor.execute('\n WITH canonical AS (\n SELECT DISTINCT ON (checksum) path, checksum\n FROM files\n WHERE checksum IS NOT NULL\n ORDER BY checksum, path\n )\n UPDATE files\n SET duplicate_of = NULL\n WHERE path IN (SELECT path FROM canonical)\n ')
|
||||
count = cursor.rowcount
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
self.logger.info(f"Marked {count} canonical files")
|
||||
|
||||
self.logger.info(f'Marked {count} canonical files')
|
||||
return count
|
||||
|
||||
def close(self):
|
||||
"""Close connections"""
|
||||
self.hash_store.close()
|
||||
if self._connection and not self._connection.closed:
|
||||
if self._connection and (not self._connection.closed):
|
||||
self._connection.close()
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry"""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit"""
|
||||
self.close()
|
||||
|
||||
@@ -1,412 +1,174 @@
|
||||
"""Hash store for deduplication with optional Redis support"""
|
||||
from typing import Optional, Dict, Set
|
||||
from pathlib import Path
|
||||
import psycopg2
|
||||
from psycopg2.extras import execute_batch
|
||||
|
||||
from ..shared.config import DatabaseConfig
|
||||
|
||||
|
||||
class HashStore:
|
||||
"""PostgreSQL-based hash store for deduplication"""
|
||||
|
||||
def __init__(self, db_config: DatabaseConfig):
|
||||
"""Initialize hash store
|
||||
|
||||
Args:
|
||||
db_config: Database configuration
|
||||
"""
|
||||
self.db_config = db_config
|
||||
self._connection = None
|
||||
|
||||
def _get_connection(self):
|
||||
"""Get or create database connection"""
|
||||
if self._connection is None or self._connection.closed:
|
||||
self._connection = psycopg2.connect(
|
||||
host=self.db_config.host,
|
||||
port=self.db_config.port,
|
||||
database=self.db_config.database,
|
||||
user=self.db_config.user,
|
||||
password=self.db_config.password
|
||||
)
|
||||
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
|
||||
return self._connection
|
||||
|
||||
def _ensure_tables(self):
|
||||
"""Ensure hash store tables exist"""
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create hashes table for file-level deduplication
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS file_hashes (
|
||||
checksum TEXT PRIMARY KEY,
|
||||
canonical_path TEXT NOT NULL,
|
||||
size BIGINT NOT NULL,
|
||||
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
ref_count INTEGER DEFAULT 1
|
||||
)
|
||||
""")
|
||||
|
||||
# Create chunk hashes table for chunk-level deduplication
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS chunk_hashes (
|
||||
chunk_hash TEXT PRIMARY KEY,
|
||||
size INTEGER NOT NULL,
|
||||
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
ref_count INTEGER DEFAULT 1
|
||||
)
|
||||
""")
|
||||
|
||||
# Create file-chunk mapping table
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS file_chunks (
|
||||
id SERIAL PRIMARY KEY,
|
||||
file_checksum TEXT NOT NULL,
|
||||
chunk_hash TEXT NOT NULL,
|
||||
chunk_index INTEGER NOT NULL,
|
||||
FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),
|
||||
FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),
|
||||
UNIQUE (file_checksum, chunk_index)
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes
|
||||
cursor.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_file_chunks_file
|
||||
ON file_chunks(file_checksum)
|
||||
""")
|
||||
|
||||
cursor.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk
|
||||
ON file_chunks(chunk_hash)
|
||||
""")
|
||||
|
||||
cursor.execute('\n CREATE TABLE IF NOT EXISTS file_hashes (\n checksum TEXT PRIMARY KEY,\n canonical_path TEXT NOT NULL,\n size BIGINT NOT NULL,\n first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n ref_count INTEGER DEFAULT 1\n )\n ')
|
||||
cursor.execute('\n CREATE TABLE IF NOT EXISTS chunk_hashes (\n chunk_hash TEXT PRIMARY KEY,\n size INTEGER NOT NULL,\n first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,\n ref_count INTEGER DEFAULT 1\n )\n ')
|
||||
cursor.execute('\n CREATE TABLE IF NOT EXISTS file_chunks (\n id SERIAL PRIMARY KEY,\n file_checksum TEXT NOT NULL,\n chunk_hash TEXT NOT NULL,\n chunk_index INTEGER NOT NULL,\n FOREIGN KEY (file_checksum) REFERENCES file_hashes(checksum),\n FOREIGN KEY (chunk_hash) REFERENCES chunk_hashes(chunk_hash),\n UNIQUE (file_checksum, chunk_index)\n )\n ')
|
||||
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_file_chunks_file\n ON file_chunks(file_checksum)\n ')
|
||||
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_file_chunks_chunk\n ON file_chunks(chunk_hash)\n ')
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
def exists(self, checksum: str) -> bool:
|
||||
"""Check if hash exists in store
|
||||
|
||||
Args:
|
||||
checksum: File hash to check
|
||||
|
||||
Returns:
|
||||
True if hash exists
|
||||
"""
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute(
|
||||
"SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1",
|
||||
(checksum,)
|
||||
)
|
||||
|
||||
cursor.execute('SELECT 1 FROM file_hashes WHERE checksum = %s LIMIT 1', (checksum,))
|
||||
exists = cursor.fetchone() is not None
|
||||
cursor.close()
|
||||
|
||||
return exists
|
||||
|
||||
def get_canonical(self, checksum: str) -> Optional[str]:
|
||||
"""Get canonical path for a hash
|
||||
|
||||
Args:
|
||||
checksum: File hash
|
||||
|
||||
Returns:
|
||||
Canonical file path or None if not found
|
||||
"""
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute(
|
||||
"SELECT canonical_path FROM file_hashes WHERE checksum = %s",
|
||||
(checksum,)
|
||||
)
|
||||
|
||||
cursor.execute('SELECT canonical_path FROM file_hashes WHERE checksum = %s', (checksum,))
|
||||
result = cursor.fetchone()
|
||||
cursor.close()
|
||||
|
||||
return result[0] if result else None
|
||||
|
||||
def store_canonical(
|
||||
self,
|
||||
checksum: str,
|
||||
path: Path,
|
||||
size: int,
|
||||
chunk_hashes: Optional[list[str]] = None
|
||||
) -> None:
|
||||
"""Store canonical reference for a hash
|
||||
|
||||
Args:
|
||||
checksum: File hash
|
||||
path: Canonical file path
|
||||
size: File size in bytes
|
||||
chunk_hashes: Optional list of chunk hashes
|
||||
"""
|
||||
def store_canonical(self, checksum: str, path: Path, size: int, chunk_hashes: Optional[list[str]]=None) -> None:
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# Store file hash
|
||||
cursor.execute("""
|
||||
INSERT INTO file_hashes (checksum, canonical_path, size)
|
||||
VALUES (%s, %s, %s)
|
||||
ON CONFLICT (checksum) DO UPDATE SET
|
||||
ref_count = file_hashes.ref_count + 1
|
||||
""", (checksum, str(path), size))
|
||||
|
||||
# Store chunk hashes if provided
|
||||
cursor.execute('\n INSERT INTO file_hashes (checksum, canonical_path, size)\n VALUES (%s, %s, %s)\n ON CONFLICT (checksum) DO UPDATE SET\n ref_count = file_hashes.ref_count + 1\n ', (checksum, str(path), size))
|
||||
if chunk_hashes:
|
||||
# Insert chunk hashes
|
||||
chunk_data = [(chunk_hash, 0) for chunk_hash in chunk_hashes]
|
||||
execute_batch(cursor, """
|
||||
INSERT INTO chunk_hashes (chunk_hash, size)
|
||||
VALUES (%s, %s)
|
||||
ON CONFLICT (chunk_hash) DO UPDATE SET
|
||||
ref_count = chunk_hashes.ref_count + 1
|
||||
""", chunk_data, page_size=1000)
|
||||
|
||||
# Create file-chunk mappings
|
||||
mapping_data = [
|
||||
(checksum, chunk_hash, idx)
|
||||
for idx, chunk_hash in enumerate(chunk_hashes)
|
||||
]
|
||||
execute_batch(cursor, """
|
||||
INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)
|
||||
VALUES (%s, %s, %s)
|
||||
ON CONFLICT (file_checksum, chunk_index) DO NOTHING
|
||||
""", mapping_data, page_size=1000)
|
||||
|
||||
execute_batch(cursor, '\n INSERT INTO chunk_hashes (chunk_hash, size)\n VALUES (%s, %s)\n ON CONFLICT (chunk_hash) DO UPDATE SET\n ref_count = chunk_hashes.ref_count + 1\n ', chunk_data, page_size=1000)
|
||||
mapping_data = [(checksum, chunk_hash, idx) for idx, chunk_hash in enumerate(chunk_hashes)]
|
||||
execute_batch(cursor, '\n INSERT INTO file_chunks (file_checksum, chunk_hash, chunk_index)\n VALUES (%s, %s, %s)\n ON CONFLICT (file_checksum, chunk_index) DO NOTHING\n ', mapping_data, page_size=1000)
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
raise
|
||||
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
def get_chunk_hashes(self, checksum: str) -> list[str]:
|
||||
"""Get chunk hashes for a file
|
||||
|
||||
Args:
|
||||
checksum: File hash
|
||||
|
||||
Returns:
|
||||
List of chunk hashes in order
|
||||
"""
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
SELECT chunk_hash
|
||||
FROM file_chunks
|
||||
WHERE file_checksum = %s
|
||||
ORDER BY chunk_index
|
||||
""", (checksum,))
|
||||
|
||||
cursor.execute('\n SELECT chunk_hash\n FROM file_chunks\n WHERE file_checksum = %s\n ORDER BY chunk_index\n ', (checksum,))
|
||||
chunk_hashes = [row[0] for row in cursor.fetchall()]
|
||||
cursor.close()
|
||||
|
||||
return chunk_hashes
|
||||
|
||||
def get_duplicates(self) -> Dict[str, list[str]]:
|
||||
"""Get all duplicate file groups
|
||||
|
||||
Returns:
|
||||
Dictionary mapping canonical path to list of duplicate paths
|
||||
"""
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get all files with their hashes
|
||||
cursor.execute("""
|
||||
SELECT f.path, f.checksum
|
||||
FROM files f
|
||||
WHERE f.checksum IS NOT NULL
|
||||
""")
|
||||
|
||||
# Group by checksum
|
||||
cursor.execute('\n SELECT f.path, f.checksum\n FROM files f\n WHERE f.checksum IS NOT NULL\n ')
|
||||
hash_to_paths: Dict[str, list[str]] = {}
|
||||
for path, checksum in cursor.fetchall():
|
||||
if checksum not in hash_to_paths:
|
||||
hash_to_paths[checksum] = []
|
||||
hash_to_paths[checksum].append(path)
|
||||
|
||||
cursor.close()
|
||||
|
||||
# Filter to only duplicates (more than one file)
|
||||
duplicates = {
|
||||
paths[0]: paths[1:]
|
||||
for checksum, paths in hash_to_paths.items()
|
||||
if len(paths) > 1
|
||||
}
|
||||
|
||||
duplicates = {paths[0]: paths[1:] for checksum, paths in hash_to_paths.items() if len(paths) > 1}
|
||||
return duplicates
|
||||
|
||||
def get_stats(self) -> Dict[str, int]:
|
||||
"""Get hash store statistics
|
||||
|
||||
Returns:
|
||||
Dictionary with statistics
|
||||
"""
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
stats = {}
|
||||
|
||||
# Count unique file hashes
|
||||
cursor.execute("SELECT COUNT(*) FROM file_hashes")
|
||||
cursor.execute('SELECT COUNT(*) FROM file_hashes')
|
||||
stats['unique_files'] = cursor.fetchone()[0]
|
||||
|
||||
# Count unique chunk hashes
|
||||
cursor.execute("SELECT COUNT(*) FROM chunk_hashes")
|
||||
cursor.execute('SELECT COUNT(*) FROM chunk_hashes')
|
||||
stats['unique_chunks'] = cursor.fetchone()[0]
|
||||
|
||||
# Count total references
|
||||
cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes")
|
||||
cursor.execute('SELECT COALESCE(SUM(ref_count), 0) FROM file_hashes')
|
||||
stats['total_file_refs'] = cursor.fetchone()[0]
|
||||
|
||||
# Count total chunk references
|
||||
cursor.execute("SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes")
|
||||
cursor.execute('SELECT COALESCE(SUM(ref_count), 0) FROM chunk_hashes')
|
||||
stats['total_chunk_refs'] = cursor.fetchone()[0]
|
||||
|
||||
# Calculate deduplication ratio
|
||||
if stats['total_file_refs'] > 0:
|
||||
stats['dedup_ratio'] = stats['unique_files'] / stats['total_file_refs']
|
||||
else:
|
||||
stats['dedup_ratio'] = 1.0
|
||||
|
||||
cursor.close()
|
||||
|
||||
return stats
|
||||
|
||||
def find_similar_files(self, checksum: str, threshold: float = 0.8) -> list[tuple[str, float]]:
|
||||
"""Find files similar to given hash based on chunk overlap
|
||||
|
||||
Args:
|
||||
checksum: File hash to compare
|
||||
threshold: Similarity threshold (0.0 to 1.0)
|
||||
|
||||
Returns:
|
||||
List of tuples (other_checksum, similarity_score)
|
||||
"""
|
||||
def find_similar_files(self, checksum: str, threshold: float=0.8) -> list[tuple[str, float]]:
|
||||
self._ensure_tables()
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get chunks for the target file
|
||||
target_chunks = set(self.get_chunk_hashes(checksum))
|
||||
|
||||
if not target_chunks:
|
||||
cursor.close()
|
||||
return []
|
||||
|
||||
# Find files sharing chunks
|
||||
cursor.execute("""
|
||||
SELECT DISTINCT fc.file_checksum
|
||||
FROM file_chunks fc
|
||||
WHERE fc.chunk_hash = ANY(%s)
|
||||
AND fc.file_checksum != %s
|
||||
""", (list(target_chunks), checksum))
|
||||
|
||||
cursor.execute('\n SELECT DISTINCT fc.file_checksum\n FROM file_chunks fc\n WHERE fc.chunk_hash = ANY(%s)\n AND fc.file_checksum != %s\n ', (list(target_chunks), checksum))
|
||||
similar_files = []
|
||||
|
||||
for (other_checksum,) in cursor.fetchall():
|
||||
for other_checksum, in cursor.fetchall():
|
||||
other_chunks = set(self.get_chunk_hashes(other_checksum))
|
||||
|
||||
# Calculate Jaccard similarity
|
||||
intersection = len(target_chunks & other_chunks)
|
||||
union = len(target_chunks | other_chunks)
|
||||
|
||||
if union > 0:
|
||||
similarity = intersection / union
|
||||
|
||||
if similarity >= threshold:
|
||||
similar_files.append((other_checksum, similarity))
|
||||
|
||||
cursor.close()
|
||||
|
||||
# Sort by similarity descending
|
||||
similar_files.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
return similar_files
|
||||
|
||||
def close(self):
|
||||
"""Close database connection"""
|
||||
if self._connection and not self._connection.closed:
|
||||
if self._connection and (not self._connection.closed):
|
||||
self._connection.close()
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry"""
|
||||
self._ensure_tables()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit"""
|
||||
self.close()
|
||||
|
||||
|
||||
class MemoryHashStore:
|
||||
"""In-memory hash store for testing and small datasets"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize in-memory hash store"""
|
||||
self.hashes: Dict[str, tuple[str, int]] = {}
|
||||
self.chunks: Dict[str, int] = {}
|
||||
self.file_chunks: Dict[str, list[str]] = {}
|
||||
|
||||
def exists(self, checksum: str) -> bool:
|
||||
"""Check if hash exists"""
|
||||
return checksum in self.hashes
|
||||
|
||||
def get_canonical(self, checksum: str) -> Optional[str]:
|
||||
"""Get canonical path"""
|
||||
return self.hashes.get(checksum, (None, 0))[0]
|
||||
|
||||
def store_canonical(
|
||||
self,
|
||||
checksum: str,
|
||||
path: Path,
|
||||
size: int,
|
||||
chunk_hashes: Optional[list[str]] = None
|
||||
) -> None:
|
||||
"""Store canonical reference"""
|
||||
def store_canonical(self, checksum: str, path: Path, size: int, chunk_hashes: Optional[list[str]]=None) -> None:
|
||||
self.hashes[checksum] = (str(path), size)
|
||||
|
||||
if chunk_hashes:
|
||||
self.file_chunks[checksum] = chunk_hashes
|
||||
for chunk_hash in chunk_hashes:
|
||||
self.chunks[chunk_hash] = self.chunks.get(chunk_hash, 0) + 1
|
||||
|
||||
def get_chunk_hashes(self, checksum: str) -> list[str]:
|
||||
"""Get chunk hashes"""
|
||||
return self.file_chunks.get(checksum, [])
|
||||
|
||||
def get_stats(self) -> Dict[str, int]:
|
||||
"""Get statistics"""
|
||||
return {
|
||||
'unique_files': len(self.hashes),
|
||||
'unique_chunks': len(self.chunks),
|
||||
'total_file_refs': len(self.hashes),
|
||||
'total_chunk_refs': sum(self.chunks.values()),
|
||||
'dedup_ratio': 1.0
|
||||
}
|
||||
return {'unique_files': len(self.hashes), 'unique_chunks': len(self.chunks), 'total_file_refs': len(self.hashes), 'total_chunk_refs': sum(self.chunks.values()), 'dedup_ratio': 1.0}
|
||||
|
||||
def close(self):
|
||||
"""No-op for compatibility"""
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry"""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit"""
|
||||
pass
|
||||
|
||||
@@ -1,59 +1,119 @@
from typing import Dict
from typing import Dict, List
import re

class ContentEnricher:
    tech_keywords = {'transcribe', 'transcription', 'whisper', 'speech-to-text', 'audio', 'video', 'subtitle', 'caption', 'srt', 'vtt', 'ffmpeg', 'opencv', 'pytorch', 'tensorflow', 'cuda', 'gpu', 'ml', 'nlp', 'llm', 'ollama', 'docker', 'kubernetes', 'postgres', 'database', 'api', 'rest', 'graphql', 'python', 'javascript', 'java', 'rust', 'golang'}

    def __init__(self, llm_client=None):
        self.llm_client = llm_client
        self.pii_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'
            'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
            'api_key': r'(?i)(api[_-]?key|token|secret)["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_\-]{20,})',
            'password': r'(?i)(password|passwd|pwd)["\']?\s*[:=]\s*["\']([^"\']{8,})'
        }

    def enrich(self, text: str, use_llm: bool = False) -> Dict:
        enrichment = {
            'summary': self._basic_summary(text),
            'word_count': len(text.split()),
            'has_pii': self._detect_pii(text),
            'quality': self._assess_quality(text),
            'topics': self._extract_basic_topics(text)
            'topics': self._extract_topics(text),
            'entities': self._extract_entities(text),
            'tech_stack': self._detect_tech(text),
            'security': {
                'has_pii': bool(self._detect_pii(text)),
                'has_credentials': self._detect_credentials(text),
                'pii_details': self._detect_pii(text)
            },
            'quality': self._assess_quality(text)
        }

        if use_llm and self.llm_client:
            llm_result = self.llm_client.classify_content(text)
            if llm_result.get('success'):
                enrichment['llm_classification'] = llm_result['text']
            summary_result = self.llm_client.summarize(text[:3000], max_length=200)
            if summary_result.get('success'):
                enrichment['llm_summary'] = summary_result['text']

            intent_result = self.llm_client.extract_intent(text[:3000])
            if intent_result.get('success'):
                enrichment['llm_intent'] = intent_result['text']

            topics_result = self.llm_client.extract_topics(text[:3000])
            if topics_result.get('success') and topics_result.get('topics'):
                enrichment['llm_topics'] = topics_result['topics']

        return enrichment

    def _basic_summary(self, text: str) -> str:
        sentences = re.split(r'[.!?]+', text)
        return ' '.join(sentences[:3])[:200]
        if not text:
            return ''
        sentences = re.split(r'[.!?\n]+', text)
        summary = []
        length = 0
        for sent in sentences:
            sent = sent.strip()
            if not sent:
                continue
            if length + len(sent) > 200:
                break
            summary.append(sent)
            length += len(sent)
        return '. '.join(summary) if summary else text[:200]

    def _extract_topics(self, text: str) -> List[str]:
        text_lower = text.lower()
        topics = []
        for tech in self.tech_keywords:
            if tech in text_lower:
                topics.append(tech)
        words = re.findall(r'\b[A-Z][a-z]+\b', text)
        word_freq = {}
        for word in words:
            if len(word) > 3 and word.lower() not in {'this', 'that', 'with', 'from', 'have'}:
                word_freq[word] = word_freq.get(word, 0) + 1
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
        topics.extend([w for (w, _) in sorted_words[:5]])
        return list(set(topics))[:15]

    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        entities = {'files': [], 'urls': [], 'paths': []}
        file_pattern = re.compile(r'\b\w+\.(py|js|java|go|rs|cpp|h|md|txt|json|yaml|yml|xml|sql|sh|bat|ts|tsx|jsx)\b')
        entities['files'] = list(set(file_pattern.findall(text)))[:10]
        url_pattern = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
        entities['urls'] = list(set(url_pattern.findall(text)))[:5]
        path_pattern = re.compile(r'(?:/[a-zA-Z0-9_.-]+)+/?')
        entities['paths'] = list(set(path_pattern.findall(text)))[:10]
        return entities

    def _detect_tech(self, text: str) -> List[str]:
        text_lower = text.lower()
        return [tech for tech in self.tech_keywords if tech in text_lower]

    def _detect_pii(self, text: str) -> Dict:
        detected = {}
        for pii_type, pattern in self.pii_patterns.items():
            matches = re.findall(pattern, text)
        for pii_type in ['email', 'phone', 'ssn', 'credit_card']:
            matches = re.findall(self.pii_patterns[pii_type], text)
            if matches:
                detected[pii_type] = len(matches)
        return detected

    def _detect_credentials(self, text: str) -> bool:
        for name in ['api_key', 'password']:
            if re.search(self.pii_patterns[name], text):
                return True
        return False

    def _assess_quality(self, text: str) -> str:
        if len(text.strip()) < 10:
            return 'low'

        if not text or len(text.strip()) < 10:
            return 'empty'
        words = text.split()
        if not words:
            return 'empty'
        avg_word_len = sum(len(w) for w in words) / len(words)
        if avg_word_len < 2 or avg_word_len > 20:
            return 'garbled'
        special_char_ratio = sum(1 for c in text if not c.isalnum() and not c.isspace()) / len(text)
        if special_char_ratio > 0.3:
            return 'low'

        return 'high' if len(text.split()) > 50 else 'medium'

    def _extract_basic_topics(self, text: str) -> list:
        words = re.findall(r'\b[A-Z][a-z]+\b', text)
        word_freq = {}
        for word in words:
            if len(word) > 3:
                word_freq[word] = word_freq.get(word, 0) + 1

        return sorted(word_freq, key=word_freq.get, reverse=True)[:10]
        if special_char_ratio > 0.4:
            return 'low_confidence'
        return 'good'
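
A minimal sketch of calling the enricher above without an LLM backend; the import path is a hypothetical assumption, and the expected outputs are indicative only.

from app.analysis.enrich import ContentEnricher  # hypothetical module path

enricher = ContentEnricher()
result = enricher.enrich('Contact admin@example.com about the ffmpeg transcription pipeline.', use_llm=False)
print(result['security']['has_pii'])   # True: the email pattern matches
print(result['tech_stack'])            # ['ffmpeg', 'transcription'] or similar
print(result['quality'])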
@@ -1,54 +1,79 @@
import requests
import json
from typing import Dict, Optional
import logging
from typing import Dict, Optional, List

logger = logging.getLogger(__name__)

class LLMClient:
    def __init__(self, endpoint: str = 'http://192.168.1.74:1234', model: str = 'local'):
    def __init__(self, endpoint: str = 'http://localhost:11434', model: str = 'llama3', use_local: bool = True, lm_studio_host: str = None):
        self.endpoint = endpoint
        self.model = model
        self.local_ollama = 'http://localhost:11434'
        self.use_local = use_local

        self.lm_studio_endpoints = {
            'plato': {'url': 'http://192.168.1.74:1234', 'model': 'openai/gpt-oss-20b'},
            'postgres': {'url': 'http://192.168.1.159:1234', 'model': 'mistralai/devstral-small-2507'},
            'local': {'url': 'http://localhost:11434', 'model': 'llama3'}
        }

        self.lm_studio_host = lm_studio_host or 'postgres'
        studio_config = self.lm_studio_endpoints.get(self.lm_studio_host, self.lm_studio_endpoints['postgres'])
        self.lm_studio_endpoint = studio_config['url']
        self.lm_studio_model = studio_config['model']

    def summarize(self, text: str, max_length: int = 200) -> Dict:
        prompt = f"Summarize the following in {max_length} chars or less:\n\n{text[:2000]}"
        prompt = f"Summarize this concisely in under {max_length} characters:\n\n{text[:3000]}"
        return self._query(prompt)

    def extract_topics(self, text: str) -> Dict:
        prompt = f"Extract 5-10 key topics/tags from this text. Return as comma-separated list:\n\n{text[:2000]}"
        prompt = f"Extract 5-10 key topics/tags. Return ONLY comma-separated words:\n\n{text[:3000]}"
        result = self._query(prompt)
        if result.get('success'):
            topics = [t.strip() for t in result['text'].split(',')]
            result['topics'] = topics[:10]
        return result

    def extract_intent(self, text: str) -> Dict:
        prompt = f"What is the main purpose/intent of this code/document? Answer in 1-2 sentences:\n\n{text[:3000]}"
        return self._query(prompt)

    def classify_content(self, text: str) -> Dict:
        prompt = f"Classify this content. Return: category, topics, has_pii (yes/no), quality (high/medium/low):\n\n{text[:1000]}"
    def detect_project_type(self, text: str, file_list: List[str]) -> Dict:
        files_str = ', '.join(file_list[:20])
        prompt = f"Based on these files: {files_str}\nAnd this content:\n{text[:2000]}\n\nWhat type of project is this? (e.g. web app, ml/ai, transcription, data processing, etc.)"
        return self._query(prompt)

    def _query(self, prompt: str, use_local: bool = False) -> Dict:
    def _query(self, prompt: str, timeout: int = 30) -> Dict:
        try:
            endpoint = self.local_ollama if use_local else self.endpoint

            if use_local:
            if self.use_local:
                response = requests.post(
                    f'{endpoint}/api/generate',
                    json={'model': 'llama3.2', 'prompt': prompt, 'stream': False},
                    timeout=30
                    f'{self.endpoint}/api/generate',
                    json={'model': self.model, 'prompt': prompt, 'stream': False},
                    timeout=timeout
                )
                if response.status_code == 200:
                    data = response.json()
                    return {'success': True, 'text': data.get('response', '').strip()}
            else:
                response = requests.post(
                    f'{endpoint}/v1/chat/completions',
                    f'{self.lm_studio_endpoint}/v1/chat/completions',
                    json={
                        'model': self.model,
                        'model': self.lm_studio_model,
                        'messages': [{'role': 'user', 'content': prompt}],
                        'max_tokens': 500
                        'max_tokens': 500,
                        'temperature': 0.7
                    },
                    timeout=30
                    timeout=timeout
                )
                if response.status_code == 200:
                    data = response.json()
                    return {'success': True, 'text': data['choices'][0]['message']['content'].strip()}

            if response.status_code == 200:
                data = response.json()
                if use_local:
                    return {'success': True, 'text': data.get('response', '')}
                else:
                    return {'success': True, 'text': data['choices'][0]['message']['content']}
            else:
                return {'success': False, 'error': f'HTTP {response.status_code}'}
            return {'success': False, 'error': f'HTTP {response.status_code}'}

        except requests.Timeout:
            logger.warning(f'LLM request timeout after {timeout}s')
            return {'success': False, 'error': 'timeout'}
        except Exception as e:
            logger.error(f'LLM query failed: {e}')
            return {'success': False, 'error': str(e)}
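
A minimal sketch of driving the reworked LLM client; the endpoints and model names are the ones hardcoded in this diff, their availability is an assumption, and the import path is hypothetical.

from app.enrichment.llm_client import LLMClient  # hypothetical import path

# Local Ollama route: use_local=True posts to {endpoint}/api/generate.
local = LLMClient(endpoint='http://localhost:11434', model='llama3', use_local=True)
summary = local.summarize('Long document text ...', max_length=200)
if summary.get('success'):
    print(summary['text'])

# LM Studio route: use_local=False posts to the selected lm_studio_host.
remote = LLMClient(use_local=False, lm_studio_host='plato')
topics = remote.extract_topics('Whisper transcription pipeline with CUDA and FFmpeg')
print(topics.get('topics', []))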
app/extraction/incremental.py (new file, 196 lines)
@@ -0,0 +1,196 @@
|
||||
import hashlib
|
||||
import psycopg2
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional, List
|
||||
import logging
|
||||
import time
|
||||
import json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class IncrementalExtractor:
|
||||
def __init__(self, db_config: Dict):
|
||||
self.db_config = db_config
|
||||
|
||||
def get_connection(self):
|
||||
return psycopg2.connect(**self.db_config)
|
||||
|
||||
def should_extract(self, file_path: str, file_checksum: str) -> bool:
|
||||
conn = self.get_connection()
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
cursor.execute('''
|
||||
SELECT file_checksum, status
|
||||
FROM extraction_log
|
||||
WHERE file_path = %s
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
''', (file_path,))
|
||||
|
||||
result = cursor.fetchone()
|
||||
if not result:
|
||||
return True
|
||||
|
||||
last_checksum, status = result
|
||||
if last_checksum != file_checksum:
|
||||
logger.info(f'File changed: {file_path}')
|
||||
return True
|
||||
|
||||
if status == 'success':
|
||||
return False
|
||||
|
||||
if status == 'error':
|
||||
return True
|
||||
|
||||
return True
|
||||
|
||||
finally:
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
    def log_extraction(self, node_id: Optional[str], file_path: str, file_checksum: str,
                       method: str, status: str, error_msg: Optional[str] = None,
                       extracted_size: Optional[int] = None, processing_time_ms: Optional[int] = None):
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT INTO extraction_log (node_id, file_path, file_checksum, extraction_method,
                                            status, error_message, extracted_size, processing_time_ms)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ''', (node_id, file_path, file_checksum, method, status, error_msg, extracted_size, processing_time_ms))
            conn.commit()
        finally:
            cursor.close()
            conn.close()

    def create_or_update_node(self, node_type: str, path: str, disk_label: str,
                              checksum: Optional[str], size: Optional[int],
                              content_hash: Optional[str], metadata: Optional[Dict]) -> str:
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT INTO content_nodes (node_type, path, disk_label, checksum, size,
                                           content_hash, extracted_at, metadata)
                VALUES (%s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, %s)
                ON CONFLICT (node_type, path, disk_label) DO UPDATE SET
                    checksum = EXCLUDED.checksum,
                    size = EXCLUDED.size,
                    content_hash = EXCLUDED.content_hash,
                    extracted_at = CURRENT_TIMESTAMP,
                    metadata = EXCLUDED.metadata,
                    updated_at = CURRENT_TIMESTAMP
                RETURNING id
            ''', (node_type, path, disk_label, checksum, size, content_hash, json.dumps(metadata) if metadata else None))

            node_id = cursor.fetchone()[0]
            conn.commit()
            return str(node_id)

        finally:
            cursor.close()
            conn.close()

    def batch_extract(self, file_list: List[Dict], parser_func, parser_name: str,
                      batch_size: int = 100, skip_existing: bool = True) -> Dict:
        stats = {
            'processed': 0,
            'extracted': 0,
            'skipped': 0,
            'errors': 0,
            'total_time_ms': 0
        }

        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            for idx, file_info in enumerate(file_list, 1):
                path = file_info['path']
                checksum = file_info.get('checksum')
                disk_label = file_info.get('disk_label')

                if skip_existing and not self.should_extract(path, checksum):
                    stats['skipped'] += 1
                    continue

                start_time = time.time()
                try:
                    result = parser_func(Path(path))
                    processing_time_ms = int((time.time() - start_time) * 1000)

                    if 'error' not in result and result.get('text'):
                        text = result['text']
                        content_hash = hashlib.sha256(text.encode()).hexdigest()

                        node_id = self.create_or_update_node(
                            node_type='file',
                            path=path,
                            disk_label=disk_label,
                            checksum=checksum,
                            size=file_info.get('size'),
                            content_hash=content_hash,
                            metadata={
                                'extraction': result.get('method', parser_name),
                                'quality': result.get('quality', 'unknown')
                            }
                        )

                        cursor.execute('''
                            UPDATE files
                            SET extracted_text = %s,
                                text_quality = %s
                            WHERE path = %s
                        ''', (text[:50000], result.get('quality'), path))

                        self.log_extraction(
                            node_id=node_id,
                            file_path=path,
                            file_checksum=checksum,
                            method=parser_name,
                            status='success',
                            extracted_size=len(text),
                            processing_time_ms=processing_time_ms
                        )

                        stats['extracted'] += 1
                        stats['total_time_ms'] += processing_time_ms
                    else:
                        error_msg = result.get('error', 'No text extracted')
                        self.log_extraction(
                            node_id=None,
                            file_path=path,
                            file_checksum=checksum,
                            method=parser_name,
                            status='error',
                            error_msg=error_msg
                        )
                        stats['errors'] += 1

                except Exception as e:
                    logger.error(f'Extract failed for {path}: {e}')
                    self.log_extraction(
                        node_id=None,
                        file_path=path,
                        file_checksum=checksum,
                        method=parser_name,
                        status='error',
                        error_msg=str(e)
                    )
                    stats['errors'] += 1

                stats['processed'] += 1

                if stats['processed'] % batch_size == 0:
                    conn.commit()
                    logger.info(f'Batch progress: {stats["processed"]}/{len(file_list)} '
                                f'({stats["extracted"]} extracted, {stats["skipped"]} skipped, {stats["errors"]} errors)')

            conn.commit()

        finally:
            cursor.close()
            conn.close()

        return stats

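A minimal usage sketch for the new extractor, assuming a `parse_text` callable that returns a dict with `text`, `method` and `quality` keys (the names below are illustrative; any parser with that return shape works, and the db_config keys are placeholders):

    # Sketch only: wire IncrementalExtractor to an existing parser function.
    from extraction.incremental import IncrementalExtractor

    def parse_text(path):
        # hypothetical parser; the real parsers live elsewhere in the app
        return {'text': path.read_text(errors='ignore'), 'method': 'plain', 'quality': 'ok'}

    extractor = IncrementalExtractor(db_config={'host': 'localhost', 'dbname': 'reorg'})
    files = [{'path': '/data/notes/todo.txt', 'checksum': 'abc123', 'disk_label': 'D', 'size': 512}]
    stats = extractor.batch_extract(files, parse_text, parser_name='plain', batch_size=100)
    print(stats)  # {'processed': ..., 'extracted': ..., 'skipped': ..., 'errors': ..., 'total_time_ms': ...}
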
266
app/main.py
@@ -535,14 +535,18 @@ class DiskReorganizer:

        try:
            query = "SELECT path, size, disk_label FROM files WHERE 1=1"
            params = []
            if kind:
                suffix_map = {'text': "('.txt', '.md', '.log', '.json')", 'code': "('.py', '.js', '.java', '.go')", 'pdf': "('.pdf',)"}
                suffix_map = {
                    'text': ['.txt', '.md', '.log', '.json', '.yaml', '.yml'],
                    'code': ['.py', '.js', '.java', '.go', '.rs', '.ts', '.cpp', '.h'],
                    'pdf': ['.pdf']
                }
                if kind in suffix_map:
                    query += f" AND RIGHT(path, 4) IN {suffix_map[kind]} OR RIGHT(path, 3) IN {suffix_map[kind]}"
                    conditions = ' OR '.join([f"path LIKE '%{ext}'" for ext in suffix_map[kind]])
                    query += f" AND ({conditions})"
            query += f" LIMIT {limit}"

            cursor.execute(query, params)
            cursor.execute(query)
            files = cursor.fetchall()

            print(f"\n=== PARSING FILES ===\nProcessing {len(files)} files\n")

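The rewritten filter still interpolates the LIKE conditions into the SQL string. That is safe here because the extensions are hard-coded, but a parameterized variant (a sketch, not part of this change) keeps the patterns out of the query text entirely:

    # Sketch: pass LIKE patterns as a PostgreSQL array parameter instead of f-string interpolation.
    patterns = [f'%{ext}' for ext in suffix_map[kind]]
    cursor.execute(
        "SELECT path, size, disk_label FROM files WHERE path ILIKE ANY(%s) LIMIT %s",
        (patterns, limit),
    )
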
@@ -580,30 +584,63 @@ class DiskReorganizer:
            cursor.close()
            conn.close()

    def enrich_files(self, limit: int = 10, llm_endpoint: str = None, use_local: bool = False):
    def enrich_files(self, limit: int = 10, use_llm: bool = False, use_local: bool = True, batch_size: int = 100):
        from enrichment.enricher import ContentEnricher
        from enrichment.llm_client import LLMClient

        llm_client = LLMClient(use_local=use_local) if use_llm else None
        enricher = ContentEnricher(llm_client=llm_client)

        enricher = ContentEnricher()
        conn = self.get_connection()
        cursor = conn.cursor()

        try:
            cursor.execute(f"SELECT path, extracted_text FROM files WHERE extracted_text IS NOT NULL LIMIT {limit}")
            cursor.execute(f"SELECT path, extracted_text FROM files WHERE extracted_text IS NOT NULL AND (enrichment IS NULL OR enrichment = '{{}}'::jsonb) LIMIT {limit}")
            files = cursor.fetchall()

            print(f"\n=== ENRICHING CONTENT ===\nProcessing {len(files)} files\n")
            print(f"\n=== ENRICHING CONTENT ===")
            print(f"Processing {len(files)} files")
            if use_llm:
                print(f"Using LLM: {'Local OLLAMA' if use_local else 'Network LM_STUDIO'}\n")
            else:
                print("Using rule-based enrichment only\n")

            for path, text in files:
                enrichment = enricher.enrich(text[:5000], use_llm=False)
                print(f"{path[:60]}")
            enriched_count = 0
            batch = []
            for idx, (path, text) in enumerate(files, 1):
                if not text:
                    continue

                enrichment = enricher.enrich(text[:5000], use_llm=use_llm)

                print(f"{idx}/{len(files)} {path[:60]}")
                print(f" Quality: {enrichment.get('quality')} | Words: {enrichment.get('word_count'):,}")
                print(f" PII: {list(enrichment.get('has_pii', {}).keys())}")
                print(f" Topics: {', '.join(enrichment.get('topics', [])[:5])}\n")
                if enrichment.get('security', {}).get('has_pii'):
                    print(f" PII: {list(enrichment.get('security', {}).get('pii_details', {}).keys())}")
                if enrichment.get('tech_stack'):
                    print(f" Tech: {', '.join(enrichment['tech_stack'][:5])}")
                if enrichment.get('topics'):
                    print(f" Topics: {', '.join(enrichment['topics'][:5])}")
                if use_llm and enrichment.get('llm_summary'):
                    print(f" LLM Summary: {enrichment['llm_summary'][:100]}...")
                if use_llm and enrichment.get('llm_intent'):
                    print(f" Intent: {enrichment['llm_intent'][:100]}...")
                print()

                cursor.execute("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", (json.dumps(enrichment), path))
                batch.append((json.dumps(enrichment), path))
                enriched_count += 1

            conn.commit()
            print(f"Enriched {len(files)} files")
                if len(batch) >= batch_size:
                    cursor.executemany("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", batch)
                    conn.commit()
                    batch.clear()
                    print(f" Committed batch ({enriched_count} files so far)")

            if batch:
                cursor.executemany("UPDATE files SET enrichment = %s::jsonb WHERE path = %s", batch)
                conn.commit()

            print(f"\nEnriched {enriched_count} files")

        finally:
            cursor.close()

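psycopg2's `executemany` still issues one statement per row; `psycopg2.extras.execute_batch` (already imported by the migration engine later in this diff) groups the updates into far fewer round-trips. A sketch of the same flush step using it:

    from psycopg2.extras import execute_batch

    # Sketch: flush the accumulated (enrichment, path) pairs in pages of 100 statements.
    execute_batch(cursor, "UPDATE files SET enrichment = %s::jsonb WHERE path = %s", batch, page_size=100)
    conn.commit()
    batch.clear()
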
@@ -695,6 +732,75 @@ class DiskReorganizer:
            cursor.close()
            conn.close()

    def search_content(self, query: str, limit: int=20, search_type: str='text'):
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            if search_type == 'text':
                cursor.execute('''
                    SELECT path, disk_label, size, category,
                           ts_rank(to_tsvector('english', COALESCE(extracted_text, '')), plainto_tsquery('english', %s)) as rank,
                           LEFT(extracted_text, 200) as snippet
                    FROM files
                    WHERE extracted_text IS NOT NULL
                      AND to_tsvector('english', extracted_text) @@ plainto_tsquery('english', %s)
                    ORDER BY rank DESC
                    LIMIT %s
                ''', (query, query, limit))
            elif search_type == 'enrichment':
                cursor.execute('''
                    SELECT path, disk_label, size, category, enrichment
                    FROM files
                    WHERE enrichment IS NOT NULL
                      AND enrichment::text ILIKE %s
                    LIMIT %s
                ''', (f'%{query}%', limit))
            elif search_type == 'path':
                cursor.execute('''
                    SELECT path, disk_label, size, category
                    FROM files
                    WHERE path ILIKE %s
                    LIMIT %s
                ''', (f'%{query}%', limit))
            else:
                logger.error(f'Unknown search type: {search_type}')
                return

            results = cursor.fetchall()
            if not results:
                print(f'No results found for: {query}')
                return

            print(f'\n=== SEARCH RESULTS: {len(results)} matches for "{query}" ===\n')
            for idx, row in enumerate(results, 1):
                if search_type == 'text':
                    path, disk, size, category, rank, snippet = row
                    print(f'{idx}. {path}')
                    print(f' Disk: {disk}, Size: {self.format_size(int(size))}, Category: {category}')
                    print(f' Rank: {rank:.4f}')
                    if snippet:
                        print(f' Snippet: {snippet[:150]}...')
                elif search_type == 'enrichment':
                    path, disk, size, category, enrichment = row
                    print(f'{idx}. {path}')
                    print(f' Disk: {disk}, Size: {self.format_size(int(size))}, Category: {category}')
                    if enrichment:
                        import json
                        enrich_data = json.loads(enrichment) if isinstance(enrichment, str) else enrichment
                        if 'topics' in enrich_data:
                            print(f' Topics: {", ".join(enrich_data["topics"][:5])}')
                        if 'tech_stack' in enrich_data:
                            print(f' Tech: {", ".join(enrich_data["tech_stack"][:5])}')
                else:
                    path, disk, size, category = row
                    print(f'{idx}. {path}')
                    print(f' Disk: {disk}, Size: {self.format_size(int(size))}, Category: {category}')
                print()

        finally:
            cursor.close()
            conn.close()

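The text search recomputes `to_tsvector('english', extracted_text)` for every row on each query. If the files table is large, an expression GIN index (a suggestion only; nothing in this diff creates it) lets the `@@` predicate use an index scan instead of a sequential scan:

    # Sketch: one-off index creation; run once against the same database.
    cursor.execute(
        "CREATE INDEX IF NOT EXISTS idx_files_extracted_text_fts "
        "ON files USING GIN (to_tsvector('english', extracted_text))"
    )
    conn.commit()
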
    def analyze_folders(self, disk: Optional[str]=None, min_files: int=3):
        from analysis.folder_analyzer import FolderAnalyzer
        analyzer = FolderAnalyzer()
@@ -788,131 +894,3 @@ class DiskReorganizer:
            cursor.close()
            conn.close()

    def review_migration(self, category: Optional[str]=None, show_build: bool=False):
        from classification.classifier import FileClassifier
        classifier = FileClassifier()
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            query = 'SELECT path, size, category FROM files WHERE 1=1'
            params = []
            if category:
                query += ' AND category = %s'
                params.append(category)
            if not show_build:
                query += " AND (metadata->>'labels' IS NULL OR metadata->>'labels' NOT LIKE '%build-artifact%')"
            query += ' ORDER BY category, size DESC LIMIT 100'
            cursor.execute(query, params)
            files = cursor.fetchall()
            if not files:
                print('No files found matching criteria')
                return
            print(f'\n=== MIGRATION PREVIEW ===')
            print(f'Showing {len(files)} files\n')
            current_category = None
            for path, size, cat in files:
                if cat != current_category:
                    current_category = cat
                    print(f'\n{cat}:')
                labels, suggested_cat, is_build = classifier.classify_path(path, int(size))
                target = classifier.suggest_target_path(path, suggested_cat, labels)
                print(f' {path}')
                print(f' → {target} ({self.format_size(int(size))})')
        finally:
            cursor.close()
            conn.close()

    @staticmethod
    def format_size(size: int) -> str:
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if size < 1024:
                return f'{size:.1f}{unit}'
            size /= 1024
        return f'{size:.1f}PB'

def main():
    parser = argparse.ArgumentParser(description='Disk Reorganizer - Free up a disk for Linux dual-boot')
    subparsers = parser.add_subparsers(dest='command', required=True)
    index_parser = subparsers.add_parser('index', help='Index files on a disk')
    index_parser.add_argument('disk_root', help='Root path of disk (e.g., D:\\\\)')
    index_parser.add_argument('disk_name', help='Logical name for the disk')
    plan_parser = subparsers.add_parser('plan', help='Create migration plan')
    plan_parser.add_argument('target_disk', help='Disk to free up')
    plan_parser.add_argument('dest_disks', nargs='+', help='Destination disks')
    exec_parser = subparsers.add_parser('execute', help='Execute migration plan')
    exec_parser.add_argument('plan_file', help='Path to plan JSON file')
    exec_parser.add_argument('--dry-run', action='store_true', help='Simulate without actual file operations')
    dedupe_parser = subparsers.add_parser('dedupe', help='Deduplicate files and compute checksums')
    dedupe_parser.add_argument('--disk', help='Optional: Only dedupe specific disk')
    dedupe_parser.add_argument('--no-chunks', action='store_true', help='Disable chunk-level deduplication')
    merge_parser = subparsers.add_parser('merge', help='Plan multi-disk merge with deduplication')
    merge_parser.add_argument('--sources', nargs='+', required=True, help='Source disks to merge')
    merge_parser.add_argument('--target', required=True, help='Target disk')
    merge_parser.add_argument('--output', default='merge_plan.json', help='Output plan file')
    merge_parser.add_argument('--filter-system', action='store_true', help='Filter system/build files')
    merge_parser.add_argument('--network', help='Network target (e.g., user@host:/path)')
    profile_parser = subparsers.add_parser('profile', help='Create content profiles (inventory + triage)')
    profile_parser.add_argument('--disk', help='Profile specific disk')
    profile_parser.add_argument('--update', action='store_true', help='Update database with profiles')
    profile_parser.add_argument('--limit', type=int, help='Limit number of files')
    extract_parser = subparsers.add_parser('extract', help='Extract content from files')
    extract_parser.add_argument('--kind', help='Extract specific kind (pdf, image, audio, video)')
    extract_parser.add_argument('--limit', type=int, default=10, help='Limit extraction batch')

    parse_parser = subparsers.add_parser('parse', help='Parse files to extract text')
    parse_parser.add_argument('--kind', help='Parse specific kind (text, code, pdf)')
    parse_parser.add_argument('--limit', type=int, default=100, help='Limit parse batch')
    parse_parser.add_argument('--update', action='store_true', help='Save extracted text to database')

    enrich_parser = subparsers.add_parser('enrich', help='Enrich content with LLM analysis')
    enrich_parser.add_argument('--limit', type=int, default=10, help='Limit enrichment batch')
    enrich_parser.add_argument('--llm-endpoint', default='http://192.168.1.74:1234', help='LLM endpoint')
    enrich_parser.add_argument('--local', action='store_true', help='Use local Ollama')

    classify_parser = subparsers.add_parser('classify', help='Classify files and suggest organization')
    classify_parser.add_argument('--disk', help='Classify specific disk')
    classify_parser.add_argument('--update', action='store_true', help='Update database with classifications')
    classify_parser.add_argument('--no-resume', action='store_true', help='Start from scratch instead of resuming')
    folders_parser = subparsers.add_parser('analyze-folders', help='Analyze folder structure and infer project intent')
    folders_parser.add_argument('--disk', help='Analyze specific disk')
    folders_parser.add_argument('--min-files', type=int, default=3, help='Minimum files per folder')
    review_parser = subparsers.add_parser('review', help='Review proposed migration structure')
    review_parser.add_argument('--category', help='Review specific category')
    review_parser.add_argument('--show-build', action='store_true', help='Include build artifacts')
    report_parser = subparsers.add_parser('report', help='Show current status')
    report_parser.add_argument('--format', choices=['text', 'json'], default='text', help='Report format')
    report_parser.add_argument('--show-duplicates', action='store_true', help='Show duplicate files')
    report_parser.add_argument('--preview-merge', help='Preview merge plan from file')
    args = parser.parse_args()
    tool = DiskReorganizer()
    if args.command == 'index':
        tool.index_disk(args.disk_root, args.disk_name)
    elif args.command == 'dedupe':
        tool.run_deduplication(disk=args.disk, use_chunks=not args.no_chunks)
    elif args.command == 'merge':
        tool.plan_merge(sources=args.sources, target=args.target, output_file=args.output, filter_system=args.filter_system, network_target=args.network)
    elif args.command == 'plan':
        plan = tool.plan_migration(args.target_disk, args.dest_disks)
        if plan:
            print(f"\nPlan generated: {plan['file_count']} files, {tool.format_size(plan['total_size'])}")
            print(f"Destination disks: {', '.join(plan['destination_disks'])}")
    elif args.command == 'execute':
        tool.execute_migration(args.plan_file, dry_run=args.dry_run)
    elif args.command == 'profile':
        tool.profile_content(disk=args.disk, update_db=args.update, limit=args.limit)
    elif args.command == 'extract':
        tool.extract_content(kind=args.kind, limit=args.limit)
    elif args.command == 'parse':
        tool.parse_files(kind=args.kind, limit=args.limit, update_db=args.update)
    elif args.command == 'enrich':
        tool.enrich_files(limit=args.limit, llm_endpoint=args.llm_endpoint, use_local=args.local)
    elif args.command == 'classify':
        tool.classify_files(disk=args.disk, update_db=args.update, resume=not args.no_resume)
    elif args.command == 'analyze-folders':
        tool.analyze_folders(disk=args.disk, min_files=args.min_files)
    elif args.command == 'review':
        tool.review_migration(category=args.category, show_build=args.show_build)
    elif args.command == 'report':
        tool.generate_report(format=args.format, show_duplicates=args.show_duplicates, preview_merge=args.preview_merge)

if __name__ == '__main__':
    main()

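Typical invocations of the CLI defined above (disk labels, paths and the plan file name are illustrative; the script is assumed to be run as app/main.py from the repository root):

    python app/main.py index E:\\ archive_e
    python app/main.py parse --kind text --limit 500 --update
    python app/main.py enrich --limit 50 --local
    python app/main.py analyze-folders --disk archive_e --min-files 5
    python app/main.py plan archive_e data_d data_f
    python app/main.py execute plan.json --dry-run
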
@@ -1,27 +1,5 @@
"""Migration package exports"""
from .copy import (
    CopyMigrationStrategy,
    FastCopyStrategy,
    SafeCopyStrategy,
    ReferenceCopyStrategy
)
from .hardlink import (
    HardlinkMigrationStrategy,
    SymlinkMigrationStrategy,
    DedupHardlinkStrategy
)
from .copy import CopyMigrationStrategy, FastCopyStrategy, SafeCopyStrategy, ReferenceCopyStrategy
from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy, DedupHardlinkStrategy
from .engine import MigrationEngine
from ._protocols import IMigrationStrategy, IMigrationEngine

__all__ = [
    'CopyMigrationStrategy',
    'FastCopyStrategy',
    'SafeCopyStrategy',
    'ReferenceCopyStrategy',
    'HardlinkMigrationStrategy',
    'SymlinkMigrationStrategy',
    'DedupHardlinkStrategy',
    'MigrationEngine',
    'IMigrationStrategy',
    'IMigrationEngine',
]
__all__ = ['CopyMigrationStrategy', 'FastCopyStrategy', 'SafeCopyStrategy', 'ReferenceCopyStrategy', 'HardlinkMigrationStrategy', 'SymlinkMigrationStrategy', 'DedupHardlinkStrategy', 'MigrationEngine', 'IMigrationStrategy', 'IMigrationEngine']

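With the condensed exports, callers can pull everything from the package root; a one-line usage sketch (assuming the package resolves as `migration`, matching the relative imports above):

    from migration import SafeCopyStrategy, DedupHardlinkStrategy, MigrationEngine  # sketch: root-level imports via __init__.py
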
@@ -1,107 +1,28 @@
|
||||
"""Protocol definitions for the migration package"""
|
||||
from typing import Protocol
|
||||
from pathlib import Path
|
||||
from ..shared.models import OperationRecord
|
||||
|
||||
|
||||
class IMigrationStrategy(Protocol):
|
||||
"""Protocol for migration strategies"""
|
||||
|
||||
def migrate(
|
||||
self,
|
||||
source: Path,
|
||||
destination: Path,
|
||||
verify: bool = True
|
||||
) -> bool:
|
||||
"""Migrate a file from source to destination
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
verify: Whether to verify the operation
|
||||
|
||||
Returns:
|
||||
True if migration successful
|
||||
"""
|
||||
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
|
||||
...
|
||||
|
||||
def can_migrate(self, source: Path, destination: Path) -> bool:
|
||||
"""Check if migration is possible
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
|
||||
Returns:
|
||||
True if migration is possible
|
||||
"""
|
||||
...
|
||||
|
||||
def estimate_time(self, source: Path) -> float:
|
||||
"""Estimate migration time in seconds
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
|
||||
Returns:
|
||||
Estimated time in seconds
|
||||
"""
|
||||
...
|
||||
|
||||
def cleanup(self, source: Path) -> bool:
|
||||
"""Cleanup source file after successful migration
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
|
||||
Returns:
|
||||
True if cleanup successful
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class IMigrationEngine(Protocol):
|
||||
"""Protocol for migration engine"""
|
||||
|
||||
def plan_migration(
|
||||
self,
|
||||
disk: str,
|
||||
target_base: Path
|
||||
) -> list[OperationRecord]:
|
||||
"""Plan migration for a disk
|
||||
|
||||
Args:
|
||||
disk: Disk identifier
|
||||
target_base: Target base directory
|
||||
|
||||
Returns:
|
||||
List of planned operations
|
||||
"""
|
||||
def plan_migration(self, disk: str, target_base: Path) -> list[OperationRecord]:
|
||||
...
|
||||
|
||||
def execute_migration(
|
||||
self,
|
||||
operations: list[OperationRecord],
|
||||
dry_run: bool = False
|
||||
) -> dict:
|
||||
"""Execute migration operations
|
||||
|
||||
Args:
|
||||
operations: List of operations to execute
|
||||
dry_run: Whether to perform a dry run
|
||||
|
||||
Returns:
|
||||
Dictionary with execution statistics
|
||||
"""
|
||||
def execute_migration(self, operations: list[OperationRecord], dry_run: bool=False) -> dict:
|
||||
...
|
||||
|
||||
def rollback(self, operation: OperationRecord) -> bool:
|
||||
"""Rollback a migration operation
|
||||
|
||||
Args:
|
||||
operation: Operation to rollback
|
||||
|
||||
Returns:
|
||||
True if rollback successful
|
||||
"""
|
||||
...
|
||||
|
||||
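`IMigrationStrategy` and `IMigrationEngine` are `typing.Protocol` classes, so the concrete strategies below conform structurally rather than by inheritance. A minimal sketch (illustrative only) of a class a type checker would accept wherever `IMigrationStrategy` is expected:

    from pathlib import Path

    class NoOpStrategy:
        # Sketch: matches the IMigrationStrategy method set without subclassing anything.
        def migrate(self, source: Path, destination: Path, verify: bool = True) -> bool:
            return True

        def can_migrate(self, source: Path, destination: Path) -> bool:
            return source.exists()

        def estimate_time(self, source: Path) -> float:
            return 0.0

        def cleanup(self, source: Path) -> bool:
            return True
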
@@ -1,268 +1,129 @@
|
||||
"""Copy-based migration strategy"""
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
import os
|
||||
|
||||
from ..shared.logger import ProgressLogger
|
||||
|
||||
|
||||
class CopyMigrationStrategy:
|
||||
"""Copy files to destination with verification"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
logger: Optional[ProgressLogger] = None,
|
||||
preserve_metadata: bool = True,
|
||||
verify_checksums: bool = True
|
||||
):
|
||||
"""Initialize copy migration strategy
|
||||
|
||||
Args:
|
||||
logger: Optional progress logger
|
||||
preserve_metadata: Whether to preserve file metadata
|
||||
verify_checksums: Whether to verify checksums after copy
|
||||
"""
|
||||
def __init__(self, logger: Optional[ProgressLogger]=None, preserve_metadata: bool=True, verify_checksums: bool=True):
|
||||
self.logger = logger
|
||||
self.preserve_metadata = preserve_metadata
|
||||
self.verify_checksums = verify_checksums
|
||||
|
||||
def migrate(
|
||||
self,
|
||||
source: Path,
|
||||
destination: Path,
|
||||
verify: bool = True
|
||||
) -> bool:
|
||||
"""Migrate file by copying
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
verify: Whether to verify the operation
|
||||
|
||||
Returns:
|
||||
True if migration successful
|
||||
"""
|
||||
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
|
||||
if not source.exists():
|
||||
if self.logger:
|
||||
self.logger.error(f"Source file does not exist: {source}")
|
||||
self.logger.error(f'Source file does not exist: {source}')
|
||||
return False
|
||||
|
||||
# Create destination directory
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
# Copy file
|
||||
if self.preserve_metadata:
|
||||
shutil.copy2(source, destination)
|
||||
else:
|
||||
shutil.copy(source, destination)
|
||||
|
||||
# Verify if requested
|
||||
if verify and self.verify_checksums:
|
||||
if not self._verify_copy(source, destination):
|
||||
if self.logger:
|
||||
self.logger.error(f"Verification failed: {source} -> {destination}")
|
||||
self.logger.error(f'Verification failed: {source} -> {destination}')
|
||||
destination.unlink()
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Copy failed: {source} -> {destination}: {e}")
|
||||
self.logger.error(f'Copy failed: {source} -> {destination}: {e}')
|
||||
return False
|
||||
|
||||
def _verify_copy(self, source: Path, destination: Path) -> bool:
|
||||
"""Verify copied file
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
|
||||
Returns:
|
||||
True if verification successful
|
||||
"""
|
||||
# Check size
|
||||
source_size = source.stat().st_size
|
||||
dest_size = destination.stat().st_size
|
||||
|
||||
if source_size != dest_size:
|
||||
return False
|
||||
|
||||
# Compare checksums for files larger than 1MB
|
||||
if source_size > 1024 * 1024:
|
||||
from ..deduplication.chunker import hash_file
|
||||
|
||||
source_hash = hash_file(source)
|
||||
dest_hash = hash_file(destination)
|
||||
|
||||
return source_hash == dest_hash
|
||||
|
||||
# For small files, compare content directly
|
||||
with open(source, 'rb') as f1, open(destination, 'rb') as f2:
|
||||
return f1.read() == f2.read()
|
||||
|
||||
def can_migrate(self, source: Path, destination: Path) -> bool:
|
||||
"""Check if migration is possible
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
|
||||
Returns:
|
||||
True if migration is possible
|
||||
"""
|
||||
if not source.exists():
|
||||
return False
|
||||
|
||||
# Check if destination directory is writable
|
||||
dest_dir = destination.parent
|
||||
if dest_dir.exists():
|
||||
return os.access(dest_dir, os.W_OK)
|
||||
|
||||
# Check if parent directory exists and is writable
|
||||
parent = dest_dir.parent
|
||||
while not parent.exists() and parent != parent.parent:
|
||||
parent = parent.parent
|
||||
|
||||
return parent.exists() and os.access(parent, os.W_OK)
|
||||
|
||||
def estimate_time(self, source: Path) -> float:
|
||||
"""Estimate migration time in seconds
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
|
||||
Returns:
|
||||
Estimated time in seconds
|
||||
"""
|
||||
if not source.exists():
|
||||
return 0.0
|
||||
|
||||
size = source.stat().st_size
|
||||
|
||||
# Estimate based on typical copy speed (100 MB/s)
|
||||
typical_speed = 100 * 1024 * 1024 # bytes per second
|
||||
typical_speed = 100 * 1024 * 1024
|
||||
return size / typical_speed
|
||||
|
||||
def cleanup(self, source: Path) -> bool:
|
||||
"""Cleanup source file after successful migration
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
|
||||
Returns:
|
||||
True if cleanup successful
|
||||
"""
|
||||
try:
|
||||
if source.exists():
|
||||
source.unlink()
|
||||
return True
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.warning(f"Failed to cleanup {source}: {e}")
|
||||
self.logger.warning(f'Failed to cleanup {source}: {e}')
|
||||
return False
|
||||
|
||||
|
||||
class FastCopyStrategy(CopyMigrationStrategy):
|
||||
"""Fast copy strategy without verification"""
|
||||
|
||||
def __init__(self, logger: Optional[ProgressLogger] = None):
|
||||
"""Initialize fast copy strategy"""
|
||||
super().__init__(
|
||||
logger=logger,
|
||||
preserve_metadata=True,
|
||||
verify_checksums=False
|
||||
)
|
||||
|
||||
def __init__(self, logger: Optional[ProgressLogger]=None):
|
||||
super().__init__(logger=logger, preserve_metadata=True, verify_checksums=False)
|
||||
|
||||
class SafeCopyStrategy(CopyMigrationStrategy):
|
||||
"""Safe copy strategy with full verification"""
|
||||
|
||||
def __init__(self, logger: Optional[ProgressLogger] = None):
|
||||
"""Initialize safe copy strategy"""
|
||||
super().__init__(
|
||||
logger=logger,
|
||||
preserve_metadata=True,
|
||||
verify_checksums=True
|
||||
)
|
||||
|
||||
def __init__(self, logger: Optional[ProgressLogger]=None):
|
||||
super().__init__(logger=logger, preserve_metadata=True, verify_checksums=True)
|
||||
|
||||
class ReferenceCopyStrategy:
|
||||
"""Create reference copy using reflinks (CoW) if supported"""
|
||||
|
||||
def __init__(self, logger: Optional[ProgressLogger] = None):
|
||||
"""Initialize reflink copy strategy"""
|
||||
def __init__(self, logger: Optional[ProgressLogger]=None):
|
||||
self.logger = logger
|
||||
|
||||
def migrate(
|
||||
self,
|
||||
source: Path,
|
||||
destination: Path,
|
||||
verify: bool = True
|
||||
) -> bool:
|
||||
"""Migrate using reflink (copy-on-write)
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
verify: Whether to verify the operation
|
||||
|
||||
Returns:
|
||||
True if migration successful
|
||||
"""
|
||||
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
|
||||
if not source.exists():
|
||||
if self.logger:
|
||||
self.logger.error(f"Source file does not exist: {source}")
|
||||
self.logger.error(f'Source file does not exist: {source}')
|
||||
return False
|
||||
|
||||
# Create destination directory
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
# Try reflink copy (works on btrfs, xfs, etc.)
|
||||
import subprocess
|
||||
|
||||
result = subprocess.run(
|
||||
['cp', '--reflink=auto', str(source), str(destination)],
|
||||
capture_output=True,
|
||||
check=False
|
||||
)
|
||||
|
||||
result = subprocess.run(['cp', '--reflink=auto', str(source), str(destination)], capture_output=True, check=False)
|
||||
if result.returncode != 0:
|
||||
# Fallback to regular copy
|
||||
shutil.copy2(source, destination)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Reflink copy failed: {source} -> {destination}: {e}")
|
||||
self.logger.error(f'Reflink copy failed: {source} -> {destination}: {e}')
|
||||
return False
|
||||
|
||||
def can_migrate(self, source: Path, destination: Path) -> bool:
|
||||
"""Check if migration is possible"""
|
||||
if not source.exists():
|
||||
return False
|
||||
|
||||
dest_dir = destination.parent
|
||||
if dest_dir.exists():
|
||||
return os.access(dest_dir, os.W_OK)
|
||||
|
||||
return True
|
||||
|
||||
def estimate_time(self, source: Path) -> float:
|
||||
"""Estimate migration time (reflinks are fast)"""
|
||||
return 0.1 # Reflinks are nearly instant
|
||||
return 0.1
|
||||
|
||||
def cleanup(self, source: Path) -> bool:
|
||||
"""Cleanup source file"""
|
||||
try:
|
||||
if source.exists():
|
||||
source.unlink()
|
||||
return True
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.warning(f"Failed to cleanup {source}: {e}")
|
||||
self.logger.warning(f'Failed to cleanup {source}: {e}')
|
||||
return False
|
||||
|
||||
@@ -1,254 +1,100 @@
|
||||
"""Migration engine"""
|
||||
from pathlib import Path
|
||||
from typing import Optional, Callable
|
||||
from datetime import datetime
|
||||
import psycopg2
|
||||
from psycopg2.extras import execute_batch
|
||||
|
||||
from .copy import CopyMigrationStrategy, SafeCopyStrategy
|
||||
from .hardlink import HardlinkMigrationStrategy, SymlinkMigrationStrategy
|
||||
from ..shared.models import OperationRecord, ProcessingStats, MigrationPlan
|
||||
from ..shared.config import DatabaseConfig, ProcessingConfig
|
||||
from ..shared.logger import ProgressLogger
|
||||
|
||||
|
||||
class MigrationEngine:
|
||||
"""Engine for migrating files"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
db_config: DatabaseConfig,
|
||||
processing_config: ProcessingConfig,
|
||||
logger: ProgressLogger,
|
||||
target_base: Path
|
||||
):
|
||||
"""Initialize migration engine
|
||||
|
||||
Args:
|
||||
db_config: Database configuration
|
||||
processing_config: Processing configuration
|
||||
logger: Progress logger
|
||||
target_base: Target base directory for migrations
|
||||
"""
|
||||
def __init__(self, db_config: DatabaseConfig, processing_config: ProcessingConfig, logger: ProgressLogger, target_base: Path):
|
||||
self.db_config = db_config
|
||||
self.processing_config = processing_config
|
||||
self.logger = logger
|
||||
self.target_base = Path(target_base)
|
||||
self._connection = None
|
||||
|
||||
# Initialize strategies
|
||||
self.copy_strategy = SafeCopyStrategy(logger=logger)
|
||||
self.hardlink_strategy = HardlinkMigrationStrategy(logger=logger)
|
||||
self.symlink_strategy = SymlinkMigrationStrategy(logger=logger)
|
||||
|
||||
def _get_connection(self):
|
||||
"""Get or create database connection"""
|
||||
if self._connection is None or self._connection.closed:
|
||||
self._connection = psycopg2.connect(
|
||||
host=self.db_config.host,
|
||||
port=self.db_config.port,
|
||||
database=self.db_config.database,
|
||||
user=self.db_config.user,
|
||||
password=self.db_config.password
|
||||
)
|
||||
self._connection = psycopg2.connect(host=self.db_config.host, port=self.db_config.port, database=self.db_config.database, user=self.db_config.user, password=self.db_config.password)
|
||||
return self._connection
|
||||
|
||||
def _ensure_tables(self):
|
||||
"""Ensure migration tables exist"""
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create operations table
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS operations (
|
||||
id SERIAL PRIMARY KEY,
|
||||
source_path TEXT NOT NULL,
|
||||
target_path TEXT NOT NULL,
|
||||
operation_type TEXT NOT NULL,
|
||||
size BIGINT DEFAULT 0,
|
||||
status TEXT DEFAULT 'pending',
|
||||
error TEXT,
|
||||
executed_at TIMESTAMP,
|
||||
verified BOOLEAN DEFAULT FALSE,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
|
||||
# Create index on status
|
||||
cursor.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_operations_status
|
||||
ON operations(status)
|
||||
""")
|
||||
|
||||
cursor.execute("\n CREATE TABLE IF NOT EXISTS operations (\n id SERIAL PRIMARY KEY,\n source_path TEXT NOT NULL,\n target_path TEXT NOT NULL,\n operation_type TEXT NOT NULL,\n size BIGINT DEFAULT 0,\n status TEXT DEFAULT 'pending',\n error TEXT,\n executed_at TIMESTAMP,\n verified BOOLEAN DEFAULT FALSE,\n created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n )\n ")
|
||||
cursor.execute('\n CREATE INDEX IF NOT EXISTS idx_operations_status\n ON operations(status)\n ')
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
def plan_migration(
|
||||
self,
|
||||
disk: Optional[str] = None,
|
||||
category: Optional[str] = None
|
||||
) -> MigrationPlan:
|
||||
"""Plan migration for files
|
||||
|
||||
Args:
|
||||
disk: Optional disk filter
|
||||
category: Optional category filter
|
||||
|
||||
Returns:
|
||||
MigrationPlan with planned operations
|
||||
"""
|
||||
self.logger.section("Planning Migration")
|
||||
|
||||
def plan_migration(self, disk: Optional[str]=None, category: Optional[str]=None) -> MigrationPlan:
|
||||
self.logger.section('Planning Migration')
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Build query
|
||||
conditions = ["category IS NOT NULL"]
|
||||
conditions = ['category IS NOT NULL']
|
||||
params = []
|
||||
|
||||
if disk:
|
||||
conditions.append("disk_label = %s")
|
||||
conditions.append('disk_label = %s')
|
||||
params.append(disk)
|
||||
|
||||
if category:
|
||||
conditions.append("category = %s")
|
||||
conditions.append('category = %s')
|
||||
params.append(category)
|
||||
|
||||
query = f"""
|
||||
SELECT path, size, category, duplicate_of
|
||||
FROM files
|
||||
WHERE {' AND '.join(conditions)}
|
||||
ORDER BY category, path
|
||||
"""
|
||||
|
||||
query = f"\n SELECT path, size, category, duplicate_of\n FROM files\n WHERE {' AND '.join(conditions)}\n ORDER BY category, path\n "
|
||||
cursor.execute(query, params)
|
||||
files = cursor.fetchall()
|
||||
|
||||
self.logger.info(f"Found {len(files)} files to migrate")
|
||||
|
||||
self.logger.info(f'Found {len(files)} files to migrate')
|
||||
operations = []
|
||||
total_size = 0
|
||||
|
||||
for path_str, size, file_category, duplicate_of in files:
|
||||
source = Path(path_str)
|
||||
|
||||
# Determine destination
|
||||
target_path = self.target_base / file_category / source.name
|
||||
|
||||
# Determine operation type
|
||||
if duplicate_of:
|
||||
# Use hardlink for duplicates
|
||||
operation_type = 'hardlink'
|
||||
else:
|
||||
# Use copy for unique files
|
||||
operation_type = 'copy'
|
||||
|
||||
operation = OperationRecord(
|
||||
source_path=source,
|
||||
target_path=target_path,
|
||||
operation_type=operation_type,
|
||||
size=size
|
||||
)
|
||||
|
||||
operation = OperationRecord(source_path=source, target_path=target_path, operation_type=operation_type, size=size)
|
||||
operations.append(operation)
|
||||
total_size += size
|
||||
|
||||
cursor.close()
|
||||
|
||||
plan = MigrationPlan(
|
||||
target_disk=str(self.target_base),
|
||||
destination_disks=[str(self.target_base)],
|
||||
operations=operations,
|
||||
total_size=total_size,
|
||||
file_count=len(operations)
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f"Migration plan created: {plan.file_count} files, "
|
||||
f"{plan.total_size:,} bytes"
|
||||
)
|
||||
|
||||
plan = MigrationPlan(target_disk=str(self.target_base), destination_disks=[str(self.target_base)], operations=operations, total_size=total_size, file_count=len(operations))
|
||||
self.logger.info(f'Migration plan created: {plan.file_count} files, {plan.total_size:,} bytes')
|
||||
return plan
|
||||
|
||||
def execute_migration(
|
||||
self,
|
||||
operations: list[OperationRecord],
|
||||
dry_run: bool = False,
|
||||
progress_callback: Optional[Callable[[int, int, ProcessingStats], None]] = None
|
||||
) -> ProcessingStats:
|
||||
"""Execute migration operations
|
||||
|
||||
Args:
|
||||
operations: List of operations to execute
|
||||
dry_run: Whether to perform a dry run
|
||||
progress_callback: Optional callback for progress updates
|
||||
|
||||
Returns:
|
||||
ProcessingStats with execution statistics
|
||||
"""
|
||||
self.logger.section("Executing Migration" + (" (DRY RUN)" if dry_run else ""))
|
||||
|
||||
def execute_migration(self, operations: list[OperationRecord], dry_run: bool=False, progress_callback: Optional[Callable[[int, int, ProcessingStats], None]]=None) -> ProcessingStats:
|
||||
self.logger.section('Executing Migration' + (' (DRY RUN)' if dry_run else ''))
|
||||
self._ensure_tables()
|
||||
|
||||
stats = ProcessingStats()
|
||||
total_ops = len(operations)
|
||||
|
||||
for operation in operations:
|
||||
stats.files_processed += 1
|
||||
|
||||
if dry_run:
|
||||
# In dry run, just log what would happen
|
||||
self.logger.debug(
|
||||
f"[DRY RUN] Would {operation.operation_type}: "
|
||||
f"{operation.source_path} -> {operation.target_path}"
|
||||
)
|
||||
self.logger.debug(f'[DRY RUN] Would {operation.operation_type}: {operation.source_path} -> {operation.target_path}')
|
||||
stats.files_succeeded += 1
|
||||
else:
|
||||
# Execute actual migration
|
||||
success = self._execute_operation(operation)
|
||||
|
||||
if success:
|
||||
stats.files_succeeded += 1
|
||||
stats.bytes_processed += operation.size
|
||||
else:
|
||||
stats.files_failed += 1
|
||||
|
||||
# Progress callback
|
||||
if progress_callback and stats.files_processed % 100 == 0:
|
||||
progress_callback(stats.files_processed, total_ops, stats)
|
||||
|
||||
# Log progress
|
||||
if stats.files_processed % 1000 == 0:
|
||||
self.logger.progress(
|
||||
stats.files_processed,
|
||||
total_ops,
|
||||
prefix="Operations executed",
|
||||
bytes_processed=stats.bytes_processed,
|
||||
elapsed_seconds=stats.elapsed_seconds
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f"Migration {'dry run' if dry_run else 'execution'} complete: "
|
||||
f"{stats.files_succeeded}/{total_ops} operations, "
|
||||
f"{stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s"
|
||||
)
|
||||
|
||||
self.logger.progress(stats.files_processed, total_ops, prefix='Operations executed', bytes_processed=stats.bytes_processed, elapsed_seconds=stats.elapsed_seconds)
|
||||
self.logger.info(f"Migration {('dry run' if dry_run else 'execution')} complete: {stats.files_succeeded}/{total_ops} operations, {stats.bytes_processed:,} bytes in {stats.elapsed_seconds:.1f}s")
|
||||
return stats
|
||||
|
||||
def _execute_operation(self, operation: OperationRecord) -> bool:
|
||||
"""Execute a single migration operation
|
||||
|
||||
Args:
|
||||
operation: Operation to execute
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
operation.status = 'in_progress'
|
||||
operation.executed_at = datetime.now()
|
||||
|
||||
try:
|
||||
# Select strategy based on operation type
|
||||
if operation.operation_type == 'copy':
|
||||
strategy = self.copy_strategy
|
||||
elif operation.operation_type == 'hardlink':
|
||||
@@ -256,15 +102,8 @@ class MigrationEngine:
|
||||
elif operation.operation_type == 'symlink':
|
||||
strategy = self.symlink_strategy
|
||||
else:
|
||||
raise ValueError(f"Unknown operation type: {operation.operation_type}")
|
||||
|
||||
# Execute migration
|
||||
success = strategy.migrate(
|
||||
operation.source_path,
|
||||
operation.target_path,
|
||||
verify=self.processing_config.verify_operations
|
||||
)
|
||||
|
||||
raise ValueError(f'Unknown operation type: {operation.operation_type}')
|
||||
success = strategy.migrate(operation.source_path, operation.target_path, verify=self.processing_config.verify_operations)
|
||||
if success:
|
||||
operation.status = 'completed'
|
||||
operation.verified = True
|
||||
@@ -272,183 +111,85 @@ class MigrationEngine:
|
||||
return True
|
||||
else:
|
||||
operation.status = 'failed'
|
||||
operation.error = "Migration failed"
|
||||
operation.error = 'Migration failed'
|
||||
self._record_operation(operation)
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
operation.status = 'failed'
|
||||
operation.error = str(e)
|
||||
self._record_operation(operation)
|
||||
self.logger.error(f"Operation failed: {operation.source_path}: {e}")
|
||||
self.logger.error(f'Operation failed: {operation.source_path}: {e}')
|
||||
return False
|
||||
|
||||
def _record_operation(self, operation: OperationRecord):
|
||||
"""Record operation in database
|
||||
|
||||
Args:
|
||||
operation: Operation to record
|
||||
"""
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO operations (
|
||||
source_path, target_path, operation_type, bytes_processed,
|
||||
status, error, executed_at, verified
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||
""", (
|
||||
str(operation.source_path),
|
||||
str(operation.target_path),
|
||||
operation.operation_type,
|
||||
operation.size,
|
||||
operation.status,
|
||||
operation.error,
|
||||
operation.executed_at,
|
||||
operation.verified
|
||||
))
|
||||
|
||||
cursor.execute('\n INSERT INTO operations (\n source_path, target_path, operation_type, bytes_processed,\n status, error, executed_at, verified\n )\n VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n ', (str(operation.source_path), str(operation.target_path), operation.operation_type, operation.size, operation.status, operation.error, operation.executed_at, operation.verified))
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
def rollback(self, operation: OperationRecord) -> bool:
|
||||
"""Rollback a migration operation
|
||||
|
||||
Args:
|
||||
operation: Operation to rollback
|
||||
|
||||
Returns:
|
||||
True if rollback successful
|
||||
"""
|
||||
self.logger.warning(f"Rolling back: {operation.target_path}")
|
||||
|
||||
self.logger.warning(f'Rolling back: {operation.target_path}')
|
||||
try:
|
||||
# Remove destination
|
||||
if operation.target_path.exists():
|
||||
operation.target_path.unlink()
|
||||
|
||||
# Update database
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
UPDATE operations
|
||||
SET status = 'rolled_back'
|
||||
WHERE source_path = %s AND target_path = %s
|
||||
""", (str(operation.source_path), str(operation.target_path)))
|
||||
|
||||
cursor.execute("\n UPDATE operations\n SET status = 'rolled_back'\n WHERE source_path = %s AND target_path = %s\n ", (str(operation.source_path), str(operation.target_path)))
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Rollback failed: {operation.target_path}: {e}")
|
||||
self.logger.error(f'Rollback failed: {operation.target_path}: {e}')
|
||||
return False
|
||||
|
||||
def get_migration_stats(self) -> dict:
|
||||
"""Get migration statistics
|
||||
|
||||
Returns:
|
||||
Dictionary with statistics
|
||||
"""
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
stats = {}
|
||||
|
||||
# Total operations
|
||||
cursor.execute("SELECT COUNT(*) FROM operations")
|
||||
cursor.execute('SELECT COUNT(*) FROM operations')
|
||||
stats['total_operations'] = cursor.fetchone()[0]
|
||||
|
||||
# Operations by status
|
||||
cursor.execute("""
|
||||
SELECT status, COUNT(*)
|
||||
FROM operations
|
||||
GROUP BY status
|
||||
""")
|
||||
|
||||
cursor.execute('\n SELECT status, COUNT(*)\n FROM operations\n GROUP BY status\n ')
|
||||
for status, count in cursor.fetchall():
|
||||
stats[f'{status}_operations'] = count
|
||||
|
||||
# Total size migrated
|
||||
cursor.execute("""
|
||||
SELECT COALESCE(SUM(size), 0)
|
||||
FROM operations
|
||||
WHERE status = 'completed'
|
||||
""")
|
||||
cursor.execute("\n SELECT COALESCE(SUM(size), 0)\n FROM operations\n WHERE status = 'completed'\n ")
|
||||
stats['total_size_migrated'] = cursor.fetchone()[0]
|
||||
|
||||
cursor.close()
|
||||
|
||||
return stats
|
||||
|
||||
def verify_migrations(self) -> dict:
|
||||
"""Verify completed migrations
|
||||
|
||||
Returns:
|
||||
Dictionary with verification results
|
||||
"""
|
||||
self.logger.subsection("Verifying Migrations")
|
||||
|
||||
self.logger.subsection('Verifying Migrations')
|
||||
conn = self._get_connection()
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute("""
|
||||
SELECT source_path, target_path, operation_type
|
||||
FROM operations
|
||||
WHERE status = 'completed' AND verified = FALSE
|
||||
""")
|
||||
|
||||
cursor.execute("\n SELECT source_path, target_path, operation_type\n FROM operations\n WHERE status = 'completed' AND verified = FALSE\n ")
|
||||
operations = cursor.fetchall()
|
||||
cursor.close()
|
||||
|
||||
results = {
|
||||
'total': len(operations),
|
||||
'verified': 0,
|
||||
'failed': 0
|
||||
}
|
||||
|
||||
results = {'total': len(operations), 'verified': 0, 'failed': 0}
|
||||
for source_str, dest_str, op_type in operations:
|
||||
source = Path(source_str)
|
||||
dest = Path(dest_str)
|
||||
|
||||
# Verify destination exists
|
||||
if not dest.exists():
|
||||
results['failed'] += 1
|
||||
self.logger.warning(f"Verification failed: {dest} does not exist")
|
||||
self.logger.warning(f'Verification failed: {dest} does not exist')
|
||||
continue
|
||||
|
||||
# Verify based on operation type
|
||||
if op_type == 'hardlink':
|
||||
# Check if hardlinked
|
||||
if source.exists() and source.stat().st_ino == dest.stat().st_ino:
|
||||
results['verified'] += 1
|
||||
else:
|
||||
results['failed'] += 1
|
||||
elif dest.exists():
|
||||
results['verified'] += 1
|
||||
else:
|
||||
# Check if destination exists and has correct size
|
||||
if dest.exists():
|
||||
results['verified'] += 1
|
||||
else:
|
||||
results['failed'] += 1
|
||||
|
||||
self.logger.info(
|
||||
f"Verification complete: {results['verified']}/{results['total']} verified"
|
||||
)
|
||||
|
||||
results['failed'] += 1
|
||||
self.logger.info(f"Verification complete: {results['verified']}/{results['total']} verified")
|
||||
return results
|
||||
|
||||
def close(self):
|
||||
"""Close database connection"""
|
||||
if self._connection and not self._connection.closed:
|
||||
if self._connection and (not self._connection.closed):
|
||||
self._connection.close()
|
||||
|
||||
def __enter__(self):
|
||||
"""Context manager entry"""
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Context manager exit"""
|
||||
self.close()
|
||||
|
||||
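Because `MigrationEngine` defines `__enter__`/`__exit__`, it can be driven as a context manager so the database connection is closed even on errors. A sketch (construction of the config objects and logger is omitted; they are the DatabaseConfig, ProcessingConfig and ProgressLogger types imported at the top of the engine module):

    from pathlib import Path

    # Sketch only: plan, then dry-run, a migration for one disk.
    with MigrationEngine(db_config, processing_config, logger, target_base=Path('/mnt/target')) as engine:
        plan = engine.plan_migration(disk='OLD_D', category='documents')
        stats = engine.execute_migration(plan.operations, dry_run=True)
        print(f"{stats.files_succeeded} ok, {stats.files_failed} failed, {stats.bytes_processed} bytes")
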
@@ -1,90 +1,43 @@
|
||||
"""Hardlink-based migration strategy"""
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from ..shared.logger import ProgressLogger
|
||||
|
||||
|
||||
class HardlinkMigrationStrategy:
|
||||
"""Create hardlinks to files instead of copying"""
|
||||
|
||||
def __init__(self, logger: Optional[ProgressLogger] = None):
|
||||
"""Initialize hardlink migration strategy
|
||||
|
||||
Args:
|
||||
logger: Optional progress logger
|
||||
"""
|
||||
def __init__(self, logger: Optional[ProgressLogger]=None):
|
||||
self.logger = logger
|
||||
|
||||
def migrate(
|
||||
self,
|
||||
source: Path,
|
||||
destination: Path,
|
||||
verify: bool = True
|
||||
) -> bool:
|
||||
"""Migrate file by creating hardlink
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
verify: Whether to verify the operation
|
||||
|
||||
Returns:
|
||||
True if migration successful
|
||||
"""
|
||||
def migrate(self, source: Path, destination: Path, verify: bool=True) -> bool:
|
||||
if not source.exists():
|
||||
if self.logger:
|
||||
self.logger.error(f"Source file does not exist: {source}")
|
||||
self.logger.error(f'Source file does not exist: {source}')
|
||||
return False
|
||||
|
||||
# Check if source and destination are on same filesystem
|
||||
if not self._same_filesystem(source, destination.parent):
|
||||
if self.logger:
|
||||
self.logger.warning(
|
||||
f"Cannot hardlink across filesystems: {source} -> {destination}"
|
||||
)
|
||||
self.logger.warning(f'Cannot hardlink across filesystems: {source} -> {destination}')
|
||||
return False
|
||||
|
||||
# Create destination directory
|
||||
destination.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
# Create hardlink
|
||||
os.link(source, destination)
|
||||
|
||||
# Verify if requested
|
||||
if verify:
|
||||
if not self._verify_hardlink(source, destination):
|
||||
if self.logger:
|
||||
self.logger.error(f"Verification failed: {source} -> {destination}")
|
||||
self.logger.error(f'Verification failed: {source} -> {destination}')
|
||||
destination.unlink()
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except FileExistsError:
|
||||
if self.logger:
|
||||
self.logger.warning(f"Destination already exists: {destination}")
|
||||
self.logger.warning(f'Destination already exists: {destination}')
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Hardlink failed: {source} -> {destination}: {e}")
|
||||
self.logger.error(f'Hardlink failed: {source} -> {destination}: {e}')
|
||||
return False
|
||||
|
||||
def _same_filesystem(self, path1: Path, path2: Path) -> bool:
|
||||
"""Check if two paths are on the same filesystem
|
||||
|
||||
Args:
|
||||
path1: First path
|
||||
path2: Second path
|
||||
|
||||
Returns:
|
||||
True if on same filesystem
|
||||
"""
|
||||
try:
|
||||
# Get device IDs
|
||||
stat1 = path1.stat()
|
||||
stat2 = path2.stat()
|
||||
return stat1.st_dev == stat2.st_dev
|
||||
@@ -92,286 +45,117 @@ class HardlinkMigrationStrategy:
|
||||
return False
|
||||
|
||||
def _verify_hardlink(self, source: Path, destination: Path) -> bool:
|
||||
"""Verify hardlink
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
|
||||
Returns:
|
||||
True if verification successful
|
||||
"""
|
||||
try:
|
||||
# Check if they have the same inode
|
||||
source_stat = source.stat()
|
||||
dest_stat = destination.stat()
|
||||
|
||||
return source_stat.st_ino == dest_stat.st_ino
|
||||
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def can_migrate(self, source: Path, destination: Path) -> bool:
|
||||
"""Check if migration is possible
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
destination: Destination file path
|
||||
|
||||
Returns:
|
||||
True if migration is possible
|
||||
"""
|
||||
if not source.exists():
|
||||
return False
|
||||
|
||||
# Check if on same filesystem
|
||||
dest_dir = destination.parent
|
||||
if dest_dir.exists():
|
||||
return self._same_filesystem(source, dest_dir)
|
||||
|
||||
# Check parent directories
|
||||
parent = dest_dir.parent
|
||||
while not parent.exists() and parent != parent.parent:
|
||||
parent = parent.parent
|
||||
|
||||
return parent.exists() and self._same_filesystem(source, parent)
|
||||
|
||||
def estimate_time(self, source: Path) -> float:
|
||||
"""Estimate migration time in seconds
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
|
||||
Returns:
|
||||
Estimated time in seconds (hardlinks are instant)
|
||||
"""
|
||||
return 0.01 # Hardlinks are nearly instant
|
||||
return 0.01
|
||||
|
||||
def cleanup(self, source: Path) -> bool:
|
||||
"""Cleanup source file after successful migration
|
||||
|
||||
Note: For hardlinks, we typically don't remove the source
|
||||
immediately as both links point to the same inode.
|
||||
|
||||
Args:
|
||||
source: Source file path
|
||||
|
||||
Returns:
|
||||
True (no cleanup needed for hardlinks)
|
||||
"""
|
||||
# For hardlinks, we don't remove the source
|
||||
# Both source and destination point to the same data
|
||||
return True
|
||||
|
||||
|
||||
class SymlinkMigrationStrategy:
    """Create symbolic links to files"""

    def __init__(
        self,
        logger: Optional[ProgressLogger] = None,
        absolute_links: bool = True
    ):
        """Initialize symlink migration strategy

        Args:
            logger: Optional progress logger
            absolute_links: Whether to create absolute symlinks
        """
        self.logger = logger
        self.absolute_links = absolute_links

    def migrate(
        self,
        source: Path,
        destination: Path,
        verify: bool = True
    ) -> bool:
        """Migrate file by creating symlink

        Args:
            source: Source file path
            destination: Destination file path
            verify: Whether to verify the operation

        Returns:
            True if migration successful
        """
        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        # Create destination directory
        destination.parent.mkdir(parents=True, exist_ok=True)

        try:
            # Determine link target
            if self.absolute_links:
                target = source.resolve()
            else:
                # Create relative symlink
                target = os.path.relpath(source, destination.parent)

            # Create symlink
            destination.symlink_to(target)

            # Verify if requested
            if verify:
                if not self._verify_symlink(destination, source):
                    if self.logger:
                        self.logger.error(f"Verification failed: {source} -> {destination}")
                    destination.unlink()
                    return False

            return True

        except FileExistsError:
            if self.logger:
                self.logger.warning(f"Destination already exists: {destination}")
            return False

        except Exception as e:
            if self.logger:
                self.logger.error(f"Symlink failed: {source} -> {destination}: {e}")
            return False

    def _verify_symlink(self, symlink: Path, expected_target: Path) -> bool:
        """Verify symlink

        Args:
            symlink: Symlink path
            expected_target: Expected target path

        Returns:
            True if verification successful
        """
        try:
            # Check if it's a symlink
            if not symlink.is_symlink():
                return False

            # Resolve and compare
            resolved = symlink.resolve()
            expected = expected_target.resolve()

            return resolved == expected

        except Exception:
            return False

    def can_migrate(self, source: Path, destination: Path) -> bool:
        """Check if migration is possible

        Args:
            source: Source file path
            destination: Destination file path

        Returns:
            True if migration is possible
        """
        if not source.exists():
            return False

        # Check if destination directory is writable
        dest_dir = destination.parent
        if dest_dir.exists():
            return os.access(dest_dir, os.W_OK)

        return True

    def estimate_time(self, source: Path) -> float:
        """Estimate migration time in seconds

        Args:
            source: Source file path

        Returns:
            Estimated time in seconds (symlinks are instant)
        """
        return 0.01  # Symlinks are instant

    def cleanup(self, source: Path) -> bool:
        """Cleanup source file after successful migration

        Note: For symlinks, we don't remove the source as the
        symlink points to it.

        Args:
            source: Source file path

        Returns:
            True (no cleanup needed for symlinks)
        """
        # For symlinks, we don't remove the source
        return True
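The symlink strategy above can produce either absolute or relative links. A short, hypothetical usage sketch (the paths are placeholders) that prefers relative links so the link keeps working when source and destination trees are moved together:

from pathlib import Path

# Hypothetical driver: relative links survive moving the whole tree together,
# absolute links keep working when only the destination tree moves.
strategy = SymlinkMigrationStrategy(logger=None, absolute_links=False)

source = Path('/mnt/archive/photos/2021/img_0001.jpg')       # placeholder path
destination = Path('/mnt/library/photos/2021/img_0001.jpg')  # placeholder path

if strategy.can_migrate(source, destination):
    ok = strategy.migrate(source, destination, verify=True)
    print('migrated' if ok else 'failed')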
class DedupHardlinkStrategy(HardlinkMigrationStrategy):
    """Hardlink strategy for deduplication

    Creates hardlinks for duplicate files to save space.
    """

    def __init__(self, logger: Optional[ProgressLogger] = None):
        """Initialize dedup hardlink strategy"""
        super().__init__(logger=logger)

    def deduplicate(
        self,
        canonical: Path,
        duplicate: Path
    ) -> bool:
        """Replace duplicate with hardlink to canonical

        Args:
            canonical: Canonical file path
            duplicate: Duplicate file path

        Returns:
            True if deduplication successful
        """
        if not canonical.exists():
            if self.logger:
                self.logger.error(f"Canonical file does not exist: {canonical}")
            return False

        if not duplicate.exists():
            if self.logger:
                self.logger.error(f"Duplicate file does not exist: {duplicate}")
            return False

        # Check if already hardlinked
        if self._verify_hardlink(canonical, duplicate):
            return True

        # Check if on same filesystem
        if not self._same_filesystem(canonical, duplicate):
            if self.logger:
                self.logger.warning(
                    f"Cannot hardlink across filesystems: {canonical} -> {duplicate}"
                )
            return False

        # Initialized up front so the except-branch can check it safely
        backup = None
        try:
            # Create temporary backup
            backup = duplicate.with_suffix(duplicate.suffix + '.bak')
            duplicate.rename(backup)

            # Create hardlink
            os.link(canonical, duplicate)

            # Remove backup
            backup.unlink()

            return True

        except Exception as e:
            if self.logger:
                self.logger.error(f"Deduplication failed: {duplicate}: {e}")

            # Restore from backup
            if backup is not None and backup.exists():
                backup.rename(duplicate)

            return False
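A hypothetical way to drive `DedupHardlinkStrategy` over groups of byte-identical files; the group list and paths below are placeholders, and the real pipeline presumably derives the groups from the checksum data elsewhere in this change:

from pathlib import Path

# Hypothetical input: groups of byte-identical files, first entry treated as canonical.
duplicate_groups = [
    [Path('/mnt/data/a/report.pdf'), Path('/mnt/data/b/report_copy.pdf')],
]

dedup = DedupHardlinkStrategy(logger=None)
for group in duplicate_groups:
    canonical, *duplicates = group
    for dup in duplicates:
        if dedup.deduplicate(canonical, dup):
            print(f'hardlinked {dup} -> {canonical}')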
62
app/parsers/audio_parser.py
Normal file
62
app/parsers/audio_parser.py
Normal file
@@ -0,0 +1,62 @@
from pathlib import Path
from typing import Dict
import logging

logger = logging.getLogger(__name__)

class AudioParser:
    def __init__(self, whisper_model: str = 'base'):
        self.supported_formats = {'.mp3', '.wav', '.flac', '.m4a', '.ogg', '.wma', '.aac'}
        self.whisper_model = whisper_model

    def parse(self, file_path: Path) -> Dict:
        if file_path.suffix.lower() not in self.supported_formats:
            return {'error': f'Unsupported format: {file_path.suffix}'}

        try:
            return self._transcribe_with_whisper(file_path)
        except Exception as e:
            logger.error(f'Audio parse failed for {file_path}: {e}')
            return {'error': str(e), 'text': ''}

    def _transcribe_with_whisper(self, file_path: Path) -> Dict:
        try:
            import whisper

            model = whisper.load_model(self.whisper_model)
            result = model.transcribe(str(file_path))

            return {
                'text': result['text'].strip(),
                'quality': 'good',
                'method': f'whisper-{self.whisper_model}',
                'language': result.get('language', 'unknown'),
                'segments': len(result.get('segments', [])),
                'metadata': {
                    'duration': result.get('duration'),
                    'language': result.get('language')
                }
            }
        except ImportError:
            logger.warning('Whisper not installed')
            return {'error': 'Whisper not installed', 'text': '', 'needs': 'pip install openai-whisper'}
        except Exception as e:
            return {'error': str(e), 'text': ''}

    def extract_metadata(self, file_path: Path) -> Dict:
        try:
            import mutagen
            audio = mutagen.File(str(file_path))
            if audio is None:
                return {'error': 'Could not read audio file'}

            return {
                'duration': audio.info.length if hasattr(audio.info, 'length') else None,
                'bitrate': audio.info.bitrate if hasattr(audio.info, 'bitrate') else None,
                'sample_rate': audio.info.sample_rate if hasattr(audio.info, 'sample_rate') else None,
                'channels': audio.info.channels if hasattr(audio.info, 'channels') else None
            }
        except ImportError:
            return {'error': 'mutagen not installed', 'needs': 'pip install mutagen'}
        except Exception as e:
            return {'error': str(e)}
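A minimal usage sketch for `AudioParser`, assuming openai-whisper and mutagen are installed; the path is a placeholder. The parser degrades gracefully and reports any missing dependency via the `needs` key:

from pathlib import Path

parser = AudioParser(whisper_model='base')
audio_file = Path('/mnt/audio/interview.mp3')  # placeholder path

result = parser.parse(audio_file)
if result.get('error'):
    print('skipped:', result['error'], result.get('needs', ''))
else:
    print(result['method'], result['language'], result['text'][:200])

print(parser.extract_metadata(audio_file))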
60
app/parsers/document_parser.py
Normal file
60
app/parsers/document_parser.py
Normal file
@@ -0,0 +1,60 @@
from pathlib import Path
from typing import Dict
import logging
import subprocess

logger = logging.getLogger(__name__)

class DocumentParser:
    def __init__(self):
        self.supported_formats = {'.doc', '.docx', '.odt', '.rtf'}

    def parse(self, file_path: Path) -> Dict:
        if file_path.suffix.lower() not in self.supported_formats:
            return {'error': f'Unsupported format: {file_path.suffix}'}

        try:
            if file_path.suffix.lower() in {'.docx', '.odt'}:
                return self._parse_with_python(file_path)
            else:
                return self._parse_with_external(file_path)
        except Exception as e:
            logger.error(f'Document parse failed for {file_path}: {e}')
            return {'error': str(e), 'text': ''}

    def _parse_with_python(self, file_path: Path) -> Dict:
        try:
            if file_path.suffix.lower() == '.docx':
                import docx
                doc = docx.Document(str(file_path))
                text = '\n'.join([para.text for para in doc.paragraphs if para.text.strip()])
                return {'text': text, 'quality': 'good', 'method': 'python-docx'}
            elif file_path.suffix.lower() == '.odt':
                from odf import text as odf_text, teletype
                from odf.opendocument import load
                doc = load(str(file_path))
                paragraphs = doc.getElementsByType(odf_text.P)
                text = '\n'.join([teletype.extractText(p) for p in paragraphs if teletype.extractText(p).strip()])
                return {'text': text, 'quality': 'good', 'method': 'odfpy'}
        except ImportError as ie:
            logger.warning(f'Missing library for {file_path.suffix}: {ie}')
            return {'error': f'Missing library: {ie}', 'text': '', 'needs': 'python-docx or odfpy'}
        except Exception as e:
            return {'error': str(e), 'text': ''}

    def _parse_with_external(self, file_path: Path) -> Dict:
        try:
            result = subprocess.run(
                ['antiword', str(file_path)],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                return {'text': result.stdout, 'quality': 'good', 'method': 'antiword'}
            else:
                return {'error': 'antiword failed', 'text': '', 'needs': 'antiword tool'}
        except FileNotFoundError:
            return {'error': 'antiword not installed', 'text': '', 'needs': 'sudo apt install antiword'}
        except Exception as e:
            return {'error': str(e), 'text': ''}
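In `DocumentParser`, `.docx` and `.odt` go through python-docx and odfpy, while `.doc` and `.rtf` shell out to antiword; since antiword targets the legacy binary `.doc` format, `.rtf` files routed that way will most likely come back with the 'antiword failed' error. A hypothetical usage sketch (paths are placeholders):

from pathlib import Path

parser = DocumentParser()
for candidate in [Path('/mnt/docs/notes.docx'), Path('/mnt/docs/legacy.doc')]:  # placeholder paths
    result = parser.parse(candidate)
    if result.get('error'):
        print(candidate.name, '->', result['error'], result.get('needs', ''))
    else:
        print(candidate.name, '->', result['method'], len(result['text']), 'chars')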
63
app/parsers/image_parser.py
Normal file
63
app/parsers/image_parser.py
Normal file
@@ -0,0 +1,63 @@
from pathlib import Path
from typing import Dict
import logging

logger = logging.getLogger(__name__)

class ImageParser:
    def __init__(self):
        self.supported_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}

    def parse(self, file_path: Path) -> Dict:
        if file_path.suffix.lower() not in self.supported_formats:
            return {'error': f'Unsupported format: {file_path.suffix}'}

        try:
            return self._parse_with_ocr(file_path)
        except Exception as e:
            logger.error(f'Image parse failed for {file_path}: {e}')
            return {'error': str(e), 'text': ''}

    def _parse_with_ocr(self, file_path: Path) -> Dict:
        try:
            from PIL import Image
            import pytesseract

            img = Image.open(str(file_path))
            text = pytesseract.image_to_string(img)

            data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
            conf_scores = [int(c) for c in data['conf'] if c != '-1']
            avg_confidence = sum(conf_scores) / len(conf_scores) if conf_scores else 0

            quality = 'good' if avg_confidence > 80 else 'medium' if avg_confidence > 60 else 'low'

            return {
                'text': text.strip(),
                'quality': quality,
                'confidence': avg_confidence,
                'method': 'tesseract',
                'metadata': {
                    'width': img.width,
                    'height': img.height,
                    'format': img.format
                }
            }
        except ImportError as ie:
            logger.warning(f'Missing library for OCR: {ie}')
            return {'error': f'Missing library: {ie}', 'text': '', 'needs': 'pytesseract and tesseract-ocr'}
        except Exception as e:
            return {'error': str(e), 'text': ''}

    def extract_metadata(self, file_path: Path) -> Dict:
        try:
            from PIL import Image
            img = Image.open(str(file_path))
            return {
                'width': img.width,
                'height': img.height,
                'format': img.format,
                'mode': img.mode
            }
        except Exception as e:
            return {'error': str(e)}
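`ImageParser` grades OCR output by Tesseract's mean word confidence (good above 80, medium above 60, otherwise low). A hypothetical usage sketch, assuming Pillow, pytesseract, and the tesseract-ocr binary are installed (the path is a placeholder):

from pathlib import Path

parser = ImageParser()
result = parser.parse(Path('/mnt/scans/page_001.png'))  # placeholder path

if result.get('error'):
    print('OCR unavailable:', result.get('needs', result['error']))
elif result['quality'] == 'low':
    print(f"low-confidence OCR ({result['confidence']:.0f}), consider manual review")
else:
    print(result['text'][:200])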
65
app/parsers/transcription_parser.py
Normal file
65
app/parsers/transcription_parser.py
Normal file
@@ -0,0 +1,65 @@
import os
import subprocess
import tempfile
from pathlib import Path
from typing import Dict, Optional
import logging

logger = logging.getLogger(__name__)

class TranscriptionParser:
    def __init__(self, model: str = 'base'):
        self.model = model
        self.whisper_available = self._check_whisper()

    def _check_whisper(self) -> bool:
        try:
            import whisper
            return True
        except ImportError:
            logger.warning('Whisper not installed. Install with: pip install openai-whisper')
            return False

    def parse(self, file_path: Path) -> Dict:
        if not self.whisper_available:
            return {'success': False, 'error': 'Whisper not available', 'text': ''}

        if not self._is_supported(file_path):
            return {'success': False, 'error': 'Unsupported file type', 'text': ''}

        try:
            import whisper
            logger.info(f'Transcribing {file_path} with Whisper model={self.model}')

            model = whisper.load_model(self.model)
            result = model.transcribe(str(file_path))

            return {
                'success': True,
                'text': result['text'],
                'segments': result.get('segments', []),
                'language': result.get('language', 'unknown')
            }
        except Exception as e:
            logger.error(f'Transcription failed for {file_path}: {e}')
            return {'success': False, 'error': str(e), 'text': ''}

    def _is_supported(self, file_path: Path) -> bool:
        supported = {'.mp3', '.mp4', '.wav', '.m4a', '.flac', '.ogg', '.avi', '.mkv', '.webm'}
        return file_path.suffix.lower() in supported

    def parse_with_timestamps(self, file_path: Path) -> Dict:
        result = self.parse(file_path)
        if not result['success']:
            return result

        segments = result.get('segments', [])
        timestamped_text = []
        for seg in segments:
            start = seg.get('start', 0)
            end = seg.get('end', 0)
            text = seg.get('text', '').strip()
            timestamped_text.append(f'[{start:.2f}s - {end:.2f}s] {text}')

        result['timestamped_text'] = '\n'.join(timestamped_text)
        return result
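A hypothetical usage sketch for `TranscriptionParser`, showing the timestamped variant; the path is a placeholder, and the first call to whisper.load_model will download the model weights if they are not already cached:

from pathlib import Path

parser = TranscriptionParser(model='base')
result = parser.parse_with_timestamps(Path('/mnt/video/standup.mkv'))  # placeholder path

if result['success']:
    print(result['language'])
    print(result['timestamped_text'][:500])
else:
    print('transcription skipped:', result['error'])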
@@ -38,4 +38,7 @@ black>=23.0.0
mypy>=1.0.0
flake8>=6.0.0

chardet
pillow
requests
openai-whisper
76
sql/migrations/003_content_graph.sql
Normal file
76
sql/migrations/003_content_graph.sql
Normal file
@@ -0,0 +1,76 @@
CREATE TABLE IF NOT EXISTS content_nodes (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    node_type VARCHAR(50) NOT NULL,
    path TEXT NOT NULL,
    disk_label VARCHAR(50),
    parent_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,

    checksum VARCHAR(64),
    size BIGINT,
    modified_time TIMESTAMP,

    content_hash VARCHAR(64),
    extracted_at TIMESTAMP,
    extraction_method VARCHAR(100),

    metadata JSONB,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

    UNIQUE(node_type, path, disk_label)
);

CREATE INDEX IF NOT EXISTS idx_content_nodes_type ON content_nodes(node_type);
CREATE INDEX IF NOT EXISTS idx_content_nodes_path ON content_nodes(path);
CREATE INDEX IF NOT EXISTS idx_content_nodes_parent ON content_nodes(parent_id);
CREATE INDEX IF NOT EXISTS idx_content_nodes_checksum ON content_nodes(checksum);
CREATE INDEX IF NOT EXISTS idx_content_nodes_content_hash ON content_nodes(content_hash);

CREATE TABLE IF NOT EXISTS content_edges (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    source_id UUID NOT NULL REFERENCES content_nodes(id) ON DELETE CASCADE,
    target_id UUID NOT NULL REFERENCES content_nodes(id) ON DELETE CASCADE,
    edge_type VARCHAR(50) NOT NULL,

    metadata JSONB,
    confidence FLOAT,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,

    UNIQUE(source_id, target_id, edge_type)
);

CREATE INDEX IF NOT EXISTS idx_content_edges_source ON content_edges(source_id);
CREATE INDEX IF NOT EXISTS idx_content_edges_target ON content_edges(target_id);
CREATE INDEX IF NOT EXISTS idx_content_edges_type ON content_edges(edge_type);

CREATE TABLE IF NOT EXISTS extraction_log (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    node_id UUID REFERENCES content_nodes(id) ON DELETE CASCADE,
    file_path TEXT NOT NULL,
    file_checksum VARCHAR(64),

    extraction_method VARCHAR(100),
    status VARCHAR(50),
    error_message TEXT,

    extracted_size BIGINT,
    processing_time_ms INT,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_extraction_log_node ON extraction_log(node_id);
CREATE INDEX IF NOT EXISTS idx_extraction_log_file ON extraction_log(file_path);
CREATE INDEX IF NOT EXISTS idx_extraction_log_checksum ON extraction_log(file_checksum);
CREATE INDEX IF NOT EXISTS idx_extraction_log_status ON extraction_log(status);
CREATE INDEX IF NOT EXISTS idx_extraction_log_created ON extraction_log(created_at DESC);

COMMENT ON TABLE content_nodes IS 'Content graph nodes: directories, files, chunks';
COMMENT ON TABLE content_edges IS 'Content graph edges: contains, derived_from, references, duplicates';
COMMENT ON TABLE extraction_log IS 'Tracks extraction history for incremental updates';

COMMENT ON COLUMN content_nodes.node_type IS 'directory, file, chunk, embedding';
COMMENT ON COLUMN content_nodes.content_hash IS 'Hash of extracted content (not file bytes)';
COMMENT ON COLUMN content_edges.edge_type IS 'contains, derived_from, references, duplicates, similar_to';
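A hedged sketch of how an ingestion step might write into this schema with psycopg2, leaning on the UNIQUE constraints for idempotent upserts; the connection parameters, paths, and disk label are placeholders, and the repository's actual graph writer may differ:

import psycopg2

# Placeholder connection parameters
conn = psycopg2.connect(dbname='files', user='files', password='files', host='localhost')
cur = conn.cursor()

def upsert_node(node_type, path, disk_label, parent_id=None):
    # Upsert keyed on the UNIQUE(node_type, path, disk_label) constraint
    cur.execute(
        '''
        INSERT INTO content_nodes (node_type, path, disk_label, parent_id)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (node_type, path, disk_label) DO UPDATE SET updated_at = CURRENT_TIMESTAMP
        RETURNING id
        ''',
        (node_type, path, disk_label, parent_id)
    )
    return cur.fetchone()[0]

dir_id = upsert_node('directory', '/mnt/archive/photos', 'disk01')          # placeholder path
file_id = upsert_node('file', '/mnt/archive/photos/img_0001.jpg', 'disk01', dir_id)

# Link directory to file with a 'contains' edge; duplicates are ignored
cur.execute(
    '''
    INSERT INTO content_edges (source_id, target_id, edge_type, confidence)
    VALUES (%s, %s, 'contains', 1.0)
    ON CONFLICT (source_id, target_id, edge_type) DO NOTHING
    ''',
    (dir_id, file_id)
)
conn.commit()
cur.close()
conn.close()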