From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Aikar Date: Wed, 6 May 2020 04:53:35 -0400 Subject: [PATCH] Optimize Network Manager and add advanced packet support Adds ability for 1 packet to bundle other packets to follow it Adds ability for a packet to delay sending more packets until a state is ready. Adds ability to clean up a packet when it is finished (not sent, or finished encoding), such as freeing buffers Removes synchronization from sending packets Removes processing packet queue off of main thread - for the few cases where it is allowed, order is not necessary nor should it even be happening concurrently in the first place (handshaking/login/status) Ensures packets sent asynchronously are dispatched on main thread This helps ensure safety for ProtocolLib as packet listeners are commonly accessing world state. This will allow you to schedule a packet to be sent async, but it'll be dispatched sync for packet listeners to process. This should solve some deadlock risks Part of this commit was authored by: Spottedleaf diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -0,0 +0,0 @@ public class NetworkManager extends SimpleChannelInboundHandler> { public int protocolVersion; public java.net.InetSocketAddress virtualHost; private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); + // Optimize network + boolean isPending = true; + boolean queueImmunity = false; + EnumProtocol protocol; // Paper end public NetworkManager(EnumProtocolDirection enumprotocoldirection) { @@ -0,0 +0,0 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } public void setProtocol(EnumProtocol enumprotocol) { + protocol = 
enumprotocol; // Paper this.channel.attr(NetworkManager.c).set(enumprotocol); this.channel.config().setAutoRead(true); NetworkManager.LOGGER.debug("Enabled auto read"); @@ -0,0 +0,0 @@ public class NetworkManager extends SimpleChannelInboundHandler> { NetworkManager.LOGGER.debug("Set listener of {} to {}", this, packetlistener); this.packetListener = packetlistener; } + // Paper start + private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up. + private static java.util.List buildExtraPackets(Packet packet) { + java.util.List extra = packet.getExtraPackets(); + if (extra == null || extra.isEmpty()) { + return null; + } + java.util.List ret = new java.util.ArrayList<>(1 + extra.size()); + buildExtraPackets0(extra, ret); + return ret; + } + + private static void buildExtraPackets0(java.util.List extraPackets, java.util.List into) { + for (Packet extra : extraPackets) { + into.add(extra); + java.util.List extraExtra = extra.getExtraPackets(); + if (extraExtra != null && !extraExtra.isEmpty()) { + buildExtraPackets0(extraExtra, into); + } + } + } + // Paper start + private static boolean canSendImmediate(NetworkManager networkManager, Packet packet) { + return networkManager.isPending || networkManager.protocol == EnumProtocol.HANDSHAKING || networkManager.protocol == EnumProtocol.STATUS || networkManager.queueImmunity || + packet instanceof PacketPlayOutKeepAlive || + packet instanceof PacketPlayOutChat || + packet instanceof PacketPlayOutTabComplete; + } + // Paper end + } + // Paper end public void sendPacket(Packet packet) { this.sendPacket(packet, (GenericFutureListener) null); } public void sendPacket(Packet packet, @Nullable GenericFutureListener> genericfuturelistener) { - if (this.isConnected()) { - this.o(); - this.b(packet, genericfuturelistener); - } else { - this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + // Paper start - handle oversized packets better 
+ boolean connected = this.isConnected(); + if (!connected && !preparing) { + packet.onPacketDone(); + return; // Do nothing + } + if (connected && (InnerUtil.canSendImmediate(this, packet) || ( + MCUtil.isMainThread() && packet.isReady() && this.packetQueue.isEmpty() && + (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty()) + ))) { + this.dispatchPacket(packet, genericfuturelistener); + return; } + // write the packets to the queue, then flush - antixray hooks there already + java.util.List extraPackets = InnerUtil.buildExtraPackets(packet); + boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty(); + if (!hasExtraPackets) { + this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener)); + } else { + java.util.List packets = new java.util.ArrayList<>(1 + extraPackets.size()); + packets.add(new NetworkManager.QueuedPacket(packet, null)); // delay the future listener until the end of the extra packets + + for (int i = 0, len = extraPackets.size(); i < len;) { + Packet extra = extraPackets.get(i); + boolean end = ++i == len; + packets.add(new NetworkManager.QueuedPacket(extra, end ? 
genericfuturelistener : null)); // append listener to the end + } + this.packetQueue.addAll(packets); // atomic + } + this.sendPacketQueue(); + // Paper end } private void dispatchPacket(Packet packet, @Nullable GenericFutureListener> genericFutureListener) { this.b(packet, genericFutureListener); } // Paper - OBFHELPER @@ -0,0 +0,0 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } - private void sendPacketQueue() { this.o(); } // Paper - OBFHELPER - private void o() { - if (this.channel != null && this.channel.isOpen()) { - Queue queue = this.packetQueue; - + // Paper start - rewrite this to be safer on + private boolean sendPacketQueue() { return this.o(); } // OBFHELPER // void -> boolean + private boolean o() { // void -> boolean + if (!isConnected()) { + return true; + } + if (MCUtil.isMainThread()) { + return processQueue(); + } else if (isPending) { + // Should only happen during login/status stages synchronized (this.packetQueue) { - NetworkManager.QueuedPacket networkmanager_queuedpacket; - - while ((networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.packetQueue.poll()) != null) { - this.b(networkmanager_queuedpacket.a, networkmanager_queuedpacket.b); - } + return this.processQueue(); + } + } + return false; + } + private boolean processQueue() { + if (this.packetQueue.isEmpty()) return true; + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue + java.util.Iterator iterator = this.packetQueue.iterator(); + while (iterator.hasNext()) { + NetworkManager.QueuedPacket queued = iterator.next(); // poll -> peek + + // Fix NPE (Spigot bug caused by handleDisconnection()) + if (queued == null) { + return true; + } + Packet packet = queued.getPacket(); + if (!packet.isReady()) { + return false; + } else { + iterator.remove(); + this.dispatchPacket(packet, 
queued.getGenericFutureListener()); } } + return true; } + // Paper end public void a() { this.o(); @@ -0,0 +0,0 @@ public class NetworkManager extends SimpleChannelInboundHandler> { return this.socketAddress; } + public void clearPacketQueue() { QueuedPacket packet; while ((packet = packetQueue.poll()) != null) packet.getPacket().onPacketDone(); } // Paper public void close(IChatBaseComponent ichatbasecomponent) { // Spigot Start this.preparing = false; + clearPacketQueue(); // Paper // Spigot End if (this.channel.isOpen()) { this.channel.close(); // We can't wait as this may be called from an event loop. @@ -0,0 +0,0 @@ public class NetworkManager extends SimpleChannelInboundHandler> { } else if (this.i() != null) { this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0])); } - this.packetQueue.clear(); // Free up packet queue. + clearPacketQueue(); // Paper // Paper start - Add PlayerConnectionCloseEvent final PacketListener packetListener = this.i(); if (packetListener instanceof PlayerConnection) { diff --git a/src/main/java/net/minecraft/server/Packet.java b/src/main/java/net/minecraft/server/Packet.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 --- a/src/main/java/net/minecraft/server/Packet.java +++ b/src/main/java/net/minecraft/server/Packet.java @@ -0,0 +0,0 @@ public interface Packet { void a(T t0); // Paper start + default void onPacketDone() {} + default boolean isReady() { return true; } + default java.util.List getExtraPackets() { return null; } default boolean packetTooLarge(NetworkManager manager) { return false; } diff --git a/src/main/java/net/minecraft/server/PacketEncoder.java b/src/main/java/net/minecraft/server/PacketEncoder.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 --- a/src/main/java/net/minecraft/server/PacketEncoder.java +++ b/src/main/java/net/minecraft/server/PacketEncoder.java @@ -0,0 +0,0 @@ public class 
PacketEncoder extends MessageToByteEncoder> { } else { throw throwable; } - } + } finally { try { packet.onPacketDone(); } catch (Exception e) { e.printStackTrace(); } ; } // Paper // Paper start int packetLength = bytebuf.readableBytes(); diff --git a/src/main/java/net/minecraft/server/PlayerList.java b/src/main/java/net/minecraft/server/PlayerList.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 --- a/src/main/java/net/minecraft/server/PlayerList.java +++ b/src/main/java/net/minecraft/server/PlayerList.java @@ -0,0 +0,0 @@ public abstract class PlayerList { // CraftBukkit - getType() // Spigot - view distance + networkmanager.queueImmunity = true; // Paper playerconnection.sendPacket(new PacketPlayOutLogin(entityplayer.getId(), entityplayer.playerInteractManager.getGameMode(), WorldData.c(worlddata.getSeed()), worlddata.isHardcore(), worldserver.worldProvider.getDimensionManager().getType(), this.getMaxPlayers(), worlddata.getType(), worldserver.spigotConfig.viewDistance, flag1, !flag)); entityplayer.getBukkitEntity().sendSupportedChannels(); // CraftBukkit playerconnection.sendPacket(new PacketPlayOutCustomPayload(PacketPlayOutCustomPayload.a, (new PacketDataSerializer(Unpooled.buffer())).a(this.getServer().getServerModName()))); @@ -0,0 +0,0 @@ public abstract class PlayerList { playerconnection.sendPacket(new PacketPlayOutRecipeUpdate(this.server.getCraftingManager().b())); playerconnection.sendPacket(new PacketPlayOutTags(this.server.getTagRegistry())); playerconnection.sendPacket(new PacketPlayOutEntityStatus(entityplayer, (byte) (worldserver.getGameRules().getBoolean(GameRules.REDUCED_DEBUG_INFO) ? 
22 : 23))); // Paper - fix this rule not being initialized on the client + networkmanager.queueImmunity = false; // Paper this.d(entityplayer); entityplayer.getStatisticManager().c(); entityplayer.B().a(entityplayer); diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 --- a/src/main/java/net/minecraft/server/ServerConnection.java +++ b/src/main/java/net/minecraft/server/ServerConnection.java @@ -0,0 +0,0 @@ public class ServerConnection { NetworkManager manager = null; while ((manager = pending.poll()) != null) { connectedChannels.add(manager); + manager.isPending = false; } } // Paper end