from __future__ import annotations

import asyncio
import contextlib
import itertools
import logging
import socket
from functools import partial
from typing import Any
from unittest.mock import AsyncMock, MagicMock, call, create_autospec, patch

import pytest
from google.protobuf import message

from aioesphomeapi._frame_helper.plain_text import APIPlaintextFrameHelper
from aioesphomeapi.api_pb2 import (
    AlarmControlPanelCommandRequest,
    BinarySensorStateResponse,
    BluetoothConnectionsFreeResponse,
    BluetoothDeviceClearCacheResponse,
    BluetoothDeviceConnectionResponse,
    BluetoothDevicePairingResponse,
    BluetoothDeviceRequest,
    BluetoothDeviceUnpairingResponse,
    BluetoothGATTCharacteristic,
    BluetoothGATTDescriptor,
    BluetoothGATTErrorResponse,
    BluetoothGATTGetServicesDoneResponse,
    BluetoothGATTGetServicesResponse,
    BluetoothGATTNotifyDataResponse,
    BluetoothGATTNotifyResponse,
    BluetoothGATTReadResponse,
    BluetoothGATTService,
    BluetoothGATTWriteResponse,
    BluetoothLEAdvertisementResponse,
    BluetoothLERawAdvertisement,
    BluetoothLERawAdvertisementsResponse,
    BluetoothServiceData,
    ButtonCommandRequest,
    CameraImageRequest,
    CameraImageResponse,
    ClimateCommandRequest,
    CoverCommandRequest,
    DateCommandRequest,
    DateTimeCommandRequest,
    DeviceInfoResponse,
    DisconnectResponse,
    ExecuteServiceArgument,
    ExecuteServiceRequest,
    FanCommandRequest,
    HomeassistantServiceResponse,
    HomeAssistantStateResponse,
    LightCommandRequest,
    ListEntitiesBinarySensorResponse,
    ListEntitiesDoneResponse,
    ListEntitiesServicesResponse,
    LockCommandRequest,
    MediaPlayerCommandRequest,
    NumberCommandRequest,
    SelectCommandRequest,
    SirenCommandRequest,
    SubscribeHomeAssistantStateResponse,
    SubscribeLogsResponse,
    SubscribeVoiceAssistantRequest,
    SwitchCommandRequest,
    TextCommandRequest,
    TimeCommandRequest,
    UpdateCommandRequest,
    ValveCommandRequest,
    VoiceAssistantAudio,
    VoiceAssistantAudioSettings,
    VoiceAssistantEventData,
    VoiceAssistantEventResponse,
    VoiceAssistantRequest,
    VoiceAssistantResponse,
    VoiceAssistantTimerEventResponse,
)
from aioesphomeapi.client import APIClient, BluetoothConnectionDroppedError
from aioesphomeapi.connection import APIConnection
from aioesphomeapi.core import (
    APIConnectionError,
    BluetoothGATTAPIError,
    TimeoutAPIError,
    UnhandledAPIConnectionError,
)
from aioesphomeapi.model import (
    AlarmControlPanelCommand,
    APIVersion,
    BinarySensorInfo,
    BinarySensorState,
    BluetoothDeviceRequestType,
)
from aioesphomeapi.model import BluetoothGATTService as BluetoothGATTServiceModel
from aioesphomeapi.model import (
    BluetoothLEAdvertisement,
    BluetoothProxyFeature,
    CameraState,
    ClimateFanMode,
    ClimateMode,
    ClimatePreset,
    ClimateSwingMode,
    ESPHomeBluetoothGATTServices,
    FanDirection,
    FanSpeed,
    HomeassistantServiceCall,
    LegacyCoverCommand,
    LightColorCapability,
    LockCommand,
    MediaPlayerCommand,
    UserService,
    UserServiceArg,
    UserServiceArgType,
)
from aioesphomeapi.model import (
    VoiceAssistantAudioSettings as VoiceAssistantAudioSettingsModel,
)
from aioesphomeapi.model import VoiceAssistantEventType as VoiceAssistantEventModelType
from aioesphomeapi.model import (
    VoiceAssistantTimerEventType as VoiceAssistantTimerEventModelType,
)
from aioesphomeapi.reconnect_logic import ReconnectLogic, ReconnectLogicState

from .common import (
    Estr,
    generate_plaintext_packet,
    get_mock_zeroconf,
    mock_data_received,
)
from .conftest import PatchableAPIConnection


@pytest.fixture
def auth_client():
    client = APIClient(
        address="fake.address",
        port=6052,
        password=None,
    )
    with patch.object(client, "_connection") as conn:
        conn.is_connected = True
        yield client


def patch_response_complex(client: APIClient, messages):
    async def patched(req, app, stop, msg_types, timeout):
        resp = []
        for msg in messages:
            if app(msg):
                resp.append(msg)
            if stop(msg):
                break
        else:
            raise ValueError("Response never stopped")
        return resp

    client._connection.send_messages_await_response_complex = patched


def patch_response_callback(client: APIClient):
    on_message = None

    def patched(req, callback, msg_types):
        nonlocal on_message
        on_message = callback

    client._connection.send_message_callback_response = patched

    async def ret(send):
        on_message(send)

    return ret


def patch_send(client: APIClient):
    send = client._connection.send_message = MagicMock()
    return send


def patch_api_version(client: APIClient, version: APIVersion):
    client._connection.api_version = version


@pytest.mark.asyncio
async def test_expected_name(auth_client: APIClient) -> None:
    """Ensure expected name can be set externally."""
    assert auth_client.expected_name is None
    auth_client.expected_name = "awesome"
    assert auth_client.expected_name == "awesome"


@pytest.mark.asyncio
async def test_connect_backwards_compat() -> None:
    """Verify connect is a thin wrapper around start_connection and finish_connection."""

    class PatchableApiClient(APIClient):
        pass

    cli = PatchableApiClient("host", 1234, None)
    with (
        patch.object(cli, "start_connection") as mock_start_connection,
        patch.object(cli, "finish_connection") as mock_finish_connection,
    ):
        await cli.connect()

    assert mock_start_connection.mock_calls == [call(None)]
    assert mock_finish_connection.mock_calls == [call(False)]


@pytest.mark.asyncio
async def test_finish_connection_wraps_exceptions_as_unhandled_api_error(
    aiohappyeyeballs_start_connection,
) -> None:
    """Verify finish_connect re-wraps exceptions as UnhandledAPIError."""

    cli = APIClient("127.0.0.1", 1234, None)
    with patch("aioesphomeapi.client.APIConnection", PatchableAPIConnection):
        await cli.start_connection()

    with patch.object(
        cli._connection,
        "send_messages_await_response_complex",
        side_effect=Exception("foo"),
    ):
        with pytest.raises(UnhandledAPIConnectionError, match="foo"):
            await cli.finish_connection(False)


