Process messages in a separate task loop (#263)

This commit is contained in:
Jesse Hills 2022-09-30 10:59:40 +13:00 committed by GitHub
parent d14af2eee5
commit 2103a7467a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -103,6 +103,8 @@ class APIConnection:
self._ping_stop_event = asyncio.Event()
self._to_process: asyncio.Queue[message.Message] = asyncio.Queue()
async def _cleanup(self) -> None:
"""Clean up all resources that have been allocated.
@ -171,7 +173,9 @@ class APIConnection:
async def _connect_init_frame_helper(self) -> None:
"""Step 3 in connect process: initialize the frame helper and init read loop."""
reader, writer = await asyncio.open_connection(sock=self._socket)
reader, writer = await asyncio.open_connection(
sock=self._socket, limit=1024 * 1024
) # Set buffer limit to 1MB
if self._params.noise_psk is None:
self._frame_helper = APIPlaintextFrameHelper(reader, writer)
@ -185,6 +189,8 @@ class APIConnection:
# Create read loop
asyncio.create_task(self._read_loop())
# Create process loop
asyncio.create_task(self._process_loop())
async def _connect_hello(self) -> None:
"""Step 4 in connect process: send hello and get api version."""
@ -473,9 +479,19 @@ class APIConnection:
except Exception as e:
raise ProtocolAPIError(f"Invalid protobuf message: {e}") from e
_LOGGER.debug("%s: Got message of type %s: %s", self.log_name, type(msg), msg)
for msg_handler in self._message_handlers[:]:
msg_handler(msg)
await self._handle_internal_messages(msg)
self._to_process.put_nowait(msg)
async def _process_loop(self) -> None:
while True:
if not self._is_socket_open:
# Socket closed but task isn't cancelled yet
break
msg = await self._to_process.get()
for handler in self._message_handlers[:]:
handler(msg)
await self._handle_internal_messages(msg)
async def _read_loop(self) -> None:
while True: