mirror of
https://github.com/ammaraskar/pyCraft.git
synced 2024-12-11 11:31:58 +01:00
442 lines
12 KiB
Python
442 lines
12 KiB
Python
from io import BytesIO
|
|
from zlib import compress
|
|
|
|
from .types import (
|
|
VarInt, Integer, Float, Double, UnsignedShort, Long, Byte, UnsignedByte,
|
|
String, VarIntPrefixedByteArray, Boolean, UUID
|
|
)
|
|
|
|
|
|
class PacketBuffer(object):
    """In-memory byte buffer exposing socket-like send/recv methods."""

    def __init__(self):
        self.bytes = BytesIO()

    def send(self, value):
        """
        Appends data at the buffer's current position, mirroring socket.send

        :param value: The bytes to write
        """
        self.bytes.write(value)

    def read(self, length):
        """Return up to ``length`` bytes starting at the current position."""
        return self.bytes.read(length)

    def recv(self, length):
        """Alias of :meth:`read`, mirroring socket.recv."""
        return self.read(length)

    def reset(self):
        """Discard everything written so far by starting a fresh buffer."""
        self.bytes = BytesIO()

    def reset_cursor(self):
        """Move the position back to the start without discarding data."""
        self.bytes.seek(0)

    def get_writable(self):
        """Return the whole buffer content, regardless of the position."""
        return self.bytes.getvalue()
|
|
|
|
|
|
class PacketListener(object):
    """Forwards packets of selected types to a callback."""

    def __init__(self, callback, *args):
        """
        :param callback: Callable invoked with each matching packet
        :param args: Classes to listen for; only Packet subclasses are kept
        """
        self.callback = callback
        self.packets_to_listen = [
            candidate for candidate in args if issubclass(candidate, Packet)]

    def call_packet(self, packet):
        """Invoke the callback once per registered type ``packet`` matches."""
        for wanted_type in self.packets_to_listen:
            if not isinstance(packet, wanted_type):
                continue
            self.callback(packet)
|
|
|
|
|
|
class Packet(object):
    """Base class for all packets.

    Subclasses set ``id``, ``packet_name`` and ``definition``.  ``definition``
    is an ordered list of dicts mapping an attribute name to the type object
    whose ``read``/``send`` methods (de)serialize that attribute.
    """
    packet_name = "base"
    id = -0x01
    definition = []

    def __init__(self, **kwargs):
        """
        :param kwargs: Optional initial field values, set as attributes
        """
        # Previously the keyword arguments were accepted but discarded.
        self.set_values(**kwargs)

    def set_values(self, **kwargs):
        """Set each keyword argument as an attribute and return ``self``."""
        for key, value in kwargs.items():
            setattr(self, key, value)
        return self

    def read(self, file_object):
        """Populate the fields in ``definition`` order from ``file_object``."""
        for field in self.definition:
            for var_name, data_type in field.items():
                setattr(self, var_name, data_type.read(file_object))

    def write(self, socket, compression_threshold=None):
        """
        Serializes the packet and sends it, length-prefixed, to ``socket``.

        :param socket: Object with a ``send(bytes)`` method
        :param compression_threshold: None disables the compressed packet
            format; otherwise payloads longer than this value are compressed
        """
        # buffer the data since we need to know the length of each packet's
        # payload
        packet_buffer = PacketBuffer()
        # write packet's id right off the bat in the header
        VarInt.send(self.id, packet_buffer)

        for field in self.definition:
            for var_name, data_type in field.items():
                data_type.send(getattr(self, var_name), packet_buffer)

        # compression_threshold of None means compression is disabled
        if compression_threshold is not None:
            packet_data = packet_buffer.get_writable()
            packet_buffer.reset()
            if (compression_threshold != -1
                    and len(packet_data) > compression_threshold):
                # The data-length prefix is the size of the UNCOMPRESSED
                # payload (packet id + fields), not of the compressed bytes.
                VarInt.send(len(packet_data), packet_buffer)
                # write the compressed payload itself
                packet_buffer.send(compress(packet_data))
            else:
                # write out a 0 to indicate uncompressed data
                VarInt.send(0, packet_buffer)
                packet_buffer.send(packet_data)

        payload = packet_buffer.get_writable()
        VarInt.send(len(payload), socket)  # Packet Size
        socket.send(payload)  # Packet Payload