@pytest.mark.asyncio
async def test_connection_released_if_connecting_is_cancelled() -> None:
    """Verify connection is unset if connecting is cancelled."""
    cli = APIClient("127.0.0.1", 1234, None)
    asyncio.get_event_loop()

    async def _start_connection_with_delay(*args, **kwargs):
        await asyncio.sleep(1)
        mock_socket = create_autospec(socket.socket, spec_set=True, instance=True)
        mock_socket.getpeername.return_value = ("4.3.3.3", 323)
        return mock_socket

    with patch(
        "aioesphomeapi.connection.aiohappyeyeballs.start_connection",
        _start_connection_with_delay,
    ):
        start_task = asyncio.create_task(cli.start_connection())
        await asyncio.sleep(0)
        assert cli._connection is not None

    start_task.cancel()
    with contextlib.suppress(BaseException):
        await start_task
    assert cli._connection is None

    async def _start_connection_without_delay(*args, **kwargs):
        mock_socket = create_autospec(socket.socket, spec_set=True, instance=True)
        mock_socket.getpeername.return_value = ("4.3.3.3", 323)
        return mock_socket

    with (
        patch("aioesphomeapi.client.APIConnection", PatchableAPIConnection),
        patch(
            "aioesphomeapi.connection.aiohappyeyeballs.start_connection",
            _start_connection_without_delay,
        ),
    ):
        await cli.start_connection()
        await asyncio.sleep(0)

    assert cli._connection is not None
    task = asyncio.create_task(cli.finish_connection(False))
    await asyncio.sleep(0)
    task.cancel()
    with contextlib.suppress(BaseException):
        await task
    assert cli._connection is None


@pytest.mark.asyncio
async def test_request_while_handshaking() -> None:
    """Test trying a request while handshaking raises."""

    class PatchableApiClient(APIClient):
        pass

    cli = PatchableApiClient("host", 1234, None)
    with (
        patch(
            "aioesphomeapi.connection.aiohappyeyeballs.start_connection",
            side_effect=partial(asyncio.sleep, 1),
        ),
        patch.object(cli, "finish_connection"),
    ):
        connect_task = asyncio.create_task(cli.connect())

    await asyncio.sleep(0)
    with pytest.raises(
        APIConnectionError, match="Authenticated connection not ready yet"
    ):
        await cli.device_info()

    connect_task.cancel()
    await asyncio.sleep(0)


@pytest.mark.asyncio
async def test_connect_while_already_connected(auth_client: APIClient) -> None:
    """Test connecting while already connected raises."""
    with pytest.raises(APIConnectionError):
        await auth_client.start_connection()


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "input, output",
    [
        (
            [ListEntitiesBinarySensorResponse(), ListEntitiesDoneResponse()],
            ([BinarySensorInfo()], []),
        ),
        (
            [ListEntitiesServicesResponse(), ListEntitiesDoneResponse()],
            ([], [UserService()]),
        ),
    ],
)
async def test_list_entities(
    auth_client: APIClient, input: dict[str, Any], output: dict[str, Any]
) -> None:
    patch_response_complex(auth_client, input)
    resp = await auth_client.list_entities_services()
    assert resp == output


@pytest.mark.asyncio
async def test_subscribe_states(auth_client: APIClient) -> None:
    send = patch_response_callback(auth_client)
    on_state = MagicMock()
    auth_client.subscribe_states(on_state)
    on_state.assert_not_called()

    await send(BinarySensorStateResponse())
    on_state.assert_called_once_with(BinarySensorState())


@pytest.mark.asyncio
async def test_subscribe_states_camera(auth_client: APIClient) -> None:
    send = patch_response_callback(auth_client)
    on_state = MagicMock()
    auth_client.subscribe_states(on_state)
    await send(CameraImageResponse(key=1, data=b"asdf"))
    on_state.assert_not_called()

    await send(CameraImageResponse(key=1, data=b"qwer", done=True))
    on_state.assert_called_once_with(CameraState(key=1, data=b"asdfqwer"))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (
            dict(key=1, position=1.0),
            dict(
                key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.OPEN
            ),
        ),
        (
            dict(key=1, position=0.0),
            dict(
                key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.CLOSE
            ),
        ),
        (
            dict(key=1, stop=True),
            dict(
                key=1, has_legacy_command=True, legacy_command=LegacyCoverCommand.STOP
            ),
        ),
    ],
)
async def test_cover_command_legacy(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 0))

    auth_client.cover_command(**cmd)
    send.assert_called_once_with(CoverCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (dict(key=1, position=0.5), dict(key=1, has_position=True, position=0.5)),
        (dict(key=1, position=0.0), dict(key=1, has_position=True, position=0.0)),
        (dict(key=1, stop=True), dict(key=1, stop=True)),
        (
            dict(key=1, position=1.0, tilt=0.8),
            dict(key=1, has_position=True, position=1.0, has_tilt=True, tilt=0.8),
        ),
    ],
)
async def test_cover_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 1))

    auth_client.cover_command(**cmd)
    send.assert_called_once_with(CoverCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (dict(key=1, state=True), dict(key=1, has_state=True, state=True)),
        (
            dict(key=1, speed=FanSpeed.LOW),
            dict(key=1, has_speed=True, speed=FanSpeed.LOW),
        ),
        (
            dict(key=1, speed_level=10),
            dict(key=1, has_speed_level=True, speed_level=10),
        ),
        (
            dict(key=1, oscillating=False),
            dict(key=1, has_oscillating=True, oscillating=False),
        ),
        (
            dict(key=1, direction=FanDirection.REVERSE),
            dict(key=1, has_direction=True, direction=FanDirection.REVERSE),
        ),
        (
            dict(key=1, preset_mode="auto"),
            dict(key=1, has_preset_mode=True, preset_mode="auto"),
        ),
    ],
)
async def test_fan_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.fan_command(**cmd)
    send.assert_called_once_with(FanCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (dict(key=1, state=True), dict(key=1, has_state=True, state=True)),
        (dict(key=1, brightness=0.8), dict(key=1, has_brightness=True, brightness=0.8)),
        (
            dict(key=1, rgb=(0.1, 0.5, 1.0)),
            dict(key=1, has_rgb=True, red=0.1, green=0.5, blue=1.0),
        ),
        (dict(key=1, white=0.0), dict(key=1, has_white=True, white=0.0)),
        (
            dict(key=1, color_temperature=0.0),
            dict(key=1, has_color_temperature=True, color_temperature=0.0),
        ),
        (
            dict(key=1, color_brightness=0.0),
            dict(key=1, has_color_brightness=True, color_brightness=0.0),
        ),
        (
            dict(key=1, cold_white=1.0, warm_white=2.0),
            dict(
                key=1,
                has_cold_white=True,
                cold_white=1.0,
                has_warm_white=True,
                warm_white=2.0,
            ),
        ),
        (
            dict(key=1, transition_length=0.1),
            dict(key=1, has_transition_length=True, transition_length=100),
        ),
        (
            dict(key=1, flash_length=0.1),
            dict(key=1, has_flash_length=True, flash_length=100),
        ),
        (dict(key=1, effect="special"), dict(key=1, has_effect=True, effect="special")),
        (
            dict(
                key=1,
                color_mode=LightColorCapability.COLOR_TEMPERATURE,
                color_temperature=153.0,
            ),
            dict(
                key=1,
                has_color_mode=True,
                color_mode=LightColorCapability.COLOR_TEMPERATURE,
                has_color_temperature=True,
                color_temperature=153.0,
            ),
        ),
    ],
)
async def test_light_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.light_command(**cmd)
    send.assert_called_once_with(LightCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, state=False), dict(key=1, state=False)),
        (dict(key=1, state=True), dict(key=1, state=True)),
    ],
)
async def test_switch_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.switch_command(**cmd)
    send.assert_called_once_with(SwitchCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, preset=ClimatePreset.HOME),
            dict(key=1, has_legacy_away=True, legacy_away=False),
        ),
        (
            dict(key=1, preset=ClimatePreset.AWAY),
            dict(key=1, has_legacy_away=True, legacy_away=True),
        ),
    ],
)
async def test_climate_command_legacy(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 4))

    auth_client.climate_command(**cmd)
    send.assert_called_once_with(ClimateCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, mode=ClimateMode.HEAT),
            dict(key=1, has_mode=True, mode=ClimateMode.HEAT),
        ),
        (
            dict(key=1, target_temperature=21.0),
            dict(key=1, has_target_temperature=True, target_temperature=21.0),
        ),
        (
            dict(key=1, target_temperature_low=21.0),
            dict(key=1, has_target_temperature_low=True, target_temperature_low=21.0),
        ),
        (
            dict(key=1, target_temperature_high=21.0),
            dict(key=1, has_target_temperature_high=True, target_temperature_high=21.0),
        ),
        (
            dict(key=1, fan_mode=ClimateFanMode.LOW),
            dict(key=1, has_fan_mode=True, fan_mode=ClimateFanMode.LOW),
        ),
        (
            dict(key=1, swing_mode=ClimateSwingMode.OFF),
            dict(key=1, has_swing_mode=True, swing_mode=ClimateSwingMode.OFF),
        ),
        (
            dict(key=1, custom_fan_mode="asdf"),
            dict(key=1, has_custom_fan_mode=True, custom_fan_mode="asdf"),
        ),
        (
            dict(key=1, preset=ClimatePreset.AWAY),
            dict(key=1, has_preset=True, preset=ClimatePreset.AWAY),
        ),
        (
            dict(key=1, custom_preset="asdf"),
            dict(key=1, has_custom_preset=True, custom_preset="asdf"),
        ),
        (
            dict(key=1, target_humidity=60.0),
            dict(key=1, has_target_humidity=True, target_humidity=60.0),
        ),
    ],
)
async def test_climate_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 5))

    auth_client.climate_command(**cmd)
    send.assert_called_once_with(ClimateCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, state=0.0), dict(key=1, state=0.0)),
        (dict(key=1, state=100.0), dict(key=1, state=100.0)),
    ],
)
async def test_number_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.number_command(**cmd)
    send.assert_called_once_with(NumberCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, year=2024, month=2, day=29),
            dict(key=1, year=2024, month=2, day=29),
        ),
        (
            dict(key=1, year=2000, month=6, day=10),
            dict(key=1, year=2000, month=6, day=10),
        ),
    ],
)
async def test_date_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.date_command(**cmd)
    send.assert_called_once_with(DateCommandRequest(**req))


