113 lines
4.1 KiB
Python
113 lines
4.1 KiB
Python
import os
|
|
from pathlib import Path
|
|
from typing import Iterator, Optional, Callable
|
|
from datetime import datetime
|
|
from ._protocols import FileMeta
|
|
|
|
class FileScanner:
    """Walk a filesystem location and yield one ``FileMeta`` per file found.

    The scanner keeps running totals (files yielded, bytes yielded, errors
    encountered) that accumulate across calls to :meth:`scan` until
    :meth:`reset_stats` is called.

    Args:
        follow_symlinks: Passed to ``os.walk`` as ``followlinks``; controls
            whether symlinked *directories* are descended into. File
            metadata is always read with ``Path.stat()``, which resolves
            symlinks regardless of this flag.
        skip_hidden: When true, files and directories whose name starts
            with ``'.'`` are skipped during a directory walk. The check is
            not applied to ``root`` itself.
        error_handler: Optional callback ``(exception, path)`` invoked for
            every error. When absent, errors on a missing root or a
            single-file root are raised; errors inside a directory walk are
            only counted.
    """

    def __init__(
        self,
        follow_symlinks: bool = False,
        skip_hidden: bool = True,
        error_handler: Optional[Callable[[Exception, Path], None]] = None,
    ):
        self.follow_symlinks = follow_symlinks
        self.skip_hidden = skip_hidden
        self.error_handler = error_handler
        self._files_scanned = 0
        self._bytes_scanned = 0
        self._errors = 0

    def _report_error(self, error: Exception, path: Path) -> bool:
        """Count *error* and forward it to the error handler, if any.

        Returns:
            ``True`` when a handler was configured and called, ``False``
            when the caller has to decide what to do with the error.
        """
        self._errors += 1
        if self.error_handler:
            self.error_handler(error, path)
            return True
        return False

    def scan(self, root: Path) -> Iterator[FileMeta]:
        """Yield metadata for ``root`` (if a file) or every file below it.

        This is a generator: nothing happens, and nothing is raised, until
        iteration starts.

        Args:
            root: File or directory to scan.

        Yields:
            One ``FileMeta`` per readable file.

        Raises:
            FileNotFoundError: ``root`` does not exist and no error handler
                is configured.
            Exception: Reading a single-file ``root`` failed and no error
                handler is configured (the original exception propagates).
        """
        if not root.exists():
            error = FileNotFoundError(f'Path does not exist: {root}')
            if not self._report_error(error, root):
                raise error
            return

        if not root.is_dir():
            try:
                meta = self._get_file_meta(root)
                # A single-file scan contributes to the totals exactly like
                # a file discovered during a directory walk.
                self._files_scanned += 1
                self._bytes_scanned += meta.size
            except Exception as exc:
                if not self._report_error(exc, root):
                    raise
                return
            # Yield outside the try so an exception thrown into the
            # generator by the consumer is not mistaken for a scan error.
            yield meta
            return

        def on_walk_error(exc: OSError) -> None:
            # os.walk ignores directory-listing failures unless ``onerror``
            # is supplied; count and report them like per-file errors.
            failed = Path(exc.filename) if exc.filename else root
            self._report_error(exc, failed)

        walker = os.walk(
            root,
            onerror=on_walk_error,
            followlinks=self.follow_symlinks,
        )
        for dirpath, dirnames, filenames in walker:
            current_dir = Path(dirpath)
            if self.skip_hidden:
                # In-place assignment prunes the walk: os.walk only descends
                # into the names left in ``dirnames``.
                dirnames[:] = [d for d in dirnames if not d.startswith('.')]

            for filename in filenames:
                if self.skip_hidden and filename.startswith('.'):
                    continue
                file_path = current_dir / filename
                try:
                    # Dangling symlinks are skipped silently rather than
                    # being reported as errors.
                    if file_path.is_symlink() and not file_path.exists():
                        continue
                    meta = self._get_file_meta(file_path)
                    self._files_scanned += 1
                    self._bytes_scanned += meta.size
                except Exception as exc:
                    # Best effort: one unreadable file must not abort the
                    # walk, so the error is counted/reported and skipped.
                    self._report_error(exc, file_path)
                    continue
                yield meta

    def _get_file_meta(self, path: Path) -> FileMeta:
        """Stat *path* and package the result as a ``FileMeta``.

        ``st_birthtime`` is used for the creation time when the platform's
        stat result provides it; otherwise ``st_ctime`` is used (which on
        Unix is the last metadata change, not the creation time).

        Raises:
            OSError: ``path.stat()`` failed.
        """
        info = path.stat()
        if hasattr(info, 'st_birthtime'):
            created_time = info.st_birthtime
        else:
            created_time = info.st_ctime
        return FileMeta(
            path=path,
            size=info.st_size,
            modified_time=info.st_mtime,
            created_time=created_time,
        )

    @property
    def files_scanned(self) -> int:
        """Number of files whose metadata was read since the last reset."""
        return self._files_scanned

    @property
    def bytes_scanned(self) -> int:
        """Sum of the sizes of all files counted in ``files_scanned``."""
        return self._bytes_scanned

    @property
    def errors(self) -> int:
        """Number of errors encountered since the last reset."""
        return self._errors

    def reset_stats(self) -> None:
        """Zero the file, byte and error counters."""
        self._files_scanned = 0
        self._bytes_scanned = 0
        self._errors = 0
