Make FakeServer class more reusable, and extract it into its own module.

This commit is contained in:
joo 2017-08-22 14:06:17 +01:00
parent f1d04e6610
commit 9497aae8fa
3 changed files with 502 additions and 334 deletions

View File

@ -352,6 +352,8 @@ class Connection(object):
if self.reactor.handle_exception(exc, exc_info):
return
self.disconnect()
self.exception, self.exc_info = exc, exc_info
if self.handle_exception is None:
raise_(*exc_info)

471
tests/fake_server.py Normal file
View File

@ -0,0 +1,471 @@
from __future__ import print_function
from minecraft import SUPPORTED_MINECRAFT_VERSIONS
from minecraft.networking import connection
from minecraft.networking import types
from minecraft.networking import packets
from minecraft.networking.packets import clientbound
from minecraft.networking.packets import serverbound
from future.utils import raise_
import unittest
import threading
import logging
import socket
import json
import sys
import zlib
import hashlib
import uuid
VERSIONS = sorted(SUPPORTED_MINECRAFT_VERSIONS.items(), key=lambda i: i[1])
VERSIONS = [v for (v, p) in VERSIONS]
THREAD_TIMEOUT_S = 5
class FakeClientDisconnect(Exception):
""" Raised by 'FakeClientHandler.read_packet' if the client has cleanly
disconnected prior to the call.
"""
class FakeServerDisconnect(Exception):
""" May be raised within 'FakeClientHandler.handle_*' in order to terminate
the client's connection. 'message' is provided as an argument to
'handle_play_server_disconnect'.
"""
def __init__(self, message=None):
self.message = message
class FakeServerTestSuccess(Exception):
""" May be raised from within 'FakeClientHandler.handle_*' or from a
'Connection' packet listener in order to terminate a 'FakeServerTest'
successfully.
"""
class FakeClientHandler(object):
""" Represents a single client connection being handled by a 'FakeServer'.
The methods of the form 'handle_*' may be overridden by subclasses to
customise the behaviour of the server.
"""
__slots__ = 'server', 'socket', 'socket_file', 'packets', \
'compression_enabled', 'user_uuid', 'user_name'
def __init__(self, server, socket):
self.server = server
self.socket = socket
self.socket_file = socket.makefile('rb', 0)
self.compression_enabled = False
self.user_uuid = None
self.user_name = None
def run(self):
# Communicate with the client until disconnected.
try:
self._run_handshake()
self.socket.shutdown(socket.SHUT_RDWR)
finally:
self.socket.close()
self.socket_file.close()
def handle_play_start(self):
# Called upon entering the play state.
self.write_packet(clientbound.play.JoinGamePacket(
entity_id=0, game_mode=0, dimension=0, difficulty=2, max_players=1,
level_type='default', reduced_debug_info=False))
raise FakeServerDisconnect
def handle_play_packet(self, packet):
# Called upon each packet received after handle_play_start() returns.
if isinstance(packet, serverbound.play.ChatPacket):
assert len(packet.message) <= packet.max_length
self.write_packet(clientbound.play.ChatMessagePacket(json.dumps({
'translate': 'chat.type.text',
'with': [self.username, packet.message],
})))
def handle_play_client_disconnect(self):
# Called when the client cleanly terminates the connection during play.
pass
def handle_play_server_disconnect(self, message=None):
# Called when the server cleanly terminates the connection during play,
# i.e. by raising FakeServerDisconnect from a handler.
message = 'Disconnected.' if message is None else message
self.write_packet(clientbound.play.DisconnectPacket(
json_data=json.dumps({'text': message})))
def write_packet(self, packet):
# Send and log a clientbound packet.
packet.context = self.server.context
logging.debug('[S-> ] %s' % packet)
packet.write(self.socket, **(
{'compression_threshold': self.server.compression_threshold}
if self.compression_enabled else {}))
def read_packet(self):
# Read and log a serverbound packet from the client, or raises
# FakeClientDisconnect if the client has cleanly disconnected.
buffer = self._read_packet_buffer()
packet_id = types.VarInt.read(buffer)
if packet_id in self.packets:
packet = self.packets[packet_id](self.server.context)
packet.read(buffer)
else:
packet = packets.Packet(self.server.context, id=packet_id)
logging.debug('[ ->S] %s' % packet)
return packet
def _run_handshake(self):
# Enter the initial (i.e. handshaking) state of the connection.
self.packets = self.server.packets_handshake
packet = self.read_packet()
assert isinstance(packet, serverbound.handshake.HandShakePacket)
if packet.next_state == 1:
self._run_status()
elif packet.next_state == 2:
self._run_handshake_play(packet)
else:
raise AssertionError('Unknown state: %s' % packet.next_state)
def _run_handshake_play(self, packet):
# Prepare to transition from handshaking to play state (via login),
# using the given serverbound HandShakePacket to perform play-specific
# processing.
if packet.protocol_version == self.server.context.protocol_version:
self._run_login()
else:
if packet.protocol_version < self.server.context.protocol_version:
msg = 'Outdated client! Please use %s' \
% self.server.minecraft_version
else:
msg = "Outdated server! I'm still on %s" \
% self.server.minecraft_version
self.write_packet(clientbound.login.DisconnectPacket(
json_data=json.dumps({'text': msg})))
def _run_login(self):
# Enter the login state of the connection.
self.packets = self.server.packets_login
packet = self.read_packet()
assert isinstance(packet, serverbound.login.LoginStartPacket)
if self.server.compression_threshold is not None:
self.write_packet(clientbound.login.SetCompressionPacket(
threshold=self.server.compression_threshold))
self.compression_enabled = True
self.user_name = packet.name
self.user_uuid = uuid.UUID(bytes=hashlib.md5(
('OfflinePlayer:%s' % self.user_name).encode('utf8')).digest())
self.write_packet(clientbound.login.LoginSuccessPacket(
UUID=str(self.user_uuid), Username=self.user_name))
self._run_playing()
def _run_playing(self):
# Enter the playing state of the connection.
self.packets = self.server.packets_playing
client_disconnected = False
try:
self.handle_play_start()
try:
while True:
self.handle_play_packet(self.read_packet())
except FakeClientDisconnect:
client_disconnected = True
self.handle_play_client_disconnect()
except FakeServerDisconnect as e:
if not client_disconnected:
self.handle_play_server_disconnect(message=e.message)
def _run_status(self):
# Enter the status state of the connection.
self.packets = self.server.packets_status
packet = self.read_packet()
assert isinstance(packet, serverbound.status.RequestPacket)
packet = clientbound.status.ResponsePacket()
packet.json_response = json.dumps({
'version': {
'name': self.server.minecraft_version,
'protocol': self.server.context.protocol_version},
'players': {
'max': 1,
'online': 0,
'sample': []},
'description': {
'text': 'FakeServer'}})
self.write_packet(packet)
try:
packet = self.read_packet()
except FakeClientDisconnect:
return
assert isinstance(packet, serverbound.status.PingPacket)
self.write_packet(clientbound.status.PingResponsePacket(
time=packet.time))
def _read_packet_buffer(self):
# Read a serverbound packet in the form of a raw buffer, or raises
# FakeClientDisconnect if the client has cleanly disconnected.
try:
length = types.VarInt.read(self.socket_file)
except EOFError:
raise FakeClientDisconnect
buffer = packets.PacketBuffer()
while len(buffer.get_writable()) < length:
data = self.socket_file.read(length - len(buffer.get_writable()))
buffer.send(data)
buffer.reset_cursor()
if self.compression_enabled:
data_length = types.VarInt.read(buffer)
if data_length > 0:
data = zlib.decompress(buffer.read())
assert len(data) == data_length, \
'%s != %s' % (len(data), data_length)
buffer.reset()
buffer.send(data)
buffer.reset_cursor()
return buffer
class FakeServer(object):
"""
A rudimentary implementation of a Minecraft server, suitable for
testing features of minecraft.networking.connection.Connection that
require a full connection to be established.
The server listens on a local TCP socket and accepts client connections
in serial, in a single-threaded manner. It responds to status queries,
performs handshake and login, and, by default, immediately cleanly
disconnects the client after they join the game.
The behaviour of the server can be customised by writing subclasses of
FakeClientHandler, overriding its public methods of the form
'handle_*', and providing the class to the FakeServer as its
'client_handler_type'.
"""
__slots__ = 'listen_socket', 'compression_threshold', 'context', \
'minecraft_version', 'client_handler_type', \
'packets_handshake', 'packets_login', 'packets_playing', \
'packets_status', 'lock', 'stopping'
def __init__(self, minecraft_version=None, compression_threshold=None,
client_handler_type=FakeClientHandler):
if minecraft_version is None:
minecraft_version = VERSIONS[-1][0]
self.minecraft_version = minecraft_version
self.compression_threshold = compression_threshold
self.client_handler_type = client_handler_type
protocol_version = SUPPORTED_MINECRAFT_VERSIONS[minecraft_version]
self.context = connection.ConnectionContext(
protocol_version=protocol_version)
self.packets_handshake = {
p.get_id(self.context): p for p in
serverbound.handshake.get_packets(self.context)}
self.packets_login = {
p.get_id(self.context): p for p in
serverbound.login.get_packets(self.context)}
self.packets_playing = {
p.get_id(self.context): p for p in
serverbound.play.get_packets(self.context)}
self.packets_status = {
p.get_id(self.context): p for p in
serverbound.status.get_packets(self.context)}
self.listen_socket = socket.socket()
self.listen_socket.settimeout(0.1)
self.listen_socket.bind(('localhost', 0))
self.listen_socket.listen(0)
self.lock = threading.Lock()
self.stopping = False
super(FakeServer, self).__init__()
def run(self):
try:
while True:
try:
client_socket, addr = self.listen_socket.accept()
logging.debug('[ ++ ] Client %s connected.' % (addr,))
self.client_handler_type(self, client_socket).run()
logging.debug('[ -- ] Client %s disconnected.' % (addr,))
except socket.timeout:
pass
with self.lock:
if self.stopping:
logging.debug('[ ** ] Server stopped normally.')
break
finally:
self.listen_socket.close()
def stop(self):
with self.lock:
self.stopping = True
class _FakeServerTest(unittest.TestCase):
"""
A template for test cases involving a single client connecting to a
single 'FakeServer'. The default behaviour causes the client to connect
to the server, join the game, then disconnect, considering it a success
if a 'JoinGamePacket' is received before a 'DisconnectPacket'.
Customise by making subclasses that:
1. Overrides the attributes present in this class, where desired, so
that they will apply to all tests; and/or
2. Define tests (or override 'runTest') to call '_test_connect' with
the arguments specified as necessary to override class attributes.
3. Overrides '_start_client' in order to set event listeners and
change the connection mode, if necessary.
To terminate the test and indicate that it finished successfully, a
client packet handler or a handler method of the 'FakeClientHandler'
must raise a 'FakeServerTestSuccess' exception.
"""
server_version = VERSIONS[-1]
# The Minecraft version name that the server will support.
client_versions = None
# The set of Minecraft version names or protocol version numbers that the
# client will support. If None, the client supports all possible versions.
client_handler_type = FakeClientHandler
# A subclass of FakeClientHandler to be used in tests.
compression_threshold = None
# The compression threshold that the server will dictate.
# If None, compression is disabled.
def _start_client(self, client):
game_joined = [False]
def handle_join_game(packet):
game_joined[0] = True
client.register_packet_listener(
handle_join_game, clientbound.play.JoinGamePacket)
def handle_disconnect(packet):
assert game_joined[0], 'JoinGamePacket not received.'
raise FakeServerTestSuccess
client.register_packet_listener(
handle_disconnect, clientbound.play.DisconnectPacket)
client.connect()
def _test_connect(self, client_versions=None, server_version=None,
client_handler_type=None, compression_threshold=None):
if client_versions is None:
client_versions = self.client_versions
if server_version is None:
server_version = self.server_version
if compression_threshold is None:
compression_threshold = self.compression_threshold
if client_handler_type is None:
client_handler_type = self.client_handler_type
server = FakeServer(minecraft_version=server_version,
compression_threshold=compression_threshold)
addr = "localhost"
port = server.listen_socket.getsockname()[1]
cond = threading.Condition()
server_lock = threading.Lock()
server_exc_info = [None]
client_lock = threading.Lock()
client_exc_info = [None]
def handle_client_exception(exc, exc_info):
with client_lock:
client_exc_info[0] = exc_info
with cond:
cond.notify_all()
client = connection.Connection(
addr, port, username='TestUser', allowed_versions=client_versions,
handle_exception=handle_client_exception)
client.register_packet_listener(
lambda packet: logging.debug('[ ->C] %s' % packet),
packets.Packet, early=True)
client.register_packet_listener(
lambda packet: logging.debug('[C-> ] %s' % packet),
packets.Packet, early=True, outgoing=True)
server_thread = threading.Thread(
name='FakeServer',
target=self._test_connect_server,
args=(server, cond, server_lock, server_exc_info))
server_thread.daemon = True
errors = []
try:
try:
with cond:
server_thread.start()
self._start_client(client)
cond.wait(THREAD_TIMEOUT_S)
finally:
# Wait for all threads to exit.
server.stop()
for thread in server_thread, client.networking_thread:
if thread is not None and thread.is_alive():
thread.join(THREAD_TIMEOUT_S)
if thread is not None and thread.is_alive():
errors.append({
'msg': 'Thread "%s" timed out.' % thread.name})
except:
errors.insert(0, sys.exc_info())
else:
timeout = True
for lock, [exc_info], thread_name in (
(client_lock, client_exc_info, 'client thread'),
(server_lock, server_exc_info, 'server thread')
):
with lock:
if exc_info is None:
continue
if not issubclass(exc_info[0], FakeServerTestSuccess):
errors.insert(0, {
'msg': 'Exception in %s:' % thread_name,
'exc_info': exc_info})
timeout = False
if timeout:
errors.insert(0, {'msg': 'Timed out.'})
if len(errors) > 1:
for error in errors:
logging.error(**error)
self.fail('Multiple errors: see logging output.')
elif errors and 'exc_info' in errors[0]:
raise_(*errors[0]['exc_info'])
elif errors:
self.fail(errors[0]['msg'])
def _test_connect_server(self, server, cond, server_lock, server_exc_info):
exc_info = None
try:
server.run()
except:
exc_info = sys.exc_info()
with server_lock:
server_exc_info[0] = exc_info
with cond:
cond.notify_all()

