import subprocess
import plistlib
import sys
import os
import time
import json

sys.path.append(os.path.abspath(os.path.dirname(os.path.realpath(__file__))))
import run

if sys.version_info < (3,0):
    # Force use of StringIO instead of cStringIO as the latter
    # has issues with Unicode strings
    from StringIO import StringIO


class Disk:
    """Thin wrapper around the macOS ``diskutil`` and ``sw_vers`` tools.

    Commands are executed through ``run.Run().run(...)``; this class only
    relies on the result being indexable, with index 0 holding the command's
    text output and index 2 being compared against 0 to detect failure
    (presumably the return code - confirm against the ``run`` module).
    """

    def __init__(self):
        """Locate diskutil, record the OS version and load the disk lists."""
        self.r = run.Run()
        self.diskutil = self.get_diskutil()
        # Query sw_vers once and derive both version strings from that output
        self.full_os_version = self.r.run({"args":["sw_vers", "-productVersion"]})[0]
        # Major.minor only (e.g. 10.14)
        self.os_version = ".".join(self.full_os_version.split(".")[:2])
        if len(self.full_os_version.split(".")) < 3:
            # Add .0 in case of 10.14
            self.full_os_version += ".0"
        self.sudo_mount_version = "10.13.6"
        self.sudo_mount_types = ["efi"]
        self.apfs = {}
        self._update_disks()

    def _get_str(self, val):
        """Return ``val`` as the interpreter's native ``str`` type.

        On Python 2 a ``unicode`` value is encoded to a utf-8 byte string; on
        Python 3 a ``bytes`` value is decoded from utf-8. Anything else is
        passed through ``str()``.
        """
        if sys.version_info < (3,0):
            if isinstance(val, unicode):
                return val.encode("utf-8")
        elif isinstance(val, bytes):
            # bytes has no encode() on Python 3 - it must be decoded to text
            return val.decode("utf-8")
        return str(val)

    def _get_plist(self, s):
        """Parse plist text ``s`` and return the resulting object.

        Best effort: any parsing error is printed and an empty dict is
        returned instead of raising.
        """
        p = {}
        try:
            if sys.version_info >= (3, 0):
                # plistlib.loads() needs bytes - only encode text input
                p = plistlib.loads(s if isinstance(s, bytes) else s.encode("utf-8"))
            else:
                # We avoid using plistlib.readPlistFromString() as that uses
                # cStringIO and fails when Unicode strings are detected
                # Don't subclass - keep the parser local
                from xml.parsers.expat import ParserCreate
                # Create a new PlistParser object - then we need to set up
                # the values and parse.
                pa = plistlib.PlistParser()
                # We also monkey patch this to encode unicode as utf-8
                def end_string():
                    d = pa.getData()
                    if isinstance(d,unicode):
                        d = d.encode("utf-8")
                    pa.addObject(d)
                pa.end_string = end_string
                parser = ParserCreate()
                parser.StartElementHandler = pa.handleBeginElement
                parser.EndElementHandler = pa.handleEndElement
                parser.CharacterDataHandler = pa.handleData
                if isinstance(s, unicode):
                    # Encode unicode -> string; use utf-8 for safety
                    s = s.encode("utf-8")
                # Parse the string
                parser.Parse(s, 1)
                p = pa.root
        except Exception as e:
            # Deliberate best effort - report the problem and fall back to {}
            print(e)
        return p

    def _compare_versions(self, vers1, vers2, pad = -1):
        """Compare two period-separated version strings.

        Returns:
            True  if vers1 <  vers2
            None  if vers1 == vers2
            False if vers1 >  vers2

        Non-digit characters are ignored within each part. A part that is
        missing (the other version is longer) or contains no digits is
        treated as ``pad``; a non-int ``pad`` falls back to -1.
        """
        # Sanitize the pad - exact type check so bools are rejected as well
        pad = pad if type(pad) is int else -1

        def to_int(part):
            # Keep only the digits; parts without any count as the pad value
            digits = "".join(c for c in part if c.isdigit())
            return int(digits) if digits else pad

        # Cast as strings and split to lists
        v1_parts = str(vers1).split(".")
        v2_parts = str(vers2).split(".")
        # Equalize lengths with empty parts, which to_int() maps to pad.
        # Padding with str(pad) would lose the sign of a negative pad once
        # the non-digit characters are stripped.
        width = max(len(v1_parts), len(v2_parts))
        v1_parts.extend([""] * (width - len(v1_parts)))
        v2_parts.extend([""] * (width - len(v2_parts)))
        # Iterate and compare
        for part1, part2 in zip(v1_parts, v2_parts):
            v1, v2 = to_int(part1), to_int(part2)
            if v1 < v2:
                return True
            if v1 > v2:
                return False
        # Never differed - return None, must be equal
        return None

    def update(self):
        """Refresh the cached disk information."""
        self._update_disks()

    def _update_disks(self):
        """Reload ``self.disks``, ``self.disk_text`` and ``self.apfs``."""
        self.disks = self.get_disks()
        self.disk_text = self.get_disk_text()
        # APFS info is only queried when the OS version is newer than 10.12
        if self._compare_versions("10.12", self.os_version):
            self.apfs = self.get_apfs()
        else:
            self.apfs = {}

    def get_diskutil(self):
        """Return the path to the diskutil binary (first line of ``which``)."""
        return self.r.run({"args":["which", "diskutil"]})[0].split("\n")[0].split("\r")[0]

    def get_disks(self):
        """Return the parsed output of ``diskutil list -plist``."""
        disk_list = self.r.run({"args":[self.diskutil, "list", "-plist"]})[0]
        return self._get_plist(disk_list)

    def get_disk_text(self):
        """Return the plain text output of ``diskutil list``."""
        return self.r.run({"args":[self.diskutil, "list"]})[0]

    def get_disk_info(self, disk):
        """Return the parsed ``diskutil info -plist`` output for ``disk``.

        Returns None when ``self.get_identifier`` (defined outside this part
        of the file) yields nothing for ``disk``.
        """
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        disk_list = self.r.run({"args":[self.diskutil, "info", "-plist", disk_id]})[0]
        return self._get_plist(disk_list)

    def get_disk_fs(self, disk):
        """Return the "FilesystemName" entry for ``disk``, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        info = self.get_disk_info(disk_id)
        # get_disk_info() may return None (or an empty dict on parse errors)
        if not info:
            return None
        return info.get("FilesystemName", None)

    def get_disk_fs_type(self, disk):
        """Return the "FilesystemType" entry for ``disk``, or None."""
        disk_id = self.get_identifier(disk)
        if not disk_id:
            return None
        info = self.get_disk_info(disk_id)
        # get_disk_info() may return None (or an empty dict on parse errors)
        if not info:
            return None
        return info.get("FilesystemType", None)

    def get_apfs(self):
        """Return the parsed output of ``diskutil apfs list -plist``.

        An empty dict is returned when the command reports a failure.
        """
        # "echo y" is piped in to answer any prompt diskutil may show
        output = self.r.run({"args":"echo y | " + self.diskutil + " apfs list -plist", "shell" : True})
        if not output[2] == 0:
            # Error getting apfs info - return an empty dict
            return {}
        disk_list = output[0]
        # NOTE(review): the "<?xml" marker and every statement after the
        # split below were garbled/truncated in the reviewed text and are
        # reconstructed from the surviving comment - confirm against the
        # original file before merging.
        p_list = disk_list.split("<?xml")
        if len(p_list) > 1:
            # We had text before the start - get only the plist info
            disk_list = "<?xml" + p_list[-1]
        return self._get_plist(disk_list)