This commit is contained in:
mike
2025-12-12 19:25:16 +01:00
parent 5e0db89d45
commit 56b2db82fc
34 changed files with 117 additions and 6556 deletions

216
app/discovery/scanner.py Normal file
View File

@@ -0,0 +1,216 @@
"""File system scanner implementing IFileScanner protocol"""
import os
from pathlib import Path
from typing import Iterator, Optional, Callable
from datetime import datetime
from ._protocols import FileMeta
class FileScanner:
    """File system scanner with hidden-entry filtering and error handling.

    The scanner walks a directory tree with ``os.walk`` and yields one
    ``FileMeta`` per file. It keeps running totals (files, bytes, errors)
    that accumulate across calls to :meth:`scan` until :meth:`reset_stats`
    is called.
    """

    def __init__(
        self,
        follow_symlinks: bool = False,
        skip_hidden: bool = True,
        error_handler: Optional[Callable[[Exception, Path], None]] = None
    ):
        """Initialize file scanner

        Args:
            follow_symlinks: Whether ``os.walk`` descends into directories
                that are symbolic links
            skip_hidden: Whether to skip files/directories whose name
                starts with a dot
            error_handler: Optional callback invoked as
                ``error_handler(exception, path)`` for errors during a scan
        """
        self.follow_symlinks = follow_symlinks
        self.skip_hidden = skip_hidden
        self.error_handler = error_handler
        self._files_scanned = 0
        self._bytes_scanned = 0
        self._errors = 0

    def scan(self, root: Path) -> Iterator["FileMeta"]:
        """Scan a directory tree and yield file metadata

        This is a generator: nothing happens (and nothing is raised) until
        the result is iterated.

        Args:
            root: Root directory to scan. If it is a file, only that file's
                metadata is yielded.

        Yields:
            FileMeta objects for each discovered file

        Raises:
            FileNotFoundError: If ``root`` does not exist and no
                ``error_handler`` was supplied.
            Exception: If ``root`` is a single file whose metadata cannot be
                read and no ``error_handler`` was supplied. Errors on
                entries *inside* a directory tree never abort the scan;
                they are counted and passed to ``error_handler`` if set.
        """
        if not root.exists():
            error = FileNotFoundError(f"Path does not exist: {root}")
            # Count this like every other failure so `errors` is reliable.
            self._errors += 1
            if self.error_handler:
                self.error_handler(error, root)
                return
            raise error

        if not root.is_dir():
            # Root is a single file: report just that file.
            try:
                meta = self._get_file_meta(root)
            except Exception as e:
                self._errors += 1
                if self.error_handler:
                    self.error_handler(e, root)
                    return
                raise
            self._record(meta)
            yield meta
            return

        def on_walk_error(err: OSError) -> None:
            # os.walk reports directories it cannot list through this hook;
            # without it they would be skipped without any trace.
            self._errors += 1
            if self.error_handler:
                failed = Path(err.filename) if err.filename else root
                self.error_handler(err, failed)

        # Real paths of directories already walked. Only used when following
        # symlinks, where a link cycle would otherwise make os.walk descend
        # into the same directory again and again.
        visited: set[str] = set()

        for dirpath, dirnames, filenames in os.walk(
            root, onerror=on_walk_error, followlinks=self.follow_symlinks
        ):
            if self.follow_symlinks:
                real_dir = os.path.realpath(dirpath)
                if real_dir in visited:
                    # Already walked via another path: prune and move on.
                    dirnames[:] = []
                    continue
                visited.add(real_dir)

            current_dir = Path(dirpath)

            # In-place edit of dirnames tells os.walk not to descend there.
            if self.skip_hidden:
                dirnames[:] = [d for d in dirnames if not d.startswith('.')]

            for filename in filenames:
                if self.skip_hidden and filename.startswith('.'):
                    continue
                file_path = current_dir / filename
                try:
                    # Skip broken symlinks
                    if file_path.is_symlink() and not file_path.exists():
                        continue
                    meta = self._get_file_meta(file_path)
                except Exception as e:
                    # Best effort: one unreadable entry must not abort the
                    # whole scan, so record it and keep going.
                    self._errors += 1
                    if self.error_handler:
                        self.error_handler(e, file_path)
                    continue
                # Yield outside the try so exceptions thrown into the
                # generator by the consumer are not swallowed above.
                self._record(meta)
                yield meta

    def _record(self, meta: "FileMeta") -> None:
        """Add one successfully scanned file to the running totals"""
        self._files_scanned += 1
        self._bytes_scanned += meta.size

    def _get_file_meta(self, path: Path) -> "FileMeta":
        """Get file metadata

        Args:
            path: Path to file

        Returns:
            FileMeta object with path, size, modification time and
            creation time

        Raises:
            OSError: If file cannot be accessed
        """
        st = path.stat()
        # st_birthtime exists only on some platforms; fall back to st_ctime
        # where it is missing.
        created_time = getattr(st, 'st_birthtime', st.st_ctime)
        return FileMeta(
            path=path,
            size=st.st_size,
            modified_time=st.st_mtime,
            created_time=created_time
        )

    @property
    def files_scanned(self) -> int:
        """Get count of files scanned"""
        return self._files_scanned

    @property
    def bytes_scanned(self) -> int:
        """Get total bytes scanned"""
        return self._bytes_scanned

    @property
    def errors(self) -> int:
        """Get count of errors encountered"""
        return self._errors

    def reset_stats(self) -> None:
        """Reset scanning statistics"""
        self._files_scanned = 0
        self._bytes_scanned = 0
        self._errors = 0
