WIP BinaryBuffer

This commit is contained in:
TheMode 2021-08-06 14:21:11 +02:00
parent aa2a6522dc
commit 455c21208e
4 changed files with 200 additions and 50 deletions

View File

@ -127,9 +127,7 @@ public final class PacketProcessor {
} catch (Exception e) {
final Player player = connection.getPlayer();
final String username = player != null ? player.getUsername() : "null";
LOGGER.warn("Connection {} ({}) sent an unexpected packet.",
connection.getRemoteAddress(),
username);
LOGGER.warn("Connection {} ({}) sent an unexpected packet.", connection.getRemoteAddress(), username);
MinecraftServer.getExceptionManager().handleException(e);
}
}

View File

@ -14,7 +14,7 @@ import net.minestom.server.network.packet.server.login.SetCompressionPacket;
import net.minestom.server.network.socket.Server;
import net.minestom.server.network.socket.Worker;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.Utils;
import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
@ -62,8 +62,8 @@ public class NettyPlayerConnection extends PlayerConnection {
private UUID bungeeUuid;
private PlayerSkin bungeeSkin;
private final ByteBuffer tickBuffer = ByteBuffer.allocateDirect(Server.SOCKET_BUFFER_SIZE);
private volatile ByteBuffer cacheBuffer;
private final BinaryBuffer tickBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE);
private volatile BinaryBuffer cacheBuffer;
public NettyPlayerConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) {
super();
@ -74,74 +74,69 @@ public class NettyPlayerConnection extends PlayerConnection {
public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) {
final var readBuffer = workerContext.readBuffer;
final int limit = readBuffer.limit();
final int limit = readBuffer.writerOffset();
// Read all packets
while (readBuffer.remaining() > 0) {
readBuffer.mark(); // Mark the beginning of the packet
while (readBuffer.readableBytes() > 0) {
final var beginMark = readBuffer.mark();
try {
// Read packet
final int packetLength = Utils.readVarInt(readBuffer);
final int packetEnd = readBuffer.position() + packetLength;
if (packetEnd > readBuffer.limit()) {
// Ensure that the buffer contains the full packet (or wait for next socket read)
final int packetLength = readBuffer.readVarInt();
final int packetEnd = readBuffer.readerOffset() + packetLength;
if (packetEnd > readBuffer.writerOffset()) {
// Integrity fail
throw new BufferUnderflowException();
}
readBuffer.limit(packetEnd); // Ensure that the reader doesn't exceed packet bound
// Read protocol
ByteBuffer content;
// Read packet https://wiki.vg/Protocol#Packet_format
BinaryBuffer content;
if (!compressed) {
// Compression disabled, payload is following
content = readBuffer;
} else {
final int dataLength = Utils.readVarInt(readBuffer);
final int dataLength = readBuffer.readVarInt();
if (dataLength == 0) {
// Data is too small to be compressed, payload is following
content = readBuffer;
} else {
// Decompress to content buffer
content = workerContext.contentBuffer;
final var contentStartMark = content.mark();
try {
final var inflater = workerContext.inflater;
inflater.setInput(readBuffer);
inflater.inflate(content);
inflater.setInput(readBuffer.asByteBuffer(readBuffer.readerOffset(), packetEnd));
inflater.inflate(content.asByteBuffer(0, content.capacity()));
inflater.reset();
} catch (DataFormatException e) {
e.printStackTrace();
}
content.flip();
content.reset(contentStartMark);
}
}
// Process packet
final int packetId = Utils.readVarInt(content);
final int packetId = content.readVarInt();
try {
packetProcessor.process(this, packetId, content);
var finalBuffer = content.asByteBuffer(content.readerOffset(), packetEnd);
packetProcessor.process(this, packetId, finalBuffer);
} catch (Exception e) {
// Error while reading the packet
MinecraftServer.getExceptionManager().handleException(e);
break;
}
// Return to original state (before writing)
readBuffer.limit(limit).position(packetEnd);
readBuffer.reset(packetEnd, limit);
} catch (BufferUnderflowException e) {
readBuffer.reset();
this.cacheBuffer = ByteBuffer.allocateDirect(readBuffer.remaining())
.put(readBuffer).flip();
readBuffer.reset(beginMark);
this.cacheBuffer = BinaryBuffer.copy(readBuffer);
break;
}
}
}
public void consumeCache(ByteBuffer buffer) {
if (cacheBuffer == null) {
return;
}
buffer.put(cacheBuffer);
public void consumeCache(BinaryBuffer buffer) {
if (cacheBuffer != null) {
buffer.write(cacheBuffer);
this.cacheBuffer = null;
}
}
/**
* Sets the encryption key and add the codecs to the pipeline.
@ -197,11 +192,11 @@ public class NettyPlayerConnection extends PlayerConnection {
public void write(@NotNull ByteBuffer buffer) {
synchronized (tickBuffer) {
buffer.flip();
if (buffer.limit() > tickBuffer.remaining()) {
if (!tickBuffer.canWrite(buffer.limit())) {
// Tick buffer is full, flush before appending
flush();
}
this.tickBuffer.put(buffer);
this.tickBuffer.write(buffer);
}
}
@ -224,9 +219,9 @@ public class NettyPlayerConnection extends PlayerConnection {
public void flush() {
if (!channel.isOpen()) return;
synchronized (tickBuffer) {
if (tickBuffer.position() == 0) return;
if (tickBuffer.readableBytes() == 0) return;
try {
this.channel.write(tickBuffer.flip());
this.tickBuffer.writeChannel(channel);
} catch (IOException e) {
MinecraftServer.getExceptionManager().handleException(e);
} finally {

View File

@ -4,10 +4,10 @@ import net.minestom.server.MinecraftServer;
import net.minestom.server.entity.Player;
import net.minestom.server.network.PacketProcessor;
import net.minestom.server.network.player.NettyPlayerConnection;
import net.minestom.server.utils.binary.BinaryBuffer;
import org.jetbrains.annotations.ApiStatus;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
@ -48,16 +48,11 @@ public final class Worker {
}
var connection = connectionMap.get(channel);
try {
ByteBuffer readBuffer = workerContext.readBuffer;
var readBuffer = workerContext.readBuffer;
// Consume last incomplete packet
connection.consumeCache(readBuffer);
// Read socket
if (channel.read(readBuffer) == -1) {
// EOS
throw new IOException("Disconnected");
}
// Process data
readBuffer.flip();
// Read & process
readBuffer.readChannel(channel);
connection.processPackets(workerContext, packetProcessor);
} catch (IOException e) {
// TODO print exception? (should ignore disconnection)
@ -124,11 +119,11 @@ public final class Worker {
* Contains objects that we can be shared across all the connection of a {@link Worker worker}.
*/
public static final class Context {
public final ByteBuffer readBuffer = ByteBuffer.allocateDirect(Server.SOCKET_BUFFER_SIZE);
public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE);
/**
* Stores a single packet payload to be read.
*/
public final ByteBuffer contentBuffer = ByteBuffer.allocateDirect(Server.MAX_PACKET_SIZE);
public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
public final Inflater inflater = new Inflater();
public void clearBuffers() {

View File

@ -0,0 +1,162 @@
package net.minestom.server.utils.binary;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jglrxavpok.hephaistos.nbt.NBTReader;
import org.jglrxavpok.hephaistos.nbt.NBTWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
/**
* Manages off-heap memory.
* Not thread-safe.
*/
public final class BinaryBuffer {
private ByteBuffer nioBuffer; // To become a `MemorySegment` once released
private NBTReader nbtReader;
private NBTWriter nbtWriter;
private final int capacity;
private int readerOffset, writerOffset;
private BinaryBuffer(ByteBuffer buffer) {
this.nioBuffer = buffer;
this.capacity = buffer.capacity();
}
@ApiStatus.Internal
public static BinaryBuffer ofSize(int size) {
return new BinaryBuffer(ByteBuffer.allocateDirect(size));
}
public static BinaryBuffer copy(BinaryBuffer buffer) {
final int size = buffer.readableBytes();
final var temp = ByteBuffer.allocateDirect(size)
.put(buffer.asByteBuffer(0, size));
return new BinaryBuffer(temp);
}
public void write(ByteBuffer buffer) {
final int size = buffer.remaining();
// TODO jdk 13 put with index
asByteBuffer(writerOffset, writerOffset + size).put(buffer);
this.writerOffset += size;
}
public void write(BinaryBuffer buffer) {
write(buffer.asByteBuffer(buffer.readerOffset, buffer.writerOffset));
}
public int readVarInt() {
int value = 0;
final int maxRead = Math.min(5, readableBytes());
for (int i = 0; i < maxRead; i++) {
final int offset = readerOffset + i;
final byte k = nioBuffer.get(offset);
value |= (k & 0x7F) << i * 7;
if ((k & 0x80) != 128) {
this.readerOffset = offset + 1;
return value;
}
}
this.readerOffset += maxRead;
throw new RuntimeException("VarInt is too big");
}
public @NotNull Marker mark() {
return new Marker(readerOffset, writerOffset);
}
public void reset(int readerOffset, int writerOffset) {
this.readerOffset = readerOffset;
this.writerOffset = writerOffset;
}
public void reset(@NotNull Marker marker) {
reset(marker.readerOffset(), marker.writerOffset());
}
public boolean canWrite(int size) {
return writerOffset + size <= capacity;
}
public int capacity() {
return capacity;
}
public int readerOffset() {
return readerOffset;
}
public int writerOffset() {
return writerOffset;
}
public int readableBytes() {
return writerOffset - readerOffset;
}
public void clear() {
this.readerOffset = 0;
this.writerOffset = 0;
}
public ByteBuffer asByteBuffer(int reader, int writer) {
return nioBuffer.duplicate().position(reader).limit(writer);
}
public void writeChannel(WritableByteChannel channel) throws IOException {
final int count = channel.write(asByteBuffer(readerOffset, writerOffset));
if (count == -1) {
// EOS
throw new IOException("Disconnected");
}
this.readerOffset += count;
}
public void readChannel(ReadableByteChannel channel) throws IOException {
final int count = channel.read(asByteBuffer(readerOffset, capacity));
if (count == -1) {
// EOS
throw new IOException("Disconnected");
}
this.writerOffset += count;
}
@Override
public String toString() {
return "BinaryBuffer{" +
"readerOffset=" + readerOffset +
", writerOffset=" + writerOffset +
", capacity=" + capacity +
'}';
}
public static final class Marker {
private final int readerOffset, writerOffset;
private Marker(int readerOffset, int writerOffset) {
this.readerOffset = readerOffset;
this.writerOffset = writerOffset;
}
public int readerOffset() {
return readerOffset;
}
public int writerOffset() {
return writerOffset;
}
@Override
public String toString() {
return "Marker{" +
"readerOffset=" + readerOffset +
", writerOffset=" + writerOffset +
'}';
}
}
}