aioesphomeapi/tests/test_client.py

2542 lines
79 KiB
Python
Raw Normal View History

from __future__ import annotations
import asyncio
import contextlib
import itertools
import logging
import socket
from functools import partial
2023-11-21 17:09:11 +01:00
from typing import Any
from unittest.mock import AsyncMock, MagicMock, call, create_autospec, patch
2021-07-12 20:09:17 +02:00
import pytest
from google.protobuf import message
2021-07-12 20:09:17 +02:00
from aioesphomeapi._frame_helper.plain_text import APIPlaintextFrameHelper
2021-07-12 20:09:17 +02:00
from aioesphomeapi.api_pb2 import (
AlarmControlPanelCommandRequest,
2021-07-12 20:09:17 +02:00
BinarySensorStateResponse,
BluetoothConnectionsFreeResponse,
BluetoothDeviceClearCacheResponse,
BluetoothDeviceConnectionResponse,
BluetoothDevicePairingResponse,
BluetoothDeviceRequest,
BluetoothDeviceUnpairingResponse,
BluetoothGATTCharacteristic,
BluetoothGATTDescriptor,
BluetoothGATTErrorResponse,
BluetoothGATTGetServicesDoneResponse,
BluetoothGATTGetServicesResponse,
BluetoothGATTNotifyDataResponse,
BluetoothGATTNotifyResponse,
BluetoothGATTReadResponse,
BluetoothGATTService,
BluetoothGATTWriteResponse,
BluetoothLEAdvertisementResponse,
BluetoothLERawAdvertisement,
BluetoothLERawAdvertisementsResponse,
BluetoothServiceData,
2023-11-21 17:09:11 +01:00
ButtonCommandRequest,
2021-07-12 20:09:17 +02:00
CameraImageRequest,
CameraImageResponse,
ClimateCommandRequest,
CoverCommandRequest,
DateCommandRequest,
2024-04-22 05:58:16 +02:00
DateTimeCommandRequest,
DeviceInfoResponse,
DisconnectResponse,
2021-07-12 20:09:17 +02:00
ExecuteServiceArgument,
ExecuteServiceRequest,
FanCommandRequest,
HomeassistantServiceResponse,
HomeAssistantStateResponse,
2021-07-12 20:09:17 +02:00
LightCommandRequest,
ListEntitiesBinarySensorResponse,
ListEntitiesDoneResponse,
ListEntitiesServicesResponse,
2022-01-11 02:29:19 +01:00
LockCommandRequest,
2022-05-18 03:28:40 +02:00
MediaPlayerCommandRequest,
2021-07-12 20:09:17 +02:00
NumberCommandRequest,
2021-07-26 20:51:12 +02:00
SelectCommandRequest,
2023-11-21 17:18:58 +01:00
SirenCommandRequest,
SubscribeHomeAssistantStateResponse,
SubscribeLogsResponse,
SubscribeVoiceAssistantRequest,
2021-07-12 20:09:17 +02:00
SwitchCommandRequest,
TextCommandRequest,
2024-03-20 02:31:00 +01:00
TimeCommandRequest,
ValveCommandRequest,
VoiceAssistantAudio,
VoiceAssistantAudioSettings,
VoiceAssistantEventData,
VoiceAssistantEventResponse,
VoiceAssistantRequest,
VoiceAssistantResponse,
2021-07-12 20:09:17 +02:00
)
from aioesphomeapi.client import APIClient, BluetoothConnectionDroppedError
from aioesphomeapi.connection import APIConnection
from aioesphomeapi.core import (
APIConnectionError,
BluetoothGATTAPIError,
TimeoutAPIError,
UnhandledAPIConnectionError,
)
2021-07-12 20:09:17 +02:00
from aioesphomeapi.model import (
AlarmControlPanelCommand,
2021-07-12 20:09:17 +02:00
APIVersion,
BinarySensorInfo,
BinarySensorState,
BluetoothDeviceRequestType,
)
from aioesphomeapi.model import BluetoothGATTService as BluetoothGATTServiceModel
from aioesphomeapi.model import (
BluetoothLEAdvertisement,
BluetoothProxyFeature,
2021-07-12 20:09:17 +02:00
CameraState,
ClimateFanMode,
ClimateMode,
ClimatePreset,
ClimateSwingMode,
ESPHomeBluetoothGATTServices,
2021-07-12 20:09:17 +02:00
FanDirection,
FanSpeed,
HomeassistantServiceCall,
2021-07-12 20:09:17 +02:00
LegacyCoverCommand,
2023-11-21 17:26:09 +01:00
LightColorCapability,
2022-01-11 02:29:19 +01:00
LockCommand,
2022-05-18 03:28:40 +02:00
MediaPlayerCommand,
2021-07-12 20:09:17 +02:00
UserService,
UserServiceArg,
UserServiceArgType,
)
from aioesphomeapi.model import (
VoiceAssistantAudioSettings as VoiceAssistantAudioSettingsModel,
)
from aioesphomeapi.model import VoiceAssistantEventType as VoiceAssistantEventModelType
from aioesphomeapi.reconnect_logic import ReconnectLogic, ReconnectLogicState
from .common import (
Estr,
generate_plaintext_packet,
get_mock_zeroconf,
mock_data_received,
)
from .conftest import PatchableAPIConnection
2021-07-12 20:09:17 +02:00
@pytest.fixture
def auth_client():
    """Yield an ``APIClient`` whose ``_connection`` attribute is patched.

    The patched connection reports ``is_connected = True`` while the test
    runs; the patch is reverted when the fixture is torn down.
    """
    api_client = APIClient(address="fake.address", port=6052, password=None)
    connection_patcher = patch.object(api_client, "_connection")
    with connection_patcher as mocked_connection:
        mocked_connection.is_connected = True
        yield api_client
def patch_response_complex(client: APIClient, messages):
    """Stub ``send_messages_await_response_complex`` on the client's connection.

    The installed coroutine replays ``messages`` in order: every message
    accepted by the ``app`` predicate is collected, and iteration ends at the
    first message for which ``stop`` returns true.

    Raises (from the installed coroutine):
        ValueError: if no message satisfies ``stop``.
    """

    async def fake_send(req, app, stop, msg_types, timeout):
        collected = []
        stopped = False
        for item in messages:
            # ``app`` is evaluated before ``stop`` so the terminating message
            # can itself be part of the response.
            if app(item):
                collected.append(item)
            if stop(item):
                stopped = True
                break
        if not stopped:
            raise ValueError("Response never stopped")
        return collected

    client._connection.send_messages_await_response_complex = fake_send
2021-07-12 20:09:17 +02:00
def patch_response_callback(client: APIClient):
    """Stub ``send_message_callback_response`` and return a message injector.

    The stub remembers the callback handed to it. The returned coroutine
    function passes its argument to that remembered callback, which lets a
    test simulate a message arriving from the device.
    """
    registered_callback = None

    def fake_send_with_callback(req, callback, msg_types):
        nonlocal registered_callback
        registered_callback = callback

    client._connection.send_message_callback_response = fake_send_with_callback

    async def deliver(send):
        registered_callback(send)

    return deliver
def patch_send(client: APIClient):
    """Replace the connection's ``send_message`` with a ``MagicMock`` and return it."""
    send_mock = MagicMock()
    client._connection.send_message = send_mock
    return send_mock
def patch_api_version(client: APIClient, version: APIVersion):
    """Set ``api_version`` on the client's connection to ``version``."""
    connection = client._connection
    connection.api_version = version
@pytest.mark.asyncio
async def test_expected_name(auth_client: APIClient) -> None:
    """Check that ``expected_name`` starts as ``None`` and can be assigned."""
    assert auth_client.expected_name is None

    new_name = "awesome"
    auth_client.expected_name = new_name
    assert auth_client.expected_name == "awesome"
@pytest.mark.asyncio
async def test_connect_backwards_compat() -> None:
    """Check that ``connect`` delegates to ``start_connection`` and ``finish_connection``.

    Both methods are patched on a subclass instance; ``connect()`` with no
    arguments must call them exactly once, with ``None`` and ``False``.
    """

    class PatchableApiClient(APIClient):
        pass

    cli = PatchableApiClient("host", 1234, None)

    start_patcher = patch.object(cli, "start_connection")
    finish_patcher = patch.object(cli, "finish_connection")
    with start_patcher as start_mock, finish_patcher as finish_mock:
        await cli.connect()

    assert start_mock.mock_calls == [call(None)]
    assert finish_mock.mock_calls == [call(False)]
@pytest.mark.asyncio
async def test_finish_connection_wraps_exceptions_as_unhandled_api_error(
    aiohappyeyeballs_start_connection,
) -> None:
    """Check ``finish_connection`` re-raises errors as ``UnhandledAPIConnectionError``.

    ``send_messages_await_response_complex`` is made to raise a plain
    ``Exception("foo")``; the error surfaced by ``finish_connection`` must be
    an ``UnhandledAPIConnectionError`` whose message matches ``foo``.
    """
    cli = APIClient("127.0.0.1", 1234, None)
    with patch("aioesphomeapi.client.APIConnection", PatchableAPIConnection):
        await cli.start_connection()

    failing_send = patch.object(
        cli._connection,
        "send_messages_await_response_complex",
        side_effect=Exception("foo"),
    )
    expect_wrapped = pytest.raises(UnhandledAPIConnectionError, match="foo")
    with failing_send, expect_wrapped:
        await cli.finish_connection(False)
