pyCraft/tests/test_packets.py

421 lines
17 KiB
Python

# -*- coding: utf-8 -*-
import unittest
import string
import logging
import struct
from zlib import decompress
from random import choice
from minecraft.utility import protocol_earlier
from minecraft import (
PRE, SUPPORTED_PROTOCOL_VERSIONS, RELEASE_PROTOCOL_VERSIONS,
)
from minecraft.networking.connection import ConnectionContext
from minecraft.networking.types import (
VarInt, Enum, Vector, PositionAndLook, OriginPoint,
)
from minecraft.networking.packets import (
Packet, PacketBuffer, PacketListener, KeepAlivePacket, serverbound,
clientbound
)
# Protocol versions exercised by the tests in this module: every release
# version, plus the last entry of SUPPORTED_PROTOCOL_VERSIONS when that entry
# is not already among them.
TEST_VERSIONS = list(RELEASE_PROTOCOL_VERSIONS)
if SUPPORTED_PROTOCOL_VERSIONS[-1] not in TEST_VERSIONS:
    TEST_VERSIONS += [SUPPORTED_PROTOCOL_VERSIONS[-1]]
class PacketBufferTest(unittest.TestCase):
    """Checks the basic byte-level operations of ``PacketBuffer``."""

    def test_basic_read_write(self):
        """Sent bytes can be read back until the buffer is reset()."""
        payload = b"hello"
        buf = PacketBuffer()
        buf.send(payload)

        # After rewinding the cursor, read() and recv() must each return
        # everything that was sent.
        for fetch in (buf.read, buf.recv):
            buf.reset_cursor()
            self.assertEqual(fetch(), payload)

        # After reset() the payload must no longer be returned.
        buf.reset()
        self.assertNotEqual(buf.read(), payload)

    def test_get_writable(self):
        """get_writable() returns exactly the bytes that were sent."""
        payload = b"hello"
        buf = PacketBuffer()
        buf.send(payload)
        self.assertEqual(buf.get_writable(), payload)
class PacketSerializationTest(unittest.TestCase):
    """Round-trip tests for serialising ``serverbound.play.ChatPacket``."""

    def test_packet(self):
        """Write a chat packet without compression and read it back."""
        for protocol_version in TEST_VERSIONS:
            logging.debug('protocol_version = %r' % protocol_version)
            context = ConnectionContext(protocol_version=protocol_version)

            packet = serverbound.play.ChatPacket(context)
            # Non-ASCII text, so the round trip covers more than plain ASCII.
            packet.message = u"κόσμε"

            packet_buffer = PacketBuffer()
            packet.write(packet_buffer)
            packet_buffer.reset_cursor()

            # Read the length and packet id
            VarInt.read(packet_buffer)
            packet_id = VarInt.read(packet_buffer)
            self.assertEqual(packet_id, packet.id)

            deserialized = serverbound.play.ChatPacket(context)
            deserialized.read(packet_buffer)
            self.assertEqual(packet.message, deserialized.message)

    def test_compressed_packet(self):
        """Round-trip a 500-character chat message with thresholds 20 and -1.
        """
        for protocol_version in TEST_VERSIONS:
            logging.debug('protocol_version = %r' % protocol_version)
            context = ConnectionContext(protocol_version=protocol_version)

            msg = ''.join(choice(string.ascii_lowercase) for _ in range(500))
            packet = serverbound.play.ChatPacket(context)
            packet.message = msg

            self.write_read_packet(packet, 20)
            self.write_read_packet(packet, -1)

    def write_read_packet(self, packet, compression_threshold):
        """
        Write ``packet`` with ``compression_threshold`` and read it back.

        The packet is deserialised with the same context it was written
        with (``packet.context``); the caller is responsible for iterating
        over protocol versions. Asserts that the packet id and the message
        survive the round trip.
        """
        context = packet.context

        packet_buffer = PacketBuffer()
        packet.write(packet_buffer, compression_threshold)
        packet_buffer.reset_cursor()

        # Skip the outer length prefix; the next VarInt is the size field of
        # the compression framing.
        VarInt.read(packet_buffer)
        compressed_size = VarInt.read(packet_buffer)

        # A positive size field is treated as "payload is zlib-compressed":
        # replace the buffer contents with the decompressed payload.
        # NOTE(review): presumably 0 marks an uncompressed payload -- confirm
        # against Packet.write.
        if compressed_size > 0:
            decompressed = decompress(packet_buffer.read(compressed_size))
            packet_buffer.reset()
            packet_buffer.send(decompressed)
            packet_buffer.reset_cursor()

        packet_id = VarInt.read(packet_buffer)
        self.assertEqual(packet_id, packet.id)

        deserialized = serverbound.play.ChatPacket(context)
        deserialized.read(packet_buffer)
        self.assertEqual(packet.message, deserialized.message)
