Implement Bluetooth LE advertisement receiving (#246)

This commit is contained in:
Jesse Hills 2022-08-22 15:27:46 +12:00 committed by GitHub
parent f6ec38cb19
commit 1273d689f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 320 additions and 58 deletions

View File

@ -27,6 +27,7 @@ service APIConnection {
rpc subscribe_logs (SubscribeLogsRequest) returns (void) {}
rpc subscribe_homeassistant_services (SubscribeHomeassistantServicesRequest) returns (void) {}
rpc subscribe_home_assistant_states (SubscribeHomeAssistantStatesRequest) returns (void) {}
rpc subscribe_bluetooth_le_advertisements (SubscribeBluetoothLEAdvertisementsRequest) returns (void) {}
rpc get_time (GetTimeRequest) returns (GetTimeResponse) {
option (needs_authentication) = false;
}
@ -1126,3 +1127,28 @@ message MediaPlayerCommandRequest {
bool has_media_url = 6;
string media_url = 7;
}
// ==================== BLUETOOTH ====================
message SubscribeBluetoothLEAdvertisementsRequest {
option (id) = 66;
option (source) = SOURCE_CLIENT;
}
message BluetoothServiceData {
string uuid = 1;
repeated uint32 data = 2 [packed=false];
}
message BluetoothLEAdvertisementResponse {
option (id) = 67;
option (source) = SOURCE_SERVER;
option (ifdef) = "USE_BLUETOOTH_PROXY";
option (no_delay) = true;
uint64 address = 1;
string name = 2;
sint32 rssi = 3;
repeated string service_uuids = 4;
repeated BluetoothServiceData service_data = 5;
repeated BluetoothServiceData manufacturer_data = 6;
}

File diff suppressed because one or more lines are too long

View File

@ -16,6 +16,7 @@ from google.protobuf import message
from .api_pb2 import ( # type: ignore
BinarySensorStateResponse,
BluetoothLEAdvertisementResponse,
ButtonCommandRequest,
CameraImageRequest,
CameraImageResponse,
@ -62,6 +63,7 @@ from .api_pb2 import ( # type: ignore
SensorStateResponse,
SirenCommandRequest,
SirenStateResponse,
SubscribeBluetoothLEAdvertisementsRequest,
SubscribeHomeassistantServicesRequest,
SubscribeHomeAssistantStateResponse,
SubscribeHomeAssistantStatesRequest,
@ -79,6 +81,7 @@ from .model import (
APIVersion,
BinarySensorInfo,
BinarySensorState,
BluetoothLEAdvertisement,
ButtonInfo,
CameraInfo,
CameraState,
@ -379,6 +382,20 @@ class APIClient:
SubscribeHomeassistantServicesRequest(), on_msg
)
async def subscribe_bluetooth_le_advertisements(
self, on_bluetooth_le_advertisement: Callable[[BluetoothLEAdvertisement], None]
) -> None:
self._check_authenticated()
def on_msg(msg: message.Message) -> None:
if isinstance(msg, BluetoothLEAdvertisementResponse):
on_bluetooth_le_advertisement(BluetoothLEAdvertisement.from_pb(msg))
assert self._connection is not None
await self._connection.send_message_callback_response(
SubscribeBluetoothLEAdvertisementsRequest(), on_msg
)
async def subscribe_home_assistant_states(
self, on_state_sub: Callable[[str, Optional[str]], None]
) -> None:

View File

@ -1,5 +1,6 @@
from .api_pb2 import ( # type: ignore
BinarySensorStateResponse,
BluetoothLEAdvertisementResponse,
ButtonCommandRequest,
CameraImageRequest,
CameraImageResponse,
@ -55,6 +56,7 @@ from .api_pb2 import ( # type: ignore
SensorStateResponse,
SirenCommandRequest,
SirenStateResponse,
SubscribeBluetoothLEAdvertisementsRequest,
SubscribeHomeassistantServicesRequest,
SubscribeHomeAssistantStateResponse,
SubscribeHomeAssistantStatesRequest,
@ -189,4 +191,6 @@ MESSAGE_TYPE_TO_PROTO = {
63: ListEntitiesMediaPlayerResponse,
64: MediaPlayerStateResponse,
65: MediaPlayerCommandRequest,
66: SubscribeBluetoothLEAdvertisementsRequest,
67: BluetoothLEAdvertisementResponse,
}

View File

@ -17,7 +17,7 @@ from typing import (
from .util import fix_float_single_double_conversion
if TYPE_CHECKING:
from .api_pb2 import HomeassistantServiceMap # type: ignore
from .api_pb2 import BluetoothServiceData, HomeassistantServiceMap # type: ignore
# All fields in here should have defaults set
# Home Assistant depends on these fields being constructible
@ -753,6 +753,51 @@ class UserService(APIModelBase):
)
# ==================== BLUETOOTH ====================
def _long_uuid(uuid: str) -> str:
"""Convert a UUID to a long UUID."""
return (
f"0000{uuid[2:].lower()}-0000-1000-8000-00805f9b34fb" if len(uuid) < 8 else uuid
)
def _convert_bluetooth_le_service_uuids(value: List[str]) -> List[str]:
return [_long_uuid(v) for v in value]
def _convert_bluetooth_le_service_data(
value: Union[Dict[str, bytes], Iterable["BluetoothServiceData"]],
) -> Dict[str, bytes]:
if isinstance(value, dict):
return value
return {_long_uuid(v.uuid): bytes(v.data) for v in value} # type: ignore
def _convert_bluetooth_le_manufacturer_data(
value: Union[Dict[int, bytes], Iterable["BluetoothServiceData"]],
) -> Dict[int, bytes]:
if isinstance(value, dict):
return value
return {int(v.uuid, 16): bytes(v.data) for v in value} # type: ignore
@dataclass(frozen=True)
class BluetoothLEAdvertisement(APIModelBase):
address: int = 0
name: str = ""
rssi: int = 0
service_uuids: List[str] = converter_field(
default_factory=list, converter=_convert_bluetooth_le_service_uuids
)
service_data: Dict[str, bytes] = converter_field(
default_factory=dict, converter=_convert_bluetooth_le_service_data
)
manufacturer_data: Dict[int, bytes] = converter_field(
default_factory=dict, converter=_convert_bluetooth_le_manufacturer_data
)
class LogLevel(APIIntEnum):
LOG_LEVEL_NONE = 0
LOG_LEVEL_ERROR = 1