@pytest.mark.asyncio
async def test_connection_released_if_connecting_is_cancelled() -> None:
    """Verify connection is unset if connecting is cancelled.

    Two phases are exercised on the same client:

    1. ``start_connection`` is cancelled while the patched socket connect is
       still sleeping.
    2. ``start_connection`` completes and ``finish_connection`` is cancelled
       right after it was scheduled.

    After each cancellation ``cli._connection`` must be ``None``.
    """
    cli = APIClient("127.0.0.1", 1234, None)

    def _make_mock_socket():
        # Autospec'd socket that reports a fixed peer address.
        mock_socket = create_autospec(socket.socket, spec_set=True, instance=True)
        mock_socket.getpeername.return_value = ("4.3.3.3", 323)
        return mock_socket

    async def _start_connection_with_delay(*args, **kwargs):
        # The delay keeps start_connection pending long enough to cancel it.
        await asyncio.sleep(1)
        return _make_mock_socket()

    with patch(
        "aioesphomeapi.connection.aiohappyeyeballs.start_connection",
        _start_connection_with_delay,
    ):
        start_task = asyncio.create_task(cli.start_connection())
        await asyncio.sleep(0)
        assert cli._connection is not None

    start_task.cancel()
    # Whatever the cancelled task raises is irrelevant here; only the
    # resulting client state is asserted.
    with contextlib.suppress(BaseException):
        await start_task
    assert cli._connection is None

    async def _start_connection_without_delay(*args, **kwargs):
        return _make_mock_socket()

    with (
        patch("aioesphomeapi.client.APIConnection", PatchableAPIConnection),
        patch(
            "aioesphomeapi.connection.aiohappyeyeballs.start_connection",
            _start_connection_without_delay,
        ),
    ):
        await cli.start_connection()
        await asyncio.sleep(0)

    assert cli._connection is not None

    task = asyncio.create_task(cli.finish_connection(False))
    await asyncio.sleep(0)
    task.cancel()
    with contextlib.suppress(BaseException):
        await task
    assert cli._connection is None
@pytest.mark.asyncio
async def test_request_while_handshaking() -> None:
    """Check that issuing a request while ``connect()`` is pending raises.

    The socket connect is patched to sleep, so the connect task is still in
    progress when ``device_info()`` is called; that call must fail with an
    ``APIConnectionError`` matching "Authenticated connection not ready yet".
    """

    class PatchableApiClient(APIClient):
        pass

    cli = PatchableApiClient("host", 1234, None)

    slow_connect = patch(
        "aioesphomeapi.connection.aiohappyeyeballs.start_connection",
        side_effect=partial(asyncio.sleep, 1),
    )
    stub_finish = patch.object(cli, "finish_connection")
    with slow_connect, stub_finish:
        connect_task = asyncio.create_task(cli.connect())
        await asyncio.sleep(0)
        not_ready = pytest.raises(
            APIConnectionError, match="Authenticated connection not ready yet"
        )
        with not_ready:
            await cli.device_info()

    # Cancel the pending connect and give the loop one turn to process it.
    connect_task.cancel()
    await asyncio.sleep(0)
@pytest.mark.asyncio
async def test_connect_while_already_connected(auth_client: APIClient) -> None:
    """Check that ``start_connection`` on an already-connected client raises."""
    expect_error = pytest.raises(APIConnectionError)
    with expect_error:
        await auth_client.start_connection()
2021-07-12 20:09:17 +02:00
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "input, output",
    [
        (
            [ListEntitiesBinarySensorResponse(), ListEntitiesDoneResponse()],
            ([BinarySensorInfo()], []),
        ),
        (
            [ListEntitiesServicesResponse(), ListEntitiesDoneResponse()],
            ([], [UserService()]),
        ),
    ],
)
async def test_list_entities(
    auth_client: APIClient,
    input: list[message.Message],
    output: tuple[list[Any], list[Any]],
) -> None:
    """Check ``list_entities_services`` converts replayed responses to models.

    ``input`` is the list of protobuf messages replayed by the patched
    connection; ``output`` is the expected ``(entities, services)`` pair.
    """
    # The parameter names are left as-is (although ``input`` shadows a
    # builtin) because pytest derives the test ids from them.
    patch_response_complex(auth_client, input)
    resp = await auth_client.list_entities_services()
    assert resp == output
@pytest.mark.asyncio
async def test_subscribe_states(auth_client: APIClient) -> None:
    """Check that a state response is delivered to the subscriber as a model.

    The callback must not fire on subscription itself, and must receive a
    ``BinarySensorState`` once a ``BinarySensorStateResponse`` is injected.
    """
    inject = patch_response_callback(auth_client)
    state_listener = MagicMock()

    auth_client.subscribe_states(state_listener)
    state_listener.assert_not_called()

    await inject(BinarySensorStateResponse())
    expected_state = BinarySensorState()
    state_listener.assert_called_once_with(expected_state)
