diff --git a/aioesphomeapi/client.py b/aioesphomeapi/client.py index c3b2fb5..47c55de 100644 --- a/aioesphomeapi/client.py +++ b/aioesphomeapi/client.py @@ -1,3 +1,5 @@ +import asyncio +import async_timeout import logging from typing import ( Any, @@ -16,6 +18,14 @@ from google.protobuf import message from .api_pb2 import ( # type: ignore BinarySensorStateResponse, + BluetoothDeviceConnectionResponse, + BluetoothDeviceRequest, + BluetoothGATTGetServicesRequest, + BluetoothGATTGetServicesResponse, + BluetoothGATTReadRequest, + BluetoothGATTReadResponse, + BluetoothGATTWriteRequest, + BluetoothGATTWriteResponse, BluetoothLEAdvertisementResponse, ButtonCommandRequest, CameraImageRequest, @@ -75,12 +85,17 @@ from .api_pb2 import ( # type: ignore TextSensorStateResponse, ) from .connection import APIConnection, ConnectionParams -from .core import APIConnectionError +from .core import APIConnectionError, TimeoutAPIError from .host_resolver import ZeroconfInstanceType from .model import ( APIVersion, BinarySensorInfo, BinarySensorState, + BluetoothDeviceConnection, + BluetoothDeviceRequestType, + BluetoothGATTRead, + BluetoothGATTServices, + BluetoothGATTWrite, BluetoothLEAdvertisement, ButtonInfo, CameraInfo, @@ -396,6 +411,98 @@ class APIClient: SubscribeBluetoothLEAdvertisementsRequest(), on_msg ) + async def bluetooth_device_connect( + self, + address: int, + on_bluetooth_connection_state: Callable[[bool], None], + timeout: float = 10.0, + ) -> None: + self._check_authenticated() + + fut = asyncio.get_event_loop().create_future() + + def on_msg(msg: message.Message) -> None: + if isinstance(msg, BluetoothDeviceConnectionResponse): + resp = BluetoothDeviceConnection.from_pb(msg) + if address == resp.address: + on_bluetooth_connection_state(resp.connected) + + assert self._connection is not None + await self._connection.send_message_callback_response( + BluetoothDeviceRequest( + address=address, + request_type=BluetoothDeviceRequestType.CONNECT, + ), + on_msg, + ) + + try: + async with async_timeout.timeout(timeout): + await fut + except asyncio.TimeoutError as err: + raise TimeoutAPIError(f"Timeout waiting for connect response") from err + + async def bluetooth_device_disconnect(self, address: int) -> None: + self._check_authenticated() + + assert self._connection is not None + await self._connection.send_message( + BluetoothDeviceRequest( + address=address, + request_type=BluetoothDeviceRequestType.DISCONNECT, + ) + ) + + async def bluetooth_gatt_get_services(self, address: int) -> BluetoothGATTServices: + self._check_authenticated() + + assert self._connection is not None + resp = await self._connection.send_message_await_response( + BluetoothGATTGetServicesRequest(address=address), + BluetoothGATTGetServicesResponse, + ) + return BluetoothGATTServices.from_pb(resp) + + async def bluetooth_gatt_read( + self, address: int, service: str, characteristic: str + ) -> bytearray: + self._check_authenticated() + + req = BluetoothGATTReadRequest() + req.address = address + req.service = service + req.characteristic = characteristic + req.is_descriptor = False + + assert self._connection is not None + resp = await self._connection.send_message_await_response( + req, BluetoothGATTReadResponse + ) + + read_response = BluetoothGATTRead.from_pb(resp) + if ( + read_response.address == address + and (service == "" or read_response.service == service) + and read_response.characteristic == characteristic + ): + return bytearray(read_response.data) + return None + + async def bluetooth_gatt_write( + self, address: int, service: str, characteristic: str, data: bytes + ) -> None: + self._check_authenticated() + + req = BluetoothGATTWriteRequest() + req.address = address + req.service = service + req.characteristic = characteristic + req.is_descriptor = False + req.data = data + + assert self._connection is not None + await self._connection.send_message(req) + async def subscribe_home_assistant_states( self, on_state_sub: Callable[[str, Optional[str]], None] ) -> None: diff --git a/aioesphomeapi/model.py b/aioesphomeapi/model.py index 67fa11d..f30cd36 100644 --- a/aioesphomeapi/model.py +++ b/aioesphomeapi/model.py @@ -799,6 +799,99 @@ class BluetoothLEAdvertisement(APIModelBase): ) +@dataclass(frozen=True) +class BluetoothDeviceConnection(APIModelBase): + address: int = 0 + connected: bool = False + + +@dataclass(frozen=True) +class BluetoothGATTRead(APIModelBase): + address: int = 0 + is_descriptor: bool = False + service_uuid: str = "" + characteristic_uuid: str = "" + + data: bytes = converter_field(default_factory=bytes, converter=bytes) + + +@dataclass(frozen=True) +class BluetoothGATTWrite(APIModelBase): + pass + + +@dataclass(frozen=True) +class BluetoothGATTDescriptor(APIModelBase): + uuid: str = "" + handle: int = 0 + description: str = "" + + @classmethod + def convert_list(cls, value: List[Any]) -> List["BluetoothGATTDescriptor"]: + ret = [] + for x in value: + if isinstance(x, dict): + ret.append(cls.from_dict(x)) + else: + ret.append(cls.from_pb(x)) + return ret + + +@dataclass(frozen=True) +class BluetoothGATTCharacteristic(APIModelBase): + uuid: str = "" + handle: int = 0 + properties: int = 0 + + descriptors: List[BluetoothGATTDescriptor] = converter_field( + default_factory=list, converter=BluetoothGATTDescriptor.convert_list + ) + + @classmethod + def convert_list(cls, value: List[Any]) -> List["BluetoothGATTCharacteristic"]: + ret = [] + for x in value: + if isinstance(x, dict): + ret.append(cls.from_dict(x)) + else: + ret.append(cls.from_pb(x)) + return ret + + +@dataclass(frozen=True) +class BluetoothGATTService(APIModelBase): + uuid: str = "" + handle: int = 0 + characteristics: List[BluetoothGATTCharacteristic] = converter_field( + default_factory=list, converter=BluetoothGATTCharacteristic.convert_list + ) + + @classmethod + def convert_list(cls, value: List[Any]) -> List["BluetoothGATTService"]: + ret = [] + for x in value: + if isinstance(x, dict): + ret.append(cls.from_dict(x)) + else: + ret.append(cls.from_pb(x)) + return ret + + +@dataclass(frozen=True) +class BluetoothGATTServices(APIModelBase): + address: int = 0 + services: List[BluetoothGATTService] = converter_field( + default_factory=list, converter=BluetoothGATTService.convert_list + ) + + +class BluetoothDeviceRequestType(APIIntEnum): + CONNECT = 0 + DISCONNECT = 1 + PAIR = 2 + UNPAIR = 3 + + class LogLevel(APIIntEnum): LOG_LEVEL_NONE = 0 LOG_LEVEL_ERROR = 1