aioesphomeapi/aioesphomeapi/host_resolver.py

226 lines
6.2 KiB
Python

from __future__ import annotations
import asyncio
import logging
import socket
from dataclasses import dataclass
from ipaddress import IPv4Address, IPv6Address, ip_address
from typing import TYPE_CHECKING, cast
from zeroconf import IPVersion
from zeroconf.asyncio import AsyncServiceInfo
from .core import APIConnectionError, ResolveAPIError
from .util import address_is_local, host_is_name_part
from .zeroconf import ZeroconfManager
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# mDNS service type queried by the zeroconf resolver in this module; the
# per-host service name is built as f"{host}.{SERVICE_TYPE}".
SERVICE_TYPE = "_esphomelib._tcp.local."
@dataclass(frozen=True)
class Sockaddr:
    """Immutable base record for a socket address (address plus port)."""

    # Address as text; the resolvers in this module store IP address strings.
    address: str
    # Port number paired with the address.
    port: int
@dataclass(frozen=True)
class IPv4Sockaddr(Sockaddr):
    """Socket address of an IPv4 endpoint.

    Declares no fields of its own: only the ``address`` and ``port``
    inherited from ``Sockaddr`` are carried.
    """
@dataclass(frozen=True)
class IPv6Sockaddr(Sockaddr):
    """Socket address of an IPv6 endpoint.

    Extends ``Sockaddr`` with the two extra components found in the
    4-tuple that ``getaddrinfo`` returns for ``AF_INET6`` results.
    """

    # Flow info component of the IPv6 socket address.
    flowinfo: int
    # Scope id component of the IPv6 socket address.
    scope_id: int
@dataclass(frozen=True)
class AddrInfo:
    """Immutable description of one resolved endpoint.

    Bundles the socket family, socket type and protocol with the socket
    address they apply to.
    """

    # Address family, e.g. socket.AF_INET or socket.AF_INET6.
    family: int
    # Socket type, e.g. socket.SOCK_STREAM.
    type: int
    # Protocol number, e.g. socket.IPPROTO_TCP.
    proto: int
    # Address record matching ``family``.
    sockaddr: IPv4Sockaddr | IPv6Sockaddr
async def _async_zeroconf_get_service_info(
    zeroconf_manager: ZeroconfManager,
    service_type: str,
    service_name: str,
    server: str,
    timeout: float,
) -> AsyncServiceInfo:
    """Request the mDNS service info for ``service_name``.

    Args:
        zeroconf_manager: Manager supplying the zeroconf instance to query.
        service_type: Service type handed to ``AsyncServiceInfo``.
        service_name: Full service name handed to ``AsyncServiceInfo``.
        server: Server name handed to ``AsyncServiceInfo``.
        timeout: Request timeout in seconds (converted to whole milliseconds).

    Returns:
        The ``AsyncServiceInfo`` object after the request has completed.

    Raises:
        ResolveAPIError: If the zeroconf instance cannot be obtained or the
            request itself raises.
    """
    # Remember whether the manager had no instance before this call; if so,
    # the manager is closed again once the request is over.
    close_when_done = not zeroconf_manager.has_instance

    try:
        async_zeroconf = zeroconf_manager.get_async_zeroconf()
        zeroconf = async_zeroconf.zeroconf
    except Exception as err:
        raise ResolveAPIError(
            f"Cannot start mDNS sockets: {err}, is this a docker container without "
            "host network mode?"
        ) from err

    try:
        service_info = AsyncServiceInfo(service_type, service_name, server=server)
        timeout_ms = int(timeout * 1000)
        await service_info.async_request(zeroconf, timeout_ms)
    except Exception as err:
        raise ResolveAPIError(
            f"Error resolving mDNS {service_name} via mDNS: {err}"
        ) from err
    finally:
        if close_when_done:
            await zeroconf_manager.async_close()

    return service_info
def _scope_id_to_int(value: str | None) -> int:
    """Return ``value`` parsed as an int, or 0 when it is None or not numeric."""
    scope = 0
    if value is not None:
        try:
            scope = int(value)
        except ValueError:
            # Non-numeric scope ids fall back to 0.
            scope = 0
    return scope
async def _async_resolve_host_zeroconf(
    host: str,
    port: int,
    *,
    timeout: float = 3.0,
    zeroconf_manager: ZeroconfManager | None = None,
) -> list[AddrInfo]:
    """Resolve ``host`` through mDNS and return its addresses.

    Args:
        host: Name used to build the service name ``{host}.{SERVICE_TYPE}``
            and the server name ``{host}.local.``.
        port: Port stored in every returned address.
        timeout: mDNS request timeout in seconds.
        zeroconf_manager: Manager to use; a new ``ZeroconfManager`` is
            created when none (or a falsy one) is given.

    Returns:
        IPv6 addresses first, followed by IPv4 addresses.

    Raises:
        ResolveAPIError: Propagated from the service-info request.
    """
    service_name = f"{host}.{SERVICE_TYPE}"
    _LOGGER.debug("Resolving host %s via mDNS", service_name)

    manager = zeroconf_manager or ZeroconfManager()
    info = await _async_zeroconf_get_service_info(
        manager, SERVICE_TYPE, service_name, f"{host}.local.", timeout
    )

    resolved: list[AddrInfo] = []
    # Query order matters: IPv6 results are listed ahead of IPv4 results.
    for version in (IPVersion.V6Only, IPVersion.V4Only):
        for ip in info.ip_addresses_by_version(version):
            resolved.extend(_async_ip_address_to_addrs(ip, port))  # type: ignore
    return resolved
