diff --git a/patches/server/Allow-controlled-flushing-for-network-manager.patch b/patches/server/Allow-controlled-flushing-for-network-manager.patch index c2c2a0c7bc..8a340b3963 100644 --- a/patches/server/Allow-controlled-flushing-for-network-manager.patch +++ b/patches/server/Allow-controlled-flushing-for-network-manager.patch @@ -129,6 +129,8 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 return true; } +@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler> { + Packet packet = queued.packet; if (!packet.isReady()) { + // Paper start - make only one flush call per sendPacketQueue() call @@ -139,9 +141,10 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 return false; } else { iterator.remove(); -- this.sendPacket(packet, queued.listener); -+ this.sendPacket(packet, queued.listener, (!iterator.hasNext() && (needsFlush || this.canFlush)) ? Boolean.TRUE : Boolean.FALSE); // Paper - make only one flush call per sendPacketQueue() call -+ hasWrotePacket = true; // Paper - make only one flush call per sendPacketQueue() call + if (queued.tryMarkConsumed()) { // Paper - try to mark isConsumed flag for de-duplicating packet +- this.sendPacket(packet, queued.listener); ++ this.sendPacket(packet, queued.listener, (!iterator.hasNext() && (needsFlush || this.canFlush)) ? Boolean.TRUE : Boolean.FALSE); // Paper - make only one flush call per sendPacketQueue() call ++ hasWrotePacket = true; // Paper - make only one flush call per sendPacketQueue() call + } } } - return true; diff --git a/patches/server/Optimize-Network-Manager-and-add-advanced-packet-sup.patch b/patches/server/Optimize-Network-Manager-and-add-advanced-packet-sup.patch index 5be635d606..f28a869d11 100644 --- a/patches/server/Optimize-Network-Manager-and-add-advanced-packet-sup.patch +++ b/patches/server/Optimize-Network-Manager-and-add-advanced-packet-sup.patch @@ -25,7 +25,7 @@ Also adds Netty Channel Flush Consolidation to reduce the amount of flushing Also avoids spamming closed channel exception by rechecking closed state in dispatch and then catch exceptions and close if they fire. -Part of this commit was authored by: Spottedleaf +Part of this commit was authored by: Spottedleaf, sandtechnology diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 @@ -185,10 +185,6 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 } - private void flushQueue() { -- try { // Paper - add pending task queue -- if (this.channel != null && this.channel.isOpen()) { -- Queue queue = this.queue; -- + // Paper start - rewrite this to be safer if ran off main thread + private boolean flushQueue() { // void -> boolean + if (!isConnected()) { @@ -198,19 +194,16 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + return processQueue(); + } else if (isPending) { + // Should only happen during login/status stages - synchronized (this.queue) { -- Connection.PacketHolder networkmanager_queuedpacket; -- -- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) { -- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener); -- } ++ synchronized (this.queue) { + return this.processQueue(); + } + } + return false; + } + private boolean processQueue() { -+ try { // Paper - add pending task queue + try { // Paper - add pending task queue +- if (this.channel != null && this.channel.isOpen()) { +- Queue queue = this.queue; + if (this.queue.isEmpty()) return true; + // If we are on main, we are safe here in that nothing else should be processing queue off main anymore + // But if we are not on main due to login/status, the parent is synchronized on packetQueue @@ -223,12 +216,25 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + return true; + } +- synchronized (this.queue) { +- Connection.PacketHolder networkmanager_queuedpacket; ++ // Paper start - checking isConsumed flag and skipping packet sending ++ if (queued.isConsumed()) { ++ continue; ++ } ++ // Paper end - checking isConsumed flag and skipping packet sending + +- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) { +- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener); + Packet packet = queued.packet; + if (!packet.isReady()) { + return false; + } else { + iterator.remove(); -+ this.sendPacket(packet, queued.listener); ++ if (queued.tryMarkConsumed()) { // Paper - try to mark isConsumed flag for de-duplicating packet ++ this.sendPacket(packet, queued.listener); + } +- } } + return true; @@ -284,6 +290,25 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 // Paper start - Add PlayerConnectionCloseEvent final PacketListener packetListener = this.getPacketListener(); if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) { +@@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler> { + @Nullable + final PacketSendListener listener; + ++ // Paper start - isConsumed flag for the connection ++ private java.util.concurrent.atomic.AtomicBoolean isConsumed = new java.util.concurrent.atomic.AtomicBoolean(false); ++ ++ public boolean tryMarkConsumed() { ++ return isConsumed.compareAndSet(false, true); ++ } ++ ++ public boolean isConsumed() { ++ return isConsumed.get(); ++ } ++ // Paper end - isConsumed flag for the connection ++ + public PacketHolder(Packet packet, @Nullable PacketSendListener callbacks) { + this.packet = packet; + this.listener = callbacks; diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644 --- a/src/main/java/net/minecraft/network/protocol/Packet.java