Remove limits on number of packets read/written per tick.

This addresses possible memory leaks or crashes caused by overflowing packet backlogs.
This commit is contained in:
joo 2017-10-10 04:36:50 +01:00
parent 88a5fdc637
commit af559e181a
1 changed files with 43 additions and 42 deletions

View File

@ -5,7 +5,6 @@ from threading import RLock
import zlib
import threading
import socket
import time
import timeit
import select
import sys
@ -366,7 +365,14 @@ class Connection(object):
self.handle_exception(exc, exc_info)
def _react(self, packet):
self.reactor.react(packet)
try:
for listener in self.early_packet_listeners:
listener.call_packet(packet)
self.reactor.react(packet)
for listener in self.packet_listeners:
listener.call_packet(packet)
except IgnorePacket:
pass
class NetworkingThread(threading.Thread):
@ -381,6 +387,11 @@ class NetworkingThread(threading.Thread):
def run(self):
try:
if self.previous_thread is not None:
if self.previous_thread.is_alive():
self.previous_thread.join()
self.previous_thread = None
self.connection.networking_thread = self
self._run()
except BaseException as e:
self.connection._handle_exception(e, sys.exc_info())
@ -388,57 +399,46 @@ class NetworkingThread(threading.Thread):
self.connection.networking_thread = None
def _run(self):
if self.previous_thread is not None:
if self.previous_thread.is_alive():
self.previous_thread.join()
self.previous_thread = None
self.connection.networking_thread = self
while not self.interrupt:
# Attempt to write out as many as 300 packets as possible every
# 0.05 seconds (20 ticks per second)
# Attempt to write out as many as 300 packets.
num_packets = 0
self.connection._write_lock.acquire()
try:
while self.connection._pop_packet():
num_packets += 1
if num_packets >= 300:
break
exc_info = None
except:
exc_info = sys.exc_info()
self.connection._write_lock.release()
with self.connection._write_lock:
try:
while not self.interrupt and self.connection._pop_packet():
num_packets += 1
if num_packets >= 300:
break
exc_info = None
except IOError:
exc_info = sys.exc_info()
# Read and react to as many as 50 packets
num_packets = 0
# If any packets remain to be written, resume writing as soon
# as possible after reading any available packets; otherwise,
# wait for up to 50ms (1 tick) for new packets to arrive.
if self.connection._outgoing_packet_queue:
read_timeout = 0
else:
read_timeout = 0.05
# Read and react to as many as 50 packets.
while num_packets < 50 and not self.interrupt:
packet = self.connection.reactor.read_packet(
self.connection.file_object)
self.connection.file_object, timeout=read_timeout)
if not packet:
break
num_packets += 1
self.connection._react(packet)
read_timeout = 0
# Do not raise an IOError if it occurred while a disconnect
# packet was received, as this may be part of an orderly
# disconnection.
if packet.packet_name == 'disconnect' and \
exc_info is not None and isinstance(exc_info[1], IOError):
# Ignore the earlier exception if a disconnect packet is
# received, as it may have been caused by trying to write to
# thw closed socket, which does not represent a program error.
if exc_info is not None and packet.name == 'disconnect':
exc_info = None
try:
for listener in self.connection.early_packet_listeners:
listener.call_packet(packet)
self.connection._react(packet)
for listener in self.connection.packet_listeners:
listener.call_packet(packet)
except IgnorePacket:
pass
if exc_info is not None:
raise_(*exc_info)
time.sleep(0.05)
class IgnorePacket(Exception):
"""
@ -455,7 +455,6 @@ class PacketReactor(object):
Reads and reacts to packets
"""
state_name = None
TIME_OUT = 0
# Handshaking is considered the "default" state
get_clientbound_packets = staticmethod(clientbound.handshake.get_packets)
@ -467,8 +466,10 @@ class PacketReactor(object):
packet.get_id(context): packet
for packet in self.__class__.get_clientbound_packets(context)}
def read_packet(self, stream):
ready_to_read = select.select([stream], [], [], self.TIME_OUT)[0]
def read_packet(self, stream, timeout=0):
# Block for up to `timeout' seconds waiting for `stream' to become
# readable, returning `None' if the timeout elapses.
ready_to_read = select.select([stream], [], [], timeout)[0]
if ready_to_read:
length = VarInt.read(stream)