|
|
|
|
class FilteredScanner(FileScanner):
    """A ``FileScanner`` that only yields files passing a set of filters.

    Args:
        min_size: Smallest file size (inclusive) to yield, or ``None`` for
            no lower bound.
        max_size: Largest file size (inclusive) to yield, or ``None`` for
            no upper bound.
        extensions: File extensions to keep, compared case-insensitively
            against ``Path.suffix`` (the final suffix only, so ``.tar.gz``
            files have the suffix ``.gz``). Entries may be given with or
            without the leading dot. ``None`` or an empty list disables the
            extension filter.
        exclude_patterns: Plain substrings; a file is dropped when any of
            them occurs anywhere in ``str(path)``. Empty strings are
            ignored.
        **kwargs: Forwarded unchanged to ``FileScanner.__init__``.
    """

    def __init__(
        self,
        min_size: Optional[int] = None,
        max_size: Optional[int] = None,
        extensions: Optional[list[str]] = None,
        exclude_patterns: Optional[list[str]] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.min_size = min_size
        self.max_size = max_size
        if extensions:
            self.extensions = {
                self._normalize_extension(ext) for ext in extensions
            }
        else:
            self.extensions = None
        self.exclude_patterns = exclude_patterns or []

    @staticmethod
    def _normalize_extension(ext: str) -> str:
        """Lower-case *ext* and make sure it carries a leading dot.

        ``Path.suffix`` always includes the dot, so ``"txt"`` would never
        match without this. The empty string is kept as-is: it is the
        suffix of files that have no extension.
        """
        ext = ext.lower()
        if ext and not ext.startswith('.'):
            return '.' + ext
        return ext

    def scan(self, root: Path) -> Iterator[FileMeta]:
        """Yield the files from the base scan that pass every filter.

        Args:
            root: File or directory to scan; handed to the base ``scan``.

        Yields:
            ``FileMeta`` records that satisfy the size, extension and
            exclusion filters.
        """
        for meta in super().scan(root):
            if self._passes_filters(meta):
                yield meta

    def _passes_filters(self, meta: FileMeta) -> bool:
        """Return ``True`` when *meta* satisfies all configured filters."""
        if self.min_size is not None and meta.size < self.min_size:
            return False
        if self.max_size is not None and meta.size > self.max_size:
            return False
        if (
            self.extensions is not None
            and meta.path.suffix.lower() not in self.extensions
        ):
            return False
        return not self._should_exclude(meta.path)

    def _should_exclude(self, path: Path) -> bool:
        """Return ``True`` when any exclude pattern occurs in ``str(path)``.

        Matching is a plain substring test, not glob or regex matching.
        """
        path_str = str(path)
        # Skip empty patterns: '' is a substring of every path and would
        # otherwise exclude every file.
        return any(
            pattern and pattern in path_str
            for pattern in self.exclude_patterns
        )
|