@pytest.mark.asyncio
async def test_subscribe_states_camera(auth_client: APIClient) -> None:
    """Check that camera image chunks are joined before the callback fires.

    The first chunk (not ``done``) must not trigger the callback; after the
    chunk flagged ``done=True`` the callback receives one ``CameraState``
    holding the concatenated data.
    """
    inject = patch_response_callback(auth_client)
    state_listener = MagicMock()
    auth_client.subscribe_states(state_listener)

    first_chunk = CameraImageResponse(key=1, data=b"asdf")
    await inject(first_chunk)
    state_listener.assert_not_called()

    final_chunk = CameraImageResponse(key=1, data=b"qwer", done=True)
    await inject(final_chunk)
    state_listener.assert_called_once_with(CameraState(key=1, data=b"asdfqwer"))
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1}, {"key": 1}),
        (
            {"key": 1, "position": 1.0},
            {
                "key": 1,
                "has_legacy_command": True,
                "legacy_command": LegacyCoverCommand.OPEN,
            },
        ),
        (
            {"key": 1, "position": 0.0},
            {
                "key": 1,
                "has_legacy_command": True,
                "legacy_command": LegacyCoverCommand.CLOSE,
            },
        ),
        (
            {"key": 1, "stop": True},
            {
                "key": 1,
                "has_legacy_command": True,
                "legacy_command": LegacyCoverCommand.STOP,
            },
        ),
    ],
)
async def test_cover_command_legacy(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check ``cover_command`` output when the API version is patched to 1.0.

    ``cmd`` holds the keyword arguments for ``cover_command``; ``req`` holds
    the fields of the ``CoverCommandRequest`` expected to be sent.
    """
    sent = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 0))

    auth_client.cover_command(**cmd)

    expected_request = CoverCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1}, {"key": 1}),
        (
            {"key": 1, "position": 0.5},
            {"key": 1, "has_position": True, "position": 0.5},
        ),
        (
            {"key": 1, "position": 0.0},
            {"key": 1, "has_position": True, "position": 0.0},
        ),
        ({"key": 1, "stop": True}, {"key": 1, "stop": True}),
        (
            {"key": 1, "position": 1.0, "tilt": 0.8},
            {
                "key": 1,
                "has_position": True,
                "position": 1.0,
                "has_tilt": True,
                "tilt": 0.8,
            },
        ),
    ],
)
async def test_cover_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check ``cover_command`` output when the API version is patched to 1.1.

    ``cmd`` holds the keyword arguments for ``cover_command``; ``req`` holds
    the fields of the ``CoverCommandRequest`` expected to be sent.
    """
    sent = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 1))

    auth_client.cover_command(**cmd)

    expected_request = CoverCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1}, {"key": 1}),
        ({"key": 1, "state": True}, {"key": 1, "has_state": True, "state": True}),
        (
            {"key": 1, "speed": FanSpeed.LOW},
            {"key": 1, "has_speed": True, "speed": FanSpeed.LOW},
        ),
        (
            {"key": 1, "speed_level": 10},
            {"key": 1, "has_speed_level": True, "speed_level": 10},
        ),
        (
            {"key": 1, "oscillating": False},
            {"key": 1, "has_oscillating": True, "oscillating": False},
        ),
        (
            {"key": 1, "direction": FanDirection.REVERSE},
            {"key": 1, "has_direction": True, "direction": FanDirection.REVERSE},
        ),
        (
            {"key": 1, "preset_mode": "auto"},
            {"key": 1, "has_preset_mode": True, "preset_mode": "auto"},
        ),
    ],
)
async def test_fan_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``fan_command`` sends the expected ``FanCommandRequest``.

    ``cmd`` holds the keyword arguments for ``fan_command``; ``req`` holds the
    fields of the request expected to be sent.
    """
    sent = patch_send(auth_client)

    auth_client.fan_command(**cmd)

    expected_request = FanCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1}, {"key": 1}),
        ({"key": 1, "state": True}, {"key": 1, "has_state": True, "state": True}),
        (
            {"key": 1, "brightness": 0.8},
            {"key": 1, "has_brightness": True, "brightness": 0.8},
        ),
        (
            {"key": 1, "rgb": (0.1, 0.5, 1.0)},
            {"key": 1, "has_rgb": True, "red": 0.1, "green": 0.5, "blue": 1.0},
        ),
        ({"key": 1, "white": 0.0}, {"key": 1, "has_white": True, "white": 0.0}),
        (
            {"key": 1, "color_temperature": 0.0},
            {"key": 1, "has_color_temperature": True, "color_temperature": 0.0},
        ),
        (
            {"key": 1, "color_brightness": 0.0},
            {"key": 1, "has_color_brightness": True, "color_brightness": 0.0},
        ),
        (
            {"key": 1, "cold_white": 1.0, "warm_white": 2.0},
            {
                "key": 1,
                "has_cold_white": True,
                "cold_white": 1.0,
                "has_warm_white": True,
                "warm_white": 2.0,
            },
        ),
        (
            # Input length 0.1 is expected as 100 in the request.
            {"key": 1, "transition_length": 0.1},
            {"key": 1, "has_transition_length": True, "transition_length": 100},
        ),
        (
            {"key": 1, "flash_length": 0.1},
            {"key": 1, "has_flash_length": True, "flash_length": 100},
        ),
        (
            {"key": 1, "effect": "special"},
            {"key": 1, "has_effect": True, "effect": "special"},
        ),
        (
            {
                "key": 1,
                "color_mode": LightColorCapability.COLOR_TEMPERATURE,
                "color_temperature": 153.0,
            },
            {
                "key": 1,
                "has_color_mode": True,
                "color_mode": LightColorCapability.COLOR_TEMPERATURE,
                "has_color_temperature": True,
                "color_temperature": 153.0,
            },
        ),
    ],
)
async def test_light_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``light_command`` sends the expected ``LightCommandRequest``.

    ``cmd`` holds the keyword arguments for ``light_command``; ``req`` holds
    the fields of the request expected to be sent.
    """
    sent = patch_send(auth_client)

    auth_client.light_command(**cmd)

    expected_request = LightCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1, "state": False}, {"key": 1, "state": False}),
        ({"key": 1, "state": True}, {"key": 1, "state": True}),
    ],
)
async def test_switch_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``switch_command`` sends the expected ``SwitchCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.switch_command(**cmd)

    expected_request = SwitchCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "preset": ClimatePreset.HOME},
            {"key": 1, "has_legacy_away": True, "legacy_away": False},
        ),
        (
            {"key": 1, "preset": ClimatePreset.AWAY},
            {"key": 1, "has_legacy_away": True, "legacy_away": True},
        ),
    ],
)
async def test_climate_command_legacy(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check ``climate_command`` output when the API version is patched to 1.4.

    With that version a ``preset`` argument is expected to be sent through
    the ``legacy_away`` fields of ``ClimateCommandRequest``.
    """
    sent = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 4))

    auth_client.climate_command(**cmd)

    expected_request = ClimateCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "mode": ClimateMode.HEAT},
            {"key": 1, "has_mode": True, "mode": ClimateMode.HEAT},
        ),
        (
            {"key": 1, "target_temperature": 21.0},
            {"key": 1, "has_target_temperature": True, "target_temperature": 21.0},
        ),
        (
            {"key": 1, "target_temperature_low": 21.0},
            {
                "key": 1,
                "has_target_temperature_low": True,
                "target_temperature_low": 21.0,
            },
        ),
        (
            {"key": 1, "target_temperature_high": 21.0},
            {
                "key": 1,
                "has_target_temperature_high": True,
                "target_temperature_high": 21.0,
            },
        ),
        (
            {"key": 1, "fan_mode": ClimateFanMode.LOW},
            {"key": 1, "has_fan_mode": True, "fan_mode": ClimateFanMode.LOW},
        ),
        (
            {"key": 1, "swing_mode": ClimateSwingMode.OFF},
            {"key": 1, "has_swing_mode": True, "swing_mode": ClimateSwingMode.OFF},
        ),
        (
            {"key": 1, "custom_fan_mode": "asdf"},
            {"key": 1, "has_custom_fan_mode": True, "custom_fan_mode": "asdf"},
        ),
        (
            {"key": 1, "preset": ClimatePreset.AWAY},
            {"key": 1, "has_preset": True, "preset": ClimatePreset.AWAY},
        ),
        (
            {"key": 1, "custom_preset": "asdf"},
            {"key": 1, "has_custom_preset": True, "custom_preset": "asdf"},
        ),
        (
            {"key": 1, "target_humidity": 60.0},
            {"key": 1, "has_target_humidity": True, "target_humidity": 60.0},
        ),
    ],
)
async def test_climate_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check ``climate_command`` output when the API version is patched to 1.5.

    ``cmd`` holds the keyword arguments for ``climate_command``; ``req`` holds
    the fields of the ``ClimateCommandRequest`` expected to be sent.
    """
    sent = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 5))

    auth_client.climate_command(**cmd)

    expected_request = ClimateCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1, "state": 0.0}, {"key": 1, "state": 0.0}),
        ({"key": 1, "state": 100.0}, {"key": 1, "state": 100.0}),
    ],
)
async def test_number_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``number_command`` sends the expected ``NumberCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.number_command(**cmd)

    expected_request = NumberCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "year": 2024, "month": 2, "day": 29},
            {"key": 1, "year": 2024, "month": 2, "day": 29},
        ),
        (
            {"key": 1, "year": 2000, "month": 6, "day": 10},
            {"key": 1, "year": 2000, "month": 6, "day": 10},
        ),
    ],
)
async def test_date_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``date_command`` sends the expected ``DateCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.date_command(**cmd)

    expected_request = DateCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2024-03-20 02:31:00 +01:00
# Test time command
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "hour": 12, "minute": 30, "second": 30},
            {"key": 1, "hour": 12, "minute": 30, "second": 30},
        ),
        (
            {"key": 1, "hour": 0, "minute": 0, "second": 0},
            {"key": 1, "hour": 0, "minute": 0, "second": 0},
        ),
    ],
)
async def test_time_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``time_command`` sends the expected ``TimeCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.time_command(**cmd)

    expected_request = TimeCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2024-04-22 05:58:16 +02:00
# Test date_time command
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "epoch_seconds": 1735648230},
            {"key": 1, "epoch_seconds": 1735648230},
        ),
        (
            {"key": 1, "epoch_seconds": 1735689600},
            {"key": 1, "epoch_seconds": 1735689600},
        ),
    ],
)
async def test_datetime_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``datetime_command`` sends the expected ``DateTimeCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.datetime_command(**cmd)

    expected_request = DateTimeCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2022-01-11 02:29:19 +01:00
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "command": LockCommand.LOCK},
            {"key": 1, "command": LockCommand.LOCK},
        ),
        (
            {"key": 1, "command": LockCommand.UNLOCK},
            {"key": 1, "command": LockCommand.UNLOCK},
        ),
        (
            {"key": 1, "command": LockCommand.OPEN},
            {"key": 1, "command": LockCommand.OPEN},
        ),
        (
            {"key": 1, "command": LockCommand.OPEN, "code": "1234"},
            {"key": 1, "command": LockCommand.OPEN, "code": "1234"},
        ),
    ],
)
async def test_lock_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``lock_command`` sends the expected ``LockCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.lock_command(**cmd)

    expected_request = LockCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (dict(key=1, position=0.5), dict(key=1, has_position=True, position=0.5)),
        (dict(key=1, position=0.0), dict(key=1, has_position=True, position=0.0)),
        (dict(key=1, stop=True), dict(key=1, stop=True)),
        (
            dict(key=1, position=1.0),
            dict(key=1, has_position=True, position=1.0),
        ),
    ],
)
async def test_valve_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``valve_command`` sends the expected ``ValveCommandRequest``.

    ``cmd`` holds the keyword arguments for ``valve_command``; ``req`` holds
    the fields of the request expected to be sent. The API version is patched
    to 1.1 before the command is issued.
    """
    # This is the only definition of ``test_valve_command`` in the module: a
    # second function with the same name would silently replace this one, and
    # every parametrize row must supply both ``cmd`` and ``req``.
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 1))
    auth_client.valve_command(**cmd)
    send.assert_called_once_with(ValveCommandRequest(**req))
2021-07-26 20:51:12 +02:00
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1, "state": "One"}, {"key": 1, "state": "One"}),
        ({"key": 1, "state": "Two"}, {"key": 1, "state": "Two"}),
    ],
)
async def test_select_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``select_command`` sends the expected ``SelectCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.select_command(**cmd)

    expected_request = SelectCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2022-05-18 03:28:40 +02:00
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "command": MediaPlayerCommand.MUTE},
            {"key": 1, "has_command": True, "command": MediaPlayerCommand.MUTE},
        ),
        (
            {"key": 1, "volume": 1.0},
            {"key": 1, "has_volume": True, "volume": 1.0},
        ),
        (
            {"key": 1, "media_url": "http://example.com"},
            {"key": 1, "has_media_url": True, "media_url": "http://example.com"},
        ),
    ],
)
async def test_media_player_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check ``media_player_command`` sends the expected ``MediaPlayerCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.media_player_command(**cmd)

    expected_request = MediaPlayerCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2021-07-12 20:09:17 +02:00
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1}, {"key": 1}),
    ],
)
async def test_button_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``button_command`` sends the expected ``ButtonCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.button_command(**cmd)

    expected_request = ButtonCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2023-11-21 17:18:58 +01:00
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1, "state": True}, {"key": 1, "state": True, "has_state": True}),
        ({"key": 1, "state": False}, {"key": 1, "state": False, "has_state": True}),
        ({"key": 1, "state": None}, {"key": 1, "state": None, "has_state": False}),
        (
            {"key": 1, "state": True, "tone": "any"},
            {
                "key": 1,
                "state": True,
                "has_state": True,
                "has_tone": True,
                "tone": "any",
            },
        ),
        (
            {"key": 1, "state": True, "tone": None},
            {
                "key": 1,
                "state": True,
                "has_state": True,
                "has_tone": False,
                "tone": None,
            },
        ),
        (
            {"key": 1, "state": True, "volume": 5},
            {
                "key": 1,
                "state": True,
                "has_volume": True,
                "volume": 5,
                "has_state": True,
            },
        ),
        (
            {"key": 1, "state": True, "duration": 5},
            {
                "key": 1,
                "state": True,
                "has_duration": True,
                "duration": 5,
                "has_state": True,
            },
        ),
    ],
)
async def test_siren_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``siren_command`` sends the expected ``SirenCommandRequest``.

    Cases with a ``None`` argument expect the matching ``has_*`` flag to be
    ``False`` in the request.
    """
    sent = patch_send(auth_client)

    auth_client.siren_command(**cmd)

    expected_request = SirenCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