|
|
|
|
|
|
# Handshake State
|
|
# ==============
|
|
class HandShakePacket(Packet):
    """Handshake packet (id 0x00); fields are listed in wire order."""
    packet_name = "handshake"
    id = 0x00
    definition = [
        {'protocol_version': VarInt},
        {'server_address': String},
        {'server_port': UnsignedShort},
        {'next_state': VarInt},
    ]
|
|
|
|
|
|
# No clientbound packets are registered for the handshake state.
STATE_HANDSHAKE_CLIENTBOUND = {}
# Serverbound handshake packets keyed by packet id.
STATE_HANDSHAKE_SERVERBOUND = {HandShakePacket.id: HandShakePacket}
|
|
|
|
|
|
# Status State
|
|
# ==============
|
|
class ResponsePacket(Packet):
    """Status response packet (id 0x00) carrying a single string field."""
    packet_name = "response"
    id = 0x00
    definition = [
        {'json_response': String},
    ]
|
|
|
|
|
|
class PingPacketResponse(Packet):
    """Clientbound ping packet (id 0x01) carrying a Long ``time`` field."""
    packet_name = "ping"
    id = 0x01
    definition = [
        {'time': Long},
    ]
|
|
|
|
|
|
# Clientbound packets of the status state, keyed by packet id.
STATE_STATUS_CLIENTBOUND = {
    packet.id: packet
    for packet in (ResponsePacket, PingPacketResponse)
}
|
|
|
|
|
|
class RequestPacket(Packet):
    """Status request packet (id 0x00); it has no fields."""
    packet_name = "request"
    id = 0x00
    definition = []
|
|
|
|
|
|
class PingPacket(Packet):
    """Serverbound ping packet (id 0x01) carrying a Long ``time`` field."""
    packet_name = "ping"
    id = 0x01
    definition = [
        {'time': Long},
    ]
|
|
|
|
|
|
# Serverbound packets of the status state, keyed by packet id.
STATE_STATUS_SERVERBOUND = {
    packet.id: packet
    for packet in (RequestPacket, PingPacket)
}
|
|
|
|
|
|
# Login State
|
|
# ==============
|
|
class DisconnectPacket(Packet):
    """Login-state disconnect packet (id 0x00) with one string field."""
    packet_name = "disconnect"
    id = 0x00
    definition = [
        {'json_data': String},
    ]
|
|
|
|
|
|
class EncryptionRequestPacket(Packet):
    """Encryption request packet (id 0x01); fields are in wire order."""
    packet_name = "encryption request"
    id = 0x01
    definition = [
        {'server_id': String},
        {'public_key': VarIntPrefixedByteArray},
        {'verify_token': VarIntPrefixedByteArray},
    ]
|
|
|
|
|
|
class LoginSuccessPacket(Packet):
    """Login success packet (id 0x02); both fields are read as strings."""
    packet_name = "login success"
    id = 0x02
    definition = [
        {'UUID': String},
        {'Username': String},
    ]
|
|
|
|
|
|
class SetCompressionPacket(Packet):
    """Login-state set compression packet (id 0x03) with a VarInt field."""
    packet_name = "set compression"
    id = 0x03
    definition = [
        {'threshold': VarInt},
    ]
|
|
|
|
|
|
# Clientbound packets of the login state, keyed by packet id.
STATE_LOGIN_CLIENTBOUND = {
    packet.id: packet
    for packet in (
        DisconnectPacket,
        EncryptionRequestPacket,
        LoginSuccessPacket,
        SetCompressionPacket,
    )
}
|
|
|
|
|
|
class LoginStartPacket(Packet):
    """Login start packet (id 0x00) carrying a single ``name`` string."""
    packet_name = "login start"
    id = 0x00
    definition = [
        {'name': String},
    ]
|
|
|
|
|
|
class EncryptionResponsePacket(Packet):
    """Encryption response packet (id 0x01); fields are in wire order."""
    packet_name = "encryption response"
    id = 0x01
    definition = [
        {'shared_secret': VarIntPrefixedByteArray},
        {'verify_token': VarIntPrefixedByteArray},
    ]
|
|
|
|
|
|
# Serverbound packets of the login state, keyed by packet id.
STATE_LOGIN_SERVERBOUND = {
    packet.id: packet
    for packet in (LoginStartPacket, EncryptionResponsePacket)
}
|
|
|
|
|
|
# Playing State
|
|
# ==============
|
|
|
|
class KeepAlivePacket(Packet):
    """Keep alive packet (id 0x00), registered in both playing-state maps."""
    packet_name = "keep alive"
    id = 0x00
    definition = [
        {'keep_alive_id': VarInt},
    ]
