mirror of
https://github.com/ammaraskar/pyCraft.git
synced 2024-11-28 21:25:21 +01:00
354 lines
12 KiB
Python
354 lines
12 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf8 -*-
|
|
"""
|
|
Implements reading & writing for the Minecraft Named Binary Tag (NBT) format,
|
|
created by Markus Petersson.
|
|
|
|
.. moduleauthor:: Tyler Kennedy <tk@tkte.ch>
|
|
"""
|
|
import gzip
|
|
from struct import unpack, pack
|
|
|
|
|
|
class BaseTag(object):
|
|
def __init__(self, value, name=None):
|
|
self.name = name
|
|
self.value = value
|
|
|
|
@staticmethod
|
|
def _read_utf8(read):
|
|
"""Reads a length-prefixed UTF-8 string."""
|
|
name_length = read('H', 2)[0]
|
|
return read.io.read(name_length).decode('utf-8')
|
|
|
|
@staticmethod
|
|
def _write_utf8(write, value):
|
|
"""Writes a length-prefixed UTF-8 string."""
|
|
write('h', len(value))
|
|
write.io.write(value.encode('UTF-8'))
|
|
|
|
@classmethod
|
|
def read(cls, read, has_name=True):
|
|
"""
|
|
Read the tag in using the reader `rd`.
|
|
If `has_name` is `False`, skip reading the tag name.
|
|
"""
|
|
name = cls._read_utf8(read) if has_name else None
|
|
|
|
if cls is TAG_Compound:
|
|
# A TAG_Compound is almost identical to Python's native dict()
|
|
# object, or a Java HashMap.
|
|
final = {}
|
|
while True:
|
|
# Find the type of each tag in a compound in turn.
|
|
tag = read('b', 1)[0]
|
|
if tag == 0:
|
|
# A tag of 0 means we've reached TAG_End, used to terminate
|
|
# a TAG_Compound.
|
|
break
|
|
# We read in each tag in turn, using its name as the key in
|
|
# the dict (Since a compound cannot have repeating names,
|
|
# this works fine).
|
|
tmp = _tags[tag].read(read)
|
|
final[tmp.name] = tmp
|
|
return cls(final, name=name)
|
|
elif cls is TAG_List:
|
|
# A TAG_List is a very simple homogeneous array, similar to
|
|
# Python's native list() object, but restricted to a single type.
|
|
tag_type, length = read('bi', 5)
|
|
tag_read = _tags[tag_type].read
|
|
return cls(
|
|
_tags[tag_type],
|
|
[tag_read(read, has_name=False) for x in range(0, length)],
|
|
name=name
|
|
)
|
|
elif cls is TAG_String:
|
|
# A simple length-prefixed UTF-8 string.
|
|
value = cls._read_utf8(read)
|
|
return cls(value, name=name)
|
|
elif cls is TAG_Byte_Array:
|
|
# A simple array of (signed) bytes.
|
|
length = read('i', 4)[0]
|
|
return cls(read('{0}b'.format(length), length), name=name)
|
|
elif cls is TAG_Int_Array:
|
|
# A simple array of (signed) 4-byte integers.
|
|
length = read('i', 4)[0]
|
|
return cls(read('{0}i'.format(length), length * 4), name=name)
|
|
elif cls is TAG_Byte:
|
|
# A single (signed) byte.
|
|
return cls(read('b', 1)[0], name=name)
|
|
elif cls is TAG_Short:
|
|
# A single (signed) short.
|
|
return cls(read('h', 2)[0], name=name)
|
|
elif cls is TAG_Int:
|
|
# A signed (signed) 4-byte int.
|
|
return cls(read('i', 4)[0], name=name)
|
|
elif cls is TAG_Long:
|
|
# A single (signed) 8-byte long.
|
|
return cls(read('q', 8)[0], name=name)
|
|
elif cls is TAG_Float:
|
|
# A single single-precision floating point value.
|
|
return cls(read('f', 4)[0], name=name)
|
|
elif cls is TAG_Double:
|
|
# A single double-precision floating point value.
|
|
return cls(read('d', 8)[0], name=name)
|
|
|
|
def write(self, write):
|
|
# Only write the name TAG_String if our name is not `None`.
|
|
# If you want a blank name, use ''.
|
|
if self.name is not None:
|
|
if isinstance(self, NBTFile):
|
|
write('b', 0x0A)
|
|
else:
|
|
write('b', _tags.index(self.__class__))
|
|
self._write_utf8(write, self.name)
|
|
if isinstance(self, TAG_List):
|
|
write('bi', _tags.index(self.type_), len(self.value))
|
|
for item in self.value:
|
|
# If our list item isn't of type self._type, convert
|
|
# it before writing.
|
|
if not isinstance(item, self.type_):
|
|
item = self.type_(item)
|
|
item.write(write)
|
|
elif isinstance(self, TAG_Compound):
|
|
for v in self.value.values():
|
|
v.write(write)
|
|
# A tag of type 0 (TAg_End) terminates a TAG_Compound.
|
|
write('b', 0)
|
|
elif isinstance(self, TAG_String):
|
|
self._write_utf8(write, self.value)
|
|
elif isinstance(self, TAG_Int_Array):
|
|
l = len(self.value)
|
|
write('i{0}i'.format(l), l, *self.value)
|
|
elif isinstance(self, TAG_Byte_Array):
|
|
l = len(self.value)
|
|
write('i{0}b'.format(l), l, *self.value)
|
|
elif isinstance(self, TAG_Byte):
|
|
write('b', self.value)
|
|
elif isinstance(self, TAG_Short):
|
|
write('h', self.value)
|
|
elif isinstance(self, TAG_Int):
|
|
write('i', self.value)
|
|
elif isinstance(self, TAG_Long):
|
|
write('q', self.value)
|
|
elif isinstance(self, TAG_Float):
|
|
write('f', self.value)
|
|
elif isinstance(self, TAG_Double):
|
|
write('d', self.value)
|
|
|
|
def pretty(self, indent=0, indent_str=' '):
|
|
"""
|
|
Pretty-print a tag in the same general style as Markus's example
|
|
output.
|
|
"""
|
|
return '{0}{1}({2!r}): {3!r}'.format(
|
|
indent_str * indent,
|
|
self.__class__.__name__,
|
|
self.name,
|
|
self.value
|
|
)
|
|
|
|
def __repr__(self):
|
|
return '{0}({1!r}, {2!r})'.format(
|
|
self.__class__.__name__, self.value, self.name)
|
|
|
|
def __str__(self):
|
|
return repr(self)
|
|
|
|
def __unicode__(self):
|
|
return unicode(repr(self), 'utf-8')
|
|
|
|
|
|
class TAG_Byte(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_Short(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_Int(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_Long(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_Float(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_Double(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_Byte_Array(BaseTag):
|
|
def pretty(self, indent=0, indent_str=' '):
|
|
return '{0}TAG_Byte_Array({1!r}): [{2} bytes]'.format(
|
|
indent_str * indent, self.name, len(self.value))
|
|
|
|
|
|
class TAG_String(BaseTag):
|
|
pass
|
|
|
|
|
|
class TAG_List(BaseTag, list):
|
|
def __init__(self, tag_type, value=None, name=None):
|
|
"""
|
|
Creates a new homogeneous list of `tag_type` items, copying `value`
|
|
if provided.
|
|
"""
|
|
self.name = name
|
|
self.value = self
|
|
self.type_ = tag_type
|
|
if value is not None:
|
|
self.extend(value)
|
|
|
|
def pretty(self, indent=0, indent_str=' '):
|
|
t = []
|
|
t.append('{0}TAG_List({1!r}): {2} entries'.format(
|
|
indent_str * indent, self.name, len(self.value)))
|
|
t.append('{0}{{'.format(indent_str * indent))
|
|
for v in self.value:
|
|
t.append(v.pretty(indent + 1, indent_str))
|
|
t.append('{0}}}'.format(indent_str * indent))
|
|
return '\n'.join(t)
|
|
|
|
def __repr__(self):
|
|
return '{0}({1!r} entries, {2!r})'.format(
|
|
self.__class__.__name__, len(self), self.name)
|
|
|
|
|
|
class TAG_Compound(BaseTag, dict):
|
|
def __init__(self, value=None, name=None):
|
|
self.name = name
|
|
self.value = self
|
|
if value is not None:
|
|
self.update(value)
|
|
|
|
def pretty(self, indent=0, indent_str=' '):
|
|
t = []
|
|
t.append('{0}TAG_Compound({1!r}): {2} entries'.format(
|
|
indent_str * indent, self.name, len(self.value)))
|
|
t.append('{0}{{'.format(indent_str * indent))
|
|
for v in self.values():
|
|
t.append(v.pretty(indent + 1, indent_str))
|
|
t.append('{0}}}'.format(indent_str * indent))
|
|
return '\n'.join(t)
|
|
|
|
def __repr__(self):
|
|
return '{0}({1!r} entries, {2!r})'.format(
|
|
self.__class__.__name__, len(self), self.name)
|
|
|
|
def __setitem__(self, key, value):
|
|
"""
|
|
Sets the TAG_*'s name if it isn't already set to that of the key
|
|
it's being assigned to. This results in cleaner code, as the name
|
|
does not need to be specified twice.
|
|
"""
|
|
if value.name is None:
|
|
value.name = key
|
|
super(TAG_Compound, self).__setitem__(key, value)
|
|
|
|
def update(self, *args, **kwargs):
|
|
"""See `__setitem__`."""
|
|
super(TAG_Compound, self).update(*args, **kwargs)
|
|
for key, item in self.items():
|
|
if item.name is None:
|
|
item.name = key
|
|
|
|
|
|
class TAG_Int_Array(BaseTag):
|
|
def pretty(self, indent=0, indent_str=' '):
|
|
return '{0}TAG_Int_Array({1!r}): [{2} integers]'.format(
|
|
indent_str * indent, self.name, len(self.value))
|
|
|
|
# The TAG_* types have the convienient property of being continuous.
|
|
# The code is written in such a way that if this were to no longer be
|
|
# true in the future, _tags can simply be replaced with a dict().
|
|
_tags = (
|
|
None,
|
|
TAG_Byte,
|
|
TAG_Short,
|
|
TAG_Int,
|
|
TAG_Long,
|
|
TAG_Float,
|
|
TAG_Double,
|
|
TAG_Byte_Array,
|
|
TAG_String,
|
|
TAG_List,
|
|
TAG_Compound,
|
|
TAG_Int_Array
|
|
)
|
|
|
|
|
|
class NBTFile(TAG_Compound):
|
|
class Compression(object):
|
|
"""
|
|
Defines compression schemes to be used for loading and saving
|
|
NBT files.
|
|
"""
|
|
# NONE is simply for the sake of completeness.
|
|
NONE = 10
|
|
# Use Gzip compression when reading or writing.
|
|
GZIP = 20
|
|
|
|
def __init__(self, io=None, name=None, value=None, compression=None,
|
|
little_endian=False):
|
|
"""
|
|
Creates a new NBTFile or loads one from any file-like object providing
|
|
`read()`.
|
|
|
|
Construction a new NBTFile() is as simple as:
|
|
>>> nbt = NBTFile(name='')
|
|
|
|
Whereas loading an existing one is most often done:
|
|
>>> with open('my_file.nbt', rb') as io:
|
|
... nbt = NBTFile(io=io, compression=NBTFile.Compression.GZIP)
|
|
"""
|
|
# No file or path given, so we're creating a new NBTFile.
|
|
if io is None:
|
|
super(NBTFile, self).__init__(value if value else {}, name)
|
|
return
|
|
|
|
if compression is None or compression == NBTFile.Compression.NONE:
|
|
final_io = io
|
|
elif compression == NBTFile.Compression.GZIP:
|
|
final_io = gzip.GzipFile(fileobj=io, mode='rb')
|
|
else:
|
|
raise ValueError('Unrecognized compression scheme.')
|
|
|
|
# The pocket edition uses little-endian NBT files, but annoyingly
|
|
# without any kind of header we can't determine that ourselves,
|
|
# not even a magic number we could flip.
|
|
if little_endian:
|
|
read = lambda fmt, size: unpack('<' + fmt, final_io.read(size))
|
|
else:
|
|
read = lambda fmt, size: unpack('>' + fmt, final_io.read(size))
|
|
read.io = final_io
|
|
|
|
# All valid NBT files will begin with 0x0A, which is a TAG_Compound.
|
|
if read('b', 1)[0] != 0x0A:
|
|
raise IOError('NBTFile does not begin with 0x0A.')
|
|
|
|
tmp = TAG_Compound.read(read)
|
|
super(NBTFile, self).__init__(tmp, tmp.name)
|
|
|
|
def save(self, io, compression=None, little_endian=False):
|
|
"""
|
|
Saves the `NBTFile()` to `io`, which can be any file-like object
|
|
providing `write()`.
|
|
"""
|
|
if compression is None or compression == NBTFile.Compression.NONE:
|
|
final_io = io
|
|
elif compression == NBTFile.Compression.GZIP:
|
|
final_io = gzip.GzipFile(fileobj=io, mode='wb')
|
|
|
|
if little_endian:
|
|
write = lambda fmt, *args: final_io.write(pack('<' + fmt, *args))
|
|
else:
|
|
write = lambda fmt, *args: final_io.write(pack('>' + fmt, *args))
|
|
write.io = final_io
|
|
|
|
self.write(write) |