2023-11-21 17:09:11 +01:00
@pytest.mark.asyncio
async def test_execute_service(auth_client: APIClient) -> None:
    """Check how ``execute_service`` encodes service arguments.

    Covered here:
    * missing data for a declared argument raises ``KeyError``;
    * with the API version patched to 1.3, every argument type maps to its
      ``ExecuteServiceArgument`` field;
    * with the API version patched to 1.2, ints are sent as ``legacy_int``;
    * arguments are sent in the service's declared order, not the order of
      the ``data`` mapping.
    """
    sent = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 3))

    all_arg_types = [
        UserServiceArgType.BOOL,
        UserServiceArgType.INT,
        UserServiceArgType.FLOAT,
        UserServiceArgType.STRING,
        UserServiceArgType.BOOL_ARRAY,
        UserServiceArgType.INT_ARRAY,
        UserServiceArgType.FLOAT_ARRAY,
        UserServiceArgType.STRING_ARRAY,
    ]
    service = UserService(
        name="my_service",
        key=1,
        args=[
            UserServiceArg(name=f"arg{position}", type=arg_type)
            for position, arg_type in enumerate(all_arg_types, start=1)
        ],
    )

    with pytest.raises(KeyError):
        auth_client.execute_service(service, data={})

    full_data = {
        "arg1": False,
        "arg2": 42,
        "arg3": 99.0,
        "arg4": "asdf",
        "arg5": [False, True, False],
        "arg6": [42, 10, 9],
        "arg7": [0.0, -100.0],
        "arg8": [],
    }
    auth_client.execute_service(service, data=full_data)
    expected_full_request = ExecuteServiceRequest(
        key=1,
        args=[
            ExecuteServiceArgument(bool_=False),
            ExecuteServiceArgument(int_=42),
            ExecuteServiceArgument(float_=99.0),
            ExecuteServiceArgument(string_="asdf"),
            ExecuteServiceArgument(bool_array=[False, True, False]),
            ExecuteServiceArgument(int_array=[42, 10, 9]),
            ExecuteServiceArgument(float_array=[0.0, -100.0]),
            ExecuteServiceArgument(string_array=[]),
        ],
    )
    sent.assert_called_once_with(expected_full_request)
    sent.reset_mock()

    patch_api_version(auth_client, APIVersion(1, 2))
    service = UserService(
        name="my_service",
        key=2,
        args=[
            UserServiceArg(name="arg1", type=UserServiceArgType.BOOL),
            UserServiceArg(name="arg2", type=UserServiceArgType.INT),
        ],
    )
    expected_legacy_request = ExecuteServiceRequest(
        key=2,
        args=[
            ExecuteServiceArgument(bool_=False),
            ExecuteServiceArgument(legacy_int=42),
        ],
    )

    # Test legacy_int
    auth_client.execute_service(service, data={"arg1": False, "arg2": 42})
    sent.assert_called_once_with(expected_legacy_request)
    sent.reset_mock()

    # Test arg order
    auth_client.execute_service(service, data={"arg2": 42, "arg1": False})
    sent.assert_called_once_with(expected_legacy_request)
    sent.reset_mock()
@pytest.mark.asyncio
async def test_request_single_image(auth_client: APIClient) -> None:
    """Check ``request_single_image`` sends a single-shot ``CameraImageRequest``."""
    sent = patch_send(auth_client)

    auth_client.request_single_image()

    expected_request = CameraImageRequest(single=True, stream=False)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
async def test_request_image_stream(auth_client: APIClient) -> None:
    """Check ``request_image_stream`` sends a streaming ``CameraImageRequest``."""
    sent = patch_send(auth_client)

    auth_client.request_image_stream()

    expected_request = CameraImageRequest(single=False, stream=True)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            {"key": 1, "command": AlarmControlPanelCommand.ARM_AWAY},
            {"key": 1, "command": AlarmControlPanelCommand.ARM_AWAY, "code": None},
        ),
        (
            {"key": 1, "command": AlarmControlPanelCommand.ARM_HOME},
            {"key": 1, "command": AlarmControlPanelCommand.ARM_HOME, "code": None},
        ),
        (
            {"key": 1, "command": AlarmControlPanelCommand.DISARM, "code": "1234"},
            {"key": 1, "command": AlarmControlPanelCommand.DISARM, "code": "1234"},
        ),
    ],
)
async def test_alarm_panel_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check ``alarm_control_panel_command`` sends the expected request.

    ``cmd`` holds the keyword arguments for the client call; ``req`` holds
    the fields of the ``AlarmControlPanelCommandRequest`` expected to be sent.
    """
    sent = patch_send(auth_client)

    auth_client.alarm_control_panel_command(**cmd)

    expected_request = AlarmControlPanelCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        ({"key": 1, "state": "hello world"}, {"key": 1, "state": "hello world"}),
        ({"key": 1, "state": "goodbye"}, {"key": 1, "state": "goodbye"}),
    ],
)
async def test_text_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Check that ``text_command`` sends the expected ``TextCommandRequest``."""
    sent = patch_send(auth_client)

    auth_client.text_command(**cmd)

    expected_request = TextCommandRequest(**req)
    sent.assert_called_once_with(expected_request)
@pytest.mark.asyncio
async def test_noise_psk_handles_subclassed_string():
    """Check that ``str`` subclasses passed to the client are stored as plain ``str``.

    The address, noise PSK and expected name are supplied as ``Estr``
    instances. Afterwards a ``ReconnectLogic`` is started and stopped around
    the client, and must end up in the ``DISCONNECTED`` state.
    """

    class PatchableAPIClient(APIClient):
        pass

    cli = PatchableAPIClient(
        address=Estr("127.0.0.1"),
        port=6052,
        password=None,
        noise_psk=Estr("QRTIErOb/fcE9Ukd/5qA3RGYMn0Y+p06U58SCtOXvPc="),
        expected_name=Estr("mydevice"),
    )

    # Exact-type checks on purpose: a str subclass would pass isinstance().
    assert type(cli._params.noise_psk) is str
    assert type(cli._params.addresses[0]) is str
    assert type(cli._params.expected_name) is str

    reconnect_logic = ReconnectLogic(
        client=cli,
        on_disconnect=AsyncMock(),
        on_connect=AsyncMock(),
        zeroconf_instance=get_mock_zeroconf(),
        name="mydevice",
    )
    assert reconnect_logic._connection_state is ReconnectLogicState.DISCONNECTED

    stub_start = patch.object(cli, "start_connection")
    stub_finish = patch.object(cli, "finish_connection")
    with stub_start, stub_finish:
        await reconnect_logic.start()
        for _ in range(3):
            await asyncio.sleep(0)

    reconnect_logic.stop_callback()
    # Give the event loop a few turns so the cancellation can propagate.
    for _ in range(4):
        await asyncio.sleep(0)
    assert reconnect_logic._connection_state is ReconnectLogicState.DISCONNECTED
@pytest.mark.asyncio
async def test_no_noise_psk():
    """Check client parameters when ``noise_psk`` is ``None``.

    The PSK must stay ``None`` while the ``Estr`` address and expected name
    are stored as plain ``str``.
    """
    cli = APIClient(
        address=Estr("127.0.0.1"),
        port=6052,
        password=None,
        noise_psk=None,
        expected_name=Estr("mydevice"),
    )

    params = cli._params
    assert params.noise_psk is None
    # Exact-type checks on purpose: a str subclass would pass isinstance().
    assert type(params.addresses[0]) is str
    assert type(params.expected_name) is str
@pytest.mark.asyncio
async def test_empty_noise_psk_or_expected_name():
    """Check that an empty ``noise_psk`` or ``expected_name`` is stored as ``None``."""
    cli = APIClient(
        address=Estr("127.0.0.1"),
        port=6052,
        password=None,
        noise_psk="",
        expected_name="",
    )

    params = cli._params
    assert params.noise_psk is None
    assert type(params.addresses[0]) is str
    assert params.expected_name is None
@pytest.mark.asyncio
async def test_bluetooth_disconnect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check ``bluetooth_device_disconnect`` completes on a disconnect response.

    The call is started as a task, then a ``BluetoothDeviceConnectionResponse``
    with ``connected=False`` for the same address is fed to the protocol.
    """
    client, _connection, _transport, protocol = api_client

    pending_disconnect = asyncio.create_task(
        client.bluetooth_device_disconnect(1234)
    )
    # Let the task run up to the point where it waits for the response.
    await asyncio.sleep(0)

    disconnected: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False
    )
    packet = generate_plaintext_packet(disconnected)
    mock_data_received(protocol, packet)

    await pending_disconnect
@pytest.mark.asyncio
async def test_bluetooth_pair(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check ``bluetooth_device_pair`` only completes for the matching address.

    A pairing response for address 4567 must leave the task pending; the
    response for 1234 (the address being paired) lets it finish.
    """
    client, _connection, _transport, protocol = api_client

    pending_pair = asyncio.create_task(client.bluetooth_device_pair(1234))
    await asyncio.sleep(0)

    other_device: message.Message = BluetoothDevicePairingResponse(address=4567)
    mock_data_received(protocol, generate_plaintext_packet(other_device))
    await asyncio.sleep(0)
    assert not pending_pair.done()

    paired_device: message.Message = BluetoothDevicePairingResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(paired_device))
    await pending_pair
