Fix: reconnecting from an exception handler does not work.

This commit is contained in:
joo 2018-05-29 01:14:23 +01:00
parent c01f194d06
commit d36b652b69
2 changed files with 76 additions and 40 deletions

View File

@ -69,6 +69,13 @@ class LoginDisconnect(ConnectionFailure):
"""
class InvalidState(ConnectionFailure):
"""Raised by 'minecraft.networking.Connection' when a connection attempt
fails due to to the internal state of the Connection being unsuitable,
for example if there is an existing ongoing connection.
"""
class IgnorePacket(Exception):
"""This exception may be raised from within a packet handler, such as
`PacketReactor.react' or a packet listener added with

View File

@ -18,7 +18,9 @@ from .packets import clientbound, serverbound
from . import packets
from . import encryption
from .. import SUPPORTED_PROTOCOL_VERSIONS, SUPPORTED_MINECRAFT_VERSIONS
from ..exceptions import VersionMismatch, LoginDisconnect, IgnorePacket
from ..exceptions import (
VersionMismatch, LoginDisconnect, IgnorePacket, InvalidState
)
STATE_STATUS = 1
@ -103,6 +105,7 @@ class Connection(object):
self._write_lock = RLock()
self.networking_thread = None
self.new_networking_thread = None
self.packet_listeners = []
self.early_packet_listeners = []
self.outgoing_packet_listeners = []
@ -149,14 +152,21 @@ class Connection(object):
self.reactor = PacketReactor(self)
def _start_network_thread(self):
"""May safely be called multiple times."""
if self.networking_thread is None:
self.networking_thread = NetworkingThread(self)
self.networking_thread.start()
elif self.networking_thread.interrupt:
# This thread will wait until the previous thread exits, and then
# set `networking_thread' to itself.
NetworkingThread(self, previous=self.networking_thread).start()
with self._write_lock:
if self.networking_thread is not None and \
not self.networking_thread.interrupt or \
self.new_networking_thread is not None:
raise InvalidState('A networking thread is already running.')
elif self.networking_thread is None:
self.networking_thread = NetworkingThread(self)
self.networking_thread.start()
else:
# This thread will wait until the existing thread exits, and
# then set 'networking_thread' to itself and
# 'new_networking_thread' to None.
self.new_networking_thread \
= NetworkingThread(self, previous=self.networking_thread)
self.new_networking_thread.start()
def write_packet(self, packet, force=False):
"""Writes a packet to the server.
@ -253,24 +263,28 @@ class Connection(object):
which prints the latency to standard outout, or
False, to prevent measurement of the latency.
"""
self._connect()
self._handshake(next_state=STATE_STATUS)
self._start_network_thread()
with self._write_lock: # pylint: disable=not-context-manager
self._check_connection()
self.reactor = StatusReactor(self, do_ping=handle_ping is not False)
self._connect()
self._handshake(next_state=STATE_STATUS)
self._start_network_thread()
if handle_status is False:
self.reactor.handle_status = lambda *args, **kwds: None
elif handle_status is not None:
self.reactor.handle_status = handle_status
do_ping = handle_ping is not False
self.reactor = StatusReactor(self, do_ping=do_ping)
if handle_ping is False:
self.reactor.handle_ping = lambda *args, **kwds: None
elif handle_ping is not None:
self.reactor.handle_ping = handle_ping
if handle_status is False:
self.reactor.handle_status = lambda *args, **kwds: None
elif handle_status is not None:
self.reactor.handle_status = handle_status
request_packet = serverbound.status.RequestPacket()
self.write_packet(request_packet)
if handle_ping is False:
self.reactor.handle_ping = lambda *args, **kwds: None
elif handle_ping is not None:
self.reactor.handle_ping = handle_ping
request_packet = serverbound.status.RequestPacket()
self.write_packet(request_packet)
def connect(self):
"""
@ -280,6 +294,7 @@ class Connection(object):
# Hold the lock throughout, in case connect() is called from the
# networking thread while another connection is in progress.
with self._write_lock: # pylint: disable=not-context-manager
self._check_connection()
# It is important that this is set correctly even when connecting
# in status mode, as some servers, e.g. SpigotMC with the
@ -308,6 +323,12 @@ class Connection(object):
self.reactor = PlayingStatusReactor(self)
self._start_network_thread()
def _check_connection(self):
if self.networking_thread is not None and \
not self.networking_thread.interrupt or \
self.new_networking_thread is not None:
raise InvalidState('There is an existing connection.')
def _connect(self):
# Connect a socket to the server and create a file object from the
# socket.
@ -333,12 +354,14 @@ class Connection(object):
self.file_object = self.socket.makefile("rb", 0)
self.connected = True
def disconnect(self):
""" Terminate the existing server connection, if there is one. """
self.connected = False
def disconnect(self, immediate=False):
"""Terminate the existing server connection, if there is one.
If 'immediate' is True, do not attempt to write any packets.
"""
with self._write_lock: # pylint: disable=not-context-manager
if self.socket is not None:
self.connected = False
if not immediate and self.socket is not None:
# Flush any packets remaining in the queue.
while self._pop_packet():
pass
@ -346,14 +369,14 @@ class Connection(object):
if self.networking_thread is not None:
self.networking_thread.interrupt = True
if self.socket is not None:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
finally:
self.socket.close()
self.socket = None
if self.socket is not None:
try:
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
finally:
self.socket.close()
self.socket = None
def _handshake(self, next_state=STATE_PLAYING):
handshake = serverbound.handshake.HandShakePacket()
@ -380,7 +403,9 @@ class Connection(object):
pass
self.exception, self.exc_info = exc, exc_info
self.disconnect()
with self._write_lock:
if self.networking_thread and not self.networking_thread.interrupt:
self.disconnect(immediate=True)
if handle_exception is None:
raise_(*exc_info)
@ -429,14 +454,17 @@ class NetworkingThread(threading.Thread):
if self.previous_thread is not None:
if self.previous_thread.is_alive():
self.previous_thread.join()
self.previous_thread = None
self.connection.networking_thread = self
with self.connection._write_lock:
self.connection.networking_thread = self
self.connection.new_networking_thread = None
self._run()
self.connection._handle_exit()
except BaseException as e:
self.interrupt = True
self.connection._handle_exception(e, sys.exc_info())
finally:
self.connection.networking_thread = None
with self.connection._write_lock:
self.connection.networking_thread = None
def _run(self):
while not self.interrupt:
@ -705,5 +733,6 @@ class PlayingStatusReactor(StatusReactor):
if isinstance(exc, EOFError):
# An exception of this type may indicate that the server does not
# properly support status queries, so we treat it as non-fatal.
self.connection.disconnect(immediate=True)
self.handle_failure()
return True