diff --git a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java index 7b91ad895..673f24182 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java @@ -16,6 +16,7 @@ import net.minestom.server.utils.Utils; import net.minestom.server.utils.binary.BinaryBuffer; import net.minestom.server.utils.binary.PooledBuffers; import net.minestom.server.utils.validate.Check; +import org.jctools.queues.MessagePassingQueue; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -47,6 +48,7 @@ public class PlayerSocketConnection extends PlayerConnection { private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class); private final Worker worker; + private final MessagePassingQueue workerQueue; private final SocketChannel channel; private SocketAddress remoteAddress; @@ -79,6 +81,7 @@ public class PlayerSocketConnection extends PlayerConnection { public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) { super(); this.worker = worker; + this.workerQueue = worker.queue(); this.channel = channel; this.remoteAddress = remoteAddress; PooledBuffers.registerBuffer(this, tickBuffer); @@ -193,20 +196,21 @@ public class PlayerSocketConnection extends PlayerConnection { @Override public void sendPacket(@NotNull SendablePacket packet) { final boolean compressed = this.compressed; - this.worker.queue().offer(() -> writePacketSync(packet, compressed)); + this.workerQueue.relaxedOffer(() -> writePacketSync(packet, compressed)); } @Override public void sendPackets(@NotNull Collection packets) { + final List packetsCopy = List.copyOf(packets); final boolean compressed = this.compressed; - this.worker.queue().offer(() -> { - for (SendablePacket packet : packets) writePacketSync(packet, compressed); + this.workerQueue.relaxedOffer(() -> { + for (SendablePacket packet : packetsCopy) writePacketSync(packet, compressed); }); } @ApiStatus.Internal public void write(@NotNull ByteBuffer buffer, int index, int length) { - this.worker.queue().offer(() -> writeBufferSync(buffer, index, length)); + this.workerQueue.relaxedOffer(() -> writeBufferSync(buffer, index, length)); } @ApiStatus.Internal @@ -216,7 +220,7 @@ public class PlayerSocketConnection extends PlayerConnection { public void writeAndFlush(@NotNull ServerPacket packet) { final boolean compressed = this.compressed; - this.worker.queue().offer(() -> { + this.workerQueue.relaxedOffer(() -> { writeServerPacketSync(packet, compressed); flushSync(); }); @@ -224,7 +228,7 @@ public class PlayerSocketConnection extends PlayerConnection { @Override public void flush() { - this.worker.queue().offer(this::flushSync); + this.workerQueue.relaxedOffer(this::flushSync); } @Override @@ -246,7 +250,7 @@ public class PlayerSocketConnection extends PlayerConnection { @Override public void disconnect() { - this.worker.queue().offer(() -> this.worker.disconnect(this, channel)); + this.workerQueue.relaxedOffer(() -> this.worker.disconnect(this, channel)); } public @NotNull SocketChannel getChannel() { diff --git a/src/main/java/net/minestom/server/network/socket/Worker.java b/src/main/java/net/minestom/server/network/socket/Worker.java index f598c1429..701d6f931 100644 --- a/src/main/java/net/minestom/server/network/socket/Worker.java +++ b/src/main/java/net/minestom/server/network/socket/Worker.java @@ -6,6 +6,7 @@ import net.minestom.server.entity.Player; import net.minestom.server.network.player.PlayerSocketConnection; import net.minestom.server.thread.MinestomThread; import net.minestom.server.utils.binary.BinaryBuffer; +import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedXaddArrayQueue; import org.jetbrains.annotations.ApiStatus; @@ -15,7 +16,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -118,7 +118,7 @@ public final class Worker extends MinestomThread { this.selector.wakeup(); } - public Queue queue() { + public MessagePassingQueue queue() { return queue; }