From 8b1856d5b7c28ea2fbb4124652d6dc12d2960638 Mon Sep 17 00:00:00 2001 From: TheMode Date: Wed, 17 Nov 2021 06:31:24 +0100 Subject: [PATCH] Async packet write (#533) --- build.gradle | 3 + .../java/net/minestom/server/Viewable.java | 36 ++- .../net/minestom/server/entity/Entity.java | 2 +- .../net/minestom/server/entity/Player.java | 23 +- .../server/instance/DynamicChunk.java | 6 +- .../minestom/server/message/Messenger.java | 8 +- .../network/packet/server/CachedPacket.java | 20 +- .../network/packet/server/FramedPacket.java | 2 +- .../network/packet/server/SendablePacket.java | 8 + .../network/packet/server/ServerPacket.java | 4 +- .../network/player/FakePlayerConnection.java | 18 +- .../network/player/PlayerConnection.java | 30 +-- .../player/PlayerSocketConnection.java | 235 +++++++++--------- .../server/network/socket/Worker.java | 11 +- .../minestom/server/utils/PacketUtils.java | 67 ++--- 15 files changed, 239 insertions(+), 234 deletions(-) create mode 100644 src/main/java/net/minestom/server/network/packet/server/SendablePacket.java diff --git a/build.gradle b/build.gradle index e2c1255e5..1483f12ef 100644 --- a/build.gradle +++ b/build.gradle @@ -131,6 +131,9 @@ dependencies { // https://mvnrepository.com/artifact/com.zaxxer/SparseBitSet implementation group: 'com.zaxxer', name: 'SparseBitSet', version: '1.2' + // https://mvnrepository.com/artifact/org.jctools/jctools-core + implementation group: 'org.jctools', name: 'jctools-core', version: '3.3.0' + // Guava 21.0+ required for Mixin api 'com.google.guava:guava:31.0.1-jre' diff --git a/src/main/java/net/minestom/server/Viewable.java b/src/main/java/net/minestom/server/Viewable.java index 59965d5e7..6d5a67e42 100644 --- a/src/main/java/net/minestom/server/Viewable.java +++ b/src/main/java/net/minestom/server/Viewable.java @@ -3,12 +3,12 @@ package net.minestom.server; import net.kyori.adventure.audience.Audience; import net.minestom.server.adventure.audience.PacketGroupingAudience; import net.minestom.server.entity.Player; -import net.minestom.server.network.packet.server.FramedPacket; +import net.minestom.server.network.packet.server.SendablePacket; import net.minestom.server.network.packet.server.ServerPacket; import net.minestom.server.utils.PacketUtils; -import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; +import java.util.Collection; import java.util.Set; /** @@ -57,14 +57,11 @@ public interface Viewable { * * @param packet the packet to send to all viewers */ - default void sendPacketToViewers(@NotNull ServerPacket packet) { - PacketUtils.sendGroupedPacket(getViewers(), packet); - } - - @ApiStatus.Experimental - default void sendPacketToViewers(@NotNull FramedPacket framedPacket) { - for (Player viewer : getViewers()) { - viewer.sendPacket(framedPacket); + default void sendPacketToViewers(@NotNull SendablePacket packet) { + if (packet instanceof ServerPacket serverPacket) { + PacketUtils.sendGroupedPacket(getViewers(), serverPacket); + } else { + getViewers().forEach(player -> player.sendPacket(packet)); } } @@ -76,28 +73,27 @@ public interface Viewable { * * @param packets the packets to send */ - default void sendPacketsToViewers(@NotNull ServerPacket... packets) { - for (ServerPacket packet : packets) { - PacketUtils.sendGroupedPacket(getViewers(), packet); + default void sendPacketsToViewers(@NotNull SendablePacket... packets) { + for (SendablePacket packet : packets) { + sendPacketToViewers(packet); } } + default void sendPacketsToViewers(@NotNull Collection packets) { + packets.forEach(this::sendPacketToViewers); + } + /** * Sends a packet to all viewers and the viewable element if it is a player. *

- * If 'this' isn't a player, then only {@link #sendPacketToViewers(ServerPacket)} is called. + * If 'this' isn't a player, then only {@link #sendPacketToViewers(SendablePacket)} is called. * * @param packet the packet to send */ - default void sendPacketToViewersAndSelf(@NotNull ServerPacket packet) { + default void sendPacketToViewersAndSelf(@NotNull SendablePacket packet) { sendPacketToViewers(packet); } - @ApiStatus.Experimental - default void sendPacketToViewersAndSelf(@NotNull FramedPacket framedPacket) { - sendPacketToViewers(framedPacket); - } - /** * Gets the result of {@link #getViewers()} as an Adventure Audience. * diff --git a/src/main/java/net/minestom/server/entity/Entity.java b/src/main/java/net/minestom/server/entity/Entity.java index e5ecdf201..2aba093ad 100644 --- a/src/main/java/net/minestom/server/entity/Entity.java +++ b/src/main/java/net/minestom/server/entity/Entity.java @@ -455,7 +455,7 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler if (passenger != player) passenger.viewEngine.viewableOption.removal.accept(player); } } - player.sendPacket(destroyPacketCache.retrieve()); + player.sendPacket(destroyPacketCache); } @Override diff --git a/src/main/java/net/minestom/server/entity/Player.java b/src/main/java/net/minestom/server/entity/Player.java index 4dd39a573..b4e5c5ece 100644 --- a/src/main/java/net/minestom/server/entity/Player.java +++ b/src/main/java/net/minestom/server/entity/Player.java @@ -54,7 +54,7 @@ import net.minestom.server.network.ConnectionState; import net.minestom.server.network.PlayerProvider; import net.minestom.server.network.packet.client.ClientPlayPacket; import net.minestom.server.network.packet.client.play.ClientChatMessagePacket; -import net.minestom.server.network.packet.server.FramedPacket; +import net.minestom.server.network.packet.server.SendablePacket; import net.minestom.server.network.packet.server.ServerPacket; import net.minestom.server.network.packet.server.login.LoginDisconnectPacket; import net.minestom.server.network.packet.server.play.*; @@ -502,17 +502,11 @@ public class Player extends LivingEntity implements CommandSender, Localizable, } @Override - public void sendPacketToViewersAndSelf(@NotNull ServerPacket packet) { + public void sendPacketToViewersAndSelf(@NotNull SendablePacket packet) { this.playerConnection.sendPacket(packet); super.sendPacketToViewersAndSelf(packet); } - @Override - public void sendPacketToViewersAndSelf(@NotNull FramedPacket framedPacket) { - this.playerConnection.sendPacket(framedPacket); - super.sendPacketToViewersAndSelf(framedPacket); - } - /** * Changes the player instance and load surrounding chunks if needed. *

@@ -1180,18 +1174,23 @@ public class Player extends LivingEntity implements CommandSender, Localizable, } /** - * Shortcut for {@link PlayerConnection#sendPacket(ServerPacket)}. + * Shortcut for {@link PlayerConnection#sendPacket(SendablePacket)}. * * @param packet the packet to send */ @ApiStatus.Experimental - public void sendPacket(@NotNull ServerPacket packet) { + public void sendPacket(@NotNull SendablePacket packet) { this.playerConnection.sendPacket(packet); } @ApiStatus.Experimental - public void sendPacket(@NotNull FramedPacket framedPacket) { - this.playerConnection.sendPacket(framedPacket); + public void sendPackets(@NotNull SendablePacket... packets) { + this.playerConnection.sendPackets(packets); + } + + @ApiStatus.Experimental + public void sendPackets(@NotNull Collection packets) { + this.playerConnection.sendPackets(packets); } /** diff --git a/src/main/java/net/minestom/server/instance/DynamicChunk.java b/src/main/java/net/minestom/server/instance/DynamicChunk.java index f65580673..09c639cae 100644 --- a/src/main/java/net/minestom/server/instance/DynamicChunk.java +++ b/src/main/java/net/minestom/server/instance/DynamicChunk.java @@ -125,16 +125,14 @@ public class DynamicChunk extends Chunk { @Override public void sendChunk(@NotNull Player player) { if (!isLoaded()) return; - player.sendPacket(lightCache.retrieve()); - player.sendPacket(chunkCache.retrieve()); + player.sendPackets(lightCache, chunkCache); } @Override public void sendChunk() { if (!isLoaded()) return; if (getViewers().isEmpty()) return; - sendPacketToViewers(lightCache.retrieve()); - sendPacketToViewers(chunkCache.retrieve()); + sendPacketsToViewers(lightCache, chunkCache); } @NotNull diff --git a/src/main/java/net/minestom/server/message/Messenger.java b/src/main/java/net/minestom/server/message/Messenger.java index c4dc90ba1..a30cd9418 100644 --- a/src/main/java/net/minestom/server/message/Messenger.java +++ b/src/main/java/net/minestom/server/message/Messenger.java @@ -1,7 +1,5 @@ package net.minestom.server.message; -import java.util.*; - import net.kyori.adventure.text.Component; import net.kyori.adventure.text.format.NamedTextColor; import net.minestom.server.entity.Player; @@ -10,6 +8,10 @@ import net.minestom.server.utils.PacketUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.util.Collection; +import java.util.Objects; +import java.util.UUID; + /** * Utility class to handle client chat settings. */ @@ -79,7 +81,7 @@ public class Messenger { * @param player the player */ public static void sendRejectionMessage(@NotNull Player player) { - player.getPlayerConnection().sendPacket(CANNOT_SEND_PACKET, false); + player.getPlayerConnection().sendPacket(CANNOT_SEND_PACKET); } /** diff --git a/src/main/java/net/minestom/server/network/packet/server/CachedPacket.java b/src/main/java/net/minestom/server/network/packet/server/CachedPacket.java index d8a7bbe6d..913401f0c 100644 --- a/src/main/java/net/minestom/server/network/packet/server/CachedPacket.java +++ b/src/main/java/net/minestom/server/network/packet/server/CachedPacket.java @@ -9,33 +9,41 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; @ApiStatus.Internal -public final class CachedPacket { +public final class CachedPacket implements SendablePacket { private static final AtomicIntegerFieldUpdater UPDATER = AtomicIntegerFieldUpdater.newUpdater(CachedPacket.class, "updated"); - private final Supplier supplier; + private final Supplier packetSupplier; // 0 means that the reference needs to be updated // Anything else (currently 1) means that the packet is up-to-date private volatile int updated = 0; private SoftReference packet; - public CachedPacket(@NotNull Supplier<@NotNull ServerPacket> supplier) { - this.supplier = supplier; + public CachedPacket(@NotNull Supplier<@NotNull ServerPacket> packetSupplier) { + this.packetSupplier = packetSupplier; + } + + public CachedPacket(@NotNull ServerPacket packet) { + this(() -> packet); } public void invalidate() { this.updated = 0; } + public @NotNull ServerPacket packet() { + return packetSupplier.get(); + } + public @NotNull FramedPacket retrieve() { if (!PacketUtils.CACHED_PACKET) { // TODO: Using a local buffer may be possible - return PacketUtils.allocateTrimmedPacket(supplier.get()); + return PacketUtils.allocateTrimmedPacket(packet()); } SoftReference ref; FramedPacket cache; if (updated == 0 || ((ref = packet) == null || (cache = ref.get()) == null)) { - cache = PacketUtils.allocateTrimmedPacket(supplier.get()); + cache = PacketUtils.allocateTrimmedPacket(packet()); this.packet = new SoftReference<>(cache); UPDATER.compareAndSet(this, 0, 1); } diff --git a/src/main/java/net/minestom/server/network/packet/server/FramedPacket.java b/src/main/java/net/minestom/server/network/packet/server/FramedPacket.java index 7bba75182..70cce3fbf 100644 --- a/src/main/java/net/minestom/server/network/packet/server/FramedPacket.java +++ b/src/main/java/net/minestom/server/network/packet/server/FramedPacket.java @@ -13,7 +13,7 @@ import java.nio.ByteBuffer; */ @ApiStatus.Internal public record FramedPacket(@NotNull ServerPacket packet, - @NotNull ByteBuffer body) { + @NotNull ByteBuffer body) implements SendablePacket { public FramedPacket { body = body.position(0).asReadOnlyBuffer(); diff --git a/src/main/java/net/minestom/server/network/packet/server/SendablePacket.java b/src/main/java/net/minestom/server/network/packet/server/SendablePacket.java new file mode 100644 index 000000000..35bf5b941 --- /dev/null +++ b/src/main/java/net/minestom/server/network/packet/server/SendablePacket.java @@ -0,0 +1,8 @@ +package net.minestom.server.network.packet.server; + +import org.jetbrains.annotations.ApiStatus; + +@ApiStatus.Experimental +public sealed interface SendablePacket + permits ServerPacket, CachedPacket, FramedPacket { +} diff --git a/src/main/java/net/minestom/server/network/packet/server/ServerPacket.java b/src/main/java/net/minestom/server/network/packet/server/ServerPacket.java index eb94d671b..bf3904282 100644 --- a/src/main/java/net/minestom/server/network/packet/server/ServerPacket.java +++ b/src/main/java/net/minestom/server/network/packet/server/ServerPacket.java @@ -7,9 +7,9 @@ import net.minestom.server.utils.binary.Writeable; import org.jetbrains.annotations.NotNull; /** - * Represents a packet which can be sent to a player using {@link PlayerConnection#sendPacket(ServerPacket)}. + * Represents a packet which can be sent to a player using {@link PlayerConnection#sendPacket(SendablePacket)}. */ -public interface ServerPacket extends Readable, Writeable { +public non-sealed interface ServerPacket extends Readable, Writeable, SendablePacket { @Override default void read(@NotNull BinaryReader reader) { diff --git a/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java b/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java index 9cb1b4637..c570f7801 100644 --- a/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java +++ b/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java @@ -3,6 +3,10 @@ package net.minestom.server.network.player; import net.minestom.server.MinecraftServer; import net.minestom.server.entity.Player; import net.minestom.server.entity.fakeplayer.FakePlayer; +import net.minestom.server.entity.fakeplayer.FakePlayerController; +import net.minestom.server.network.packet.server.CachedPacket; +import net.minestom.server.network.packet.server.FramedPacket; +import net.minestom.server.network.packet.server.SendablePacket; import net.minestom.server.network.packet.server.ServerPacket; import net.minestom.server.utils.validate.Check; import org.jetbrains.annotations.NotNull; @@ -13,9 +17,17 @@ import java.net.SocketAddress; public class FakePlayerConnection extends PlayerConnection { @Override - public void sendPacket(@NotNull ServerPacket serverPacket, boolean skipTranslating) { - if (shouldSendPacket(serverPacket)) { - getFakePlayer().getController().consumePacket(serverPacket); + public void sendPacket(@NotNull SendablePacket packet) { + FakePlayerController controller = getFakePlayer().getController(); + if (packet instanceof ServerPacket serverPacket) { + if (!shouldSendPacket(serverPacket)) return; + controller.consumePacket(serverPacket); + } else if (packet instanceof FramedPacket framedPacket) { + controller.consumePacket(framedPacket.packet()); + } else if (packet instanceof CachedPacket cachedPacket) { + controller.consumePacket(cachedPacket.packet()); + } else { + throw new RuntimeException("Unknown packet type: " + packet.getClass().getName()); } } diff --git a/src/main/java/net/minestom/server/network/player/PlayerConnection.java b/src/main/java/net/minestom/server/network/player/PlayerConnection.java index 7a5398857..55b3b8c72 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerConnection.java @@ -8,13 +8,14 @@ import net.minestom.server.listener.manager.PacketListenerManager; import net.minestom.server.listener.manager.ServerPacketConsumer; import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.ConnectionState; -import net.minestom.server.network.packet.server.FramedPacket; +import net.minestom.server.network.packet.server.SendablePacket; import net.minestom.server.network.packet.server.ServerPacket; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.net.SocketAddress; +import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; @@ -85,26 +86,21 @@ public abstract class PlayerConnection { *

* Also responsible for executing {@link ConnectionManager#onPacketSend(ServerPacketConsumer)} consumers. * - * @param serverPacket the packet to send + * @param packet the packet to send * @see #shouldSendPacket(ServerPacket) */ - public void sendPacket(@NotNull ServerPacket serverPacket) { - this.sendPacket(serverPacket, false); - } - - /** - * Serializes the packet and send it to the client, optionally skipping the translation phase. - *

- * Also responsible for executing {@link ConnectionManager#onPacketSend(ServerPacketConsumer)} consumers. - * - * @param serverPacket the packet to send - * @see #shouldSendPacket(ServerPacket) - */ - public abstract void sendPacket(@NotNull ServerPacket serverPacket, boolean skipTranslating); + public abstract void sendPacket(@NotNull SendablePacket packet); @ApiStatus.Experimental - public void sendPacket(@NotNull FramedPacket framedPacket) { - this.sendPacket(framedPacket.packet()); + public void sendPackets(@NotNull SendablePacket... packets) { + for (SendablePacket p : packets) { + sendPacket(p); + } + } + + @ApiStatus.Experimental + public void sendPackets(@NotNull Collection packet) { + packet.forEach(this::sendPacket); } /** diff --git a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java index cee21577a..8f00d096e 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java @@ -8,9 +8,7 @@ import net.minestom.server.entity.PlayerSkin; import net.minestom.server.extras.mojangAuth.MojangCrypt; import net.minestom.server.network.ConnectionState; import net.minestom.server.network.PacketProcessor; -import net.minestom.server.network.packet.server.ComponentHoldingServerPacket; -import net.minestom.server.network.packet.server.FramedPacket; -import net.minestom.server.network.packet.server.ServerPacket; +import net.minestom.server.network.packet.server.*; import net.minestom.server.network.packet.server.login.SetCompressionPacket; import net.minestom.server.network.socket.Worker; import net.minestom.server.utils.PacketUtils; @@ -52,8 +50,8 @@ public class PlayerSocketConnection extends PlayerConnection { private final SocketChannel channel; private SocketAddress remoteAddress; - private boolean encrypted = false; - private boolean compressed = false; + private volatile boolean encrypted = false; + private volatile boolean compressed = false; //Could be null. Only used for Mojang Auth private byte[] nonce = new byte[4]; @@ -74,8 +72,6 @@ public class PlayerSocketConnection extends PlayerConnection { private UUID bungeeUuid; private PlayerSkin bungeeSkin; - private final Object bufferLock = new Object(); - private final Object flushLock = new Object(); private final List waitingBuffers = new ArrayList<>(); private final AtomicReference tickBuffer = new AtomicReference<>(PooledBuffers.get()); private volatile BinaryBuffer cacheBuffer; @@ -176,11 +172,9 @@ public class PlayerSocketConnection extends PlayerConnection { */ public void setEncryptionKey(@NotNull SecretKey secretKey) { Check.stateCondition(encrypted, "Encryption is already enabled!"); - synchronized (bufferLock) { - this.decryptCipher = MojangCrypt.getCipher(2, secretKey); - this.encryptCipher = MojangCrypt.getCipher(1, secretKey); - this.encrypted = true; - } + this.decryptCipher = MojangCrypt.getCipher(2, secretKey); + this.encryptCipher = MojangCrypt.getCipher(1, secretKey); + this.encrypted = true; } /** @@ -193,72 +187,18 @@ public class PlayerSocketConnection extends PlayerConnection { final int threshold = MinecraftServer.getCompressionThreshold(); Check.stateCondition(threshold == 0, "Compression cannot be enabled because the threshold is equal to 0"); writeAndFlush(new SetCompressionPacket(threshold)); - synchronized (bufferLock) { - this.compressed = true; - } - } - - /** - * Writes a packet to the connection channel. - *

- * All packets are flushed during {@link net.minestom.server.entity.Player#update(long)}. - * - * @param serverPacket the packet to write - */ - @Override - public void sendPacket(@NotNull ServerPacket serverPacket, boolean skipTranslating) { - if (!channel.isConnected()) return; - if (shouldSendPacket(serverPacket)) { - final Player player = getPlayer(); - if (player != null) { - // Flush happen during #update() - if ((MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && !skipTranslating) && serverPacket instanceof ComponentHoldingServerPacket) { - serverPacket = ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component -> - GlobalTranslator.render(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale))); - } - writePacket(serverPacket); - } else { - // Player is probably not logged yet - writeAndFlush(serverPacket); - } - } + this.compressed = true; } @Override - public void sendPacket(@NotNull FramedPacket framedPacket) { - write(framedPacket.body()); + public void sendPacket(@NotNull SendablePacket packet) { + final boolean compressed = this.compressed; + this.worker.queue().offer(() -> writePacketSync(packet, compressed)); } @ApiStatus.Internal public void write(@NotNull ByteBuffer buffer, int index, int length) { - synchronized (bufferLock) { - if (encrypted) { // Encryption support - ByteBuffer output = PacketUtils.localBuffer(); - try { - this.encryptCipher.update(buffer.slice(index, length), output); - buffer = output.flip(); - index = 0; - } catch (ShortBufferException e) { - MinecraftServer.getExceptionManager().handleException(e); - return; - } - } - - BinaryBuffer localBuffer = tickBuffer.getPlain(); - final int capacity = localBuffer.capacity(); - if (length <= capacity) { - if (!localBuffer.canWrite(length)) localBuffer = updateLocalBuffer(); - localBuffer.write(buffer, index, length); - } else { - final int bufferCount = length / capacity + 1; - for (int i = 0; i < bufferCount; i++) { - final int sliceStart = i * capacity; - final int sliceLength = Math.min(length, sliceStart + capacity) - sliceStart; - if (!localBuffer.canWrite(sliceLength)) localBuffer = updateLocalBuffer(); - localBuffer.write(buffer, sliceStart, sliceLength); - } - } - } + this.worker.queue().offer(() -> writeBufferSync(buffer, index, length)); } @ApiStatus.Internal @@ -266,58 +206,17 @@ public class PlayerSocketConnection extends PlayerConnection { write(buffer, buffer.position(), buffer.remaining()); } - private void writePacket(@NotNull ServerPacket packet) { - write(PacketUtils.createFramedPacket(packet, compressed)); - } - public void writeAndFlush(@NotNull ServerPacket packet) { - synchronized (bufferLock) { - writePacket(packet); - flush(); - } + final boolean compressed = this.compressed; + this.worker.queue().offer(() -> { + writeServerPacketSync(packet, compressed); + flushSync(); + }); } @Override public void flush() { - try { - if (!channel.isConnected()) - throw new ClosedChannelException(); - synchronized (bufferLock) { - try { - updateLocalBuffer(); - } catch (OutOfMemoryError e) { - this.waitingBuffers.clear(); - System.gc(); // Explicit gc forcing buffers to be collected - throw new ClosedChannelException(); - } - } - synchronized (flushLock) { - try { - // Write as much as possible from the waiting list - Iterator iterator = waitingBuffers.iterator(); - while (iterator.hasNext()) { - BinaryBuffer waitingBuffer = iterator.next(); - if (!waitingBuffer.writeChannel(channel)) break; - iterator.remove(); - PooledBuffers.add(waitingBuffer); - } - } catch (IOException e) { // Couldn't write to the socket - MinecraftServer.getExceptionManager().handleException(e); - throw new ClosedChannelException(); - } - } - } catch (ClosedChannelException e) { - disconnect(); - } - } - - private BinaryBuffer updateLocalBuffer() { - synchronized (flushLock) { - BinaryBuffer newBuffer = PooledBuffers.get(); - this.waitingBuffers.add(tickBuffer.getPlain()); - this.tickBuffer.setPlain(newBuffer); - return newBuffer; - } + this.worker.queue().offer(this::flushSync); } @Override @@ -339,7 +238,7 @@ public class PlayerSocketConnection extends PlayerConnection { @Override public void disconnect() { - this.worker.disconnect(this, channel); + this.worker.queue().offer(() -> this.worker.disconnect(this, channel)); } public @NotNull SocketChannel getChannel() { @@ -474,4 +373,102 @@ public class PlayerSocketConnection extends PlayerConnection { public void setNonce(byte[] nonce) { this.nonce = nonce; } + + private void writePacketSync(SendablePacket packet, boolean compressed) { + if (!channel.isConnected()) return; + if (packet instanceof ServerPacket serverPacket) { + writeServerPacketSync(serverPacket, compressed); + } else if (packet instanceof FramedPacket framedPacket) { + writeFramedPacketSync(framedPacket); + } else if (packet instanceof CachedPacket cachedPacket) { + writeFramedPacketSync(cachedPacket.retrieve()); + } else { + throw new RuntimeException("Unknown packet type: " + packet.getClass().getName()); + } + } + + private void writeServerPacketSync(ServerPacket serverPacket, boolean compressed) { + if (!shouldSendPacket(serverPacket)) return; + final Player player = getPlayer(); + if (player != null) { + if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) { + serverPacket = ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component -> + GlobalTranslator.render(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale))); + } + } + writeBufferSync(PacketUtils.createFramedPacket(serverPacket, compressed)); + if (player == null) flushSync(); // Player is probably not logged yet + } + + private void writeFramedPacketSync(FramedPacket framedPacket) { + writeBufferSync(framedPacket.body()); + } + + private void writeBufferSync(@NotNull ByteBuffer buffer, int index, int length) { + if (encrypted) { // Encryption support + ByteBuffer output = PacketUtils.localBuffer(); + try { + this.encryptCipher.update(buffer.slice(index, length), output); + buffer = output.flip(); + index = 0; + } catch (ShortBufferException e) { + MinecraftServer.getExceptionManager().handleException(e); + return; + } + } + + BinaryBuffer localBuffer = tickBuffer.getPlain(); + final int capacity = localBuffer.capacity(); + if (length <= capacity) { + if (!localBuffer.canWrite(length)) localBuffer = updateLocalBuffer(); + localBuffer.write(buffer, index, length); + } else { + final int bufferCount = length / capacity + 1; + for (int i = 0; i < bufferCount; i++) { + final int sliceStart = i * capacity; + final int sliceLength = Math.min(length, sliceStart + capacity) - sliceStart; + if (!localBuffer.canWrite(sliceLength)) localBuffer = updateLocalBuffer(); + localBuffer.write(buffer, sliceStart, sliceLength); + } + } + } + + private void writeBufferSync(@NotNull ByteBuffer buffer) { + writeBufferSync(buffer, buffer.position(), buffer.remaining()); + } + + public void flushSync() { + try { + if (!channel.isConnected()) throw new ClosedChannelException(); + try { + updateLocalBuffer(); + } catch (OutOfMemoryError e) { + this.waitingBuffers.clear(); + System.gc(); // Explicit gc forcing buffers to be collected + throw new ClosedChannelException(); + } + try { + // Write as much as possible from the waiting list + Iterator iterator = waitingBuffers.iterator(); + while (iterator.hasNext()) { + BinaryBuffer waitingBuffer = iterator.next(); + if (!waitingBuffer.writeChannel(channel)) break; + iterator.remove(); + PooledBuffers.add(waitingBuffer); + } + } catch (IOException e) { // Couldn't write to the socket + MinecraftServer.getExceptionManager().handleException(e); + throw new ClosedChannelException(); + } + } catch (ClosedChannelException e) { + disconnect(); + } + } + + private BinaryBuffer updateLocalBuffer() { + BinaryBuffer newBuffer = PooledBuffers.get(); + this.waitingBuffers.add(tickBuffer.getPlain()); + this.tickBuffer.setPlain(newBuffer); + return newBuffer; + } } diff --git a/src/main/java/net/minestom/server/network/socket/Worker.java b/src/main/java/net/minestom/server/network/socket/Worker.java index edfa430ed..3f62f6fe1 100644 --- a/src/main/java/net/minestom/server/network/socket/Worker.java +++ b/src/main/java/net/minestom/server/network/socket/Worker.java @@ -7,6 +7,7 @@ import net.minestom.server.network.PacketProcessor; import net.minestom.server.network.player.PlayerSocketConnection; import net.minestom.server.thread.MinestomThread; import net.minestom.server.utils.binary.BinaryBuffer; +import org.jctools.queues.MpscUnboundedArrayQueue; import org.jetbrains.annotations.ApiStatus; import java.io.IOException; @@ -15,6 +16,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -29,7 +31,7 @@ public final class Worker extends MinestomThread { private final Map connectionMap = new ConcurrentHashMap<>(); private final Server server; private final PacketProcessor packetProcessor; - + private final MpscUnboundedArrayQueue queue = new MpscUnboundedArrayQueue<>(1024); private final AtomicBoolean flush = new AtomicBoolean(); public Worker(Server server, PacketProcessor packetProcessor) throws IOException { @@ -42,9 +44,10 @@ public final class Worker extends MinestomThread { public void run() { while (server.isOpen()) { try { + this.queue.drain(Runnable::run); // Flush all connections if needed if (flush.compareAndSet(true, false)) { - connectionMap.values().forEach(PlayerSocketConnection::flush); + connectionMap.values().forEach(PlayerSocketConnection::flushSync); } // Wait for an event this.selector.select(key -> { @@ -105,6 +108,10 @@ public final class Worker extends MinestomThread { this.selector.wakeup(); } + public Queue queue() { + return queue; + } + /** * Contains objects that we can be shared across all the connection of a {@link Worker worker}. */ diff --git a/src/main/java/net/minestom/server/utils/PacketUtils.java b/src/main/java/net/minestom/server/utils/PacketUtils.java index b3e3e5f79..3cb10a867 100644 --- a/src/main/java/net/minestom/server/utils/PacketUtils.java +++ b/src/main/java/net/minestom/server/utils/PacketUtils.java @@ -9,13 +9,13 @@ import net.kyori.adventure.audience.Audience; import net.kyori.adventure.audience.ForwardingAudience; import net.minestom.server.MinecraftServer; import net.minestom.server.Viewable; -import net.minestom.server.adventure.MinestomAdventure; import net.minestom.server.adventure.audience.PacketGroupingAudience; import net.minestom.server.entity.Entity; import net.minestom.server.entity.Player; import net.minestom.server.listener.manager.PacketListenerManager; -import net.minestom.server.network.packet.server.ComponentHoldingServerPacket; +import net.minestom.server.network.packet.server.CachedPacket; import net.minestom.server.network.packet.server.FramedPacket; +import net.minestom.server.network.packet.server.SendablePacket; import net.minestom.server.network.packet.server.ServerPacket; import net.minestom.server.network.player.PlayerConnection; import net.minestom.server.network.player.PlayerSocketConnection; @@ -24,7 +24,6 @@ import net.minestom.server.utils.binary.BinaryBuffer; import net.minestom.server.utils.binary.BinaryWriter; import net.minestom.server.utils.binary.PooledBuffers; import net.minestom.server.utils.cache.LocalCache; -import net.minestom.server.utils.callback.validator.PlayerValidator; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -33,6 +32,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Objects; import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; import java.util.zip.Deflater; /** @@ -105,45 +105,26 @@ public final class PacketUtils { *

* Can drastically improve performance since the packet will not have to be processed as much. * - * @param players the players to send the packet to - * @param packet the packet to send to the players - * @param playerValidator optional callback to check if a specify player of {@code players} should receive the packet + * @param players the players to send the packet to + * @param packet the packet to send to the players + * @param predicate predicate to ignore specific players */ public static void sendGroupedPacket(@NotNull Collection players, @NotNull ServerPacket packet, - @NotNull PlayerValidator playerValidator) { - if (players.isEmpty()) - return; + @NotNull Predicate predicate) { + if (players.isEmpty()) return; + if (!PACKET_LISTENER_MANAGER.processServerPacket(packet, players)) return; // work out if the packet needs to be sent individually due to server-side translating - boolean needsTranslating = false; - if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && packet instanceof ComponentHoldingServerPacket) { - needsTranslating = ComponentUtils.areAnyTranslatable(((ComponentHoldingServerPacket) packet).components()); - } - if (GROUPED_PACKET && !needsTranslating) { - // Send grouped packet... - if (!PACKET_LISTENER_MANAGER.processServerPacket(packet, players)) - return; - final FramedPacket framedPacket = new FramedPacket(packet, createFramedPacket(packet)); - // Send packet to all players - players.forEach(player -> { - if (!player.isOnline() || !playerValidator.isValid(player)) - return; - player.sendPacket(framedPacket); - }); - } else { - // Write the same packet for each individual players - players.forEach(player -> { - if (!player.isOnline() || !playerValidator.isValid(player)) - return; - player.getPlayerConnection().sendPacket(packet, false); - }); - } + final SendablePacket sendablePacket = GROUPED_PACKET ? new CachedPacket(packet) : packet; + players.forEach(player -> { + if (predicate.test(player)) player.sendPacket(sendablePacket); + }); } /** - * Same as {@link #sendGroupedPacket(Collection, ServerPacket, PlayerValidator)} + * Same as {@link #sendGroupedPacket(Collection, ServerPacket, Predicate)} * but with the player validator sets to null. * - * @see #sendGroupedPacket(Collection, ServerPacket, PlayerValidator) + * @see #sendGroupedPacket(Collection, ServerPacket, Predicate) */ public static void sendGroupedPacket(@NotNull Collection players, @NotNull ServerPacket packet) { sendGroupedPacket(players, packet, player -> true); @@ -274,13 +255,15 @@ public final class PacketUtils { private void process(Viewable viewable) { if (buffer.writerOffset() == 0) return; - viewable.getViewers().forEach(this::processPlayer); + ByteBuffer copy = ByteBuffer.allocateDirect(buffer.writerOffset()); + copy.put(buffer.asByteBuffer(0, copy.capacity())); + viewable.getViewers().forEach(player -> processPlayer(player, copy)); this.buffer.clear(); this.entityIdMap.clear(); } - private void processPlayer(Player player) { - final int size = buffer.writerOffset(); + private void processPlayer(Player player, ByteBuffer buffer) { + final int size = buffer.limit(); final PlayerConnection connection = player.getPlayerConnection(); final LongArrayList pairs = entityIdMap.get(player.getEntityId()); if (pairs != null) { @@ -290,20 +273,16 @@ public final class PacketUtils { for (int i = 0; i < pairs.size(); ++i) { final long offsets = elements[i]; final int start = (int) (offsets >> 32); - if (start != lastWrite) writeTo(connection, lastWrite, start - lastWrite); + if (start != lastWrite) writeTo(connection, buffer, lastWrite, start - lastWrite); lastWrite = (int) offsets; // End = last 32 bits } - if (size != lastWrite) writeTo(connection, lastWrite, size - lastWrite); + if (size != lastWrite) writeTo(connection, buffer, lastWrite, size - lastWrite); } else { // Write all - writeTo(connection, 0, size); + writeTo(connection, buffer, 0, size); } } - private void writeTo(PlayerConnection connection, int offset, int length) { - writeTo(connection, buffer.asByteBuffer(), offset, length); - } - private static void writeTo(PlayerConnection connection, ByteBuffer buffer, int offset, int length) { if (connection instanceof PlayerSocketConnection socketConnection) { socketConnection.write(buffer, offset, length);