remove_doc
This commit is contained in:
@@ -1,28 +1,12 @@
|
||||
"""File system scanner implementing IFileScanner protocol"""
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Optional, Callable
|
||||
from datetime import datetime
|
||||
|
||||
from ._protocols import FileMeta
|
||||
|
||||
|
||||
class FileScanner:
|
||||
"""File system scanner with filtering and error handling"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
follow_symlinks: bool = False,
|
||||
skip_hidden: bool = True,
|
||||
error_handler: Optional[Callable[[Exception, Path], None]] = None
|
||||
):
|
||||
"""Initialize file scanner
|
||||
|
||||
Args:
|
||||
follow_symlinks: Whether to follow symbolic links
|
||||
skip_hidden: Whether to skip hidden files/directories
|
||||
error_handler: Optional callback for handling errors during scan
|
||||
"""
|
||||
def __init__(self, follow_symlinks: bool=False, skip_hidden: bool=True, error_handler: Optional[Callable[[Exception, Path], None]]=None):
|
||||
self.follow_symlinks = follow_symlinks
|
||||
self.skip_hidden = skip_hidden
|
||||
self.error_handler = error_handler
|
||||
@@ -31,24 +15,14 @@ class FileScanner:
|
||||
self._errors = 0
|
||||
|
||||
def scan(self, root: Path) -> Iterator[FileMeta]:
|
||||
"""Scan a directory tree and yield file metadata
|
||||
|
||||
Args:
|
||||
root: Root directory to scan
|
||||
|
||||
Yields:
|
||||
FileMeta objects for each discovered file
|
||||
"""
|
||||
if not root.exists():
|
||||
error = FileNotFoundError(f"Path does not exist: {root}")
|
||||
error = FileNotFoundError(f'Path does not exist: {root}')
|
||||
if self.error_handler:
|
||||
self.error_handler(error, root)
|
||||
else:
|
||||
raise error
|
||||
return
|
||||
|
||||
if not root.is_dir():
|
||||
# If root is a file, just return its metadata
|
||||
try:
|
||||
yield self._get_file_meta(root)
|
||||
except Exception as e:
|
||||
@@ -58,115 +32,59 @@ class FileScanner:
|
||||
else:
|
||||
raise
|
||||
return
|
||||
|
||||
# Walk directory tree
|
||||
for dirpath, dirnames, filenames in os.walk(root, followlinks=self.follow_symlinks):
|
||||
current_dir = Path(dirpath)
|
||||
|
||||
# Filter directories if needed
|
||||
if self.skip_hidden:
|
||||
dirnames[:] = [d for d in dirnames if not d.startswith('.')]
|
||||
|
||||
# Process files
|
||||
for filename in filenames:
|
||||
if self.skip_hidden and filename.startswith('.'):
|
||||
continue
|
||||
|
||||
file_path = current_dir / filename
|
||||
|
||||
try:
|
||||
# Skip broken symlinks
|
||||
if file_path.is_symlink() and not file_path.exists():
|
||||
if file_path.is_symlink() and (not file_path.exists()):
|
||||
continue
|
||||
|
||||
meta = self._get_file_meta(file_path)
|
||||
self._files_scanned += 1
|
||||
self._bytes_scanned += meta.size
|
||||
|
||||
yield meta
|
||||
|
||||
except PermissionError as e:
|
||||
self._errors += 1
|
||||
if self.error_handler:
|
||||
self.error_handler(e, file_path)
|
||||
# Continue scanning
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
self._errors += 1
|
||||
if self.error_handler:
|
||||
self.error_handler(e, file_path)
|
||||
# Continue scanning
|
||||
continue
|
||||
|
||||
def _get_file_meta(self, path: Path) -> FileMeta:
    """Build a FileMeta record for ``path`` from a single ``stat()`` call.

    Args:
        path: Path to the file to inspect.

    Returns:
        FileMeta carrying the path, ``st_size``, ``st_mtime`` and a
        best-effort creation time.

    Raises:
        OSError: If ``path.stat()`` fails.
    """
    stat = path.stat()

    # st_birthtime is only exposed on some platforms; elsewhere fall back to
    # st_ctime (which on Unix is the last metadata change, not true creation).
    created_time = getattr(stat, 'st_birthtime', stat.st_ctime)

    return FileMeta(
        path=path,
        size=stat.st_size,
        modified_time=stat.st_mtime,
        created_time=created_time,
    )
|
||||
|
||||
@property
|
||||
def files_scanned(self) -> int:
|
||||
"""Get count of files scanned"""
|
||||
return self._files_scanned
|
||||
|
||||
@property
|
||||
def bytes_scanned(self) -> int:
|
||||
"""Get total bytes scanned"""
|
||||
return self._bytes_scanned
|
||||
|
||||
@property
|
||||
def errors(self) -> int:
|
||||
"""Get count of errors encountered"""
|
||||
return self._errors
|
||||
|
||||
def reset_stats(self) -> None:
|
||||
"""Reset scanning statistics"""
|
||||
self._files_scanned = 0
|
||||
self._bytes_scanned = 0
|
||||
self._errors = 0
|
||||
|
||||
|
||||
class FilteredScanner(FileScanner):
|
||||
"""Scanner with additional filtering capabilities"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
min_size: Optional[int] = None,
|
||||
max_size: Optional[int] = None,
|
||||
extensions: Optional[list[str]] = None,
|
||||
exclude_patterns: Optional[list[str]] = None,
|
||||
**kwargs
|
||||
):
|
||||
"""Initialize filtered scanner
|
||||
|
||||
Args:
|
||||
min_size: Minimum file size in bytes
|
||||
max_size: Maximum file size in bytes
|
||||
extensions: List of file extensions to include (e.g., ['.txt', '.py'])
|
||||
exclude_patterns: List of path patterns to exclude
|
||||
**kwargs: Additional arguments passed to FileScanner
|
||||
"""
|
||||
def __init__(self, min_size: Optional[int]=None, max_size: Optional[int]=None, extensions: Optional[list[str]]=None, exclude_patterns: Optional[list[str]]=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.min_size = min_size
|
||||
self.max_size = max_size
|
||||
@@ -174,41 +92,19 @@ class FilteredScanner(FileScanner):
|
||||
self.exclude_patterns = exclude_patterns or []
|
||||
|
||||
def scan(self, root: Path) -> Iterator[FileMeta]:
    """Yield the parent scanner's results that pass every configured filter.

    Args:
        root: Path handed unchanged to the parent class's ``scan``.

    Yields:
        FileMeta objects whose size lies within ``min_size``/``max_size``
        (when set), whose lower-cased suffix is in ``extensions`` (when
        set), and which ``_should_exclude`` does not reject.
    """
    for entry in super().scan(root):
        # Size bounds: each bound applies only when it is configured.
        if (self.min_size is not None and entry.size < self.min_size) or (
            self.max_size is not None and entry.size > self.max_size
        ):
            continue

        # Extension allow-list, compared against the lower-cased suffix.
        if (
            self.extensions is not None
            and entry.path.suffix.lower() not in self.extensions
        ):
            continue

        # Exclude patterns are consulted last, as in the original ordering.
        if not self._should_exclude(entry.path):
            yield entry
|
||||
|
||||
def _should_exclude(self, path: Path) -> bool:
|
||||
"""Check if path matches any exclude pattern
|
||||
|
||||
Args:
|
||||
path: Path to check
|
||||
|
||||
Returns:
|
||||
True if path should be excluded
|
||||
"""
|
||||
path_str = str(path)
|
||||
for pattern in self.exclude_patterns:
|
||||
if pattern in path_str:
|
||||
|
||||
Reference in New Issue
Block a user