@pytest.mark.asyncio
async def test_bluetooth_pair_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_device_pair.

    While the pair call is pending, a ``BluetoothDeviceConnectionResponse``
    with ``connected=False`` and ``error=13`` is fed to the protocol; the
    call must then raise ``BluetoothConnectionDroppedError`` with the
    expected message.
    """
    client, _connection, _transport, protocol = api_client
    pair_task = asyncio.create_task(client.bluetooth_device_pair(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    # Named ``expected_error`` rather than ``message`` so the local does not
    # shadow the protobuf ``message`` module used in the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothDevicePairingResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await pair_task
@pytest.mark.asyncio
async def test_bluetooth_unpair_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_device_unpair.

    While the unpair call is pending, a ``BluetoothDeviceConnectionResponse``
    with ``connected=False`` and ``error=13`` is fed to the protocol; the
    call must then raise ``BluetoothConnectionDroppedError`` with the
    expected message.
    """
    client, _connection, _transport, protocol = api_client
    unpair_task = asyncio.create_task(client.bluetooth_device_unpair(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    # Named ``expected_error`` rather than ``message`` so the local does not
    # shadow the protobuf ``message`` module used in the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothDeviceUnpairingResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await unpair_task
@pytest.mark.asyncio
async def test_bluetooth_clear_cache_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_device_clear_cache.

    A ``connected=False`` connection response for the same address is fed
    while the clear-cache call is pending; awaiting the call must then raise
    BluetoothConnectionDroppedError with the expected text.
    """
    client, _connection, _transport, protocol = api_client
    clear_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234))
    # Yield to the event loop so the task can start before data is injected.
    await asyncio.sleep(0)
    drop_response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(drop_response))
    # Deliberately not called ``message``: that name is the imported protobuf
    # module referenced by the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothDeviceClearCacheResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await clear_task
@pytest.mark.asyncio
async def test_bluetooth_unpair(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that bluetooth_device_unpair finishes once its response arrives."""
    client, _connection, _transport, protocol = api_client
    pending_unpair = asyncio.create_task(client.bluetooth_device_unpair(1234))
    # Give the task a turn on the event loop before feeding the reply.
    await asyncio.sleep(0)
    unpair_packet = generate_plaintext_packet(
        BluetoothDeviceUnpairingResponse(address=1234)
    )
    mock_data_received(protocol, unpair_packet)
    await pending_unpair
@pytest.mark.asyncio
async def test_bluetooth_clear_cache(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that bluetooth_device_clear_cache finishes once its response arrives."""
    client, _connection, _transport, protocol = api_client
    pending_clear = asyncio.create_task(client.bluetooth_device_clear_cache(1234))
    # Give the task a turn on the event loop before feeding the reply.
    await asyncio.sleep(0)
    clear_packet = generate_plaintext_packet(
        BluetoothDeviceClearCacheResponse(address=1234)
    )
    mock_data_received(protocol, clear_packet)
    await pending_clear
@pytest.mark.asyncio
async def test_device_info(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Fetch device info, then verify it is refused after disconnecting.

    Also checks that ``client.log_name`` changes from the "fake" name to the
    name reported in the DeviceInfoResponse.
    """
    client, _connection, _transport, protocol = api_client
    assert client.log_name == "fake @ 10.0.0.512"

    info_task = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    info_packet = generate_plaintext_packet(
        DeviceInfoResponse(
            name="realname",
            friendly_name="My Device",
            has_deep_sleep=True,
        )
    )
    mock_data_received(protocol, info_packet)
    info = await info_task

    assert info.name == "realname"
    assert info.friendly_name == "My Device"
    assert info.has_deep_sleep
    assert client.log_name == "realname @ 10.0.0.512"

    # A non-forced disconnect is awaited here only after a DisconnectResponse
    # has been fed to the protocol.
    closing_task = asyncio.create_task(client.disconnect())
    await asyncio.sleep(0)
    mock_data_received(protocol, generate_plaintext_packet(DisconnectResponse()))
    await closing_task

    with pytest.raises(APIConnectionError, match="Not connected"):
        await client.device_info()
@pytest.mark.asyncio
async def test_bluetooth_gatt_read(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_read returns the data for the requested handle.

    A read response for another handle (4567) is delivered first; the result
    must still be the payload of the handle-1234 response.
    """
    client, _connection, _transport, protocol = api_client
    pending_read = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)

    unrelated_packet = generate_plaintext_packet(
        BluetoothGATTReadResponse(address=1234, handle=4567, data=b"4567")
    )
    mock_data_received(protocol, unrelated_packet)

    matching_packet = generate_plaintext_packet(
        BluetoothGATTReadResponse(address=1234, handle=1234, data=b"1234")
    )
    mock_data_received(protocol, matching_packet)

    assert await pending_read == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_read.

    A ``connected=False`` connection response for the same address is fed
    while the read is pending; awaiting the read must then raise
    BluetoothConnectionDroppedError with the expected text.
    """
    client, _connection, _transport, protocol = api_client
    read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    # Yield to the event loop so the task can start before data is injected.
    await asyncio.sleep(0)
    drop_response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(drop_response))
    # Deliberately not called ``message``: that name is the imported protobuf
    # module referenced by the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTReadResponse, BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await read_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_error(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_read raises when a GATT error response arrives."""
    client, _connection, _transport, protocol = api_client
    pending_read = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)
    # Error response addressed to the same device and handle as the read.
    error_packet = generate_plaintext_packet(
        BluetoothGATTErrorResponse(address=1234, handle=1234)
    )
    mock_data_received(protocol, error_packet)
    with pytest.raises(BluetoothGATTAPIError):
        await pending_read
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_descriptor(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_read_descriptor returns data for the requested handle.

    A read response for another handle (4567) is delivered first; the result
    must still be the payload of the handle-1234 response.
    """
    client, _connection, _transport, protocol = api_client
    pending_read = asyncio.create_task(
        client.bluetooth_gatt_read_descriptor(1234, 1234)
    )
    await asyncio.sleep(0)

    unrelated_packet = generate_plaintext_packet(
        BluetoothGATTReadResponse(address=1234, handle=4567, data=b"4567")
    )
    mock_data_received(protocol, unrelated_packet)

    matching_packet = generate_plaintext_packet(
        BluetoothGATTReadResponse(address=1234, handle=1234, data=b"1234")
    )
    mock_data_received(protocol, matching_packet)

    assert await pending_read == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_write(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_write (with response) completes on the matching reply.

    A write response for another handle (4567) is delivered before the one
    for handle 1234.
    """
    client, _connection, _transport, protocol = api_client
    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)

    unrelated_packet = generate_plaintext_packet(
        BluetoothGATTWriteResponse(address=1234, handle=4567)
    )
    mock_data_received(protocol, unrelated_packet)

    matching_packet = generate_plaintext_packet(
        BluetoothGATTWriteResponse(address=1234, handle=1234)
    )
    mock_data_received(protocol, matching_packet)

    await pending_write
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_write.

    A ``connected=False`` connection response for the same address is fed
    while the write is pending; awaiting the write must then raise
    BluetoothConnectionDroppedError with the expected text.
    """
    client, _connection, _transport, protocol = api_client
    write_task = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", True)
    )
    # Yield to the event loop so the task can start before data is injected.
    await asyncio.sleep(0)
    drop_response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(drop_response))
    # Deliberately not called ``message``: that name is the imported protobuf
    # module referenced by the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTWriteResponse, BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await write_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_without_response(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_write when no response is requested.

    The write task is awaited without any reply being fed, and the first
    recorded transport call must carry the expected bytes. A second write
    that does wait for a response with ``timeout=0`` must time out.
    """
    client, _connection, transport, _protocol = api_client
    # Drop calls recorded before this point so index 0 is this test's write.
    transport.reset_mock()
    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", False)
    )
    await asyncio.sleep(0)
    await pending_write

    first_call = transport.mock_calls[0]
    assert first_call[1][0] == b'\x00\x0cK\x08\xd2\t\x10\xd2\t"\x041234'

    with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
        await client.bluetooth_gatt_write(1234, 1234, b"1234", True, timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_write_descriptor completes on the matching reply.

    A write response for another handle (4567) is delivered before the one
    for handle 1234.
    """
    client, _connection, _transport, protocol = api_client
    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)

    unrelated_packet = generate_plaintext_packet(
        BluetoothGATTWriteResponse(address=1234, handle=4567)
    )
    mock_data_received(protocol, unrelated_packet)

    matching_packet = generate_plaintext_packet(
        BluetoothGATTWriteResponse(address=1234, handle=1234)
    )
    mock_data_received(protocol, matching_packet)

    await pending_write
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor_without_response(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_write_descriptor when no response is requested.

    The write task is awaited without any reply being fed, and the first
    recorded transport call must carry the expected bytes. A second call
    with the default response handling and ``timeout=0`` must time out.
    """
    client, _connection, transport, _protocol = api_client
    # Drop calls recorded before this point so index 0 is this test's write.
    transport.reset_mock()
    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write_descriptor(
            1234, 1234, b"1234", wait_for_response=False
        )
    )
    await asyncio.sleep(0)
    await pending_write

    first_call = transport.mock_calls[0]
    assert first_call[1][0] == b"\x00\x0cM\x08\xd2\t\x10\xd2\t\x1a\x041234"

    with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
        await client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_get_services.

    A ``connected=False`` connection response for the same address is fed
    while the services request is pending; awaiting it must then raise
    BluetoothConnectionDroppedError with the expected text.
    """
    client, _connection, _transport, protocol = api_client
    services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    # Yield to the event loop so the task can start before data is injected.
    await asyncio.sleep(0)
    drop_response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(drop_response))
    # Deliberately not called ``message``: that name is the imported protobuf
    # module referenced by the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTGetServicesResponse, BluetoothGATTGetServicesDoneResponse, "
        "BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await services_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check the success path of bluetooth_gatt_get_services.

    One services response followed by a done response is fed; the result
    must equal an ESPHomeBluetoothGATTServices holding the model built from
    the same protobuf service.
    """
    client, _connection, _transport, protocol = api_client
    pending_services = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)

    # Service -> one characteristic -> one descriptor.
    descriptor_pb = BluetoothGATTDescriptor(uuid=[1, 3], handle=3)
    characteristic_pb = BluetoothGATTCharacteristic(
        uuid=[1, 2],
        handle=2,
        properties=1,
        descriptors=[descriptor_pb],
    )
    service_pb = BluetoothGATTService(
        uuid=[1, 1],
        handle=1,
        characteristics=[characteristic_pb],
    )

    services_packet = generate_plaintext_packet(
        BluetoothGATTGetServicesResponse(address=1234, services=[service_pb])
    )
    mock_data_received(protocol, services_packet)
    done_packet = generate_plaintext_packet(
        BluetoothGATTGetServicesDoneResponse(address=1234)
    )
    mock_data_received(protocol, done_packet)

    result = await pending_services
    expected_service = BluetoothGATTServiceModel.from_pb(service_pb)
    assert result == ESPHomeBluetoothGATTServices(
        address=1234,
        services=[expected_service],
    )
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services_errors(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_gatt_get_services raises when an error response follows.

    A services response is fed first, then a GATT error response instead of
    the done response.
    """
    client, _connection, _transport, protocol = api_client
    pending_services = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)

    service_pb = BluetoothGATTService(uuid=[1, 1], handle=1, characteristics=[])
    services_packet = generate_plaintext_packet(
        BluetoothGATTGetServicesResponse(address=1234, services=[service_pb])
    )
    mock_data_received(protocol, services_packet)

    error_packet = generate_plaintext_packet(BluetoothGATTErrorResponse(address=1234))
    mock_data_received(protocol, error_packet)

    with pytest.raises(BluetoothGATTAPIError):
        await pending_services
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_start_notify.

    A ``connected=False`` connection response for the same address is fed
    while the start-notify call is pending; awaiting it must then raise
    BluetoothConnectionDroppedError with the expected text.
    """
    client, _connection, _transport, protocol = api_client
    notify_task = asyncio.create_task(
        client.bluetooth_gatt_start_notify(1234, 1, lambda handle, data: None)
    )
    # Yield to the event loop so the task can start before data is injected.
    await asyncio.sleep(0)
    drop_response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(drop_response))
    # Deliberately not called ``message``: that name is the imported protobuf
    # module referenced by the annotation above.
    expected_error = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTNotifyResponse, BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=expected_error):
        await notify_task
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_start_notify.

    Covers notification data delivered together with the notify response,
    data delivered after the call returned, handler cleanup on cancel, and
    that the returned callbacks can be invoked again after cancel and
    after a forced disconnect without raising.
    """
    client, connection, _transport, protocol = api_client
    notifies = []

    def count_handlers() -> int:
        """Return the total number of registered message handlers."""
        return len(list(itertools.chain(*connection._message_handlers.values())))

    handlers_before = count_handlers()

    def on_bluetooth_gatt_notify(handle: int, data: bytearray) -> None:
        notifies.append((handle, data))

    notify_task = asyncio.create_task(
        client.bluetooth_gatt_start_notify(1234, 1, on_bluetooth_gatt_notify)
    )
    await asyncio.sleep(0)
    notify_response: message.Message = BluetoothGATTNotifyResponse(
        address=1234, handle=1
    )
    data_response: message.Message = BluetoothGATTNotifyDataResponse(
        address=1234, handle=1, data=b"gotit"
    )
    # Both packets are delivered in a single data_received chunk.
    mock_data_received(
        protocol,
        generate_plaintext_packet(notify_response)
        + generate_plaintext_packet(data_response),
    )

    cancel_cb, abort_cb = await notify_task
    assert notifies == [(1, b"gotit")]

    second_data_response: message.Message = BluetoothGATTNotifyDataResponse(
        address=1234, handle=1, data=b"after finished"
    )
    mock_data_received(protocol, generate_plaintext_packet(second_data_response))
    assert notifies == [(1, b"gotit"), (1, b"after finished")]

    await cancel_cb()

    # Cancelling must remove every handler the subscription added.
    assert count_handlers() == handlers_before
    # Ensure abort callback is a no-op after cancel
    # and doesn't raise
    abort_cb()
    await client.disconnect(force=True)
    # Ensure cancel callback is a no-op after disconnect
    # and does not raise
    await cancel_cb()
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify_fails(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check a failing bluetooth_gatt_start_notify leaves no handlers behind.

    ``send_messages_await_response_complex`` is patched to raise
    APIConnectionError; the handler count afterwards must equal the count
    taken before the call.
    """
    client, connection, _transport, _protocol = api_client
    received = []

    def on_bluetooth_gatt_notify(handle: int, data: bytearray) -> None:
        received.append((handle, data))

    def count_handlers() -> int:
        """Return the total number of registered message handlers."""
        return len(list(itertools.chain(*connection._message_handlers.values())))

    handlers_before = count_handlers()

    failing_send = patch.object(
        connection,
        "send_messages_await_response_complex",
        side_effect=APIConnectionError,
    )
    with failing_send:
        with pytest.raises(APIConnectionError):
            await client.bluetooth_gatt_start_notify(
                1234, 1, on_bluetooth_gatt_notify
            )

    assert count_handlers() == handlers_before