class FilteredScanner(FileScanner):
    """Scanner with additional filtering capabilities.

    Results of ``FileScanner.scan`` are narrowed by file size, file
    extension and excluded path fragments. The statistics kept by the base
    class are updated before these filters run, so they also count files
    that end up filtered out here.
    """

    def __init__(
        self,
        min_size: Optional[int] = None,
        max_size: Optional[int] = None,
        extensions: Optional[list[str]] = None,
        exclude_patterns: Optional[list[str]] = None,
        **kwargs
    ):
        """Initialize filtered scanner

        Args:
            min_size: Minimum file size in bytes (inclusive)
            max_size: Maximum file size in bytes (inclusive)
            extensions: List of file extensions to include (e.g.,
                ['.txt', '.py']). Matching is case-insensitive and the
                leading dot is optional. Only the final suffix of a file
                name is compared, so '.tar.gz' never matches. An empty
                list disables the filter.
            exclude_patterns: List of substrings; a file is excluded when
                any of them occurs in its path string
            **kwargs: Additional arguments passed to FileScanner
        """
        super().__init__(**kwargs)
        self.min_size = min_size
        self.max_size = max_size

        # A bare string would otherwise be iterated character by character.
        if isinstance(extensions, str):
            extensions = [extensions]
        if isinstance(exclude_patterns, str):
            exclude_patterns = [exclude_patterns]

        self.extensions = (
            {self._normalize_extension(ext) for ext in extensions}
            if extensions else None
        )
        self.exclude_patterns = exclude_patterns or []

    @staticmethod
    def _normalize_extension(ext: str) -> str:
        """Lower-case an extension and make sure it starts with a dot

        ``Path.suffix`` always includes the dot, so 'txt' has to become
        '.txt' to ever match. The empty string is kept as is; it matches
        files that have no suffix.
        """
        ext = ext.lower()
        if ext and not ext.startswith('.'):
            ext = '.' + ext
        return ext

    def scan(self, root: Path) -> Iterator[FileMeta]:
        """Scan with additional filtering

        Args:
            root: Root directory to scan

        Yields:
            FileMeta objects for files matching filter criteria
        """
        for meta in super().scan(root):
            # Size filtering (both bounds inclusive)
            if self.min_size is not None and meta.size < self.min_size:
                continue
            if self.max_size is not None and meta.size > self.max_size:
                continue
            # Extension filtering
            if (
                self.extensions is not None
                and meta.path.suffix.lower() not in self.extensions
            ):
                continue
            # Exclude pattern filtering
            if self._should_exclude(meta.path):
                continue
            yield meta

    def _should_exclude(self, path: Path) -> bool:
        """Check if path matches any exclude pattern

        Args:
            path: Path to check

        Returns:
            True if path should be excluded
        """
        path_str = str(path)
        # Empty patterns are ignored: "" is a substring of every path and
        # would exclude everything.
        return any(
            pattern and pattern in path_str
            for pattern in self.exclude_patterns
        )