aioesphomeapi/tests/test_client.py

1984 lines
61 KiB
Python

from __future__ import annotations
import asyncio
import itertools
import logging
from functools import partial
from typing import Any
from unittest.mock import AsyncMock, MagicMock, call, patch
import pytest
from google.protobuf import message
from aioesphomeapi._frame_helper.plain_text import APIPlaintextFrameHelper
from aioesphomeapi.api_pb2 import (
AlarmControlPanelCommandRequest,
BinarySensorStateResponse,
BluetoothConnectionsFreeResponse,
BluetoothDeviceClearCacheResponse,
BluetoothDeviceConnectionResponse,
BluetoothDevicePairingResponse,
BluetoothDeviceRequest,
BluetoothDeviceUnpairingResponse,
BluetoothGATTCharacteristic,
BluetoothGATTDescriptor,
BluetoothGATTErrorResponse,
BluetoothGATTGetServicesDoneResponse,
BluetoothGATTGetServicesResponse,
BluetoothGATTNotifyDataResponse,
BluetoothGATTNotifyResponse,
BluetoothGATTReadResponse,
BluetoothGATTService,
BluetoothGATTWriteResponse,
BluetoothLEAdvertisementResponse,
BluetoothLERawAdvertisement,
BluetoothLERawAdvertisementsResponse,
BluetoothServiceData,
ButtonCommandRequest,
CameraImageRequest,
CameraImageResponse,
ClimateCommandRequest,
CoverCommandRequest,
DeviceInfoResponse,
DisconnectResponse,
ExecuteServiceArgument,
ExecuteServiceRequest,
FanCommandRequest,
HomeassistantServiceResponse,
HomeAssistantStateResponse,
LightCommandRequest,
ListEntitiesBinarySensorResponse,
ListEntitiesDoneResponse,
ListEntitiesServicesResponse,
LockCommandRequest,
MediaPlayerCommandRequest,
NumberCommandRequest,
SelectCommandRequest,
SirenCommandRequest,
SubscribeHomeAssistantStateResponse,
SubscribeLogsResponse,
SubscribeVoiceAssistantRequest,
SwitchCommandRequest,
TextCommandRequest,
VoiceAssistantAudioSettings,
VoiceAssistantEventData,
VoiceAssistantEventResponse,
VoiceAssistantRequest,
VoiceAssistantResponse,
)
from aioesphomeapi.client import APIClient
from aioesphomeapi.connection import APIConnection
from aioesphomeapi.core import (
APIConnectionError,
BluetoothGATTAPIError,
TimeoutAPIError,
UnhandledAPIConnectionError,
)
from aioesphomeapi.model import (
AlarmControlPanelCommand,
APIVersion,
BinarySensorInfo,
BinarySensorState,
BluetoothDeviceRequestType,
)
from aioesphomeapi.model import BluetoothGATTService as BluetoothGATTServiceModel
from aioesphomeapi.model import (
BluetoothLEAdvertisement,
BluetoothProxyFeature,
CameraState,
ClimateFanMode,
ClimateMode,
ClimatePreset,
ClimateSwingMode,
ESPHomeBluetoothGATTServices,
FanDirection,
FanSpeed,
HomeassistantServiceCall,
LegacyCoverCommand,
LightColorCapability,
LockCommand,
MediaPlayerCommand,
UserService,
UserServiceArg,
UserServiceArgType,
)
from aioesphomeapi.model import (
VoiceAssistantAudioSettings as VoiceAssistantAudioSettingsModel,
)
from aioesphomeapi.model import VoiceAssistantEventType as VoiceAssistantEventModelType
from aioesphomeapi.reconnect_logic import ReconnectLogic, ReconnectLogicState
from .common import (
Estr,
generate_plaintext_packet,
get_mock_zeroconf,
mock_data_received,
)
from .conftest import PatchableAPIConnection
@pytest.fixture
def auth_client():
    """Yield an APIClient whose ``_connection`` is patched and reports connected."""
    cli = APIClient(address="fake.address", port=6052, password=None)
    connection_patcher = patch.object(cli, "_connection")
    with connection_patcher as mock_connection:
        mock_connection.is_connected = True
        # Yield inside the ``with`` so the patch stays active for the test body.
        yield cli
def patch_response_complex(client: APIClient, messages):
    """Replace ``send_messages_await_response_complex`` on the client's connection.

    The replacement walks ``messages`` in order, keeps every message accepted
    by the ``app`` predicate and finishes at the first message for which
    ``stop`` is true.  If no message satisfies ``stop`` the replacement raises
    ``ValueError``.
    """

    async def patched(req, app, stop, msg_types, timeout):
        collected = []
        stopped = False
        for candidate in messages:
            if app(candidate):
                collected.append(candidate)
            if stop(candidate):
                stopped = True
                break
        if not stopped:
            raise ValueError("Response never stopped")
        return collected

    connection = client._connection
    connection.send_messages_await_response_complex = patched
def patch_response_callback(client: APIClient):
    """Replace ``send_message_callback_response`` and return a message injector.

    The replacement remembers the callback it was most recently given.  The
    returned coroutine function hands its argument to that remembered callback.
    """
    captured = {"callback": None}

    def patched(req, callback, msg_types):
        captured["callback"] = callback

    async def ret(send):
        # Look the callback up at call time so the latest registration is used.
        captured["callback"](send)

    client._connection.send_message_callback_response = patched
    return ret
def patch_send(client: APIClient):
    """Swap the connection's ``send_message`` for a ``MagicMock`` and return it."""
    mock_send = MagicMock()
    client._connection.send_message = mock_send
    return mock_send
def patch_api_version(client: APIClient, version: APIVersion):
    """Set ``api_version`` on the client's connection to ``version``."""
    connection = client._connection
    connection.api_version = version
def test_expected_name(auth_client: APIClient) -> None:
    """Ensure expected_name starts as None and can be assigned from outside."""
    new_name = "awesome"
    assert auth_client.expected_name is None
    auth_client.expected_name = new_name
    assert auth_client.expected_name == "awesome"
@pytest.mark.asyncio
async def test_connect_backwards_compat() -> None:
    """Check that connect() calls start_connection(None) then finish_connection(False)."""

    class PatchableApiClient(APIClient):
        pass

    cli = PatchableApiClient("host", 1234, None)
    with patch.object(cli, "start_connection") as start_mock:
        with patch.object(cli, "finish_connection") as finish_mock:
            await cli.connect()

    # The mocks keep their recorded calls after the patches are removed.
    assert start_mock.mock_calls == [call(None)]
    assert finish_mock.mock_calls == [call(False)]
@pytest.mark.asyncio
async def test_finish_connection_wraps_exceptions_as_unhandled_api_error() -> None:
    """Verify finish_connection re-wraps exceptions as UnhandledAPIConnectionError."""
    cli = APIClient("1.2.3.4", 1234, None)
    # get_running_loop() is the explicit way to obtain the loop inside a coroutine.
    loop = asyncio.get_running_loop()
    with patch(
        "aioesphomeapi.client.APIConnection", PatchableAPIConnection
    ), patch.object(loop, "sock_connect"):
        await cli.start_connection()

    # A generic exception raised while finishing must surface as the wrapped
    # error type and keep the original message.
    with patch.object(
        cli._connection,
        "send_messages_await_response_complex",
        side_effect=Exception("foo"),
    ), pytest.raises(UnhandledAPIConnectionError, match="foo"):
        await cli.finish_connection(False)