@pytest.mark.asyncio
async def test_subscribe_bluetooth_le_advertisements(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check subscribe_bluetooth_le_advertisements converts incoming messages.

    Three advertisements are fed: one with service and manufacturer data in
    the ``data`` field, one with no data at all, and one with the payloads in
    the ``legacy_data`` field. Each must reach the callback as the expected
    BluetoothLEAdvertisement.
    """
    client, _connection, _transport, protocol = api_client
    zero_payload = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    received = []

    def on_bluetooth_le_advertisements(adv: BluetoothLEAdvertisement) -> None:
        received.append(adv)

    def expected_with_payloads() -> BluetoothLEAdvertisement:
        """Build the advertisement expected for the two payload-carrying cases."""
        # NOTE(review): the expected UUID has a 6-character first group rather
        # than the usual 8; presumably the client's converter drops the first
        # two characters of a short UUID - confirm against the implementation.
        return BluetoothLEAdvertisement(
            address=1234,
            name="mydevice",
            rssi=-50,
            service_uuids=["000034-0000-1000-8000-00805f9b34fb"],
            manufacturer_data={4660: zero_payload},
            service_data={"000034-0000-1000-8000-00805f9b34fb": zero_payload},
            address_type=1,
        )

    unsub = client.subscribe_bluetooth_le_advertisements(on_bluetooth_le_advertisements)
    await asyncio.sleep(0)

    # Case 1: payloads carried in the ``data`` field.
    data_adv = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=["1234"],
        service_data=[BluetoothServiceData(uuid="1234", data=zero_payload)],
        manufacturer_data=[BluetoothServiceData(uuid="1234", data=zero_payload)],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(data_adv))
    assert received == [expected_with_payloads()]
    received.clear()

    # Case 2: no UUIDs, service data or manufacturer data.
    empty_adv = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=[],
        service_data=[],
        manufacturer_data=[],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(empty_adv))
    assert received == [
        BluetoothLEAdvertisement(
            address=1234,
            name="mydevice",
            rssi=-50,
            service_uuids=[],
            manufacturer_data={},
            service_data={},
            address_type=1,
        )
    ]
    received.clear()

    # Case 3: payloads carried in the ``legacy_data`` field.
    legacy_adv = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=["1234"],
        service_data=[BluetoothServiceData(uuid="1234", legacy_data=zero_payload)],
        manufacturer_data=[
            BluetoothServiceData(uuid="1234", legacy_data=zero_payload)
        ],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(legacy_adv))
    assert received == [expected_with_payloads()]

    unsub()
@pytest.mark.asyncio
async def test_subscribe_bluetooth_le_raw_advertisements(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check raw advertisement batches are handed to the subscriber callback."""
    client, _connection, _transport, protocol = api_client
    batches = []

    def on_raw_bluetooth_le_advertisements(
        advs: BluetoothLERawAdvertisementsResponse,
    ) -> None:
        batches.append(advs.advertisements)

    unsub = client.subscribe_bluetooth_le_raw_advertisements(
        on_raw_bluetooth_le_advertisements
    )
    await asyncio.sleep(0)

    raw_adv = BluetoothLERawAdvertisement(
        address=1234,
        rssi=-50,
        address_type=1,
        data=b"1234",
    )
    batch_packet = generate_plaintext_packet(
        BluetoothLERawAdvertisementsResponse(advertisements=[raw_adv])
    )
    mock_data_received(protocol, batch_packet)

    assert len(batches) == 1
    delivered = batches[0][0]
    assert delivered.address == 1234
    assert delivered.rssi == -50
    assert delivered.address_type == 1
    assert delivered.data == b"1234"

    unsub()
@pytest.mark.asyncio
async def test_subscribe_bluetooth_connections_free(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check the connections-free callback receives the (free, limit) pair."""
    client, _connection, _transport, protocol = api_client
    seen = []

    def on_bluetooth_connections_free(free: int, limit: int) -> None:
        seen.append((free, limit))

    unsub = client.subscribe_bluetooth_connections_free(on_bluetooth_connections_free)
    await asyncio.sleep(0)

    free_packet = generate_plaintext_packet(
        BluetoothConnectionsFreeResponse(free=2, limit=3)
    )
    mock_data_received(protocol, free_packet)
    assert seen == [(2, 3)]

    unsub()
@pytest.mark.asyncio
async def test_subscribe_home_assistant_states(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check state subscriptions reach the callback as (entity_id, attribute)."""
    client, _connection, _transport, protocol = api_client
    requested = []

    def on_subscribe_home_assistant_states(
        entity_id: str, attribute: str | None
    ) -> None:
        requested.append((entity_id, attribute))

    client.subscribe_home_assistant_states(on_subscribe_home_assistant_states)
    await asyncio.sleep(0)

    subscribe_packet = generate_plaintext_packet(
        SubscribeHomeAssistantStateResponse(entity_id="sensor.red", attribute="any")
    )
    mock_data_received(protocol, subscribe_packet)
    assert requested == [("sensor.red", "any")]
@pytest.mark.asyncio
async def test_subscribe_logs(auth_client: APIClient) -> None:
    """Check a log message sent by the device is passed to the log callback."""
    deliver = patch_response_callback(auth_client)
    log_callback = MagicMock()
    auth_client.subscribe_logs(log_callback)

    log_entry = SubscribeLogsResponse(level=1, message=b"asdf")
    await deliver(log_entry)

    log_callback.assert_called_with(log_entry)
@pytest.mark.asyncio
async def test_send_home_assistant_state(auth_client: APIClient) -> None:
    """Check send_home_assistant_state sends a HomeAssistantStateResponse."""
    sent = patch_send(auth_client)
    auth_client.send_home_assistant_state("binary_sensor.bla", None, "on")

    expected_message = HomeAssistantStateResponse(
        entity_id="binary_sensor.bla", state="on", attribute=None
    )
    sent.assert_called_once_with(expected_message)
@pytest.mark.asyncio
async def test_subscribe_service_calls(auth_client: APIClient) -> None:
    """Check a service call from the device reaches the callback as a model."""
    deliver = patch_response_callback(auth_client)
    service_callback = MagicMock()
    auth_client.subscribe_service_calls(service_callback)

    service_pb = HomeassistantServiceResponse(service="bob")
    await deliver(service_pb)

    expected_call = HomeassistantServiceCall.from_pb(service_pb)
    service_callback.assert_called_with(expected_call)
@pytest.mark.asyncio
async def test_set_debug(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Check set_debug toggles whether message contents show up in the logs.

    The same DeviceInfoResponse is fed twice: with debug on, "My Device"
    must appear in the captured log text; with debug off, it must not.
    """
    client, _connection, _transport, protocol = api_client
    info_response: message.Message = DeviceInfoResponse(
        name="realname",
        friendly_name="My Device",
        has_deep_sleep=True,
    )

    caplog.set_level(logging.DEBUG)

    # Debug enabled.
    client.set_debug(True)
    assert client.log_name == "fake @ 10.0.0.512"
    info_task = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    mock_data_received(protocol, generate_plaintext_packet(info_response))
    await info_task
    assert "My Device" in caplog.text

    # Debug disabled; start from an empty capture.
    caplog.clear()
    client.set_debug(False)
    info_task = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    mock_data_received(protocol, generate_plaintext_packet(info_response))
    await info_task
    assert "My Device" not in caplog.text
@pytest.mark.asyncio
async def test_force_disconnect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check disconnect can be called again after a forced disconnect."""
    client, connection, _transport, _protocol = api_client
    assert connection.is_connected is True
    assert connection.on_stop is not None

    # First call: forced disconnect drops the client's connection reference.
    await client.disconnect(force=True)
    assert client._connection is None
    assert connection.is_connected is False

    # Second call (non-forced) must not raise and leaves the state unchanged.
    await client.disconnect(force=False)
    assert connection.is_connected is False
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("has_cache", "feature_flags", "method"),
    [
        (False, BluetoothProxyFeature(0), BluetoothDeviceRequestType.CONNECT),
        (
            False,
            BluetoothProxyFeature.REMOTE_CACHING,
            BluetoothDeviceRequestType.CONNECT_V3_WITHOUT_CACHE,
        ),
        (
            True,
            BluetoothProxyFeature.REMOTE_CACHING,
            BluetoothDeviceRequestType.CONNECT_V3_WITH_CACHE,
        ),
    ],
)
async def test_bluetooth_device_connect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
    has_cache: bool,
    feature_flags: BluetoothProxyFeature,
    method: BluetoothDeviceRequestType,
) -> None:
    """Check bluetooth_device_connect for each feature-flag/cache combination.

    Verifies the request type written to the transport, that connection
    state changes reach the callback until the returned cancel function is
    called, and that cancel can be called repeatedly, including after a
    forced disconnect.
    """
    client, _connection, transport, protocol = api_client
    seen_states = []

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    def feed_connection_response(connected: bool, error: int) -> None:
        """Deliver a connection response for address 1234 with mtu 23."""
        packet = generate_plaintext_packet(
            BluetoothDeviceConnectionResponse(
                address=1234, connected=connected, mtu=23, error=error
            )
        )
        mock_data_received(protocol, packet)

    connect_task = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=1,
            feature_flags=feature_flags,
            has_cache=has_cache,
            disconnect_timeout=1,
            address_type=1,
        )
    )
    await asyncio.sleep(0)
    feed_connection_response(True, 0)

    cancel = await connect_task
    assert seen_states == [(True, 23, 0)]

    expected_request = BluetoothDeviceRequest(
        address=1234,
        request_type=method,
        has_address_type=True,
        address_type=1,
    )
    transport.write.assert_called_once_with(generate_plaintext_packet(expected_request))

    # A later state change is still reported while subscribed.
    feed_connection_response(False, 7)
    await asyncio.sleep(0)
    assert seen_states == [(True, 23, 0), (False, 23, 7)]

    cancel()

    # After cancel, no more messages should be called back
    feed_connection_response(False, 8)
    await asyncio.sleep(0)
    assert seen_states == [(True, 23, 0), (False, 23, 7)]

    # Make sure cancel is safe to call again
    cancel()

    await client.disconnect(force=True)
    await asyncio.sleep(0)
    assert not client._connection

    # Make sure cancel is safe to call after disconnect
    cancel()
