From 983850171b4268aaeb3e366988c0c73a1cc87caa Mon Sep 17 00:00:00 2001 From: TheMode Date: Tue, 14 Sep 2021 02:22:58 +0200 Subject: [PATCH] Use cleaner to reuse connection buffers Signed-off-by: TheMode --- .../player/PlayerSocketConnection.java | 27 +++++++----- .../minestom/server/utils/PacketUtils.java | 13 +++--- .../server/utils/binary/PooledBuffers.java | 41 +++++++++++++++++++ 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java index 487b19848..6c9ea15dd 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java @@ -35,6 +35,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.SocketChannel; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.zip.DataFormatException; import java.util.zip.Inflater; @@ -75,7 +76,7 @@ public class PlayerSocketConnection extends PlayerConnection { private final Object bufferLock = new Object(); private final List waitingBuffers = new ArrayList<>(); - private BinaryBuffer tickBuffer = PooledBuffers.get(); + private final AtomicReference tickBuffer = new AtomicReference<>(PooledBuffers.get()); private volatile BinaryBuffer cacheBuffer; public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) { @@ -83,6 +84,7 @@ public class PlayerSocketConnection extends PlayerConnection { this.worker = worker; this.channel = channel; this.remoteAddress = remoteAddress; + PooledBuffers.registerBuffer(this, tickBuffer); } public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) { @@ -223,20 +225,25 @@ public class PlayerSocketConnection extends PlayerConnection { @ApiStatus.Internal public void write(@NotNull ByteBuffer buffer) { synchronized (bufferLock) { - final int capacity = tickBuffer.capacity(); + BinaryBuffer localBuffer = tickBuffer.getPlain(); + final int capacity = localBuffer.capacity(); final int size = buffer.remaining(); if (size <= capacity) { - if (!tickBuffer.canWrite(size)) flush(); - if (!isOnline()) return; - this.tickBuffer.write(buffer); + if (!localBuffer.canWrite(size)) { + flush(); + localBuffer = tickBuffer.getPlain(); + } + localBuffer.write(buffer); } else { final int bufferCount = size / capacity + 1; for (int i = 0; i < bufferCount; i++) { buffer.position(i * capacity); buffer.limit(Math.min(size, buffer.position() + capacity)); - if (!tickBuffer.canWrite(buffer.remaining())) flush(); - if (!isOnline()) return; - this.tickBuffer.write(buffer); + if (!localBuffer.canWrite(buffer.remaining())) { + flush(); + localBuffer = tickBuffer.getPlain(); + } + localBuffer.write(buffer); } } } @@ -262,11 +269,11 @@ public class PlayerSocketConnection extends PlayerConnection { boolean shouldDisconnect = false; if (!channel.isOpen()) return; synchronized (bufferLock) { - final BinaryBuffer localBuffer = this.tickBuffer; + final BinaryBuffer localBuffer = tickBuffer.getPlain(); if (localBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return; // Update tick buffer try { - this.tickBuffer = PooledBuffers.get(); + this.tickBuffer.setPlain(PooledBuffers.get()); if (encrypted) { final Cipher cipher = encryptCipher; // Encrypt data first diff --git a/src/main/java/net/minestom/server/utils/PacketUtils.java b/src/main/java/net/minestom/server/utils/PacketUtils.java index e4b684e9d..2d8c96235 100644 --- a/src/main/java/net/minestom/server/utils/PacketUtils.java +++ b/src/main/java/net/minestom/server/utils/PacketUtils.java @@ -266,14 +266,14 @@ public final class PacketUtils { private static final class ViewableStorage { private final Viewable viewable; private final Map> entityIdMap = new HashMap<>(); - private BinaryBuffer buffer; + private final BinaryBuffer buffer = PooledBuffers.get(); private ViewableStorage(Viewable viewable) { this.viewable = viewable; + PooledBuffers.registerBuffer(this, buffer); } private synchronized void append(ServerPacket serverPacket, PlayerConnection connection) { - if (buffer == null) buffer = PooledBuffers.get(); final ByteBuffer framedPacket = createFramedPacket(serverPacket).flip(); final int packetSize = framedPacket.limit(); if (packetSize >= buffer.capacity()) { @@ -291,7 +291,8 @@ public final class PacketUtils { } private synchronized void process() { - if (buffer == null) return; // TODO: there is nothing in the buffer, remove from VIEWABLE_STORAGE_MAP + if (buffer.writerOffset() == 0) + return; // TODO: there is nothing in the buffer, remove from VIEWABLE_STORAGE_MAP for (Player player : viewable.getViewers()) { PlayerConnection connection = player.getPlayerConnection(); Consumer writer = connection instanceof PlayerSocketConnection @@ -321,7 +322,7 @@ public final class PacketUtils { } // Clear state this.entityIdMap.clear(); - this.buffer = null; + this.buffer.clear(); } private synchronized void processSingle(ByteBuffer buffer, PlayerConnection exception) { @@ -329,8 +330,8 @@ public final class PacketUtils { for (Player player : viewable.getViewers()) { PlayerConnection connection = player.getPlayerConnection(); if (Objects.equals(connection, exception)) continue; - if(connection instanceof PlayerSocketConnection){ - ((PlayerSocketConnection)connection).write(buffer.position(0)); + if (connection instanceof PlayerSocketConnection) { + ((PlayerSocketConnection) connection).write(buffer.position(0)); } // TODO for non-socket connection } diff --git a/src/main/java/net/minestom/server/utils/binary/PooledBuffers.java b/src/main/java/net/minestom/server/utils/binary/PooledBuffers.java index 7851104f8..b37fab852 100644 --- a/src/main/java/net/minestom/server/utils/binary/PooledBuffers.java +++ b/src/main/java/net/minestom/server/utils/binary/PooledBuffers.java @@ -2,16 +2,19 @@ package net.minestom.server.utils.binary; import org.jetbrains.annotations.ApiStatus; +import java.lang.ref.Cleaner; import java.lang.ref.SoftReference; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; @ApiStatus.Internal @ApiStatus.Experimental public final class PooledBuffers { private final static Queue> POOLED_BUFFERS = new ConcurrentLinkedQueue<>(); private final static int BUFFER_SIZE = 262_143; + private final static Cleaner CLEANER = Cleaner.create(); public static BinaryBuffer get() { BinaryBuffer buffer = null; @@ -28,7 +31,45 @@ public final class PooledBuffers { POOLED_BUFFERS.add(new SoftReference<>(buffer)); } + public static int count() { + return POOLED_BUFFERS.size(); + } + public static int bufferSize() { return BUFFER_SIZE; } + + public static void registerBuffer(Object ref, AtomicReference buffer) { + CLEANER.register(ref, new BufferRefCleaner(buffer)); + } + + public static void registerBuffer(Object ref, BinaryBuffer buffer) { + CLEANER.register(ref, new BufferCleaner(buffer)); + } + + private static final class BufferRefCleaner implements Runnable { + private final AtomicReference bufferRef; + + public BufferRefCleaner(AtomicReference bufferRef) { + this.bufferRef = bufferRef; + } + + @Override + public void run() { + add(bufferRef.get()); + } + } + + private static final class BufferCleaner implements Runnable { + private final BinaryBuffer buffer; + + public BufferCleaner(BinaryBuffer buffer) { + this.buffer = buffer; + } + + @Override + public void run() { + add(buffer); + } + } }