Async packet write (#533)

This commit is contained in:
TheMode 2021-11-17 06:31:24 +01:00 committed by GitHub
parent c5e947c76e
commit 8b1856d5b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 239 additions and 234 deletions

View File

@ -131,6 +131,9 @@ dependencies {
// https://mvnrepository.com/artifact/com.zaxxer/SparseBitSet
implementation group: 'com.zaxxer', name: 'SparseBitSet', version: '1.2'
// https://mvnrepository.com/artifact/org.jctools/jctools-core
implementation group: 'org.jctools', name: 'jctools-core', version: '3.3.0'
// Guava 21.0+ required for Mixin
api 'com.google.guava:guava:31.0.1-jre'

View File

@ -3,12 +3,12 @@ package net.minestom.server;
import net.kyori.adventure.audience.Audience;
import net.minestom.server.adventure.audience.PacketGroupingAudience;
import net.minestom.server.entity.Player;
import net.minestom.server.network.packet.server.FramedPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.utils.PacketUtils;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.util.Collection;
import java.util.Set;
/**
@ -57,14 +57,11 @@ public interface Viewable {
*
* @param packet the packet to send to all viewers
*/
default void sendPacketToViewers(@NotNull ServerPacket packet) {
PacketUtils.sendGroupedPacket(getViewers(), packet);
}
@ApiStatus.Experimental
default void sendPacketToViewers(@NotNull FramedPacket framedPacket) {
for (Player viewer : getViewers()) {
viewer.sendPacket(framedPacket);
default void sendPacketToViewers(@NotNull SendablePacket packet) {
if (packet instanceof ServerPacket serverPacket) {
PacketUtils.sendGroupedPacket(getViewers(), serverPacket);
} else {
getViewers().forEach(player -> player.sendPacket(packet));
}
}
@ -76,28 +73,27 @@ public interface Viewable {
*
* @param packets the packets to send
*/
default void sendPacketsToViewers(@NotNull ServerPacket... packets) {
for (ServerPacket packet : packets) {
PacketUtils.sendGroupedPacket(getViewers(), packet);
default void sendPacketsToViewers(@NotNull SendablePacket... packets) {
for (SendablePacket packet : packets) {
sendPacketToViewers(packet);
}
}
default void sendPacketsToViewers(@NotNull Collection<SendablePacket> packets) {
packets.forEach(this::sendPacketToViewers);
}
/**
* Sends a packet to all viewers and the viewable element if it is a player.
* <p>
* If 'this' isn't a player, then only {@link #sendPacketToViewers(ServerPacket)} is called.
* If 'this' isn't a player, then only {@link #sendPacketToViewers(SendablePacket)} is called.
*
* @param packet the packet to send
*/
default void sendPacketToViewersAndSelf(@NotNull ServerPacket packet) {
default void sendPacketToViewersAndSelf(@NotNull SendablePacket packet) {
sendPacketToViewers(packet);
}
@ApiStatus.Experimental
default void sendPacketToViewersAndSelf(@NotNull FramedPacket framedPacket) {
sendPacketToViewers(framedPacket);
}
/**
* Gets the result of {@link #getViewers()} as an Adventure Audience.
*

View File

@ -455,7 +455,7 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
if (passenger != player) passenger.viewEngine.viewableOption.removal.accept(player);
}
}
player.sendPacket(destroyPacketCache.retrieve());
player.sendPacket(destroyPacketCache);
}
@Override

View File

@ -54,7 +54,7 @@ import net.minestom.server.network.ConnectionState;
import net.minestom.server.network.PlayerProvider;
import net.minestom.server.network.packet.client.ClientPlayPacket;
import net.minestom.server.network.packet.client.play.ClientChatMessagePacket;
import net.minestom.server.network.packet.server.FramedPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.packet.server.login.LoginDisconnectPacket;
import net.minestom.server.network.packet.server.play.*;
@ -502,17 +502,11 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
}
@Override
public void sendPacketToViewersAndSelf(@NotNull ServerPacket packet) {
public void sendPacketToViewersAndSelf(@NotNull SendablePacket packet) {
this.playerConnection.sendPacket(packet);
super.sendPacketToViewersAndSelf(packet);
}
@Override
public void sendPacketToViewersAndSelf(@NotNull FramedPacket framedPacket) {
this.playerConnection.sendPacket(framedPacket);
super.sendPacketToViewersAndSelf(framedPacket);
}
/**
* Changes the player instance and load surrounding chunks if needed.
* <p>
@ -1180,18 +1174,23 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
}
/**
* Shortcut for {@link PlayerConnection#sendPacket(ServerPacket)}.
* Shortcut for {@link PlayerConnection#sendPacket(SendablePacket)}.
*
* @param packet the packet to send
*/
@ApiStatus.Experimental
public void sendPacket(@NotNull ServerPacket packet) {
public void sendPacket(@NotNull SendablePacket packet) {
this.playerConnection.sendPacket(packet);
}
@ApiStatus.Experimental
public void sendPacket(@NotNull FramedPacket framedPacket) {
this.playerConnection.sendPacket(framedPacket);
public void sendPackets(@NotNull SendablePacket... packets) {
this.playerConnection.sendPackets(packets);
}
@ApiStatus.Experimental
public void sendPackets(@NotNull Collection<SendablePacket> packets) {
this.playerConnection.sendPackets(packets);
}
/**

View File

@ -125,16 +125,14 @@ public class DynamicChunk extends Chunk {
@Override
public void sendChunk(@NotNull Player player) {
if (!isLoaded()) return;
player.sendPacket(lightCache.retrieve());
player.sendPacket(chunkCache.retrieve());
player.sendPackets(lightCache, chunkCache);
}
@Override
public void sendChunk() {
if (!isLoaded()) return;
if (getViewers().isEmpty()) return;
sendPacketToViewers(lightCache.retrieve());
sendPacketToViewers(chunkCache.retrieve());
sendPacketsToViewers(lightCache, chunkCache);
}
@NotNull

View File

@ -1,7 +1,5 @@
package net.minestom.server.message;
import java.util.*;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.format.NamedTextColor;
import net.minestom.server.entity.Player;
@ -10,6 +8,10 @@ import net.minestom.server.utils.PacketUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
import java.util.Objects;
import java.util.UUID;
/**
* Utility class to handle client chat settings.
*/
@ -79,7 +81,7 @@ public class Messenger {
* @param player the player
*/
public static void sendRejectionMessage(@NotNull Player player) {
player.getPlayerConnection().sendPacket(CANNOT_SEND_PACKET, false);
player.getPlayerConnection().sendPacket(CANNOT_SEND_PACKET);
}
/**

View File

@ -9,33 +9,41 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
@ApiStatus.Internal
public final class CachedPacket {
public final class CachedPacket implements SendablePacket {
private static final AtomicIntegerFieldUpdater<CachedPacket> UPDATER = AtomicIntegerFieldUpdater.newUpdater(CachedPacket.class, "updated");
private final Supplier<ServerPacket> supplier;
private final Supplier<ServerPacket> packetSupplier;
// 0 means that the reference needs to be updated
// Anything else (currently 1) means that the packet is up-to-date
private volatile int updated = 0;
private SoftReference<FramedPacket> packet;
public CachedPacket(@NotNull Supplier<@NotNull ServerPacket> supplier) {
this.supplier = supplier;
public CachedPacket(@NotNull Supplier<@NotNull ServerPacket> packetSupplier) {
this.packetSupplier = packetSupplier;
}
public CachedPacket(@NotNull ServerPacket packet) {
this(() -> packet);
}
public void invalidate() {
this.updated = 0;
}
public @NotNull ServerPacket packet() {
return packetSupplier.get();
}
public @NotNull FramedPacket retrieve() {
if (!PacketUtils.CACHED_PACKET) {
// TODO: Using a local buffer may be possible
return PacketUtils.allocateTrimmedPacket(supplier.get());
return PacketUtils.allocateTrimmedPacket(packet());
}
SoftReference<FramedPacket> ref;
FramedPacket cache;
if (updated == 0 ||
((ref = packet) == null ||
(cache = ref.get()) == null)) {
cache = PacketUtils.allocateTrimmedPacket(supplier.get());
cache = PacketUtils.allocateTrimmedPacket(packet());
this.packet = new SoftReference<>(cache);
UPDATER.compareAndSet(this, 0, 1);
}

View File

@ -13,7 +13,7 @@ import java.nio.ByteBuffer;
*/
@ApiStatus.Internal
public record FramedPacket(@NotNull ServerPacket packet,
@NotNull ByteBuffer body) {
@NotNull ByteBuffer body) implements SendablePacket {
public FramedPacket {
body = body.position(0).asReadOnlyBuffer();

View File

@ -0,0 +1,8 @@
package net.minestom.server.network.packet.server;
import org.jetbrains.annotations.ApiStatus;
@ApiStatus.Experimental
public sealed interface SendablePacket
permits ServerPacket, CachedPacket, FramedPacket {
}

View File

@ -7,9 +7,9 @@ import net.minestom.server.utils.binary.Writeable;
import org.jetbrains.annotations.NotNull;
/**
* Represents a packet which can be sent to a player using {@link PlayerConnection#sendPacket(ServerPacket)}.
* Represents a packet which can be sent to a player using {@link PlayerConnection#sendPacket(SendablePacket)}.
*/
public interface ServerPacket extends Readable, Writeable {
public non-sealed interface ServerPacket extends Readable, Writeable, SendablePacket {
@Override
default void read(@NotNull BinaryReader reader) {

View File

@ -3,6 +3,10 @@ package net.minestom.server.network.player;
import net.minestom.server.MinecraftServer;
import net.minestom.server.entity.Player;
import net.minestom.server.entity.fakeplayer.FakePlayer;
import net.minestom.server.entity.fakeplayer.FakePlayerController;
import net.minestom.server.network.packet.server.CachedPacket;
import net.minestom.server.network.packet.server.FramedPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
@ -13,9 +17,17 @@ import java.net.SocketAddress;
public class FakePlayerConnection extends PlayerConnection {
@Override
public void sendPacket(@NotNull ServerPacket serverPacket, boolean skipTranslating) {
if (shouldSendPacket(serverPacket)) {
getFakePlayer().getController().consumePacket(serverPacket);
public void sendPacket(@NotNull SendablePacket packet) {
FakePlayerController controller = getFakePlayer().getController();
if (packet instanceof ServerPacket serverPacket) {
if (!shouldSendPacket(serverPacket)) return;
controller.consumePacket(serverPacket);
} else if (packet instanceof FramedPacket framedPacket) {
controller.consumePacket(framedPacket.packet());
} else if (packet instanceof CachedPacket cachedPacket) {
controller.consumePacket(cachedPacket.packet());
} else {
throw new RuntimeException("Unknown packet type: " + packet.getClass().getName());
}
}

View File

@ -8,13 +8,14 @@ import net.minestom.server.listener.manager.PacketListenerManager;
import net.minestom.server.listener.manager.ServerPacketConsumer;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.ConnectionState;
import net.minestom.server.network.packet.server.FramedPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@ -85,26 +86,21 @@ public abstract class PlayerConnection {
* <p>
* Also responsible for executing {@link ConnectionManager#onPacketSend(ServerPacketConsumer)} consumers.
*
* @param serverPacket the packet to send
* @param packet the packet to send
* @see #shouldSendPacket(ServerPacket)
*/
public void sendPacket(@NotNull ServerPacket serverPacket) {
this.sendPacket(serverPacket, false);
}
/**
* Serializes the packet and send it to the client, optionally skipping the translation phase.
* <p>
* Also responsible for executing {@link ConnectionManager#onPacketSend(ServerPacketConsumer)} consumers.
*
* @param serverPacket the packet to send
* @see #shouldSendPacket(ServerPacket)
*/
public abstract void sendPacket(@NotNull ServerPacket serverPacket, boolean skipTranslating);
public abstract void sendPacket(@NotNull SendablePacket packet);
@ApiStatus.Experimental
public void sendPacket(@NotNull FramedPacket framedPacket) {
this.sendPacket(framedPacket.packet());
public void sendPackets(@NotNull SendablePacket... packets) {
for (SendablePacket p : packets) {
sendPacket(p);
}
}
@ApiStatus.Experimental
public void sendPackets(@NotNull Collection<SendablePacket> packet) {
packet.forEach(this::sendPacket);
}
/**

View File

@ -8,9 +8,7 @@ import net.minestom.server.entity.PlayerSkin;
import net.minestom.server.extras.mojangAuth.MojangCrypt;
import net.minestom.server.network.ConnectionState;
import net.minestom.server.network.PacketProcessor;
import net.minestom.server.network.packet.server.ComponentHoldingServerPacket;
import net.minestom.server.network.packet.server.FramedPacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.packet.server.*;
import net.minestom.server.network.packet.server.login.SetCompressionPacket;
import net.minestom.server.network.socket.Worker;
import net.minestom.server.utils.PacketUtils;
@ -52,8 +50,8 @@ public class PlayerSocketConnection extends PlayerConnection {
private final SocketChannel channel;
private SocketAddress remoteAddress;
private boolean encrypted = false;
private boolean compressed = false;
private volatile boolean encrypted = false;
private volatile boolean compressed = false;
//Could be null. Only used for Mojang Auth
private byte[] nonce = new byte[4];
@ -74,8 +72,6 @@ public class PlayerSocketConnection extends PlayerConnection {
private UUID bungeeUuid;
private PlayerSkin bungeeSkin;
private final Object bufferLock = new Object();
private final Object flushLock = new Object();
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(PooledBuffers.get());
private volatile BinaryBuffer cacheBuffer;
@ -176,11 +172,9 @@ public class PlayerSocketConnection extends PlayerConnection {
*/
public void setEncryptionKey(@NotNull SecretKey secretKey) {
Check.stateCondition(encrypted, "Encryption is already enabled!");
synchronized (bufferLock) {
this.decryptCipher = MojangCrypt.getCipher(2, secretKey);
this.encryptCipher = MojangCrypt.getCipher(1, secretKey);
this.encrypted = true;
}
this.decryptCipher = MojangCrypt.getCipher(2, secretKey);
this.encryptCipher = MojangCrypt.getCipher(1, secretKey);
this.encrypted = true;
}
/**
@ -193,72 +187,18 @@ public class PlayerSocketConnection extends PlayerConnection {
final int threshold = MinecraftServer.getCompressionThreshold();
Check.stateCondition(threshold == 0, "Compression cannot be enabled because the threshold is equal to 0");
writeAndFlush(new SetCompressionPacket(threshold));
synchronized (bufferLock) {
this.compressed = true;
}
}
/**
* Writes a packet to the connection channel.
* <p>
* All packets are flushed during {@link net.minestom.server.entity.Player#update(long)}.
*
* @param serverPacket the packet to write
*/
@Override
public void sendPacket(@NotNull ServerPacket serverPacket, boolean skipTranslating) {
if (!channel.isConnected()) return;
if (shouldSendPacket(serverPacket)) {
final Player player = getPlayer();
if (player != null) {
// Flush happen during #update()
if ((MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && !skipTranslating) && serverPacket instanceof ComponentHoldingServerPacket) {
serverPacket = ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component ->
GlobalTranslator.render(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale)));
}
writePacket(serverPacket);
} else {
// Player is probably not logged yet
writeAndFlush(serverPacket);
}
}
this.compressed = true;
}
@Override
public void sendPacket(@NotNull FramedPacket framedPacket) {
write(framedPacket.body());
public void sendPacket(@NotNull SendablePacket packet) {
final boolean compressed = this.compressed;
this.worker.queue().offer(() -> writePacketSync(packet, compressed));
}
@ApiStatus.Internal
public void write(@NotNull ByteBuffer buffer, int index, int length) {
synchronized (bufferLock) {
if (encrypted) { // Encryption support
ByteBuffer output = PacketUtils.localBuffer();
try {
this.encryptCipher.update(buffer.slice(index, length), output);
buffer = output.flip();
index = 0;
} catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e);
return;
}
}
BinaryBuffer localBuffer = tickBuffer.getPlain();
final int capacity = localBuffer.capacity();
if (length <= capacity) {
if (!localBuffer.canWrite(length)) localBuffer = updateLocalBuffer();
localBuffer.write(buffer, index, length);
} else {
final int bufferCount = length / capacity + 1;
for (int i = 0; i < bufferCount; i++) {
final int sliceStart = i * capacity;
final int sliceLength = Math.min(length, sliceStart + capacity) - sliceStart;
if (!localBuffer.canWrite(sliceLength)) localBuffer = updateLocalBuffer();
localBuffer.write(buffer, sliceStart, sliceLength);
}
}
}
this.worker.queue().offer(() -> writeBufferSync(buffer, index, length));
}
@ApiStatus.Internal
@ -266,58 +206,17 @@ public class PlayerSocketConnection extends PlayerConnection {
write(buffer, buffer.position(), buffer.remaining());
}
private void writePacket(@NotNull ServerPacket packet) {
write(PacketUtils.createFramedPacket(packet, compressed));
}
public void writeAndFlush(@NotNull ServerPacket packet) {
synchronized (bufferLock) {
writePacket(packet);
flush();
}
final boolean compressed = this.compressed;
this.worker.queue().offer(() -> {
writeServerPacketSync(packet, compressed);
flushSync();
});
}
@Override
public void flush() {
try {
if (!channel.isConnected())
throw new ClosedChannelException();
synchronized (bufferLock) {
try {
updateLocalBuffer();
} catch (OutOfMemoryError e) {
this.waitingBuffers.clear();
System.gc(); // Explicit gc forcing buffers to be collected
throw new ClosedChannelException();
}
}
synchronized (flushLock) {
try {
// Write as much as possible from the waiting list
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next();
if (!waitingBuffer.writeChannel(channel)) break;
iterator.remove();
PooledBuffers.add(waitingBuffer);
}
} catch (IOException e) { // Couldn't write to the socket
MinecraftServer.getExceptionManager().handleException(e);
throw new ClosedChannelException();
}
}
} catch (ClosedChannelException e) {
disconnect();
}
}
private BinaryBuffer updateLocalBuffer() {
synchronized (flushLock) {
BinaryBuffer newBuffer = PooledBuffers.get();
this.waitingBuffers.add(tickBuffer.getPlain());
this.tickBuffer.setPlain(newBuffer);
return newBuffer;
}
this.worker.queue().offer(this::flushSync);
}
@Override
@ -339,7 +238,7 @@ public class PlayerSocketConnection extends PlayerConnection {
@Override
public void disconnect() {
this.worker.disconnect(this, channel);
this.worker.queue().offer(() -> this.worker.disconnect(this, channel));
}
public @NotNull SocketChannel getChannel() {
@ -474,4 +373,102 @@ public class PlayerSocketConnection extends PlayerConnection {
public void setNonce(byte[] nonce) {
this.nonce = nonce;
}
private void writePacketSync(SendablePacket packet, boolean compressed) {
if (!channel.isConnected()) return;
if (packet instanceof ServerPacket serverPacket) {
writeServerPacketSync(serverPacket, compressed);
} else if (packet instanceof FramedPacket framedPacket) {
writeFramedPacketSync(framedPacket);
} else if (packet instanceof CachedPacket cachedPacket) {
writeFramedPacketSync(cachedPacket.retrieve());
} else {
throw new RuntimeException("Unknown packet type: " + packet.getClass().getName());
}
}
private void writeServerPacketSync(ServerPacket serverPacket, boolean compressed) {
if (!shouldSendPacket(serverPacket)) return;
final Player player = getPlayer();
if (player != null) {
if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) {
serverPacket = ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component ->
GlobalTranslator.render(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale)));
}
}
writeBufferSync(PacketUtils.createFramedPacket(serverPacket, compressed));
if (player == null) flushSync(); // Player is probably not logged yet
}
private void writeFramedPacketSync(FramedPacket framedPacket) {
writeBufferSync(framedPacket.body());
}
private void writeBufferSync(@NotNull ByteBuffer buffer, int index, int length) {
if (encrypted) { // Encryption support
ByteBuffer output = PacketUtils.localBuffer();
try {
this.encryptCipher.update(buffer.slice(index, length), output);
buffer = output.flip();
index = 0;
} catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e);
return;
}
}
BinaryBuffer localBuffer = tickBuffer.getPlain();
final int capacity = localBuffer.capacity();
if (length <= capacity) {
if (!localBuffer.canWrite(length)) localBuffer = updateLocalBuffer();
localBuffer.write(buffer, index, length);
} else {
final int bufferCount = length / capacity + 1;
for (int i = 0; i < bufferCount; i++) {
final int sliceStart = i * capacity;
final int sliceLength = Math.min(length, sliceStart + capacity) - sliceStart;
if (!localBuffer.canWrite(sliceLength)) localBuffer = updateLocalBuffer();
localBuffer.write(buffer, sliceStart, sliceLength);
}
}
}
private void writeBufferSync(@NotNull ByteBuffer buffer) {
writeBufferSync(buffer, buffer.position(), buffer.remaining());
}
public void flushSync() {
try {
if (!channel.isConnected()) throw new ClosedChannelException();
try {
updateLocalBuffer();
} catch (OutOfMemoryError e) {
this.waitingBuffers.clear();
System.gc(); // Explicit gc forcing buffers to be collected
throw new ClosedChannelException();
}
try {
// Write as much as possible from the waiting list
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next();
if (!waitingBuffer.writeChannel(channel)) break;
iterator.remove();
PooledBuffers.add(waitingBuffer);
}
} catch (IOException e) { // Couldn't write to the socket
MinecraftServer.getExceptionManager().handleException(e);
throw new ClosedChannelException();
}
} catch (ClosedChannelException e) {
disconnect();
}
}
private BinaryBuffer updateLocalBuffer() {
BinaryBuffer newBuffer = PooledBuffers.get();
this.waitingBuffers.add(tickBuffer.getPlain());
this.tickBuffer.setPlain(newBuffer);
return newBuffer;
}
}

View File

@ -7,6 +7,7 @@ import net.minestom.server.network.PacketProcessor;
import net.minestom.server.network.player.PlayerSocketConnection;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.utils.binary.BinaryBuffer;
import org.jctools.queues.MpscUnboundedArrayQueue;
import org.jetbrains.annotations.ApiStatus;
import java.io.IOException;
@ -15,6 +16,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -29,7 +31,7 @@ public final class Worker extends MinestomThread {
private final Map<SocketChannel, PlayerSocketConnection> connectionMap = new ConcurrentHashMap<>();
private final Server server;
private final PacketProcessor packetProcessor;
private final MpscUnboundedArrayQueue<Runnable> queue = new MpscUnboundedArrayQueue<>(1024);
private final AtomicBoolean flush = new AtomicBoolean();
public Worker(Server server, PacketProcessor packetProcessor) throws IOException {
@ -42,9 +44,10 @@ public final class Worker extends MinestomThread {
public void run() {
while (server.isOpen()) {
try {
this.queue.drain(Runnable::run);
// Flush all connections if needed
if (flush.compareAndSet(true, false)) {
connectionMap.values().forEach(PlayerSocketConnection::flush);
connectionMap.values().forEach(PlayerSocketConnection::flushSync);
}
// Wait for an event
this.selector.select(key -> {
@ -105,6 +108,10 @@ public final class Worker extends MinestomThread {
this.selector.wakeup();
}
public Queue<Runnable> queue() {
return queue;
}
/**
* Contains objects that we can be shared across all the connection of a {@link Worker worker}.
*/

View File

@ -9,13 +9,13 @@ import net.kyori.adventure.audience.Audience;
import net.kyori.adventure.audience.ForwardingAudience;
import net.minestom.server.MinecraftServer;
import net.minestom.server.Viewable;
import net.minestom.server.adventure.MinestomAdventure;
import net.minestom.server.adventure.audience.PacketGroupingAudience;
import net.minestom.server.entity.Entity;
import net.minestom.server.entity.Player;
import net.minestom.server.listener.manager.PacketListenerManager;
import net.minestom.server.network.packet.server.ComponentHoldingServerPacket;
import net.minestom.server.network.packet.server.CachedPacket;
import net.minestom.server.network.packet.server.FramedPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.player.PlayerConnection;
import net.minestom.server.network.player.PlayerSocketConnection;
@ -24,7 +24,6 @@ import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.binary.BinaryWriter;
import net.minestom.server.utils.binary.PooledBuffers;
import net.minestom.server.utils.cache.LocalCache;
import net.minestom.server.utils.callback.validator.PlayerValidator;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -33,6 +32,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.zip.Deflater;
/**
@ -105,45 +105,26 @@ public final class PacketUtils {
* <p>
* Can drastically improve performance since the packet will not have to be processed as much.
*
* @param players the players to send the packet to
* @param packet the packet to send to the players
* @param playerValidator optional callback to check if a specify player of {@code players} should receive the packet
* @param players the players to send the packet to
* @param packet the packet to send to the players
* @param predicate predicate to ignore specific players
*/
public static void sendGroupedPacket(@NotNull Collection<Player> players, @NotNull ServerPacket packet,
@NotNull PlayerValidator playerValidator) {
if (players.isEmpty())
return;
@NotNull Predicate<Player> predicate) {
if (players.isEmpty()) return;
if (!PACKET_LISTENER_MANAGER.processServerPacket(packet, players)) return;
// work out if the packet needs to be sent individually due to server-side translating
boolean needsTranslating = false;
if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && packet instanceof ComponentHoldingServerPacket) {
needsTranslating = ComponentUtils.areAnyTranslatable(((ComponentHoldingServerPacket) packet).components());
}
if (GROUPED_PACKET && !needsTranslating) {
// Send grouped packet...
if (!PACKET_LISTENER_MANAGER.processServerPacket(packet, players))
return;
final FramedPacket framedPacket = new FramedPacket(packet, createFramedPacket(packet));
// Send packet to all players
players.forEach(player -> {
if (!player.isOnline() || !playerValidator.isValid(player))
return;
player.sendPacket(framedPacket);
});
} else {
// Write the same packet for each individual players
players.forEach(player -> {
if (!player.isOnline() || !playerValidator.isValid(player))
return;
player.getPlayerConnection().sendPacket(packet, false);
});
}
final SendablePacket sendablePacket = GROUPED_PACKET ? new CachedPacket(packet) : packet;
players.forEach(player -> {
if (predicate.test(player)) player.sendPacket(sendablePacket);
});
}
/**
* Same as {@link #sendGroupedPacket(Collection, ServerPacket, PlayerValidator)}
* Same as {@link #sendGroupedPacket(Collection, ServerPacket, Predicate)}
* but with the player validator sets to null.
*
* @see #sendGroupedPacket(Collection, ServerPacket, PlayerValidator)
* @see #sendGroupedPacket(Collection, ServerPacket, Predicate)
*/
public static void sendGroupedPacket(@NotNull Collection<Player> players, @NotNull ServerPacket packet) {
sendGroupedPacket(players, packet, player -> true);
@ -274,13 +255,15 @@ public final class PacketUtils {
private void process(Viewable viewable) {
if (buffer.writerOffset() == 0) return;
viewable.getViewers().forEach(this::processPlayer);
ByteBuffer copy = ByteBuffer.allocateDirect(buffer.writerOffset());
copy.put(buffer.asByteBuffer(0, copy.capacity()));
viewable.getViewers().forEach(player -> processPlayer(player, copy));
this.buffer.clear();
this.entityIdMap.clear();
}
private void processPlayer(Player player) {
final int size = buffer.writerOffset();
private void processPlayer(Player player, ByteBuffer buffer) {
final int size = buffer.limit();
final PlayerConnection connection = player.getPlayerConnection();
final LongArrayList pairs = entityIdMap.get(player.getEntityId());
if (pairs != null) {
@ -290,20 +273,16 @@ public final class PacketUtils {
for (int i = 0; i < pairs.size(); ++i) {
final long offsets = elements[i];
final int start = (int) (offsets >> 32);
if (start != lastWrite) writeTo(connection, lastWrite, start - lastWrite);
if (start != lastWrite) writeTo(connection, buffer, lastWrite, start - lastWrite);
lastWrite = (int) offsets; // End = last 32 bits
}
if (size != lastWrite) writeTo(connection, lastWrite, size - lastWrite);
if (size != lastWrite) writeTo(connection, buffer, lastWrite, size - lastWrite);
} else {
// Write all
writeTo(connection, 0, size);
writeTo(connection, buffer, 0, size);
}
}
private void writeTo(PlayerConnection connection, int offset, int length) {
writeTo(connection, buffer.asByteBuffer(), offset, length);
}
private static void writeTo(PlayerConnection connection, ByteBuffer buffer, int offset, int length) {
if (connection instanceof PlayerSocketConnection socketConnection) {
socketConnection.write(buffer, offset, length);