class PacketListenerTest(unittest.TestCase):
    """Checks that ``PacketListener`` dispatches only the registered type."""

    def test_listener(self):
        """
        A listener registered for ChatPacket must invoke its callback exactly
        once for a chat packet and not at all for a KeepAlivePacket.
        """
        message = "hello world"
        # Packets handed to the callback; lets the test detect a listener
        # that never (or too often) invokes it.
        received = []

        def test_packet(chat_packet):
            received.append(chat_packet)
            self.assertEqual(chat_packet.message, message)

        for protocol_version in TEST_VERSIONS:
            logging.debug('protocol_version = %r' % protocol_version)
            context = ConnectionContext(protocol_version=protocol_version)
            del received[:]

            listener = PacketListener(test_packet, serverbound.play.ChatPacket)

            packet = serverbound.play.ChatPacket(context).set_values(
                message=message)
            uncalled_packet = KeepAlivePacket().set_values(keep_alive_id=0)

            listener.call_packet(packet)
            listener.call_packet(uncalled_packet)

            # Only the chat packet may have reached the callback.
            self.assertEqual(len(received), 1)
            self.assertIs(received[0], packet)
class PacketEnumTest(unittest.TestCase):
    """Checks how enum-backed fields appear in a packet's ``str()``."""

    def test_packet_str(self):
        """
        ``alpha=0`` is shown as ``ZERO`` (a member of the nested ``Alpha``
        enum), while ``beta=0`` (no ``Beta`` member has that value) and
        ``gamma=0`` (no nested enum at all) are shown as plain numbers.
        """
        class ExamplePacket(Packet):
            id = 0x00
            packet_name = 'example'
            definition = [
                {'alpha': VarInt},
                {'beta': VarInt},
                {'gamma': VarInt},
            ]

            class Alpha(Enum):
                ZERO = 0

            class Beta(Enum):
                ONE = 1

        example = ExamplePacket(ConnectionContext(), alpha=0, beta=0, gamma=0)
        expected = '0x00 ExamplePacket(alpha=ZERO, beta=0, gamma=0)'
        self.assertEqual(str(example), expected)
