from __future__ import annotations import asyncio import contextlib from functools import partial import itertools import logging import socket from typing import Any from unittest.mock import AsyncMock, MagicMock, call, create_autospec, patch from google.protobuf import message import pytest from aioesphomeapi._frame_helper.plain_text import APIPlaintextFrameHelper from aioesphomeapi.api_pb2 import ( AlarmControlPanelCommandRequest, BinarySensorStateResponse, BluetoothConnectionsFreeResponse, BluetoothDeviceClearCacheResponse, BluetoothDeviceConnectionResponse, BluetoothDevicePairingResponse, BluetoothDeviceRequest, BluetoothDeviceUnpairingResponse, BluetoothGATTCharacteristic, BluetoothGATTDescriptor, BluetoothGATTErrorResponse, BluetoothGATTGetServicesDoneResponse, BluetoothGATTGetServicesResponse, BluetoothGATTNotifyDataResponse, BluetoothGATTNotifyResponse, BluetoothGATTReadResponse, BluetoothGATTService, BluetoothGATTWriteResponse, BluetoothLEAdvertisementResponse, BluetoothLERawAdvertisement, BluetoothLERawAdvertisementsResponse, BluetoothServiceData, ButtonCommandRequest, CameraImageRequest, CameraImageResponse, ClimateCommandRequest, CoverCommandRequest, DateCommandRequest, DateTimeCommandRequest, DeviceInfoResponse, DisconnectResponse, ExecuteServiceArgument, ExecuteServiceRequest, FanCommandRequest, HomeassistantServiceResponse, HomeAssistantStateResponse, LightCommandRequest, ListEntitiesBinarySensorResponse, ListEntitiesDoneResponse, ListEntitiesServicesResponse, LockCommandRequest, MediaPlayerCommandRequest, NumberCommandRequest, SelectCommandRequest, SirenCommandRequest, SubscribeHomeAssistantStateResponse, SubscribeLogsResponse, SubscribeVoiceAssistantRequest, SwitchCommandRequest, TextCommandRequest, TimeCommandRequest, UpdateCommandRequest, ValveCommandRequest, VoiceAssistantAudio, VoiceAssistantAudioSettings, VoiceAssistantEventData, VoiceAssistantEventResponse, VoiceAssistantRequest, VoiceAssistantResponse, VoiceAssistantTimerEventResponse, ) from aioesphomeapi.client import APIClient, BluetoothConnectionDroppedError from aioesphomeapi.connection import APIConnection from aioesphomeapi.core import ( APIConnectionError, BluetoothGATTAPIError, TimeoutAPIError, UnhandledAPIConnectionError, ) from aioesphomeapi.model import ( AlarmControlPanelCommand, APIVersion, BinarySensorInfo, BinarySensorState, BluetoothDeviceRequestType, BluetoothGATTService as BluetoothGATTServiceModel, BluetoothLEAdvertisement, BluetoothProxyFeature, CameraState, ClimateFanMode, ClimateMode, ClimatePreset, ClimateSwingMode, ESPHomeBluetoothGATTServices, FanDirection, FanSpeed, HomeassistantServiceCall, LegacyCoverCommand, LightColorCapability, LockCommand, MediaPlayerCommand, UpdateCommand, UserService, UserServiceArg, UserServiceArgType, VoiceAssistantAudioSettings as VoiceAssistantAudioSettingsModel, VoiceAssistantEventType as VoiceAssistantEventModelType, VoiceAssistantTimerEventType as VoiceAssistantTimerEventModelType, ) from aioesphomeapi.reconnect_logic import ReconnectLogic, ReconnectLogicState from .common import ( Estr, generate_plaintext_packet, get_mock_zeroconf, mock_data_received, ) from .conftest import PatchableAPIConnection @pytest.fixture def auth_client(): client = APIClient( address="fake.address", port=6052, password=None, ) with patch.object(client, "_connection") as conn: conn.is_connected = True yield client def patch_response_complex(client: APIClient, messages): async def patched(req, app, stop, msg_types, timeout): resp = [] for msg in messages: if app(msg): resp.append(msg) if stop(msg): break else: raise ValueError("Response never stopped") return resp client._connection.send_messages_await_response_complex = patched def patch_response_callback(client: APIClient): on_message = None def patched(req, callback, msg_types): nonlocal on_message on_message = callback client._connection.send_message_callback_response = patched async def ret(send): on_message(send) return ret def patch_send(client: APIClient): send = client._connection.send_message = MagicMock() return send def patch_api_version(client: APIClient, version: APIVersion): client._connection.api_version = version @pytest.mark.asyncio async def test_expected_name(auth_client: APIClient) -> None: """Ensure expected name can be set externally.""" assert auth_client.expected_name is None auth_client.expected_name = "awesome" assert auth_client.expected_name == "awesome" @pytest.mark.asyncio async def test_connect_backwards_compat() -> None: """Verify connect is a thin wrapper around start_connection and finish_connection.""" class PatchableApiClient(APIClient): pass cli = PatchableApiClient("host", 1234, None) with ( patch.object(cli, "start_connection") as mock_start_connection, patch.object(cli, "finish_connection") as mock_finish_connection, ): await cli.connect() assert mock_start_connection.mock_calls == [call(None)] assert mock_finish_connection.mock_calls == [call(False)] @pytest.mark.asyncio async def test_finish_connection_wraps_exceptions_as_unhandled_api_error( aiohappyeyeballs_start_connection, ) -> None: """Verify finish_connect re-wraps exceptions as UnhandledAPIError.""" cli = APIClient("127.0.0.1", 1234, None) with patch("aioesphomeapi.client.APIConnection", PatchableAPIConnection): await cli.start_connection() with patch.object( cli._connection, "send_messages_await_response_complex", side_effect=Exception("foo"), ): with pytest.raises(UnhandledAPIConnectionError, match="foo"): await cli.finish_connection(False) @pytest.mark.asyncio async def test_connection_released_if_connecting_is_cancelled() -> None: """Verify connection is unset if connecting is cancelled.""" cli = APIClient("127.0.0.1", 1234, None) asyncio.get_event_loop() async def _start_connection_with_delay(*args, **kwargs): await asyncio.sleep(1) mock_socket = create_autospec(socket.socket, spec_set=True, instance=True) mock_socket.getpeername.return_value = ("4.3.3.3", 323) return mock_socket with patch( "aioesphomeapi.connection.aiohappyeyeballs.start_connection", _start_connection_with_delay, ): start_task = asyncio.create_task(cli.start_connection()) await asyncio.sleep(0) assert cli._connection is not None start_task.cancel() with contextlib.suppress(BaseException): await start_task assert cli._connection is None async def _start_connection_without_delay(*args, **kwargs): mock_socket = create_autospec(socket.socket, spec_set=True, instance=True) mock_socket.getpeername.return_value = ("4.3.3.3", 323) return mock_socket with ( patch("aioesphomeapi.client.APIConnection", PatchableAPIConnection), patch( "aioesphomeapi.connection.aiohappyeyeballs.start_connection", _start_connection_without_delay, ), ): await cli.start_connection() await asyncio.sleep(0) assert cli._connection is not None task = asyncio.create_task(cli.finish_connection(False)) await asyncio.sleep(0) task.cancel() with contextlib.suppress(BaseException): await task assert cli._connection is None @pytest.mark.asyncio async def test_request_while_handshaking() -> None: """Test trying a request while handshaking raises.""" class PatchableApiClient(APIClient): pass cli = PatchableApiClient("host", 1234, None) with ( patch( "aioesphomeapi.connection.aiohappyeyeballs.start_connection", side_effect=partial(asyncio.sleep, 1), ), patch.object(cli, "finish_connection"), ): connect_task = asyncio.create_task(cli.connect()) await asyncio.sleep(0) with pytest.raises( APIConnectionError, match="Authenticated connection not ready yet" ): await cli.device_info() connect_task.cancel() await asyncio.sleep(0) @pytest.mark.asyncio async def test_connect_while_already_connected(auth_client: APIClient) -> None: """Test connecting while already connected raises.""" with pytest.raises(APIConnectionError): await auth_client.start_connection() @pytest.mark.asyncio @pytest.mark.parametrize( "input, output", [ ( [ListEntitiesBinarySensorResponse(), ListEntitiesDoneResponse()], ([BinarySensorInfo()], []), ), ( [ListEntitiesServicesResponse(), ListEntitiesDoneResponse()], ([], [UserService()]), ), ], ) async def test_list_entities( auth_client: APIClient, input: dict[str, Any], output: dict[str, Any] ) -> None: patch_response_complex(auth_client, input) resp = await auth_client.list_entities_services() assert resp == output @pytest.mark.asyncio async def test_subscribe_states(auth_client: APIClient) -> None: send = patch_response_callback(auth_client) on_state = MagicMock() auth_client.subscribe_states(on_state) on_state.assert_not_called() await send(BinarySensorStateResponse()) on_state.assert_called_once_with(BinarySensorState()) @pytest.mark.asyncio async def test_subscribe_states_camera(auth_client: APIClient) -> None: send = patch_response_callback(auth_client) on_state = MagicMock() auth_client.subscribe_states(on_state) await send(CameraImageResponse(key=1, data=b"asdf")) on_state.assert_not_called() await send(CameraImageResponse(key=1, data=b"qwer", done=True)) on_state.assert_called_once_with(CameraState(key=1, data=b"asdfqwer")) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), ( dict(key=1, position=1.0), dict( key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.OPEN ), ), ( dict(key=1, position=0.0), dict( key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.CLOSE ), ), ( dict(key=1, stop=True), dict( key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.STOP ), ), ], ) async def test_cover_command_legacy( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 0)) auth_client.cover_command(**cmd) send.assert_called_once_with(CoverCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, position=0.5), dict(key=1, has_position=True, position=0.5)), (dict(key=1, position=0.0), dict(key=1, has_position=True, position=0.0)), (dict(key=1, stop=True), dict(key=1, stop=True)), ( dict(key=1, position=1.0, tilt=0.8), dict(key=1, has_position=True, position=1.0, has_tilt=True, tilt=0.8), ), ], ) async def test_cover_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 1)) auth_client.cover_command(**cmd) send.assert_called_once_with(CoverCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, state=True), dict(key=1, has_state=True, state=True)), ( dict(key=1, speed=FanSpeed.LOW), dict(key=1, has_speed=True, speed=FanSpeed.LOW), ), ( dict(key=1, speed_level=10), dict(key=1, has_speed_level=True, speed_level=10), ), ( dict(key=1, oscillating=False), dict(key=1, has_oscillating=True, oscillating=False), ), ( dict(key=1, direction=FanDirection.REVERSE), dict(key=1, has_direction=True, direction=FanDirection.REVERSE), ), ( dict(key=1, preset_mode="auto"), dict(key=1, has_preset_mode=True, preset_mode="auto"), ), ], ) async def test_fan_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.fan_command(**cmd) send.assert_called_once_with(FanCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, state=True), dict(key=1, has_state=True, state=True)), (dict(key=1, brightness=0.8), dict(key=1, has_brightness=True, brightness=0.8)), ( dict(key=1, rgb=(0.1, 0.5, 1.0)), dict(key=1, has_rgb=True, red=0.1, green=0.5, blue=1.0), ), (dict(key=1, white=0.0), dict(key=1, has_white=True, white=0.0)), ( dict(key=1, color_temperature=0.0), dict(key=1, has_color_temperature=True, color_temperature=0.0), ), ( dict(key=1, color_brightness=0.0), dict(key=1, has_color_brightness=True, color_brightness=0.0), ), ( dict(key=1, cold_white=1.0, warm_white=2.0), dict( key=1, has_cold_white=True, cold_white=1.0, has_warm_white=True, warm_white=2.0, ), ), ( dict(key=1, transition_length=0.1), dict(key=1, has_transition_length=True, transition_length=100), ), ( dict(key=1, flash_length=0.1), dict(key=1, has_flash_length=True, flash_length=100), ), (dict(key=1, effect="special"), dict(key=1, has_effect=True, effect="special")), ( dict( key=1, color_mode=LightColorCapability.COLOR_TEMPERATURE, color_temperature=153.0, ), dict( key=1, has_color_mode=True, color_mode=LightColorCapability.COLOR_TEMPERATURE, has_color_temperature=True, color_temperature=153.0, ), ), ], ) async def test_light_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.light_command(**cmd) send.assert_called_once_with(LightCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state=False), dict(key=1, state=False)), (dict(key=1, state=True), dict(key=1, state=True)), ], ) async def test_switch_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.switch_command(**cmd) send.assert_called_once_with(SwitchCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, preset=ClimatePreset.HOME), dict(key=1, has_legacy_away=True, legacy_away=False), ), ( dict(key=1, preset=ClimatePreset.AWAY), dict(key=1, has_legacy_away=True, legacy_away=True), ), ], ) async def test_climate_command_legacy( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 4)) auth_client.climate_command(**cmd) send.assert_called_once_with(ClimateCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, mode=ClimateMode.HEAT), dict(key=1, has_mode=True, mode=ClimateMode.HEAT), ), ( dict(key=1, target_temperature=21.0), dict(key=1, has_target_temperature=True, target_temperature=21.0), ), ( dict(key=1, target_temperature_low=21.0), dict(key=1, has_target_temperature_low=True, target_temperature_low=21.0), ), ( dict(key=1, target_temperature_high=21.0), dict(key=1, has_target_temperature_high=True, target_temperature_high=21.0), ), ( dict(key=1, fan_mode=ClimateFanMode.LOW), dict(key=1, has_fan_mode=True, fan_mode=ClimateFanMode.LOW), ), ( dict(key=1, swing_mode=ClimateSwingMode.OFF), dict(key=1, has_swing_mode=True, swing_mode=ClimateSwingMode.OFF), ), ( dict(key=1, custom_fan_mode="asdf"), dict(key=1, has_custom_fan_mode=True, custom_fan_mode="asdf"), ), ( dict(key=1, preset=ClimatePreset.AWAY), dict(key=1, has_preset=True, preset=ClimatePreset.AWAY), ), ( dict(key=1, custom_preset="asdf"), dict(key=1, has_custom_preset=True, custom_preset="asdf"), ), ( dict(key=1, target_humidity=60.0), dict(key=1, has_target_humidity=True, target_humidity=60.0), ), ], ) async def test_climate_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 5)) auth_client.climate_command(**cmd) send.assert_called_once_with(ClimateCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state=0.0), dict(key=1, state=0.0)), (dict(key=1, state=100.0), dict(key=1, state=100.0)), ], ) async def test_number_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.number_command(**cmd) send.assert_called_once_with(NumberCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, year=2024, month=2, day=29), dict(key=1, year=2024, month=2, day=29), ), ( dict(key=1, year=2000, month=6, day=10), dict(key=1, year=2000, month=6, day=10), ), ], ) async def test_date_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.date_command(**cmd) send.assert_called_once_with(DateCommandRequest(**req)) # Test time command @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, hour=12, minute=30, second=30), dict(key=1, hour=12, minute=30, second=30), ), ( dict(key=1, hour=0, minute=0, second=0), dict(key=1, hour=0, minute=0, second=0), ), ], ) async def test_time_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.time_command(**cmd) send.assert_called_once_with(TimeCommandRequest(**req)) # Test date_time command @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, epoch_seconds=1735648230), dict(key=1, epoch_seconds=1735648230), ), ( dict(key=1, epoch_seconds=1735689600), dict(key=1, epoch_seconds=1735689600), ), ], ) async def test_datetime_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.datetime_command(**cmd) send.assert_called_once_with(DateTimeCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, command=LockCommand.LOCK), dict(key=1, command=LockCommand.LOCK)), ( dict(key=1, command=LockCommand.UNLOCK), dict(key=1, command=LockCommand.UNLOCK), ), (dict(key=1, command=LockCommand.OPEN), dict(key=1, command=LockCommand.OPEN)), ( dict(key=1, command=LockCommand.OPEN, code="1234"), dict(key=1, command=LockCommand.OPEN, code="1234"), ), ], ) async def test_lock_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.lock_command(**cmd) send.assert_called_once_with(LockCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, position=1.0), dict(key=1, position=1.0, has_position=True)), (dict(key=1, position=0.0), dict(key=1, position=0.0, has_position=True)), (dict(key=1, stop=True), dict(key=1, stop=True)), ], ) async def test_valve_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.valve_command(**cmd) send.assert_called_once_with(ValveCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, position=0.5), dict(key=1, has_position=True, position=0.5)), (dict(key=1, position=0.0), dict(key=1, has_position=True, position=0.0)), (dict(key=1, stop=True), dict(key=1, stop=True)), ( dict(key=1, position=1.0), dict(key=1, has_position=True, position=1.0), ), ], ) async def test_valve_command_version_1_1( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 1)) auth_client.valve_command(**cmd) send.assert_called_once_with(ValveCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state="One"), dict(key=1, state="One")), (dict(key=1, state="Two"), dict(key=1, state="Two")), ], ) async def test_select_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.select_command(**cmd) send.assert_called_once_with(SelectCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, command=MediaPlayerCommand.MUTE), dict(key=1, has_command=True, command=MediaPlayerCommand.MUTE), ), ( dict(key=1, volume=1.0), dict(key=1, has_volume=True, volume=1.0), ), ( dict(key=1, media_url="http://example.com"), dict(key=1, has_media_url=True, media_url="http://example.com"), ), ( dict(key=1, media_url="http://example.com", announcement=True), dict( key=1, has_media_url=True, media_url="http://example.com", has_announcement=True, announcement=True, ), ), ], ) async def test_media_player_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.media_player_command(**cmd) send.assert_called_once_with(MediaPlayerCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), ], ) async def test_button_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.button_command(**cmd) send.assert_called_once_with(ButtonCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state=True), dict(key=1, state=True, has_state=True)), (dict(key=1, state=False), dict(key=1, state=False, has_state=True)), (dict(key=1, state=None), dict(key=1, state=None, has_state=False)), ( dict(key=1, state=True, tone="any"), dict(key=1, state=True, has_state=True, has_tone=True, tone="any"), ), ( dict(key=1, state=True, tone=None), dict(key=1, state=True, has_state=True, has_tone=False, tone=None), ), ( dict(key=1, state=True, volume=5), dict(key=1, state=True, has_volume=True, volume=5, has_state=True), ), ( dict(key=1, state=True, duration=5), dict(key=1, state=True, has_duration=True, duration=5, has_state=True), ), ], ) async def test_siren_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.siren_command(**cmd) send.assert_called_once_with(SirenCommandRequest(**req)) @pytest.mark.asyncio async def test_execute_service(auth_client: APIClient) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 3)) service = UserService( name="my_service", key=1, args=[ UserServiceArg(name="arg1", type=UserServiceArgType.BOOL), UserServiceArg(name="arg2", type=UserServiceArgType.INT), UserServiceArg(name="arg3", type=UserServiceArgType.FLOAT), UserServiceArg(name="arg4", type=UserServiceArgType.STRING), UserServiceArg(name="arg5", type=UserServiceArgType.BOOL_ARRAY), UserServiceArg(name="arg6", type=UserServiceArgType.INT_ARRAY), UserServiceArg(name="arg7", type=UserServiceArgType.FLOAT_ARRAY), UserServiceArg(name="arg8", type=UserServiceArgType.STRING_ARRAY), ], ) with pytest.raises(KeyError): auth_client.execute_service(service, data={}) auth_client.execute_service( service, data={ "arg1": False, "arg2": 42, "arg3": 99.0, "arg4": "asdf", "arg5": [False, True, False], "arg6": [42, 10, 9], "arg7": [0.0, -100.0], "arg8": [], }, ) send.assert_called_once_with( ExecuteServiceRequest( key=1, args=[ ExecuteServiceArgument(bool_=False), ExecuteServiceArgument(int_=42), ExecuteServiceArgument(float_=99.0), ExecuteServiceArgument(string_="asdf"), ExecuteServiceArgument(bool_array=[False, True, False]), ExecuteServiceArgument(int_array=[42, 10, 9]), ExecuteServiceArgument(float_array=[0.0, -100.0]), ExecuteServiceArgument(string_array=[]), ], ) ) send.reset_mock() patch_api_version(auth_client, APIVersion(1, 2)) service = UserService( name="my_service", key=2, args=[ UserServiceArg(name="arg1", type=UserServiceArgType.BOOL), UserServiceArg(name="arg2", type=UserServiceArgType.INT), ], ) # Test legacy_int auth_client.execute_service( service, data={ "arg1": False, "arg2": 42, }, ) send.assert_called_once_with( ExecuteServiceRequest( key=2, args=[ ExecuteServiceArgument(bool_=False), ExecuteServiceArgument(legacy_int=42), ], ) ) send.reset_mock() # Test arg order auth_client.execute_service( service, data={ "arg2": 42, "arg1": False, }, ) send.assert_called_once_with( ExecuteServiceRequest( key=2, args=[ ExecuteServiceArgument(bool_=False), ExecuteServiceArgument(legacy_int=42), ], ) ) send.reset_mock() @pytest.mark.asyncio async def test_request_single_image(auth_client: APIClient) -> None: send = patch_send(auth_client) auth_client.request_single_image() send.assert_called_once_with(CameraImageRequest(single=True, stream=False)) @pytest.mark.asyncio async def test_request_image_stream(auth_client: APIClient) -> None: send = patch_send(auth_client) auth_client.request_image_stream() send.assert_called_once_with(CameraImageRequest(single=False, stream=True)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, command=AlarmControlPanelCommand.ARM_AWAY), dict(key=1, command=AlarmControlPanelCommand.ARM_AWAY, code=None), ), ( dict(key=1, command=AlarmControlPanelCommand.ARM_HOME), dict(key=1, command=AlarmControlPanelCommand.ARM_HOME, code=None), ), ( dict(key=1, command=AlarmControlPanelCommand.DISARM, code="1234"), dict(key=1, command=AlarmControlPanelCommand.DISARM, code="1234"), ), ], ) async def test_alarm_panel_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.alarm_control_panel_command(**cmd) send.assert_called_once_with(AlarmControlPanelCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state="hello world"), dict(key=1, state="hello world")), (dict(key=1, state="goodbye"), dict(key=1, state="goodbye")), ], ) async def test_text_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.text_command(**cmd) send.assert_called_once_with(TextCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, command=UpdateCommand.INSTALL), dict(key=1, command=UpdateCommand.INSTALL), ), ( dict(key=1, command=UpdateCommand.CHECK), dict(key=1, command=UpdateCommand.CHECK), ), ], ) async def test_update_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) auth_client.update_command(**cmd) send.assert_called_once_with(UpdateCommandRequest(**req)) @pytest.mark.asyncio async def test_noise_psk_handles_subclassed_string(): """Test that the noise_psk gets converted to a string.""" class PatchableAPIClient(APIClient): pass cli = PatchableAPIClient( address=Estr("127.0.0.1"), port=6052, password=None, noise_psk=Estr("QRTIErOb/fcE9Ukd/5qA3RGYMn0Y+p06U58SCtOXvPc="), expected_name=Estr("mydevice"), ) # Make sure its not a subclassed string assert type(cli._params.noise_psk) is str assert type(cli._params.addresses[0]) is str assert type(cli._params.expected_name) is str rl = ReconnectLogic( client=cli, on_disconnect=AsyncMock(), on_connect=AsyncMock(), zeroconf_instance=get_mock_zeroconf(), name="mydevice", ) assert rl._connection_state is ReconnectLogicState.DISCONNECTED with patch.object(cli, "start_connection"), patch.object(cli, "finish_connection"): await rl.start() for _ in range(3): await asyncio.sleep(0) rl.stop_callback() # Wait for cancellation to propagate for _ in range(4): await asyncio.sleep(0) assert rl._connection_state is ReconnectLogicState.DISCONNECTED @pytest.mark.asyncio async def test_no_noise_psk(): """Test not using a noise_psk.""" cli = APIClient( address=Estr("127.0.0.1"), port=6052, password=None, noise_psk=None, expected_name=Estr("mydevice"), ) # Make sure its not a subclassed string assert cli._params.noise_psk is None assert type(cli._params.addresses[0]) is str assert type(cli._params.expected_name) is str @pytest.mark.asyncio async def test_empty_noise_psk_or_expected_name(): """Test an empty noise_psk is treated as None.""" cli = APIClient( address=Estr("127.0.0.1"), port=6052, password=None, noise_psk="", expected_name="", ) assert cli._params.noise_psk is None assert type(cli._params.addresses[0]) is str assert cli._params.expected_name is None @pytest.mark.asyncio async def test_bluetooth_disconnect( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_disconnect.""" client, connection, transport, protocol = api_client disconnect_task = asyncio.create_task(client.bluetooth_device_disconnect(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False ) mock_data_received(protocol, generate_plaintext_packet(response)) await disconnect_task @pytest.mark.asyncio async def test_bluetooth_pair( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_pair.""" client, connection, transport, protocol = api_client pair_task = asyncio.create_task(client.bluetooth_device_pair(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDevicePairingResponse(address=4567) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert not pair_task.done() response: message.Message = BluetoothDevicePairingResponse(address=1234) mock_data_received(protocol, generate_plaintext_packet(response)) await pair_task @pytest.mark.asyncio async def test_bluetooth_pair_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_device_pair.""" client, connection, transport, protocol = api_client pair_task = asyncio.create_task(client.bluetooth_device_pair(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothDevicePairingResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await pair_task @pytest.mark.asyncio async def test_bluetooth_unpair_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_device_unpair.""" client, connection, transport, protocol = api_client pair_task = asyncio.create_task(client.bluetooth_device_unpair(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothDeviceUnpairingResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await pair_task @pytest.mark.asyncio async def test_bluetooth_clear_cache_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_device_clear_cache.""" client, connection, transport, protocol = api_client pair_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothDeviceClearCacheResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await pair_task @pytest.mark.asyncio async def test_bluetooth_unpair( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_unpair.""" client, connection, transport, protocol = api_client unpair_task = asyncio.create_task(client.bluetooth_device_unpair(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceUnpairingResponse(address=1234) mock_data_received(protocol, generate_plaintext_packet(response)) await unpair_task @pytest.mark.asyncio async def test_bluetooth_clear_cache( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_clear_cache.""" client, connection, transport, protocol = api_client clear_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceClearCacheResponse(address=1234) mock_data_received(protocol, generate_plaintext_packet(response)) await clear_task @pytest.mark.asyncio async def test_device_info( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test fetching device info.""" client, connection, transport, protocol = api_client assert client.log_name == "fake @ 10.0.0.512" device_info_task = asyncio.create_task(client.device_info()) await asyncio.sleep(0) response: message.Message = DeviceInfoResponse( name="realname", friendly_name="My Device", has_deep_sleep=True, ) mock_data_received(protocol, generate_plaintext_packet(response)) device_info = await device_info_task assert device_info.name == "realname" assert device_info.friendly_name == "My Device" assert device_info.has_deep_sleep assert client.log_name == "realname @ 10.0.0.512" disconnect_task = asyncio.create_task(client.disconnect()) await asyncio.sleep(0) response: message.Message = DisconnectResponse() mock_data_received(protocol, generate_plaintext_packet(response)) await disconnect_task with pytest.raises(APIConnectionError, match="Not connected"): await client.device_info() @pytest.mark.asyncio async def test_bluetooth_gatt_read( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_read.""" client, connection, transport, protocol = api_client read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234)) await asyncio.sleep(0) other_response: message.Message = BluetoothGATTReadResponse( address=1234, handle=4567, data=b"4567" ) mock_data_received(protocol, generate_plaintext_packet(other_response)) response: message.Message = BluetoothGATTReadResponse( address=1234, handle=1234, data=b"1234" ) mock_data_received(protocol, generate_plaintext_packet(response)) assert await read_task == b"1234" @pytest.mark.asyncio async def test_bluetooth_gatt_read_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_gatt_read.""" client, connection, transport, protocol = api_client read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothGATTReadResponse, BluetoothGATTErrorResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await read_task @pytest.mark.asyncio async def test_bluetooth_gatt_read_error( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_read that errors.""" client, connection, transport, protocol = api_client read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234)) await asyncio.sleep(0) error_response: message.Message = BluetoothGATTErrorResponse( address=1234, handle=1234 ) mock_data_received(protocol, generate_plaintext_packet(error_response)) with pytest.raises(BluetoothGATTAPIError): await read_task @pytest.mark.asyncio async def test_bluetooth_gatt_read_descriptor( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_read_descriptor.""" client, connection, transport, protocol = api_client read_task = asyncio.create_task(client.bluetooth_gatt_read_descriptor(1234, 1234)) await asyncio.sleep(0) other_response: message.Message = BluetoothGATTReadResponse( address=1234, handle=4567, data=b"4567" ) mock_data_received(protocol, generate_plaintext_packet(other_response)) response: message.Message = BluetoothGATTReadResponse( address=1234, handle=1234, data=b"1234" ) mock_data_received(protocol, generate_plaintext_packet(response)) assert await read_task == b"1234" @pytest.mark.asyncio async def test_bluetooth_gatt_write( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_write.""" client, connection, transport, protocol = api_client write_task = asyncio.create_task( client.bluetooth_gatt_write(1234, 1234, b"1234", True) ) await asyncio.sleep(0) other_response: message.Message = BluetoothGATTWriteResponse( address=1234, handle=4567 ) mock_data_received(protocol, generate_plaintext_packet(other_response)) response: message.Message = BluetoothGATTWriteResponse(address=1234, handle=1234) mock_data_received(protocol, generate_plaintext_packet(response)) await write_task @pytest.mark.asyncio async def test_bluetooth_gatt_write_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_gatt_read.""" client, connection, transport, protocol = api_client write_task = asyncio.create_task( client.bluetooth_gatt_write(1234, 1234, b"1234", True) ) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothGATTWriteResponse, BluetoothGATTErrorResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await write_task @pytest.mark.asyncio async def test_bluetooth_gatt_write_without_response( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_write without response.""" client, connection, transport, protocol = api_client transport.reset_mock() write_task = asyncio.create_task( client.bluetooth_gatt_write(1234, 1234, b"1234", False) ) await asyncio.sleep(0) await write_task assert transport.mock_calls[0][1][0] == b'\x00\x0cK\x08\xd2\t\x10\xd2\t"\x041234' with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"): await client.bluetooth_gatt_write(1234, 1234, b"1234", True, timeout=0) @pytest.mark.asyncio async def test_bluetooth_gatt_write_descriptor( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_write_descriptor.""" client, connection, transport, protocol = api_client write_task = asyncio.create_task( client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", True) ) await asyncio.sleep(0) other_response: message.Message = BluetoothGATTWriteResponse( address=1234, handle=4567 ) mock_data_received(protocol, generate_plaintext_packet(other_response)) response: message.Message = BluetoothGATTWriteResponse(address=1234, handle=1234) mock_data_received(protocol, generate_plaintext_packet(response)) await write_task @pytest.mark.asyncio async def test_bluetooth_gatt_write_descriptor_without_response( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_write_descriptor without response.""" client, connection, transport, protocol = api_client transport.reset_mock() write_task = asyncio.create_task( client.bluetooth_gatt_write_descriptor( 1234, 1234, b"1234", wait_for_response=False ) ) await asyncio.sleep(0) await write_task assert transport.mock_calls[0][1][0] == b"\x00\x0cM\x08\xd2\t\x10\xd2\t\x1a\x041234" with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"): await client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", timeout=0) @pytest.mark.asyncio async def test_bluetooth_gatt_get_services_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_gatt_get_services.""" client, connection, transport, protocol = api_client services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothGATTGetServicesResponse, BluetoothGATTGetServicesDoneResponse, " "BluetoothGATTErrorResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await services_task @pytest.mark.asyncio async def test_bluetooth_gatt_get_services( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_get_services success case.""" client, connection, transport, protocol = api_client services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234)) await asyncio.sleep(0) service1: message.Message = BluetoothGATTService( uuid=[1, 1], handle=1, characteristics=[ BluetoothGATTCharacteristic( uuid=[1, 2], handle=2, properties=1, descriptors=[BluetoothGATTDescriptor(uuid=[1, 3], handle=3)], ) ], ) response: message.Message = BluetoothGATTGetServicesResponse( address=1234, services=[service1] ) mock_data_received(protocol, generate_plaintext_packet(response)) done_response: message.Message = BluetoothGATTGetServicesDoneResponse(address=1234) mock_data_received(protocol, generate_plaintext_packet(done_response)) services = await services_task service = BluetoothGATTServiceModel.from_pb(service1) assert services == ESPHomeBluetoothGATTServices( address=1234, services=[service], ) @pytest.mark.asyncio async def test_bluetooth_gatt_get_services_errors( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_get_services with a failure.""" client, connection, transport, protocol = api_client services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234)) await asyncio.sleep(0) service1: message.Message = BluetoothGATTService( uuid=[1, 1], handle=1, characteristics=[] ) response: message.Message = BluetoothGATTGetServicesResponse( address=1234, services=[service1] ) mock_data_received(protocol, generate_plaintext_packet(response)) done_response: message.Message = BluetoothGATTErrorResponse(address=1234) mock_data_received(protocol, generate_plaintext_packet(done_response)) with pytest.raises(BluetoothGATTAPIError): await services_task @pytest.mark.asyncio async def test_bluetooth_gatt_start_notify_connection_drops( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test connection drop during bluetooth_gatt_start_notify.""" client, connection, transport, protocol = api_client notify_task = asyncio.create_task( client.bluetooth_gatt_start_notify(1234, 1, lambda handle, data: None) ) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, error=13 ) mock_data_received(protocol, generate_plaintext_packet(response)) message = ( "Peripheral 00:00:00:00:04:D2 changed connection status while waiting" " for BluetoothGATTNotifyResponse, BluetoothGATTErrorResponse: Invalid attribute length" ) with pytest.raises(BluetoothConnectionDroppedError, match=message): await notify_task @pytest.mark.asyncio async def test_bluetooth_gatt_start_notify( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_start_notify.""" client, connection, transport, protocol = api_client notifies = [] handlers_before = len(list(itertools.chain(*connection._message_handlers.values()))) def on_bluetooth_gatt_notify(handle: int, data: bytearray) -> None: notifies.append((handle, data)) notify_task = asyncio.create_task( client.bluetooth_gatt_start_notify(1234, 1, on_bluetooth_gatt_notify) ) await asyncio.sleep(0) notify_response: message.Message = BluetoothGATTNotifyResponse( address=1234, handle=1 ) data_response: message.Message = BluetoothGATTNotifyDataResponse( address=1234, handle=1, data=b"gotit" ) mock_data_received( protocol, generate_plaintext_packet(notify_response) + generate_plaintext_packet(data_response), ) cancel_cb, abort_cb = await notify_task assert notifies == [(1, b"gotit")] second_data_response: message.Message = BluetoothGATTNotifyDataResponse( address=1234, handle=1, data=b"after finished" ) mock_data_received(protocol, generate_plaintext_packet(second_data_response)) assert notifies == [(1, b"gotit"), (1, b"after finished")] await cancel_cb() assert ( len(list(itertools.chain(*connection._message_handlers.values()))) == handlers_before ) # Ensure abort callback is a no-op after cancel # and doesn't raise abort_cb() await client.disconnect(force=True) # Ensure abort callback is a no-op after disconnect # and does not raise await cancel_cb() @pytest.mark.asyncio async def test_bluetooth_gatt_start_notify_fails( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_gatt_start_notify failure does not leak.""" client, connection, transport, protocol = api_client notifies = [] def on_bluetooth_gatt_notify(handle: int, data: bytearray) -> None: notifies.append((handle, data)) handlers_before = len(list(itertools.chain(*connection._message_handlers.values()))) with ( patch.object( connection, "send_messages_await_response_complex", side_effect=APIConnectionError, ), pytest.raises(APIConnectionError), ): await client.bluetooth_gatt_start_notify(1234, 1, on_bluetooth_gatt_notify) assert ( len(list(itertools.chain(*connection._message_handlers.values()))) == handlers_before ) @pytest.mark.asyncio async def test_subscribe_bluetooth_le_advertisements( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_bluetooth_le_advertisements.""" client, connection, transport, protocol = api_client advs = [] def on_bluetooth_le_advertisements(adv: BluetoothLEAdvertisement) -> None: advs.append(adv) unsub = client.subscribe_bluetooth_le_advertisements(on_bluetooth_le_advertisements) await asyncio.sleep(0) response: message.Message = BluetoothLEAdvertisementResponse( address=1234, name=b"mydevice", rssi=-50, service_uuids=["1234"], service_data=[ BluetoothServiceData( uuid="1234", data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", ) ], manufacturer_data=[ BluetoothServiceData( uuid="1234", data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", ) ], address_type=1, ) mock_data_received(protocol, generate_plaintext_packet(response)) assert advs == [ BluetoothLEAdvertisement( address=1234, name="mydevice", rssi=-50, service_uuids=["000034-0000-1000-8000-00805f9b34fb"], manufacturer_data={ 4660: b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" }, service_data={ "000034-0000-1000-8000-00805f9b34fb": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" }, address_type=1, ) ] advs.clear() response: message.Message = BluetoothLEAdvertisementResponse( address=1234, name=b"mydevice", rssi=-50, service_uuids=[], service_data=[], manufacturer_data=[], address_type=1, ) mock_data_received(protocol, generate_plaintext_packet(response)) assert advs == [ BluetoothLEAdvertisement( address=1234, name="mydevice", rssi=-50, service_uuids=[], manufacturer_data={}, service_data={}, address_type=1, ) ] advs.clear() response: message.Message = BluetoothLEAdvertisementResponse( address=1234, name=b"mydevice", rssi=-50, service_uuids=["1234"], service_data=[ BluetoothServiceData( uuid="1234", legacy_data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", ) ], manufacturer_data=[ BluetoothServiceData( uuid="1234", legacy_data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", ) ], address_type=1, ) mock_data_received(protocol, generate_plaintext_packet(response)) assert advs == [ BluetoothLEAdvertisement( address=1234, name="mydevice", rssi=-50, service_uuids=["000034-0000-1000-8000-00805f9b34fb"], manufacturer_data={ 4660: b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" }, service_data={ "000034-0000-1000-8000-00805f9b34fb": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" }, address_type=1, ) ] unsub() @pytest.mark.asyncio async def test_subscribe_bluetooth_le_raw_advertisements( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_bluetooth_le_raw_advertisements.""" client, connection, transport, protocol = api_client adv_groups = [] def on_raw_bluetooth_le_advertisements( advs: BluetoothLERawAdvertisementsResponse, ) -> None: adv_groups.append(advs.advertisements) unsub = client.subscribe_bluetooth_le_raw_advertisements( on_raw_bluetooth_le_advertisements ) await asyncio.sleep(0) response: message.Message = BluetoothLERawAdvertisementsResponse( advertisements=[ BluetoothLERawAdvertisement( address=1234, rssi=-50, address_type=1, data=b"1234", ) ] ) mock_data_received(protocol, generate_plaintext_packet(response)) assert len(adv_groups) == 1 first_adv = adv_groups[0][0] assert first_adv.address == 1234 assert first_adv.rssi == -50 assert first_adv.address_type == 1 assert first_adv.data == b"1234" unsub() @pytest.mark.asyncio async def test_subscribe_bluetooth_connections_free( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_bluetooth_connections_free.""" client, connection, transport, protocol = api_client connections = [] def on_bluetooth_connections_free(free: int, limit: int) -> None: connections.append((free, limit)) unsub = client.subscribe_bluetooth_connections_free(on_bluetooth_connections_free) await asyncio.sleep(0) response: message.Message = BluetoothConnectionsFreeResponse(free=2, limit=3) mock_data_received(protocol, generate_plaintext_packet(response)) assert connections == [(2, 3)] unsub() @pytest.mark.asyncio async def test_subscribe_home_assistant_states( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_home_assistant_states.""" client, connection, transport, protocol = api_client states = [] requests = [] def on_subscribe_home_assistant_states( entity_id: str, attribute: str | None ) -> None: states.append((entity_id, attribute)) def on_request_home_assistant_state(entity_id: str, attribute: str | None) -> None: requests.append((entity_id, attribute)) client.subscribe_home_assistant_states( on_subscribe_home_assistant_states, on_request_home_assistant_state, ) await asyncio.sleep(0) response: message.Message = SubscribeHomeAssistantStateResponse( entity_id="sensor.red", attribute="any" ) mock_data_received(protocol, generate_plaintext_packet(response)) response: message.Message = SubscribeHomeAssistantStateResponse( entity_id="sensor.green" ) mock_data_received(protocol, generate_plaintext_packet(response)) response: message.Message = SubscribeHomeAssistantStateResponse( entity_id="sensor.blue", attribute="any", once=True ) mock_data_received(protocol, generate_plaintext_packet(response)) response: message.Message = SubscribeHomeAssistantStateResponse( entity_id="sensor.white", once=True ) mock_data_received(protocol, generate_plaintext_packet(response)) assert states == [("sensor.red", "any"), ("sensor.green", "")] assert requests == [("sensor.blue", "any"), ("sensor.white", "")] @pytest.mark.asyncio async def test_subscribe_logs(auth_client: APIClient) -> None: send = patch_response_callback(auth_client) on_logs = MagicMock() auth_client.subscribe_logs(on_logs) log_msg = SubscribeLogsResponse(level=1, message=b"asdf") await send(log_msg) on_logs.assert_called_with(log_msg) @pytest.mark.asyncio async def test_send_home_assistant_state(auth_client: APIClient) -> None: send = patch_send(auth_client) auth_client.send_home_assistant_state("binary_sensor.bla", None, "on") send.assert_called_once_with( HomeAssistantStateResponse( entity_id="binary_sensor.bla", state="on", attribute=None ) ) @pytest.mark.asyncio async def test_subscribe_service_calls(auth_client: APIClient) -> None: send = patch_response_callback(auth_client) on_service_call = MagicMock() auth_client.subscribe_service_calls(on_service_call) service_msg = HomeassistantServiceResponse(service="bob") await send(service_msg) on_service_call.assert_called_with(HomeassistantServiceCall.from_pb(service_msg)) @pytest.mark.asyncio async def test_set_debug( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], caplog: pytest.LogCaptureFixture, ) -> None: """Test set_debug.""" client, connection, transport, protocol = api_client response: message.Message = DeviceInfoResponse( name="realname", friendly_name="My Device", has_deep_sleep=True, ) caplog.set_level(logging.DEBUG) client.set_debug(True) assert client.log_name == "fake @ 10.0.0.512" device_info_task = asyncio.create_task(client.device_info()) await asyncio.sleep(0) mock_data_received(protocol, generate_plaintext_packet(response)) await device_info_task assert "My Device" in caplog.text caplog.clear() client.set_debug(False) device_info_task = asyncio.create_task(client.device_info()) await asyncio.sleep(0) mock_data_received(protocol, generate_plaintext_packet(response)) await device_info_task assert "My Device" not in caplog.text @pytest.mark.asyncio async def test_force_disconnect( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test force disconnect can be called multiple times.""" client, connection, transport, protocol = api_client assert connection.is_connected is True assert connection.on_stop is not None await client.disconnect(force=True) assert client._connection is None assert connection.is_connected is False await client.disconnect(force=False) assert connection.is_connected is False @pytest.mark.asyncio @pytest.mark.parametrize( ("has_cache", "feature_flags", "method"), [ (False, BluetoothProxyFeature(0), BluetoothDeviceRequestType.CONNECT), ( False, BluetoothProxyFeature.REMOTE_CACHING, BluetoothDeviceRequestType.CONNECT_V3_WITHOUT_CACHE, ), ( True, BluetoothProxyFeature.REMOTE_CACHING, BluetoothDeviceRequestType.CONNECT_V3_WITH_CACHE, ), ], ) async def test_bluetooth_device_connect( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], has_cache: bool, feature_flags: BluetoothProxyFeature, method: BluetoothDeviceRequestType, ) -> None: """Test bluetooth_device_connect.""" client, connection, transport, protocol = api_client states = [] def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None: states.append((connected, mtu, error)) connect_task = asyncio.create_task( client.bluetooth_device_connect( 1234, on_bluetooth_connection_state, timeout=1, feature_flags=feature_flags, has_cache=has_cache, disconnect_timeout=1, address_type=1, ) ) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=True, mtu=23, error=0 ) mock_data_received(protocol, generate_plaintext_packet(response)) cancel = await connect_task assert states == [(True, 23, 0)] transport.write.assert_called_once_with( generate_plaintext_packet( BluetoothDeviceRequest( address=1234, request_type=method, has_address_type=True, address_type=1, ), ) ) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, mtu=23, error=7 ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert states == [(True, 23, 0), (False, 23, 7)] cancel() # After cancel, no more messages should called back response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, mtu=23, error=8 ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert states == [(True, 23, 0), (False, 23, 7)] # Make sure cancel is safe to call again cancel() await client.disconnect(force=True) await asyncio.sleep(0) assert not client._connection # Make sure cancel is safe to call after disconnect cancel() @pytest.mark.asyncio async def test_bluetooth_device_connect_and_disconnect_times_out( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_connect and disconnect times out.""" client, connection, transport, protocol = api_client states = [] def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None: states.append((connected, mtu, error)) connect_task = asyncio.create_task( client.bluetooth_device_connect( 1234, on_bluetooth_connection_state, timeout=0, feature_flags=0, has_cache=True, disconnect_timeout=0, address_type=1, ) ) with pytest.raises(TimeoutAPIError): await connect_task assert states == [] @pytest.mark.asyncio async def test_bluetooth_device_connect_times_out_disconnect_ok( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_connect and disconnect times out.""" client, connection, transport, protocol = api_client states = [] def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None: states.append((connected, mtu, error)) connect_task = asyncio.create_task( client.bluetooth_device_connect( 1234, on_bluetooth_connection_state, timeout=0, feature_flags=0, has_cache=True, disconnect_timeout=1, address_type=1, ) ) await asyncio.sleep(0) # The connect request should be written assert len(transport.write.mock_calls) == 1 await asyncio.sleep(0) await asyncio.sleep(0) await asyncio.sleep(0) # Now that we timed out, the disconnect # request should be written assert len(transport.write.mock_calls) == 2 response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False, mtu=23, error=8 ) mock_data_received(protocol, generate_plaintext_packet(response)) with pytest.raises(TimeoutAPIError): await connect_task assert states == [] @pytest.mark.asyncio async def test_bluetooth_device_connect_cancelled( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_connect handles cancellation.""" client, connection, transport, protocol = api_client states = [] handlers_before = len(list(itertools.chain(*connection._message_handlers.values()))) def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None: states.append((connected, mtu, error)) connect_task = asyncio.create_task( client.bluetooth_device_connect( 1234, on_bluetooth_connection_state, timeout=10, feature_flags=0, has_cache=True, disconnect_timeout=10, address_type=1, ) ) await asyncio.sleep(0) # The connect request should be written assert len(transport.write.mock_calls) == 1 connect_task.cancel() with pytest.raises(asyncio.CancelledError): await connect_task assert states == [] handlers_after = len(list(itertools.chain(*connection._message_handlers.values()))) # Make sure we do not leak message handlers assert handlers_after == handlers_before @pytest.mark.asyncio async def test_send_voice_assistant_event(auth_client: APIClient) -> None: send = patch_send(auth_client) auth_client.send_voice_assistant_event( VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR, {"error": "error", "ok": "ok"}, ) send.assert_called_once_with( VoiceAssistantEventResponse( event_type=VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR.value, data=[ VoiceAssistantEventData(name="error", value="error"), VoiceAssistantEventData(name="ok", value="ok"), ], ) ) send.reset_mock() auth_client.send_voice_assistant_event( VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR, None ) send.assert_called_once_with( VoiceAssistantEventResponse( event_type=VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR.value, data=[], ) ) @pytest.mark.asyncio async def test_subscribe_voice_assistant( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_voice_assistant.""" client, connection, transport, protocol = api_client send = patch_send(client) starts = [] stops = [] async def handle_start( conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings, wake_word_phrase: str | None, ) -> int | None: starts.append((conversation_id, flags, audio_settings, wake_word_phrase)) return 42 async def handle_stop() -> None: stops.append(True) unsub = client.subscribe_voice_assistant( handle_start=handle_start, handle_stop=handle_stop ) send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True)) send.reset_mock() audio_settings = VoiceAssistantAudioSettings( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ) response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=True, flags=42, audio_settings=audio_settings, wake_word_phrase="okay nabu", ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) await asyncio.sleep(0) assert starts == [ ( "theone", 42, VoiceAssistantAudioSettingsModel( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ), "okay nabu", ) ] assert stops == [] send.assert_called_once_with(VoiceAssistantResponse(port=42)) send.reset_mock() response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=False, ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert stops == [True] send.reset_mock() unsub() send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False)) send.reset_mock() await client.disconnect(force=True) # Ensure abort callback is a no-op after disconnect # and does not raise unsub() assert len(send.mock_calls) == 0 @pytest.mark.asyncio async def test_subscribe_voice_assistant_failure( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_voice_assistant failure.""" client, connection, transport, protocol = api_client send = patch_send(client) starts = [] stops = [] async def handle_start( conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings, wake_word_phrase: str | None, ) -> int | None: starts.append((conversation_id, flags, audio_settings, wake_word_phrase)) # Return None to indicate failure return None async def handle_stop() -> None: stops.append(True) unsub = client.subscribe_voice_assistant( handle_start=handle_start, handle_stop=handle_stop ) send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True)) send.reset_mock() audio_settings = VoiceAssistantAudioSettings( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ) response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=True, flags=42, audio_settings=audio_settings, ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) await asyncio.sleep(0) assert starts == [ ( "theone", 42, VoiceAssistantAudioSettingsModel( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ), None, ) ] assert stops == [] send.assert_called_once_with(VoiceAssistantResponse(error=True)) send.reset_mock() response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=False, ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert stops == [True] send.reset_mock() unsub() send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False)) send.reset_mock() await client.disconnect(force=True) # Ensure abort callback is a no-op after disconnect # and does not raise unsub() assert len(send.mock_calls) == 0 @pytest.mark.asyncio async def test_subscribe_voice_assistant_cancels_long_running_handle_start( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_voice_assistant cancels long running tasks on unsub.""" client, connection, transport, protocol = api_client send = patch_send(client) starts = [] stops = [] async def handle_start( conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings, wake_word_phrase: str | None, ) -> int | None: starts.append((conversation_id, flags, audio_settings, wake_word_phrase)) await asyncio.sleep(10) # Return None to indicate failure starts.append("never") return None async def handle_stop() -> None: stops.append(True) unsub = client.subscribe_voice_assistant( handle_start=handle_start, handle_stop=handle_stop ) send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True)) send.reset_mock() audio_settings = VoiceAssistantAudioSettings( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ) response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=True, flags=42, audio_settings=audio_settings, ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) await asyncio.sleep(0) unsub() await asyncio.sleep(0) assert not stops assert starts == [ ( "theone", 42, VoiceAssistantAudioSettingsModel( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ), None, ) ] @pytest.mark.asyncio async def test_subscribe_voice_assistant_api_audio( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test subscribe_voice_assistant.""" client, connection, transport, protocol = api_client send = patch_send(client) starts = [] stops = [] data_received = 0 async def handle_start( conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings, wake_word_phrase: str | None, ) -> int | None: starts.append((conversation_id, flags, audio_settings, wake_word_phrase)) return 0 async def handle_stop() -> None: stops.append(True) async def handle_audio(data: bytes) -> None: nonlocal data_received data_received += len(data) unsub = client.subscribe_voice_assistant( handle_start=handle_start, handle_stop=handle_stop, handle_audio=handle_audio ) send.assert_called_once_with( SubscribeVoiceAssistantRequest(subscribe=True, flags=4) ) send.reset_mock() audio_settings = VoiceAssistantAudioSettings( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ) response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=True, flags=42, audio_settings=audio_settings, wake_word_phrase="okay nabu", ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) await asyncio.sleep(0) assert starts == [ ( "theone", 42, VoiceAssistantAudioSettingsModel( noise_suppression_level=42, auto_gain=42, volume_multiplier=42, ), "okay nabu", ) ] assert stops == [] send.assert_called_once_with(VoiceAssistantResponse(port=0)) send.reset_mock() response: message.Message = VoiceAssistantAudio( data=bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert data_received == 10 response: message.Message = VoiceAssistantAudio( end=True, ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert stops == [True] send.reset_mock() client.send_voice_assistant_audio(bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) send.assert_called_once_with( VoiceAssistantAudio(data=bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) ) response: message.Message = VoiceAssistantRequest( conversation_id="theone", start=False, ) mock_data_received(protocol, generate_plaintext_packet(response)) await asyncio.sleep(0) assert stops == [True, True] send.reset_mock() unsub() send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False)) send.reset_mock() await client.disconnect(force=True) # Ensure abort callback is a no-op after disconnect # and does not raise unsub() assert len(send.mock_calls) == 0 @pytest.mark.asyncio async def test_send_voice_assistant_timer_event(auth_client: APIClient) -> None: send = patch_send(auth_client) auth_client.send_voice_assistant_timer_event( VoiceAssistantTimerEventModelType.VOICE_ASSISTANT_TIMER_STARTED, timer_id="test", name="test", total_seconds=99, seconds_left=45, is_active=True, ) send.assert_called_once_with( VoiceAssistantTimerEventResponse( event_type=VoiceAssistantTimerEventModelType.VOICE_ASSISTANT_TIMER_STARTED, timer_id="test", name="test", total_seconds=99, seconds_left=45, is_active=True, ) ) @pytest.mark.asyncio async def test_api_version_after_connection_closed( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test api version is None after connection close.""" client, connection, transport, protocol = api_client assert client.api_version == APIVersion(1, 9) await client.disconnect(force=True) assert client.api_version is None @pytest.mark.asyncio async def test_calls_after_connection_closed( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test calls after connection close should raise APIConnectionError.""" client, connection, transport, protocol = api_client assert client.api_version == APIVersion(1, 9) await client.disconnect(force=True) assert client.api_version is None service = UserService( name="my_service", key=1, args=[], ) with pytest.raises(APIConnectionError): client.execute_service(service, {}) for method in ( client.button_command, client.climate_command, client.cover_command, client.fan_command, client.light_command, client.valve_command, client.media_player_command, client.siren_command, ): with pytest.raises(APIConnectionError): await method(1) with pytest.raises(APIConnectionError): await client.alarm_control_panel_command(1, AlarmControlPanelCommand.ARM_HOME) with pytest.raises(APIConnectionError): await client.date_command(1, 1, 1, 1) with pytest.raises(APIConnectionError): await client.lock_command(1, LockCommand.LOCK) with pytest.raises(APIConnectionError): await client.number_command(1, 1) with pytest.raises(APIConnectionError): await client.select_command(1, "1") with pytest.raises(APIConnectionError): await client.switch_command(1, True) with pytest.raises(APIConnectionError): await client.text_command(1, "1") with pytest.raises(APIConnectionError): await client.update_command(1, True)