From 8a661bb67385222177e52cad070916f1a45ebddb Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 15 May 2023 12:59:33 -0500 Subject: [PATCH] Reduce overhead to process bluetooth advertisements (#435) --- aioesphomeapi/model.py | 91 ++++++++++++++++-------------------------- 1 file changed, 35 insertions(+), 56 deletions(-) diff --git a/aioesphomeapi/model.py b/aioesphomeapi/model.py index 878f26d..0b18607 100644 --- a/aioesphomeapi/model.py +++ b/aioesphomeapi/model.py @@ -6,7 +6,6 @@ from typing import ( TYPE_CHECKING, Any, Callable, - Collection, Dict, Iterable, List, @@ -30,7 +29,6 @@ else: if TYPE_CHECKING: from .api_pb2 import ( # type: ignore BluetoothLEAdvertisementResponse, - BluetoothServiceData, HomeassistantServiceMap, ) @@ -801,55 +799,6 @@ def _uuid_converter(uuid: str) -> str: _cached_uuid_converter = lru_cache(maxsize=128)(_uuid_converter) -# value is likely a google.protobuf.pyext._message.RepeatedScalarContainer -def _convert_bluetooth_le_service_uuids(value: Iterable[str]) -> List[str]: - if not value: - # empty list, don't convert - return [] - - return [_cached_uuid_converter(v) for v in value] - - -def _convert_bluetooth_le_service_data( - value: Union[Dict[str, bytes], Collection["BluetoothServiceData"]], -) -> Dict[str, bytes]: - # We literally mean is a `dict` not any other class - if type(value) is dict: # pylint: disable=unidiomatic-typecheck - return value - - if not value: - return {} - - # v.data if v.data else v.legacy_data is backwards compatible with ESPHome devices before 2022.10.0 - if value[0].data: # type: ignore - return { - _cached_uuid_converter(v.uuid): bytes(v.data) # type: ignore[union-attr] - for v in value - } - - return { - _cached_uuid_converter(v.uuid): bytes(v.legacy_data) # type: ignore[union-attr] - for v in value - } - - -def _convert_bluetooth_le_manufacturer_data( - value: Union[Dict[int, bytes], Collection["BluetoothServiceData"]], -) -> Dict[int, bytes]: - # We literally mean is a `dict` not any other class - if type(value) is dict: # pylint: disable=unidiomatic-typecheck - return value - - if not value: - return {} - - # v.data if v.data else v.legacy_data is backwards compatible with ESPHome devices before 2022.10.0 - if value[0].data: # type: ignore - return {int(v.uuid, 16): bytes(v.data) for v in value} # type: ignore - - return {int(v.uuid, 16): bytes(v.legacy_data) for v in value} # type: ignore - - @_dataclass_decorator class BluetoothLEAdvertisement: address: int @@ -864,16 +813,46 @@ class BluetoothLEAdvertisement: def from_pb( # type: ignore[misc] cls: "BluetoothLEAdvertisement", data: "BluetoothLEAdvertisementResponse" ) -> "BluetoothLEAdvertisement": + _uuid_convert = _cached_uuid_converter + + if raw_manufacturer_data := data.manufacturer_data: + if raw_manufacturer_data[0].data: + manufacturer_data = { + int(v.uuid, 16): v.data for v in raw_manufacturer_data + } + else: + # Legacy data + manufacturer_data = { + int(v.uuid, 16): bytes(v.legacy_data) for v in raw_manufacturer_data + } + else: + manufacturer_data = {} + + if raw_service_data := data.service_data: + if raw_service_data[0].data: + service_data = {_uuid_convert(v.uuid): v.data for v in raw_service_data} + else: + # Legacy data + service_data = { + _uuid_convert(v.uuid): bytes(v.legacy_data) + for v in raw_service_data + } + else: + service_data = {} + + if raw_service_uuids := data.service_uuids: + service_uuids = [_uuid_convert(v) for v in raw_service_uuids] + else: + service_uuids = [] + return cls( # type: ignore[operator, no-any-return] address=data.address, rssi=data.rssi, address_type=data.address_type, name=data.name.decode("utf-8", errors="replace"), - service_uuids=_convert_bluetooth_le_service_uuids(data.service_uuids), - service_data=_convert_bluetooth_le_service_data(data.service_data), - manufacturer_data=_convert_bluetooth_le_manufacturer_data( - data.manufacturer_data - ), + service_uuids=service_uuids, + service_data=service_data, + manufacturer_data=manufacturer_data, )