async def _async_resolve_host_getaddrinfo(host: str, port: int) -> list[AddrInfo]:
    """Resolve ``host`` with the event loop's ``getaddrinfo``.

    Args:
        host: Host name or address to resolve.
        port: Port passed to ``getaddrinfo``.

    Returns:
        One ``AddrInfo`` per ``AF_INET``/``AF_INET6`` result; results of any
        other address family are skipped.

    Raises:
        APIConnectionError: If ``getaddrinfo`` fails with ``OSError``; the
            original error is chained as ``__cause__``.
    """
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    try:
        # Limit to TCP IP protocol and SOCK_STREAM
        res = await loop.getaddrinfo(
            host, port, type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP
        )
    except OSError as err:
        raise APIConnectionError(f"Error resolving IP address: {err}") from err

    addrs: list[AddrInfo] = []
    for family, type_, proto, _, raw in res:
        sockaddr: IPv4Sockaddr | IPv6Sockaddr
        if family == socket.AF_INET:
            # Unpack into a separate name so the ``port`` parameter is not
            # rebound by the loop.
            address, resolved_port = cast(tuple[str, int], raw)
            sockaddr = IPv4Sockaddr(address=address, port=resolved_port)
        elif family == socket.AF_INET6:
            address, resolved_port, flowinfo, scope_id = cast(
                tuple[str, int, int, int], raw
            )
            sockaddr = IPv6Sockaddr(
                address=address,
                port=resolved_port,
                flowinfo=flowinfo,
                scope_id=scope_id,
            )
        else:
            # Unknown family
            continue
        addrs.append(
            AddrInfo(family=family, type=type_, proto=proto, sockaddr=sockaddr)
        )
    return addrs
def _async_ip_address_to_addrs(
    ip: IPv4Address | IPv6Address, port: int
) -> list[AddrInfo]:
    """Wrap an ``ipaddress`` object in a single-element ``AddrInfo`` list.

    The entry always uses ``SOCK_STREAM`` / ``IPPROTO_TCP``. For IPv6 the
    ``%scope`` suffix is removed from the address text and the scope id is
    carried separately as an int, with flowinfo set to 0.
    """
    sockaddr: IPv6Sockaddr | IPv4Sockaddr
    if ip.version == 6:
        if TYPE_CHECKING:
            assert isinstance(ip, IPv6Address)
        family = socket.AF_INET6
        # Keep only the part before any "%scope" suffix.
        bare_address, _, _ = str(ip).partition("%")
        sockaddr = IPv6Sockaddr(
            address=bare_address,
            port=port,
            flowinfo=0,
            scope_id=_scope_id_to_int(ip.scope_id),
        )
    else:
        family = socket.AF_INET
        sockaddr = IPv4Sockaddr(address=str(ip), port=port)

    return [
        AddrInfo(
            family=family,
            type=socket.SOCK_STREAM,
            proto=socket.IPPROTO_TCP,
            sockaddr=sockaddr,
        )
    ]
async def async_resolve_host(
    hosts: list[str],
    port: int,
    zeroconf_manager: ZeroconfManager | None = None,
) -> list[AddrInfo]:
    """Resolve every entry of ``hosts`` and return all addresses found.

    Per host: names judged local by ``host_is_name_part`` /
    ``address_is_local`` are tried via mDNS (using the part before the first
    dot); any other host is first parsed as a literal IP address. If that
    yields nothing, the host is resolved with ``getaddrinfo``.

    Args:
        hosts: Host names or IP address strings to resolve.
        port: Port stored in the returned addresses.
        zeroconf_manager: Optional manager forwarded to the mDNS resolver.

    Returns:
        The addresses of all hosts, in the order the hosts were given.

    Raises:
        ResolveAPIError: If no address was found at all; the last mDNS error
            is re-raised when there was one.
        APIConnectionError: Propagated from the ``getaddrinfo`` fallback.
            NOTE(review): this is not caught here, so it ends the loop even
            if other hosts in ``hosts`` were or could be resolved.
    """
    resolved: list[AddrInfo] = []
    mdns_error: Exception | None = None

    for host in hosts:
        found: list[AddrInfo] = []

        if host_is_name_part(host) or address_is_local(host):
            short_name, _, _ = host.partition(".")
            try:
                found = await _async_resolve_host_zeroconf(
                    short_name, port, zeroconf_manager=zeroconf_manager
                )
            except ResolveAPIError as err:
                # Keep the error; it is only raised if nothing resolves.
                mdns_error = err
        else:
            try:
                found = _async_ip_address_to_addrs(ip_address(host), port)
            except ValueError:
                # Not an IP address
                pass

        if not found:
            found = await _async_resolve_host_getaddrinfo(host, port)

        resolved.extend(found)

    if resolved:
        return resolved

    if mdns_error:
        # Only show ZC error if getaddrinfo also didn't work
        raise mdns_error
    raise ResolveAPIError(
        f"Could not resolve host {hosts} - got no results from OS"
    )