Move message parsing out of the read loop (#323)

This commit is contained in:
J. Nick Koston 2022-11-30 12:42:15 -10:00 committed by GitHub
parent 585d4cb569
commit a452e738ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -105,7 +105,7 @@ class APIConnection:
self._ping_stop_event = asyncio.Event() self._ping_stop_event = asyncio.Event()
self._to_process: asyncio.Queue[message.Message] = asyncio.Queue() self._to_process: asyncio.Queue[Packet] = asyncio.Queue()
self._process_task: Optional[asyncio.Task[None]] = None self._process_task: Optional[asyncio.Task[None]] = None
@ -511,24 +511,6 @@ class APIConnection:
handler(err) handler(err)
await self._cleanup() await self._cleanup()
async def _read_once(self) -> None:
assert self._frame_helper is not None
pkt = await self._frame_helper.read_packet()
msg_type = pkt.type
raw_msg = pkt.data
if msg_type not in MESSAGE_TYPE_TO_PROTO:
_LOGGER.debug("%s: Skipping message type %s", self.log_name, msg_type)
return
msg = MESSAGE_TYPE_TO_PROTO[msg_type]()
try:
msg.ParseFromString(raw_msg)
except Exception as e:
raise ProtocolAPIError(f"Invalid protobuf message: {e}") from e
_LOGGER.debug("%s: Got message of type %s: %s", self.log_name, type(msg), msg)
self._to_process.put_nowait(msg)
async def _process_loop(self) -> None: async def _process_loop(self) -> None:
while True: while True:
if not self._is_socket_open: if not self._is_socket_open:
@ -536,21 +518,40 @@ class APIConnection:
break break
try: try:
msg = await self._to_process.get() pkt = await self._to_process.get()
except RuntimeError: except RuntimeError:
break break
msg_type = pkt.type
raw_msg = pkt.data
if msg_type not in MESSAGE_TYPE_TO_PROTO:
_LOGGER.debug("%s: Skipping message type %s", self.log_name, msg_type)
continue
msg = MESSAGE_TYPE_TO_PROTO[msg_type]()
try:
msg.ParseFromString(raw_msg)
except Exception as e:
await self._report_fatal_error(
ProtocolAPIError(f"Invalid protobuf message: {e}")
)
raise
_LOGGER.debug(
"%s: Got message of type %s: %s", self.log_name, type(msg), msg
)
for handler in self._message_handlers[:]: for handler in self._message_handlers[:]:
handler(msg) handler(msg)
await self._handle_internal_messages(msg) await self._handle_internal_messages(msg)
async def _read_loop(self) -> None: async def _read_loop(self) -> None:
assert self._frame_helper is not None
try:
while True: while True:
if not self._is_socket_open: if not self._is_socket_open:
# Socket closed but task isn't cancelled yet # Socket closed but task isn't cancelled yet
break break
try: self._to_process.put_nowait(await self._frame_helper.read_packet())
await self._read_once()
except SocketClosedAPIError as err: except SocketClosedAPIError as err:
# don't log with info, if closed the site that closed the connection should log # don't log with info, if closed the site that closed the connection should log
_LOGGER.debug( _LOGGER.debug(
@ -558,7 +559,6 @@ class APIConnection:
self.log_name, self.log_name,
) )
await self._report_fatal_error(err) await self._report_fatal_error(err)
break
except APIConnectionError as err: except APIConnectionError as err:
_LOGGER.info( _LOGGER.info(
"%s: Error while reading incoming messages: %s", "%s: Error while reading incoming messages: %s",
@ -566,7 +566,6 @@ class APIConnection:
err, err,
) )
await self._report_fatal_error(err) await self._report_fatal_error(err)
break
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
_LOGGER.warning( _LOGGER.warning(
"%s: Unexpected error while reading incoming messages: %s", "%s: Unexpected error while reading incoming messages: %s",
@ -575,7 +574,6 @@ class APIConnection:
exc_info=True, exc_info=True,
) )
await self._report_fatal_error(err) await self._report_fatal_error(err)
break
async def _handle_internal_messages(self, msg: Any) -> None: async def _handle_internal_messages(self, msg: Any) -> None:
if isinstance(msg, DisconnectRequest): if isinstance(msg, DisconnectRequest):