from __future__ import annotations import asyncio from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import pytest from google.protobuf import message from aioesphomeapi._frame_helper.plain_text import APIPlaintextFrameHelper from aioesphomeapi.api_pb2 import ( AlarmControlPanelCommandRequest, BinarySensorStateResponse, BluetoothDeviceClearCacheResponse, BluetoothDeviceConnectionResponse, BluetoothDevicePairingResponse, BluetoothDeviceUnpairingResponse, ButtonCommandRequest, CameraImageRequest, CameraImageResponse, ClimateCommandRequest, CoverCommandRequest, DeviceInfoResponse, DisconnectResponse, ExecuteServiceArgument, ExecuteServiceRequest, FanCommandRequest, LightCommandRequest, ListEntitiesBinarySensorResponse, ListEntitiesDoneResponse, ListEntitiesServicesResponse, LockCommandRequest, MediaPlayerCommandRequest, NumberCommandRequest, SelectCommandRequest, SwitchCommandRequest, TextCommandRequest, ) from aioesphomeapi.client import APIClient from aioesphomeapi.connection import APIConnection from aioesphomeapi.core import APIConnectionError from aioesphomeapi.model import ( AlarmControlPanelCommand, APIVersion, BinarySensorInfo, BinarySensorState, CameraState, ClimateFanMode, ClimateMode, ClimatePreset, ClimateSwingMode, FanDirection, FanSpeed, LegacyCoverCommand, LockCommand, MediaPlayerCommand, UserService, UserServiceArg, UserServiceArgType, ) from aioesphomeapi.reconnect_logic import ReconnectLogic, ReconnectLogicState from .common import Estr, generate_plaintext_packet, get_mock_zeroconf @pytest.fixture def auth_client(): client = APIClient( address="fake.address", port=6052, password=None, ) with patch.object(client, "_connection") as conn: conn.is_connected = True yield client def patch_response_complex(client: APIClient, messages): async def patched(req, app, stop, msg_types, timeout): resp = [] for msg in messages: if app(msg): resp.append(msg) if stop(msg): break else: raise ValueError("Response never stopped") return resp client._connection.send_messages_await_response_complex = patched def patch_response_callback(client: APIClient): on_message = None def patched(req, callback, msg_types): nonlocal on_message on_message = callback client._connection.send_message_callback_response = patched async def ret(send): on_message(send) return ret def patch_send(client: APIClient): send = client._connection.send_message = AsyncMock() return send def patch_api_version(client: APIClient, version: APIVersion): client._connection.api_version = version @pytest.mark.asyncio @pytest.mark.parametrize( "input, output", [ ( [ListEntitiesBinarySensorResponse(), ListEntitiesDoneResponse()], ([BinarySensorInfo()], []), ), ( [ListEntitiesServicesResponse(), ListEntitiesDoneResponse()], ([], [UserService()]), ), ], ) async def test_list_entities( auth_client: APIClient, input: dict[str, Any], output: dict[str, Any] ) -> None: patch_response_complex(auth_client, input) resp = await auth_client.list_entities_services() assert resp == output @pytest.mark.asyncio async def test_subscribe_states(auth_client: APIClient) -> None: send = patch_response_callback(auth_client) on_state = MagicMock() await auth_client.subscribe_states(on_state) on_state.assert_not_called() await send(BinarySensorStateResponse()) on_state.assert_called_once_with(BinarySensorState()) @pytest.mark.asyncio async def test_subscribe_states_camera(auth_client: APIClient) -> None: send = patch_response_callback(auth_client) on_state = MagicMock() await auth_client.subscribe_states(on_state) await send(CameraImageResponse(key=1, data=b"asdf")) on_state.assert_not_called() await send(CameraImageResponse(key=1, data=b"qwer", done=True)) on_state.assert_called_once_with(CameraState(key=1, data=b"asdfqwer")) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), ( dict(key=1, position=1.0), dict( key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.OPEN ), ), ( dict(key=1, position=0.0), dict( key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.CLOSE ), ), ( dict(key=1, stop=True), dict( key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.STOP ), ), ], ) async def test_cover_command_legacy( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 0)) await auth_client.cover_command(**cmd) send.assert_called_once_with(CoverCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, position=0.5), dict(key=1, has_position=True, position=0.5)), (dict(key=1, position=0.0), dict(key=1, has_position=True, position=0.0)), (dict(key=1, stop=True), dict(key=1, stop=True)), ( dict(key=1, position=1.0, tilt=0.8), dict(key=1, has_position=True, position=1.0, has_tilt=True, tilt=0.8), ), ], ) async def test_cover_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 1)) await auth_client.cover_command(**cmd) send.assert_called_once_with(CoverCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, state=True), dict(key=1, has_state=True, state=True)), ( dict(key=1, speed=FanSpeed.LOW), dict(key=1, has_speed=True, speed=FanSpeed.LOW), ), ( dict(key=1, speed_level=10), dict(key=1, has_speed_level=True, speed_level=10), ), ( dict(key=1, oscillating=False), dict(key=1, has_oscillating=True, oscillating=False), ), ( dict(key=1, direction=FanDirection.REVERSE), dict(key=1, has_direction=True, direction=FanDirection.REVERSE), ), ], ) async def test_fan_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.fan_command(**cmd) send.assert_called_once_with(FanCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), (dict(key=1, state=True), dict(key=1, has_state=True, state=True)), (dict(key=1, brightness=0.8), dict(key=1, has_brightness=True, brightness=0.8)), ( dict(key=1, rgb=(0.1, 0.5, 1.0)), dict(key=1, has_rgb=True, red=0.1, green=0.5, blue=1.0), ), (dict(key=1, white=0.0), dict(key=1, has_white=True, white=0.0)), ( dict(key=1, color_temperature=0.0), dict(key=1, has_color_temperature=True, color_temperature=0.0), ), ( dict(key=1, transition_length=0.1), dict(key=1, has_transition_length=True, transition_length=100), ), ( dict(key=1, flash_length=0.1), dict(key=1, has_flash_length=True, flash_length=100), ), (dict(key=1, effect="special"), dict(key=1, has_effect=True, effect="special")), ], ) async def test_light_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.light_command(**cmd) send.assert_called_once_with(LightCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state=False), dict(key=1, state=False)), (dict(key=1, state=True), dict(key=1, state=True)), ], ) async def test_switch_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.switch_command(**cmd) send.assert_called_once_with(SwitchCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, preset=ClimatePreset.HOME), dict(key=1, has_legacy_away=True, legacy_away=False), ), ( dict(key=1, preset=ClimatePreset.AWAY), dict(key=1, has_legacy_away=True, legacy_away=True), ), ], ) async def test_climate_command_legacy( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 4)) await auth_client.climate_command(**cmd) send.assert_called_once_with(ClimateCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, mode=ClimateMode.HEAT), dict(key=1, has_mode=True, mode=ClimateMode.HEAT), ), ( dict(key=1, target_temperature=21.0), dict(key=1, has_target_temperature=True, target_temperature=21.0), ), ( dict(key=1, target_temperature_low=21.0), dict(key=1, has_target_temperature_low=True, target_temperature_low=21.0), ), ( dict(key=1, target_temperature_high=21.0), dict(key=1, has_target_temperature_high=True, target_temperature_high=21.0), ), ( dict(key=1, fan_mode=ClimateFanMode.LOW), dict(key=1, has_fan_mode=True, fan_mode=ClimateFanMode.LOW), ), ( dict(key=1, swing_mode=ClimateSwingMode.OFF), dict(key=1, has_swing_mode=True, swing_mode=ClimateSwingMode.OFF), ), ( dict(key=1, custom_fan_mode="asdf"), dict(key=1, has_custom_fan_mode=True, custom_fan_mode="asdf"), ), ( dict(key=1, preset=ClimatePreset.AWAY), dict(key=1, has_preset=True, preset=ClimatePreset.AWAY), ), ( dict(key=1, custom_preset="asdf"), dict(key=1, has_custom_preset=True, custom_preset="asdf"), ), ], ) async def test_climate_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 5)) await auth_client.climate_command(**cmd) send.assert_called_once_with(ClimateCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state=0.0), dict(key=1, state=0.0)), (dict(key=1, state=100.0), dict(key=1, state=100.0)), ], ) async def test_number_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.number_command(**cmd) send.assert_called_once_with(NumberCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, command=LockCommand.LOCK), dict(key=1, command=LockCommand.LOCK)), ( dict(key=1, command=LockCommand.UNLOCK), dict(key=1, command=LockCommand.UNLOCK), ), (dict(key=1, command=LockCommand.OPEN), dict(key=1, command=LockCommand.OPEN)), ], ) async def test_lock_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.lock_command(**cmd) send.assert_called_once_with(LockCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state="One"), dict(key=1, state="One")), (dict(key=1, state="Two"), dict(key=1, state="Two")), ], ) async def test_select_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.select_command(**cmd) send.assert_called_once_with(SelectCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, command=MediaPlayerCommand.MUTE), dict(key=1, has_command=True, command=MediaPlayerCommand.MUTE), ), ( dict(key=1, volume=1.0), dict(key=1, has_volume=True, volume=1.0), ), ( dict(key=1, media_url="http://example.com"), dict(key=1, has_media_url=True, media_url="http://example.com"), ), ], ) async def test_media_player_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.media_player_command(**cmd) send.assert_called_once_with(MediaPlayerCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1), dict(key=1)), ], ) async def test_button_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.button_command(**cmd) send.assert_called_once_with(ButtonCommandRequest(**req)) @pytest.mark.asyncio async def test_execute_service(auth_client: APIClient) -> None: send = patch_send(auth_client) patch_api_version(auth_client, APIVersion(1, 3)) service = UserService( name="my_service", key=1, args=[ UserServiceArg(name="arg1", type=UserServiceArgType.BOOL), UserServiceArg(name="arg2", type=UserServiceArgType.INT), UserServiceArg(name="arg3", type=UserServiceArgType.FLOAT), UserServiceArg(name="arg4", type=UserServiceArgType.STRING), UserServiceArg(name="arg5", type=UserServiceArgType.BOOL_ARRAY), UserServiceArg(name="arg6", type=UserServiceArgType.INT_ARRAY), UserServiceArg(name="arg7", type=UserServiceArgType.FLOAT_ARRAY), UserServiceArg(name="arg8", type=UserServiceArgType.STRING_ARRAY), ], ) with pytest.raises(KeyError): await auth_client.execute_service(service, data={}) await auth_client.execute_service( service, data={ "arg1": False, "arg2": 42, "arg3": 99.0, "arg4": "asdf", "arg5": [False, True, False], "arg6": [42, 10, 9], "arg7": [0.0, -100.0], "arg8": [], }, ) send.assert_called_once_with( ExecuteServiceRequest( key=1, args=[ ExecuteServiceArgument(bool_=False), ExecuteServiceArgument(int_=42), ExecuteServiceArgument(float_=99.0), ExecuteServiceArgument(string_="asdf"), ExecuteServiceArgument(bool_array=[False, True, False]), ExecuteServiceArgument(int_array=[42, 10, 9]), ExecuteServiceArgument(float_array=[0.0, -100.0]), ExecuteServiceArgument(string_array=[]), ], ) ) send.reset_mock() patch_api_version(auth_client, APIVersion(1, 2)) service = UserService( name="my_service", key=2, args=[ UserServiceArg(name="arg1", type=UserServiceArgType.BOOL), UserServiceArg(name="arg2", type=UserServiceArgType.INT), ], ) # Test legacy_int await auth_client.execute_service( service, data={ "arg1": False, "arg2": 42, }, ) send.assert_called_once_with( ExecuteServiceRequest( key=2, args=[ ExecuteServiceArgument(bool_=False), ExecuteServiceArgument(legacy_int=42), ], ) ) send.reset_mock() # Test arg order await auth_client.execute_service( service, data={ "arg2": 42, "arg1": False, }, ) send.assert_called_once_with( ExecuteServiceRequest( key=2, args=[ ExecuteServiceArgument(bool_=False), ExecuteServiceArgument(legacy_int=42), ], ) ) send.reset_mock() @pytest.mark.asyncio async def test_request_single_image(auth_client: APIClient) -> None: send = patch_send(auth_client) await auth_client.request_single_image() send.assert_called_once_with(CameraImageRequest(single=True, stream=False)) @pytest.mark.asyncio async def test_request_image_stream(auth_client: APIClient) -> None: send = patch_send(auth_client) await auth_client.request_image_stream() send.assert_called_once_with(CameraImageRequest(single=False, stream=True)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ ( dict(key=1, command=AlarmControlPanelCommand.ARM_AWAY), dict(key=1, command=AlarmControlPanelCommand.ARM_AWAY, code=None), ), ( dict(key=1, command=AlarmControlPanelCommand.ARM_HOME), dict(key=1, command=AlarmControlPanelCommand.ARM_HOME, code=None), ), ( dict(key=1, command=AlarmControlPanelCommand.DISARM, code="1234"), dict(key=1, command=AlarmControlPanelCommand.DISARM, code="1234"), ), ], ) async def test_alarm_panel_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.alarm_control_panel_command(**cmd) send.assert_called_once_with(AlarmControlPanelCommandRequest(**req)) @pytest.mark.asyncio @pytest.mark.parametrize( "cmd, req", [ (dict(key=1, state="hello world"), dict(key=1, state="hello world")), (dict(key=1, state="goodbye"), dict(key=1, state="goodbye")), ], ) async def test_text_command( auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any] ) -> None: send = patch_send(auth_client) await auth_client.text_command(**cmd) send.assert_called_once_with(TextCommandRequest(**req)) @pytest.mark.asyncio async def test_noise_psk_handles_subclassed_string(): """Test that the noise_psk gets converted to a string.""" class PatchableAPIClient(APIClient): pass cli = PatchableAPIClient( address=Estr("1.2.3.4"), port=6052, password=None, noise_psk=Estr("QRTIErOb/fcE9Ukd/5qA3RGYMn0Y+p06U58SCtOXvPc="), expected_name=Estr("mydevice"), ) # Make sure its not a subclassed string assert type(cli._params.noise_psk) is str assert type(cli._params.address) is str assert type(cli._params.expected_name) is str rl = ReconnectLogic( client=cli, on_disconnect=AsyncMock(), on_connect=AsyncMock(), zeroconf_instance=get_mock_zeroconf(), name="mydevice", ) assert rl._connection_state is ReconnectLogicState.DISCONNECTED with patch.object(cli, "start_connection"), patch.object(cli, "finish_connection"): await rl.start() for _ in range(3): await asyncio.sleep(0) rl.stop_callback() # Wait for cancellation to propagate for _ in range(4): await asyncio.sleep(0) assert rl._connection_state is ReconnectLogicState.DISCONNECTED @pytest.mark.asyncio async def test_no_noise_psk(): """Test not using a noise_psk.""" cli = APIClient( address=Estr("1.2.3.4"), port=6052, password=None, noise_psk=None, expected_name=Estr("mydevice"), ) # Make sure its not a subclassed string assert cli._params.noise_psk is None assert type(cli._params.address) is str assert type(cli._params.expected_name) is str @pytest.mark.asyncio async def test_empty_noise_psk_or_expected_name(): """Test an empty noise_psk is treated as None.""" cli = APIClient( address=Estr("1.2.3.4"), port=6052, password=None, noise_psk="", expected_name="", ) assert cli._params.noise_psk is None assert type(cli._params.address) is str assert cli._params.expected_name is None @pytest.mark.asyncio async def test_bluetooth_disconnect( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_disconnect.""" client, connection, transport, protocol = api_client disconnect_task = asyncio.create_task(client.bluetooth_device_disconnect(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceConnectionResponse( address=1234, connected=False ) protocol.data_received(generate_plaintext_packet(response)) await disconnect_task @pytest.mark.asyncio async def test_bluetooth_pair( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_pair.""" client, connection, transport, protocol = api_client pair_task = asyncio.create_task(client.bluetooth_device_pair(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDevicePairingResponse(address=1234) protocol.data_received(generate_plaintext_packet(response)) await pair_task @pytest.mark.asyncio async def test_bluetooth_unpair( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_unpair.""" client, connection, transport, protocol = api_client unpair_task = asyncio.create_task(client.bluetooth_device_unpair(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceUnpairingResponse(address=1234) protocol.data_received(generate_plaintext_packet(response)) await unpair_task @pytest.mark.asyncio async def test_bluetooth_clear_cache( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test bluetooth_device_clear_cache.""" client, connection, transport, protocol = api_client clear_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234)) await asyncio.sleep(0) response: message.Message = BluetoothDeviceClearCacheResponse(address=1234) protocol.data_received(generate_plaintext_packet(response)) await clear_task @pytest.mark.asyncio async def test_device_info( api_client: tuple[ APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper ], ) -> None: """Test fetching device info.""" client, connection, transport, protocol = api_client assert client.log_name == "mydevice.local" device_info_task = asyncio.create_task(client.device_info()) await asyncio.sleep(0) response: message.Message = DeviceInfoResponse( name="realname", friendly_name="My Device", has_deep_sleep=True, ) protocol.data_received(generate_plaintext_packet(response)) device_info = await device_info_task assert device_info.name == "realname" assert device_info.friendly_name == "My Device" assert device_info.has_deep_sleep assert client.log_name == "realname @ 10.0.0.512" disconnect_task = asyncio.create_task(client.disconnect()) await asyncio.sleep(0) response: message.Message = DisconnectResponse() protocol.data_received(generate_plaintext_packet(response)) await disconnect_task with pytest.raises(APIConnectionError, match="CLOSED"): await client.device_info()