@pytest.mark.asyncio
async def test_request_while_handshaking(event_loop) -> None:
    """Test that a request made while connect() is still in progress raises."""

    class PatchableApiClient(APIClient):
        pass

    cli = PatchableApiClient("host", 1234, None)
    # sock_connect is replaced by a one second sleep while connect() runs.
    slow_connect = partial(asyncio.sleep, 1)
    with patch.object(event_loop, "sock_connect", side_effect=slow_connect):
        with patch.object(cli, "finish_connection"):
            connect_task = asyncio.create_task(cli.connect())
            # Give the connect task one loop iteration to start.
            await asyncio.sleep(0)

            with pytest.raises(
                APIConnectionError, match="Authenticated connection not ready yet"
            ):
                await cli.device_info()

            connect_task.cancel()
            await asyncio.sleep(0)
@pytest.mark.asyncio
async def test_connect_while_already_connected(auth_client: APIClient) -> None:
    """Test that start_connection raises on a client that is already connected."""
    start_connection = auth_client.start_connection
    with pytest.raises(APIConnectionError):
        await start_connection()
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("messages", "expected"),
    [
        (
            [ListEntitiesBinarySensorResponse(), ListEntitiesDoneResponse()],
            ([BinarySensorInfo()], []),
        ),
        (
            [ListEntitiesServicesResponse(), ListEntitiesDoneResponse()],
            ([], [UserService()]),
        ),
    ],
)
async def test_list_entities(
    auth_client: APIClient,
    messages: list[message.Message],
    expected: tuple[list[Any], list[UserService]],
) -> None:
    """Test list_entities_services converts responses into (entities, services).

    ``messages`` are the responses fed to the patched connection; ``expected``
    is the tuple the client is expected to return for them.
    """
    patch_response_complex(auth_client, messages)
    result = await auth_client.list_entities_services()
    assert result == expected
@pytest.mark.asyncio
async def test_subscribe_states(auth_client: APIClient) -> None:
    """Test subscribe_states forwards a binary sensor state to the callback."""
    inject_message = patch_response_callback(auth_client)
    state_callback = MagicMock()

    await auth_client.subscribe_states(state_callback)
    # Subscribing on its own must not produce a state.
    state_callback.assert_not_called()

    await inject_message(BinarySensorStateResponse())
    expected_state = BinarySensorState()
    state_callback.assert_called_once_with(expected_state)