class TestReadWritePackets(unittest.TestCase):
    """String-representation and write/read round-trip tests for packets."""

    # Show complete diffs when the long expected strings do not match.
    maxDiff = None

    def test_explosion_packet(self):
        """ExplosionPacket renders as expected and survives a round trip."""
        context = ConnectionContext(protocol_version=TEST_VERSIONS[-1])
        Record = clientbound.play.ExplosionPacket.Record
        packet = clientbound.play.ExplosionPacket(
            position=Vector(787, -37, 0), radius=15,
            records=[Record(-14, -116, -5), Record(-77, 34, -36),
                     Record(-35, -127, 95), Record(11, 113, -8)],
            player_motion=Vector(4, 5, 0), context=context)

        self.assertEqual(
            str(packet),
            '0x%02X ExplosionPacket(x=787, y=-37, z=0, radius=15, records=['
            'Record(-14, -116, -5), Record(-77, 34, -36), '
            'Record(-35, -127, 95), Record(11, 113, -8)], '
            'player_motion_x=4, player_motion_y=5, player_motion_z=0)'
            % packet.id
        )

        self._test_read_write_packet(packet)

    def test_combat_event_packet(self):
        """
        Each CombatEventPacket event renders as expected, round-trips for
        versions up to ``PRE | 14``, raises NotImplementedError from
        ``PRE | 15`` on, and has a specialised packet type whose fields
        write through to an event of the same type.
        """
        CombatEventPacket = clientbound.play.CombatEventPacket

        self._check_combat_event(
            CombatEventPacket.EnterCombatEvent(),
            'CombatEventPacket(event=EnterCombatEvent())',
            clientbound.play.EnterCombatEventPacket)

        self._check_combat_event(
            CombatEventPacket.EndCombatEvent(
                duration=415, entity_id=91063502),
            'CombatEventPacket(event=EndCombatEvent('
            'duration=415, entity_id=91063502))',
            clientbound.play.EndCombatEventPacket)

        self._check_combat_event(
            CombatEventPacket.EntityDeadEvent(
                player_id=178, entity_id=36, message='RIP'),
            "CombatEventPacket(event=EntityDeadEvent("
            "player_id=178, entity_id=36, message='RIP'))",
            clientbound.play.DeathCombatEventPacket)

    def _check_combat_event(self, event, expected_str, specialised_type):
        """Run the shared CombatEventPacket checks for a single ``event``."""
        packet = clientbound.play.CombatEventPacket(event=event)
        self.assertEqual(str(packet), expected_str)

        # The generic packet is expected to work only up to PRE | 14.
        self._test_read_write_packet(packet, vmax=PRE | 14)
        with self.assertRaises(NotImplementedError):
            self._test_read_write_packet(packet, vmin=PRE | 15)

        # Setting a field on the specialised packet must be visible on its
        # ``event`` attribute.
        specialised_packet = specialised_type()
        self.assertIsInstance(specialised_packet.event, type(packet.event))
        for field in specialised_packet.fields:
            value = getattr(packet.event, field)
            setattr(specialised_packet, field, value)
            self.assertEqual(getattr(specialised_packet.event, field), value)

    def test_multi_block_change_packet(self):
        """MultiBlockChangePacket accepts several record spellings."""
        Record = clientbound.play.MultiBlockChangePacket.Record

        for protocol_version in TEST_VERSIONS:
            context = ConnectionContext()
            context.protocol_version = protocol_version
            packet = clientbound.play.MultiBlockChangePacket(context)

            if context.protocol_later_eq(741):
                packet.chunk_section_pos = Vector(167, 17, 33)
                packet.invert_trust_edges = False
            else:
                packet.chunk_x, packet.chunk_z = 167, 17
                self.assertEqual(packet.chunk_pos, (167, 17))

            # Three ways of constructing a record; the assertions below
            # require all of them to expose the same values.
            packet.records = [
                Record(x=1, y=2, z=3, blockId=56, blockMeta=13),
                Record(position=Vector(1, 2, 3), block_state_id=909),
                Record(position=(1, 2, 3), blockStateId=909),
            ]

            for record in packet.records:
                self.assertEqual(record.blockId, 56)
                self.assertEqual(record.blockMeta, 13)
                self.assertEqual(record.blockStateId, 909)
                self.assertEqual(record.position, Vector(1, 2, 3))

            self._test_read_write_packet(packet, context)

    def test_spawn_object_packet(self):
        """
        SpawnObjectPacket can be built from individual fields or from
        aggregate attributes, renders as expected and survives a round trip.
        """
        for protocol_version in TEST_VERSIONS:
            logging.debug('protocol_version = %r' % protocol_version)
            context = ConnectionContext(protocol_version=protocol_version)

            EntityType = clientbound.play.SpawnObjectPacket.field_enum(
                'type_id', context)

            pos_look = PositionAndLook(
                position=(Vector(68.0, 38.0, 76.0)
                          if context.protocol_later_eq(100) else
                          Vector(68, 38, 76)),
                yaw=263.494, pitch=180)
            velocity = Vector(21, 55, 41)
            entity_id, type_name, type_id = 49846, 'EGG', EntityType.EGG

            # First packet: every field passed individually.
            packet = clientbound.play.SpawnObjectPacket(
                context=context,
                x=pos_look.x, y=pos_look.y, z=pos_look.z,
                yaw=pos_look.yaw, pitch=pos_look.pitch,
                velocity_x=velocity.x, velocity_y=velocity.y,
                velocity_z=velocity.z,
                entity_id=entity_id, type_id=type_id, data=1)
            if context.protocol_later_eq(49):
                object_uuid = 'd9568851-85bc-4a10-8d6a-261d130626fa'
                packet.object_uuid = object_uuid
                self.assertEqual(packet.objectUUID, object_uuid)
            self.assertEqual(packet.position_and_look, pos_look)
            self.assertEqual(packet.position, pos_look.position)
            self.assertEqual(packet.velocity, velocity)
            self.assertEqual(packet.type, type_name)

            if context.protocol_later_eq(100):
                expected = (
                    "0x%02X SpawnObjectPacket(entity_id=49846, "
                    "object_uuid='d9568851-85bc-4a10-8d6a-261d130626fa', "
                    "type_id=EGG, x=68.0, y=38.0, z=76.0, pitch=180, "
                    "yaw=263.494, "
                    "data=1, velocity_x=21, velocity_y=55, velocity_z=41)"
                    % packet.id)
            elif context.protocol_later_eq(49):
                expected = (
                    "0x%02X SpawnObjectPacket(entity_id=49846, "
                    "object_uuid='d9568851-85bc-4a10-8d6a-261d130626fa', "
                    "type_id=EGG, x=68, y=38, z=76, pitch=180, yaw=263.494, "
                    "data=1, velocity_x=21, velocity_y=55, velocity_z=41)"
                    % packet.id)
            else:
                expected = (
                    "0x%02X SpawnObjectPacket(entity_id=49846, type_id=EGG, "
                    "x=68, y=38, z=76, pitch=180, yaw=263.494, data=1, "
                    "velocity_x=21, velocity_y=55, velocity_z=41)"
                    % packet.id)
            self.assertEqual(str(packet), expected)

            # Second packet: the same values through aggregate attributes.
            packet2 = clientbound.play.SpawnObjectPacket(
                context=context, position_and_look=pos_look,
                velocity=velocity, type=type_name,
                entity_id=entity_id, data=1)
            if context.protocol_later_eq(49):
                packet2.object_uuid = object_uuid
            self.assertEqual(packet.__dict__, packet2.__dict__)

            packet2.position = pos_look.position
            self.assertEqual(packet.position, packet2.position)

            packet2.data = 0
            if context.protocol_earlier(49):
                del packet2.velocity

            # yaw and pitch are compared with a tolerance of 360/256 rather
            # than exactly.
            angle_delta = 360/256
            self._test_read_write_packet(packet, context,
                                         yaw=angle_delta, pitch=angle_delta)
            self._test_read_write_packet(packet2, context,
                                         yaw=angle_delta, pitch=angle_delta)

    def test_sound_effect_packet(self):
        """SoundEffectPacket survives a round trip for every version."""
        for protocol_version in TEST_VERSIONS:
            context = ConnectionContext(protocol_version=protocol_version)

            packet = clientbound.play.SoundEffectPacket(
                sound_id=545, effect_position=Vector(0.125, 300.0, 50.5),
                volume=0.75)
            if context.protocol_later_eq(201):
                # Pass 1.5 through a packed C float so the value assigned is
                # one that a 'f' pack/unpack cycle reproduces.
                packet.pitch = struct.unpack('f', struct.pack('f', 1.5))[0]
            else:
                # int(1.5 / 63.5) is 0, so the pitch assigned here is 0.0.
                # NOTE(review): possibly intended as int(1.5 * 63.5) / 63.5
                # -- confirm against the packet's pitch encoding before
                # changing.
                packet.pitch = int(1.5 / 63.5) * 63.5
            if context.protocol_later_eq(95):
                packet.sound_category = \
                    clientbound.play.SoundEffectPacket.SoundCategory.NEUTRAL

            self._test_read_write_packet(packet, context)

    def test_face_player_packet(self):
        """FacePlayerPacket renders and round-trips with and without entity.
        """
        for protocol_version in TEST_VERSIONS:
            context = ConnectionContext(protocol_version=protocol_version)
            has_origin = context.protocol_later_eq(353)

            packet = clientbound.play.FacePlayerPacket(context)
            packet.target = 1.0, -2.0, 3.5
            packet.entity_id = None
            if has_origin:
                packet.origin = OriginPoint.EYES

            if has_origin:
                expected = (
                    "0x%02X FacePlayerPacket(origin=EYES, x=1.0, y=-2.0, "
                    "z=3.5, "
                    "entity_id=None)" % packet.id)
            else:
                expected = (
                    "0x%02X FacePlayerPacket(entity_id=None, x=1.0, y=-2.0, "
                    "z=3.5)"
                    % packet.id)
            self.assertEqual(str(packet), expected)
            self._test_read_write_packet(packet, context)

            # Same packet again, now facing an entity.
            packet.entity_id = 123
            if has_origin:
                packet.entity_origin = OriginPoint.FEET
            else:
                del packet.target

            if has_origin:
                expected = (
                    "0x%02X FacePlayerPacket(origin=EYES, x=1.0, y=-2.0, "
                    "z=3.5, "
                    "entity_id=123, entity_origin=FEET)" % packet.id)
            else:
                expected = "0x%02X FacePlayerPacket(entity_id=123)" % packet.id
            self.assertEqual(str(packet), expected)
            self._test_read_write_packet(packet, context)

    def _test_read_write_packet(
        self, packet_in, context=None, vmin=None, vmax=None, **kwargs
    ):
        """
        Write ``packet_in`` to a buffer, read it back into a new packet of
        the same type and assert that both packets have equal attributes.

        If ``context`` is None, the check is repeated for every version in
        TEST_VERSIONS, skipping versions earlier than ``vmin`` or later than
        ``vmax`` when those are given. Otherwise only ``context`` is used
        (and assigned to ``packet_in.context``).

        If kwargs are specified, the key will be tested against the
        respective delta value. Useful for testing FixedPointNumbers
        where there is precision lost in the resulting value.
        """
        if context is None:
            for pv in TEST_VERSIONS:
                if vmin is not None and protocol_earlier(pv, vmin):
                    continue
                if vmax is not None and protocol_earlier(vmax, pv):
                    continue
                logging.debug('protocol_version = %r' % pv)
                version_context = ConnectionContext(protocol_version=pv)
                # Forward the per-attribute deltas so they also apply when
                # iterating over versions.
                self._test_read_write_packet(
                    packet_in, version_context, **kwargs)
        else:
            packet_in.context = context
            packet_buffer = PacketBuffer()
            packet_in.write(packet_buffer)
            packet_buffer.reset_cursor()

            # Skip the length prefix, then check the packet id.
            VarInt.read(packet_buffer)
            packet_id = VarInt.read(packet_buffer)
            self.assertEqual(packet_id, packet_in.id)

            packet_out = type(packet_in)(context=context)
            packet_out.read(packet_buffer)
            self.assertIs(type(packet_in), type(packet_out))

            # Compare copies of the attribute dicts so that the caller's
            # packet keeps all of its attributes and can be checked again.
            attrs_in = dict(packet_in.__dict__)
            attrs_out = dict(packet_out.__dict__)
            for packet_attr, precision in kwargs.items():
                self.assertAlmostEqual(attrs_in.pop(packet_attr),
                                       attrs_out.pop(packet_attr),
                                       delta=precision)
            self.assertEqual(attrs_in, attrs_out)