From d36b652b697a6838e79b3e278297f718a9be4b14 Mon Sep 17 00:00:00 2001 From: joo Date: Tue, 29 May 2018 01:14:23 +0100 Subject: [PATCH] Fix: reconnecting from an exception handler does not work. --- minecraft/exceptions.py | 7 ++ minecraft/networking/connection.py | 109 ++++++++++++++++++----------- 2 files changed, 76 insertions(+), 40 deletions(-) diff --git a/minecraft/exceptions.py b/minecraft/exceptions.py index 88ecd6a..07e6043 100644 --- a/minecraft/exceptions.py +++ b/minecraft/exceptions.py @@ -69,6 +69,13 @@ class LoginDisconnect(ConnectionFailure): """ +class InvalidState(ConnectionFailure): + """Raised by 'minecraft.networking.Connection' when a connection attempt + fails due to to the internal state of the Connection being unsuitable, + for example if there is an existing ongoing connection. + """ + + class IgnorePacket(Exception): """This exception may be raised from within a packet handler, such as `PacketReactor.react' or a packet listener added with diff --git a/minecraft/networking/connection.py b/minecraft/networking/connection.py index 522c1ab..39fa4a0 100644 --- a/minecraft/networking/connection.py +++ b/minecraft/networking/connection.py @@ -18,7 +18,9 @@ from .packets import clientbound, serverbound from . import packets from . import encryption from .. import SUPPORTED_PROTOCOL_VERSIONS, SUPPORTED_MINECRAFT_VERSIONS -from ..exceptions import VersionMismatch, LoginDisconnect, IgnorePacket +from ..exceptions import ( + VersionMismatch, LoginDisconnect, IgnorePacket, InvalidState +) STATE_STATUS = 1 @@ -103,6 +105,7 @@ class Connection(object): self._write_lock = RLock() self.networking_thread = None + self.new_networking_thread = None self.packet_listeners = [] self.early_packet_listeners = [] self.outgoing_packet_listeners = [] @@ -149,14 +152,21 @@ class Connection(object): self.reactor = PacketReactor(self) def _start_network_thread(self): - """May safely be called multiple times.""" - if self.networking_thread is None: - self.networking_thread = NetworkingThread(self) - self.networking_thread.start() - elif self.networking_thread.interrupt: - # This thread will wait until the previous thread exits, and then - # set `networking_thread' to itself. - NetworkingThread(self, previous=self.networking_thread).start() + with self._write_lock: + if self.networking_thread is not None and \ + not self.networking_thread.interrupt or \ + self.new_networking_thread is not None: + raise InvalidState('A networking thread is already running.') + elif self.networking_thread is None: + self.networking_thread = NetworkingThread(self) + self.networking_thread.start() + else: + # This thread will wait until the existing thread exits, and + # then set 'networking_thread' to itself and + # 'new_networking_thread' to None. + self.new_networking_thread \ + = NetworkingThread(self, previous=self.networking_thread) + self.new_networking_thread.start() def write_packet(self, packet, force=False): """Writes a packet to the server. @@ -253,24 +263,28 @@ class Connection(object): which prints the latency to standard outout, or False, to prevent measurement of the latency. """ - self._connect() - self._handshake(next_state=STATE_STATUS) - self._start_network_thread() + with self._write_lock: # pylint: disable=not-context-manager + self._check_connection() - self.reactor = StatusReactor(self, do_ping=handle_ping is not False) + self._connect() + self._handshake(next_state=STATE_STATUS) + self._start_network_thread() - if handle_status is False: - self.reactor.handle_status = lambda *args, **kwds: None - elif handle_status is not None: - self.reactor.handle_status = handle_status + do_ping = handle_ping is not False + self.reactor = StatusReactor(self, do_ping=do_ping) - if handle_ping is False: - self.reactor.handle_ping = lambda *args, **kwds: None - elif handle_ping is not None: - self.reactor.handle_ping = handle_ping + if handle_status is False: + self.reactor.handle_status = lambda *args, **kwds: None + elif handle_status is not None: + self.reactor.handle_status = handle_status - request_packet = serverbound.status.RequestPacket() - self.write_packet(request_packet) + if handle_ping is False: + self.reactor.handle_ping = lambda *args, **kwds: None + elif handle_ping is not None: + self.reactor.handle_ping = handle_ping + + request_packet = serverbound.status.RequestPacket() + self.write_packet(request_packet) def connect(self): """ @@ -280,6 +294,7 @@ class Connection(object): # Hold the lock throughout, in case connect() is called from the # networking thread while another connection is in progress. with self._write_lock: # pylint: disable=not-context-manager + self._check_connection() # It is important that this is set correctly even when connecting # in status mode, as some servers, e.g. SpigotMC with the @@ -308,6 +323,12 @@ class Connection(object): self.reactor = PlayingStatusReactor(self) self._start_network_thread() + def _check_connection(self): + if self.networking_thread is not None and \ + not self.networking_thread.interrupt or \ + self.new_networking_thread is not None: + raise InvalidState('There is an existing connection.') + def _connect(self): # Connect a socket to the server and create a file object from the # socket. @@ -333,12 +354,14 @@ class Connection(object): self.file_object = self.socket.makefile("rb", 0) self.connected = True - def disconnect(self): - """ Terminate the existing server connection, if there is one. """ - self.connected = False - + def disconnect(self, immediate=False): + """Terminate the existing server connection, if there is one. + If 'immediate' is True, do not attempt to write any packets. + """ with self._write_lock: # pylint: disable=not-context-manager - if self.socket is not None: + self.connected = False + + if not immediate and self.socket is not None: # Flush any packets remaining in the queue. while self._pop_packet(): pass @@ -346,14 +369,14 @@ class Connection(object): if self.networking_thread is not None: self.networking_thread.interrupt = True - if self.socket is not None: - try: - self.socket.shutdown(socket.SHUT_RDWR) - except socket.error: - pass - finally: - self.socket.close() - self.socket = None + if self.socket is not None: + try: + self.socket.shutdown(socket.SHUT_RDWR) + except socket.error: + pass + finally: + self.socket.close() + self.socket = None def _handshake(self, next_state=STATE_PLAYING): handshake = serverbound.handshake.HandShakePacket() @@ -380,7 +403,9 @@ class Connection(object): pass self.exception, self.exc_info = exc, exc_info - self.disconnect() + with self._write_lock: + if self.networking_thread and not self.networking_thread.interrupt: + self.disconnect(immediate=True) if handle_exception is None: raise_(*exc_info) @@ -429,14 +454,17 @@ class NetworkingThread(threading.Thread): if self.previous_thread is not None: if self.previous_thread.is_alive(): self.previous_thread.join() - self.previous_thread = None - self.connection.networking_thread = self + with self.connection._write_lock: + self.connection.networking_thread = self + self.connection.new_networking_thread = None self._run() self.connection._handle_exit() except BaseException as e: + self.interrupt = True self.connection._handle_exception(e, sys.exc_info()) finally: - self.connection.networking_thread = None + with self.connection._write_lock: + self.connection.networking_thread = None def _run(self): while not self.interrupt: @@ -705,5 +733,6 @@ class PlayingStatusReactor(StatusReactor): if isinstance(exc, EOFError): # An exception of this type may indicate that the server does not # properly support status queries, so we treat it as non-fatal. + self.connection.disconnect(immediate=True) self.handle_failure() return True