|
|
|
|
|
|
class JoinGamePacket(Packet):
    """Join game packet (id 0x01); fields are listed in wire order."""
    packet_name = "join game"
    id = 0x01
    definition = [
        {'entity_id': Integer},
        {'game_mode': UnsignedByte},
        {'dimension': Byte},
        {'difficulty': UnsignedByte},
        {'max_players': UnsignedByte},
        {'level_type': String},
        {'reduced_debug_info': Boolean},
    ]
|
|
|
|
|
|
class ChatMessagePacket(Packet):
    """Clientbound chat message packet (id 0x02)."""
    packet_name = "chat message"
    id = 0x02
    definition = [
        {'json_data': String},
        {'position': Byte},
    ]
|
|
|
|
|
|
class PlayerPositionAndLookPacket(Packet):
    """Clientbound player position and look packet (id 0x08)."""
    packet_name = "player position and look"
    id = 0x08
    definition = [
        {'x': Double},
        {'y': Double},
        {'z': Double},
        {'yaw': Float},
        {'pitch': Float},
        {'flags': Byte},
    ]
|
|
|
|
|
|
class DisconnectPacketPlayState(Packet):
    """Playing-state disconnect packet (id 0x40) with one string field."""
    packet_name = "disconnect"
    id = 0x40
    definition = [
        {'json_data': String},
    ]
|
|
|
|
|
|
class SetCompressionPacketPlayState(Packet):
    """Playing-state set compression packet (id 0x46)."""
    packet_name = "set compression"
    id = 0x46
    definition = [
        {'threshold': VarInt},
    ]