@pytest.mark.asyncio
async def test_subscribe_states_camera(auth_client: APIClient) -> None:
    """Test camera image chunks are joined and reported once ``done`` is set."""
    inject_message = patch_response_callback(auth_client)
    state_callback = MagicMock()
    await auth_client.subscribe_states(state_callback)

    first_chunk = CameraImageResponse(key=1, data=b"asdf")
    await inject_message(first_chunk)
    # No state is reported for a chunk that is not marked done.
    state_callback.assert_not_called()

    final_chunk = CameraImageResponse(key=1, data=b"qwer", done=True)
    await inject_message(final_chunk)
    state_callback.assert_called_once_with(CameraState(key=1, data=b"asdfqwer"))
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1}, {"key": 1}),
        (
            {"key": 1, "position": 1.0},
            {
                "key": 1,
                "has_legacy_command": True,
                "legacy_command": LegacyCoverCommand.OPEN,
            },
        ),
        (
            {"key": 1, "position": 0.0},
            {
                "key": 1,
                "has_legacy_command": True,
                "legacy_command": LegacyCoverCommand.CLOSE,
            },
        ),
        (
            {"key": 1, "stop": True},
            {
                "key": 1,
                "has_legacy_command": True,
                "legacy_command": LegacyCoverCommand.STOP,
            },
        ),
    ],
)
async def test_cover_command_legacy(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test cover_command emits legacy commands when the API version is 1.0."""
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 0))

    await auth_client.cover_command(**cmd)

    expected_request = CoverCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1}, {"key": 1}),
        (
            {"key": 1, "position": 0.5},
            {"key": 1, "has_position": True, "position": 0.5},
        ),
        (
            {"key": 1, "position": 0.0},
            {"key": 1, "has_position": True, "position": 0.0},
        ),
        ({"key": 1, "stop": True}, {"key": 1, "stop": True}),
        (
            {"key": 1, "position": 1.0, "tilt": 0.8},
            {
                "key": 1,
                "has_position": True,
                "position": 1.0,
                "has_tilt": True,
                "tilt": 0.8,
            },
        ),
    ],
)
async def test_cover_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test cover_command emits position/tilt/stop fields when the API version is 1.1."""
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 1))

    await auth_client.cover_command(**cmd)

    expected_request = CoverCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1}, {"key": 1}),
        (
            {"key": 1, "state": True},
            {"key": 1, "has_state": True, "state": True},
        ),
        (
            {"key": 1, "speed": FanSpeed.LOW},
            {"key": 1, "has_speed": True, "speed": FanSpeed.LOW},
        ),
        (
            {"key": 1, "speed_level": 10},
            {"key": 1, "has_speed_level": True, "speed_level": 10},
        ),
        (
            {"key": 1, "oscillating": False},
            {"key": 1, "has_oscillating": True, "oscillating": False},
        ),
        (
            {"key": 1, "direction": FanDirection.REVERSE},
            {"key": 1, "has_direction": True, "direction": FanDirection.REVERSE},
        ),
    ],
)
async def test_fan_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test fan_command sends a FanCommandRequest with the matching has_* flags."""
    send = patch_send(auth_client)

    await auth_client.fan_command(**cmd)

    expected_request = FanCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1}, {"key": 1}),
        (
            {"key": 1, "state": True},
            {"key": 1, "has_state": True, "state": True},
        ),
        (
            {"key": 1, "brightness": 0.8},
            {"key": 1, "has_brightness": True, "brightness": 0.8},
        ),
        (
            {"key": 1, "rgb": (0.1, 0.5, 1.0)},
            {"key": 1, "has_rgb": True, "red": 0.1, "green": 0.5, "blue": 1.0},
        ),
        (
            {"key": 1, "white": 0.0},
            {"key": 1, "has_white": True, "white": 0.0},
        ),
        (
            {"key": 1, "color_temperature": 0.0},
            {"key": 1, "has_color_temperature": True, "color_temperature": 0.0},
        ),
        (
            {"key": 1, "color_brightness": 0.0},
            {"key": 1, "has_color_brightness": True, "color_brightness": 0.0},
        ),
        (
            {"key": 1, "cold_white": 1.0, "warm_white": 2.0},
            {
                "key": 1,
                "has_cold_white": True,
                "cold_white": 1.0,
                "has_warm_white": True,
                "warm_white": 2.0,
            },
        ),
        (
            {"key": 1, "transition_length": 0.1},
            {"key": 1, "has_transition_length": True, "transition_length": 100},
        ),
        (
            {"key": 1, "flash_length": 0.1},
            {"key": 1, "has_flash_length": True, "flash_length": 100},
        ),
        (
            {"key": 1, "effect": "special"},
            {"key": 1, "has_effect": True, "effect": "special"},
        ),
        (
            {
                "key": 1,
                "color_mode": LightColorCapability.COLOR_TEMPERATURE,
                "color_temperature": 153.0,
            },
            {
                "key": 1,
                "has_color_mode": True,
                "color_mode": LightColorCapability.COLOR_TEMPERATURE,
                "has_color_temperature": True,
                "color_temperature": 153.0,
            },
        ),
    ],
)
async def test_light_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test light_command sends a LightCommandRequest with the expected fields."""
    send = patch_send(auth_client)

    await auth_client.light_command(**cmd)

    expected_request = LightCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1, "state": False}, {"key": 1, "state": False}),
        ({"key": 1, "state": True}, {"key": 1, "state": True}),
    ],
)
async def test_switch_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test switch_command sends a SwitchCommandRequest carrying key and state."""
    send = patch_send(auth_client)

    await auth_client.switch_command(**cmd)

    expected_request = SwitchCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "preset": ClimatePreset.HOME},
            {"key": 1, "has_legacy_away": True, "legacy_away": False},
        ),
        (
            {"key": 1, "preset": ClimatePreset.AWAY},
            {"key": 1, "has_legacy_away": True, "legacy_away": True},
        ),
    ],
)
async def test_climate_command_legacy(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test climate_command maps presets to legacy_away when the API version is 1.4."""
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 4))

    await auth_client.climate_command(**cmd)

    expected_request = ClimateCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "mode": ClimateMode.HEAT},
            {"key": 1, "has_mode": True, "mode": ClimateMode.HEAT},
        ),
        (
            {"key": 1, "target_temperature": 21.0},
            {"key": 1, "has_target_temperature": True, "target_temperature": 21.0},
        ),
        (
            {"key": 1, "target_temperature_low": 21.0},
            {
                "key": 1,
                "has_target_temperature_low": True,
                "target_temperature_low": 21.0,
            },
        ),
        (
            {"key": 1, "target_temperature_high": 21.0},
            {
                "key": 1,
                "has_target_temperature_high": True,
                "target_temperature_high": 21.0,
            },
        ),
        (
            {"key": 1, "fan_mode": ClimateFanMode.LOW},
            {"key": 1, "has_fan_mode": True, "fan_mode": ClimateFanMode.LOW},
        ),
        (
            {"key": 1, "swing_mode": ClimateSwingMode.OFF},
            {"key": 1, "has_swing_mode": True, "swing_mode": ClimateSwingMode.OFF},
        ),
        (
            {"key": 1, "custom_fan_mode": "asdf"},
            {"key": 1, "has_custom_fan_mode": True, "custom_fan_mode": "asdf"},
        ),
        (
            {"key": 1, "preset": ClimatePreset.AWAY},
            {"key": 1, "has_preset": True, "preset": ClimatePreset.AWAY},
        ),
        (
            {"key": 1, "custom_preset": "asdf"},
            {"key": 1, "has_custom_preset": True, "custom_preset": "asdf"},
        ),
    ],
)
async def test_climate_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test climate_command sets the matching has_* flags when the API version is 1.5."""
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 5))

    await auth_client.climate_command(**cmd)

    expected_request = ClimateCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1, "state": 0.0}, {"key": 1, "state": 0.0}),
        ({"key": 1, "state": 100.0}, {"key": 1, "state": 100.0}),
    ],
)
async def test_number_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test number_command sends a NumberCommandRequest carrying key and state."""
    send = patch_send(auth_client)

    await auth_client.number_command(**cmd)

    expected_request = NumberCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "command": LockCommand.LOCK},
            {"key": 1, "command": LockCommand.LOCK},
        ),
        (
            {"key": 1, "command": LockCommand.UNLOCK},
            {"key": 1, "command": LockCommand.UNLOCK},
        ),
        (
            {"key": 1, "command": LockCommand.OPEN},
            {"key": 1, "command": LockCommand.OPEN},
        ),
        (
            {"key": 1, "command": LockCommand.OPEN, "code": "1234"},
            {"key": 1, "command": LockCommand.OPEN, "code": "1234"},
        ),
    ],
)
async def test_lock_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test lock_command sends a LockCommandRequest with the command and code."""
    send = patch_send(auth_client)

    await auth_client.lock_command(**cmd)

    expected_request = LockCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1, "state": "One"}, {"key": 1, "state": "One"}),
        ({"key": 1, "state": "Two"}, {"key": 1, "state": "Two"}),
    ],
)
async def test_select_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test select_command sends a SelectCommandRequest carrying key and state."""
    send = patch_send(auth_client)

    await auth_client.select_command(**cmd)

    expected_request = SelectCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "command": MediaPlayerCommand.MUTE},
            {"key": 1, "has_command": True, "command": MediaPlayerCommand.MUTE},
        ),
        (
            {"key": 1, "volume": 1.0},
            {"key": 1, "has_volume": True, "volume": 1.0},
        ),
        (
            {"key": 1, "media_url": "http://example.com"},
            {"key": 1, "has_media_url": True, "media_url": "http://example.com"},
        ),
    ],
)
async def test_media_player_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test media_player_command sends a request with the matching has_* flags."""
    send = patch_send(auth_client)

    await auth_client.media_player_command(**cmd)

    expected_request = MediaPlayerCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        ({"key": 1}, {"key": 1}),
    ],
)
async def test_button_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test button_command sends a ButtonCommandRequest carrying the key."""
    send = patch_send(auth_client)

    await auth_client.button_command(**cmd)

    expected_request = ButtonCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "state": True},
            {"key": 1, "state": True, "has_state": True},
        ),
        (
            {"key": 1, "state": False},
            {"key": 1, "state": False, "has_state": True},
        ),
        (
            {"key": 1, "state": None},
            {"key": 1, "state": None, "has_state": False},
        ),
        (
            {"key": 1, "state": True, "tone": "any"},
            {
                "key": 1,
                "state": True,
                "has_state": True,
                "has_tone": True,
                "tone": "any",
            },
        ),
        (
            {"key": 1, "state": True, "tone": None},
            {
                "key": 1,
                "state": True,
                "has_state": True,
                "has_tone": False,
                "tone": None,
            },
        ),
        (
            {"key": 1, "state": True, "volume": 5},
            {
                "key": 1,
                "state": True,
                "has_volume": True,
                "volume": 5,
                "has_state": True,
            },
        ),
        (
            {"key": 1, "state": True, "duration": 5},
            {
                "key": 1,
                "state": True,
                "has_duration": True,
                "duration": 5,
                "has_state": True,
            },
        ),
    ],
)
async def test_siren_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test siren_command sends a SirenCommandRequest with the matching has_* flags."""
    send = patch_send(auth_client)

    await auth_client.siren_command(**cmd)

    expected_request = SirenCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
async def test_execute_service(auth_client: APIClient) -> None:
    """Test execute_service argument encoding for API 1.3 and for API 1.2."""
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 3))

    # One argument per supported type, named arg1 .. arg8 in this order.
    arg_types = [
        UserServiceArgType.BOOL,
        UserServiceArgType.INT,
        UserServiceArgType.FLOAT,
        UserServiceArgType.STRING,
        UserServiceArgType.BOOL_ARRAY,
        UserServiceArgType.INT_ARRAY,
        UserServiceArgType.FLOAT_ARRAY,
        UserServiceArgType.STRING_ARRAY,
    ]
    service = UserService(
        name="my_service",
        key=1,
        args=[
            UserServiceArg(name=f"arg{position}", type=arg_type)
            for position, arg_type in enumerate(arg_types, start=1)
        ],
    )

    # A call without the argument values raises KeyError.
    with pytest.raises(KeyError):
        await auth_client.execute_service(service, data={})

    full_data = {
        "arg1": False,
        "arg2": 42,
        "arg3": 99.0,
        "arg4": "asdf",
        "arg5": [False, True, False],
        "arg6": [42, 10, 9],
        "arg7": [0.0, -100.0],
        "arg8": [],
    }
    await auth_client.execute_service(service, data=full_data)
    expected_request = ExecuteServiceRequest(
        key=1,
        args=[
            ExecuteServiceArgument(bool_=False),
            ExecuteServiceArgument(int_=42),
            ExecuteServiceArgument(float_=99.0),
            ExecuteServiceArgument(string_="asdf"),
            ExecuteServiceArgument(bool_array=[False, True, False]),
            ExecuteServiceArgument(int_array=[42, 10, 9]),
            ExecuteServiceArgument(float_array=[0.0, -100.0]),
            ExecuteServiceArgument(string_array=[]),
        ],
    )
    send.assert_called_once_with(expected_request)
    send.reset_mock()

    patch_api_version(auth_client, APIVersion(1, 2))
    legacy_service = UserService(
        name="my_service",
        key=2,
        args=[
            UserServiceArg(name="arg1", type=UserServiceArgType.BOOL),
            UserServiceArg(name="arg2", type=UserServiceArgType.INT),
        ],
    )
    legacy_request = ExecuteServiceRequest(
        key=2,
        args=[
            ExecuteServiceArgument(bool_=False),
            ExecuteServiceArgument(legacy_int=42),
        ],
    )
    legacy_payloads = (
        # Test legacy_int
        {"arg1": False, "arg2": 42},
        # Test arg order
        {"arg2": 42, "arg1": False},
    )
    for payload in legacy_payloads:
        await auth_client.execute_service(legacy_service, data=payload)
        send.assert_called_once_with(legacy_request)
        send.reset_mock()
@pytest.mark.asyncio
async def test_request_single_image(auth_client: APIClient) -> None:
    """Test request_single_image sends CameraImageRequest(single=True, stream=False)."""
    send = patch_send(auth_client)

    await auth_client.request_single_image()

    expected_request = CameraImageRequest(single=True, stream=False)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
async def test_request_image_stream(auth_client: APIClient) -> None:
    """Test request_image_stream sends CameraImageRequest(single=False, stream=True)."""
    send = patch_send(auth_client)

    await auth_client.request_image_stream()

    expected_request = CameraImageRequest(single=False, stream=True)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "command": AlarmControlPanelCommand.ARM_AWAY},
            {"key": 1, "command": AlarmControlPanelCommand.ARM_AWAY, "code": None},
        ),
        (
            {"key": 1, "command": AlarmControlPanelCommand.ARM_HOME},
            {"key": 1, "command": AlarmControlPanelCommand.ARM_HOME, "code": None},
        ),
        (
            {"key": 1, "command": AlarmControlPanelCommand.DISARM, "code": "1234"},
            {"key": 1, "command": AlarmControlPanelCommand.DISARM, "code": "1234"},
        ),
    ],
)
async def test_alarm_panel_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test alarm_control_panel_command sends the command and optional code."""
    send = patch_send(auth_client)

    await auth_client.alarm_control_panel_command(**cmd)

    expected_request = AlarmControlPanelCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("cmd", "req"),
    [
        (
            {"key": 1, "state": "hello world"},
            {"key": 1, "state": "hello world"},
        ),
        ({"key": 1, "state": "goodbye"}, {"key": 1, "state": "goodbye"}),
    ],
)
async def test_text_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    """Test text_command sends a TextCommandRequest carrying key and state."""
    send = patch_send(auth_client)

    await auth_client.text_command(**cmd)

    expected_request = TextCommandRequest(**req)
    send.assert_called_once_with(expected_request)
