Better symlink support under Windows (#487)

* Better symlink support under Windows

* Conditional loading of ctypes wintypes module

* Shortening comment line for pylint

* Adding plint bypass for Python 3
This commit is contained in:
Guillermo Ruffino 2019-03-23 18:58:25 -03:00 committed by Otto Winter
parent 6ccab2bef9
commit ced28ad006
No known key found for this signature in database
GPG Key ID: DB66C0BE6013F97E
3 changed files with 171 additions and 19 deletions

View File

@ -140,16 +140,3 @@ def get_bool_env(var, default=False):
def is_hassio(): def is_hassio():
return get_bool_env('ESPHOME_IS_HASSIO') return get_bool_env('ESPHOME_IS_HASSIO')
def symlink(src, dst):
if hasattr(os, 'symlink'):
os.symlink(src, dst)
else:
import ctypes
csl = ctypes.windll.kernel32.CreateSymbolicLinkW
csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
csl.restype = ctypes.c_ubyte
flags = 1 if os.path.isdir(src) else 0
if csl(dst, src, flags) == 0:
raise ctypes.WinError()

164
esphome/symlink_ops.py Normal file
View File

@ -0,0 +1,164 @@
import os
if hasattr(os, 'symlink'):
def symlink(src, dst):
return os.symlink(src, dst)
def islink(path):
return os.path.islink(path)
def readlink(path):
return os.readlink(path)
def unlink(path):
return os.unlink(path)
else:
import ctypes
from ctypes import wintypes
# Code taken from
# https://stackoverflow.com/questions/27972776/having-trouble-implementing-a-readlink-function
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
FILE_READ_ATTRIBUTES = 0x0080
OPEN_EXISTING = 3
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_ATTRIBUTE_REPARSE_POINT = 0x0400
IO_REPARSE_TAG_MOUNT_POINT = 0xA0000003
IO_REPARSE_TAG_SYMLINK = 0xA000000C
FSCTL_GET_REPARSE_POINT = 0x000900A8
MAXIMUM_REPARSE_DATA_BUFFER_SIZE = 0x4000
LPDWORD = ctypes.POINTER(wintypes.DWORD)
LPWIN32_FIND_DATA = ctypes.POINTER(wintypes.WIN32_FIND_DATAW)
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
def IsReparseTagNameSurrogate(tag):
return bool(tag & 0x20000000)
def _check_invalid_handle(result, func, args):
if result == INVALID_HANDLE_VALUE:
raise ctypes.WinError(ctypes.get_last_error())
return args
def _check_bool(result, func, args):
if not result:
raise ctypes.WinError(ctypes.get_last_error())
return args
kernel32.FindFirstFileW.errcheck = _check_invalid_handle
kernel32.FindFirstFileW.restype = wintypes.HANDLE
kernel32.FindFirstFileW.argtypes = (
wintypes.LPCWSTR, # _In_ lpFileName
LPWIN32_FIND_DATA) # _Out_ lpFindFileData
kernel32.FindClose.argtypes = (
wintypes.HANDLE,) # _Inout_ hFindFile
kernel32.CreateFileW.errcheck = _check_invalid_handle
kernel32.CreateFileW.restype = wintypes.HANDLE
kernel32.CreateFileW.argtypes = (
wintypes.LPCWSTR, # _In_ lpFileName
wintypes.DWORD, # _In_ dwDesiredAccess
wintypes.DWORD, # _In_ dwShareMode
wintypes.LPVOID, # _In_opt_ lpSecurityAttributes
wintypes.DWORD, # _In_ dwCreationDisposition
wintypes.DWORD, # _In_ dwFlagsAndAttributes
wintypes.HANDLE) # _In_opt_ hTemplateFile
kernel32.CloseHandle.argtypes = (
wintypes.HANDLE,) # _In_ hObject
kernel32.DeviceIoControl.errcheck = _check_bool
kernel32.DeviceIoControl.argtypes = (
wintypes.HANDLE, # _In_ hDevice
wintypes.DWORD, # _In_ dwIoControlCode
wintypes.LPVOID, # _In_opt_ lpInBuffer
wintypes.DWORD, # _In_ nInBufferSize
wintypes.LPVOID, # _Out_opt_ lpOutBuffer
wintypes.DWORD, # _In_ nOutBufferSize
LPDWORD, # _Out_opt_ lpBytesReturned
wintypes.LPVOID) # _Inout_opt_ lpOverlapped
class REPARSE_DATA_BUFFER(ctypes.Structure):
class ReparseData(ctypes.Union):
class LinkData(ctypes.Structure):
_fields_ = (('SubstituteNameOffset', wintypes.USHORT),
('SubstituteNameLength', wintypes.USHORT),
('PrintNameOffset', wintypes.USHORT),
('PrintNameLength', wintypes.USHORT))
@property
def PrintName(self):
dt = wintypes.WCHAR * (self.PrintNameLength // ctypes.sizeof(wintypes.WCHAR))
name = dt.from_address(ctypes.addressof(self.PathBuffer) +
self.PrintNameOffset).value
if name.startswith(r'\??'):
name = r'\\?' + name[3:] # NT => Windows
return name
class SymbolicLinkData(LinkData):
_fields_ = (('Flags', wintypes.ULONG), ('PathBuffer', wintypes.BYTE * 0))
class MountPointData(LinkData):
_fields_ = (('PathBuffer', wintypes.BYTE * 0),)
class GenericData(ctypes.Structure):
_fields_ = (('DataBuffer', wintypes.BYTE * 0),)
_fields_ = (('SymbolicLinkReparseBuffer', SymbolicLinkData),
('MountPointReparseBuffer', MountPointData),
('GenericReparseBuffer', GenericData))
_fields_ = (('ReparseTag', wintypes.ULONG),
('ReparseDataLength', wintypes.USHORT),
('Reserved', wintypes.USHORT),
('ReparseData', ReparseData))
_anonymous_ = ('ReparseData',)
def symlink(src, dst):
csl = ctypes.windll.kernel32.CreateSymbolicLinkW
csl.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
csl.restype = ctypes.c_ubyte
flags = 1 if os.path.isdir(src) else 0
if csl(dst, src, flags) == 0:
error = ctypes.WinError()
# pylint: disable=no-member
if error.winerror == 1314 and error.errno == 22:
from esphome.core import EsphomeError
raise EsphomeError("Cannot create symlink from '%s' to '%s'. Try running tool \
with elevated privileges" % (src, dst))
raise error
def islink(path):
if not os.path.isdir(path):
return False
data = wintypes.WIN32_FIND_DATAW()
kernel32.FindClose(kernel32.FindFirstFileW(path, ctypes.byref(data)))
if not data.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT:
return False
return IsReparseTagNameSurrogate(data.dwReserved0)
def readlink(path):
n = wintypes.DWORD()
buf = (wintypes.BYTE * MAXIMUM_REPARSE_DATA_BUFFER_SIZE)()
flags = FILE_FLAG_OPEN_REPARSE_POINT | FILE_FLAG_BACKUP_SEMANTICS
handle = kernel32.CreateFileW(path, FILE_READ_ATTRIBUTES, 0, None,
OPEN_EXISTING, flags, None)
try:
kernel32.DeviceIoControl(handle, FSCTL_GET_REPARSE_POINT, None, 0,
buf, ctypes.sizeof(buf), ctypes.byref(n), None)
finally:
kernel32.CloseHandle(handle)
rb = REPARSE_DATA_BUFFER.from_buffer(buf)
tag = rb.ReparseTag
if tag == IO_REPARSE_TAG_SYMLINK:
return rb.SymbolicLinkReparseBuffer.PrintName
if tag == IO_REPARSE_TAG_MOUNT_POINT:
return rb.MountPointReparseBuffer.PrintName
if not IsReparseTagNameSurrogate(tag):
raise ValueError("not a link")
raise ValueError("unsupported reparse tag: %d" % tag)
def unlink(path):
return os.rmdir(path)

View File

@ -13,7 +13,8 @@ from esphome.const import ARDUINO_VERSION_ESP32_1_0_0, ARDUINO_VERSION_ESP8266_2
CONF_LOCAL, CONF_PLATFORMIO_OPTIONS, CONF_REPOSITORY, CONF_TAG, CONF_USE_CUSTOM_CODE CONF_LOCAL, CONF_PLATFORMIO_OPTIONS, CONF_REPOSITORY, CONF_TAG, CONF_USE_CUSTOM_CODE
from esphome.core import CORE, EsphomeError from esphome.core import CORE, EsphomeError
from esphome.core_config import GITHUB_ARCHIVE_ZIP, LIBRARY_URI_REPO, VERSION_REGEX from esphome.core_config import GITHUB_ARCHIVE_ZIP, LIBRARY_URI_REPO, VERSION_REGEX
from esphome.helpers import mkdir_p, run_system_command, symlink from esphome.helpers import mkdir_p, run_system_command
from esphome.symlink_ops import symlink, islink, readlink, unlink
from esphome.pins import ESP8266_FLASH_SIZES, ESP8266_LD_SCRIPTS from esphome.pins import ESP8266_FLASH_SIZES, ESP8266_LD_SCRIPTS
from esphome.py_compat import IS_PY3, string_types from esphome.py_compat import IS_PY3, string_types
from esphome.storage_json import StorageJSON, storage_path from esphome.storage_json import StorageJSON, storage_path
@ -220,10 +221,10 @@ def symlink_esphome_core_version(esphome_core_version):
if CORE.is_local_esphome_core_copy: if CORE.is_local_esphome_core_copy:
src_path = CORE.relative_path(esphome_core_version[CONF_LOCAL]) src_path = CORE.relative_path(esphome_core_version[CONF_LOCAL])
do_write = True do_write = True
if os.path.islink(dst_path): if islink(dst_path):
old_path = os.path.join(os.readlink(dst_path), lib_path) old_path = os.path.join(readlink(dst_path), lib_path)
if old_path != lib_path: if old_path != lib_path:
os.unlink(dst_path) unlink(dst_path)
else: else:
do_write = False do_write = False
if do_write: if do_write:
@ -231,8 +232,8 @@ def symlink_esphome_core_version(esphome_core_version):
symlink(src_path, dst_path) symlink(src_path, dst_path)
else: else:
# Remove symlink when changing back from local version # Remove symlink when changing back from local version
if os.path.islink(dst_path): if islink(dst_path):
os.unlink(dst_path) unlink(dst_path)
def format_ini(data): def format_ini(data):