diff --git a/esphome/symlink_ops.py b/esphome/symlink_ops.py deleted file mode 100644 index accfbe5ff7..0000000000 --- a/esphome/symlink_ops.py +++ /dev/null @@ -1,164 +0,0 @@ -import os - -if hasattr(os, 'symlink'): - def symlink(src, dst): - return os.symlink(src, dst) - - def islink(path): - return os.path.islink(path) - - def readlink(path): - return os.readlink(path) - - def unlink(path): - return os.unlink(path) -else: - import ctypes - from ctypes import wintypes - # Code taken from - # https://stackoverflow.com/questions/27972776/having-trouble-implementing-a-readlink-function - - kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) - - FILE_READ_ATTRIBUTES = 0x0080 - OPEN_EXISTING = 3 - FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000 - FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 - FILE_ATTRIBUTE_REPARSE_POINT = 0x0400 - - IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003 - IO_REPARSE_TAG_SYMLINK = 0xA000000C - FSCTL_GET_REPARSE_POINT = 0x000900A8 - MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 0x4000 - - LPDWORD = ctypes.POINTER(wintypes.DWORD) - LPWIN32_FIND_DATA = ctypes.POINTER(wintypes.WIN32_FIND_DATAW) - INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value - - def IsReparseTagNameSurrogate(tag): - return bool(tag & 0x20000000) - - def _check_invalid_handle(result, func, args): - if result == INVALID_HANDLE_VALUE: - raise ctypes.WinError(ctypes.get_last_error()) - return args - - def _check_bool(result, func, args): - if not result: - raise ctypes.WinError(ctypes.get_last_error()) - return args - - kernel32.FindFirstFileW.errcheck = _check_invalid_handle - kernel32.FindFirstFileW.restype = wintypes.HANDLE - kernel32.FindFirstFileW.argtypes = ( - wintypes.LPCWSTR, # _In_ lpFileName - LPWIN32_FIND_DATA) # _Out_ lpFindFileData - - kernel32.FindClose.argtypes = ( - wintypes.HANDLE,) # _Inout_ hFindFile - - kernel32.CreateFileW.errcheck = _check_invalid_handle - kernel32.CreateFileW.restype = wintypes.HANDLE - kernel32.CreateFileW.argtypes = ( - wintypes.LPCWSTR, # _In_ lpFileName - wintypes.DWORD, # _In_ dwDesiredAccess - wintypes.DWORD, # _In_ dwShareMode - wintypes.LPVOID, # _In_opt_ lpSecurityAttributes - wintypes.DWORD, # _In_ dwCreationDisposition - wintypes.DWORD, # _In_ dwFlagsAndAttributes - wintypes.HANDLE) # _In_opt_ hTemplateFile - - kernel32.CloseHandle.argtypes = ( - wintypes.HANDLE,) # _In_ hObject - - kernel32.DeviceIoControl.errcheck = _check_bool - kernel32.DeviceIoControl.argtypes = ( - wintypes.HANDLE, # _In_ hDevice - wintypes.DWORD, # _In_ dwIoControlCode - wintypes.LPVOID, # _In_opt_ lpInBuffer - wintypes.DWORD, # _In_ nInBufferSize - wintypes.LPVOID, # _Out_opt_ lpOutBuffer - wintypes.DWORD, # _In_ nOutBufferSize - LPDWORD, # _Out_opt_ lpBytesReturned - wintypes.LPVOID) # _Inout_opt_ lpOverlapped - - class REPARSE_DATA_BUFFER(ctypes.Structure): - class ReparseData(ctypes.Union): - class LinkData(ctypes.Structure): - _fields_ = (('SubstituteNameOffset', wintypes.USHORT), - ('SubstituteNameLength', wintypes.USHORT), - ('PrintNameOffset', wintypes.USHORT), - ('PrintNameLength', wintypes.USHORT)) - - @property - def PrintName(self): - dt = wintypes.WCHAR * (self.PrintNameLength // ctypes.sizeof(wintypes.WCHAR)) - name = dt.from_address(ctypes.addressof(self.PathBuffer) + - self.PrintNameOffset).value - if name.startswith(r'\??'): - name = r'\\?' + name[3:] # NT => Windows - return name - - class SymbolicLinkData(LinkData): - _fields_ = (('Flags', wintypes.ULONG), ('PathBuffer', wintypes.BYTE * 0)) - - class MountPointData(LinkData): - _fields_ = (('PathBuffer', wintypes.BYTE * 0),) - - class GenericData(ctypes.Structure): - _fields_ = (('DataBuffer', wintypes.BYTE * 0),) - _fields_ = (('SymbolicLinkReparseBuffer', SymbolicLinkData), - ('MountPointReparseBuffer', MountPointData), - ('GenericReparseBuffer', GenericData)) - _fields_ = (('ReparseTag', wintypes.ULONG), - ('ReparseDataLength', wintypes.USHORT), - ('Reserved', wintypes.USHORT), - ('ReparseData', ReparseData)) - _anonymous_ = ('ReparseData',) - - def symlink(src, dst): - csl = ctypes.windll.kernel32.CreateSymbolicLinkW - csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32) - csl.restype = ctypes.c_ubyte - flags = 1 if os.path.isdir(src) else 0 - if csl(dst, src, flags) == 0: - error = ctypes.WinError() - # pylint: disable=no-member - if error.winerror == 1314 and error.errno == 22: - from esphome.core import EsphomeError - raise EsphomeError("Cannot create symlink from '%s' to '%s'. Try running tool \ -with elevated privileges" % (src, dst)) - raise error - - def islink(path): - if not os.path.isdir(path): - return False - data = wintypes.WIN32_FIND_DATAW() - kernel32.FindClose(kernel32.FindFirstFileW(path, ctypes.byref(data))) - if not data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT: - return False - return IsReparseTagNameSurrogate(data.dwReserved0) - - def readlink(path): - n = wintypes.DWORD() - buf = (wintypes.BYTE * MAXIMUM_REPARSE_DATA_BUFFER_SIZE)() - flags = FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS - handle = kernel32.CreateFileW(path, FILE_READ_ATTRIBUTES, 0, None, - OPEN_EXISTING, flags, None) - try: - kernel32.DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0, - buf, ctypes.sizeof(buf), ctypes.byref(n), None) - finally: - kernel32.CloseHandle(handle) - rb = REPARSE_DATA_BUFFER.from_buffer(buf) - tag = rb.ReparseTag - if tag == IO_REPARSE_TAG_SYMLINK: - return rb.SymbolicLinkReparseBuffer.PrintName - if tag == IO_REPARSE_TAG_MOUNT_POINT: - return rb.MountPointReparseBuffer.PrintName - if not IsReparseTagNameSurrogate(tag): - raise ValueError("not a link") - raise ValueError("unsupported reparse tag: %d" % tag) - - def unlink(path): - return os.rmdir(path)