"""File system scanner implementing IFileScanner protocol""" import os from pathlib import Path from typing import Iterator, Optional, Callable from datetime import datetime from ._protocols import FileMeta class FileScanner: """File system scanner with filtering and error handling""" def __init__( self, follow_symlinks: bool = False, skip_hidden: bool = True, error_handler: Optional[Callable[[Exception, Path], None]] = None ): """Initialize file scanner Args: follow_symlinks: Whether to follow symbolic links skip_hidden: Whether to skip hidden files/directories error_handler: Optional callback for handling errors during scan """ self.follow_symlinks = follow_symlinks self.skip_hidden = skip_hidden self.error_handler = error_handler self._files_scanned = 0 self._bytes_scanned = 0 self._errors = 0 def scan(self, root: Path) -> Iterator[FileMeta]: """Scan a directory tree and yield file metadata Args: root: Root directory to scan Yields: FileMeta objects for each discovered file """ if not root.exists(): error = FileNotFoundError(f"Path does not exist: {root}") if self.error_handler: self.error_handler(error, root) else: raise error return if not root.is_dir(): # If root is a file, just return its metadata try: yield self._get_file_meta(root) except Exception as e: self._errors += 1 if self.error_handler: self.error_handler(e, root) else: raise return # Walk directory tree for dirpath, dirnames, filenames in os.walk(root, followlinks=self.follow_symlinks): current_dir = Path(dirpath) # Filter directories if needed if self.skip_hidden: dirnames[:] = [d for d in dirnames if not d.startswith('.')] # Process files for filename in filenames: if self.skip_hidden and filename.startswith('.'): continue file_path = current_dir / filename try: # Skip broken symlinks if file_path.is_symlink() and not file_path.exists(): continue meta = self._get_file_meta(file_path) self._files_scanned += 1 self._bytes_scanned += meta.size yield meta except PermissionError as e: self._errors += 1 if self.error_handler: self.error_handler(e, file_path) # Continue scanning continue except Exception as e: self._errors += 1 if self.error_handler: self.error_handler(e, file_path) # Continue scanning continue def _get_file_meta(self, path: Path) -> FileMeta: """Get file metadata Args: path: Path to file Returns: FileMeta object with file metadata Raises: OSError: If file cannot be accessed """ stat = path.stat() # Get creation time (platform dependent) created_time = stat.st_ctime if hasattr(stat, 'st_birthtime'): created_time = stat.st_birthtime return FileMeta( path=path, size=stat.st_size, modified_time=stat.st_mtime, created_time=created_time ) @property def files_scanned(self) -> int: """Get count of files scanned""" return self._files_scanned @property def bytes_scanned(self) -> int: """Get total bytes scanned""" return self._bytes_scanned @property def errors(self) -> int: """Get count of errors encountered""" return self._errors def reset_stats(self) -> None: """Reset scanning statistics""" self._files_scanned = 0 self._bytes_scanned = 0 self._errors = 0 class FilteredScanner(FileScanner): """Scanner with additional filtering capabilities""" def __init__( self, min_size: Optional[int] = None, max_size: Optional[int] = None, extensions: Optional[list[str]] = None, exclude_patterns: Optional[list[str]] = None, **kwargs ): """Initialize filtered scanner Args: min_size: Minimum file size in bytes max_size: Maximum file size in bytes extensions: List of file extensions to include (e.g., ['.txt', '.py']) exclude_patterns: List of path patterns to exclude **kwargs: Additional arguments passed to FileScanner """ super().__init__(**kwargs) self.min_size = min_size self.max_size = max_size self.extensions = {ext.lower() for ext in extensions} if extensions else None self.exclude_patterns = exclude_patterns or [] def scan(self, root: Path) -> Iterator[FileMeta]: """Scan with additional filtering Args: root: Root directory to scan Yields: FileMeta objects for files matching filter criteria """ for meta in super().scan(root): # Size filtering if self.min_size is not None and meta.size < self.min_size: continue if self.max_size is not None and meta.size > self.max_size: continue # Extension filtering if self.extensions is not None: if meta.path.suffix.lower() not in self.extensions: continue # Exclude pattern filtering if self._should_exclude(meta.path): continue yield meta def _should_exclude(self, path: Path) -> bool: """Check if path matches any exclude pattern Args: path: Path to check Returns: True if path should be excluded """ path_str = str(path) for pattern in self.exclude_patterns: if pattern in path_str: return True return False