Refactor client class to split callback conversion functions into their own module (#675)

This commit is contained in:
J. Nick Koston 2023-11-23 18:23:51 +01:00 committed by GitHub
parent 174fb8284b
commit e7d27e307e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 220 additions and 173 deletions

View File

@ -10,8 +10,6 @@ from google.protobuf import message
from .api_pb2 import ( # type: ignore
AlarmControlPanelCommandRequest,
AlarmControlPanelStateResponse,
BinarySensorStateResponse,
BluetoothConnectionsFreeResponse,
BluetoothDeviceClearCacheResponse,
BluetoothDeviceConnectionResponse,
@ -38,50 +36,23 @@ from .api_pb2 import ( # type: ignore
CameraImageRequest,
CameraImageResponse,
ClimateCommandRequest,
ClimateStateResponse,
CoverCommandRequest,
CoverStateResponse,
DeviceInfoRequest,
DeviceInfoResponse,
ExecuteServiceArgument,
ExecuteServiceRequest,
FanCommandRequest,
FanStateResponse,
HomeassistantServiceResponse,
HomeAssistantStateResponse,
LightCommandRequest,
LightStateResponse,
ListEntitiesAlarmControlPanelResponse,
ListEntitiesBinarySensorResponse,
ListEntitiesButtonResponse,
ListEntitiesCameraResponse,
ListEntitiesClimateResponse,
ListEntitiesCoverResponse,
ListEntitiesDoneResponse,
ListEntitiesFanResponse,
ListEntitiesLightResponse,
ListEntitiesLockResponse,
ListEntitiesMediaPlayerResponse,
ListEntitiesNumberResponse,
ListEntitiesRequest,
ListEntitiesSelectResponse,
ListEntitiesSensorResponse,
ListEntitiesServicesResponse,
ListEntitiesSirenResponse,
ListEntitiesSwitchResponse,
ListEntitiesTextResponse,
ListEntitiesTextSensorResponse,
LockCommandRequest,
LockStateResponse,
MediaPlayerCommandRequest,
MediaPlayerStateResponse,
NumberCommandRequest,
NumberStateResponse,
SelectCommandRequest,
SelectStateResponse,
SensorStateResponse,
SirenCommandRequest,
SirenStateResponse,
SubscribeBluetoothConnectionsFreeRequest,
SubscribeBluetoothLEAdvertisementsRequest,
SubscribeHomeassistantServicesRequest,
@ -92,10 +63,7 @@ from .api_pb2 import ( # type: ignore
SubscribeStatesRequest,
SubscribeVoiceAssistantRequest,
SwitchCommandRequest,
SwitchStateResponse,
TextCommandRequest,
TextSensorStateResponse,
TextStateResponse,
UnsubscribeBluetoothLEAdvertisementsRequest,
VoiceAssistantAudioSettings,
VoiceAssistantEventData,
@ -103,6 +71,14 @@ from .api_pb2 import ( # type: ignore
VoiceAssistantRequest,
VoiceAssistantResponse,
)
from .client_callbacks import (
on_ble_raw_advertisement_response,
on_bluetooth_connections_free_response,
on_bluetooth_gatt_notify_data_response,
on_bluetooth_le_advertising_response,
on_home_assistant_service_response,
on_state_msg,
)
from .connection import APIConnection, ConnectionParams
from .core import (
APIConnectionError,
@ -113,11 +89,7 @@ from .core import (
)
from .model import (
AlarmControlPanelCommand,
AlarmControlPanelEntityState,
AlarmControlPanelInfo,
APIVersion,
BinarySensorInfo,
BinarySensorState,
BluetoothDeviceClearCache,
BluetoothDevicePairing,
BluetoothDeviceRequestType,
@ -127,55 +99,30 @@ from .model import (
BluetoothLEAdvertisement,
BluetoothProxyFeature,
BluetoothProxySubscriptionFlag,
ButtonInfo,
CameraInfo,
CameraState,
ClimateFanMode,
ClimateInfo,
ClimateMode,
ClimatePreset,
ClimateState,
ClimateSwingMode,
CoverInfo,
CoverState,
DeviceInfo,
EntityInfo,
EntityState,
ESPHomeBluetoothGATTServices,
FanDirection,
FanInfo,
FanSpeed,
FanState,
HomeassistantServiceCall,
LegacyCoverCommand,
LightInfo,
LightState,
LockCommand,
LockEntityState,
LockInfo,
LogLevel,
MediaPlayerCommand,
MediaPlayerEntityState,
MediaPlayerInfo,
NumberInfo,
NumberState,
SelectInfo,
SelectState,
SensorInfo,
SensorState,
SirenInfo,
SirenState,
SwitchInfo,
SwitchState,
TextInfo,
TextSensorInfo,
TextSensorState,
TextState,
UserService,
UserServiceArgType,
VoiceAssistantCommand,
VoiceAssistantEventType,
)
from .model_conversions import (
LIST_ENTITIES_SERVICES_RESPONSE_TYPES,
SUBSCRIBE_STATES_RESPONSE_TYPES,
)
from .util import build_log_name
from .zeroconf import ZeroconfInstanceType, ZeroconfManager
@ -194,45 +141,9 @@ DEFAULT_BLE_DISCONNECT_TIMEOUT = 20.0
# connection is poor.
KEEP_ALIVE_FREQUENCY = 20.0
SUBSCRIBE_STATES_RESPONSE_TYPES: dict[Any, type[EntityState]] = {
BinarySensorStateResponse: BinarySensorState,
CoverStateResponse: CoverState,
FanStateResponse: FanState,
LightStateResponse: LightState,
NumberStateResponse: NumberState,
SelectStateResponse: SelectState,
SensorStateResponse: SensorState,
SirenStateResponse: SirenState,
SwitchStateResponse: SwitchState,
TextStateResponse: TextState,
TextSensorStateResponse: TextSensorState,
ClimateStateResponse: ClimateState,
LockStateResponse: LockEntityState,
MediaPlayerStateResponse: MediaPlayerEntityState,
AlarmControlPanelStateResponse: AlarmControlPanelEntityState,
}
SUBSCRIBE_STATES_MSG_TYPES = (*SUBSCRIBE_STATES_RESPONSE_TYPES, CameraImageResponse)
LIST_ENTITIES_SERVICES_RESPONSE_TYPES: dict[Any, type[EntityInfo] | None] = {
ListEntitiesBinarySensorResponse: BinarySensorInfo,
ListEntitiesButtonResponse: ButtonInfo,
ListEntitiesCoverResponse: CoverInfo,
ListEntitiesFanResponse: FanInfo,
ListEntitiesLightResponse: LightInfo,
ListEntitiesNumberResponse: NumberInfo,
ListEntitiesSelectResponse: SelectInfo,
ListEntitiesSensorResponse: SensorInfo,
ListEntitiesSirenResponse: SirenInfo,
ListEntitiesSwitchResponse: SwitchInfo,
ListEntitiesTextResponse: TextInfo,
ListEntitiesTextSensorResponse: TextSensorInfo,
ListEntitiesServicesResponse: None,
ListEntitiesCameraResponse: CameraInfo,
ListEntitiesClimateResponse: ClimateInfo,
ListEntitiesLockResponse: LockInfo,
ListEntitiesMediaPlayerResponse: MediaPlayerInfo,
ListEntitiesAlarmControlPanelResponse: AlarmControlPanelInfo,
}
LIST_ENTITIES_MSG_TYPES = (
ListEntitiesDoneResponse,
*LIST_ENTITIES_SERVICES_RESPONSE_TYPES,
@ -461,37 +372,11 @@ class APIClient:
entities.append(cls.from_pb(msg))
return entities, services
def _on_state_msg(
self,
on_state: Callable[[EntityState], None],
image_stream: dict[int, list[bytes]],
msg: message.Message,
) -> None:
"""Handle a state message."""
msg_type = type(msg)
if cls := SUBSCRIBE_STATES_RESPONSE_TYPES.get(msg_type):
on_state(cls.from_pb(msg))
elif msg_type is CameraImageResponse:
if TYPE_CHECKING:
assert isinstance(msg, CameraImageResponse)
msg_key = msg.key
data_parts: list[bytes] | None = image_stream.get(msg_key)
if not data_parts:
data_parts = []
image_stream[msg_key] = data_parts
data_parts.append(msg.data)
if msg.done:
# Return CameraState with the merged data
image_data = b"".join(data_parts)
del image_stream[msg_key]
on_state(CameraState(key=msg.key, data=image_data)) # type: ignore[call-arg]
async def subscribe_states(self, on_state: Callable[[EntityState], None]) -> None:
"""Subscribe to state updates."""
self._get_connection().send_message_callback_response(
SubscribeStatesRequest(),
partial(self._on_state_msg, on_state, {}),
partial(on_state_msg, on_state, {}),
SUBSCRIBE_STATES_MSG_TYPES,
)
@ -510,19 +395,12 @@ class APIClient:
req, on_log, (SubscribeLogsResponse,)
)
def _on_home_assistant_service_response(
self,
on_service_call: Callable[[HomeassistantServiceCall], None],
msg: HomeassistantServiceResponse,
) -> None:
on_service_call(HomeassistantServiceCall.from_pb(msg))
async def subscribe_service_calls(
self, on_service_call: Callable[[HomeassistantServiceCall], None]
) -> None:
self._get_connection().send_message_callback_response(
SubscribeHomeassistantServicesRequest(),
partial(self._on_home_assistant_service_response, on_service_call),
partial(on_home_assistant_service_response, on_service_call),
(HomeassistantServiceResponse,),
)
@ -569,20 +447,13 @@ class APIClient:
return resp[0]
def _on_bluetooth_le_advertising_response(
self,
on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None],
msg: BluetoothLEAdvertisementResponse,
) -> None:
on_bluetooth_le_advertisement(BluetoothLEAdvertisement.from_pb(msg)) # type: ignore[misc]
async def subscribe_bluetooth_le_advertisements(
self, on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None]
) -> Callable[[], None]:
unsub_callback = self._get_connection().send_message_callback_response(
SubscribeBluetoothLEAdvertisementsRequest(flags=0),
partial(
self._on_bluetooth_le_advertising_response,
on_bluetooth_le_advertising_response,
on_bluetooth_le_advertisement,
),
(BluetoothLEAdvertisementResponse,),
@ -597,13 +468,6 @@ class APIClient:
return unsub
def _on_ble_raw_advertisement_response(
self,
on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None],
msg: BluetoothLERawAdvertisementsResponse,
) -> None:
on_advertisements(msg.advertisements)
async def subscribe_bluetooth_le_raw_advertisements(
self, on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None]
) -> Callable[[], None]:
@ -611,7 +475,7 @@ class APIClient:
SubscribeBluetoothLEAdvertisementsRequest(
flags=BluetoothProxySubscriptionFlag.RAW_ADVERTISEMENTS
),
partial(self._on_ble_raw_advertisement_response, on_advertisements),
partial(on_ble_raw_advertisement_response, on_advertisements),
(BluetoothLERawAdvertisementsResponse,),
)
@ -624,20 +488,13 @@ class APIClient:
return unsub
def _on_bluetooth_connections_free_response(
self,
on_bluetooth_connections_free_update: Callable[[int, int], None],
msg: BluetoothConnectionsFreeResponse,
) -> None:
on_bluetooth_connections_free_update(msg.free, msg.limit)
async def subscribe_bluetooth_connections_free(
self, on_bluetooth_connections_free_update: Callable[[int, int], None]
) -> Callable[[], None]:
return self._get_connection().send_message_callback_response(
SubscribeBluetoothConnectionsFreeRequest(),
partial(
self._on_bluetooth_connections_free_response,
on_bluetooth_connections_free_response,
on_bluetooth_connections_free_update,
),
(BluetoothConnectionsFreeResponse,),
@ -990,17 +847,6 @@ class APIClient:
timeout=timeout,
)
def _on_bluetooth_gatt_notify_data_response(
self,
address: int,
handle: int,
on_bluetooth_gatt_notify: Callable[[int, bytearray], None],
msg: BluetoothGATTNotifyDataResponse,
) -> None:
"""Handle a BluetoothGATTNotifyDataResponse message."""
if address == msg.address and handle == msg.handle:
on_bluetooth_gatt_notify(handle, bytearray(msg.data))
async def bluetooth_gatt_start_notify(
self,
address: int,
@ -1019,7 +865,7 @@ class APIClient:
"""
remove_callback = self._get_connection().add_message_callback(
partial(
self._on_bluetooth_gatt_notify_data_response,
on_bluetooth_gatt_notify_data_response,
address,
handle,
on_bluetooth_gatt_notify,

View File

@ -0,0 +1,87 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Callable
from google.protobuf import message
from .api_pb2 import ( # type: ignore
BluetoothConnectionsFreeResponse,
BluetoothGATTNotifyDataResponse,
BluetoothLEAdvertisementResponse,
BluetoothLERawAdvertisement,
BluetoothLERawAdvertisementsResponse,
CameraImageResponse,
HomeassistantServiceResponse,
)
from .model import (
BluetoothLEAdvertisement,
CameraState,
EntityState,
HomeassistantServiceCall,
)
from .model_conversions import SUBSCRIBE_STATES_RESPONSE_TYPES
def on_state_msg(
on_state: Callable[[EntityState], None],
image_stream: dict[int, list[bytes]],
msg: message.Message,
) -> None:
"""Handle a state message."""
msg_type = type(msg)
if cls := SUBSCRIBE_STATES_RESPONSE_TYPES.get(msg_type):
on_state(cls.from_pb(msg))
elif msg_type is CameraImageResponse:
if TYPE_CHECKING:
assert isinstance(msg, CameraImageResponse)
msg_key = msg.key
data_parts: list[bytes] | None = image_stream.get(msg_key)
if not data_parts:
data_parts = []
image_stream[msg_key] = data_parts
data_parts.append(msg.data)
if msg.done:
# Return CameraState with the merged data
image_data = b"".join(data_parts)
del image_stream[msg_key]
on_state(CameraState(key=msg.key, data=image_data)) # type: ignore[call-arg]
def on_home_assistant_service_response(
on_service_call: Callable[[HomeassistantServiceCall], None],
msg: HomeassistantServiceResponse,
) -> None:
on_service_call(HomeassistantServiceCall.from_pb(msg))
def on_bluetooth_le_advertising_response(
on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None],
msg: BluetoothLEAdvertisementResponse,
) -> None:
on_bluetooth_le_advertisement(BluetoothLEAdvertisement.from_pb(msg)) # type: ignore[misc]
def on_ble_raw_advertisement_response(
on_advertisements: Callable[[list[BluetoothLERawAdvertisement]], None],
msg: BluetoothLERawAdvertisementsResponse,
) -> None:
on_advertisements(msg.advertisements)
def on_bluetooth_connections_free_response(
on_bluetooth_connections_free_update: Callable[[int, int], None],
msg: BluetoothConnectionsFreeResponse,
) -> None:
on_bluetooth_connections_free_update(msg.free, msg.limit)
def on_bluetooth_gatt_notify_data_response(
address: int,
handle: int,
on_bluetooth_gatt_notify: Callable[[int, bytearray], None],
msg: BluetoothGATTNotifyDataResponse,
) -> None:
"""Handle a BluetoothGATTNotifyDataResponse message."""
if address == msg.address and handle == msg.handle:
on_bluetooth_gatt_notify(handle, bytearray(msg.data))

View File

@ -0,0 +1,114 @@
from __future__ import annotations
from typing import Any
from .api_pb2 import ( # type: ignore
AlarmControlPanelStateResponse,
BinarySensorStateResponse,
ClimateStateResponse,
CoverStateResponse,
FanStateResponse,
LightStateResponse,
ListEntitiesAlarmControlPanelResponse,
ListEntitiesBinarySensorResponse,
ListEntitiesButtonResponse,
ListEntitiesCameraResponse,
ListEntitiesClimateResponse,
ListEntitiesCoverResponse,
ListEntitiesFanResponse,
ListEntitiesLightResponse,
ListEntitiesLockResponse,
ListEntitiesMediaPlayerResponse,
ListEntitiesNumberResponse,
ListEntitiesSelectResponse,
ListEntitiesSensorResponse,
ListEntitiesServicesResponse,
ListEntitiesSirenResponse,
ListEntitiesSwitchResponse,
ListEntitiesTextResponse,
ListEntitiesTextSensorResponse,
LockStateResponse,
MediaPlayerStateResponse,
NumberStateResponse,
SelectStateResponse,
SensorStateResponse,
SirenStateResponse,
SwitchStateResponse,
TextSensorStateResponse,
TextStateResponse,
)
from .model import (
AlarmControlPanelEntityState,
AlarmControlPanelInfo,
BinarySensorInfo,
BinarySensorState,
ButtonInfo,
CameraInfo,
ClimateInfo,
ClimateState,
CoverInfo,
CoverState,
EntityInfo,
EntityState,
FanInfo,
FanState,
LightInfo,
LightState,
LockEntityState,
LockInfo,
MediaPlayerEntityState,
MediaPlayerInfo,
NumberInfo,
NumberState,
SelectInfo,
SelectState,
SensorInfo,
SensorState,
SirenInfo,
SirenState,
SwitchInfo,
SwitchState,
TextInfo,
TextSensorInfo,
TextSensorState,
TextState,
)
SUBSCRIBE_STATES_RESPONSE_TYPES: dict[Any, type[EntityState]] = {
BinarySensorStateResponse: BinarySensorState,
CoverStateResponse: CoverState,
FanStateResponse: FanState,
LightStateResponse: LightState,
NumberStateResponse: NumberState,
SelectStateResponse: SelectState,
SensorStateResponse: SensorState,
SirenStateResponse: SirenState,
SwitchStateResponse: SwitchState,
TextStateResponse: TextState,
TextSensorStateResponse: TextSensorState,
ClimateStateResponse: ClimateState,
LockStateResponse: LockEntityState,
MediaPlayerStateResponse: MediaPlayerEntityState,
AlarmControlPanelStateResponse: AlarmControlPanelEntityState,
}
LIST_ENTITIES_SERVICES_RESPONSE_TYPES: dict[Any, type[EntityInfo] | None] = {
ListEntitiesBinarySensorResponse: BinarySensorInfo,
ListEntitiesButtonResponse: ButtonInfo,
ListEntitiesCoverResponse: CoverInfo,
ListEntitiesFanResponse: FanInfo,
ListEntitiesLightResponse: LightInfo,
ListEntitiesNumberResponse: NumberInfo,
ListEntitiesSelectResponse: SelectInfo,
ListEntitiesSensorResponse: SensorInfo,
ListEntitiesSirenResponse: SirenInfo,
ListEntitiesSwitchResponse: SwitchInfo,
ListEntitiesTextResponse: TextInfo,
ListEntitiesTextSensorResponse: TextSensorInfo,
ListEntitiesServicesResponse: None,
ListEntitiesCameraResponse: CameraInfo,
ListEntitiesClimateResponse: ClimateInfo,
ListEntitiesLockResponse: LockInfo,
ListEntitiesMediaPlayerResponse: MediaPlayerInfo,
ListEntitiesAlarmControlPanelResponse: AlarmControlPanelInfo,
}