aioesphomeapi/aioesphomeapi/util.py

72 lines
2.0 KiB
Python
Raw Normal View History

2019-04-07 19:03:26 +02:00
import asyncio
import functools
2019-04-07 19:03:26 +02:00
import socket
from typing import Optional, Tuple, Any
import zeroconf
2019-04-07 19:03:26 +02:00
2020-07-14 20:00:12 +02:00
# pylint: disable=cyclic-import
2019-04-07 19:03:26 +02:00
from aioesphomeapi.core import APIConnectionError
def _varuint_to_bytes(value: int) -> bytes:
if value <= 0x7F:
return bytes([value])
ret = bytes()
while value:
temp = value & 0x7F
value >>= 7
if value:
ret += bytes([temp | 0x80])
else:
ret += bytes([temp])
return ret
def _bytes_to_varuint(value: bytes) -> Optional[int]:
result = 0
bitpos = 0
for val in value:
result |= (val & 0x7F) << bitpos
bitpos += 7
if (val & 0x80) == 0:
return result
return None
async def resolve_ip_address_getaddrinfo(eventloop: asyncio.events.AbstractEventLoop,
host: str, port: int) -> Tuple[Any, ...]:
try:
res = await eventloop.getaddrinfo(host, port, family=socket.AF_INET,
proto=socket.IPPROTO_TCP)
except OSError as err:
raise APIConnectionError("Error resolving IP address: {}".format(err))
if not res:
raise APIConnectionError("Error resolving IP address: No matches!")
_, _, _, _, sockaddr = res[0]
return sockaddr
async def resolve_ip_address(eventloop: asyncio.events.AbstractEventLoop,
host: str, port: int,
zeroconf_instance: zeroconf.Zeroconf = None) -> Tuple[Any, ...]:
if host.endswith('.local'):
from aioesphomeapi.host_resolver import resolve_host
2019-04-07 19:03:26 +02:00
try:
return await eventloop.run_in_executor(
None,
functools.partial(
resolve_host,
host,
zeroconf_instance=zeroconf_instance
)
), port
except APIConnectionError:
pass
return await resolve_ip_address_getaddrinfo(eventloop, host, port)