Reduce duplicate code for Bluetooth Advertisements (#738)

This commit is contained in:
J. Nick Koston 2023-11-26 14:08:05 -06:00 committed by GitHub
parent 67bd7efb29
commit f15acf1b1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -483,6 +483,14 @@ class APIClient:
return resp return resp
def _unsub_bluetooth_advertisements(
self, unsub_callback: Callable[[], None]
) -> None:
"""Unsubscribe Bluetooth advertisements if connected."""
if self._connection is not None:
unsub_callback()
self._connection.send_message(UnsubscribeBluetoothLEAdvertisementsRequest())
async def subscribe_bluetooth_le_advertisements( async def subscribe_bluetooth_le_advertisements(
self, on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None] self, on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None]
) -> Callable[[], None]: ) -> Callable[[], None]:
@ -494,15 +502,7 @@ class APIClient:
), ),
(BluetoothLEAdvertisementResponse,), (BluetoothLEAdvertisementResponse,),
) )
return partial(self._unsub_bluetooth_advertisements, unsub_callback)
def unsub() -> None:
if self._connection is not None:
unsub_callback()
self._connection.send_message(
UnsubscribeBluetoothLEAdvertisementsRequest()
)
return unsub
async def subscribe_bluetooth_le_raw_advertisements( async def subscribe_bluetooth_le_raw_advertisements(
self, on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None] self, on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None]
@ -514,15 +514,7 @@ class APIClient:
partial(on_ble_raw_advertisement_response, on_advertisements), partial(on_ble_raw_advertisement_response, on_advertisements),
(BluetoothLERawAdvertisementsResponse,), (BluetoothLERawAdvertisementsResponse,),
) )
return partial(self._unsub_bluetooth_advertisements, unsub_callback)
def unsub() -> None:
if self._connection is not None:
unsub_callback()
self._connection.send_message(
UnsubscribeBluetoothLEAdvertisementsRequest()
)
return unsub
async def subscribe_bluetooth_connections_free( async def subscribe_bluetooth_connections_free(
self, on_bluetooth_connections_free_update: Callable[[int, int], None] self, on_bluetooth_connections_free_update: Callable[[int, int], None]