@pytest.mark.asyncio
async def test_bluetooth_device_connect_and_disconnect_times_out(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check connect raises TimeoutAPIError when both timeouts are zero.

    No response is ever fed, and the state callback must not be invoked.
    """
    client, _connection, _transport, _protocol = api_client
    seen_states = []

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    connect_coro = client.bluetooth_device_connect(
        1234,
        on_bluetooth_connection_state,
        timeout=0,
        feature_flags=0,
        has_cache=True,
        disconnect_timeout=0,
        address_type=1,
    )
    pending_connect = asyncio.create_task(connect_coro)

    with pytest.raises(TimeoutAPIError):
        await pending_connect
    assert seen_states == []
@pytest.mark.asyncio
async def test_bluetooth_device_connect_times_out_disconnect_ok(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_connect times out but the disconnect is answered.

    The connect timeout is zero while the disconnect timeout is one second.
    After the connect request times out, a second write (the disconnect
    request) is expected, and a ``connected=False`` response is fed for it.
    The call must still raise TimeoutAPIError and never invoke the state
    callback.
    """
    client, _connection, transport, protocol = api_client
    states = []

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        states.append((connected, mtu, error))

    connect_task = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=0,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=1,
            address_type=1,
        )
    )
    await asyncio.sleep(0)
    # The connect request should be written
    assert len(transport.write.mock_calls) == 1
    # Three further event-loop turns, as in the original sequence of sleeps.
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    # Now that we timed out, the disconnect
    # request should be written
    assert len(transport.write.mock_calls) == 2
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, mtu=23, error=8
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    with pytest.raises(TimeoutAPIError):
        await connect_task
    assert states == []