|
|
|
|
class PlayerListItemPacket(Packet):
    """Player list item packet (id 0x38).

    The layout after the leading action id depends on that id, so this class
    overrides read() rather than declaring a static ``definition``.  After
    read(), ``action_type`` holds the Action subclass in use and ``actions``
    the decoded actions; apply() replays them onto a PlayerList.  Writing is
    not implemented.
    """
    id = 0x38
    packet_name = "player list item"

    class PlayerList(object):
        """Container for the players known so far."""
        __slots__ = ('players_by_uuid',)

        def __init__(self):
            # Maps a player's uuid to its PlayerListItem.
            self.players_by_uuid = {}

    class PlayerListItem(object):
        """One entry of a PlayerList."""
        __slots__ = (
            'uuid', 'name', 'properties', 'gamemode', 'ping', 'display_name')

        def __init__(self, **kwds):
            # dict.items() works on both Python 2 and 3; the former
            # iteritems() call raised AttributeError on Python 3.
            for key, val in kwds.items():
                setattr(self, key, val)

    class PlayerProperty(object):
        """Name/value pair with an optional signature."""
        __slots__ = ('name', 'value', 'signature')

        def read(self, file_object):
            """Decode this property from ``file_object``."""
            self.name = String.read(file_object)
            self.value = String.read(file_object)
            # A boolean flag says whether a signature string follows.
            is_signed = Boolean.read(file_object)
            if is_signed:
                self.signature = String.read(file_object)
            else:
                self.signature = None

    class Action(object):
        """Base for per-player actions; subclasses provide _read and apply."""
        __slots__ = ('uuid',)

        def read(self, file_object):
            """Read the player uuid, then the subclass-specific payload."""
            self.uuid = UUID.read(file_object)
            self._read(file_object)

        @classmethod
        def type_from_id(cls, action_id):
            """Return the Action subclass registered for ``action_id``.

            :raises ValueError: if ``action_id`` is not a known action
            """
            subcls = {
                0: PlayerListItemPacket.AddPlayerAction,
                1: PlayerListItemPacket.UpdateGameModeAction,
                2: PlayerListItemPacket.UpdateLatencyAction,
                3: PlayerListItemPacket.UpdateDisplayNameAction,
                4: PlayerListItemPacket.RemovePlayerAction
            }.get(action_id)
            if subcls is None:
                raise ValueError(
                    "Unknown player list action ID: %s." % action_id)
            return subcls

    class AddPlayerAction(Action):
        """Action 0: add (or replace) a player entry."""
        __slots__ = ('name', 'properties', 'gamemode', 'ping', 'display_name')

        def _read(self, file_object):
            self.name = String.read(file_object)
            prop_count = VarInt.read(file_object)
            self.properties = []
            for _ in range(prop_count):
                prop = PlayerListItemPacket.PlayerProperty()
                prop.read(file_object)
                self.properties.append(prop)
            self.gamemode = VarInt.read(file_object)
            self.ping = VarInt.read(file_object)
            has_display_name = Boolean.read(file_object)
            if has_display_name:
                self.display_name = String.read(file_object)
            else:
                self.display_name = None

        def apply(self, player_list):
            player = PlayerListItemPacket.PlayerListItem(
                uuid=self.uuid,
                name=self.name,
                properties=self.properties,
                gamemode=self.gamemode,
                ping=self.ping,
                display_name=self.display_name)
            player_list.players_by_uuid[self.uuid] = player

    class UpdateGameModeAction(Action):
        """Action 1: change the gamemode of a known player."""
        __slots__ = ('gamemode',)

        def _read(self, file_object):
            self.gamemode = VarInt.read(file_object)

        def apply(self, player_list):
            # Updates for players not in the list are ignored.
            player = player_list.players_by_uuid.get(self.uuid)
            if player is not None:
                player.gamemode = self.gamemode

    class UpdateLatencyAction(Action):
        """Action 2: change the ping of a known player."""
        __slots__ = ('ping',)

        def _read(self, file_object):
            self.ping = VarInt.read(file_object)

        def apply(self, player_list):
            # Updates for players not in the list are ignored.
            player = player_list.players_by_uuid.get(self.uuid)
            if player is not None:
                player.ping = self.ping

    class UpdateDisplayNameAction(Action):
        """Action 3: set or clear the display name of a known player."""
        __slots__ = ('display_name',)

        def _read(self, file_object):
            has_display_name = Boolean.read(file_object)
            if has_display_name:
                self.display_name = String.read(file_object)
            else:
                self.display_name = None

        def apply(self, player_list):
            # Updates for players not in the list are ignored.
            player = player_list.players_by_uuid.get(self.uuid)
            if player is not None:
                player.display_name = self.display_name

    class RemovePlayerAction(Action):
        """Action 4: remove a player; carries no payload beyond the uuid."""

        def _read(self, file_object):
            pass

        def apply(self, player_list):
            # Removing an unknown player is a no-op.
            player_list.players_by_uuid.pop(self.uuid, None)

    def read(self, file_object):
        """Decode the action id, the action count, then each action."""
        action_id = VarInt.read(file_object)
        self.action_type = PlayerListItemPacket.Action.type_from_id(action_id)
        action_count = VarInt.read(file_object)
        self.actions = []
        for _ in range(action_count):
            action = self.action_type()
            action.read(file_object)
            self.actions.append(action)

    def apply(self, player_list):
        """Apply every decoded action, in order, to ``player_list``."""
        for action in self.actions:
            action.apply(player_list)

    def write(self, socket, compression_threshold=None):
        """Not supported for this packet; always raises NotImplementedError."""
        raise NotImplementedError
|
|
|
|
# Clientbound packets of the playing state, keyed by packet id.
STATE_PLAYING_CLIENTBOUND = {
    packet.id: packet
    for packet in (
        KeepAlivePacket,
        JoinGamePacket,
        ChatMessagePacket,
        PlayerPositionAndLookPacket,
        PlayerListItemPacket,
        DisconnectPacketPlayState,
        SetCompressionPacketPlayState,
    )
}
|
|
|
|
|
|
class ChatPacket(Packet):
    """Serverbound chat packet (id 0x01) carrying a ``message`` string."""
    packet_name = "chat"
    id = 0x01
    definition = [
        {'message': String},
    ]
|
|
|
|
|
|
class PositionAndLookPacket(Packet):
    """Serverbound position and look packet (id 0x06)."""
    packet_name = "position and look"
    id = 0x06
    definition = [
        {'x': Double},
        {'feet_y': Double},
        {'z': Double},
        {'yaw': Float},
        {'pitch': Float},
        {'on_ground': Boolean},
    ]
|
|
|
|
# Serverbound packets of the playing state, keyed by packet id.
STATE_PLAYING_SERVERBOUND = {
    packet.id: packet
    for packet in (KeepAlivePacket, ChatPacket, PositionAndLookPacket)
}
|