Add GATT functions to client

This commit is contained in:
Jesse Hills 2022-09-27 13:10:19 +13:00
parent c65011094e
commit a671b723d0
No known key found for this signature in database
GPG Key ID: BEAAE804EFD8E83A
2 changed files with 201 additions and 1 deletions

View File

@ -1,3 +1,5 @@
import asyncio
import async_timeout
import logging
from typing import (
Any,
@ -16,6 +18,14 @@ from google.protobuf import message
from .api_pb2 import ( # type: ignore
BinarySensorStateResponse,
BluetoothDeviceConnectionResponse,
BluetoothDeviceRequest,
BluetoothGATTGetServicesRequest,
BluetoothGATTGetServicesResponse,
BluetoothGATTReadRequest,
BluetoothGATTReadResponse,
BluetoothGATTWriteRequest,
BluetoothGATTWriteResponse,
BluetoothLEAdvertisementResponse,
ButtonCommandRequest,
CameraImageRequest,
@ -75,12 +85,17 @@ from .api_pb2 import ( # type: ignore
TextSensorStateResponse,
)
from .connection import APIConnection, ConnectionParams
from .core import APIConnectionError
from .core import APIConnectionError, TimeoutAPIError
from .host_resolver import ZeroconfInstanceType
from .model import (
APIVersion,
BinarySensorInfo,
BinarySensorState,
BluetoothDeviceConnection,
BluetoothDeviceRequestType,
BluetoothGATTRead,
BluetoothGATTServices,
BluetoothGATTWrite,
BluetoothLEAdvertisement,
ButtonInfo,
CameraInfo,
@ -396,6 +411,98 @@ class APIClient:
SubscribeBluetoothLEAdvertisementsRequest(), on_msg
)
async def bluetooth_device_connect(
self,
address: int,
on_bluetooth_connection_state: Callable[[bool], None],
timeout: float = 10.0,
) -> None:
self._check_authenticated()
fut = asyncio.get_event_loop().create_future()
def on_msg(msg: message.Message) -> None:
if isinstance(msg, BluetoothDeviceConnectionResponse):
resp = BluetoothDeviceConnection.from_pb(msg)
if address == resp.address:
on_bluetooth_connection_state(resp.connected)
assert self._connection is not None
await self._connection.send_message_callback_response(
BluetoothDeviceRequest(
address=address,
request_type=BluetoothDeviceRequestType.CONNECT,
),
on_msg,
)
try:
async with async_timeout.timeout(timeout):
await fut
except asyncio.TimeoutError as err:
raise TimeoutAPIError(f"Timeout waiting for connect response") from err
async def bluetooth_device_disconnect(self, address: int) -> None:
self._check_authenticated()
assert self._connection is not None
await self._connection.send_message(
BluetoothDeviceRequest(
address=address,
request_type=BluetoothDeviceRequestType.DISCONNECT,
)
)
async def bluetooth_gatt_get_services(self, address: int) -> BluetoothGATTServices:
self._check_authenticated()
assert self._connection is not None
resp = await self._connection.send_message_await_response(
BluetoothGATTGetServicesRequest(address=address),
BluetoothGATTGetServicesResponse,
)
return BluetoothGATTServices.from_pb(resp)
async def bluetooth_gatt_read(
self, address: int, service: str, characteristic: str
) -> bytearray:
self._check_authenticated()
req = BluetoothGATTReadRequest()
req.address = address
req.service = service
req.characteristic = characteristic
req.is_descriptor = False
assert self._connection is not None
resp = await self._connection.send_message_await_response(
req, BluetoothGATTReadResponse
)
read_response = BluetoothGATTRead.from_pb(resp)
if (
read_response.address == address
and (service == "" or read_response.service == service)
and read_response.characteristic == characteristic
):
return bytearray(read_response.data)
return None
async def bluetooth_gatt_write(
self, address: int, service: str, characteristic: str, data: bytes
) -> None:
self._check_authenticated()
req = BluetoothGATTWriteRequest()
req.address = address
req.service = service
req.characteristic = characteristic
req.is_descriptor = False
req.data = data
assert self._connection is not None
await self._connection.send_message(req)
async def subscribe_home_assistant_states(
self, on_state_sub: Callable[[str, Optional[str]], None]
) -> None:

View File

@ -799,6 +799,99 @@ class BluetoothLEAdvertisement(APIModelBase):
)
@dataclass(frozen=True)
class BluetoothDeviceConnection(APIModelBase):
address: int = 0
connected: bool = False
@dataclass(frozen=True)
class BluetoothGATTRead(APIModelBase):
address: int = 0
is_descriptor: bool = False
service_uuid: str = ""
characteristic_uuid: str = ""
data: bytes = converter_field(default_factory=bytes, converter=bytes)
@dataclass(frozen=True)
class BluetoothGATTWrite(APIModelBase):
pass
@dataclass(frozen=True)
class BluetoothGATTDescriptor(APIModelBase):
uuid: str = ""
handle: int = 0
description: str = ""
@classmethod
def convert_list(cls, value: List[Any]) -> List["BluetoothGATTDescriptor"]:
ret = []
for x in value:
if isinstance(x, dict):
ret.append(cls.from_dict(x))
else:
ret.append(cls.from_pb(x))
return ret
@dataclass(frozen=True)
class BluetoothGATTCharacteristic(APIModelBase):
uuid: str = ""
handle: int = 0
properties: int = 0
descriptors: List[BluetoothGATTDescriptor] = converter_field(
default_factory=list, converter=BluetoothGATTDescriptor.convert_list
)
@classmethod
def convert_list(cls, value: List[Any]) -> List["BluetoothGATTCharacteristic"]:
ret = []
for x in value:
if isinstance(x, dict):
ret.append(cls.from_dict(x))
else:
ret.append(cls.from_pb(x))
return ret
@dataclass(frozen=True)
class BluetoothGATTService(APIModelBase):
uuid: str = ""
handle: int = 0
characteristics: List[BluetoothGATTCharacteristic] = converter_field(
default_factory=list, converter=BluetoothGATTCharacteristic.convert_list
)
@classmethod
def convert_list(cls, value: List[Any]) -> List["BluetoothGATTService"]:
ret = []
for x in value:
if isinstance(x, dict):
ret.append(cls.from_dict(x))
else:
ret.append(cls.from_pb(x))
return ret
@dataclass(frozen=True)
class BluetoothGATTServices(APIModelBase):
address: int = 0
services: List[BluetoothGATTService] = converter_field(
default_factory=list, converter=BluetoothGATTService.convert_list
)
class BluetoothDeviceRequestType(APIIntEnum):
CONNECT = 0
DISCONNECT = 1
PAIR = 2
UNPAIR = 3
class LogLevel(APIIntEnum):
LOG_LEVEL_NONE = 0
LOG_LEVEL_ERROR = 1