Refactor raw Bluetooth advs subscriptions (#797)

This commit is contained in:
J. Nick Koston 2023-12-12 14:15:09 -10:00 committed by GitHub
parent 1672302ac2
commit cf83582ef1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 4 additions and 15 deletions

View File

@ -31,7 +31,6 @@ from .api_pb2 import ( # type: ignore
BluetoothGATTWriteRequest,
BluetoothGATTWriteResponse,
BluetoothLEAdvertisementResponse,
BluetoothLERawAdvertisement,
BluetoothLERawAdvertisementsResponse,
ButtonCommandRequest,
CameraImageRequest,
@ -72,7 +71,6 @@ from .api_pb2 import ( # type: ignore
VoiceAssistantResponse,
)
from .client_callbacks import (
on_ble_raw_advertisement_response,
on_bluetooth_connections_free_response,
on_bluetooth_device_connection_response,
on_bluetooth_gatt_notify_data_response,
@ -495,13 +493,13 @@ class APIClient:
return partial(self._unsub_bluetooth_advertisements, unsub_callback)
async def subscribe_bluetooth_le_raw_advertisements(
self, on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None]
self, on_advertisements: Callable[[BluetoothLERawAdvertisementsResponse], None]
) -> Callable[[], None]:
unsub_callback = self._get_connection().send_message_callback_response(
SubscribeBluetoothLEAdvertisementsRequest(
flags=BluetoothProxySubscriptionFlag.RAW_ADVERTISEMENTS
),
partial(on_ble_raw_advertisement_response, on_advertisements),
on_advertisements,
(BluetoothLERawAdvertisementsResponse,),
)
return partial(self._unsub_bluetooth_advertisements, unsub_callback)

View File

@ -17,8 +17,6 @@ from .api_pb2 import ( # type: ignore
BluetoothGATTReadResponse,
BluetoothGATTWriteResponse,
BluetoothLEAdvertisementResponse,
BluetoothLERawAdvertisement,
BluetoothLERawAdvertisementsResponse,
CameraImageResponse,
HomeassistantServiceResponse,
SubscribeHomeAssistantStateResponse,
@ -72,13 +70,6 @@ def on_bluetooth_le_advertising_response(
on_bluetooth_le_advertisement(BluetoothLEAdvertisement.from_pb(msg)) # type: ignore[misc]
def on_ble_raw_advertisement_response(
on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None],
msg: BluetoothLERawAdvertisementsResponse,
) -> None:
on_advertisements(msg.advertisements)
def on_bluetooth_connections_free_response(
on_bluetooth_connections_free_update: Callable[[int, int], None],
msg: BluetoothConnectionsFreeResponse,

View File

@ -1641,9 +1641,9 @@ async def test_subscribe_bluetooth_le_raw_advertisements(
adv_groups = []
def on_raw_bluetooth_le_advertisements(
advs: list[BluetoothLERawAdvertisementsResponse],
advs: BluetoothLERawAdvertisementsResponse,
) -> None:
adv_groups.append(advs)
adv_groups.append(advs.advertisements)
unsub = await client.subscribe_bluetooth_le_raw_advertisements(
on_raw_bluetooth_le_advertisements