From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Aikar Date: Wed, 6 May 2020 04:53:35 -0400 Subject: [PATCH] Optimize Network Manager and add advanced packet support Adds ability for 1 packet to bundle other packets to follow it Adds ability for a packet to delay sending more packets until a state is ready. Removes synchronization from sending packets Removes processing packet queue off of main thread - for the few cases where it is allowed, order is not necessary nor should it even be happening concurrently in first place (handshaking/login/status) Ensures packets sent asynchronously are dispatched on main thread This helps ensure safety for ProtocolLib as packet listeners are commonly accessing world state. This will allow you to schedule a packet to be sent async, but itll be dispatched sync for packet listeners to process. This should solve some deadlock risks Also adds Netty Channel Flush Consolidation to reduce the amount of flushing Also avoids spamming closed channel exception by rechecking closed state in dispatch and then catch exceptions and close if they fire. Part of this commit was authored by: Spottedleaf diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java index 3247ec5d6cf329ba0b7e6d5a6c3294dec2e34db4..fc63df21aecd4721efdb45d4744666ed0b562c1b 100644 --- a/src/main/java/net/minecraft/network/Connection.java +++ b/src/main/java/net/minecraft/network/Connection.java @@ -25,8 +25,15 @@ import net.minecraft.network.chat.Component; import net.minecraft.network.chat.TranslatableComponent; import net.minecraft.network.protocol.Packet; import net.minecraft.network.protocol.PacketFlow; +import net.minecraft.network.protocol.game.ClientboundBossEventPacket; +import net.minecraft.network.protocol.game.ClientboundChatPacket; +import net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket; import net.minecraft.network.protocol.game.ClientboundDisconnectPacket; +import net.minecraft.network.protocol.game.ClientboundKeepAlivePacket; +import net.minecraft.network.protocol.game.ClientboundSetTitlesPacket; +import net.minecraft.server.MCUtil; import net.minecraft.server.RunningOnDifferentThreadException; +import net.minecraft.server.level.ServerPlayer; import net.minecraft.server.network.ServerGamePacketListenerImpl; import net.minecraft.server.network.ServerLoginPacketListenerImpl; import net.minecraft.util.LazyLoadedValue; @@ -75,6 +82,10 @@ public class Connection extends SimpleChannelInboundHandler> { public int protocolVersion; public java.net.InetSocketAddress virtualHost; private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); + // Optimize network + public boolean isPending = true; + public boolean queueImmunity = false; + public ConnectionProtocol protocol; // Paper end public Connection(PacketFlow side) { @@ -98,6 +109,7 @@ public class Connection extends SimpleChannelInboundHandler> { } public void setProtocol(ConnectionProtocol state) { + protocol = state; // Paper this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state); this.channel.config().setAutoRead(true); Connection.LOGGER.debug("Enabled auto read"); @@ -168,19 +180,84 @@ public class Connection extends SimpleChannelInboundHandler> { Validate.notNull(listener, "packetListener", new Object[0]); this.packetListener = listener; } + // Paper start + public ServerPlayer getPlayer() { + if (packetListener instanceof ServerGamePacketListenerImpl) { + return ((ServerGamePacketListenerImpl) packetListener).player; + } else { + return null; + } + } + private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up. + private static java.util.List buildExtraPackets(Packet packet) { + java.util.List extra = packet.getExtraPackets(); + if (extra == null || extra.isEmpty()) { + return null; + } + java.util.List ret = new java.util.ArrayList<>(1 + extra.size()); + buildExtraPackets0(extra, ret); + return ret; + } + + private static void buildExtraPackets0(java.util.List extraPackets, java.util.List into) { + for (Packet extra : extraPackets) { + into.add(extra); + java.util.List extraExtra = extra.getExtraPackets(); + if (extraExtra != null && !extraExtra.isEmpty()) { + buildExtraPackets0(extraExtra, into); + } + } + } + // Paper start + private static boolean canSendImmediate(Connection networkManager, Packet packet) { + return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY || + packet instanceof ClientboundKeepAlivePacket || + packet instanceof ClientboundChatPacket || + packet instanceof ClientboundCommandSuggestionsPacket || + packet instanceof ClientboundSetTitlesPacket || + packet instanceof ClientboundBossEventPacket; + } + // Paper end + } + // Paper end public void send(Packet packet) { this.send(packet, (GenericFutureListener) null); } public void send(Packet packet, @Nullable GenericFutureListener> callback) { - if (this.isConnected()) { - this.flushQueue(); - this.sendPacket(packet, callback); - } else { - this.queue.add(new Connection.PacketHolder(packet, callback)); + // Paper start - handle oversized packets better + boolean connected = this.isConnected(); + if (!connected && !preparing) { + return; // Do nothing + } + packet.onPacketDispatch(getPlayer()); + if (connected && (InnerUtil.canSendImmediate(this, packet) || ( + MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() && + (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty()) + ))) { + this.dispatchPacket(packet, callback); + return; } + // write the packets to the queue, then flush - antixray hooks there already + java.util.List extraPackets = InnerUtil.buildExtraPackets(packet); + boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); + if (!hasExtraPackets) { + this.queue.add(new Connection.PacketHolder(packet, callback)); + } else { + java.util.List packets = new java.util.ArrayList<>(1 + extraPackets.size()); + packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets + + for (int i = 0, len = extraPackets.size(); i < len;) { + Packet extra = extraPackets.get(i); + boolean end = ++i == len; + packets.add(new Connection.PacketHolder(extra, end ? callback : null)); // append listener to the end + } + this.queue.addAll(packets); // atomic + } + this.sendPacketQueue(); + // Paper end } private void dispatchPacket(Packet packet, @Nullable GenericFutureListener> genericFutureListener) { this.sendPacket(packet, genericFutureListener); } // Paper - OBFHELPER @@ -194,54 +271,119 @@ public class Connection extends SimpleChannelInboundHandler> { this.channel.config().setAutoRead(false); } + ServerPlayer player = getPlayer(); // Paper if (this.channel.eventLoop().inEventLoop()) { if (enumprotocol != enumprotocol1) { this.setProtocol(enumprotocol); } + // Paper start + if (!isConnected()) { + packet.onPacketDispatchFinish(player, null); + return; + } + try { + // Paper end ChannelFuture channelfuture = this.channel.writeAndFlush(packet); if (callback != null) { channelfuture.addListener(callback); } + // Paper start + if (packet.hasFinishListener()) { + channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); + } + // Paper end channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + // Paper start + } catch (Exception e) { + LOGGER.error("NetworkException: " + player, e); + disconnect(new TranslatableComponent("disconnect.genericReason", "Internal Exception: " + e.getMessage()));; + packet.onPacketDispatchFinish(player, null); + } + // Paper end } else { this.channel.eventLoop().execute(() -> { if (enumprotocol != enumprotocol1) { this.setProtocol(enumprotocol); } + // Paper start + if (!isConnected()) { + packet.onPacketDispatchFinish(player, null); + return; + } + try { + // Paper end ChannelFuture channelfuture1 = this.channel.writeAndFlush(packet); + if (callback != null) { channelfuture1.addListener(callback); } + // Paper start + if (packet.hasFinishListener()) { + channelfuture1.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture)); + } + // Paper end channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + // Paper start + } catch (Exception e) { + LOGGER.error("NetworkException: " + player, e); + disconnect(new TranslatableComponent("disconnect.genericReason", "Internal Exception: " + e.getMessage()));; + packet.onPacketDispatchFinish(player, null); + } + // Paper end }); } } - private void sendPacketQueue() { this.flushQueue(); } // Paper - OBFHELPER - private void flushQueue() { - if (this.channel != null && this.channel.isOpen()) { - Queue queue = this.queue; - + // Paper start - rewrite this to be safer if ran off main thread + private boolean sendPacketQueue() { return this.p(); } // OBFHELPER // void -> boolean + private boolean p() { // void -> boolean + if (!isConnected()) { + return true; + } + if (MCUtil.isMainThread()) { + return processQueue(); + } else if (isPending) { + // Should only happen during login/status stages synchronized (this.queue) { - Connection.PacketHolder networkmanager_queuedpacket; - - while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) { - this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener); - } + return this.processQueue(); + } + } + return false; + } + private boolean processQueue() { + if (this.queue.isEmpty()) return true; + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + java.util.Iterator iterator = this.queue.iterator(); + while (iterator.hasNext()) { + Connection.PacketHolder queued = iterator.next(); // poll -> peek + + // Fix NPE (Spigot bug caused by handleDisconnection()) + if (queued == null) { + return true; + } + Packet packet = queued.getPacket(); + if (!packet.isReady()) { + return false; + } else { + iterator.remove(); + this.dispatchPacket(packet, queued.getGenericFutureListener()); } } + return true; } + // Paper end public void tick() { - this.flushQueue(); + this.p(); if (this.packetListener instanceof ServerLoginPacketListenerImpl) { ((ServerLoginPacketListenerImpl) this.packetListener).tick(); } @@ -271,9 +413,21 @@ public class Connection extends SimpleChannelInboundHandler> { return this.address; } + // Paper start + public void clearPacketQueue() { + ServerPlayer player = getPlayer(); + queue.forEach(queuedPacket -> { + Packet packet = queuedPacket.getPacket(); + if (packet.hasFinishListener()) { + packet.onPacketDispatchFinish(player, null); + } + }); + queue.clear(); + } // Paper end public void disconnect(Component disconnectReason) { // Spigot Start this.preparing = false; + clearPacketQueue(); // Paper // Spigot End if (this.channel.isOpen()) { this.channel.close(); // We can't wait as this may be called from an event loop. @@ -341,7 +495,7 @@ public class Connection extends SimpleChannelInboundHandler> { public void handleDisconnection() { if (this.channel != null && !this.channel.isOpen()) { if (this.disconnectionHandled) { - Connection.LOGGER.warn("handleDisconnection() called twice"); + //NetworkManager.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message } else { this.disconnectionHandled = true; if (this.getDisconnectedReason() != null) { @@ -349,7 +503,7 @@ public class Connection extends SimpleChannelInboundHandler> { } else if (this.getPacketListener() != null) { this.getPacketListener().a(new TranslatableComponent("multiplayer.disconnect.generic")); } - this.queue.clear(); // Free up packet queue. + clearPacketQueue(); // Paper // Paper start - Add PlayerConnectionCloseEvent final PacketListener packetListener = this.getPacketListener(); if (packetListener instanceof ServerGamePacketListenerImpl) { diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java index 9914a82ba0ec146ab13fe94c4dbf0ebf64926536..22db5d0d2cc33498ca40162c66aa3b5fbf2f569f 100644 --- a/src/main/java/net/minecraft/network/protocol/Packet.java +++ b/src/main/java/net/minecraft/network/protocol/Packet.java @@ -1,5 +1,6 @@ package net.minecraft.network.protocol; +import io.netty.channel.ChannelFuture; // Paper import java.io.IOException; import net.minecraft.network.FriendlyByteBuf; import net.minecraft.network.PacketListener; @@ -13,6 +14,20 @@ public interface Packet { void handle(T listener); // Paper start + + /** + * @param player Null if not at PLAY stage yet + */ + default void onPacketDispatch(@javax.annotation.Nullable EntityPlayer player) {} + + /** + * @param player Null if not at PLAY stage yet + * @param future Can be null if packet was cancelled + */ + default void onPacketDispatchFinish(@javax.annotation.Nullable EntityPlayer player, @javax.annotation.Nullable ChannelFuture future) {} + default boolean hasFinishListener() { return false; } + default boolean isReady() { return true; } + default java.util.List getExtraPackets() { return null; } default boolean packetTooLarge(NetworkManager manager) { return false; } diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java index 6cb51a4fe3c11f53fbb556ce6b0d64b735254d51..d46910cfdc0aef046a0c79731a85d381953c328a 100644 --- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java +++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java @@ -16,6 +16,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.flush.FlushConsolidationHandler; // Paper import io.netty.handler.timeout.ReadTimeoutHandler; import java.io.IOException; import java.net.InetAddress; @@ -54,10 +55,12 @@ public class ServerConnectionListener { private final List connections = Collections.synchronizedList(Lists.newArrayList()); // Paper start - prevent blocking on adding a new network manager while the server is ticking private final java.util.Queue pending = new java.util.concurrent.ConcurrentLinkedQueue<>(); + private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper private void addPending() { Connection manager = null; while ((manager = pending.poll()) != null) { connections.add(manager); + manager.isPending = false; } } // Paper end @@ -92,6 +95,7 @@ public class ServerConnectionListener { ; } + if (!disableFlushConsolidation) channel.pipeline().addFirst(new FlushConsolidationHandler()); // Paper channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this)).addLast("splitter", new Varint21FrameDecoder()).addLast("decoder", new PacketDecoder(PacketFlow.SERVERBOUND)).addLast("prepender", new Varint21LengthFieldPrepender()).addLast("encoder", new PacketEncoder(PacketFlow.CLIENTBOUND)); int j = ServerConnectionListener.this.server.getRateLimitPacketsPerSecond(); Object object = j > 0 ? new RateKickingConnection(j) : new Connection(PacketFlow.SERVERBOUND);