Add coverage for bluetooth connection free responses (#673)

This commit is contained in:
J. Nick Koston 2023-11-23 17:49:24 +01:00 committed by GitHub
parent 96781616b8
commit 0347302222
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 9 deletions

View File

@ -618,20 +618,23 @@ class APIClient:
return unsub
def _on_bluetooth_connections_free_response(
self,
on_bluetooth_connections_free_update: Callable[[int, int], None],
msg: BluetoothConnectionsFreeResponse,
) -> None:
on_bluetooth_connections_free_update(msg.free, msg.limit)
async def subscribe_bluetooth_connections_free(
self, on_bluetooth_connections_free_update: Callable[[int, int], None]
) -> Callable[[], None]:
msg_types = (BluetoothConnectionsFreeResponse,)
def _on_bluetooth_connections_free_response(
msg: BluetoothConnectionsFreeResponse,
) -> None:
on_bluetooth_connections_free_update(msg.free, msg.limit)
return self._get_connection().send_message_callback_response(
SubscribeBluetoothConnectionsFreeRequest(),
_on_bluetooth_connections_free_response,
msg_types,
partial(
self._on_bluetooth_connections_free_response,
on_bluetooth_connections_free_update,
),
(BluetoothConnectionsFreeResponse,),
)
def _handle_timeout(self, fut: asyncio.Future[None]) -> None:

View File

@ -12,6 +12,7 @@ from aioesphomeapi._frame_helper.plain_text import APIPlaintextFrameHelper
from aioesphomeapi.api_pb2 import (
AlarmControlPanelCommandRequest,
BinarySensorStateResponse,
BluetoothConnectionsFreeResponse,
BluetoothDeviceClearCacheResponse,
BluetoothDeviceConnectionResponse,
BluetoothDevicePairingResponse,
@ -1298,6 +1299,30 @@ async def test_subscribe_bluetooth_le_raw_advertisements(
unsub()
@pytest.mark.asyncio
async def test_subscribe_bluetooth_connections_free(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test subscribe_bluetooth_connections_free."""
client, connection, transport, protocol = api_client
connections = []
def on_bluetooth_connections_free(free: int, limit: int) -> None:
connections.append((free, limit))
unsub = await client.subscribe_bluetooth_connections_free(
on_bluetooth_connections_free
)
await asyncio.sleep(0)
response: message.Message = BluetoothConnectionsFreeResponse(free=2, limit=3)
protocol.data_received(generate_plaintext_packet(response))
assert connections == [(2, 3)]
unsub()
@pytest.mark.asyncio
async def test_subscribe_logs(auth_client: APIClient) -> None:
send = patch_response_callback(auth_client)