@pytest.mark.asyncio
async def test_noise_psk_handles_subclassed_string():
    """Test that str-subclass parameters are stored as plain ``str`` objects."""

    class PatchableAPIClient(APIClient):
        pass

    cli = PatchableAPIClient(
        address=Estr("1.2.3.4"),
        port=6052,
        password=None,
        noise_psk=Estr("QRTIErOb/fcE9Ukd/5qA3RGYMn0Y+p06U58SCtOXvPc="),
        expected_name=Estr("mydevice"),
    )
    params = cli._params
    # Exact type checks: a str subclass must not be stored as-is.
    assert type(params.noise_psk) is str
    assert type(params.address) is str
    assert type(params.expected_name) is str

    reconnect_logic = ReconnectLogic(
        client=cli,
        on_disconnect=AsyncMock(),
        on_connect=AsyncMock(),
        zeroconf_instance=get_mock_zeroconf(),
        name="mydevice",
    )
    assert reconnect_logic._connection_state is ReconnectLogicState.DISCONNECTED

    with patch.object(cli, "start_connection"):
        with patch.object(cli, "finish_connection"):
            await reconnect_logic.start()
            for _ in range(3):
                await asyncio.sleep(0)

    reconnect_logic.stop_callback()
    # Wait for cancellation to propagate
    for _ in range(4):
        await asyncio.sleep(0)
    assert reconnect_logic._connection_state is ReconnectLogicState.DISCONNECTED
@pytest.mark.asyncio
async def test_no_noise_psk():
    """Test that a client created with noise_psk=None keeps it as None."""
    cli = APIClient(
        address=Estr("1.2.3.4"),
        port=6052,
        password=None,
        noise_psk=None,
        expected_name=Estr("mydevice"),
    )
    params = cli._params
    assert params.noise_psk is None
    # The str-subclass inputs must be stored as plain str objects.
    assert type(params.address) is str
    assert type(params.expected_name) is str
@pytest.mark.asyncio
async def test_empty_noise_psk_or_expected_name():
    """Test that an empty noise_psk and an empty expected_name are stored as None."""
    cli = APIClient(
        address=Estr("1.2.3.4"),
        port=6052,
        password=None,
        noise_psk="",
        expected_name="",
    )
    params = cli._params
    assert params.noise_psk is None
    assert type(params.address) is str
    assert params.expected_name is None
@pytest.mark.asyncio
async def test_bluetooth_disconnect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_disconnect finishes on a disconnected response."""
    client, _connection, _transport, protocol = api_client
    pending_disconnect = asyncio.create_task(client.bluetooth_device_disconnect(1234))
    # Let the task start before the response is delivered.
    await asyncio.sleep(0)

    disconnected: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False
    )
    packet = generate_plaintext_packet(disconnected)
    mock_data_received(protocol, packet)
    await pending_disconnect
@pytest.mark.asyncio
async def test_bluetooth_pair(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_pair ignores responses for a different address."""
    client, _connection, _transport, protocol = api_client
    pending_pair = asyncio.create_task(client.bluetooth_device_pair(1234))
    await asyncio.sleep(0)

    # A pairing response for address 4567 must not complete the request for 1234.
    unrelated: message.Message = BluetoothDevicePairingResponse(address=4567)
    mock_data_received(protocol, generate_plaintext_packet(unrelated))
    await asyncio.sleep(0)
    assert not pending_pair.done()

    matching: message.Message = BluetoothDevicePairingResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(matching))
    await pending_pair
