Use cleaner to reuse connection buffers

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-09-14 02:22:58 +02:00
parent 561a10bddc
commit 983850171b
3 changed files with 65 additions and 16 deletions

View File

@ -35,6 +35,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ -75,7 +76,7 @@ public class PlayerSocketConnection extends PlayerConnection {
private final Object bufferLock = new Object(); private final Object bufferLock = new Object();
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>(); private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
private BinaryBuffer tickBuffer = PooledBuffers.get(); private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(PooledBuffers.get());
private volatile BinaryBuffer cacheBuffer; private volatile BinaryBuffer cacheBuffer;
public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) { public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) {
@ -83,6 +84,7 @@ public class PlayerSocketConnection extends PlayerConnection {
this.worker = worker; this.worker = worker;
this.channel = channel; this.channel = channel;
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
PooledBuffers.registerBuffer(this, tickBuffer);
} }
public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) { public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) {
@ -223,20 +225,25 @@ public class PlayerSocketConnection extends PlayerConnection {
@ApiStatus.Internal @ApiStatus.Internal
public void write(@NotNull ByteBuffer buffer) { public void write(@NotNull ByteBuffer buffer) {
synchronized (bufferLock) { synchronized (bufferLock) {
final int capacity = tickBuffer.capacity(); BinaryBuffer localBuffer = tickBuffer.getPlain();
final int capacity = localBuffer.capacity();
final int size = buffer.remaining(); final int size = buffer.remaining();
if (size <= capacity) { if (size <= capacity) {
if (!tickBuffer.canWrite(size)) flush(); if (!localBuffer.canWrite(size)) {
if (!isOnline()) return; flush();
this.tickBuffer.write(buffer); localBuffer = tickBuffer.getPlain();
}
localBuffer.write(buffer);
} else { } else {
final int bufferCount = size / capacity + 1; final int bufferCount = size / capacity + 1;
for (int i = 0; i < bufferCount; i++) { for (int i = 0; i < bufferCount; i++) {
buffer.position(i * capacity); buffer.position(i * capacity);
buffer.limit(Math.min(size, buffer.position() + capacity)); buffer.limit(Math.min(size, buffer.position() + capacity));
if (!tickBuffer.canWrite(buffer.remaining())) flush(); if (!localBuffer.canWrite(buffer.remaining())) {
if (!isOnline()) return; flush();
this.tickBuffer.write(buffer); localBuffer = tickBuffer.getPlain();
}
localBuffer.write(buffer);
} }
} }
} }
@ -262,11 +269,11 @@ public class PlayerSocketConnection extends PlayerConnection {
boolean shouldDisconnect = false; boolean shouldDisconnect = false;
if (!channel.isOpen()) return; if (!channel.isOpen()) return;
synchronized (bufferLock) { synchronized (bufferLock) {
final BinaryBuffer localBuffer = this.tickBuffer; final BinaryBuffer localBuffer = tickBuffer.getPlain();
if (localBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return; if (localBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
// Update tick buffer // Update tick buffer
try { try {
this.tickBuffer = PooledBuffers.get(); this.tickBuffer.setPlain(PooledBuffers.get());
if (encrypted) { if (encrypted) {
final Cipher cipher = encryptCipher; final Cipher cipher = encryptCipher;
// Encrypt data first // Encrypt data first

View File

@ -266,14 +266,14 @@ public final class PacketUtils {
private static final class ViewableStorage { private static final class ViewableStorage {
private final Viewable viewable; private final Viewable viewable;
private final Map<PlayerConnection, List<IntIntPair>> entityIdMap = new HashMap<>(); private final Map<PlayerConnection, List<IntIntPair>> entityIdMap = new HashMap<>();
private BinaryBuffer buffer; private final BinaryBuffer buffer = PooledBuffers.get();
private ViewableStorage(Viewable viewable) { private ViewableStorage(Viewable viewable) {
this.viewable = viewable; this.viewable = viewable;
PooledBuffers.registerBuffer(this, buffer);
} }
private synchronized void append(ServerPacket serverPacket, PlayerConnection connection) { private synchronized void append(ServerPacket serverPacket, PlayerConnection connection) {
if (buffer == null) buffer = PooledBuffers.get();
final ByteBuffer framedPacket = createFramedPacket(serverPacket).flip(); final ByteBuffer framedPacket = createFramedPacket(serverPacket).flip();
final int packetSize = framedPacket.limit(); final int packetSize = framedPacket.limit();
if (packetSize >= buffer.capacity()) { if (packetSize >= buffer.capacity()) {
@ -291,7 +291,8 @@ public final class PacketUtils {
} }
private synchronized void process() { private synchronized void process() {
if (buffer == null) return; // TODO: there is nothing in the buffer, remove from VIEWABLE_STORAGE_MAP if (buffer.writerOffset() == 0)
return; // TODO: there is nothing in the buffer, remove from VIEWABLE_STORAGE_MAP
for (Player player : viewable.getViewers()) { for (Player player : viewable.getViewers()) {
PlayerConnection connection = player.getPlayerConnection(); PlayerConnection connection = player.getPlayerConnection();
Consumer<ByteBuffer> writer = connection instanceof PlayerSocketConnection Consumer<ByteBuffer> writer = connection instanceof PlayerSocketConnection
@ -321,7 +322,7 @@ public final class PacketUtils {
} }
// Clear state // Clear state
this.entityIdMap.clear(); this.entityIdMap.clear();
this.buffer = null; this.buffer.clear();
} }
private synchronized void processSingle(ByteBuffer buffer, PlayerConnection exception) { private synchronized void processSingle(ByteBuffer buffer, PlayerConnection exception) {

View File

@ -2,16 +2,19 @@ package net.minestom.server.utils.binary;
import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.ApiStatus;
import java.lang.ref.Cleaner;
import java.lang.ref.SoftReference; import java.lang.ref.SoftReference;
import java.util.Objects; import java.util.Objects;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
@ApiStatus.Internal @ApiStatus.Internal
@ApiStatus.Experimental @ApiStatus.Experimental
public final class PooledBuffers { public final class PooledBuffers {
private final static Queue<SoftReference<BinaryBuffer>> POOLED_BUFFERS = new ConcurrentLinkedQueue<>(); private final static Queue<SoftReference<BinaryBuffer>> POOLED_BUFFERS = new ConcurrentLinkedQueue<>();
private final static int BUFFER_SIZE = 262_143; private final static int BUFFER_SIZE = 262_143;
private final static Cleaner CLEANER = Cleaner.create();
public static BinaryBuffer get() { public static BinaryBuffer get() {
BinaryBuffer buffer = null; BinaryBuffer buffer = null;
@ -28,7 +31,45 @@ public final class PooledBuffers {
POOLED_BUFFERS.add(new SoftReference<>(buffer)); POOLED_BUFFERS.add(new SoftReference<>(buffer));
} }
public static int count() {
return POOLED_BUFFERS.size();
}
public static int bufferSize() { public static int bufferSize() {
return BUFFER_SIZE; return BUFFER_SIZE;
} }
public static void registerBuffer(Object ref, AtomicReference<BinaryBuffer> buffer) {
CLEANER.register(ref, new BufferRefCleaner(buffer));
}
public static void registerBuffer(Object ref, BinaryBuffer buffer) {
CLEANER.register(ref, new BufferCleaner(buffer));
}
private static final class BufferRefCleaner implements Runnable {
private final AtomicReference<BinaryBuffer> bufferRef;
public BufferRefCleaner(AtomicReference<BinaryBuffer> bufferRef) {
this.bufferRef = bufferRef;
}
@Override
public void run() {
add(bufferRef.get());
}
}
private static final class BufferCleaner implements Runnable {
private final BinaryBuffer buffer;
public BufferCleaner(BinaryBuffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
add(buffer);
}
}
} }