Reduce duplication in bluetooth gatt read code (#657)

This commit is contained in:
J. Nick Koston 2023-11-21 23:24:54 +01:00 committed by GitHub
parent 432a3e65b6
commit d8cace0b26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 178 additions and 13 deletions

View File

@ -910,20 +910,12 @@ class APIClient:
handle: int, handle: int,
timeout: float = DEFAULT_BLE_TIMEOUT, timeout: float = DEFAULT_BLE_TIMEOUT,
) -> bytearray: ) -> bytearray:
req = BluetoothGATTReadRequest() return await self._bluetooth_gatt_read(
req.address = address BluetoothGATTReadRequest(),
req.handle = handle
resp = await self._send_bluetooth_message_await_response(
address, address,
handle, handle,
req, timeout,
BluetoothGATTReadResponse,
timeout=timeout,
) )
if TYPE_CHECKING:
assert isinstance(resp, BluetoothGATTReadResponse)
return bytearray(resp.data)
async def bluetooth_gatt_write( async def bluetooth_gatt_write(
self, self,
@ -959,7 +951,21 @@ class APIClient:
timeout: float = DEFAULT_BLE_TIMEOUT, timeout: float = DEFAULT_BLE_TIMEOUT,
) -> bytearray: ) -> bytearray:
"""Read a GATT descriptor.""" """Read a GATT descriptor."""
req = BluetoothGATTReadDescriptorRequest() return await self._bluetooth_gatt_read(
BluetoothGATTReadDescriptorRequest(),
address,
handle,
timeout,
)
async def _bluetooth_gatt_read(
self,
req: BluetoothGATTReadDescriptorRequest | BluetoothGATTReadRequest,
address: int,
handle: int,
timeout: float,
) -> bytearray:
"""Perform a GATT read."""
req.address = address req.address = address
req.handle = handle req.handle = handle
resp = await self._send_bluetooth_message_await_response( resp = await self._send_bluetooth_message_await_response(

View File

@ -15,6 +15,8 @@ from aioesphomeapi.api_pb2 import (
BluetoothDeviceConnectionResponse, BluetoothDeviceConnectionResponse,
BluetoothDevicePairingResponse, BluetoothDevicePairingResponse,
BluetoothDeviceUnpairingResponse, BluetoothDeviceUnpairingResponse,
BluetoothGATTReadResponse,
BluetoothGATTWriteResponse,
ButtonCommandRequest, ButtonCommandRequest,
CameraImageRequest, CameraImageRequest,
CameraImageResponse, CameraImageResponse,
@ -39,7 +41,7 @@ from aioesphomeapi.api_pb2 import (
) )
from aioesphomeapi.client import APIClient from aioesphomeapi.client import APIClient
from aioesphomeapi.connection import APIConnection from aioesphomeapi.connection import APIConnection
from aioesphomeapi.core import APIConnectionError from aioesphomeapi.core import APIConnectionError, TimeoutAPIError
from aioesphomeapi.model import ( from aioesphomeapi.model import (
AlarmControlPanelCommand, AlarmControlPanelCommand,
APIVersion, APIVersion,
@ -870,3 +872,160 @@ async def test_device_info(
await disconnect_task await disconnect_task
with pytest.raises(APIConnectionError, match="CLOSED"): with pytest.raises(APIConnectionError, match="CLOSED"):
await client.device_info() await client.device_info()
@pytest.mark.asyncio
async def test_bluetooth_gatt_read(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_read."""
client, connection, transport, protocol = api_client
read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
await asyncio.sleep(0)
other_response: message.Message = BluetoothGATTReadResponse(
address=1234, handle=4567, data=b"4567"
)
protocol.data_received(generate_plaintext_packet(other_response))
response: message.Message = BluetoothGATTReadResponse(
address=1234, handle=1234, data=b"1234"
)
protocol.data_received(generate_plaintext_packet(response))
assert await read_task == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_descriptor(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_read_descriptor."""
client, connection, transport, protocol = api_client
read_task = asyncio.create_task(client.bluetooth_gatt_read_descriptor(1234, 1234))
await asyncio.sleep(0)
other_response: message.Message = BluetoothGATTReadResponse(
address=1234, handle=4567, data=b"4567"
)
protocol.data_received(generate_plaintext_packet(other_response))
response: message.Message = BluetoothGATTReadResponse(
address=1234, handle=1234, data=b"1234"
)
protocol.data_received(generate_plaintext_packet(response))
assert await read_task == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_write(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_write."""
client, connection, transport, protocol = api_client
write_task = asyncio.create_task(
client.bluetooth_gatt_write(1234, 1234, b"1234", True)
)
await asyncio.sleep(0)
other_response: message.Message = BluetoothGATTWriteResponse(
address=1234, handle=4567
)
protocol.data_received(generate_plaintext_packet(other_response))
response: message.Message = BluetoothGATTWriteResponse(address=1234, handle=1234)
protocol.data_received(generate_plaintext_packet(response))
await write_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_without_response(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_write without response."""
client, connection, transport, protocol = api_client
transport.reset_mock()
write_task = asyncio.create_task(
client.bluetooth_gatt_write(1234, 1234, b"1234", False)
)
await asyncio.sleep(0)
await write_task
assert transport.mock_calls[0][1][0] == b'\x00\x0cK\x08\xd2\t\x10\xd2\t"\x041234'
with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
await client.bluetooth_gatt_write(1234, 1234, b"1234", True, timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_write_descriptor."""
client, connection, transport, protocol = api_client
write_task = asyncio.create_task(
client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", True)
)
await asyncio.sleep(0)
other_response: message.Message = BluetoothGATTWriteResponse(
address=1234, handle=4567
)
protocol.data_received(generate_plaintext_packet(other_response))
response: message.Message = BluetoothGATTWriteResponse(address=1234, handle=1234)
protocol.data_received(generate_plaintext_packet(response))
await write_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor_without_response(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_write_descriptor without response."""
client, connection, transport, protocol = api_client
transport.reset_mock()
write_task = asyncio.create_task(
client.bluetooth_gatt_write_descriptor(
1234, 1234, b"1234", wait_for_response=False
)
)
await asyncio.sleep(0)
await write_task
assert transport.mock_calls[0][1][0] == b"\x00\x0cM\x08\xd2\t\x10\xd2\t\x1a\x041234"
with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
await client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_descriptor(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test bluetooth_gatt_read_descriptor."""
client, connection, transport, protocol = api_client
read_task = asyncio.create_task(client.bluetooth_gatt_read_descriptor(1234, 1234))
await asyncio.sleep(0)
other_response: message.Message = BluetoothGATTReadResponse(
address=1234, handle=4567, data=b"4567"
)
protocol.data_received(generate_plaintext_packet(other_response))
response: message.Message = BluetoothGATTReadResponse(
address=1234, handle=1234, data=b"1234"
)
protocol.data_received(generate_plaintext_packet(response))
assert await read_task == b"1234"