# Test time command
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, hour=12, minute=30, second=30),
            dict(key=1, hour=12, minute=30, second=30),
        ),
        (
            dict(key=1, hour=0, minute=0, second=0),
            dict(key=1, hour=0, minute=0, second=0),
        ),
    ],
)
async def test_time_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.time_command(**cmd)
    send.assert_called_once_with(TimeCommandRequest(**req))


# Test date_time command
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, epoch_seconds=1735648230),
            dict(key=1, epoch_seconds=1735648230),
        ),
        (
            dict(key=1, epoch_seconds=1735689600),
            dict(key=1, epoch_seconds=1735689600),
        ),
    ],
)
async def test_datetime_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.datetime_command(**cmd)
    send.assert_called_once_with(DateTimeCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, command=LockCommand.LOCK), dict(key=1, command=LockCommand.LOCK)),
        (
            dict(key=1, command=LockCommand.UNLOCK),
            dict(key=1, command=LockCommand.UNLOCK),
        ),
        (dict(key=1, command=LockCommand.OPEN), dict(key=1, command=LockCommand.OPEN)),
        (
            dict(key=1, command=LockCommand.OPEN, code="1234"),
            dict(key=1, command=LockCommand.OPEN, code="1234"),
        ),
    ],
)
async def test_lock_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.lock_command(**cmd)
    send.assert_called_once_with(LockCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (dict(key=1, position=1.0),),
        (dict(key=1, position=0.0),),
        (dict(key=1, stop=True),),
    ],
)
async def test_valve_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.valve_command(**cmd)
    send.assert_called_once_with(ValveCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
        (dict(key=1, position=0.5), dict(key=1, has_position=True, position=0.5)),
        (dict(key=1, position=0.0), dict(key=1, has_position=True, position=0.0)),
        (dict(key=1, stop=True), dict(key=1, stop=True)),
        (
            dict(key=1, position=1.0),
            dict(key=1, has_position=True, position=1.0),
        ),
    ],
)
async def test_valve_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 1))

    auth_client.valve_command(**cmd)
    send.assert_called_once_with(ValveCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, state="One"), dict(key=1, state="One")),
        (dict(key=1, state="Two"), dict(key=1, state="Two")),
    ],
)
async def test_select_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.select_command(**cmd)
    send.assert_called_once_with(SelectCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, command=MediaPlayerCommand.MUTE),
            dict(key=1, has_command=True, command=MediaPlayerCommand.MUTE),
        ),
        (
            dict(key=1, volume=1.0),
            dict(key=1, has_volume=True, volume=1.0),
        ),
        (
            dict(key=1, media_url="http://example.com"),
            dict(key=1, has_media_url=True, media_url="http://example.com"),
        ),
        (
            dict(key=1, media_url="http://example.com", announcement=True),
            dict(
                key=1,
                has_media_url=True,
                media_url="http://example.com",
                has_announcement=True,
                announcement=True,
            ),
        ),
    ],
)
async def test_media_player_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.media_player_command(**cmd)
    send.assert_called_once_with(MediaPlayerCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1), dict(key=1)),
    ],
)
async def test_button_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.button_command(**cmd)
    send.assert_called_once_with(ButtonCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, state=True), dict(key=1, state=True, has_state=True)),
        (dict(key=1, state=False), dict(key=1, state=False, has_state=True)),
        (dict(key=1, state=None), dict(key=1, state=None, has_state=False)),
        (
            dict(key=1, state=True, tone="any"),
            dict(key=1, state=True, has_state=True, has_tone=True, tone="any"),
        ),
        (
            dict(key=1, state=True, tone=None),
            dict(key=1, state=True, has_state=True, has_tone=False, tone=None),
        ),
        (
            dict(key=1, state=True, volume=5),
            dict(key=1, state=True, has_volume=True, volume=5, has_state=True),
        ),
        (
            dict(key=1, state=True, duration=5),
            dict(key=1, state=True, has_duration=True, duration=5, has_state=True),
        ),
    ],
)
async def test_siren_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.siren_command(**cmd)
    send.assert_called_once_with(SirenCommandRequest(**req))


