Reduce overhead to process bluetooth advertisements (#435)

This commit is contained in:
J. Nick Koston 2023-05-15 12:59:33 -05:00 committed by GitHub
parent bbb17fd96f
commit 8a661bb673
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 56 deletions

View File

@ -6,7 +6,6 @@ from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
Dict,
Iterable,
List,
@ -30,7 +29,6 @@ else:
if TYPE_CHECKING:
from .api_pb2 import ( # type: ignore
BluetoothLEAdvertisementResponse,
BluetoothServiceData,
HomeassistantServiceMap,
)
@ -801,55 +799,6 @@ def _uuid_converter(uuid: str) -> str:
_cached_uuid_converter = lru_cache(maxsize=128)(_uuid_converter)
# value is likely a google.protobuf.pyext._message.RepeatedScalarContainer
def _convert_bluetooth_le_service_uuids(value: Iterable[str]) -> List[str]:
if not value:
# empty list, don't convert
return []
return [_cached_uuid_converter(v) for v in value]
def _convert_bluetooth_le_service_data(
value: Union[Dict[str, bytes], Collection["BluetoothServiceData"]],
) -> Dict[str, bytes]:
# We literally mean is a `dict` not any other class
if type(value) is dict: # pylint: disable=unidiomatic-typecheck
return value
if not value:
return {}
# v.data if v.data else v.legacy_data is backwards compatible with ESPHome devices before 2022.10.0
if value[0].data: # type: ignore
return {
_cached_uuid_converter(v.uuid): bytes(v.data) # type: ignore[union-attr]
for v in value
}
return {
_cached_uuid_converter(v.uuid): bytes(v.legacy_data) # type: ignore[union-attr]
for v in value
}
def _convert_bluetooth_le_manufacturer_data(
value: Union[Dict[int, bytes], Collection["BluetoothServiceData"]],
) -> Dict[int, bytes]:
# We literally mean is a `dict` not any other class
if type(value) is dict: # pylint: disable=unidiomatic-typecheck
return value
if not value:
return {}
# v.data if v.data else v.legacy_data is backwards compatible with ESPHome devices before 2022.10.0
if value[0].data: # type: ignore
return {int(v.uuid, 16): bytes(v.data) for v in value} # type: ignore
return {int(v.uuid, 16): bytes(v.legacy_data) for v in value} # type: ignore
@_dataclass_decorator
class BluetoothLEAdvertisement:
address: int
@ -864,16 +813,46 @@ class BluetoothLEAdvertisement:
def from_pb( # type: ignore[misc]
cls: "BluetoothLEAdvertisement", data: "BluetoothLEAdvertisementResponse"
) -> "BluetoothLEAdvertisement":
_uuid_convert = _cached_uuid_converter
if raw_manufacturer_data := data.manufacturer_data:
if raw_manufacturer_data[0].data:
manufacturer_data = {
int(v.uuid, 16): v.data for v in raw_manufacturer_data
}
else:
# Legacy data
manufacturer_data = {
int(v.uuid, 16): bytes(v.legacy_data) for v in raw_manufacturer_data
}
else:
manufacturer_data = {}
if raw_service_data := data.service_data:
if raw_service_data[0].data:
service_data = {_uuid_convert(v.uuid): v.data for v in raw_service_data}
else:
# Legacy data
service_data = {
_uuid_convert(v.uuid): bytes(v.legacy_data)
for v in raw_service_data
}
else:
service_data = {}
if raw_service_uuids := data.service_uuids:
service_uuids = [_uuid_convert(v) for v in raw_service_uuids]
else:
service_uuids = []
return cls( # type: ignore[operator, no-any-return]
address=data.address,
rssi=data.rssi,
address_type=data.address_type,
name=data.name.decode("utf-8", errors="replace"),
service_uuids=_convert_bluetooth_le_service_uuids(data.service_uuids),
service_data=_convert_bluetooth_le_service_data(data.service_data),
manufacturer_data=_convert_bluetooth_le_manufacturer_data(
data.manufacturer_data
),
service_uuids=service_uuids,
service_data=service_data,
manufacturer_data=manufacturer_data,
)