Merge branch 'main' into climate_enhancements

This commit is contained in:
J. Nick Koston 2023-11-28 08:54:58 -06:00 committed by GitHub
commit 38aa0fcfba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 320 additions and 125 deletions

View File

@ -11,6 +11,7 @@ from .core import (
MESSAGE_TYPE_TO_PROTO,
APIConnectionError,
BadNameAPIError,
BluetoothConnectionDroppedError,
HandshakeAPIError,
InvalidAuthAPIError,
InvalidEncryptionKeyAPIError,

View File

@ -23,7 +23,6 @@ WRITE_EXCEPTIONS = (RuntimeError, ConnectionResetError, OSError)
_int = int
_bytes = bytes
_float = float
class APIFrameHelper:

View File

@ -1,3 +1,4 @@
# pylint: disable=unidiomatic-typecheck
from __future__ import annotations
import asyncio
@ -76,6 +77,7 @@ from .client_callbacks import (
on_bluetooth_device_connection_response,
on_bluetooth_gatt_notify_data_response,
on_bluetooth_le_advertising_response,
on_bluetooth_message,
on_home_assistant_service_response,
on_state_msg,
on_subscribe_home_assistant_state_response,
@ -83,9 +85,11 @@ from .client_callbacks import (
from .connection import APIConnection, ConnectionParams, handle_timeout
from .core import (
APIConnectionError,
BluetoothConnectionDroppedError,
BluetoothGATTAPIError,
TimeoutAPIError,
to_human_readable_address,
to_human_readable_gatt_error,
)
from .model import (
AlarmControlPanelCommand,
@ -118,7 +122,11 @@ from .model import (
UserServiceArgType,
)
from .model import VoiceAssistantAudioSettings as VoiceAssistantAudioSettingsModel
from .model import VoiceAssistantCommand, VoiceAssistantEventType
from .model import (
VoiceAssistantCommand,
VoiceAssistantEventType,
message_types_to_names,
)
from .model_conversions import (
LIST_ENTITIES_SERVICES_RESPONSE_TYPES,
SUBSCRIBE_STATES_RESPONSE_TYPES,
@ -380,27 +388,23 @@ class APIClient:
async def list_entities_services(
self,
) -> tuple[list[EntityInfo], list[UserService]]:
response_types = LIST_ENTITIES_SERVICES_RESPONSE_TYPES
msg_types = LIST_ENTITIES_MSG_TYPES
def do_append(msg: message.Message) -> bool:
return not isinstance(msg, ListEntitiesDoneResponse)
def do_stop(msg: message.Message) -> bool:
return isinstance(msg, ListEntitiesDoneResponse)
resp = await self._get_connection().send_messages_await_response_complex(
(ListEntitiesRequest(),), do_append, do_stop, msg_types, 60
msgs = await self._get_connection().send_messages_await_response_complex(
(ListEntitiesRequest(),),
lambda msg: type(msg) is not ListEntitiesDoneResponse,
lambda msg: type(msg) is ListEntitiesDoneResponse,
LIST_ENTITIES_MSG_TYPES,
60,
)
entities: list[EntityInfo] = []
services: list[UserService] = []
for msg in resp:
if isinstance(msg, ListEntitiesServicesResponse):
response_types = LIST_ENTITIES_SERVICES_RESPONSE_TYPES
for msg in msgs:
msg_type = type(msg)
if msg_type is ListEntitiesServicesResponse:
services.append(UserService.from_pb(msg))
continue
cls = response_types[type(msg)]
assert cls is not None
entities.append(cls.from_pb(msg))
if cls := response_types[msg_type]:
entities.append(cls.from_pb(msg))
return entities, services
async def subscribe_states(self, on_state: Callable[[EntityState], None]) -> None:
@ -435,25 +439,6 @@ class APIClient:
(HomeassistantServiceResponse,),
)
def _filter_bluetooth_message(
self,
address: int,
handle: int,
msg: message.Message,
) -> bool:
"""Handle a Bluetooth message."""
if TYPE_CHECKING:
assert isinstance(
msg,
(
BluetoothGATTErrorResponse,
BluetoothGATTNotifyResponse,
BluetoothGATTReadResponse,
BluetoothGATTWriteResponse,
),
)
return bool(msg.address == address and msg.handle == handle)
async def _send_bluetooth_message_await_response(
self,
address: int,
@ -466,21 +451,21 @@ class APIClient:
),
timeout: float = 10.0,
) -> message.Message:
message_filter = partial(self._filter_bluetooth_message, address, handle)
message_filter = partial(on_bluetooth_message, address, handle)
msg_types = (response_type, BluetoothGATTErrorResponse)
[resp] = await self._get_connection().send_messages_await_response_complex(
(request,),
message_filter,
message_filter,
(response_type, BluetoothGATTErrorResponse),
(*msg_types, BluetoothDeviceConnectionResponse),
timeout,
)
if (
type(resp) # pylint: disable=unidiomatic-typecheck
is BluetoothGATTErrorResponse
):
if type(resp) is BluetoothGATTErrorResponse:
raise BluetoothGATTAPIError(BluetoothGATTError.from_pb(resp))
self._raise_for_ble_connection_change(address, resp, msg_types)
return resp
def _unsub_bluetooth_advertisements(
@ -632,26 +617,11 @@ class APIClient:
async def bluetooth_device_pair(
self, address: int, timeout: float = DEFAULT_BLE_TIMEOUT
) -> BluetoothDevicePairing:
def predicate_func(
msg: BluetoothDevicePairingResponse | BluetoothDeviceConnectionResponse,
) -> bool:
if msg.address != address:
return False
if isinstance(msg, BluetoothDeviceConnectionResponse):
raise APIConnectionError(
f"Peripheral changed connections status while pairing: {msg.error}"
)
return True
return BluetoothDevicePairing.from_pb(
await self._bluetooth_device_request(
await self._bluetooth_device_request_watch_connection(
address,
BluetoothDeviceRequestType.PAIR,
predicate_func,
(
BluetoothDevicePairingResponse,
BluetoothDeviceConnectionResponse,
),
(BluetoothDevicePairingResponse,),
timeout,
)
)
@ -660,10 +630,9 @@ class APIClient:
self, address: int, timeout: float = DEFAULT_BLE_TIMEOUT
) -> BluetoothDeviceUnpairing:
return BluetoothDeviceUnpairing.from_pb(
await self._bluetooth_device_request(
await self._bluetooth_device_request_watch_connection(
address,
BluetoothDeviceRequestType.UNPAIR,
lambda msg: msg.address == address,
(BluetoothDeviceUnpairingResponse,),
timeout,
)
@ -673,10 +642,9 @@ class APIClient:
self, address: int, timeout: float = DEFAULT_BLE_TIMEOUT
) -> BluetoothDeviceClearCache:
return BluetoothDeviceClearCache.from_pb(
await self._bluetooth_device_request(
await self._bluetooth_device_request_watch_connection(
address,
BluetoothDeviceRequestType.CLEAR_CACHE,
lambda msg: msg.address == address,
(BluetoothDeviceClearCacheResponse,),
timeout,
)
@ -694,6 +662,41 @@ class APIClient:
timeout,
)
async def _bluetooth_device_request_watch_connection(
self,
address: int,
request_type: BluetoothDeviceRequestType,
msg_types: tuple[type[message.Message], ...],
timeout: float,
) -> message.Message:
"""Send a BluetoothDeviceRequest watch for the connection state to change."""
response = await self._bluetooth_device_request(
address,
request_type,
lambda msg: msg.address == address,
(BluetoothDeviceConnectionResponse, *msg_types),
timeout,
)
self._raise_for_ble_connection_change(address, response, msg_types)
return response
def _raise_for_ble_connection_change(
self,
address: int,
response: BluetoothDeviceConnectionResponse,
msg_types: tuple[type[message.Message], ...],
) -> None:
"""Raise an exception if the connection status changed."""
if type(response) is not BluetoothDeviceConnectionResponse:
return
response_names = message_types_to_names(msg_types)
human_readable_address = to_human_readable_address(address)
raise BluetoothConnectionDroppedError(
f"Peripheral {human_readable_address} changed connection status while waiting for "
f"{response_names}: {to_human_readable_gatt_error(response.error)} "
f"({response.error})"
)
async def _bluetooth_device_request(
self,
address: int,
@ -702,6 +705,7 @@ class APIClient:
msg_types: tuple[type[message.Message], ...],
timeout: float,
) -> message.Message:
"""Send a BluetoothDeviceRequest and wait for a response."""
[response] = await self._get_connection().send_messages_await_response_complex(
(
BluetoothDeviceRequest(
@ -719,31 +723,50 @@ class APIClient:
async def bluetooth_gatt_get_services(
self, address: int
) -> ESPHomeBluetoothGATTServices:
append_types = (BluetoothGATTGetServicesResponse, BluetoothGATTErrorResponse)
stop_types = (BluetoothGATTGetServicesDoneResponse, BluetoothGATTErrorResponse)
append_types = (
BluetoothDeviceConnectionResponse,
BluetoothGATTGetServicesResponse,
BluetoothGATTErrorResponse,
)
stop_types = (
BluetoothDeviceConnectionResponse,
BluetoothGATTGetServicesDoneResponse,
BluetoothGATTErrorResponse,
)
msg_types = (
BluetoothGATTGetServicesResponse,
BluetoothGATTGetServicesDoneResponse,
BluetoothGATTErrorResponse,
)
def do_append(msg: message.Message) -> bool:
return isinstance(msg, append_types) and msg.address == address
def do_append(
msg: BluetoothDeviceConnectionResponse
| BluetoothGATTGetServicesResponse
| BluetoothGATTGetServicesDoneResponse
| BluetoothGATTErrorResponse,
) -> bool:
return type(msg) in append_types and msg.address == address
def do_stop(msg: message.Message) -> bool:
return isinstance(msg, stop_types) and msg.address == address
def do_stop(
msg: BluetoothDeviceConnectionResponse
| BluetoothGATTGetServicesResponse
| BluetoothGATTGetServicesDoneResponse
| BluetoothGATTErrorResponse,
) -> bool:
return type(msg) in stop_types and msg.address == address
resp = await self._get_connection().send_messages_await_response_complex(
(BluetoothGATTGetServicesRequest(address=address),),
do_append,
do_stop,
(
BluetoothGATTGetServicesResponse,
BluetoothGATTGetServicesDoneResponse,
BluetoothGATTErrorResponse,
),
(*msg_types, BluetoothDeviceConnectionResponse),
DEFAULT_BLE_TIMEOUT,
)
services = []
for msg in resp:
if isinstance(msg, BluetoothGATTErrorResponse):
self._raise_for_ble_connection_change(address, msg, msg_types)
if type(msg) is BluetoothGATTErrorResponse:
raise BluetoothGATTAPIError(BluetoothGATTError.from_pb(msg))
services.extend(BluetoothGATTServices.from_pb(msg).services)
return ESPHomeBluetoothGATTServices(address=address, services=services) # type: ignore[call-arg]
@ -755,36 +778,12 @@ class APIClient:
timeout: float = DEFAULT_BLE_TIMEOUT,
) -> bytearray:
return await self._bluetooth_gatt_read(
BluetoothGATTReadRequest(),
BluetoothGATTReadRequest,
address,
handle,
timeout,
)
async def bluetooth_gatt_write(
self,
address: int,
handle: int,
data: bytes,
response: bool,
timeout: float = DEFAULT_BLE_TIMEOUT,
) -> None:
req = BluetoothGATTWriteRequest(
address=address, handle=handle, response=response, data=data
)
if not response:
self._get_connection().send_message(req)
return
await self._send_bluetooth_message_await_response(
address,
handle,
req,
BluetoothGATTWriteResponse,
timeout=timeout,
)
async def bluetooth_gatt_read_descriptor(
self,
address: int,
@ -793,7 +792,7 @@ class APIClient:
) -> bytearray:
"""Read a GATT descriptor."""
return await self._bluetooth_gatt_read(
BluetoothGATTReadDescriptorRequest(),
BluetoothGATTReadDescriptorRequest,
address,
handle,
timeout,
@ -801,25 +800,40 @@ class APIClient:
async def _bluetooth_gatt_read(
self,
req: BluetoothGATTReadDescriptorRequest | BluetoothGATTReadRequest,
req_type: type[BluetoothGATTReadDescriptorRequest]
| type[BluetoothGATTReadRequest],
address: int,
handle: int,
timeout: float,
) -> bytearray:
"""Perform a GATT read."""
req.address = address
req.handle = handle
resp = await self._send_bluetooth_message_await_response(
address,
handle,
req,
req_type(address=address, handle=handle),
BluetoothGATTReadResponse,
timeout=timeout,
timeout,
)
if TYPE_CHECKING:
assert isinstance(resp, BluetoothGATTReadResponse)
return bytearray(resp.data)
async def bluetooth_gatt_write(
self,
address: int,
handle: int,
data: bytes,
response: bool,
timeout: float = DEFAULT_BLE_TIMEOUT,
) -> None:
await self._bluetooth_gatt_write(
address,
handle,
BluetoothGATTWriteRequest(response=response, data=data),
timeout,
response,
)
async def bluetooth_gatt_write_descriptor(
self,
address: int,
@ -828,20 +842,34 @@ class APIClient:
timeout: float = DEFAULT_BLE_TIMEOUT,
wait_for_response: bool = True,
) -> None:
req = BluetoothGATTWriteDescriptorRequest(
address=address, handle=handle, data=data
await self._bluetooth_gatt_write(
address,
handle,
BluetoothGATTWriteDescriptorRequest(data=data),
timeout,
wait_for_response,
)
async def _bluetooth_gatt_write(
self,
address: int,
handle: int,
req: BluetoothGATTWriteDescriptorRequest | BluetoothGATTWriteRequest,
timeout: float,
wait_for_response: bool,
) -> None:
"""Perform a GATT write to a char or descriptor."""
req.address = address
req.handle = handle
if not wait_for_response:
self._get_connection().send_message(req)
return
await self._send_bluetooth_message_await_response(
address,
handle,
req,
BluetoothGATTWriteResponse,
timeout=timeout,
timeout,
)
async def bluetooth_gatt_start_notify(
@ -849,6 +877,7 @@ class APIClient:
address: int,
handle: int,
on_bluetooth_gatt_notify: Callable[[int, bytearray], None],
timeout: float = 10.0,
) -> tuple[Callable[[], Coroutine[Any, Any, None]], Callable[[], None]]:
"""Start a notify session for a GATT characteristic.
@ -876,6 +905,7 @@ class APIClient:
handle,
BluetoothGATTNotifyRequest(address=address, handle=handle, enable=True),
BluetoothGATTNotifyResponse,
timeout,
)
except Exception:
remove_callback()
@ -941,7 +971,6 @@ class APIClient:
elif position == 0.0:
req.legacy_command = LegacyCoverCommand.CLOSE
req.has_legacy_command = True
self._get_connection().send_message(req)
async def fan_command(
@ -969,7 +998,6 @@ class APIClient:
if direction is not None:
req.has_direction = True
req.direction = direction
self._get_connection().send_message(req)
async def light_command( # pylint: disable=too-many-branches
@ -1027,7 +1055,6 @@ class APIClient:
if effect is not None:
req.has_effect = True
req.effect = effect
self._get_connection().send_message(req)
async def switch_command(self, key: int, state: bool) -> None:
@ -1117,7 +1144,6 @@ class APIClient:
if duration is not None:
req.duration = duration
req.has_duration = True
self._get_connection().send_message(req)
async def button_command(self, key: int) -> None:
@ -1152,7 +1178,6 @@ class APIClient:
if media_url is not None:
req.media_url = media_url
req.has_media_url = True
self._get_connection().send_message(req)
async def text_command(self, key: int, state: str) -> None:

View File

@ -8,3 +8,5 @@ cdef object CameraImageResponse, CameraState
cdef object HomeassistantServiceCall
cdef object BluetoothLEAdvertisement
cdef object BluetoothDeviceConnectionResponse

View File

@ -1,3 +1,4 @@
# pylint: disable=unidiomatic-typecheck
from __future__ import annotations
from asyncio import Future
@ -8,7 +9,11 @@ from google.protobuf import message
from .api_pb2 import ( # type: ignore
BluetoothConnectionsFreeResponse,
BluetoothDeviceConnectionResponse,
BluetoothGATTErrorResponse,
BluetoothGATTNotifyDataResponse,
BluetoothGATTNotifyResponse,
BluetoothGATTReadResponse,
BluetoothGATTWriteResponse,
BluetoothLEAdvertisementResponse,
BluetoothLERawAdvertisement,
BluetoothLERawAdvertisementsResponse,
@ -111,3 +116,18 @@ def on_bluetooth_device_connection_response(
# or we get an error.
if not connect_future.done():
connect_future.set_result(None)
def on_bluetooth_message(
address: int,
handle: int,
msg: BluetoothGATTErrorResponse
| BluetoothGATTNotifyResponse
| BluetoothGATTReadResponse
| BluetoothGATTWriteResponse
| BluetoothDeviceConnectionResponse,
) -> bool:
"""Handle a Bluetooth message."""
if type(msg) is BluetoothDeviceConnectionResponse:
return bool(msg.address == address)
return bool(msg.address == address and msg.handle == handle)

View File

@ -49,7 +49,7 @@ from .core import (
TimeoutAPIError,
UnhandledAPIConnectionError,
)
from .model import APIVersion
from .model import APIVersion, message_types_to_names
from .zeroconf import ZeroconfManager
if sys.version_info[:2] < (3, 11):
@ -758,7 +758,7 @@ class APIConnection:
await fut
except asyncio_TimeoutError as err:
timeout_expired = True
response_names = ", ".join(t.__name__ for t in msg_types)
response_names = message_types_to_names(msg_types)
raise TimeoutAPIError(
f"Timeout waiting for {response_names} after {timeout}s"
) from err

View File

@ -228,6 +228,10 @@ class UnhandledAPIConnectionError(APIConnectionError):
pass
class BluetoothConnectionDroppedError(APIConnectionError):
"""Raised when a Bluetooth connection is dropped."""
def to_human_readable_address(address: int) -> str:
"""Convert a MAC address to a human readable format."""
return ":".join(TWO_CHAR.findall(f"{address:012X}"))

View File

@ -8,6 +8,8 @@ from functools import cache, lru_cache, partial
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
from uuid import UUID
from google.protobuf import message
from .util import fix_float_single_double_conversion
if sys.version_info[:2] < (3, 10):
@ -1174,3 +1176,7 @@ def build_unique_id(formatted_mac: str, entity_info: EntityInfo) -> str:
"""
# <mac>-<entity type>-<object_id>
return f"{formatted_mac}-{_TYPE_TO_NAME[type(entity_info)]}-{entity_info.object_id}"
def message_types_to_names(msg_types: Iterable[type[message.Message]]) -> str:
return ", ".join(t.__name__ for t in msg_types)

View File

@ -11,7 +11,7 @@ with open(os.path.join(here, "README.rst"), encoding="utf-8") as readme_file:
long_description = readme_file.read()
VERSION = "19.1.5"
VERSION = "19.1.6"
PROJECT_NAME = "aioesphomeapi"
PROJECT_PACKAGE_NAME = "aioesphomeapi"
PROJECT_LICENSE = "MIT"

View File

@ -66,7 +66,7 @@ from aioesphomeapi.api_pb2 import (
VoiceAssistantRequest,
VoiceAssistantResponse,
)
from aioesphomeapi.client import APIClient
from aioesphomeapi.client import APIClient, BluetoothConnectionDroppedError
from aioesphomeapi.connection import APIConnection
from aioesphomeapi.core import (
APIConnectionError,
@ -963,10 +963,55 @@ async def test_bluetooth_pair_connection_drops(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
with pytest.raises(
APIConnectionError,
match="Peripheral changed connections status while pairing: 13",
):
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothDevicePairingResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await pair_task
@pytest.mark.asyncio
async def test_bluetooth_unpair_connection_drops(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test connection drop during bluetooth_device_unpair."""
client, connection, transport, protocol = api_client
pair_task = asyncio.create_task(client.bluetooth_device_unpair(1234))
await asyncio.sleep(0)
response: message.Message = BluetoothDeviceConnectionResponse(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothDeviceUnpairingResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await pair_task
@pytest.mark.asyncio
async def test_bluetooth_clear_cache_connection_drops(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test connection drop during bluetooth_device_clear_cache."""
client, connection, transport, protocol = api_client
pair_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234))
await asyncio.sleep(0)
response: message.Message = BluetoothDeviceConnectionResponse(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothDeviceClearCacheResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await pair_task
@ -1054,6 +1099,28 @@ async def test_bluetooth_gatt_read(
assert await read_task == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_connection_drops(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test connection drop during bluetooth_gatt_read."""
client, connection, transport, protocol = api_client
read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
await asyncio.sleep(0)
response: message.Message = BluetoothDeviceConnectionResponse(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothGATTReadResponse, BluetoothGATTErrorResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await read_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_error(
api_client: tuple[
@ -1118,6 +1185,30 @@ async def test_bluetooth_gatt_write(
await write_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_connection_drops(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test connection drop during bluetooth_gatt_read."""
client, connection, transport, protocol = api_client
write_task = asyncio.create_task(
client.bluetooth_gatt_write(1234, 1234, b"1234", True)
)
await asyncio.sleep(0)
response: message.Message = BluetoothDeviceConnectionResponse(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothGATTWriteResponse, BluetoothGATTErrorResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await write_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_without_response(
api_client: tuple[
@ -1183,6 +1274,29 @@ async def test_bluetooth_gatt_write_descriptor_without_response(
await client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services_connection_drops(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test connection drop during bluetooth_gatt_get_services."""
client, connection, transport, protocol = api_client
services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
await asyncio.sleep(0)
response: message.Message = BluetoothDeviceConnectionResponse(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothGATTGetServicesResponse, BluetoothGATTGetServicesDoneResponse, "
"BluetoothGATTErrorResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await services_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services(
api_client: tuple[
@ -1244,6 +1358,30 @@ async def test_bluetooth_gatt_get_services_errors(
await services_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify_connection_drops(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test connection drop during bluetooth_gatt_start_notify."""
client, connection, transport, protocol = api_client
notify_task = asyncio.create_task(
client.bluetooth_gatt_start_notify(1234, 1, lambda handle, data: None)
)
await asyncio.sleep(0)
response: message.Message = BluetoothDeviceConnectionResponse(
address=1234, connected=False, error=13
)
mock_data_received(protocol, generate_plaintext_packet(response))
message = (
"Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
" for BluetoothGATTNotifyResponse, BluetoothGATTErrorResponse: Invalid attribute length"
)
with pytest.raises(BluetoothConnectionDroppedError, match=message):
await notify_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify(
api_client: tuple[