@pytest.mark.asyncio
async def test_execute_service(auth_client: APIClient) -> None:
    send = patch_send(auth_client)
    patch_api_version(auth_client, APIVersion(1, 3))

    service = UserService(
        name="my_service",
        key=1,
        args=[
            UserServiceArg(name="arg1", type=UserServiceArgType.BOOL),
            UserServiceArg(name="arg2", type=UserServiceArgType.INT),
            UserServiceArg(name="arg3", type=UserServiceArgType.FLOAT),
            UserServiceArg(name="arg4", type=UserServiceArgType.STRING),
            UserServiceArg(name="arg5", type=UserServiceArgType.BOOL_ARRAY),
            UserServiceArg(name="arg6", type=UserServiceArgType.INT_ARRAY),
            UserServiceArg(name="arg7", type=UserServiceArgType.FLOAT_ARRAY),
            UserServiceArg(name="arg8", type=UserServiceArgType.STRING_ARRAY),
        ],
    )

    with pytest.raises(KeyError):
        auth_client.execute_service(service, data={})

    auth_client.execute_service(
        service,
        data={
            "arg1": False,
            "arg2": 42,
            "arg3": 99.0,
            "arg4": "asdf",
            "arg5": [False, True, False],
            "arg6": [42, 10, 9],
            "arg7": [0.0, -100.0],
            "arg8": [],
        },
    )
    send.assert_called_once_with(
        ExecuteServiceRequest(
            key=1,
            args=[
                ExecuteServiceArgument(bool_=False),
                ExecuteServiceArgument(int_=42),
                ExecuteServiceArgument(float_=99.0),
                ExecuteServiceArgument(string_="asdf"),
                ExecuteServiceArgument(bool_array=[False, True, False]),
                ExecuteServiceArgument(int_array=[42, 10, 9]),
                ExecuteServiceArgument(float_array=[0.0, -100.0]),
                ExecuteServiceArgument(string_array=[]),
            ],
        )
    )
    send.reset_mock()

    patch_api_version(auth_client, APIVersion(1, 2))
    service = UserService(
        name="my_service",
        key=2,
        args=[
            UserServiceArg(name="arg1", type=UserServiceArgType.BOOL),
            UserServiceArg(name="arg2", type=UserServiceArgType.INT),
        ],
    )

    # Test legacy_int
    auth_client.execute_service(
        service,
        data={
            "arg1": False,
            "arg2": 42,
        },
    )
    send.assert_called_once_with(
        ExecuteServiceRequest(
            key=2,
            args=[
                ExecuteServiceArgument(bool_=False),
                ExecuteServiceArgument(legacy_int=42),
            ],
        )
    )
    send.reset_mock()

    # Test arg order
    auth_client.execute_service(
        service,
        data={
            "arg2": 42,
            "arg1": False,
        },
    )
    send.assert_called_once_with(
        ExecuteServiceRequest(
            key=2,
            args=[
                ExecuteServiceArgument(bool_=False),
                ExecuteServiceArgument(legacy_int=42),
            ],
        )
    )
    send.reset_mock()


@pytest.mark.asyncio
async def test_request_single_image(auth_client: APIClient) -> None:
    send = patch_send(auth_client)

    auth_client.request_single_image()
    send.assert_called_once_with(CameraImageRequest(single=True, stream=False))


@pytest.mark.asyncio
async def test_request_image_stream(auth_client: APIClient) -> None:
    send = patch_send(auth_client)

    auth_client.request_image_stream()
    send.assert_called_once_with(CameraImageRequest(single=False, stream=True))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (
            dict(key=1, command=AlarmControlPanelCommand.ARM_AWAY),
            dict(key=1, command=AlarmControlPanelCommand.ARM_AWAY, code=None),
        ),
        (
            dict(key=1, command=AlarmControlPanelCommand.ARM_HOME),
            dict(key=1, command=AlarmControlPanelCommand.ARM_HOME, code=None),
        ),
        (
            dict(key=1, command=AlarmControlPanelCommand.DISARM, code="1234"),
            dict(key=1, command=AlarmControlPanelCommand.DISARM, code="1234"),
        ),
    ],
)
async def test_alarm_panel_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.alarm_control_panel_command(**cmd)
    send.assert_called_once_with(AlarmControlPanelCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, state="hello world"), dict(key=1, state="hello world")),
        (dict(key=1, state="goodbye"), dict(key=1, state="goodbye")),
    ],
)
async def test_text_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.text_command(**cmd)
    send.assert_called_once_with(TextCommandRequest(**req))


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "cmd, req",
    [
        (dict(key=1, install=True), dict(key=1, install=True)),
        (dict(key=1, install=False), dict(key=1, install=False)),
    ],
)
async def test_update_command(
    auth_client: APIClient, cmd: dict[str, Any], req: dict[str, Any]
) -> None:
    send = patch_send(auth_client)

    auth_client.update_command(**cmd)
    send.assert_called_once_with(UpdateCommandRequest(**req))


@pytest.mark.asyncio
async def test_noise_psk_handles_subclassed_string():
    """Test that the noise_psk gets converted to a string."""

    class PatchableAPIClient(APIClient):
        pass

    cli = PatchableAPIClient(
        address=Estr("127.0.0.1"),
        port=6052,
        password=None,
        noise_psk=Estr("QRTIErOb/fcE9Ukd/5qA3RGYMn0Y+p06U58SCtOXvPc="),
        expected_name=Estr("mydevice"),
    )
    # Make sure its not a subclassed string
    assert type(cli._params.noise_psk) is str
    assert type(cli._params.addresses[0]) is str
    assert type(cli._params.expected_name) is str

    rl = ReconnectLogic(
        client=cli,
        on_disconnect=AsyncMock(),
        on_connect=AsyncMock(),
        zeroconf_instance=get_mock_zeroconf(),
        name="mydevice",
    )
    assert rl._connection_state is ReconnectLogicState.DISCONNECTED

    with patch.object(cli, "start_connection"), patch.object(cli, "finish_connection"):
        await rl.start()
        for _ in range(3):
            await asyncio.sleep(0)

    rl.stop_callback()
    # Wait for cancellation to propagate
    for _ in range(4):
        await asyncio.sleep(0)
    assert rl._connection_state is ReconnectLogicState.DISCONNECTED


@pytest.mark.asyncio
async def test_no_noise_psk():
    """Test not using a noise_psk."""
    cli = APIClient(
        address=Estr("127.0.0.1"),
        port=6052,
        password=None,
        noise_psk=None,
        expected_name=Estr("mydevice"),
    )
    # Make sure its not a subclassed string
    assert cli._params.noise_psk is None
    assert type(cli._params.addresses[0]) is str
    assert type(cli._params.expected_name) is str


@pytest.mark.asyncio
async def test_empty_noise_psk_or_expected_name():
    """Test an empty noise_psk is treated as None."""
    cli = APIClient(
        address=Estr("127.0.0.1"),
        port=6052,
        password=None,
        noise_psk="",
        expected_name="",
    )
    assert cli._params.noise_psk is None
    assert type(cli._params.addresses[0]) is str
    assert cli._params.expected_name is None


