From 455c21208e048b5673bca605d3f86eda5ca722c2 Mon Sep 17 00:00:00 2001 From: TheMode Date: Fri, 6 Aug 2021 14:21:11 +0200 Subject: [PATCH] WIP BinaryBuffer --- .../server/network/PacketProcessor.java | 4 +- .../network/player/NettyPlayerConnection.java | 67 ++++---- .../server/network/socket/Worker.java | 17 +- .../server/utils/binary/BinaryBuffer.java | 162 ++++++++++++++++++ 4 files changed, 200 insertions(+), 50 deletions(-) create mode 100644 src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java diff --git a/src/main/java/net/minestom/server/network/PacketProcessor.java b/src/main/java/net/minestom/server/network/PacketProcessor.java index 3b1fd51b2..81a7f8211 100644 --- a/src/main/java/net/minestom/server/network/PacketProcessor.java +++ b/src/main/java/net/minestom/server/network/PacketProcessor.java @@ -127,9 +127,7 @@ public final class PacketProcessor { } catch (Exception e) { final Player player = connection.getPlayer(); final String username = player != null ? player.getUsername() : "null"; - LOGGER.warn("Connection {} ({}) sent an unexpected packet.", - connection.getRemoteAddress(), - username); + LOGGER.warn("Connection {} ({}) sent an unexpected packet.", connection.getRemoteAddress(), username); MinecraftServer.getExceptionManager().handleException(e); } } diff --git a/src/main/java/net/minestom/server/network/player/NettyPlayerConnection.java b/src/main/java/net/minestom/server/network/player/NettyPlayerConnection.java index e57294f1f..ea77c3270 100644 --- a/src/main/java/net/minestom/server/network/player/NettyPlayerConnection.java +++ b/src/main/java/net/minestom/server/network/player/NettyPlayerConnection.java @@ -14,7 +14,7 @@ import net.minestom.server.network.packet.server.login.SetCompressionPacket; import net.minestom.server.network.socket.Server; import net.minestom.server.network.socket.Worker; import net.minestom.server.utils.PacketUtils; -import net.minestom.server.utils.Utils; +import net.minestom.server.utils.binary.BinaryBuffer; import net.minestom.server.utils.validate.Check; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; @@ -62,8 +62,8 @@ public class NettyPlayerConnection extends PlayerConnection { private UUID bungeeUuid; private PlayerSkin bungeeSkin; - private final ByteBuffer tickBuffer = ByteBuffer.allocateDirect(Server.SOCKET_BUFFER_SIZE); - private volatile ByteBuffer cacheBuffer; + private final BinaryBuffer tickBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE); + private volatile BinaryBuffer cacheBuffer; public NettyPlayerConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) { super(); @@ -74,73 +74,68 @@ public class NettyPlayerConnection extends PlayerConnection { public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) { final var readBuffer = workerContext.readBuffer; - final int limit = readBuffer.limit(); + final int limit = readBuffer.writerOffset(); // Read all packets - while (readBuffer.remaining() > 0) { - readBuffer.mark(); // Mark the beginning of the packet + while (readBuffer.readableBytes() > 0) { + final var beginMark = readBuffer.mark(); try { - // Read packet - final int packetLength = Utils.readVarInt(readBuffer); - final int packetEnd = readBuffer.position() + packetLength; - if (packetEnd > readBuffer.limit()) { + // Ensure that the buffer contains the full packet (or wait for next socket read) + final int packetLength = readBuffer.readVarInt(); + final int packetEnd = readBuffer.readerOffset() + packetLength; + if (packetEnd > readBuffer.writerOffset()) { // Integrity fail throw new BufferUnderflowException(); } - - readBuffer.limit(packetEnd); // Ensure that the reader doesn't exceed packet bound - - // Read protocol - ByteBuffer content; + // Read packet https://wiki.vg/Protocol#Packet_format + BinaryBuffer content; if (!compressed) { // Compression disabled, payload is following content = readBuffer; } else { - final int dataLength = Utils.readVarInt(readBuffer); + final int dataLength = readBuffer.readVarInt(); if (dataLength == 0) { // Data is too small to be compressed, payload is following content = readBuffer; } else { // Decompress to content buffer content = workerContext.contentBuffer; + final var contentStartMark = content.mark(); try { final var inflater = workerContext.inflater; - inflater.setInput(readBuffer); - inflater.inflate(content); + inflater.setInput(readBuffer.asByteBuffer(readBuffer.readerOffset(), packetEnd)); + inflater.inflate(content.asByteBuffer(0, content.capacity())); inflater.reset(); } catch (DataFormatException e) { e.printStackTrace(); } - content.flip(); + content.reset(contentStartMark); } } - // Process packet - final int packetId = Utils.readVarInt(content); + final int packetId = content.readVarInt(); try { - packetProcessor.process(this, packetId, content); + var finalBuffer = content.asByteBuffer(content.readerOffset(), packetEnd); + packetProcessor.process(this, packetId, finalBuffer); } catch (Exception e) { // Error while reading the packet MinecraftServer.getExceptionManager().handleException(e); break; } - // Return to original state (before writing) - readBuffer.limit(limit).position(packetEnd); + readBuffer.reset(packetEnd, limit); } catch (BufferUnderflowException e) { - readBuffer.reset(); - this.cacheBuffer = ByteBuffer.allocateDirect(readBuffer.remaining()) - .put(readBuffer).flip(); + readBuffer.reset(beginMark); + this.cacheBuffer = BinaryBuffer.copy(readBuffer); break; } } } - public void consumeCache(ByteBuffer buffer) { - if (cacheBuffer == null) { - return; + public void consumeCache(BinaryBuffer buffer) { + if (cacheBuffer != null) { + buffer.write(cacheBuffer); + this.cacheBuffer = null; } - buffer.put(cacheBuffer); - this.cacheBuffer = null; } /** @@ -197,11 +192,11 @@ public class NettyPlayerConnection extends PlayerConnection { public void write(@NotNull ByteBuffer buffer) { synchronized (tickBuffer) { buffer.flip(); - if (buffer.limit() > tickBuffer.remaining()) { + if (!tickBuffer.canWrite(buffer.limit())) { // Tick buffer is full, flush before appending flush(); } - this.tickBuffer.put(buffer); + this.tickBuffer.write(buffer); } } @@ -224,9 +219,9 @@ public class NettyPlayerConnection extends PlayerConnection { public void flush() { if (!channel.isOpen()) return; synchronized (tickBuffer) { - if (tickBuffer.position() == 0) return; + if (tickBuffer.readableBytes() == 0) return; try { - this.channel.write(tickBuffer.flip()); + this.tickBuffer.writeChannel(channel); } catch (IOException e) { MinecraftServer.getExceptionManager().handleException(e); } finally { diff --git a/src/main/java/net/minestom/server/network/socket/Worker.java b/src/main/java/net/minestom/server/network/socket/Worker.java index 4d5762ebe..c2fe6581e 100644 --- a/src/main/java/net/minestom/server/network/socket/Worker.java +++ b/src/main/java/net/minestom/server/network/socket/Worker.java @@ -4,10 +4,10 @@ import net.minestom.server.MinecraftServer; import net.minestom.server.entity.Player; import net.minestom.server.network.PacketProcessor; import net.minestom.server.network.player.NettyPlayerConnection; +import net.minestom.server.utils.binary.BinaryBuffer; import org.jetbrains.annotations.ApiStatus; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -48,16 +48,11 @@ public final class Worker { } var connection = connectionMap.get(channel); try { - ByteBuffer readBuffer = workerContext.readBuffer; + var readBuffer = workerContext.readBuffer; // Consume last incomplete packet connection.consumeCache(readBuffer); - // Read socket - if (channel.read(readBuffer) == -1) { - // EOS - throw new IOException("Disconnected"); - } - // Process data - readBuffer.flip(); + // Read & process + readBuffer.readChannel(channel); connection.processPackets(workerContext, packetProcessor); } catch (IOException e) { // TODO print exception? (should ignore disconnection) @@ -124,11 +119,11 @@ public final class Worker { * Contains objects that we can be shared across all the connection of a {@link Worker worker}. */ public static final class Context { - public final ByteBuffer readBuffer = ByteBuffer.allocateDirect(Server.SOCKET_BUFFER_SIZE); + public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE); /** * Stores a single packet payload to be read. */ - public final ByteBuffer contentBuffer = ByteBuffer.allocateDirect(Server.MAX_PACKET_SIZE); + public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE); public final Inflater inflater = new Inflater(); public void clearBuffers() { diff --git a/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java b/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java new file mode 100644 index 000000000..8f7b862ff --- /dev/null +++ b/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java @@ -0,0 +1,162 @@ +package net.minestom.server.utils.binary; + +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jglrxavpok.hephaistos.nbt.NBTReader; +import org.jglrxavpok.hephaistos.nbt.NBTWriter; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +/** + * Manages off-heap memory. + * Not thread-safe. + */ +public final class BinaryBuffer { + private ByteBuffer nioBuffer; // To become a `MemorySegment` once released + private NBTReader nbtReader; + private NBTWriter nbtWriter; + + private final int capacity; + private int readerOffset, writerOffset; + + private BinaryBuffer(ByteBuffer buffer) { + this.nioBuffer = buffer; + this.capacity = buffer.capacity(); + } + + @ApiStatus.Internal + public static BinaryBuffer ofSize(int size) { + return new BinaryBuffer(ByteBuffer.allocateDirect(size)); + } + + public static BinaryBuffer copy(BinaryBuffer buffer) { + final int size = buffer.readableBytes(); + final var temp = ByteBuffer.allocateDirect(size) + .put(buffer.asByteBuffer(0, size)); + return new BinaryBuffer(temp); + } + + public void write(ByteBuffer buffer) { + final int size = buffer.remaining(); + // TODO jdk 13 put with index + asByteBuffer(writerOffset, writerOffset + size).put(buffer); + this.writerOffset += size; + } + + public void write(BinaryBuffer buffer) { + write(buffer.asByteBuffer(buffer.readerOffset, buffer.writerOffset)); + } + + public int readVarInt() { + int value = 0; + final int maxRead = Math.min(5, readableBytes()); + for (int i = 0; i < maxRead; i++) { + final int offset = readerOffset + i; + final byte k = nioBuffer.get(offset); + value |= (k & 0x7F) << i * 7; + if ((k & 0x80) != 128) { + this.readerOffset = offset + 1; + return value; + } + } + this.readerOffset += maxRead; + throw new RuntimeException("VarInt is too big"); + } + + public @NotNull Marker mark() { + return new Marker(readerOffset, writerOffset); + } + + public void reset(int readerOffset, int writerOffset) { + this.readerOffset = readerOffset; + this.writerOffset = writerOffset; + } + + public void reset(@NotNull Marker marker) { + reset(marker.readerOffset(), marker.writerOffset()); + } + + public boolean canWrite(int size) { + return writerOffset + size <= capacity; + } + + public int capacity() { + return capacity; + } + + public int readerOffset() { + return readerOffset; + } + + public int writerOffset() { + return writerOffset; + } + + public int readableBytes() { + return writerOffset - readerOffset; + } + + public void clear() { + this.readerOffset = 0; + this.writerOffset = 0; + } + + public ByteBuffer asByteBuffer(int reader, int writer) { + return nioBuffer.duplicate().position(reader).limit(writer); + } + + public void writeChannel(WritableByteChannel channel) throws IOException { + final int count = channel.write(asByteBuffer(readerOffset, writerOffset)); + if (count == -1) { + // EOS + throw new IOException("Disconnected"); + } + this.readerOffset += count; + } + + public void readChannel(ReadableByteChannel channel) throws IOException { + final int count = channel.read(asByteBuffer(readerOffset, capacity)); + if (count == -1) { + // EOS + throw new IOException("Disconnected"); + } + this.writerOffset += count; + } + + @Override + public String toString() { + return "BinaryBuffer{" + + "readerOffset=" + readerOffset + + ", writerOffset=" + writerOffset + + ", capacity=" + capacity + + '}'; + } + + public static final class Marker { + private final int readerOffset, writerOffset; + + private Marker(int readerOffset, int writerOffset) { + this.readerOffset = readerOffset; + this.writerOffset = writerOffset; + } + + public int readerOffset() { + return readerOffset; + } + + public int writerOffset() { + return writerOffset; + } + + @Override + public String toString() { + return "Marker{" + + "readerOffset=" + readerOffset + + ", writerOffset=" + writerOffset + + '}'; + } + } +}