Use soft references for pooled buffers

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-09-04 02:20:03 +02:00
parent 538ef75552
commit df92939ddd

View File

@ -27,6 +27,7 @@ import javax.crypto.Cipher;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
import javax.crypto.ShortBufferException; import javax.crypto.ShortBufferException;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.SoftReference;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.BufferUnderflowException; import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -44,7 +45,7 @@ import java.util.zip.DataFormatException;
@ApiStatus.Internal @ApiStatus.Internal
public class PlayerSocketConnection extends PlayerConnection { public class PlayerSocketConnection extends PlayerConnection {
private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class); private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class);
private final static Queue<BinaryBuffer> POOLED_BUFFERS = new ConcurrentLinkedQueue<>(); private final static Queue<SoftReference<BinaryBuffer>> POOLED_BUFFERS = new ConcurrentLinkedQueue<>();
private final static int BUFFER_SIZE = 262_143; private final static int BUFFER_SIZE = 262_143;
private final Worker worker; private final Worker worker;
@ -256,26 +257,29 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override @Override
public void flush() { public void flush() {
boolean shouldDisconnect = false;
if (!channel.isOpen()) return; if (!channel.isOpen()) return;
if (tickBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
synchronized (bufferLock) { synchronized (bufferLock) {
if (tickBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return; final BinaryBuffer localBuffer = this.tickBuffer;
if (localBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
// Update tick buffer
this.tickBuffer = getPooledBuffer();
if (encrypted) { if (encrypted) {
final Cipher cipher = encryptCipher; final Cipher cipher = encryptCipher;
// Encrypt data first // Encrypt data first
final int remainingBytes = tickBuffer.readableBytes(); final int remainingBytes = localBuffer.readableBytes();
final byte[] bytes = tickBuffer.readRemainingBytes(); final byte[] bytes = localBuffer.readRemainingBytes();
byte[] outTempArray = new byte[cipher.getOutputSize(remainingBytes)]; byte[] outTempArray = new byte[cipher.getOutputSize(remainingBytes)];
try { try {
cipher.update(bytes, 0, remainingBytes, outTempArray); cipher.update(bytes, 0, remainingBytes, outTempArray);
} catch (ShortBufferException e) { } catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e); MinecraftServer.getExceptionManager().handleException(e);
} }
this.tickBuffer.clear(); localBuffer.clear();
this.tickBuffer.writeBytes(outTempArray); localBuffer.writeBytes(outTempArray);
} }
this.waitingBuffers.add(tickBuffer); this.waitingBuffers.add(localBuffer);
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator(); Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next(); BinaryBuffer waitingBuffer = iterator.next();
@ -283,14 +287,20 @@ public class PlayerSocketConnection extends PlayerConnection {
if (!waitingBuffer.writeChannel(channel)) break; if (!waitingBuffer.writeChannel(channel)) break;
iterator.remove(); iterator.remove();
waitingBuffer.clear(); waitingBuffer.clear();
POOLED_BUFFERS.add(waitingBuffer); POOLED_BUFFERS.add(new SoftReference<>(waitingBuffer));
} catch (IOException e) { } catch (IOException e) {
MinecraftServer.getExceptionManager().handleException(e); final String message = e.getMessage();
if (message == null ||
(!message.equals("Broken pipe") && !message.equals("Connection reset by peer"))) {
MinecraftServer.getExceptionManager().handleException(e);
}
shouldDisconnect = true;
} }
} }
// Update tick buffer // Update tick buffer
this.tickBuffer = getPooledBuffer(); this.tickBuffer = getPooledBuffer();
} }
if (shouldDisconnect) disconnect();
} }
@Override @Override
@ -314,7 +324,10 @@ public class PlayerSocketConnection extends PlayerConnection {
public void disconnect() { public void disconnect() {
this.worker.disconnect(this, channel); this.worker.disconnect(this, channel);
synchronized (bufferLock) { synchronized (bufferLock) {
POOLED_BUFFERS.addAll(waitingBuffers); for (BinaryBuffer waitingBuffer : waitingBuffers) {
POOLED_BUFFERS.add(new SoftReference<>(waitingBuffer));
}
this.waitingBuffers.clear();
} }
} }
@ -452,12 +465,17 @@ public class PlayerSocketConnection extends PlayerConnection {
} }
private static BinaryBuffer getPooledBuffer() { private static BinaryBuffer getPooledBuffer() {
BinaryBuffer newBuffer = POOLED_BUFFERS.poll(); BinaryBuffer buffer = null;
if (newBuffer == null) { SoftReference<BinaryBuffer> ref;
newBuffer = BinaryBuffer.ofSize(BUFFER_SIZE); while ((ref = POOLED_BUFFERS.poll()) != null) {
} else { buffer = ref.get();
newBuffer.clear(); if (buffer != null) break;
} }
return newBuffer; if (buffer == null) {
buffer = BinaryBuffer.ofSize(BUFFER_SIZE);
} else {
buffer.clear();
}
return buffer;
} }
} }