diff --git a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java index 2fbba6aba..84e09b34a 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java @@ -12,7 +12,6 @@ import net.minestom.server.network.packet.FramedPacket; import net.minestom.server.network.packet.server.ComponentHoldingServerPacket; import net.minestom.server.network.packet.server.ServerPacket; import net.minestom.server.network.packet.server.login.SetCompressionPacket; -import net.minestom.server.network.socket.Server; import net.minestom.server.network.socket.Worker; import net.minestom.server.utils.PacketUtils; import net.minestom.server.utils.Utils; @@ -32,10 +31,9 @@ import java.net.SocketAddress; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.zip.DataFormatException; /** @@ -46,6 +44,8 @@ import java.util.zip.DataFormatException; @ApiStatus.Internal public class PlayerSocketConnection extends PlayerConnection { private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class); + private final static Queue POOLED_BUFFERS = new ConcurrentLinkedQueue<>(); + private final static int BUFFER_SIZE = 262_143; private final Worker worker; private final SocketChannel channel; @@ -73,7 +73,9 @@ public class PlayerSocketConnection extends PlayerConnection { private UUID bungeeUuid; private PlayerSkin bungeeSkin; - private final BinaryBuffer tickBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE); + private final Object bufferLock = new Object(); + private final List waitingBuffers = new ArrayList<>(); + private BinaryBuffer tickBuffer = BinaryBuffer.ofSize(BUFFER_SIZE); private volatile BinaryBuffer cacheBuffer; public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) { @@ -227,7 +229,7 @@ public class PlayerSocketConnection extends PlayerConnection { } public void write(@NotNull ByteBuffer buffer) { - synchronized (tickBuffer) { + synchronized (bufferLock) { if (!tickBuffer.canWrite(buffer.position())) { // Tick buffer is full, flush before appending flush(); @@ -246,7 +248,7 @@ public class PlayerSocketConnection extends PlayerConnection { } public void writeAndFlush(@NotNull ServerPacket packet) { - synchronized (tickBuffer) { + synchronized (bufferLock) { write(packet); flush(); } @@ -255,27 +257,42 @@ public class PlayerSocketConnection extends PlayerConnection { @Override public void flush() { if (!channel.isOpen()) return; - synchronized (tickBuffer) { - if (tickBuffer.readableBytes() == 0) return; - try { - if (encrypted) { - final Cipher cipher = encryptCipher; - // Encrypt data first - final int remainingBytes = tickBuffer.readableBytes(); - final byte[] bytes = tickBuffer.readRemainingBytes(); - byte[] outTempArray = new byte[cipher.getOutputSize(remainingBytes)]; + if (tickBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return; + synchronized (bufferLock) { + if (tickBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return; + if (encrypted) { + final Cipher cipher = encryptCipher; + // Encrypt data first + final int remainingBytes = tickBuffer.readableBytes(); + final byte[] bytes = tickBuffer.readRemainingBytes(); + byte[] outTempArray = new byte[cipher.getOutputSize(remainingBytes)]; + try { cipher.update(bytes, 0, remainingBytes, outTempArray); - this.tickBuffer.clear(); - this.tickBuffer.writeBytes(outTempArray); + } catch (ShortBufferException e) { + MinecraftServer.getExceptionManager().handleException(e); } - this.tickBuffer.writeChannel(channel); - } catch (IOException e) { - MinecraftServer.getExceptionManager().handleException(e); - } catch (ShortBufferException e) { - e.printStackTrace(); - } finally { this.tickBuffer.clear(); + this.tickBuffer.writeBytes(outTempArray); } + + this.waitingBuffers.add(tickBuffer); + Iterator iterator = waitingBuffers.iterator(); + while (iterator.hasNext()) { + BinaryBuffer waitingBuffer = iterator.next(); + try { + if (!waitingBuffer.writeChannel(channel)) break; + iterator.remove(); + waitingBuffer.clear(); + POOLED_BUFFERS.add(waitingBuffer); + } catch (IOException e) { + MinecraftServer.getExceptionManager().handleException(e); + } + } + // Update tick buffer + BinaryBuffer newBuffer = POOLED_BUFFERS.poll(); + if (newBuffer == null) newBuffer = BinaryBuffer.ofSize(BUFFER_SIZE); + newBuffer.clear(); + this.tickBuffer = newBuffer; } } @@ -299,6 +316,9 @@ public class PlayerSocketConnection extends PlayerConnection { @Override public void disconnect() { this.worker.disconnect(this, channel); + synchronized (bufferLock) { + POOLED_BUFFERS.addAll(waitingBuffers); + } } public @NotNull SocketChannel getChannel() { diff --git a/src/main/java/net/minestom/server/network/socket/Worker.java b/src/main/java/net/minestom/server/network/socket/Worker.java index 54b94d61d..eb23c87f8 100644 --- a/src/main/java/net/minestom/server/network/socket/Worker.java +++ b/src/main/java/net/minestom/server/network/socket/Worker.java @@ -84,6 +84,7 @@ public final class Worker extends Thread { socket.setSendBufferSize(Server.SOCKET_BUFFER_SIZE); socket.setReceiveBufferSize(Server.SOCKET_BUFFER_SIZE); socket.setTcpNoDelay(Server.NO_DELAY); + socket.setSoTimeout(30 * 1000); // 30 seconds this.selector.wakeup(); } diff --git a/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java b/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java index d0a60d40e..4689f624b 100644 --- a/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java +++ b/src/main/java/net/minestom/server/utils/binary/BinaryBuffer.java @@ -123,16 +123,15 @@ public final class BinaryBuffer { return nioBuffer.position(reader).slice().limit(writer); } - public void writeChannel(WritableByteChannel channel) throws IOException { - var writeBuffer = asByteBuffer(readerOffset, writerOffset); - while (writeBuffer.position() != writeBuffer.limit()) { - final int count = channel.write(writeBuffer); - if (count == -1) { - // EOS - throw new IOException("Disconnected"); - } - this.readerOffset += count; + public boolean writeChannel(WritableByteChannel channel) throws IOException { + var writeBuffer = asByteBuffer(readerOffset, writerOffset - readerOffset); + final int count = channel.write(writeBuffer); + if (count == -1) { + // EOS + throw new IOException("Disconnected"); } + this.readerOffset += count; + return writeBuffer.limit() == writeBuffer.position(); } public void readChannel(ReadableByteChannel channel) throws IOException {