# pylint: disable=too-many-lines import asyncio import logging from typing import ( Any, Awaitable, Callable, Coroutine, Dict, List, Optional, Tuple, Type, Union, cast, ) import async_timeout from google.protobuf import message from .api_pb2 import ( # type: ignore BinarySensorStateResponse, BluetoothConnectionsFreeResponse, BluetoothDeviceConnectionResponse, BluetoothDeviceRequest, BluetoothGATTGetServicesDoneResponse, BluetoothGATTGetServicesRequest, BluetoothGATTGetServicesResponse, BluetoothGATTNotifyDataResponse, BluetoothGATTNotifyRequest, BluetoothGATTReadDescriptorRequest, BluetoothGATTReadRequest, BluetoothGATTReadResponse, BluetoothGATTWriteDescriptorRequest, BluetoothGATTWriteRequest, BluetoothLEAdvertisementResponse, ButtonCommandRequest, CameraImageRequest, CameraImageResponse, ClimateCommandRequest, ClimateStateResponse, CoverCommandRequest, CoverStateResponse, DeviceInfoRequest, DeviceInfoResponse, ExecuteServiceArgument, ExecuteServiceRequest, FanCommandRequest, FanStateResponse, HomeassistantServiceResponse, HomeAssistantStateResponse, LightCommandRequest, LightStateResponse, ListEntitiesBinarySensorResponse, ListEntitiesButtonResponse, ListEntitiesCameraResponse, ListEntitiesClimateResponse, ListEntitiesCoverResponse, ListEntitiesDoneResponse, ListEntitiesFanResponse, ListEntitiesLightResponse, ListEntitiesLockResponse, ListEntitiesMediaPlayerResponse, ListEntitiesNumberResponse, ListEntitiesRequest, ListEntitiesSelectResponse, ListEntitiesSensorResponse, ListEntitiesServicesResponse, ListEntitiesSirenResponse, ListEntitiesSwitchResponse, ListEntitiesTextSensorResponse, LockCommandRequest, LockStateResponse, MediaPlayerCommandRequest, MediaPlayerStateResponse, NumberCommandRequest, NumberStateResponse, SelectCommandRequest, SelectStateResponse, SensorStateResponse, SirenCommandRequest, SirenStateResponse, SubscribeBluetoothConnectionsFreeRequest, SubscribeBluetoothLEAdvertisementsRequest, SubscribeHomeassistantServicesRequest, SubscribeHomeAssistantStateResponse, SubscribeHomeAssistantStatesRequest, SubscribeLogsRequest, SubscribeLogsResponse, SubscribeStatesRequest, SwitchCommandRequest, SwitchStateResponse, TextSensorStateResponse, ) from .connection import APIConnection, ConnectionParams from .core import APIConnectionError, TimeoutAPIError from .host_resolver import ZeroconfInstanceType from .model import ( APIVersion, BinarySensorInfo, BinarySensorState, BluetoothConnectionsFree, BluetoothDeviceConnection, BluetoothDeviceRequestType, BluetoothGATTRead, BluetoothGATTServices, BluetoothLEAdvertisement, ButtonInfo, CameraInfo, CameraState, ClimateFanMode, ClimateInfo, ClimateMode, ClimatePreset, ClimateState, ClimateSwingMode, CoverInfo, CoverState, DeviceInfo, EntityInfo, EntityState, ESPHomeBluetoothGATTServices, FanDirection, FanInfo, FanSpeed, FanState, HomeassistantServiceCall, LegacyCoverCommand, LightInfo, LightState, LockCommand, LockEntityState, LockInfo, LogLevel, MediaPlayerCommand, MediaPlayerEntityState, MediaPlayerInfo, NumberInfo, NumberState, SelectInfo, SelectState, SensorInfo, SensorState, SirenInfo, SirenState, SwitchInfo, SwitchState, TextSensorInfo, TextSensorState, UserService, UserServiceArgType, ) _LOGGER = logging.getLogger(__name__) ExecuteServiceDataType = Dict[ str, Union[bool, int, float, str, List[bool], List[int], List[float], List[str]] ] # pylint: disable=too-many-public-methods class APIClient: def __init__( self, address: str, port: int, password: Optional[str], *, client_info: str = "aioesphomeapi", keepalive: float = 15.0, zeroconf_instance: ZeroconfInstanceType = None, noise_psk: Optional[str] = None, expected_name: Optional[str] = None, ): """Create a client, this object is shared across sessions. :param address: The address to connect to; for example an IP address or .local name for mDNS lookup. :param port: The port to connect to :param password: Optional password to send to the device for authentication :param client_info: User Agent string to send. :param keepalive: The keepalive time in seconds (ping interval) for detecting stale connections. Every keepalive seconds a ping is sent, if no pong is received the connection is closed. :param zeroconf_instance: Pass a zeroconf instance to use if an mDNS lookup is necessary. :param noise_psk: Encryption preshared key for noise transport encrypted sessions. :param expected_name: Require the devices name to match the given expected name. Can be used to prevent accidentally connecting to a different device if IP passed as address but DHCP reassigned IP. """ self._params = ConnectionParams( address=address, port=port, password=password, client_info=client_info, keepalive=keepalive, zeroconf_instance=zeroconf_instance, # treat empty psk string as missing (like password) noise_psk=noise_psk or None, expected_name=expected_name, ) self._connection: Optional[APIConnection] = None self._cached_name: Optional[str] = None @property def expected_name(self) -> Optional[str]: return self._params.expected_name @expected_name.setter def expected_name(self, value: Optional[str]) -> None: self._params.expected_name = value @property def address(self) -> str: return self._params.address @property def _log_name(self) -> str: if self._cached_name is not None: return f"{self._cached_name} @ {self.address}" return self.address async def connect( self, on_stop: Optional[Callable[[], Awaitable[None]]] = None, login: bool = False, ) -> None: if self._connection is not None: raise APIConnectionError(f"Already connected to {self._log_name}!") async def _on_stop() -> None: # Hook into on_stop handler to clear connection when stopped self._connection = None if on_stop is not None: await on_stop() self._connection = APIConnection(self._params, _on_stop) self._connection.log_name = self._log_name try: await self._connection.connect(login=login) except APIConnectionError: self._connection = None raise except Exception as e: self._connection = None raise APIConnectionError( f"Unexpected error while connecting to {self._log_name}: {e}" ) from e async def disconnect(self, force: bool = False) -> None: if self._connection is None: return if force: await self._connection.force_disconnect() else: await self._connection.disconnect() def _check_connected(self) -> None: if self._connection is None: raise APIConnectionError(f"Not connected to {self._log_name}!") if not self._connection.is_connected: raise APIConnectionError(f"Connection not done for {self._log_name}!") def _check_authenticated(self) -> None: self._check_connected() assert self._connection is not None if not self._connection.is_authenticated: raise APIConnectionError(f"Not authenticated for {self._log_name}!") async def device_info(self) -> DeviceInfo: self._check_connected() assert self._connection is not None resp = await self._connection.send_message_await_response( DeviceInfoRequest(), DeviceInfoResponse ) info = DeviceInfo.from_pb(resp) self._cached_name = info.name self._connection.log_name = self._log_name return info async def list_entities_services( self, ) -> Tuple[List[EntityInfo], List[UserService]]: self._check_authenticated() response_types: Dict[Any, Optional[Type[EntityInfo]]] = { ListEntitiesBinarySensorResponse: BinarySensorInfo, ListEntitiesButtonResponse: ButtonInfo, ListEntitiesCoverResponse: CoverInfo, ListEntitiesFanResponse: FanInfo, ListEntitiesLightResponse: LightInfo, ListEntitiesNumberResponse: NumberInfo, ListEntitiesSelectResponse: SelectInfo, ListEntitiesSensorResponse: SensorInfo, ListEntitiesSirenResponse: SirenInfo, ListEntitiesSwitchResponse: SwitchInfo, ListEntitiesTextSensorResponse: TextSensorInfo, ListEntitiesServicesResponse: None, ListEntitiesCameraResponse: CameraInfo, ListEntitiesClimateResponse: ClimateInfo, ListEntitiesLockResponse: LockInfo, ListEntitiesMediaPlayerResponse: MediaPlayerInfo, } def do_append(msg: message.Message) -> bool: return isinstance(msg, tuple(response_types.keys())) def do_stop(msg: message.Message) -> bool: return isinstance(msg, ListEntitiesDoneResponse) assert self._connection is not None resp = await self._connection.send_message_await_response_complex( ListEntitiesRequest(), do_append, do_stop, timeout=30 ) entities: List[EntityInfo] = [] services: List[UserService] = [] for msg in resp: if isinstance(msg, ListEntitiesServicesResponse): services.append(UserService.from_pb(msg)) continue cls = None for resp_type, cls in response_types.items(): if isinstance(msg, resp_type): break else: continue assert cls is not None entities.append(cls.from_pb(msg)) return entities, services async def subscribe_states(self, on_state: Callable[[EntityState], None]) -> None: self._check_authenticated() response_types: Dict[Any, Type[EntityState]] = { BinarySensorStateResponse: BinarySensorState, CoverStateResponse: CoverState, FanStateResponse: FanState, LightStateResponse: LightState, NumberStateResponse: NumberState, SelectStateResponse: SelectState, SensorStateResponse: SensorState, SirenStateResponse: SirenState, SwitchStateResponse: SwitchState, TextSensorStateResponse: TextSensorState, ClimateStateResponse: ClimateState, LockStateResponse: LockEntityState, MediaPlayerStateResponse: MediaPlayerEntityState, } image_stream: Dict[int, bytes] = {} def on_msg(msg: message.Message) -> None: if isinstance(msg, CameraImageResponse): data = image_stream.pop(msg.key, bytes()) + msg.data if msg.done: # Return CameraState with the merged data on_state(CameraState(key=msg.key, data=data)) else: image_stream[msg.key] = data return for resp_type, cls in response_types.items(): if isinstance(msg, resp_type): break else: return # pylint: disable=undefined-loop-variable on_state(cls.from_pb(msg)) assert self._connection is not None await self._connection.send_message_callback_response( SubscribeStatesRequest(), on_msg ) async def subscribe_logs( self, on_log: Callable[[SubscribeLogsResponse], None], log_level: Optional[LogLevel] = None, dump_config: Optional[bool] = None, ) -> None: self._check_authenticated() def on_msg(msg: message.Message) -> None: if isinstance(msg, SubscribeLogsResponse): on_log(msg) req = SubscribeLogsRequest() if log_level is not None: req.level = log_level if dump_config is not None: req.dump_config = dump_config assert self._connection is not None await self._connection.send_message_callback_response(req, on_msg) async def subscribe_service_calls( self, on_service_call: Callable[[HomeassistantServiceCall], None] ) -> None: self._check_authenticated() def on_msg(msg: message.Message) -> None: if isinstance(msg, HomeassistantServiceResponse): on_service_call(HomeassistantServiceCall.from_pb(msg)) assert self._connection is not None await self._connection.send_message_callback_response( SubscribeHomeassistantServicesRequest(), on_msg ) async def subscribe_bluetooth_le_advertisements( self, on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None] ) -> Callable[[], None]: self._check_authenticated() def on_msg(msg: message.Message) -> None: if isinstance(msg, BluetoothLEAdvertisementResponse): on_bluetooth_le_advertisement(BluetoothLEAdvertisement.from_pb(msg)) assert self._connection is not None await self._connection.send_message_callback_response( SubscribeBluetoothLEAdvertisementsRequest(), on_msg ) def unsub() -> None: assert self._connection is not None self._connection.remove_message_callback(on_msg) return unsub async def subscribe_bluetooth_connections_free( self, on_bluetooth_connections_free_update: Callable[[int, int], None] ) -> Callable[[], None]: self._check_authenticated() def on_msg(msg: message.Message) -> None: if isinstance(msg, BluetoothConnectionsFreeResponse): resp = BluetoothConnectionsFree.from_pb(msg) on_bluetooth_connections_free_update(resp.free, resp.limit) assert self._connection is not None await self._connection.send_message_callback_response( SubscribeBluetoothConnectionsFreeRequest(), on_msg ) def unsub() -> None: assert self._connection is not None self._connection.remove_message_callback(on_msg) return unsub async def bluetooth_device_connect( self, address: int, on_bluetooth_connection_state: Callable[[bool, int, int], None], timeout: float = 10.0, ) -> Callable[[], None]: self._check_authenticated() event = asyncio.Event() def on_msg(msg: message.Message) -> None: if isinstance(msg, BluetoothDeviceConnectionResponse): resp = BluetoothDeviceConnection.from_pb(msg) if address == resp.address: on_bluetooth_connection_state(resp.connected, resp.mtu, resp.error) event.set() assert self._connection is not None await self._connection.send_message_callback_response( BluetoothDeviceRequest( address=address, request_type=BluetoothDeviceRequestType.CONNECT, ), on_msg, ) def unsub() -> None: assert self._connection is not None self._connection.remove_message_callback(on_msg) try: async with async_timeout.timeout(timeout): await event.wait() except asyncio.TimeoutError as err: unsub() raise TimeoutAPIError("Timeout waiting for connect response") from err return unsub async def bluetooth_device_disconnect(self, address: int) -> None: self._check_authenticated() assert self._connection is not None await self._connection.send_message( BluetoothDeviceRequest( address=address, request_type=BluetoothDeviceRequestType.DISCONNECT, ) ) async def bluetooth_gatt_get_services( self, address: int ) -> ESPHomeBluetoothGATTServices: self._check_authenticated() def do_append(msg: message.Message) -> bool: return isinstance(msg, BluetoothGATTGetServicesResponse) def do_stop(msg: message.Message) -> bool: return isinstance(msg, BluetoothGATTGetServicesDoneResponse) assert self._connection is not None resp = await self._connection.send_message_await_response_complex( BluetoothGATTGetServicesRequest(address=address), do_append, do_stop ) services = [] for msg in resp: services.extend(BluetoothGATTServices.from_pb(msg).services) return ESPHomeBluetoothGATTServices(address=address, services=services) async def bluetooth_gatt_read( self, address: int, characteristic_handle: int, timeout: float = 10.0 ) -> bytearray: self._check_authenticated() req = BluetoothGATTReadRequest() req.address = address req.handle = characteristic_handle def is_response(msg: message.Message) -> bool: if isinstance(msg, BluetoothGATTReadResponse): read = BluetoothGATTRead.from_pb(msg) if read.address == address and read.handle == characteristic_handle: return True return False assert self._connection is not None resp = await self._connection.send_message_await_response_complex( req, is_response, is_response, timeout=timeout ) if len(resp) != 1: raise APIConnectionError(f"Expected one result, got {len(resp)}") read_response = BluetoothGATTRead.from_pb(resp[0]) return bytearray(read_response.data) async def bluetooth_gatt_write( self, address: int, characteristic_handle: int, data: bytes, response: bool, ) -> None: self._check_authenticated() req = BluetoothGATTWriteRequest() req.address = address req.handle = characteristic_handle req.response = response req.data = data assert self._connection is not None await self._connection.send_message(req) async def bluetooth_gatt_read_descriptor( self, address: int, handle: int, timeout: float = 10.0, ) -> bytearray: self._check_authenticated() req = BluetoothGATTReadDescriptorRequest() req.address = address req.handle = handle def is_response(msg: message.Message) -> bool: if isinstance(msg, BluetoothGATTReadResponse): read = BluetoothGATTRead.from_pb(msg) if read.address == address and read.handle == handle: return True return False assert self._connection is not None resp = await self._connection.send_message_await_response_complex( req, is_response, is_response, timeout=timeout ) if len(resp) != 1: raise APIConnectionError(f"Expected one result, got {len(resp)}") read_response = BluetoothGATTRead.from_pb(resp[0]) return bytearray(read_response.data) async def bluetooth_gatt_write_descriptor( self, address: int, handle: int, data: bytes, ) -> None: self._check_authenticated() req = BluetoothGATTWriteDescriptorRequest() req.address = address req.handle = handle req.data = data assert self._connection is not None await self._connection.send_message(req) async def bluetooth_gatt_start_notify( self, address: int, handle: int, on_bluetooth_gatt_notify: Callable[[int, bytearray], None], ) -> Callable[[], Coroutine[Any, Any, None]]: self._check_authenticated() def on_msg(msg: message.Message) -> None: if isinstance(msg, BluetoothGATTNotifyDataResponse): notify = BluetoothGATTRead.from_pb(msg) if address == notify.address and handle == notify.handle: on_bluetooth_gatt_notify(handle, bytearray(notify.data)) assert self._connection is not None await self._connection.send_message_callback_response( BluetoothGATTNotifyRequest(address=address, handle=handle, enable=True), on_msg, ) async def stop_notify() -> None: assert self._connection is not None self._connection.remove_message_callback(on_msg) self._check_authenticated() await self._connection.send_message( BluetoothGATTNotifyRequest(address=address, handle=handle, enable=False) ) return stop_notify async def subscribe_home_assistant_states( self, on_state_sub: Callable[[str, Optional[str]], None] ) -> None: self._check_authenticated() def on_msg(msg: message.Message) -> None: if isinstance(msg, SubscribeHomeAssistantStateResponse): on_state_sub(msg.entity_id, msg.attribute) assert self._connection is not None await self._connection.send_message_callback_response( SubscribeHomeAssistantStatesRequest(), on_msg ) async def send_home_assistant_state( self, entity_id: str, attribute: Optional[str], state: str ) -> None: self._check_authenticated() assert self._connection is not None await self._connection.send_message( HomeAssistantStateResponse( entity_id=entity_id, state=state, attribute=attribute, ) ) async def cover_command( self, key: int, position: Optional[float] = None, tilt: Optional[float] = None, stop: bool = False, ) -> None: self._check_authenticated() req = CoverCommandRequest() req.key = key apiv = cast(APIVersion, self.api_version) if apiv >= APIVersion(1, 1): if position is not None: req.has_position = True req.position = position if tilt is not None: req.has_tilt = True req.tilt = tilt if stop: req.stop = stop else: if stop: req.legacy_command = LegacyCoverCommand.STOP req.has_legacy_command = True elif position == 1.0: req.legacy_command = LegacyCoverCommand.OPEN req.has_legacy_command = True elif position == 0.0: req.legacy_command = LegacyCoverCommand.CLOSE req.has_legacy_command = True assert self._connection is not None await self._connection.send_message(req) async def fan_command( self, key: int, state: Optional[bool] = None, speed: Optional[FanSpeed] = None, speed_level: Optional[int] = None, oscillating: Optional[bool] = None, direction: Optional[FanDirection] = None, ) -> None: self._check_authenticated() req = FanCommandRequest() req.key = key if state is not None: req.has_state = True req.state = state if speed is not None: req.has_speed = True req.speed = speed if speed_level is not None: req.has_speed_level = True req.speed_level = speed_level if oscillating is not None: req.has_oscillating = True req.oscillating = oscillating if direction is not None: req.has_direction = True req.direction = direction assert self._connection is not None await self._connection.send_message(req) async def light_command( self, key: int, state: Optional[bool] = None, brightness: Optional[float] = None, color_mode: Optional[int] = None, color_brightness: Optional[float] = None, rgb: Optional[Tuple[float, float, float]] = None, white: Optional[float] = None, color_temperature: Optional[float] = None, cold_white: Optional[float] = None, warm_white: Optional[float] = None, transition_length: Optional[float] = None, flash_length: Optional[float] = None, effect: Optional[str] = None, ) -> None: self._check_authenticated() req = LightCommandRequest() req.key = key if state is not None: req.has_state = True req.state = state if brightness is not None: req.has_brightness = True req.brightness = brightness if color_mode is not None: req.has_color_mode = True req.color_mode = color_mode if color_brightness is not None: req.has_color_brightness = True req.color_brightness = color_brightness if rgb is not None: req.has_rgb = True req.red = rgb[0] req.green = rgb[1] req.blue = rgb[2] if white is not None: req.has_white = True req.white = white if color_temperature is not None: req.has_color_temperature = True req.color_temperature = color_temperature if cold_white is not None: req.has_cold_white = True req.cold_white = cold_white if warm_white is not None: req.has_warm_white = True req.warm_white = warm_white if transition_length is not None: req.has_transition_length = True req.transition_length = int(round(transition_length * 1000)) if flash_length is not None: req.has_flash_length = True req.flash_length = int(round(flash_length * 1000)) if effect is not None: req.has_effect = True req.effect = effect assert self._connection is not None await self._connection.send_message(req) async def switch_command(self, key: int, state: bool) -> None: self._check_authenticated() req = SwitchCommandRequest() req.key = key req.state = state assert self._connection is not None await self._connection.send_message(req) async def climate_command( self, key: int, mode: Optional[ClimateMode] = None, target_temperature: Optional[float] = None, target_temperature_low: Optional[float] = None, target_temperature_high: Optional[float] = None, fan_mode: Optional[ClimateFanMode] = None, swing_mode: Optional[ClimateSwingMode] = None, custom_fan_mode: Optional[str] = None, preset: Optional[ClimatePreset] = None, custom_preset: Optional[str] = None, ) -> None: self._check_authenticated() req = ClimateCommandRequest() req.key = key if mode is not None: req.has_mode = True req.mode = mode if target_temperature is not None: req.has_target_temperature = True req.target_temperature = target_temperature if target_temperature_low is not None: req.has_target_temperature_low = True req.target_temperature_low = target_temperature_low if target_temperature_high is not None: req.has_target_temperature_high = True req.target_temperature_high = target_temperature_high if fan_mode is not None: req.has_fan_mode = True req.fan_mode = fan_mode if swing_mode is not None: req.has_swing_mode = True req.swing_mode = swing_mode if custom_fan_mode is not None: req.has_custom_fan_mode = True req.custom_fan_mode = custom_fan_mode if preset is not None: apiv = cast(APIVersion, self.api_version) if apiv < APIVersion(1, 5): req.has_legacy_away = True req.legacy_away = preset == ClimatePreset.AWAY else: req.has_preset = True req.preset = preset if custom_preset is not None: req.has_custom_preset = True req.custom_preset = custom_preset assert self._connection is not None await self._connection.send_message(req) async def number_command(self, key: int, state: float) -> None: self._check_authenticated() req = NumberCommandRequest() req.key = key req.state = state assert self._connection is not None await self._connection.send_message(req) async def select_command(self, key: int, state: str) -> None: self._check_authenticated() req = SelectCommandRequest() req.key = key req.state = state assert self._connection is not None await self._connection.send_message(req) async def siren_command( self, key: int, state: Optional[bool] = None, tone: Optional[str] = None, volume: Optional[float] = None, duration: Optional[int] = None, ) -> None: self._check_authenticated() req = SirenCommandRequest() req.key = key if state is not None: req.state = state req.has_state = True if tone is not None: req.tone = tone req.has_tone = True if volume is not None: req.volume = volume req.has_volume = True if duration is not None: req.duration = duration req.has_duration = True assert self._connection is not None await self._connection.send_message(req) async def button_command(self, key: int) -> None: self._check_authenticated() req = ButtonCommandRequest() req.key = key assert self._connection is not None await self._connection.send_message(req) async def lock_command( self, key: int, command: LockCommand, code: Optional[str] = None, ) -> None: self._check_authenticated() req = LockCommandRequest() req.key = key req.command = command if code is not None: req.code = code assert self._connection is not None await self._connection.send_message(req) async def media_player_command( self, key: int, *, command: Optional[MediaPlayerCommand] = None, volume: Optional[float] = None, media_url: Optional[str] = None, ) -> None: self._check_authenticated() req = MediaPlayerCommandRequest() req.key = key if command is not None: req.command = command req.has_command = True if volume is not None: req.volume = volume req.has_volume = True if media_url is not None: req.media_url = media_url req.has_media_url = True assert self._connection is not None await self._connection.send_message(req) async def execute_service( self, service: UserService, data: ExecuteServiceDataType ) -> None: self._check_authenticated() req = ExecuteServiceRequest() req.key = service.key args = [] for arg_desc in service.args: arg = ExecuteServiceArgument() val = data[arg_desc.name] apiv = cast(APIVersion, self.api_version) int_type = "int_" if apiv >= APIVersion(1, 3) else "legacy_int" map_single = { UserServiceArgType.BOOL: "bool_", UserServiceArgType.INT: int_type, UserServiceArgType.FLOAT: "float_", UserServiceArgType.STRING: "string_", } map_array = { UserServiceArgType.BOOL_ARRAY: "bool_array", UserServiceArgType.INT_ARRAY: "int_array", UserServiceArgType.FLOAT_ARRAY: "float_array", UserServiceArgType.STRING_ARRAY: "string_array", } if arg_desc.type in map_array: attr = getattr(arg, map_array[arg_desc.type]) attr.extend(val) else: assert arg_desc.type in map_single setattr(arg, map_single[arg_desc.type], val) args.append(arg) # pylint: disable=no-member req.args.extend(args) assert self._connection is not None await self._connection.send_message(req) async def _request_image( self, *, single: bool = False, stream: bool = False ) -> None: req = CameraImageRequest() req.single = single req.stream = stream assert self._connection is not None await self._connection.send_message(req) async def request_single_image(self) -> None: await self._request_image(single=True) async def request_image_stream(self) -> None: await self._request_image(stream=True) @property def api_version(self) -> Optional[APIVersion]: if self._connection is None: return None return self._connection.api_version