Use relaxedOffer, copy packet list to prevent modifications

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-11-21 14:34:24 +01:00
parent 8005d2bb3f
commit 02de469cad
2 changed files with 13 additions and 9 deletions

View File

@ -16,6 +16,7 @@ import net.minestom.server.utils.Utils;
import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.binary.PooledBuffers;
import net.minestom.server.utils.validate.Check;
import org.jctools.queues.MessagePassingQueue;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -47,6 +48,7 @@ public class PlayerSocketConnection extends PlayerConnection {
private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class);
private final Worker worker;
private final MessagePassingQueue<Runnable> workerQueue;
private final SocketChannel channel;
private SocketAddress remoteAddress;
@ -79,6 +81,7 @@ public class PlayerSocketConnection extends PlayerConnection {
public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) {
super();
this.worker = worker;
this.workerQueue = worker.queue();
this.channel = channel;
this.remoteAddress = remoteAddress;
PooledBuffers.registerBuffer(this, tickBuffer);
@ -193,20 +196,21 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override
public void sendPacket(@NotNull SendablePacket packet) {
final boolean compressed = this.compressed;
this.worker.queue().offer(() -> writePacketSync(packet, compressed));
this.workerQueue.relaxedOffer(() -> writePacketSync(packet, compressed));
}
@Override
public void sendPackets(@NotNull Collection<SendablePacket> packets) {
final List<SendablePacket> packetsCopy = List.copyOf(packets);
final boolean compressed = this.compressed;
this.worker.queue().offer(() -> {
for (SendablePacket packet : packets) writePacketSync(packet, compressed);
this.workerQueue.relaxedOffer(() -> {
for (SendablePacket packet : packetsCopy) writePacketSync(packet, compressed);
});
}
@ApiStatus.Internal
public void write(@NotNull ByteBuffer buffer, int index, int length) {
this.worker.queue().offer(() -> writeBufferSync(buffer, index, length));
this.workerQueue.relaxedOffer(() -> writeBufferSync(buffer, index, length));
}
@ApiStatus.Internal
@ -216,7 +220,7 @@ public class PlayerSocketConnection extends PlayerConnection {
public void writeAndFlush(@NotNull ServerPacket packet) {
final boolean compressed = this.compressed;
this.worker.queue().offer(() -> {
this.workerQueue.relaxedOffer(() -> {
writeServerPacketSync(packet, compressed);
flushSync();
});
@ -224,7 +228,7 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override
public void flush() {
this.worker.queue().offer(this::flushSync);
this.workerQueue.relaxedOffer(this::flushSync);
}
@Override
@ -246,7 +250,7 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override
public void disconnect() {
this.worker.queue().offer(() -> this.worker.disconnect(this, channel));
this.workerQueue.relaxedOffer(() -> this.worker.disconnect(this, channel));
}
public @NotNull SocketChannel getChannel() {

View File

@ -6,6 +6,7 @@ import net.minestom.server.entity.Player;
import net.minestom.server.network.player.PlayerSocketConnection;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.utils.binary.BinaryBuffer;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedXaddArrayQueue;
import org.jetbrains.annotations.ApiStatus;
@ -15,7 +16,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -118,7 +118,7 @@ public final class Worker extends MinestomThread {
this.selector.wakeup();
}
public Queue<Runnable> queue() {
public MessagePassingQueue<Runnable> queue() {
return queue;
}