Improve performance when slow clients are connected

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-09-02 15:44:36 +02:00
parent 463a46ccc0
commit 399eb860a7
3 changed files with 53 additions and 33 deletions

View File

@ -12,7 +12,6 @@ import net.minestom.server.network.packet.FramedPacket;
import net.minestom.server.network.packet.server.ComponentHoldingServerPacket; import net.minestom.server.network.packet.server.ComponentHoldingServerPacket;
import net.minestom.server.network.packet.server.ServerPacket; import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.packet.server.login.SetCompressionPacket; import net.minestom.server.network.packet.server.login.SetCompressionPacket;
import net.minestom.server.network.socket.Server;
import net.minestom.server.network.socket.Worker; import net.minestom.server.network.socket.Worker;
import net.minestom.server.utils.PacketUtils; import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.Utils; import net.minestom.server.utils.Utils;
@ -32,10 +31,9 @@ import java.net.SocketAddress;
import java.nio.BufferUnderflowException; import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Map; import java.util.*;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.zip.DataFormatException; import java.util.zip.DataFormatException;
/** /**
@ -46,6 +44,8 @@ import java.util.zip.DataFormatException;
@ApiStatus.Internal @ApiStatus.Internal
public class PlayerSocketConnection extends PlayerConnection { public class PlayerSocketConnection extends PlayerConnection {
private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class); private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class);
private final static Queue<BinaryBuffer> POOLED_BUFFERS = new ConcurrentLinkedQueue<>();
private final static int BUFFER_SIZE = 262_143;
private final Worker worker; private final Worker worker;
private final SocketChannel channel; private final SocketChannel channel;
@ -73,7 +73,9 @@ public class PlayerSocketConnection extends PlayerConnection {
private UUID bungeeUuid; private UUID bungeeUuid;
private PlayerSkin bungeeSkin; private PlayerSkin bungeeSkin;
private final BinaryBuffer tickBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE); private final Object bufferLock = new Object();
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
private BinaryBuffer tickBuffer = BinaryBuffer.ofSize(BUFFER_SIZE);
private volatile BinaryBuffer cacheBuffer; private volatile BinaryBuffer cacheBuffer;
public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) { public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) {
@ -227,7 +229,7 @@ public class PlayerSocketConnection extends PlayerConnection {
} }
public void write(@NotNull ByteBuffer buffer) { public void write(@NotNull ByteBuffer buffer) {
synchronized (tickBuffer) { synchronized (bufferLock) {
if (!tickBuffer.canWrite(buffer.position())) { if (!tickBuffer.canWrite(buffer.position())) {
// Tick buffer is full, flush before appending // Tick buffer is full, flush before appending
flush(); flush();
@ -246,7 +248,7 @@ public class PlayerSocketConnection extends PlayerConnection {
} }
public void writeAndFlush(@NotNull ServerPacket packet) { public void writeAndFlush(@NotNull ServerPacket packet) {
synchronized (tickBuffer) { synchronized (bufferLock) {
write(packet); write(packet);
flush(); flush();
} }
@ -255,27 +257,42 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override @Override
public void flush() { public void flush() {
if (!channel.isOpen()) return; if (!channel.isOpen()) return;
synchronized (tickBuffer) { if (tickBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
if (tickBuffer.readableBytes() == 0) return; synchronized (bufferLock) {
try { if (tickBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
if (encrypted) { if (encrypted) {
final Cipher cipher = encryptCipher; final Cipher cipher = encryptCipher;
// Encrypt data first // Encrypt data first
final int remainingBytes = tickBuffer.readableBytes(); final int remainingBytes = tickBuffer.readableBytes();
final byte[] bytes = tickBuffer.readRemainingBytes(); final byte[] bytes = tickBuffer.readRemainingBytes();
byte[] outTempArray = new byte[cipher.getOutputSize(remainingBytes)]; byte[] outTempArray = new byte[cipher.getOutputSize(remainingBytes)];
try {
cipher.update(bytes, 0, remainingBytes, outTempArray); cipher.update(bytes, 0, remainingBytes, outTempArray);
this.tickBuffer.clear(); } catch (ShortBufferException e) {
this.tickBuffer.writeBytes(outTempArray); MinecraftServer.getExceptionManager().handleException(e);
} }
this.tickBuffer.writeChannel(channel);
} catch (IOException e) {
MinecraftServer.getExceptionManager().handleException(e);
} catch (ShortBufferException e) {
e.printStackTrace();
} finally {
this.tickBuffer.clear(); this.tickBuffer.clear();
this.tickBuffer.writeBytes(outTempArray);
} }
this.waitingBuffers.add(tickBuffer);
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next();
try {
if (!waitingBuffer.writeChannel(channel)) break;
iterator.remove();
waitingBuffer.clear();
POOLED_BUFFERS.add(waitingBuffer);
} catch (IOException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}
// Update tick buffer
BinaryBuffer newBuffer = POOLED_BUFFERS.poll();
if (newBuffer == null) newBuffer = BinaryBuffer.ofSize(BUFFER_SIZE);
newBuffer.clear();
this.tickBuffer = newBuffer;
} }
} }
@ -299,6 +316,9 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override @Override
public void disconnect() { public void disconnect() {
this.worker.disconnect(this, channel); this.worker.disconnect(this, channel);
synchronized (bufferLock) {
POOLED_BUFFERS.addAll(waitingBuffers);
}
} }
public @NotNull SocketChannel getChannel() { public @NotNull SocketChannel getChannel() {

View File

@ -84,6 +84,7 @@ public final class Worker extends Thread {
socket.setSendBufferSize(Server.SOCKET_BUFFER_SIZE); socket.setSendBufferSize(Server.SOCKET_BUFFER_SIZE);
socket.setReceiveBufferSize(Server.SOCKET_BUFFER_SIZE); socket.setReceiveBufferSize(Server.SOCKET_BUFFER_SIZE);
socket.setTcpNoDelay(Server.NO_DELAY); socket.setTcpNoDelay(Server.NO_DELAY);
socket.setSoTimeout(30 * 1000); // 30 seconds
this.selector.wakeup(); this.selector.wakeup();
} }

View File

@ -123,16 +123,15 @@ public final class BinaryBuffer {
return nioBuffer.position(reader).slice().limit(writer); return nioBuffer.position(reader).slice().limit(writer);
} }
public void writeChannel(WritableByteChannel channel) throws IOException { public boolean writeChannel(WritableByteChannel channel) throws IOException {
var writeBuffer = asByteBuffer(readerOffset, writerOffset); var writeBuffer = asByteBuffer(readerOffset, writerOffset - readerOffset);
while (writeBuffer.position() != writeBuffer.limit()) { final int count = channel.write(writeBuffer);
final int count = channel.write(writeBuffer); if (count == -1) {
if (count == -1) { // EOS
// EOS throw new IOException("Disconnected");
throw new IOException("Disconnected");
}
this.readerOffset += count;
} }
this.readerOffset += count;
return writeBuffer.limit() == writeBuffer.position();
} }
public void readChannel(ReadableByteChannel channel) throws IOException { public void readChannel(ReadableByteChannel channel) throws IOException {