@pytest.mark.asyncio
async def test_bluetooth_pair_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test a connection change during bluetooth_device_pair raises an error."""
    client, _connection, _transport, protocol = api_client
    pending_pair = asyncio.create_task(client.bluetooth_device_pair(1234))
    await asyncio.sleep(0)

    dropped: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    packet = generate_plaintext_packet(dropped)
    mock_data_received(protocol, packet)

    expected_message = "Peripheral changed connections status while pairing: 13"
    with pytest.raises(APIConnectionError, match=expected_message):
        await pending_pair
@pytest.mark.asyncio
async def test_bluetooth_unpair(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_unpair finishes on a matching unpairing response."""
    client, _connection, _transport, protocol = api_client
    pending_unpair = asyncio.create_task(client.bluetooth_device_unpair(1234))
    await asyncio.sleep(0)

    unpaired: message.Message = BluetoothDeviceUnpairingResponse(address=1234)
    packet = generate_plaintext_packet(unpaired)
    mock_data_received(protocol, packet)
    await pending_unpair
@pytest.mark.asyncio
async def test_bluetooth_clear_cache(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_clear_cache finishes on a matching response."""
    client, _connection, _transport, protocol = api_client
    pending_clear = asyncio.create_task(client.bluetooth_device_clear_cache(1234))
    await asyncio.sleep(0)

    cleared: message.Message = BluetoothDeviceClearCacheResponse(address=1234)
    packet = generate_plaintext_packet(cleared)
    mock_data_received(protocol, packet)
    await pending_clear
@pytest.mark.asyncio
async def test_device_info(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test fetching device info, the log_name update, and use after disconnect."""
    client, _connection, _transport, protocol = api_client
    assert client.log_name == "fake @ 10.0.0.512"

    pending_info = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    info_response: message.Message = DeviceInfoResponse(
        name="realname",
        friendly_name="My Device",
        has_deep_sleep=True,
    )
    mock_data_received(protocol, generate_plaintext_packet(info_response))
    info = await pending_info

    assert info.name == "realname"
    assert info.friendly_name == "My Device"
    assert info.has_deep_sleep
    # log_name now carries the name reported by the device.
    assert client.log_name == "realname @ 10.0.0.512"

    pending_disconnect = asyncio.create_task(client.disconnect())
    await asyncio.sleep(0)
    disconnect_response: message.Message = DisconnectResponse()
    mock_data_received(protocol, generate_plaintext_packet(disconnect_response))
    await pending_disconnect

    with pytest.raises(APIConnectionError, match="Not connected"):
        await client.device_info()
@pytest.mark.asyncio
async def test_bluetooth_gatt_read(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_read returns the data from the matching handle only."""
    client, _connection, _transport, protocol = api_client
    pending_read = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)

    # Same address but a different handle: delivered first, must not be returned.
    wrong_handle: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=4567, data=b"4567"
    )
    mock_data_received(protocol, generate_plaintext_packet(wrong_handle))

    right_handle: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=1234, data=b"1234"
    )
    mock_data_received(protocol, generate_plaintext_packet(right_handle))

    assert await pending_read == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_error(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_read raises BluetoothGATTAPIError on an error response."""
    client, _connection, _transport, protocol = api_client
    pending_read = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)

    gatt_error: message.Message = BluetoothGATTErrorResponse(
        address=1234, handle=1234
    )
    packet = generate_plaintext_packet(gatt_error)
    mock_data_received(protocol, packet)

    with pytest.raises(BluetoothGATTAPIError):
        await pending_read
@pytest.mark.asyncio
async def test_bluetooth_gatt_read_descriptor(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_read_descriptor returns data from the matching handle."""
    client, _connection, _transport, protocol = api_client
    pending_read = asyncio.create_task(
        client.bluetooth_gatt_read_descriptor(1234, 1234)
    )
    await asyncio.sleep(0)

    # Same address but a different handle: delivered first, must not be returned.
    wrong_handle: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=4567, data=b"4567"
    )
    mock_data_received(protocol, generate_plaintext_packet(wrong_handle))

    right_handle: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=1234, data=b"1234"
    )
    mock_data_received(protocol, generate_plaintext_packet(right_handle))

    assert await pending_read == b"1234"
@pytest.mark.asyncio
async def test_bluetooth_gatt_write(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write with a response completes on the matching handle."""
    client, _connection, _transport, protocol = api_client
    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)

    # A write response for another handle is delivered before the matching one.
    wrong_handle: message.Message = BluetoothGATTWriteResponse(
        address=1234, handle=4567
    )
    mock_data_received(protocol, generate_plaintext_packet(wrong_handle))

    right_handle: message.Message = BluetoothGATTWriteResponse(
        address=1234, handle=1234
    )
    mock_data_received(protocol, generate_plaintext_packet(right_handle))

    await pending_write
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_without_response(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write without a response, then a timed-out write."""
    client, _connection, transport, _protocol = api_client
    transport.reset_mock()

    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", False)
    )
    await asyncio.sleep(0)
    await pending_write

    # First positional argument of the first call recorded on the transport.
    first_call = transport.mock_calls[0]
    assert first_call[1][0] == b'\x00\x0cK\x08\xd2\t\x10\xd2\t"\x041234'

    # With a response requested and timeout=0 the write must time out.
    with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
        await client.bluetooth_gatt_write(1234, 1234, b"1234", True, timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write_descriptor completes on the matching handle."""
    client, _connection, _transport, protocol = api_client
    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)

    # A write response for another handle is delivered before the matching one.
    wrong_handle: message.Message = BluetoothGATTWriteResponse(
        address=1234, handle=4567
    )
    mock_data_received(protocol, generate_plaintext_packet(wrong_handle))

    right_handle: message.Message = BluetoothGATTWriteResponse(
        address=1234, handle=1234
    )
    mock_data_received(protocol, generate_plaintext_packet(right_handle))

    await pending_write
@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor_without_response(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write_descriptor without a response, then a timeout."""
    client, _connection, transport, _protocol = api_client
    transport.reset_mock()

    pending_write = asyncio.create_task(
        client.bluetooth_gatt_write_descriptor(
            1234, 1234, b"1234", wait_for_response=False
        )
    )
    await asyncio.sleep(0)
    await pending_write

    # First positional argument of the first call recorded on the transport.
    first_call = transport.mock_calls[0]
    assert first_call[1][0] == b"\x00\x0cM\x08\xd2\t\x10\xd2\t\x1a\x041234"

    # Without wait_for_response=False and with timeout=0 the write must time out.
    with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
        await client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", timeout=0)
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_get_services success case."""
    client, _connection, _transport, protocol = api_client
    pending_services = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)

    # One service holding one characteristic holding one descriptor.
    descriptor = BluetoothGATTDescriptor(uuid=[1, 3], handle=3)
    characteristic = BluetoothGATTCharacteristic(
        uuid=[1, 2],
        handle=2,
        properties=1,
        descriptors=[descriptor],
    )
    service_pb: message.Message = BluetoothGATTService(
        uuid=[1, 1],
        handle=1,
        characteristics=[characteristic],
    )

    services_response: message.Message = BluetoothGATTGetServicesResponse(
        address=1234, services=[service_pb]
    )
    mock_data_received(protocol, generate_plaintext_packet(services_response))
    done_response: message.Message = BluetoothGATTGetServicesDoneResponse(
        address=1234
    )
    mock_data_received(protocol, generate_plaintext_packet(done_response))

    result = await pending_services
    expected = ESPHomeBluetoothGATTServices(
        address=1234,
        services=[BluetoothGATTServiceModel.from_pb(service_pb)],
    )
    assert result == expected
@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services_errors(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that a GATT error response makes bluetooth_gatt_get_services raise.

    A services response is delivered first; a ``BluetoothGATTErrorResponse``
    then arrives instead of the "done" message, and awaiting the task must
    raise ``BluetoothGATTAPIError``.
    """
    client, _connection, _transport, protocol = api_client
    pending_services = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)

    empty_service = BluetoothGATTService(uuid=[1, 1], handle=1, characteristics=[])
    services_msg = BluetoothGATTGetServicesResponse(
        address=1234, services=[empty_service]
    )
    mock_data_received(protocol, generate_plaintext_packet(services_msg))

    error_msg = BluetoothGATTErrorResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(error_msg))

    with pytest.raises(BluetoothGATTAPIError):
        await pending_services
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check notification delivery and cleanup for bluetooth_gatt_start_notify.

    Notification data must reach the callback both while the start request is
    completing and afterwards. Cancelling must restore the number of
    registered message handlers, and the returned callbacks must not raise
    when invoked again after cancel or after a forced disconnect.
    """
    client, connection, _transport, protocol = api_client
    received: list[tuple[int, bytearray]] = []

    def handler_count() -> int:
        """Return the total number of registered message handlers."""
        handlers = connection._get_message_handlers()
        return len(list(itertools.chain.from_iterable(handlers.values())))

    baseline = handler_count()

    def record_notify(handle: int, data: bytearray) -> None:
        received.append((handle, data))

    pending_notify = asyncio.create_task(
        client.bluetooth_gatt_start_notify(1234, 1, record_notify)
    )
    await asyncio.sleep(0)

    # Deliver the notify acknowledgement and the first data message together
    # in a single chunk of received bytes.
    ack_packet = generate_plaintext_packet(
        BluetoothGATTNotifyResponse(address=1234, handle=1)
    )
    first_data_packet = generate_plaintext_packet(
        BluetoothGATTNotifyDataResponse(address=1234, handle=1, data=b"gotit")
    )
    mock_data_received(protocol, ack_packet + first_data_packet)

    stop_notify, abort_notify = await pending_notify
    assert received == [(1, b"gotit")]

    later_data = BluetoothGATTNotifyDataResponse(
        address=1234, handle=1, data=b"after finished"
    )
    mock_data_received(protocol, generate_plaintext_packet(later_data))
    assert received == [(1, b"gotit"), (1, b"after finished")]

    await stop_notify()
    assert handler_count() == baseline

    # The abort callback must not raise once the notify has been cancelled.
    abort_notify()

    await client.disconnect(force=True)
    # The cancel callback must not raise once the client is disconnected.
    await stop_notify()
@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify_fails(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that a failing bluetooth_gatt_start_notify leaks no handlers.

    ``send_messages_await_response_complex`` is patched to raise
    ``APIConnectionError``; after the failure the number of registered message
    handlers must equal the number counted beforehand.
    """
    client, connection, _transport, _protocol = api_client
    received = []

    def record_notify(handle: int, data: bytearray) -> None:
        received.append((handle, data))

    def handler_count() -> int:
        """Return the total number of registered message handlers."""
        handlers = connection._get_message_handlers()
        return len(list(itertools.chain.from_iterable(handlers.values())))

    baseline = handler_count()

    failing_send = patch.object(
        connection,
        "send_messages_await_response_complex",
        side_effect=APIConnectionError,
    )
    with failing_send:
        with pytest.raises(APIConnectionError):
            await client.bluetooth_gatt_start_notify(1234, 1, record_notify)

    assert handler_count() == baseline
@pytest.mark.asyncio
async def test_subscribe_bluetooth_le_advertisements(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that a received LE advertisement is converted for the callback.

    A single ``BluetoothLEAdvertisementResponse`` is fed to the protocol and
    the callback must receive exactly one ``BluetoothLEAdvertisement`` with
    the expected name, UUID strings and data mappings.
    """
    client, _connection, _transport, protocol = api_client
    seen = []

    def collect_advertisement(adv: BluetoothLEAdvertisement) -> None:
        seen.append(adv)

    unsubscribe = await client.subscribe_bluetooth_le_advertisements(
        collect_advertisement
    )
    await asyncio.sleep(0)

    # The same payload is used for both the service data and the
    # manufacturer data entries.
    payload = b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    incoming = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=["1234"],
        service_data=[BluetoothServiceData(uuid="1234", data=payload)],
        manufacturer_data=[BluetoothServiceData(uuid="1234", data=payload)],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(incoming))

    expected = BluetoothLEAdvertisement(
        address=1234,
        name="mydevice",
        rssi=-50,
        service_uuids=["000034-0000-1000-8000-00805f9b34fb"],
        manufacturer_data={4660: payload},
        service_data={"000034-0000-1000-8000-00805f9b34fb": payload},
        address_type=1,
    )
    assert seen == [expected]
    unsubscribe()
@pytest.mark.asyncio
async def test_subscribe_bluetooth_le_raw_advertisements(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_bluetooth_le_raw_advertisements.

    A response carrying one raw advertisement is fed to the protocol; the
    callback must be invoked once, and the first element it receives must
    expose the address, rssi, address type and data that were sent.
    """
    client, connection, transport, protocol = api_client
    adv_groups = []

    # The callback is handed the individual advertisements (the assertions
    # below index into the group and read per-advertisement fields), so the
    # element type is BluetoothLERawAdvertisement rather than the response
    # wrapper message.
    def on_raw_bluetooth_le_advertisements(
        advs: list[BluetoothLERawAdvertisement],
    ) -> None:
        adv_groups.append(advs)

    unsub = await client.subscribe_bluetooth_le_raw_advertisements(
        on_raw_bluetooth_le_advertisements
    )
    await asyncio.sleep(0)
    response: message.Message = BluetoothLERawAdvertisementsResponse(
        advertisements=[
            BluetoothLERawAdvertisement(
                address=1234,
                rssi=-50,
                address_type=1,
                data=b"1234",
            )
        ]
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    assert len(adv_groups) == 1
    first_adv = adv_groups[0][0]
    assert first_adv.address == 1234
    assert first_adv.rssi == -50
    assert first_adv.address_type == 1
    assert first_adv.data == b"1234"
    unsub()
@pytest.mark.asyncio
async def test_subscribe_bluetooth_connections_free(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that free/limit connection counts are passed to the callback."""
    client, _connection, _transport, protocol = api_client
    reported = []

    def record_free_slots(free: int, limit: int) -> None:
        reported.append((free, limit))

    unsubscribe = await client.subscribe_bluetooth_connections_free(
        record_free_slots
    )
    await asyncio.sleep(0)

    incoming = BluetoothConnectionsFreeResponse(free=2, limit=3)
    mock_data_received(protocol, generate_plaintext_packet(incoming))

    assert reported == [(2, 3)]
    unsubscribe()
@pytest.mark.asyncio
async def test_subscribe_home_assistant_states(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that a state subscription request reaches the callback.

    The entity id and attribute from the received
    ``SubscribeHomeAssistantStateResponse`` must be forwarded unchanged.
    """
    client, _connection, _transport, protocol = api_client
    requested = []

    def record_subscription(entity_id: str, attribute: str | None) -> None:
        requested.append((entity_id, attribute))

    await client.subscribe_home_assistant_states(record_subscription)
    await asyncio.sleep(0)

    incoming = SubscribeHomeAssistantStateResponse(
        entity_id="sensor.red", attribute="any"
    )
    mock_data_received(protocol, generate_plaintext_packet(incoming))

    assert requested == [("sensor.red", "any")]
@pytest.mark.asyncio
async def test_subscribe_logs(auth_client: APIClient) -> None:
    """Check that a received log message is handed to the log callback."""
    deliver = patch_response_callback(auth_client)
    log_callback = MagicMock()

    await auth_client.subscribe_logs(log_callback)

    incoming = SubscribeLogsResponse(level=1, message=b"asdf")
    await deliver(incoming)

    log_callback.assert_called_with(incoming)
@pytest.mark.asyncio
async def test_send_home_assistant_state(auth_client: APIClient) -> None:
    """Check the message sent by send_home_assistant_state."""
    sent = patch_send(auth_client)

    await auth_client.send_home_assistant_state("binary_sensor.bla", None, "on")

    expected = HomeAssistantStateResponse(
        entity_id="binary_sensor.bla", state="on", attribute=None
    )
    sent.assert_called_once_with(expected)
@pytest.mark.asyncio
async def test_subscribe_service_calls(auth_client: APIClient) -> None:
    """Check that a received service call is converted for the callback."""
    deliver = patch_response_callback(auth_client)
    service_callback = MagicMock()

    await auth_client.subscribe_service_calls(service_callback)

    incoming = HomeassistantServiceResponse(service="bob")
    await deliver(incoming)

    expected_call = HomeassistantServiceCall.from_pb(incoming)
    service_callback.assert_called_with(expected_call)
@pytest.mark.asyncio
async def test_set_debug(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Check that set_debug toggles logging of received messages.

    With debug enabled, the friendly name from a received device info
    response must show up in the captured log; with debug disabled it must
    not.
    """
    client, _connection, _transport, protocol = api_client
    device_info_msg = DeviceInfoResponse(
        name="realname",
        friendly_name="My Device",
        has_deep_sleep=True,
    )

    async def fetch_device_info() -> None:
        """Request device info and answer it with the canned response."""
        pending = asyncio.create_task(client.device_info())
        await asyncio.sleep(0)
        mock_data_received(protocol, generate_plaintext_packet(device_info_msg))
        await pending

    caplog.set_level(logging.DEBUG)

    client.set_debug(True)
    assert client.log_name == "fake @ 10.0.0.512"
    await fetch_device_info()
    assert "My Device" in caplog.text

    caplog.clear()
    client.set_debug(False)
    await fetch_device_info()
    assert "My Device" not in caplog.text
@pytest.mark.asyncio
async def test_force_disconnect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that disconnect may be called again after a forced disconnect."""
    client, connection, _transport, _protocol = api_client

    # Precondition: the fixture provides a live connection.
    assert connection.is_connected is True
    assert connection.on_stop is not None

    await client.disconnect(force=True)
    assert client._connection is None
    assert connection.is_connected is False

    # A second, non-forced disconnect must leave the connection closed.
    await client.disconnect(force=False)
    assert connection.is_connected is False
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("has_cache", "feature_flags", "method"),
    [
        (False, BluetoothProxyFeature(0), BluetoothDeviceRequestType.CONNECT),
        (
            False,
            BluetoothProxyFeature.REMOTE_CACHING,
            BluetoothDeviceRequestType.CONNECT_V3_WITHOUT_CACHE,
        ),
        (
            True,
            BluetoothProxyFeature.REMOTE_CACHING,
            BluetoothDeviceRequestType.CONNECT_V3_WITH_CACHE,
        ),
    ],
)
async def test_bluetooth_device_connect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
    has_cache: bool,
    feature_flags: BluetoothProxyFeature,
    method: BluetoothDeviceRequestType,
) -> None:
    """Check bluetooth_device_connect for each request type.

    For every combination of ``has_cache`` and ``feature_flags`` the request
    written to the transport must use the expected ``method``. Connection
    state changes must reach the callback until the returned cancel function
    is called, after which further responses must be ignored.
    """
    client, _connection, transport, protocol = api_client
    seen_states = []

    def record_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    def feed(msg: message.Message) -> None:
        """Deliver one protobuf message to the protocol as received bytes."""
        mock_data_received(protocol, generate_plaintext_packet(msg))

    pending_connect = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            record_state,
            timeout=1,
            feature_flags=feature_flags,
            has_cache=has_cache,
            disconnect_timeout=1,
            address_type=1,
        )
    )
    await asyncio.sleep(0)

    feed(
        BluetoothDeviceConnectionResponse(
            address=1234, connected=True, mtu=23, error=0
        )
    )
    cancel_connection = await pending_connect
    assert seen_states == [(True, 23, 0)]

    expected_request = BluetoothDeviceRequest(
        address=1234,
        request_type=method,
        has_address_type=True,
        address_type=1,
    )
    transport.write.assert_called_once_with(
        generate_plaintext_packet(expected_request)
    )

    # A later state change is still reported while subscribed.
    feed(
        BluetoothDeviceConnectionResponse(
            address=1234, connected=False, mtu=23, error=7
        )
    )
    await asyncio.sleep(0)
    assert seen_states == [(True, 23, 0), (False, 23, 7)]

    cancel_connection()

    # Once cancelled, further responses must not reach the callback.
    feed(
        BluetoothDeviceConnectionResponse(
            address=1234, connected=False, mtu=23, error=8
        )
    )
    await asyncio.sleep(0)
    assert seen_states == [(True, 23, 0), (False, 23, 7)]
@pytest.mark.asyncio
async def test_bluetooth_device_connect_and_disconnect_times_out(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_device_connect when connect and disconnect both time out.

    Both timeouts are zero and no response is ever delivered, so the call
    must raise ``TimeoutAPIError`` without invoking the state callback.
    """
    client, _connection, _transport, _protocol = api_client
    seen_states = []

    def record_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    connect_kwargs = {
        "timeout": 0,
        "feature_flags": 0,
        "has_cache": True,
        "disconnect_timeout": 0,
        "address_type": 1,
    }
    pending_connect = asyncio.create_task(
        client.bluetooth_device_connect(1234, record_state, **connect_kwargs)
    )

    with pytest.raises(TimeoutAPIError):
        await pending_connect

    assert seen_states == []
@pytest.mark.asyncio
async def test_bluetooth_device_connect_times_out_disconnect_ok(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check bluetooth_device_connect when connect times out but disconnect is answered.

    The connect timeout is zero, so after the connect request a second
    (disconnect) request must be written. A disconnected response is then
    delivered; the call must still raise ``TimeoutAPIError`` and the state
    callback must not be invoked.
    """
    client, _connection, transport, protocol = api_client
    seen_states = []

    def record_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    pending_connect = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            record_state,
            timeout=0,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=1,
            address_type=1,
        )
    )
    await asyncio.sleep(0)

    # Only the connect request has been written so far.
    assert len(transport.write.mock_calls) == 1

    # Yield to the event loop three times so the connect timeout is handled.
    for _ in range(3):
        await asyncio.sleep(0)

    # After the timeout, the disconnect request has been written as well.
    assert len(transport.write.mock_calls) == 2

    disconnected_msg = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, mtu=23, error=8
    )
    mock_data_received(protocol, generate_plaintext_packet(disconnected_msg))

    with pytest.raises(TimeoutAPIError):
        await pending_connect

    assert seen_states == []
@pytest.mark.asyncio
async def test_bluetooth_device_connect_cancelled(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that cancelling bluetooth_device_connect leaks no handlers.

    The connect task is cancelled after its request has been written; the
    cancellation must propagate, the state callback must stay unused, and the
    number of registered message handlers must return to its starting value.
    """
    client, connection, transport, _protocol = api_client
    seen_states = []

    def handler_count() -> int:
        """Return the total number of registered message handlers."""
        handlers = connection._get_message_handlers()
        return len(list(itertools.chain.from_iterable(handlers.values())))

    baseline = handler_count()

    def record_state(connected: bool, mtu: int, error: int) -> None:
        seen_states.append((connected, mtu, error))

    pending_connect = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            record_state,
            timeout=10,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=10,
            address_type=1,
        )
    )
    await asyncio.sleep(0)

    # The connect request has been written by now.
    assert len(transport.write.mock_calls) == 1

    pending_connect.cancel()
    with pytest.raises(asyncio.CancelledError):
        await pending_connect

    assert seen_states == []
    # No message handlers may be left behind by the cancelled call.
    assert handler_count() == baseline
@pytest.mark.asyncio
async def test_send_voice_assistant_event(auth_client: APIClient) -> None:
    """Check the messages produced by send_voice_assistant_event.

    A data mapping must be turned into a list of name/value entries, and
    ``None`` must result in an empty data list.
    """
    sent = patch_send(auth_client)
    error_event = VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR

    auth_client.send_voice_assistant_event(
        error_event,
        {"error": "error", "ok": "ok"},
    )
    expected_with_data = VoiceAssistantEventResponse(
        event_type=error_event.value,
        data=[
            VoiceAssistantEventData(name="error", value="error"),
            VoiceAssistantEventData(name="ok", value="ok"),
        ],
    )
    sent.assert_called_once_with(expected_with_data)

    sent.reset_mock()
    auth_client.send_voice_assistant_event(error_event, None)
    expected_without_data = VoiceAssistantEventResponse(
        event_type=error_event.value,
        data=[],
    )
    sent.assert_called_once_with(expected_without_data)
@pytest.mark.asyncio
async def test_subscribe_voice_assistant(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check the start/stop flow of subscribe_voice_assistant.

    Subscribing must send a subscribe request. A start request must invoke
    the start handler and send back the port it returned; a stop request must
    invoke the stop handler. Unsubscribing must send an unsubscribe request,
    and calling it again after a forced disconnect must send nothing.
    """
    client, _connection, _transport, protocol = api_client
    sent = patch_send(client)
    started = []
    stopped = []

    async def on_start(
        conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings
    ) -> int | None:
        started.append((conversation_id, flags, audio_settings))
        return 42

    async def on_stop() -> None:
        stopped.append(True)

    unsubscribe = await client.subscribe_voice_assistant(on_start, on_stop)
    sent.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    sent.reset_mock()

    start_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=VoiceAssistantAudioSettings(
            noise_suppression_level=42,
            auto_gain=42,
            volume_multiplier=42,
        ),
    )
    mock_data_received(protocol, generate_plaintext_packet(start_request))
    await asyncio.sleep(0)
    await asyncio.sleep(0)

    expected_settings = VoiceAssistantAudioSettingsModel(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    assert started == [("theone", 42, expected_settings)]
    assert stopped == []
    sent.assert_called_once_with(VoiceAssistantResponse(port=42))
    sent.reset_mock()

    stop_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(stop_request))
    await asyncio.sleep(0)
    assert stopped == [True]
    sent.reset_mock()

    unsubscribe()
    sent.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    sent.reset_mock()

    await client.disconnect(force=True)
    # Unsubscribing again after disconnect must not raise or send anything.
    unsubscribe()
    assert len(sent.mock_calls) == 0
@pytest.mark.asyncio
async def test_subscribe_voice_assistant_failure(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check subscribe_voice_assistant when the start handler reports failure.

    The start handler returns ``None``, so the reply to the start request
    must be an error response. The stop and unsubscribe flow afterwards must
    work as usual, and unsubscribing again after a forced disconnect must
    send nothing.
    """
    client, _connection, _transport, protocol = api_client
    sent = patch_send(client)
    started = []
    stopped = []

    async def on_start(
        conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings
    ) -> int | None:
        started.append((conversation_id, flags, audio_settings))
        # No port is returned, which signals a failed start.
        return None

    async def on_stop() -> None:
        stopped.append(True)

    unsubscribe = await client.subscribe_voice_assistant(on_start, on_stop)
    sent.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    sent.reset_mock()

    start_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=VoiceAssistantAudioSettings(
            noise_suppression_level=42,
            auto_gain=42,
            volume_multiplier=42,
        ),
    )
    mock_data_received(protocol, generate_plaintext_packet(start_request))
    await asyncio.sleep(0)
    await asyncio.sleep(0)

    expected_settings = VoiceAssistantAudioSettingsModel(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    assert started == [("theone", 42, expected_settings)]
    assert stopped == []
    sent.assert_called_once_with(VoiceAssistantResponse(error=True))
    sent.reset_mock()

    stop_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(stop_request))
    await asyncio.sleep(0)
    assert stopped == [True]
    sent.reset_mock()

    unsubscribe()
    sent.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    sent.reset_mock()

    await client.disconnect(force=True)
    # Unsubscribing again after disconnect must not raise or send anything.
    unsubscribe()
    assert len(sent.mock_calls) == 0
@pytest.mark.asyncio
async def test_subscribe_voice_assistant_cancels_long_running_handle_start(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that unsubscribing stops a start handler that is still running.

    The start handler records its arguments and then sleeps for ten seconds.
    After unsubscribing, only the initial record may be present: the marker
    appended after the sleep must never appear, and the stop handler must not
    have been called.
    """
    client, _connection, _transport, protocol = api_client
    sent = patch_send(client)
    started = []
    stopped = []

    async def slow_start(
        conversation_id: str, flags: int, audio_settings: VoiceAssistantAudioSettings
    ) -> int | None:
        started.append((conversation_id, flags, audio_settings))
        await asyncio.sleep(10)
        # The test expects this point to be unreachable: the marker below
        # must not show up in the recorded starts.
        started.append("never")
        return None

    async def on_stop() -> None:
        stopped.append(True)

    unsubscribe = await client.subscribe_voice_assistant(slow_start, on_stop)
    sent.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    sent.reset_mock()

    start_request = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=VoiceAssistantAudioSettings(
            noise_suppression_level=42,
            auto_gain=42,
            volume_multiplier=42,
        ),
    )
    mock_data_received(protocol, generate_plaintext_packet(start_request))
    await asyncio.sleep(0)
    await asyncio.sleep(0)

    unsubscribe()
    await asyncio.sleep(0)

    assert not stopped
    expected_settings = VoiceAssistantAudioSettingsModel(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    assert started == [("theone", 42, expected_settings)]
@pytest.mark.asyncio
async def test_api_version_after_connection_closed(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Check that api_version becomes None once the connection is closed."""
    client, _connection, _transport, _protocol = api_client

    # While connected, the fixture's client reports API version 1.9.
    expected_version = APIVersion(1, 9)
    assert client.api_version == expected_version

    await client.disconnect(force=True)
    assert client.api_version is None