"""System API for querying mounts and disks""" import os import subprocess from pathlib import Path from typing import Optional import psutil from ._protocols import MountInfo, DiskInfo class SystemAPI: """System information API for querying mounts and disks""" def query_mounts(self) -> list[MountInfo]: """Query mounted filesystems Returns: List of MountInfo objects for all mounted filesystems """ mounts = [] for partition in psutil.disk_partitions(all=False): mount_info = MountInfo( device=partition.device, mount_point=partition.mountpoint, fs_type=partition.fstype, options=partition.opts ) mounts.append(mount_info) return mounts def query_nvmes(self) -> list[DiskInfo]: """Query NVMe/disk information Returns: List of DiskInfo objects for all disks """ disks = [] # Try to get disk information using lsblk try: result = subprocess.run( ['lsblk', '-ndo', 'NAME,MODEL,SIZE,SERIAL', '-b'], capture_output=True, text=True, check=False ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line.strip(): continue parts = line.split(maxsplit=3) if len(parts) >= 3: device = f"/dev/{parts[0]}" model = parts[1] if len(parts) > 1 else "Unknown" size_str = parts[2] if len(parts) > 2 else "0" serial = parts[3] if len(parts) > 3 else "Unknown" try: size = int(size_str) except ValueError: size = 0 disk_info = DiskInfo( device=device, model=model, size=size, serial=serial ) disks.append(disk_info) except FileNotFoundError: # lsblk not available, fall back to basic info pass # If lsblk failed or unavailable, try alternative method if not disks: disks = self._query_disks_fallback() return disks def _query_disks_fallback(self) -> list[DiskInfo]: """Fallback method for querying disk information Returns: List of DiskInfo objects using psutil """ disks = [] seen_devices = set() for partition in psutil.disk_partitions(all=True): device = partition.device # Skip non-disk devices if not device.startswith('/dev/'): continue # Get base device (e.g., /dev/sda from /dev/sda1) base_device = self._get_base_device(device) if base_device in seen_devices: continue seen_devices.add(base_device) try: usage = psutil.disk_usage(partition.mountpoint) size = usage.total except (PermissionError, OSError): size = 0 disk_info = DiskInfo( device=base_device, model="Unknown", size=size, serial="Unknown" ) disks.append(disk_info) return disks def _get_base_device(self, device: str) -> str: """Extract base device name from partition device Args: device: Device path (e.g., /dev/sda1, /dev/nvme0n1p1) Returns: Base device path (e.g., /dev/sda, /dev/nvme0n1) """ # Handle NVMe devices if 'nvme' in device: # /dev/nvme0n1p1 -> /dev/nvme0n1 if 'p' in device: return device.rsplit('p', 1)[0] return device # Handle standard devices (sda, sdb, etc.) # /dev/sda1 -> /dev/sda import re match = re.match(r'(/dev/[a-z]+)', device) if match: return match.group(1) return device def get_disk_for_path(self, path: Path) -> Optional[str]: """Get the disk/mount point for a given path Args: path: Path to check Returns: Mount point device or None if not found """ path = path.resolve() # Find the mount point that contains this path best_match = None best_match_len = 0 for partition in psutil.disk_partitions(): mount_point = Path(partition.mountpoint) try: if path == mount_point or mount_point in path.parents: mount_len = len(str(mount_point)) if mount_len > best_match_len: best_match = partition.device best_match_len = mount_len except (ValueError, OSError): continue return best_match def get_disk_usage(self, path: Path) -> tuple[int, int, int]: """Get disk usage for a path Args: path: Path to check Returns: Tuple of (total, used, free) in bytes """ try: usage = psutil.disk_usage(str(path)) return usage.total, usage.used, usage.free except (PermissionError, OSError): return 0, 0, 0 def get_mount_point(self, path: Path) -> Optional[Path]: """Get the mount point for a given path Args: path: Path to check Returns: Mount point path or None if not found """ path = path.resolve() # Find the mount point that contains this path best_match = None best_match_len = 0 for partition in psutil.disk_partitions(): mount_point = Path(partition.mountpoint) try: if path == mount_point or mount_point in path.parents: mount_len = len(str(mount_point)) if mount_len > best_match_len: best_match = mount_point best_match_len = mount_len except (ValueError, OSError): continue return best_match def is_same_filesystem(self, path1: Path, path2: Path) -> bool: """Check if two paths are on the same filesystem Args: path1: First path path2: Second path Returns: True if paths are on the same filesystem """ try: stat1 = path1.stat() stat2 = path2.stat() return stat1.st_dev == stat2.st_dev except (OSError, PermissionError): return False