import hashlib
from pathlib import Path
from typing import Iterator, Optional

# The window hash is truncated to 64 bits (2**64 - 1).
_HASH_MASK = 0xFFFFFFFFFFFFFFFF
# Read size used when streaming a file through the content-defined chunker.
_READ_BLOCK_SIZE = 65536


def _iter_blocks(file_path: Path, block_size: int) -> Iterator[bytes]:
    """Yield successive ``f.read(block_size)`` results from *file_path*.

    Iteration stops at the first empty read. The file is opened in binary
    mode when the generator is first advanced and closed when it finishes.
    """
    with open(file_path, 'rb') as f:
        while block := f.read(block_size):
            yield block


class RabinChunker:
    """Split a file into variable-size chunks at hash-selected boundaries.

    A chunk ends once it holds at least ``min_chunk_size`` bytes and either
    the hash of its trailing ``window_size`` bytes has all ``mask`` bits
    clear, or it has reached ``max_chunk_size`` bytes. The hash window is
    reset at every boundary, so a boundary depends only on the bytes of the
    current chunk.

    NOTE: the hash used is the shift-and-add function in ``_rolling_hash``;
    ``poly`` is stored but not read by any code visible in this module.
    """

    def __init__(self, avg_chunk_size: int = 8192,
                 min_chunk_size: Optional[int] = None,
                 max_chunk_size: Optional[int] = None,
                 window_size: int = 48):
        """Configure chunk-size limits and derive the boundary mask.

        Args:
            avg_chunk_size: Size used to derive ``mask`` and the default
                minimum/maximum sizes.
            min_chunk_size: Smallest chunk that may be cut. A falsy value
                (``None`` or ``0``) selects ``avg_chunk_size // 4``.
            max_chunk_size: Size at which a chunk is always cut. A falsy
                value selects ``avg_chunk_size * 8``.
            window_size: Number of trailing bytes that feed the hash.
        """
        self.avg_chunk_size = avg_chunk_size
        self.min_chunk_size = min_chunk_size or avg_chunk_size // 4
        self.max_chunk_size = max_chunk_size or avg_chunk_size * 8
        self.window_size = window_size
        # floor(log2(avg_chunk_size)) low bits must be zero at a boundary;
        # sizes of 1 or less give an empty mask (every hash matches).
        bits = avg_chunk_size.bit_length() - 1 if avg_chunk_size > 1 else 0
        self.mask = (1 << bits) - 1
        # Kept for backward compatibility with code that reads the attribute.
        self.poly = 17349423945073011

    def chunk_file(self, file_path: Path,
                   chunk_size: Optional[int] = None) -> Iterator[bytes]:
        """Yield the chunks of *file_path*.

        A truthy ``chunk_size`` selects fixed-size chunking with that size;
        otherwise boundaries are chosen by the window hash.
        """
        if chunk_size:
            yield from self._chunk_fixed(file_path, chunk_size)
        else:
            yield from self._chunk_rabin(file_path)

    def _chunk_fixed(self, file_path: Path, chunk_size: int) -> Iterator[bytes]:
        """Yield consecutive ``chunk_size``-byte pieces (the last may be shorter)."""
        yield from _iter_blocks(file_path, chunk_size)

    def _chunk_rabin(self, file_path: Path) -> Iterator[bytes]:
        """Yield hash-delimited chunks of *file_path*.

        The window hash is updated incrementally: each new byte is shifted
        in, and once the chunk is longer than the window the contribution of
        the byte that just left the window is subtracted. All arithmetic is
        modulo 2**64, so the value always equals ``_rolling_hash`` applied to
        the last ``window_size`` bytes of the current chunk.
        """
        # A non-positive window never holds a byte, so its hash stays 0.
        window = max(self.window_size, 0)
        # Contribution of a byte at the moment it falls out of the window.
        leaving = [(value << window) & _HASH_MASK for value in range(256)]
        min_size = self.min_chunk_size
        max_size = self.max_chunk_size
        mask = self.mask

        chunk = bytearray()
        rolling = 0
        for block in _iter_blocks(file_path, _READ_BLOCK_SIZE):
            for byte in block:
                chunk.append(byte)
                size = len(chunk)
                rolling = ((rolling << 1) + byte) & _HASH_MASK
                if size > window:
                    # The window is the tail of the chunk, so the byte that
                    # just left it sits ``window`` positions before the end.
                    rolling = (rolling - leaving[chunk[size - window - 1]]) & _HASH_MASK
                if size >= min_size and (rolling & mask == 0 or size >= max_size):
                    yield bytes(chunk)
                    chunk.clear()
                    rolling = 0
        if chunk:
            # Trailing bytes that never reached a boundary.
            yield bytes(chunk)

    def _rolling_hash(self, window: bytearray) -> int:
        """Return the 64-bit shift-and-add hash of *window*.

        This is the non-incremental definition of the hash that
        ``_chunk_rabin`` maintains byte by byte.
        """
        hash_value = 0
        for byte in window:
            hash_value = ((hash_value << 1) + byte) & _HASH_MASK
        return hash_value


class SimpleChunker:
    """Split a file into fixed-size chunks."""

    def __init__(self, chunk_size: int = 8192):
        """Store the size, in bytes, of each chunk to read."""
        self.chunk_size = chunk_size

    def chunk_file(self, file_path: Path) -> Iterator[bytes]:
        """Yield consecutive ``chunk_size``-byte pieces of *file_path*."""
        yield from _iter_blocks(file_path, self.chunk_size)


def hash_chunk(chunk: bytes, algorithm: str = 'sha256') -> str:
    """Return the hex digest of *chunk* using the named hashlib algorithm."""
    return hashlib.new(algorithm, chunk).hexdigest()


def hash_file(file_path: Path, algorithm: str = 'sha256',
              chunk_size: int = 65536) -> str:
    """Return the hex digest of a file's contents, read ``chunk_size`` bytes at a time."""
    hasher = hashlib.new(algorithm)
    for block in _iter_blocks(file_path, chunk_size):
        hasher.update(block)
    return hasher.hexdigest()


def compute_file_signature(file_path: Path, use_rabin: bool = True,
                           avg_chunk_size: int = 8192) -> tuple[str, list[str]]:
    """Return ``(file_hash, chunk_hashes)`` for *file_path*.

    The file is split with ``RabinChunker`` when ``use_rabin`` is true and
    with ``SimpleChunker`` (fixed ``avg_chunk_size`` pieces) otherwise.
    ``chunk_hashes`` holds the SHA-256 hex digest of each chunk in order and
    ``file_hash`` is the SHA-256 hex digest of all chunks fed in that order.
    """
    if use_rabin:
        chunker = RabinChunker(avg_chunk_size=avg_chunk_size)
    else:
        chunker = SimpleChunker(chunk_size=avg_chunk_size)

    chunk_hashes = []
    file_hasher = hashlib.sha256()
    for chunk in chunker.chunk_file(file_path):
        chunk_hashes.append(hash_chunk(chunk))
        file_hasher.update(chunk)
    return (file_hasher.hexdigest(), chunk_hashes)