Raise BluetoothConnectionDroppedError if connection drops during GATT read/write

This commit is contained in:
J. Nick Koston 2023-11-28 07:27:04 -06:00
parent 3e920df478
commit 8ac21c3610
No known key found for this signature in database
1 changed files with 14 additions and 7 deletions

View File

@ -456,9 +456,12 @@ class APIClient:
BluetoothGATTNotifyResponse,
BluetoothGATTReadResponse,
BluetoothGATTWriteResponse,
BluetoothDeviceConnectionResponse,
),
)
return bool(msg.address == address and msg.handle == handle)
if type(msg) is BluetoothDeviceConnectionResponse:
return msg.address == address
return msg.address == address and msg.handle == handle
async def _send_bluetooth_message_await_response(
self,
@ -473,11 +476,12 @@ class APIClient:
timeout: float = 10.0,
) -> message.Message:
message_filter = partial(self._filter_bluetooth_message, address, handle)
msg_types = (response_type, BluetoothGATTErrorResponse)
[resp] = await self._get_connection().send_messages_await_response_complex(
(request,),
message_filter,
message_filter,
(response_type, BluetoothGATTErrorResponse),
(*msg_types, BluetoothDeviceConnectionResponse),
timeout,
)
@ -487,6 +491,8 @@ class APIClient:
):
raise BluetoothGATTAPIError(BluetoothGATTError.from_pb(resp))
self._raise_for_ble_connection_change(address, resp, msg_types)
return resp
def _unsub_bluetooth_advertisements(
@ -698,11 +704,7 @@ class APIClient:
(BluetoothDeviceConnectionResponse, *msg_types),
timeout,
)
if (
type(response) # pylint: disable=unidiomatic-typecheck
is BluetoothDeviceConnectionResponse
):
self._raise_for_ble_connection_change(address, response, msg_types)
self._raise_for_ble_connection_change(address, response, msg_types)
return response
def _raise_for_ble_connection_change(
@ -712,6 +714,11 @@ class APIClient:
msg_types: tuple[type[message.Message], ...],
) -> None:
"""Raise an exception if the connection status changed."""
if (
type(response) # pylint: disable=unidiomatic-typecheck
is not BluetoothDeviceConnectionResponse
):
return
response_names = message_types_to_names(msg_types)
human_readable_address = to_human_readable_address(address)
raise BluetoothConnectionDroppedError(