@pytest.mark.asyncio
async def test_bluetooth_disconnect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_disconnect."""
    client, connection, transport, protocol = api_client
    disconnect_task = asyncio.create_task(client.bluetooth_device_disconnect(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await disconnect_task


@pytest.mark.asyncio
async def test_bluetooth_pair(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_pair."""
    client, connection, transport, protocol = api_client
    pair_task = asyncio.create_task(client.bluetooth_device_pair(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDevicePairingResponse(address=4567)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert not pair_task.done()
    response: message.Message = BluetoothDevicePairingResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await pair_task


@pytest.mark.asyncio
async def test_bluetooth_pair_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_device_pair."""
    client, connection, transport, protocol = api_client
    pair_task = asyncio.create_task(client.bluetooth_device_pair(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothDevicePairingResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await pair_task


@pytest.mark.asyncio
async def test_bluetooth_unpair_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_device_unpair."""
    client, connection, transport, protocol = api_client
    pair_task = asyncio.create_task(client.bluetooth_device_unpair(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothDeviceUnpairingResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await pair_task


@pytest.mark.asyncio
async def test_bluetooth_clear_cache_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_device_clear_cache."""
    client, connection, transport, protocol = api_client
    pair_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothDeviceClearCacheResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await pair_task


@pytest.mark.asyncio
async def test_bluetooth_unpair(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_unpair."""
    client, connection, transport, protocol = api_client
    unpair_task = asyncio.create_task(client.bluetooth_device_unpair(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceUnpairingResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await unpair_task


@pytest.mark.asyncio
async def test_bluetooth_clear_cache(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_clear_cache."""
    client, connection, transport, protocol = api_client
    clear_task = asyncio.create_task(client.bluetooth_device_clear_cache(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceClearCacheResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await clear_task


@pytest.mark.asyncio
async def test_device_info(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test fetching device info."""
    client, connection, transport, protocol = api_client
    assert client.log_name == "fake @ 10.0.0.512"
    device_info_task = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    response: message.Message = DeviceInfoResponse(
        name="realname",
        friendly_name="My Device",
        has_deep_sleep=True,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    device_info = await device_info_task
    assert device_info.name == "realname"
    assert device_info.friendly_name == "My Device"
    assert device_info.has_deep_sleep
    assert client.log_name == "realname @ 10.0.0.512"
    disconnect_task = asyncio.create_task(client.disconnect())
    await asyncio.sleep(0)
    response: message.Message = DisconnectResponse()
    mock_data_received(protocol, generate_plaintext_packet(response))
    await disconnect_task
    with pytest.raises(APIConnectionError, match="Not connected"):
        await client.device_info()


@pytest.mark.asyncio
async def test_bluetooth_gatt_read(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_read."""
    client, connection, transport, protocol = api_client
    read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)

    other_response: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=4567, data=b"4567"
    )
    mock_data_received(protocol, generate_plaintext_packet(other_response))

    response: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=1234, data=b"1234"
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    assert await read_task == b"1234"


@pytest.mark.asyncio
async def test_bluetooth_gatt_read_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_read."""
    client, connection, transport, protocol = api_client
    read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTReadResponse, BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await read_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_read_error(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_read that errors."""
    client, connection, transport, protocol = api_client
    read_task = asyncio.create_task(client.bluetooth_gatt_read(1234, 1234))
    await asyncio.sleep(0)
    error_response: message.Message = BluetoothGATTErrorResponse(
        address=1234, handle=1234
    )
    mock_data_received(protocol, generate_plaintext_packet(error_response))
    with pytest.raises(BluetoothGATTAPIError):
        await read_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_read_descriptor(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_read_descriptor."""
    client, connection, transport, protocol = api_client
    read_task = asyncio.create_task(client.bluetooth_gatt_read_descriptor(1234, 1234))
    await asyncio.sleep(0)

    other_response: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=4567, data=b"4567"
    )
    mock_data_received(protocol, generate_plaintext_packet(other_response))

    response: message.Message = BluetoothGATTReadResponse(
        address=1234, handle=1234, data=b"1234"
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    assert await read_task == b"1234"


@pytest.mark.asyncio
async def test_bluetooth_gatt_write(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write."""
    client, connection, transport, protocol = api_client
    write_task = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)

    other_response: message.Message = BluetoothGATTWriteResponse(
        address=1234, handle=4567
    )
    mock_data_received(protocol, generate_plaintext_packet(other_response))

    response: message.Message = BluetoothGATTWriteResponse(address=1234, handle=1234)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await write_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_write_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_read."""
    client, connection, transport, protocol = api_client
    write_task = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTWriteResponse, BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await write_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_write_without_response(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write without response."""
    client, connection, transport, protocol = api_client
    transport.reset_mock()
    write_task = asyncio.create_task(
        client.bluetooth_gatt_write(1234, 1234, b"1234", False)
    )
    await asyncio.sleep(0)
    await write_task
    assert transport.mock_calls[0][1][0] == b'\x00\x0cK\x08\xd2\t\x10\xd2\t"\x041234'

    with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
        await client.bluetooth_gatt_write(1234, 1234, b"1234", True, timeout=0)


@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write_descriptor."""
    client, connection, transport, protocol = api_client
    write_task = asyncio.create_task(
        client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", True)
    )
    await asyncio.sleep(0)

    other_response: message.Message = BluetoothGATTWriteResponse(
        address=1234, handle=4567
    )
    mock_data_received(protocol, generate_plaintext_packet(other_response))

    response: message.Message = BluetoothGATTWriteResponse(address=1234, handle=1234)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await write_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_write_descriptor_without_response(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_write_descriptor without response."""
    client, connection, transport, protocol = api_client
    transport.reset_mock()
    write_task = asyncio.create_task(
        client.bluetooth_gatt_write_descriptor(
            1234, 1234, b"1234", wait_for_response=False
        )
    )
    await asyncio.sleep(0)
    await write_task
    assert transport.mock_calls[0][1][0] == b"\x00\x0cM\x08\xd2\t\x10\xd2\t\x1a\x041234"

    with pytest.raises(TimeoutAPIError, match="BluetoothGATTWriteResponse"):
        await client.bluetooth_gatt_write_descriptor(1234, 1234, b"1234", timeout=0)


@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_get_services."""
    client, connection, transport, protocol = api_client
    services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTGetServicesResponse, BluetoothGATTGetServicesDoneResponse, "
        "BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await services_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_get_services success case."""
    client, connection, transport, protocol = api_client
    services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)
    service1: message.Message = BluetoothGATTService(
        uuid=[1, 1],
        handle=1,
        characteristics=[
            BluetoothGATTCharacteristic(
                uuid=[1, 2],
                handle=2,
                properties=1,
                descriptors=[BluetoothGATTDescriptor(uuid=[1, 3], handle=3)],
            )
        ],
    )
    response: message.Message = BluetoothGATTGetServicesResponse(
        address=1234, services=[service1]
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    done_response: message.Message = BluetoothGATTGetServicesDoneResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(done_response))

    services = await services_task
    service = BluetoothGATTServiceModel.from_pb(service1)
    assert services == ESPHomeBluetoothGATTServices(
        address=1234,
        services=[service],
    )


@pytest.mark.asyncio
async def test_bluetooth_gatt_get_services_errors(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_get_services with a failure."""
    client, connection, transport, protocol = api_client
    services_task = asyncio.create_task(client.bluetooth_gatt_get_services(1234))
    await asyncio.sleep(0)
    service1: message.Message = BluetoothGATTService(
        uuid=[1, 1], handle=1, characteristics=[]
    )
    response: message.Message = BluetoothGATTGetServicesResponse(
        address=1234, services=[service1]
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    done_response: message.Message = BluetoothGATTErrorResponse(address=1234)
    mock_data_received(protocol, generate_plaintext_packet(done_response))

    with pytest.raises(BluetoothGATTAPIError):
        await services_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify_connection_drops(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test connection drop during bluetooth_gatt_start_notify."""
    client, connection, transport, protocol = api_client
    notify_task = asyncio.create_task(
        client.bluetooth_gatt_start_notify(1234, 1, lambda handle, data: None)
    )
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, error=13
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    message = (
        "Peripheral 00:00:00:00:04:D2 changed connection status while waiting"
        " for BluetoothGATTNotifyResponse, BluetoothGATTErrorResponse: Invalid attribute length"
    )
    with pytest.raises(BluetoothConnectionDroppedError, match=message):
        await notify_task


@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_start_notify."""
    client, connection, transport, protocol = api_client
    notifies = []

    handlers_before = len(list(itertools.chain(*connection._message_handlers.values())))

    def on_bluetooth_gatt_notify(handle: int, data: bytearray) -> None:
        notifies.append((handle, data))

    notify_task = asyncio.create_task(
        client.bluetooth_gatt_start_notify(1234, 1, on_bluetooth_gatt_notify)
    )
    await asyncio.sleep(0)
    notify_response: message.Message = BluetoothGATTNotifyResponse(
        address=1234, handle=1
    )
    data_response: message.Message = BluetoothGATTNotifyDataResponse(
        address=1234, handle=1, data=b"gotit"
    )
    mock_data_received(
        protocol,
        generate_plaintext_packet(notify_response)
        + generate_plaintext_packet(data_response),
    )

    cancel_cb, abort_cb = await notify_task
    assert notifies == [(1, b"gotit")]

    second_data_response: message.Message = BluetoothGATTNotifyDataResponse(
        address=1234, handle=1, data=b"after finished"
    )
    mock_data_received(protocol, generate_plaintext_packet(second_data_response))
    assert notifies == [(1, b"gotit"), (1, b"after finished")]
    await cancel_cb()

    assert (
        len(list(itertools.chain(*connection._message_handlers.values())))
        == handlers_before
    )
    # Ensure abort callback is a no-op after cancel
    # and doesn't raise
    abort_cb()
    await client.disconnect(force=True)
    # Ensure abort callback is a no-op after disconnect
    # and does not raise
    await cancel_cb()


@pytest.mark.asyncio
async def test_bluetooth_gatt_start_notify_fails(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_gatt_start_notify failure does not leak."""
    client, connection, transport, protocol = api_client
    notifies = []

    def on_bluetooth_gatt_notify(handle: int, data: bytearray) -> None:
        notifies.append((handle, data))

    handlers_before = len(list(itertools.chain(*connection._message_handlers.values())))

    with (
        patch.object(
            connection,
            "send_messages_await_response_complex",
            side_effect=APIConnectionError,
        ),
        pytest.raises(APIConnectionError),
    ):
        await client.bluetooth_gatt_start_notify(1234, 1, on_bluetooth_gatt_notify)

    assert (
        len(list(itertools.chain(*connection._message_handlers.values())))
        == handlers_before
    )


@pytest.mark.asyncio
async def test_subscribe_bluetooth_le_advertisements(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_bluetooth_le_advertisements."""
    client, connection, transport, protocol = api_client
    advs = []

    def on_bluetooth_le_advertisements(adv: BluetoothLEAdvertisement) -> None:
        advs.append(adv)

    unsub = client.subscribe_bluetooth_le_advertisements(on_bluetooth_le_advertisements)
    await asyncio.sleep(0)
    response: message.Message = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=["1234"],
        service_data=[
            BluetoothServiceData(
                uuid="1234",
                data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
            )
        ],
        manufacturer_data=[
            BluetoothServiceData(
                uuid="1234",
                data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
            )
        ],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))

    assert advs == [
        BluetoothLEAdvertisement(
            address=1234,
            name="mydevice",
            rssi=-50,
            service_uuids=["000034-0000-1000-8000-00805f9b34fb"],
            manufacturer_data={
                4660: b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
            },
            service_data={
                "000034-0000-1000-8000-00805f9b34fb": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
            },
            address_type=1,
        )
    ]
    advs.clear()
    response: message.Message = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=[],
        service_data=[],
        manufacturer_data=[],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))

    assert advs == [
        BluetoothLEAdvertisement(
            address=1234,
            name="mydevice",
            rssi=-50,
            service_uuids=[],
            manufacturer_data={},
            service_data={},
            address_type=1,
        )
    ]
    advs.clear()
    response: message.Message = BluetoothLEAdvertisementResponse(
        address=1234,
        name=b"mydevice",
        rssi=-50,
        service_uuids=["1234"],
        service_data=[
            BluetoothServiceData(
                uuid="1234",
                legacy_data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
            )
        ],
        manufacturer_data=[
            BluetoothServiceData(
                uuid="1234",
                legacy_data=b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
            )
        ],
        address_type=1,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))

    assert advs == [
        BluetoothLEAdvertisement(
            address=1234,
            name="mydevice",
            rssi=-50,
            service_uuids=["000034-0000-1000-8000-00805f9b34fb"],
            manufacturer_data={
                4660: b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
            },
            service_data={
                "000034-0000-1000-8000-00805f9b34fb": b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
            },
            address_type=1,
        )
    ]
    unsub()


@pytest.mark.asyncio
async def test_subscribe_bluetooth_le_raw_advertisements(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_bluetooth_le_raw_advertisements."""
    client, connection, transport, protocol = api_client
    adv_groups = []

    def on_raw_bluetooth_le_advertisements(
        advs: BluetoothLERawAdvertisementsResponse,
    ) -> None:
        adv_groups.append(advs.advertisements)

    unsub = client.subscribe_bluetooth_le_raw_advertisements(
        on_raw_bluetooth_le_advertisements
    )
    await asyncio.sleep(0)

    response: message.Message = BluetoothLERawAdvertisementsResponse(
        advertisements=[
            BluetoothLERawAdvertisement(
                address=1234,
                rssi=-50,
                address_type=1,
                data=b"1234",
            )
        ]
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    assert len(adv_groups) == 1
    first_adv = adv_groups[0][0]
    assert first_adv.address == 1234
    assert first_adv.rssi == -50
    assert first_adv.address_type == 1
    assert first_adv.data == b"1234"
    unsub()


@pytest.mark.asyncio
async def test_subscribe_bluetooth_connections_free(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_bluetooth_connections_free."""
    client, connection, transport, protocol = api_client
    connections = []

    def on_bluetooth_connections_free(free: int, limit: int) -> None:
        connections.append((free, limit))

    unsub = client.subscribe_bluetooth_connections_free(on_bluetooth_connections_free)
    await asyncio.sleep(0)
    response: message.Message = BluetoothConnectionsFreeResponse(free=2, limit=3)
    mock_data_received(protocol, generate_plaintext_packet(response))

    assert connections == [(2, 3)]
    unsub()


@pytest.mark.asyncio
async def test_subscribe_home_assistant_states(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_home_assistant_states."""
    client, connection, transport, protocol = api_client
    states = []

    def on_subscribe_home_assistant_states(
        entity_id: str, attribute: str | None
    ) -> None:
        states.append((entity_id, attribute))

    client.subscribe_home_assistant_states(on_subscribe_home_assistant_states)
    await asyncio.sleep(0)

    response: message.Message = SubscribeHomeAssistantStateResponse(
        entity_id="sensor.red", attribute="any"
    )
    mock_data_received(protocol, generate_plaintext_packet(response))

    assert states == [("sensor.red", "any")]


@pytest.mark.asyncio
async def test_subscribe_logs(auth_client: APIClient) -> None:
    send = patch_response_callback(auth_client)
    on_logs = MagicMock()
    auth_client.subscribe_logs(on_logs)
    log_msg = SubscribeLogsResponse(level=1, message=b"asdf")
    await send(log_msg)
    on_logs.assert_called_with(log_msg)


@pytest.mark.asyncio
async def test_send_home_assistant_state(auth_client: APIClient) -> None:
    send = patch_send(auth_client)
    auth_client.send_home_assistant_state("binary_sensor.bla", None, "on")
    send.assert_called_once_with(
        HomeAssistantStateResponse(
            entity_id="binary_sensor.bla", state="on", attribute=None
        )
    )


@pytest.mark.asyncio
async def test_subscribe_service_calls(auth_client: APIClient) -> None:
    send = patch_response_callback(auth_client)
    on_service_call = MagicMock()
    auth_client.subscribe_service_calls(on_service_call)
    service_msg = HomeassistantServiceResponse(service="bob")
    await send(service_msg)
    on_service_call.assert_called_with(HomeassistantServiceCall.from_pb(service_msg))


@pytest.mark.asyncio
async def test_set_debug(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test set_debug."""
    client, connection, transport, protocol = api_client
    response: message.Message = DeviceInfoResponse(
        name="realname",
        friendly_name="My Device",
        has_deep_sleep=True,
    )

    caplog.set_level(logging.DEBUG)

    client.set_debug(True)
    assert client.log_name == "fake @ 10.0.0.512"
    device_info_task = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await device_info_task

    assert "My Device" in caplog.text
    caplog.clear()
    client.set_debug(False)
    device_info_task = asyncio.create_task(client.device_info())
    await asyncio.sleep(0)
    mock_data_received(protocol, generate_plaintext_packet(response))
    await device_info_task
    assert "My Device" not in caplog.text


@pytest.mark.asyncio
async def test_force_disconnect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test force disconnect can be called multiple times."""
    client, connection, transport, protocol = api_client
    assert connection.is_connected is True
    assert connection.on_stop is not None
    await client.disconnect(force=True)
    assert client._connection is None
    assert connection.is_connected is False
    await client.disconnect(force=False)
    assert connection.is_connected is False


@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("has_cache", "feature_flags", "method"),
    [
        (False, BluetoothProxyFeature(0), BluetoothDeviceRequestType.CONNECT),
        (
            False,
            BluetoothProxyFeature.REMOTE_CACHING,
            BluetoothDeviceRequestType.CONNECT_V3_WITHOUT_CACHE,
        ),
        (
            True,
            BluetoothProxyFeature.REMOTE_CACHING,
            BluetoothDeviceRequestType.CONNECT_V3_WITH_CACHE,
        ),
    ],
)
async def test_bluetooth_device_connect(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
    has_cache: bool,
    feature_flags: BluetoothProxyFeature,
    method: BluetoothDeviceRequestType,
) -> None:
    """Test bluetooth_device_connect."""
    client, connection, transport, protocol = api_client
    states = []

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        states.append((connected, mtu, error))

    connect_task = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=1,
            feature_flags=feature_flags,
            has_cache=has_cache,
            disconnect_timeout=1,
            address_type=1,
        )
    )
    await asyncio.sleep(0)
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=True, mtu=23, error=0
    )
    mock_data_received(protocol, generate_plaintext_packet(response))

    cancel = await connect_task
    assert states == [(True, 23, 0)]
    transport.write.assert_called_once_with(
        generate_plaintext_packet(
            BluetoothDeviceRequest(
                address=1234,
                request_type=method,
                has_address_type=True,
                address_type=1,
            ),
        )
    )
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, mtu=23, error=7
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert states == [(True, 23, 0), (False, 23, 7)]
    cancel()

    # After cancel, no more messages should called back
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, mtu=23, error=8
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert states == [(True, 23, 0), (False, 23, 7)]

    # Make sure cancel is safe to call again
    cancel()

    await client.disconnect(force=True)
    await asyncio.sleep(0)
    assert not client._connection
    # Make sure cancel is safe to call after disconnect
    cancel()


@pytest.mark.asyncio
async def test_bluetooth_device_connect_and_disconnect_times_out(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_connect and disconnect times out."""
    client, connection, transport, protocol = api_client
    states = []

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        states.append((connected, mtu, error))

    connect_task = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=0,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=0,
            address_type=1,
        )
    )
    with pytest.raises(TimeoutAPIError):
        await connect_task
    assert states == []


@pytest.mark.asyncio
async def test_bluetooth_device_connect_times_out_disconnect_ok(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_connect and disconnect times out."""
    client, connection, transport, protocol = api_client
    states = []

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        states.append((connected, mtu, error))

    connect_task = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=0,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=1,
            address_type=1,
        )
    )
    await asyncio.sleep(0)
    # The connect request should be written
    assert len(transport.write.mock_calls) == 1
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    # Now that we timed out, the disconnect
    # request should be written
    assert len(transport.write.mock_calls) == 2
    response: message.Message = BluetoothDeviceConnectionResponse(
        address=1234, connected=False, mtu=23, error=8
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    with pytest.raises(TimeoutAPIError):
        await connect_task
    assert states == []


@pytest.mark.asyncio
async def test_bluetooth_device_connect_cancelled(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test bluetooth_device_connect handles cancellation."""
    client, connection, transport, protocol = api_client
    states = []

    handlers_before = len(list(itertools.chain(*connection._message_handlers.values())))

    def on_bluetooth_connection_state(connected: bool, mtu: int, error: int) -> None:
        states.append((connected, mtu, error))

    connect_task = asyncio.create_task(
        client.bluetooth_device_connect(
            1234,
            on_bluetooth_connection_state,
            timeout=10,
            feature_flags=0,
            has_cache=True,
            disconnect_timeout=10,
            address_type=1,
        )
    )
    await asyncio.sleep(0)
    # The connect request should be written
    assert len(transport.write.mock_calls) == 1
    connect_task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await connect_task
    assert states == []

    handlers_after = len(list(itertools.chain(*connection._message_handlers.values())))
    # Make sure we do not leak message handlers
    assert handlers_after == handlers_before


@pytest.mark.asyncio
async def test_send_voice_assistant_event(auth_client: APIClient) -> None:
    send = patch_send(auth_client)

    auth_client.send_voice_assistant_event(
        VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR,
        {"error": "error", "ok": "ok"},
    )
    send.assert_called_once_with(
        VoiceAssistantEventResponse(
            event_type=VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR.value,
            data=[
                VoiceAssistantEventData(name="error", value="error"),
                VoiceAssistantEventData(name="ok", value="ok"),
            ],
        )
    )

    send.reset_mock()
    auth_client.send_voice_assistant_event(
        VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR, None
    )
    send.assert_called_once_with(
        VoiceAssistantEventResponse(
            event_type=VoiceAssistantEventModelType.VOICE_ASSISTANT_ERROR.value,
            data=[],
        )
    )


@pytest.mark.asyncio
async def test_subscribe_voice_assistant(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant."""
    client, connection, transport, protocol = api_client
    send = patch_send(client)
    starts = []
    stops = []

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        starts.append((conversation_id, flags, audio_settings, wake_word_phrase))
        return 42

    async def handle_stop() -> None:
        stops.append(True)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop
    )
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    send.reset_mock()
    audio_settings = VoiceAssistantAudioSettings(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=audio_settings,
        wake_word_phrase="okay nabu",
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    assert starts == [
        (
            "theone",
            42,
            VoiceAssistantAudioSettingsModel(
                noise_suppression_level=42,
                auto_gain=42,
                volume_multiplier=42,
            ),
            "okay nabu",
        )
    ]
    assert stops == []
    send.assert_called_once_with(VoiceAssistantResponse(port=42))
    send.reset_mock()
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert stops == [True]
    send.reset_mock()
    unsub()
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    send.reset_mock()
    await client.disconnect(force=True)
    # Ensure abort callback is a no-op after disconnect
    # and does not raise
    unsub()
    assert len(send.mock_calls) == 0


@pytest.mark.asyncio
async def test_subscribe_voice_assistant_failure(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant failure."""
    client, connection, transport, protocol = api_client
    send = patch_send(client)
    starts = []
    stops = []

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        starts.append((conversation_id, flags, audio_settings, wake_word_phrase))
        # Return None to indicate failure
        return None

    async def handle_stop() -> None:
        stops.append(True)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop
    )
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    send.reset_mock()
    audio_settings = VoiceAssistantAudioSettings(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=audio_settings,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    assert starts == [
        (
            "theone",
            42,
            VoiceAssistantAudioSettingsModel(
                noise_suppression_level=42,
                auto_gain=42,
                volume_multiplier=42,
            ),
            None,
        )
    ]
    assert stops == []
    send.assert_called_once_with(VoiceAssistantResponse(error=True))
    send.reset_mock()
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert stops == [True]
    send.reset_mock()
    unsub()
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    send.reset_mock()
    await client.disconnect(force=True)
    # Ensure abort callback is a no-op after disconnect
    # and does not raise
    unsub()
    assert len(send.mock_calls) == 0


@pytest.mark.asyncio
async def test_subscribe_voice_assistant_cancels_long_running_handle_start(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant cancels long running tasks on unsub."""
    client, connection, transport, protocol = api_client
    send = patch_send(client)
    starts = []
    stops = []

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        starts.append((conversation_id, flags, audio_settings, wake_word_phrase))
        await asyncio.sleep(10)
        # Return None to indicate failure
        starts.append("never")
        return None

    async def handle_stop() -> None:
        stops.append(True)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop
    )
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=True))
    send.reset_mock()
    audio_settings = VoiceAssistantAudioSettings(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=audio_settings,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    unsub()
    await asyncio.sleep(0)
    assert not stops
    assert starts == [
        (
            "theone",
            42,
            VoiceAssistantAudioSettingsModel(
                noise_suppression_level=42,
                auto_gain=42,
                volume_multiplier=42,
            ),
            None,
        )
    ]


@pytest.mark.asyncio
async def test_subscribe_voice_assistant_api_audio(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test subscribe_voice_assistant."""
    client, connection, transport, protocol = api_client
    send = patch_send(client)
    starts = []
    stops = []
    data_received = 0

    async def handle_start(
        conversation_id: str,
        flags: int,
        audio_settings: VoiceAssistantAudioSettings,
        wake_word_phrase: str | None,
    ) -> int | None:
        starts.append((conversation_id, flags, audio_settings, wake_word_phrase))
        return 0

    async def handle_stop() -> None:
        stops.append(True)

    async def handle_audio(data: bytes) -> None:
        nonlocal data_received
        data_received += len(data)

    unsub = client.subscribe_voice_assistant(
        handle_start=handle_start, handle_stop=handle_stop, handle_audio=handle_audio
    )
    send.assert_called_once_with(
        SubscribeVoiceAssistantRequest(subscribe=True, flags=4)
    )
    send.reset_mock()
    audio_settings = VoiceAssistantAudioSettings(
        noise_suppression_level=42,
        auto_gain=42,
        volume_multiplier=42,
    )
    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=True,
        flags=42,
        audio_settings=audio_settings,
        wake_word_phrase="okay nabu",
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    assert starts == [
        (
            "theone",
            42,
            VoiceAssistantAudioSettingsModel(
                noise_suppression_level=42,
                auto_gain=42,
                volume_multiplier=42,
            ),
            "okay nabu",
        )
    ]
    assert stops == []
    send.assert_called_once_with(VoiceAssistantResponse(port=0))
    send.reset_mock()

    response: message.Message = VoiceAssistantAudio(
        data=bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert data_received == 10

    response: message.Message = VoiceAssistantAudio(
        end=True,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert stops == [True]

    send.reset_mock()
    client.send_voice_assistant_audio(bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
    send.assert_called_once_with(
        VoiceAssistantAudio(data=bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
    )

    response: message.Message = VoiceAssistantRequest(
        conversation_id="theone",
        start=False,
    )
    mock_data_received(protocol, generate_plaintext_packet(response))
    await asyncio.sleep(0)
    assert stops == [True, True]
    send.reset_mock()
    unsub()
    send.assert_called_once_with(SubscribeVoiceAssistantRequest(subscribe=False))
    send.reset_mock()
    await client.disconnect(force=True)
    # Ensure abort callback is a no-op after disconnect
    # and does not raise
    unsub()
    assert len(send.mock_calls) == 0


@pytest.mark.asyncio
async def test_send_voice_assistant_timer_event(auth_client: APIClient) -> None:
    send = patch_send(auth_client)

    auth_client.send_voice_assistant_timer_event(
        VoiceAssistantTimerEventModelType.VOICE_ASSISTANT_TIMER_STARTED,
        timer_id="test",
        name="test",
        total_seconds=99,
        seconds_left=45,
        is_active=True,
    )

    send.assert_called_once_with(
        VoiceAssistantTimerEventResponse(
            event_type=VoiceAssistantTimerEventModelType.VOICE_ASSISTANT_TIMER_STARTED,
            timer_id="test",
            name="test",
            total_seconds=99,
            seconds_left=45,
            is_active=True,
        )
    )


@pytest.mark.asyncio
async def test_api_version_after_connection_closed(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test api version is None after connection close."""
    client, connection, transport, protocol = api_client
    assert client.api_version == APIVersion(1, 9)
    await client.disconnect(force=True)
    assert client.api_version is None


@pytest.mark.asyncio
async def test_calls_after_connection_closed(
    api_client: tuple[
        APIClient, APIConnection, asyncio.Transport, APIPlaintextFrameHelper
    ],
) -> None:
    """Test calls after connection close should raise APIConnectionError."""
    client, connection, transport, protocol = api_client
    assert client.api_version == APIVersion(1, 9)
    await client.disconnect(force=True)
    assert client.api_version is None
    service = UserService(
        name="my_service",
        key=1,
        args=[],
    )
    with pytest.raises(APIConnectionError):
        client.execute_service(service, {})
    for method in (
        client.button_command,
        client.climate_command,
        client.cover_command,
        client.fan_command,
        client.light_command,
        client.valve_command,
        client.media_player_command,
        client.siren_command,
    ):
        with pytest.raises(APIConnectionError):
            await method(1)

    with pytest.raises(APIConnectionError):
        await client.alarm_control_panel_command(1, AlarmControlPanelCommand.ARM_HOME)

    with pytest.raises(APIConnectionError):
        await client.date_command(1, 1, 1, 1)

    with pytest.raises(APIConnectionError):
        await client.lock_command(1, LockCommand.LOCK)

    with pytest.raises(APIConnectionError):
        await client.number_command(1, 1)

    with pytest.raises(APIConnectionError):
        await client.select_command(1, "1")

    with pytest.raises(APIConnectionError):
        await client.switch_command(1, True)

    with pytest.raises(APIConnectionError):
        await client.text_command(1, "1")

    with pytest.raises(APIConnectionError):
        await client.update_command(1, True)