From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Aikar Date: Wed, 6 May 2020 04:53:35 -0400 Subject: [PATCH] Optimize Network Manager and add advanced packet support Adds ability for 1 packet to bundle other packets to follow it Adds ability for a packet to delay sending more packets until a state is ready. Removes synchronization from sending packets Removes processing packet queue off of main thread - for the few cases where it is allowed, order is not necessary nor should it even be happening concurrently in first place (handshaking/login/status) Ensures packets sent asynchronously are dispatched on main thread This helps ensure safety for ProtocolLib as packet listeners are commonly accessing world state. This will allow you to schedule a packet to be sent async, but itll be dispatched sync for packet listeners to process. This should solve some deadlock risks Also adds Netty Channel Flush Consolidation to reduce the amount of flushing Also avoids spamming closed channel exception by rechecking closed state in dispatch and then catch exceptions and close if they fire. Part of this commit was authored by: Spottedleaf, sandtechnology diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java index 25881c890c643ce90bdcda6b094d912bafb0ed75..1931db6936773657bd43b9b16de950cb3e7a2303 100644 --- a/src/main/java/net/minecraft/network/Connection.java +++ b/src/main/java/net/minecraft/network/Connection.java @@ -84,7 +84,7 @@ public class Connection extends SimpleChannelInboundHandler> { return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build()); }); private final PacketFlow receiving; - private final Queue> pendingActions = Queues.newConcurrentLinkedQueue(); + private final Queue pendingActions = Queues.newConcurrentLinkedQueue(); public Channel channel; public SocketAddress address; // Spigot Start @@ -116,6 +116,10 @@ public class Connection extends SimpleChannelInboundHandler> { public java.net.InetSocketAddress virtualHost; private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); // Paper end + // Paper start - Optimize network + public boolean isPending = true; + public boolean queueImmunity; + // Paper end - Optimize network // Paper start - add utility methods public final net.minecraft.server.level.ServerPlayer getPlayer() { @@ -307,15 +311,39 @@ public class Connection extends SimpleChannelInboundHandler> { } public void send(Packet packet, @Nullable PacketSendListener callbacks, boolean flush) { - if (this.isConnected()) { - this.flushQueue(); + // Paper start - Optimize network: Handle oversized packets better + final boolean connected = this.isConnected(); + if (!connected && !this.preparing) { + return; + } + + packet.onPacketDispatch(this.getPlayer()); + if (connected && (InnerUtil.canSendImmediate(this, packet) + || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty() + && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) { this.sendPacket(packet, callbacks, flush); } else { - this.pendingActions.add((networkmanager) -> { - networkmanager.sendPacket(packet, callbacks, flush); - }); - } + // Write the packets to the queue, then flush - antixray hooks there already + final java.util.List> extraPackets = InnerUtil.buildExtraPackets(packet); + final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); + if (!hasExtraPackets) { + this.pendingActions.add(new PacketSendAction(packet, callbacks, flush)); + } else { + final java.util.List actions = new java.util.ArrayList<>(1 + extraPackets.size()); + actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets + for (int i = 0, len = extraPackets.size(); i < len;) { + final Packet extraPacket = extraPackets.get(i); + final boolean end = ++i == len; + actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end + } + + this.pendingActions.addAll(actions); + } + + this.flushQueue(); + // Paper end - Optimize network + } } public void runOnceConnected(Consumer task) { @@ -323,7 +351,7 @@ public class Connection extends SimpleChannelInboundHandler> { this.flushQueue(); task.accept(this); } else { - this.pendingActions.add(task); + this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network } } @@ -341,6 +369,14 @@ public class Connection extends SimpleChannelInboundHandler> { } private void doSendPacket(Packet packet, @Nullable PacketSendListener callbacks, boolean flush) { + // Paper start - Optimize network + final net.minecraft.server.level.ServerPlayer player = this.getPlayer(); + if (!this.isConnected()) { + packet.onPacketDispatchFinish(player, null); + return; + } + try { + // Paper end - Optimize network ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet); if (callbacks != null) { @@ -360,14 +396,24 @@ public class Connection extends SimpleChannelInboundHandler> { }); } + // Paper start - Optimize network + if (packet.hasFinishListener()) { + channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); + } channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + } catch (final Exception e) { + LOGGER.error("NetworkException: {}", player, e); + this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage())); + packet.onPacketDispatchFinish(player, null); + } + // Paper end - Optimize network } public void flushChannel() { if (this.isConnected()) { this.flush(); } else { - this.pendingActions.add(Connection::flush); + this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network } } @@ -400,20 +446,57 @@ public class Connection extends SimpleChannelInboundHandler> { return attributekey; } - private void flushQueue() { - if (this.channel != null && this.channel.isOpen()) { - Queue queue = this.pendingActions; - + // Paper start - Optimize network: Rewrite this to be safer if ran off main thread + private boolean flushQueue() { + if (!this.isConnected()) { + return true; + } + if (io.papermc.paper.util.MCUtil.isMainThread()) { + return this.processQueue(); + } else if (this.isPending) { + // Should only happen during login/status stages synchronized (this.pendingActions) { - Consumer consumer; + return this.processQueue(); + } + } + return false; + } + + private boolean processQueue() { + if (this.pendingActions.isEmpty()) { + return true; + } - while ((consumer = (Consumer) this.pendingActions.poll()) != null) { - consumer.accept(this); + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + final java.util.Iterator iterator = this.pendingActions.iterator(); + while (iterator.hasNext()) { + final WrappedConsumer queued = iterator.next(); // poll -> peek + + // Fix NPE (Spigot bug caused by handleDisconnection()) + if (queued == null) { + return true; + } + + if (queued.isConsumed()) { + continue; + } + + if (queued instanceof PacketSendAction packetSendAction) { + final Packet packet = packetSendAction.packet; + if (!packet.isReady()) { + return false; } + } + iterator.remove(); + if (queued.tryMarkConsumed()) { + queued.accept(this); } } + return true; } + // Paper end - Optimize network public void tick() { this.flushQueue(); @@ -461,6 +544,7 @@ public class Connection extends SimpleChannelInboundHandler> { public void disconnect(Component disconnectReason) { // Spigot Start this.preparing = false; + this.clearPacketQueue(); // Paper - Optimize network // Spigot End if (this.channel == null) { this.delayedDisconnect = disconnectReason; @@ -630,7 +714,7 @@ public class Connection extends SimpleChannelInboundHandler> { public void handleDisconnection() { if (this.channel != null && !this.channel.isOpen()) { if (this.disconnectionHandled) { - Connection.LOGGER.warn("handleDisconnection() called twice"); + // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message } else { this.disconnectionHandled = true; PacketListener packetlistener = this.getPacketListener(); @@ -643,7 +727,7 @@ public class Connection extends SimpleChannelInboundHandler> { packetlistener1.onDisconnect(ichatbasecomponent); } - this.pendingActions.clear(); // Free up packet queue. + this.clearPacketQueue(); // Paper - Optimize network // Paper start - Add PlayerConnectionCloseEvent final PacketListener packetListener = this.getPacketListener(); if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) { @@ -680,4 +764,89 @@ public class Connection extends SimpleChannelInboundHandler> { public void setBandwidthLogger(SampleLogger log) { this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log); } + + // Paper start - Optimize network + public void clearPacketQueue() { + final net.minecraft.server.level.ServerPlayer player = getPlayer(); + for (final Consumer queuedAction : this.pendingActions) { + if (queuedAction instanceof PacketSendAction packetSendAction) { + final Packet packet = packetSendAction.packet; + if (packet.hasFinishListener()) { + packet.onPacketDispatchFinish(player, null); + } + } + } + this.pendingActions.clear(); + } + + private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up. + + @Nullable + private static java.util.List> buildExtraPackets(final Packet packet) { + final java.util.List> extra = packet.getExtraPackets(); + if (extra == null || extra.isEmpty()) { + return null; + } + + final java.util.List> ret = new java.util.ArrayList<>(1 + extra.size()); + buildExtraPackets0(extra, ret); + return ret; + } + + private static void buildExtraPackets0(final java.util.List> extraPackets, final java.util.List> into) { + for (final Packet extra : extraPackets) { + into.add(extra); + final java.util.List> extraExtra = extra.getExtraPackets(); + if (extraExtra != null && !extraExtra.isEmpty()) { + buildExtraPackets0(extraExtra, into); + } + } + } + + private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet packet) { + return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY || + packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket || + packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket; + } + } + + private static class WrappedConsumer implements Consumer { + private final Consumer delegate; + private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false); + + private WrappedConsumer(final Consumer delegate) { + this.delegate = delegate; + } + + @Override + public void accept(final Connection connection) { + this.delegate.accept(connection); + } + + public boolean tryMarkConsumed() { + return consumed.compareAndSet(false, true); + } + + public boolean isConsumed() { + return consumed.get(); + } + } + + private static final class PacketSendAction extends WrappedConsumer { + private final Packet packet; + + private PacketSendAction(final Packet packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) { + super(connection -> connection.sendPacket(packet, packetSendListener, flush)); + this.packet = packet; + } + } + // Paper end - Optimize network } diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java index cc658a61065d5c0021a4b88fa58b40211b94f8ec..da11266a0a23f446196e6facf2c358cfcc18070f 100644 --- a/src/main/java/net/minecraft/network/protocol/Packet.java +++ b/src/main/java/net/minecraft/network/protocol/Packet.java @@ -11,6 +11,30 @@ public interface Packet { void handle(T listener); // Paper start + /** + * @param player Null if not at PLAY stage yet + */ + default void onPacketDispatch(@Nullable net.minecraft.server.level.ServerPlayer player) { + } + + /** + * @param player Null if not at PLAY stage yet + * @param future Can be null if packet was cancelled + */ + default void onPacketDispatchFinish(@Nullable net.minecraft.server.level.ServerPlayer player, @Nullable io.netty.channel.ChannelFuture future) {} + + default boolean hasFinishListener() { + return false; + } + + default boolean isReady() { + return true; + } + + @Nullable + default java.util.List> getExtraPackets() { + return null; + } default boolean packetTooLarge(net.minecraft.network.Connection manager) { return false; } diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java index d870fbec236a3660f12e0f45cf9431067b18468b..caeead6c6082855f1651ee28263cc9f60423ca0c 100644 --- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java +++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java @@ -63,10 +63,12 @@ public class ServerConnectionListener { final List connections = Collections.synchronizedList(Lists.newArrayList()); // Paper start - prevent blocking on adding a new network manager while the server is ticking private final java.util.Queue pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); + private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network private final void addPending() { Connection manager = null; while ((manager = pending.poll()) != null) { connections.add(manager); + manager.isPending = false; // Paper - Optimize network } } // Paper end @@ -103,6 +105,7 @@ public class ServerConnectionListener { ; } + if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer())); Connection.configureSerialization(channelpipeline, PacketFlow.SERVERBOUND, (BandwidthDebugMonitor) null);