View File

@ -1,349 +1,44 @@
from __future__ import print_function
from . import fake_server
from minecraft import SUPPORTED_MINECRAFT_VERSIONS
from minecraft.networking import connection
from minecraft.networking import types
from minecraft.networking import packets
from minecraft.networking.packets import clientbound
from minecraft.networking.packets import serverbound
from future.utils import raise_
import unittest
import threading
import logging
import socket
import json
import sys
import zlib
VERSIONS = sorted(SUPPORTED_MINECRAFT_VERSIONS.items(), key=lambda i: i[1])
THREAD_TIMEOUT_S = 5
class _ConnectTest(unittest.TestCase):
compression_threshold = None
def _test_connect(self, client_version=None, server_version=None):
server = FakeServer(minecraft_version=server_version,
compression_threshold=self.compression_threshold)
addr = "localhost"
port = server.listen_socket.getsockname()[1]
cond = threading.Condition()
def handle_client_exception(exc, exc_info):
with cond:
cond.exc_info = exc_info
cond.notify_all()
client = connection.Connection(
addr, port, username='User', initial_version=client_version,
handle_exception=handle_client_exception)
client.register_packet_listener(
lambda packet: logging.debug('[ ->C] %s' % packet), early=True)
client.register_packet_listener(
lambda packet: logging.debug('[C-> ] %s' % packet), outgoing=True)
try:
with cond:
server_thread = threading.Thread(
name='_ConnectTest server',
target=self._test_connect_server,
args=(server, cond))
server_thread.daemon = True
server_thread.start()
self._test_connect_client(client, cond)
cond.exc_info = Ellipsis
cond.wait(THREAD_TIMEOUT_S)
if cond.exc_info is Ellipsis:
self.fail('Timed out.')
elif cond.exc_info is not None:
raise_(*cond.exc_info)
finally:
# Wait for all threads to exit.
for thread in server_thread, client.networking_thread:
if thread is not None and thread.is_alive():
thread.join(THREAD_TIMEOUT_S)
if thread is not None and thread.is_alive():
if cond.exc_info is None:
self.fail('Thread "%s" timed out.' % thread.name)
else:
# Keep the earlier exception, if there is one.
break
def _test_connect_client(self, client, cond):
client.connect()
def _test_connect_server(self, server, cond):
try:
server.run()
exc_info = None
except:
exc_info = sys.exc_info()
with cond:
cond.exc_info = exc_info
cond.notify_all()
class ConnectTest(fake_server._FakeServerTest):
def test_connect(self):
self._test_connect()
class ConnectOldToOldTest(_ConnectTest):
def runTest(self):
self._test_connect(VERSIONS[0][1], VERSIONS[0][0])
class PingTest(ConnectTest):
def _start_client(self, client):
def handle_ping(latency_ms):
assert 0 <= latency_ms < 60000
raise fake_server.FakeServerTestSuccess
client.status(handle_status=False, handle_ping=handle_ping)
class ConnectOldToNewTest(_ConnectTest):
def runTest(self):
self._test_connect(VERSIONS[0][1], VERSIONS[-1][0])
class ConnectNewToOldTest(_ConnectTest):
def runTest(self):
self._test_connect(VERSIONS[-1][1], VERSIONS[0][0])
class ConnectNewToNewTest(_ConnectTest):
def runTest(self):
self._test_connect(VERSIONS[-1][1], VERSIONS[-1][0])
class ConnectCompressionLowTest(ConnectNewToNewTest):
class ConnectCompressionLowTest(ConnectTest):
compression_threshold = 0
class ConnectCompressionHighTest(ConnectNewToNewTest):
class ConnectCompressionHighTest(ConnectTest):
compression_threshold = 256
class PingTest(_ConnectTest):
def runTest(self):
self._test_connect()
class AllowedVersionsTest(fake_server._FakeServerTest):
VERSIONS = sorted(SUPPORTED_MINECRAFT_VERSIONS.items(), key=lambda p: p[1])
VERSIONS = dict((VERSIONS[0], VERSIONS[len(VERSIONS)//2], VERSIONS[-1]))
def _test_connect_client(self, client, cond):
def handle_ping(latency_ms):
assert 0 <= latency_ms < 60000
with cond:
cond.exc_info = None
cond.notify_all()
client.status(handle_status=False, handle_ping=handle_ping)
def test_with_version_names(self):
for version, proto in AllowedVersionsTest.VERSIONS.items():
client_versions = {
v for (v, p) in SUPPORTED_MINECRAFT_VERSIONS.items()
if p <= proto}
self._test_connect(
server_version=version, client_versions=client_versions)
def _test_connect_server(self, server, cond):
try:
server.continue_after_status = False
server.run()
except:
with cond:
cond.exc_info = sys.exc_info()
cond.notify_all()
class FakeServer(threading.Thread):
__slots__ = 'context', 'minecraft_version', 'listen_socket', \
'compression_threshold', 'compression_enabled', \
'packets_login', 'packets_playing', 'packets_status', \
'packets'
def __init__(self, minecraft_version=None, continue_after_status=True,
compression_threshold=None):
if minecraft_version is None:
minecraft_version = VERSIONS[-1][0]
self.minecraft_version = minecraft_version
self.continue_after_status = continue_after_status
self.compression_threshold = compression_threshold
protocol_version = SUPPORTED_MINECRAFT_VERSIONS[minecraft_version]
self.context = connection.ConnectionContext(
protocol_version=protocol_version)
self.compression_enabled = False
self.packets_handshake = {
p.get_id(self.context): p for p in
serverbound.handshake.get_packets(self.context)}
self.packets_login = {
p.get_id(self.context): p for p in
serverbound.login.get_packets(self.context)}
self.packets_playing = {
p.get_id(self.context): p for p in
serverbound.play.get_packets(self.context)}
self.packets_status = {
p.get_id(self.context): p for p in
serverbound.status.get_packets(self.context)}
self.listen_socket = socket.socket()
self.listen_socket.bind(('0.0.0.0', 0))
self.listen_socket.listen(0)
super(FakeServer, self).__init__()
def run(self):
try:
self.run_accept()
finally:
self.listen_socket.close()
def run_accept(self):
running = True
while running:
client_socket, addr = self.listen_socket.accept()
logging.debug('[ ++ ] Client %s connected to server.' % (addr,))
client_file = client_socket.makefile('rb', 0)
try:
running = self.run_handshake(client_socket, client_file)
except:
raise
else:
client_socket.shutdown(socket.SHUT_RDWR)
logging.debug('[ -- ] Client %s disconnected.' % (addr,))
finally:
client_socket.close()
client_file.close()
def run_handshake(self, client_socket, client_file):
self.packets = self.packets_handshake
packet = self.read_packet_filtered(client_file)
assert isinstance(packet, serverbound.handshake.HandShakePacket)
if packet.next_state == 1:
return self.run_handshake_status(
packet, client_socket, client_file)
elif packet.next_state == 2:
return self.run_handshake_play(
packet, client_socket, client_file)
else:
raise AssertionError('Unknown state: %s' % packet.next_state)
def run_handshake_status(self, packet, client_socket, client_file):
self.run_status(client_socket, client_file)
return self.continue_after_status
def run_handshake_play(self, packet, client_socket, client_file):
if packet.protocol_version == self.context.protocol_version:
self.run_login(client_socket, client_file)
else:
if packet.protocol_version < self.context.protocol_version:
msg = 'Outdated client! Please use %s' \
% self.minecraft_version
else:
msg = "Outdated server! I'm still on %s" \
% self.minecraft_version
packet = clientbound.login.DisconnectPacket(
self.context, json_data=json.dumps({'text': msg}))
self.write_packet(packet, client_socket)
return True
def run_login(self, client_socket, client_file):
self.packets = self.packets_login
packet = self.read_packet_filtered(client_file)
assert isinstance(packet, serverbound.login.LoginStartPacket)
if self.compression_threshold is not None:
self.write_packet(clientbound.login.SetCompressionPacket(
self.context, threshold=self.compression_threshold),
client_socket)
self.compression_enabled = True
packet = clientbound.login.LoginSuccessPacket(
self.context, UUID='{fake uuid}', Username=packet.name)
self.write_packet(packet, client_socket)
self.run_playing(client_socket, client_file)
def run_playing(self, client_socket, client_file):
self.packets = self.packets_playing
packet = clientbound.play.JoinGamePacket(
self.context, entity_id=0, game_mode=0, dimension=0, difficulty=2,
max_players=1, level_type='default', reduced_debug_info=False)
self.write_packet(packet, client_socket)
keep_alive_id = 1076048782
packet = clientbound.play.KeepAlivePacket(
self.context, keep_alive_id=keep_alive_id)
self.write_packet(packet, client_socket)
packet = self.read_packet_filtered(client_file)
assert isinstance(packet, serverbound.play.KeepAlivePacket)
assert packet.keep_alive_id == keep_alive_id
packet = clientbound.play.DisconnectPacket(
self.context, json_data=json.dumps({'text': 'Test complete.'}))
self.write_packet(packet, client_socket)
return False
def run_status(self, client_socket, client_file):
self.packets = self.packets_status
packet = self.read_packet(client_file)
assert isinstance(packet, serverbound.status.RequestPacket)
packet = clientbound.status.ResponsePacket(self.context)
packet.json_response = json.dumps({
'version': {
'name': self.minecraft_version,
'protocol': self.context.protocol_version},
'players': {
'max': 1,
'online': 0,
'sample': []},
'description': {
'text': 'FakeServer'}})
self.write_packet(packet, client_socket)
try:
packet = self.read_packet(client_file)
except EOFError:
return False
assert isinstance(packet, serverbound.status.PingPacket)
res_packet = clientbound.status.PingResponsePacket(self.context)
res_packet.time = packet.time
self.write_packet(res_packet, client_socket)
return False
def read_packet_filtered(self, client_file):
while True:
packet = self.read_packet(client_file)
if isinstance(packet, serverbound.play.PositionAndLookPacket):
continue
if isinstance(packet, serverbound.play.AnimationPacket):
continue
return packet
def read_packet(self, client_file):
buffer = self.read_packet_buffer(client_file)
packet_id = types.VarInt.read(buffer)
if packet_id in self.packets:
packet = self.packets[packet_id](self.context)
packet.read(buffer)
else:
packet = packets.Packet(self.context, id=packet_id)
logging.debug('[ ->S] %s' % packet)
return packet
def read_packet_buffer(self, client_file):
length = types.VarInt.read(client_file)
buffer = packets.PacketBuffer()
while len(buffer.get_writable()) < length:
buffer.send(client_file.read(length - len(buffer.get_writable())))
buffer.reset_cursor()
if self.compression_enabled:
data_length = types.VarInt.read(buffer)
if data_length > 0:
data = zlib.decompress(buffer.read())
assert len(data) == data_length, \
'%s != %s' % (len(data), data_length)
buffer.reset()
buffer.send(data)
buffer.reset_cursor()
return buffer
def write_packet(self, packet, client_socket):
kwds = {'compression_threshold': self.compression_threshold} \
if self.compression_enabled else {}
logging.debug('[S-> ] %s' % packet)
packet.write(client_socket, **kwds)
def test_with_protocol_numbers(self):
for version, proto in AllowedVersionsTest.VERSIONS.items():
client_versions = {
p for (v, p) in SUPPORTED_MINECRAFT_VERSIONS.items()
if p <= proto}
self._test_connect(
server_version=version, client_versions=client_versions)