Paper/Spigot-Server-Patches/0346-Optimize-Network-Manager-and-add-advanced-packet-sup.patch
Aikar 897dd2c840
Foundational work for Future Memory usage improvements
This commit doesn't do much on its own, but adds a new Java Cleaner API
that lets us hook into Garbage Collector events to reclaim pooled objects and
return them to the pool.

Adds framework for Network Packets to know when a packet has finished dispatching
to get an idea when a packet is done sending to players.

Rewrites PooledObjects impl to properly respect max pool size and remove
almost all risk of contention.

Bumps the Paper Async Task Queue to use 2 threads, and properly shuts it down on shutdown.
2020-05-16 21:38:19 -04:00

305 lines
15 KiB
Diff

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Aikar <aikar@aikar.co>
Date: Wed, 6 May 2020 04:53:35 -0400
Subject: [PATCH] Optimize Network Manager and add advanced packet support
Adds ability for 1 packet to bundle other packets to follow it
Adds ability for a packet to delay sending more packets until a state is ready.
Removes synchronization from sending packets
Removes processing packet queue off of main thread
- for the few cases where it is allowed, order is not necessary nor
should it even be happening concurrently in first place (handshaking/login/status)
Ensures packets sent asynchronously are dispatched on main thread
This helps ensure safety for ProtocolLib as packet listeners
are commonly accessing world state. This will allow you to schedule
a packet to be sent async, but itll be dispatched sync for packet
listeners to process.
This should solve some deadlock risks
Part of this commit was authored by: Spottedleaf
diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java
index b1dededc15cce686ead74a99bee64c89ac1de22c..94c25c542dd18396fa792af944794bdb2436d2fd 100644
--- a/src/main/java/net/minecraft/server/NetworkManager.java
+++ b/src/main/java/net/minecraft/server/NetworkManager.java
@@ -64,6 +64,10 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
public int protocolVersion;
public java.net.InetSocketAddress virtualHost;
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
+ // Optimize network
+ boolean isPending = true;
+ boolean queueImmunity = false;
+ EnumProtocol protocol;
// Paper end
public NetworkManager(EnumProtocolDirection enumprotocoldirection) {
@@ -87,6 +91,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
}
public void setProtocol(EnumProtocol enumprotocol) {
+ protocol = enumprotocol; // Paper
this.channel.attr(NetworkManager.c).set(enumprotocol);
this.channel.config().setAutoRead(true);
NetworkManager.LOGGER.debug("Enabled auto read");
@@ -158,19 +163,82 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
NetworkManager.LOGGER.debug("Set listener of {} to {}", this, packetlistener);
this.packetListener = packetlistener;
}
+ // Paper start
+ private EntityPlayer getPlayer() {
+ if (packetListener instanceof PlayerConnection) {
+ return ((PlayerConnection) packetListener).player;
+ } else {
+ return null;
+ }
+ }
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up.
+ private static java.util.List<Packet> buildExtraPackets(Packet packet) {
+ java.util.List<Packet> extra = packet.getExtraPackets();
+ if (extra == null || extra.isEmpty()) {
+ return null;
+ }
+ java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
+ buildExtraPackets0(extra, ret);
+ return ret;
+ }
+
+ private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) {
+ for (Packet extra : extraPackets) {
+ into.add(extra);
+ java.util.List<Packet> extraExtra = extra.getExtraPackets();
+ if (extraExtra != null && !extraExtra.isEmpty()) {
+ buildExtraPackets0(extraExtra, into);
+ }
+ }
+ }
+ // Paper start
+ private static boolean canSendImmediate(NetworkManager networkManager, Packet<?> packet) {
+ return networkManager.isPending || networkManager.protocol != EnumProtocol.PLAY || networkManager.queueImmunity ||
+ packet instanceof PacketPlayOutKeepAlive ||
+ packet instanceof PacketPlayOutChat ||
+ packet instanceof PacketPlayOutTabComplete;
+ }
+ // Paper end
+ }
+ // Paper end
public void sendPacket(Packet<?> packet) {
this.sendPacket(packet, (GenericFutureListener) null);
}
public void sendPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericfuturelistener) {
- if (this.isConnected()) {
- this.o();
- this.b(packet, genericfuturelistener);
- } else {
- this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener));
+ // Paper start - handle oversized packets better
+ boolean connected = this.isConnected();
+ if (!connected && !preparing) {
+ return; // Do nothing
+ }
+ packet.onPacketDispatch(getPlayer());
+ if (connected && (InnerUtil.canSendImmediate(this, packet) || (
+ MCUtil.isMainThread() && packet.isReady() && this.packetQueue.isEmpty() &&
+ (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
+ ))) {
+ this.dispatchPacket(packet, genericfuturelistener);
+ return;
}
+ // write the packets to the queue, then flush - antixray hooks there already
+ java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet);
+ boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
+ if (!hasExtraPackets) {
+ this.packetQueue.add(new NetworkManager.QueuedPacket(packet, genericfuturelistener));
+ } else {
+ java.util.List<NetworkManager.QueuedPacket> packets = new java.util.ArrayList<>(1 + extraPackets.size());
+ packets.add(new NetworkManager.QueuedPacket(packet, null)); // delay the future listener until the end of the extra packets
+ for (int i = 0, len = extraPackets.size(); i < len;) {
+ Packet extra = extraPackets.get(i);
+ boolean end = ++i == len;
+ packets.add(new NetworkManager.QueuedPacket(extra, end ? genericfuturelistener : null)); // append listener to the end
+ }
+
+ this.packetQueue.addAll(packets); // atomic
+ }
+ this.sendPacketQueue();
+ // Paper end
}
private void dispatchPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericFutureListener) { this.b(packet, genericFutureListener); } // Paper - OBFHELPER
@@ -194,6 +262,11 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
if (genericfuturelistener != null) {
channelfuture.addListener(genericfuturelistener);
}
+ // Paper start
+ if (packet.hasFinishListener()) {
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(getPlayer(), channelFuture));
+ }
+ // Paper end
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
} else {
@@ -207,6 +280,11 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
if (genericfuturelistener != null) {
channelfuture1.addListener(genericfuturelistener);
}
+ // Paper start
+ if (packet.hasFinishListener()) {
+ channelfuture1.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(getPlayer(), channelFuture));
+ }
+ // Paper end
channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
});
@@ -214,21 +292,46 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
}
- private void sendPacketQueue() { this.o(); } // Paper - OBFHELPER
- private void o() {
- if (this.channel != null && this.channel.isOpen()) {
- Queue queue = this.packetQueue;
-
+ // Paper start - rewrite this to be safer on
+ private boolean sendPacketQueue() { return this.o(); } // OBFHELPER // void -> boolean
+ private boolean o() { // void -> boolean
+ if (!isConnected()) {
+ return true;
+ }
+ if (MCUtil.isMainThread()) {
+ return processQueue();
+ } else if (isPending) {
+ // Should only happen during login/status stages
synchronized (this.packetQueue) {
- NetworkManager.QueuedPacket networkmanager_queuedpacket;
-
- while ((networkmanager_queuedpacket = (NetworkManager.QueuedPacket) this.packetQueue.poll()) != null) {
- this.b(networkmanager_queuedpacket.a, networkmanager_queuedpacket.b);
- }
+ return this.processQueue();
+ }
+ }
+ return false;
+ }
+ private boolean processQueue() {
+ if (this.packetQueue.isEmpty()) return true;
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
+ java.util.Iterator<QueuedPacket> iterator = this.packetQueue.iterator();
+ while (iterator.hasNext()) {
+ NetworkManager.QueuedPacket queued = iterator.next(); // poll -> peek
+
+ // Fix NPE (Spigot bug caused by handleDisconnection())
+ if (queued == null) {
+ return true;
+ }
+ Packet<?> packet = queued.getPacket();
+ if (!packet.isReady()) {
+ return false;
+ } else {
+ iterator.remove();
+ this.dispatchPacket(packet, queued.getGenericFutureListener());
}
}
+ return true;
}
+ // Paper end
public void a() {
this.o();
@@ -257,9 +360,21 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
return this.socketAddress;
}
+ // Paper start
+ public void clearPacketQueue() {
+ EntityPlayer player = getPlayer();
+ packetQueue.forEach(queuedPacket -> {
+ Packet<?> packet = queuedPacket.getPacket();
+ if (packet.hasFinishListener()) {
+ packet.onPacketDispatchFinish(player, null);
+ }
+ });
+ packetQueue.clear();
+ } // Paper end
public void close(IChatBaseComponent ichatbasecomponent) {
// Spigot Start
this.preparing = false;
+ clearPacketQueue(); // Paper
// Spigot End
if (this.channel.isOpen()) {
this.channel.close(); // We can't wait as this may be called from an event loop.
@@ -335,7 +450,7 @@ public class NetworkManager extends SimpleChannelInboundHandler<Packet<?>> {
} else if (this.i() != null) {
this.i().a(new ChatMessage("multiplayer.disconnect.generic", new Object[0]));
}
- this.packetQueue.clear(); // Free up packet queue.
+ clearPacketQueue(); // Paper
// Paper start - Add PlayerConnectionCloseEvent
final PacketListener packetListener = this.i();
if (packetListener instanceof PlayerConnection) {
diff --git a/src/main/java/net/minecraft/server/Packet.java b/src/main/java/net/minecraft/server/Packet.java
index 2d8e6a2f4a0c3c5d74a647d7164b0028781d3bf5..ffc9a1f7d58d67611c4ab46462ac13a921042313 100644
--- a/src/main/java/net/minecraft/server/Packet.java
+++ b/src/main/java/net/minecraft/server/Packet.java
@@ -11,6 +11,20 @@ public interface Packet<T extends PacketListener> {
void a(T t0);
// Paper start
+
+ /**
+ * @param player Null if not at PLAY stage yet
+ */
+ default void onPacketDispatch(@javax.annotation.Nullable EntityPlayer player) {}
+
+ /**
+ * @param player Null if not at PLAY stage yet
+ * @param future Can be null if packet was cancelled
+ */
+ default void onPacketDispatchFinish(@javax.annotation.Nullable EntityPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {}
+ default boolean hasFinishListener() { return false; }
+ default boolean isReady() { return true; }
+ default java.util.List<Packet> getExtraPackets() { return null; }
default boolean packetTooLarge(NetworkManager manager) {
return false;
}
diff --git a/src/main/java/net/minecraft/server/PlayerList.java b/src/main/java/net/minecraft/server/PlayerList.java
index 5136905b71085445eb6bac00e9200af8cc7fbe27..14c82861158eed8c91336590fb71c69d185d22f8 100644
--- a/src/main/java/net/minecraft/server/PlayerList.java
+++ b/src/main/java/net/minecraft/server/PlayerList.java
@@ -143,6 +143,7 @@ public abstract class PlayerList {
// CraftBukkit - getType()
// Spigot - view distance
+ networkmanager.queueImmunity = true; // Paper
playerconnection.sendPacket(new PacketPlayOutLogin(entityplayer.getId(), entityplayer.playerInteractManager.getGameMode(), WorldData.c(worlddata.getSeed()), worlddata.isHardcore(), worldserver.worldProvider.getDimensionManager().getType(), this.getMaxPlayers(), worlddata.getType(), worldserver.spigotConfig.viewDistance, flag1, !flag));
entityplayer.getBukkitEntity().sendSupportedChannels(); // CraftBukkit
playerconnection.sendPacket(new PacketPlayOutCustomPayload(PacketPlayOutCustomPayload.a, (new PacketDataSerializer(Unpooled.buffer())).a(this.getServer().getServerModName())));
@@ -152,6 +153,7 @@ public abstract class PlayerList {
playerconnection.sendPacket(new PacketPlayOutRecipeUpdate(this.server.getCraftingManager().b()));
playerconnection.sendPacket(new PacketPlayOutTags(this.server.getTagRegistry()));
playerconnection.sendPacket(new PacketPlayOutEntityStatus(entityplayer, (byte) (worldserver.getGameRules().getBoolean(GameRules.REDUCED_DEBUG_INFO) ? 22 : 23))); // Paper - fix this rule not being initialized on the client
+ networkmanager.queueImmunity = false; // Paper
this.d(entityplayer);
entityplayer.getStatisticManager().c();
entityplayer.B().a(entityplayer);
diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java
index 37a22ba6f7a2ac54759428d23d5ea9787bb557f7..06cd29bb9a5d6b67f896c129662c3f493238c758 100644
--- a/src/main/java/net/minecraft/server/ServerConnection.java
+++ b/src/main/java/net/minecraft/server/ServerConnection.java
@@ -45,6 +45,7 @@ public class ServerConnection {
NetworkManager manager = null;
while ((manager = pending.poll()) != null) {
connectedChannels.add(manager);
+ manager.isPending = false;
}
}
// Paper end