Reduce overhead to send and process a complex message (#480)

This commit is contained in:
J. Nick Koston 2023-07-15 10:12:29 -10:00 committed by GitHub
parent f8ffa6ae83
commit 24cddc22a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -598,6 +598,22 @@ class APIConnection:
return
fut.set_exception(asyncio.TimeoutError())
def _handle_complex_message(
self,
fut: asyncio.Future[None],
responses: List[message.Message],
do_append: Optional[Callable[[message.Message], bool]],
do_stop: Optional[Callable[[message.Message], bool]],
resp: message.Message,
) -> None:
"""Handle a message that is part of a response."""
if fut.done():
return
if do_append is None or do_append(resp):
responses.append(resp)
if do_stop is None or do_stop(resp):
fut.set_result(None)
async def send_message_await_response_complex(
self,
send_msg: message.Message,
@ -613,7 +629,7 @@ class APIConnection:
:param do_stop: Predicate to check if a received message is the stop response.
:param timeout: The maximum amount of time to wait for the stop response.
:raises TimeoutAPIError: if a timeout occured
:raises TimeoutAPIError: if a timeout occurred
"""
# Send the message right away to reduce latency.
# This is safe because we are not awaiting between
@ -622,15 +638,10 @@ class APIConnection:
self.send_message(send_msg)
# Unsafe to await between sending the message and registering the handler
fut: asyncio.Future[None] = self._loop.create_future()
responses = []
def on_message(resp: message.Message) -> None:
if fut.done():
return
if do_append is None or do_append(resp):
responses.append(resp)
if do_stop is None or do_stop(resp):
fut.set_result(None)
responses: List[message.Message] = []
on_message = partial(
self._handle_complex_message, fut, responses, do_append, do_stop
)
message_handlers = self._message_handlers
read_exception_futures = self._read_exception_futures