Paper/patches/server/0985-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
Jake Potrebic bd38e0318a
Updated Upstream (Bukkit/CraftBukkit) (#10379)
Updated Upstream (Bukkit/CraftBukkit)

Upstream has released updates that appear to apply and compile correctly.
This update has not been tested by PaperMC and as with ANY update, please do your own testing

Bukkit Changes:
f02baa38 PR-988: Add World#getIntersectingChunks(BoundingBox)
9321d665 Move getItemInUse up to LivingEntity
819eef73 PR-959: Add access to current item's remaining ticks
c4fdadb0 SPIGOT-7601: Add AbstractArrow#getItem
be8261ca Add support for Java 22
26119676 PR-979: Add more translation keys
66753362 PR-985: Correct book maximum pages and characters per page documentation
c8be92fa PR-980: Improve getArmorContents() documentation
f1120ee2 PR-983: Expose riptide velocity to PlayerRiptideEvent

CraftBukkit Changes:
dfaa89bbe PR-1369: Add World#getIntersectingChunks(BoundingBox)
51bbab2b9 Move getItemInUse up to LivingEntity
668e09602 PR-1331: Add access to current item's remaining ticks
a639406d1 SPIGOT-7601: Add AbstractArrow#getItem
0398930fc SPIGOT-7602: Allow opening in-world horse and related inventories
ffd15611c SPIGOT-7608: Allow empty lists to morph to any PDT list
2188dcfa9 Add support for Java 22
45d6a609f SPIGOT-7604: Revert "SPIGOT-7365: DamageCause blocked by shield should trigger invulnerableTime"
06d915943 SPIGOT-7365: DamageCause blocked by shield should trigger invulnerableTime
ca3bc3707 PR-1361: Add more translation keys
366c3ca80 SPIGOT-7600: EntityChangeBlockEvent is not fired for frog eggs
06d0f9ba8 SPIGOT-7593: Fix sapling growth physics / client-side updates
45c2608e4 PR-1366: Expose riptide velocity to PlayerRiptideEvent
29b6bb79b SPIGOT-7587: Remove fixes for now-resolved MC-142590 and MC-109346
2024-04-06 12:53:39 -07:00

395 lines
19 KiB
Diff

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Aikar <aikar@aikar.co>
Date: Wed, 6 May 2020 04:53:35 -0400
Subject: [PATCH] Optimize Network Manager and add advanced packet support
Adds ability for 1 packet to bundle other packets to follow it
Adds ability for a packet to delay sending more packets until a state is ready.
Removes synchronization from sending packets
Removes processing packet queue off of main thread
- for the few cases where it is allowed, order is not necessary nor
should it even be happening concurrently in first place (handshaking/login/status)
Ensures packets sent asynchronously are dispatched on main thread
This helps ensure safety for ProtocolLib as packet listeners
are commonly accessing world state. This will allow you to schedule
a packet to be sent async, but itll be dispatched sync for packet
listeners to process.
This should solve some deadlock risks
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
Also avoids spamming closed channel exception by rechecking closed state in dispatch
and then catch exceptions and close if they fire.
Part of this commit was authored by: Spottedleaf, sandtechnology
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index 22a7f17180b76b6c3548d3b54ae8218a469401a8..7f2aa5e17fe675f3404d67b1794d2ca68b188eb9 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -84,7 +84,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).setUncaughtExceptionHandler(new net.minecraft.DefaultUncaughtExceptionHandlerWithName(LOGGER)).build()); // Paper
});
private final PacketFlow receiving;
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue();
public Channel channel;
public SocketAddress address;
// Spigot Start
@@ -116,6 +116,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
public java.net.InetSocketAddress virtualHost;
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); // Paper - Disable explicit network manager flushing
// Paper end
+ // Paper start - Optimize network
+ public boolean isPending = true;
+ public boolean queueImmunity;
+ // Paper end - Optimize network
// Paper start - add utility methods
public final net.minecraft.server.level.ServerPlayer getPlayer() {
@@ -375,15 +379,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
}
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
- if (this.isConnected()) {
- this.flushQueue();
+ // Paper start - Optimize network: Handle oversized packets better
+ final boolean connected = this.isConnected();
+ if (!connected && !this.preparing) {
+ return;
+ }
+
+ packet.onPacketDispatch(this.getPlayer());
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
this.sendPacket(packet, callbacks, flush);
} else {
- this.pendingActions.add((networkmanager) -> {
- networkmanager.sendPacket(packet, callbacks, flush);
- });
- }
+ // Write the packets to the queue, then flush - antixray hooks there already
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
+ if (!hasExtraPackets) {
+ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush));
+ } else {
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
+ for (int i = 0, len = extraPackets.size(); i < len;) {
+ final Packet<?> extraPacket = extraPackets.get(i);
+ final boolean end = ++i == len;
+ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end
+ }
+
+ this.pendingActions.addAll(actions);
+ }
+
+ this.flushQueue();
+ // Paper end - Optimize network
+ }
}
public void runOnceConnected(Consumer<Connection> task) {
@@ -391,7 +419,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
this.flushQueue();
task.accept(this);
} else {
- this.pendingActions.add(task);
+ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network
}
}
@@ -409,6 +437,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
}
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
+ // Paper start - Optimize network
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
+ if (!this.isConnected()) {
+ packet.onPacketDispatchFinish(player, null);
+ return;
+ }
+ try {
+ // Paper end - Optimize network
ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
if (callbacks != null) {
@@ -428,14 +464,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
});
}
+ // Paper start - Optimize network
+ if (packet.hasFinishListener()) {
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
+ }
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+ } catch (final Exception e) {
+ LOGGER.error("NetworkException: {}", player, e);
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
+ packet.onPacketDispatchFinish(player, null);
+ }
+ // Paper end - Optimize network
}
public void flushChannel() {
if (this.isConnected()) {
this.flush();
} else {
- this.pendingActions.add(Connection::flush);
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
}
}
@@ -468,20 +514,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
return attributekey;
}
- private void flushQueue() {
- if (this.channel != null && this.channel.isOpen()) {
- Queue queue = this.pendingActions;
-
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
+ private boolean flushQueue() {
+ if (!this.isConnected()) {
+ return true;
+ }
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
+ return this.processQueue();
+ } else if (this.isPending) {
+ // Should only happen during login/status stages
synchronized (this.pendingActions) {
- Consumer consumer;
+ return this.processQueue();
+ }
+ }
+ return false;
+ }
+
+ private boolean processQueue() {
+ if (this.pendingActions.isEmpty()) {
+ return true;
+ }
- while ((consumer = (Consumer) this.pendingActions.poll()) != null) {
- consumer.accept(this);
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
+ while (iterator.hasNext()) {
+ final WrappedConsumer queued = iterator.next(); // poll -> peek
+
+ // Fix NPE (Spigot bug caused by handleDisconnection())
+ if (queued == null) {
+ return true;
+ }
+
+ if (queued.isConsumed()) {
+ continue;
+ }
+
+ if (queued instanceof PacketSendAction packetSendAction) {
+ final Packet<?> packet = packetSendAction.packet;
+ if (!packet.isReady()) {
+ return false;
}
+ }
+ iterator.remove();
+ if (queued.tryMarkConsumed()) {
+ queued.accept(this);
}
}
+ return true;
}
+ // Paper end - Optimize network
private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
private static int joinAttemptsThisTick; // Paper - Buffer joins to world
@@ -544,6 +627,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
public void disconnect(Component disconnectReason) {
// Spigot Start
this.preparing = false;
+ this.clearPacketQueue(); // Paper - Optimize network
// Spigot End
if (this.channel == null) {
this.delayedDisconnect = disconnectReason;
@@ -715,7 +799,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
public void handleDisconnection() {
if (this.channel != null && !this.channel.isOpen()) {
if (this.disconnectionHandled) {
- Connection.LOGGER.warn("handleDisconnection() called twice");
+ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
} else {
this.disconnectionHandled = true;
PacketListener packetlistener = this.getPacketListener();
@@ -728,7 +812,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
packetlistener1.onDisconnect(ichatbasecomponent);
}
- this.pendingActions.clear(); // Free up packet queue.
+ this.clearPacketQueue(); // Paper - Optimize network
// Paper start - Add PlayerConnectionCloseEvent
final PacketListener packetListener = this.getPacketListener();
if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
@@ -765,4 +849,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
public void setBandwidthLogger(SampleLogger log) {
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log);
}
+
+ // Paper start - Optimize network
+ public void clearPacketQueue() {
+ final net.minecraft.server.level.ServerPlayer player = getPlayer();
+ for (final Consumer<Connection> queuedAction : this.pendingActions) {
+ if (queuedAction instanceof PacketSendAction packetSendAction) {
+ final Packet<?> packet = packetSendAction.packet;
+ if (packet.hasFinishListener()) {
+ packet.onPacketDispatchFinish(player, null);
+ }
+ }
+ }
+ this.pendingActions.clear();
+ }
+
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
+
+ @Nullable
+ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
+ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
+ if (extra == null || extra.isEmpty()) {
+ return null;
+ }
+
+ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
+ buildExtraPackets0(extra, ret);
+ return ret;
+ }
+
+ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
+ for (final Packet<?> extra : extraPackets) {
+ into.add(extra);
+ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
+ if (extraExtra != null && !extraExtra.isEmpty()) {
+ buildExtraPackets0(extraExtra, into);
+ }
+ }
+ }
+
+ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
+ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
+ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket ||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
+ }
+ }
+
+ private static class WrappedConsumer implements Consumer<Connection> {
+ private final Consumer<Connection> delegate;
+ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
+
+ private WrappedConsumer(final Consumer<Connection> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void accept(final Connection connection) {
+ this.delegate.accept(connection);
+ }
+
+ public boolean tryMarkConsumed() {
+ return consumed.compareAndSet(false, true);
+ }
+
+ public boolean isConsumed() {
+ return consumed.get();
+ }
+ }
+
+ private static final class PacketSendAction extends WrappedConsumer {
+ private final Packet<?> packet;
+
+ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
+ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
+ this.packet = packet;
+ }
+ }
+ // Paper end - Optimize network
}
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
index cc658a61065d5c0021a4b88fa58b40211b94f8ec..da11266a0a23f446196e6facf2c358cfcc18070f 100644
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
@@ -11,6 +11,30 @@ public interface Packet<T extends PacketListener> {
void handle(T listener);
// Paper start
+ /**
+ * @param player Null if not at PLAY stage yet
+ */
+ default void onPacketDispatch(@Nullable net.minecraft.server.level.ServerPlayer player) {
+ }
+
+ /**
+ * @param player Null if not at PLAY stage yet
+ * @param future Can be null if packet was cancelled
+ */
+ default void onPacketDispatchFinish(@Nullable net.minecraft.server.level.ServerPlayer player, @Nullable io.netty.channel.ChannelFuture future) {}
+
+ default boolean hasFinishListener() {
+ return false;
+ }
+
+ default boolean isReady() {
+ return true;
+ }
+
+ @Nullable
+ default java.util.List<Packet<?>> getExtraPackets() {
+ return null;
+ }
default boolean packetTooLarge(net.minecraft.network.Connection manager) {
return false;
}
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
index 4f330a44c77a7ec3237a86fda04921a8c4a1c00f..a4a29a7ea0035ecf4c61ee8547a9eb24acb667d0 100644
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
@@ -63,10 +63,12 @@ public class ServerConnectionListener {
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
// Paper start - prevent blocking on adding a new connection while the server is ticking
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
private final void addPending() {
Connection connection;
while ((connection = pending.poll()) != null) {
connections.add(connection);
+ connection.isPending = false; // Paper - Optimize network
}
}
// Paper end - prevent blocking on adding a new connection while the server is ticking
@@ -114,6 +116,7 @@ public class ServerConnectionListener {
;
}
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
Connection.configureSerialization(channelpipeline, PacketFlow.SERVERBOUND, (BandwidthDebugMonitor) null);