@pytest.mark.asyncio
async def test_bluetooth_device_connect_cancelled(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check cancelling bluetooth_device_connect cleans up after itself.

    The task is cancelled after the connect request has been written; the
    state callback must not fire and the handler count must return to its
    starting value.
    """
    client, connection, transport, _protocol = api_client
    seen_states = []

    def count_handlers() -> int:
        """Return the total number of registered message handlers."""
        return len(list(itertools.chain(*connection._message_handlers.values())))

    handlers_before = count_handlers()

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    pending_connect = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=10,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=10,
            address_type=1,
        )
    )
    await asyncio.sleep(0)
    # The connect request should be written
    assert len(transport.write.mock_calls) == 1

    pending_connect.cancel()
    with pytest.raises(asyncio.CancelledError):
        await pending_connect
    assert seen_states == []

    # Make sure we do not leak message handlers
    handlers_after = count_handlers()
    assert handlers_after == handlers_before
@pytest.mark.asyncio
async def test_send_voice_assistant_event(auth_client: APIClient) -> None:
    """Check send_voice_assistant_event with a data dict and with None.

    A dict becomes a list of name/value entries in dict order; ``None``
    becomes an empty data list.
    """
    sent = patch_send(auth_client)
    error_event = VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR

    # With event data.
    auth_client.send_voice_assistant_event(
        error_event,
        {"error": "error", "ok": "ok"},
    )
    expected_with_data = VoiceAssistantEventResponse(
        event_type=error_event.value,
        data=[
            VoiceAssistantEventData(name="error", value="error"),
            VoiceAssistantEventData(name="ok", value="ok"),
        ],
    )
    sent.assert_called_once_with(expected_with_data)

    # Without event data.
    sent.reset_mock()
    auth_client.send_voice_assistant_event(error_event, None)
    expected_without_data = VoiceAssistantEventResponse(
        event_type=error_event.value,
        data=[],
    )
    sent.assert_called_once_with(expected_without_data)
@pytest.mark.asyncio
async def test_subscribe_voice_assistant(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant.

    Covers the subscribe request, a start request whose handler returns a
    port (42) that is sent back in a VoiceAssistantResponse, a stop request,
    the unsubscribe request, and calling ``unsub`` again after a forced
    disconnect without anything further being sent.
    """
    client, _connection, _transport, protocol = api_client
    send = patch_send(client)
    starts = []
    stops = []

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        starts.append((conversation_id, flags, audio_settings, wake_word_phrase))
        return 42

    async def handle_stop() -> None:
        stops.append(True)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop
    )
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    send.reset_mock()

    audio_settings = VoiceAssistantAudioSettings(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=audio_settings,
        wake_word_phrase="okay nabu",
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    # Two event-loop turns before the handler's effects are asserted.
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    assert starts == [
        (
            "theone",
            42,
            VoiceAssistantAudioSettingsModel(
                noise_suppression_level=42,
                auto_gain=42,
                volume_multiplier=42,
            ),
            "okay nabu",
        )
    ]
    assert stops == []
    send.assert_called_once_with(VoiceAssistantResponse(port=42))
    send.reset_mock()

    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert stops == [True]
    send.reset_mock()

    unsub()
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    send.reset_mock()

    await client.disconnect(force=True)
    # Ensure the unsubscribe callback is a no-op after disconnect
    # and does not raise
    unsub()
    assert len(send.mock_calls) == 0
@pytest.mark.asyncio
async def test_subscribe_voice_assistant_failure(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant failure.

    The start handler returns ``None``, which must result in a
    ``VoiceAssistantResponse(error=True)`` being sent. The start request
    carries no wake word phrase, so the handler receives ``None`` for it.
    """
    client, _connection, _transport, protocol = api_client
    send = patch_send(client)
    starts = []
    stops = []

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        starts.append((conversation_id, flags, audio_settings, wake_word_phrase))
        # Return None to indicate failure
        return None

    async def handle_stop() -> None:
        stops.append(True)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop
    )
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    send.reset_mock()

    audio_settings = VoiceAssistantAudioSettings(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=audio_settings,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    # Two event-loop turns before the handler's effects are asserted.
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    assert starts == [
        (
            "theone",
            42,
            VoiceAssistantAudioSettingsModel(
                noise_suppression_level=42,
                auto_gain=42,
                volume_multiplier=42,
            ),
            None,
        )
    ]
    assert stops == []
    send.assert_called_once_with(VoiceAssistantResponse(error=True))
    send.reset_mock()

    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert stops == [True]
    send.reset_mock()

    unsub()
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    send.reset_mock()

    await client.disconnect(force=True)
    # Ensure the unsubscribe callback is a no-op after disconnect
    # and does not raise
    unsub()
    assert len(send.mock_calls) == 0
@pytest.mark.asyncio
async def test_subscribe_voice_assistant_cancels_long_running_handle_start(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant cancels long running tasks on unsub.

    handle_start blocks in a long sleep; the test unsubscribes while it is
    still pending and checks that nothing past the sleep was recorded.
    """
    client, _connection, _transport, protocol = api_client
    send = patch_send(client)
    started = []
    stopped = []

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        started.append((conversation_id, flags, audio_settings, wake_word_phrase))
        await asyncio.sleep(10)
        # Marker for "ran past the sleep"; the assertions below require that
        # it never shows up in the recorded calls.
        started.append("never")
        return None

    async def handle_stop() -> None:
        stopped.append(True)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop
    )
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    send.reset_mock()

    # The same three values are sent in the request and expected in the model
    # handed to handle_start.
    settings = {
        "noise_suppression_level": 42,
        "auto_gain": 42,
        "volume_multiplier": 42,
    }
    start_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=VoiceAssistantAudioSettings(**settings),
    )
    mock_data_received(protocol, generate_plaintext_packet(start_request))
    # Two yields so handle_start has been entered before unsubscribing.
    for _ in range(2):
        await asyncio.sleep(0)

    # Unsubscribe while handle_start is still inside its sleep.
    unsub()
    await asyncio.sleep(0)

    assert not stopped
    expected_start = (
        "theone",
        42,
        VoiceAssistantAudioSettingsModel(**settings),
        None,
    )
    assert started == [expected_start]
@pytest.mark.asyncio
async def test_subscribe_voice_assistant_api_audio(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant with an audio handler.

    Covers the subscribe request carrying flags=4, delivery of incoming
    audio to handle_audio, handle_stop being triggered both by an audio
    message with end=True and by a stop request, and sending audio out
    through send_voice_assistant_audio.
    """
    client, _connection, _transport, protocol = api_client
    send = patch_send(client)
    started = []
    stopped = []
    received_bytes = 0
    payload = bytes(range(1, 11))

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        started.append((conversation_id, flags, audio_settings, wake_word_phrase))
        return 0

    async def handle_stop() -> None:
        stopped.append(True)

    async def handle_audio(data: bytes) -> None:
        # Keep a running total of the audio bytes handed to us.
        nonlocal received_bytes
        received_bytes += len(data)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop, handle_audio=handle_audio
    )
    send.assert_called_once_with(
        SubscribeVoiceAssistantRequest(subscribe=True, flags=4)
    )
    send.reset_mock()

    # The same three values are sent in the request and expected in the model
    # handed to handle_start.
    settings = {
        "noise_suppression_level": 42,
        "auto_gain": 42,
        "volume_multiplier": 42,
    }
    start_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=VoiceAssistantAudioSettings(**settings),
        wake_word_phrase="okay nabu",
    )
    mock_data_received(protocol, generate_plaintext_packet(start_request))
    # Yield to the event loop twice before checking the outcome.
    for _ in range(2):
        await asyncio.sleep(0)

    expected_start = (
        "theone",
        42,
        VoiceAssistantAudioSettingsModel(**settings),
        "okay nabu",
    )
    assert started == [expected_start]
    assert not stopped
    send.assert_called_once_with(VoiceAssistantResponse(port=0))
    send.reset_mock()

    # Incoming audio data is forwarded to handle_audio.
    audio_chunk = VoiceAssistantAudio(data=payload)
    mock_data_received(protocol, generate_plaintext_packet(audio_chunk))
    await asyncio.sleep(0)
    assert received_bytes == len(payload)

    # An audio message flagged end=True results in handle_stop being called.
    audio_end = VoiceAssistantAudio(end=True)
    mock_data_received(protocol, generate_plaintext_packet(audio_end))
    await asyncio.sleep(0)
    assert stopped == [True]
    send.reset_mock()

    # Outgoing audio is wrapped in a VoiceAssistantAudio message.
    client.send_voice_assistant_audio(payload)
    send.assert_called_once_with(VoiceAssistantAudio(data=payload))

    # A stop request calls handle_stop a second time.
    stop_request = VoiceAssistantRequest(conversation_id="theone", start=False)
    mock_data_received(protocol, generate_plaintext_packet(stop_request))
    await asyncio.sleep(0)
    assert stopped == [True, True]
    send.reset_mock()

    unsub()
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    send.reset_mock()

    await client.disconnect(force=True)
    # Calling the unsubscribe callback again after disconnect must neither
    # raise nor send anything.
    unsub()
    assert not send.mock_calls
@pytest.mark.asyncio
async def test_api_version_after_connection_closed(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test api_version is reported while connected and is None after close."""
    client, _connection, _transport, _protocol = api_client

    # While the fixture's connection is up, the version is available.
    assert client.api_version == APIVersion(1, 9)

    await client.disconnect(force=True)

    # Once disconnected there is no version to report.
    assert client.api_version is None
@pytest.mark.asyncio
async def test_calls_after_connection_closed(
api_client: tuple[
APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
],
) -> None:
"""Test calls after connection close should raise APIConnectionError."""
client, connection, transport, protocol = api_client
assert client.api_version == APIVersion(1, 9)
await client.disconnect(force=True)
assert client.api_version is None
service = UserService(
name="my_service",
key=1,
args=[],
)
with pytest.raises(APIConnectionError):
client.execute_service(service, {})
for method in (
client.button_command,
client.climate_command,
client.cover_command,
client.fan_command,
client.light_command,
client.valve_command,
client.media_player_command,
client.siren_command,
):
with pytest.raises(APIConnectionError):
await method(1)
with pytest.raises(APIConnectionError):
await client.alarm_control_panel_command(1, AlarmControlPanelCommand.ARM_HOME)
with pytest.raises(APIConnectionError):
await client.date_command(1, 1, 1, 1)
with pytest.raises(APIConnectionError):
await client.lock_command(1, LockCommand.LOCK)
with pytest.raises(APIConnectionError):
await client.number_command(1, 1)
with pytest.raises(APIConnectionError):
await client.select_command(1, "1")
with pytest.raises(APIConnectionError):
await client.switch_command(1, True)
with pytest.raises(APIConnectionError):
await client.text_command(1, "1")