mirror of
https://github.com/PaperMC/Paper.git
synced 2024-11-06 02:42:14 +01:00
ac554ad46d
Updated Upstream (Bukkit/CraftBukkit) Upstream has released updates that appear to apply and compile correctly. This update has not been tested by PaperMC and as with ANY update, please do your own testing Bukkit Changes: fa99e752 PR-1007: Add ItemMeta#getAsComponentString() 94a91782 Fix copy-pasted BlockType.Typed documentation 9b34ac8c Largely restore deprecated PotionData API 51a6449b PR-1008: Deprecate ITEMS_TOOLS, removed in 1.20.5 702d15fe Fix Javadoc reference 42f6cdf4 PR-919: Add internal ItemType and BlockType, delegate Material methods to them 237bb37b SPIGOT-1166, SPIGOT-7647: Expose Damager BlockState in EntityDamageByBlockEvent 035ea146 SPIGOT-6993: Allow #setVelocity to change the speed of a fireball and add a note to #setDirection about it 8c7880fb PR-1004: Improve field rename handling and centralize conversion between bukkit and string more 87c90e93 SPIGOT-7650: Add DamageSource for EntityDeathEvent and PlayerDeathEvent CraftBukkit Changes: 4af0f22e8 SPIGOT-7664: Item meta should prevail over block states c2ccc46ec SPIGOT-7666: Fix access to llama and horse special slot 124ac66d7 SPIGOT-7665: Fix ThrownPotion#getEffects() implementation only bringing custom effects 66f1f439a Restore null page behaviour of signed books even though not strictly allowed by API 6118e5398 Fix regression listening to minecraft:brand custom payloads c1a26b366 Fix unnecessary and potential not thread-safe chat visibility check 12360a7ec Remove unused imports 147b098b4 PR-1397: Add ItemMeta#getAsComponentString() 428aefe0e Largely restore deprecated PotionData API afe5b5ee9 PR-1275: Add internal ItemType and BlockType, delegate Material methods to them 8afeafa7d SPIGOT-1166, SPIGOT-7647: Expose Damager BlockState in EntityDamageByBlockEvent 4e7d749d4 SPIGOT-6993: Allow #setVelocity to change the speed of a fireball and add a note to #setDirection about it 441880757 Support both entity_data and bucket_entity_data on axolotl/fish buckets 0e22fdd1e Fix custom direct BlockState 
being not correctly set in DamageSource f2182ed47 SPIGOT-7659: TropicalFishBucketMeta should use BUCKET_ENTITY_DATA 2a6207fe1 PR-1393: Improve field rename handling and centralize conversion between bukkit and string more c024a5039 SPIGOT-7650: Add DamageSource for EntityDeathEvent and PlayerDeathEvent 741b84480 PR-1390: Improve internal handling of damage sources 0364df4e1 SPIGOT-7657: Error when loading angry entities
396 lines
19 KiB
Diff
396 lines
19 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Aikar <aikar@aikar.co>
|
|
Date: Wed, 6 May 2020 04:53:35 -0400
|
|
Subject: [PATCH] Optimize Network Manager and add advanced packet support
|
|
|
|
Adds ability for 1 packet to bundle other packets to follow it
|
|
Adds ability for a packet to delay sending more packets until a state is ready.
|
|
|
|
Removes synchronization from sending packets
|
|
Removes processing of the packet queue off of the main thread
|
|
- for the few cases where it is allowed, order is not necessary nor
|
|
should it even be happening concurrently in the first place (handshaking/login/status)
|
|
|
|
Ensures packets sent asynchronously are dispatched on main thread
|
|
|
|
This helps ensure safety for ProtocolLib as packet listeners
|
|
are commonly accessing world state. This will allow you to schedule
|
|
a packet to be sent async, but it'll be dispatched sync for packet
|
|
listeners to process.
|
|
|
|
This should solve some deadlock risks
|
|
|
|
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
|
|
|
|
Also avoids spamming closed channel exception by rechecking closed state in dispatch
|
|
and then catching exceptions and closing the connection if they fire.
|
|
|
|
Part of this commit was authored by: Spottedleaf, sandtechnology
|
|
|
|
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
|
|
index f57679d88cd015caa8996d44b486da694df29521..f94458d4270042062b9ae18931cc8179ef1fb420 100644
|
|
--- a/src/main/java/net/minecraft/network/Connection.java
|
|
+++ b/src/main/java/net/minecraft/network/Connection.java
|
|
@@ -93,7 +93,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND;
|
|
private final PacketFlow receiving;
|
|
private volatile boolean sendLoginDisconnect = true;
|
|
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
|
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper
|
|
public Channel channel;
|
|
public SocketAddress address;
|
|
// Spigot Start
|
|
@@ -125,6 +125,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public java.net.InetSocketAddress virtualHost;
|
|
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush"); // Paper - Disable explicit network manager flushing
|
|
// Paper end
|
|
+ // Paper start - Optimize network
|
|
+ public boolean isPending = true;
|
|
+ public boolean queueImmunity;
|
|
+ // Paper end - Optimize network
|
|
|
|
// Paper start - add utility methods
|
|
public final net.minecraft.server.level.ServerPlayer getPlayer() {
|
|
@@ -415,15 +419,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
|
- if (this.isConnected()) {
|
|
- this.flushQueue();
|
|
+ // Paper start - Optimize network: Handle oversized packets better
|
|
+ final boolean connected = this.isConnected();
|
|
+ if (!connected && !this.preparing) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ packet.onPacketDispatch(this.getPlayer());
|
|
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
|
|
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
|
|
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
|
|
this.sendPacket(packet, callbacks, flush);
|
|
} else {
|
|
- this.pendingActions.add((networkmanager) -> {
|
|
- networkmanager.sendPacket(packet, callbacks, flush);
|
|
- });
|
|
- }
|
|
+ // Write the packets to the queue, then flush - antixray hooks there already
|
|
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
|
|
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
|
+ if (!hasExtraPackets) {
|
|
+ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush));
|
|
+ } else {
|
|
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
|
|
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
|
|
+
|
|
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
|
+ final Packet<?> extraPacket = extraPackets.get(i);
|
|
+ final boolean end = ++i == len;
|
|
+ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end
|
|
+ }
|
|
+
|
|
+ this.pendingActions.addAll(actions);
|
|
+ }
|
|
|
|
+ this.flushQueue();
|
|
+ // Paper end - Optimize network
|
|
+ }
|
|
}
|
|
|
|
public void runOnceConnected(Consumer<Connection> task) {
|
|
@@ -431,7 +459,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
this.flushQueue();
|
|
task.accept(this);
|
|
} else {
|
|
- this.pendingActions.add(task);
|
|
+ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network
|
|
}
|
|
|
|
}
|
|
@@ -449,6 +477,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
|
+ // Paper start - Optimize network
|
|
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
|
|
+ if (!this.isConnected()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ return;
|
|
+ }
|
|
+ try {
|
|
+ // Paper end - Optimize network
|
|
ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
|
|
|
if (callbacks != null) {
|
|
@@ -468,14 +504,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
});
|
|
}
|
|
|
|
+ // Paper start - Optimize network
|
|
+ if (packet.hasFinishListener()) {
|
|
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
|
+ }
|
|
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
|
+ } catch (final Exception e) {
|
|
+ LOGGER.error("NetworkException: {}", player, e);
|
|
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ // Paper end - Optimize network
|
|
}
|
|
|
|
public void flushChannel() {
|
|
if (this.isConnected()) {
|
|
this.flush();
|
|
} else {
|
|
- this.pendingActions.add(Connection::flush);
|
|
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
|
|
}
|
|
|
|
}
|
|
@@ -491,20 +537,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
|
|
}
|
|
|
|
- private void flushQueue() {
|
|
- if (this.channel != null && this.channel.isOpen()) {
|
|
- Queue queue = this.pendingActions;
|
|
-
|
|
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
|
|
+ private boolean flushQueue() {
|
|
+ if (!this.isConnected()) {
|
|
+ return true;
|
|
+ }
|
|
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
|
+ return this.processQueue();
|
|
+ } else if (this.isPending) {
|
|
+ // Should only happen during login/status stages
|
|
synchronized (this.pendingActions) {
|
|
- Consumer consumer;
|
|
+ return this.processQueue();
|
|
+ }
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ private boolean processQueue() {
|
|
+ if (this.pendingActions.isEmpty()) {
|
|
+ return true;
|
|
+ }
|
|
|
|
- while ((consumer = (Consumer) this.pendingActions.poll()) != null) {
|
|
- consumer.accept(this);
|
|
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
|
+ // But if we are not on main due to login/status, the parent is synchronized on pendingActions
|
|
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
|
|
+ while (iterator.hasNext()) {
|
|
+ final WrappedConsumer queued = iterator.next(); // poll -> peek
|
|
+
|
|
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
|
+ if (queued == null) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ if (queued.isConsumed()) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ if (queued instanceof PacketSendAction packetSendAction) {
|
|
+ final Packet<?> packet = packetSendAction.packet;
|
|
+ if (!packet.isReady()) {
|
|
+ return false;
|
|
}
|
|
+ }
|
|
|
|
+ iterator.remove();
|
|
+ if (queued.tryMarkConsumed()) {
|
|
+ queued.accept(this);
|
|
}
|
|
}
|
|
+ return true;
|
|
}
|
|
+ // Paper end - Optimize network
|
|
|
|
private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
|
|
private static int joinAttemptsThisTick; // Paper - Buffer joins to world
|
|
@@ -564,6 +647,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void disconnect(Component disconnectReason) {
|
|
// Spigot Start
|
|
this.preparing = false;
|
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
|
// Spigot End
|
|
if (this.channel == null) {
|
|
this.delayedDisconnect = disconnectReason;
|
|
@@ -751,7 +835,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void handleDisconnection() {
|
|
if (this.channel != null && !this.channel.isOpen()) {
|
|
if (this.disconnectionHandled) {
|
|
- Connection.LOGGER.warn("handleDisconnection() called twice");
|
|
+ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
|
} else {
|
|
this.disconnectionHandled = true;
|
|
PacketListener packetlistener = this.getPacketListener();
|
|
@@ -764,7 +848,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
|
|
packetlistener1.onDisconnect(ichatbasecomponent);
|
|
}
|
|
- this.pendingActions.clear(); // Free up packet queue.
|
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
|
// Paper start - Add PlayerConnectionCloseEvent
|
|
final PacketListener packetListener = this.getPacketListener();
|
|
if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
|
|
@@ -801,4 +885,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void setBandwidthLogger(LocalSampleLogger log) {
|
|
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log);
|
|
}
|
|
+
|
|
+ // Paper start - Optimize network
|
|
+ public void clearPacketQueue() {
|
|
+ final net.minecraft.server.level.ServerPlayer player = getPlayer();
|
|
+ for (final Consumer<Connection> queuedAction : this.pendingActions) {
|
|
+ if (queuedAction instanceof PacketSendAction packetSendAction) {
|
|
+ final Packet<?> packet = packetSendAction.packet;
|
|
+ if (packet.hasFinishListener()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ this.pendingActions.clear();
|
|
+ }
|
|
+
|
|
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
|
|
+
|
|
+ @Nullable
|
|
+ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
|
|
+ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
|
|
+ if (extra == null || extra.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
|
|
+ buildExtraPackets0(extra, ret);
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
|
|
+ for (final Packet<?> extra : extraPackets) {
|
|
+ into.add(extra);
|
|
+ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
|
|
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
|
+ buildExtraPackets0(extraExtra, into);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
|
|
+ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
|
|
+ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static class WrappedConsumer implements Consumer<Connection> {
|
|
+ private final Consumer<Connection> delegate;
|
|
+ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
|
|
+
|
|
+ private WrappedConsumer(final Consumer<Connection> delegate) {
|
|
+ this.delegate = delegate;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void accept(final Connection connection) {
|
|
+ this.delegate.accept(connection);
|
|
+ }
|
|
+
|
|
+ public boolean tryMarkConsumed() {
|
|
+ return consumed.compareAndSet(false, true);
|
|
+ }
|
|
+
|
|
+ public boolean isConsumed() {
|
|
+ return consumed.get();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class PacketSendAction extends WrappedConsumer {
|
|
+ private final Packet<?> packet;
|
|
+
|
|
+ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
|
|
+ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
|
|
+ this.packet = packet;
|
|
+ }
|
|
+ }
|
|
+ // Paper end - Optimize network
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
|
|
index 4c776c591dd0a7b36945a6487fdfe86d1187b4af..5ee2ba1225fb7e4f02152b45adeb66f79ed1650d 100644
|
|
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
|
|
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
|
|
@@ -22,4 +22,31 @@ public interface Packet<T extends PacketListener> {
|
|
static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) {
|
|
return StreamCodec.ofMember(encoder, decoder);
|
|
}
|
|
+
|
|
+ // Paper start
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ */
|
|
+ default void onPacketDispatch(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player) {
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ * @param future Can be null if packet was cancelled
|
|
+ */
|
|
+ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {}
|
|
+
|
|
+ default boolean hasFinishListener() {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ default boolean isReady() {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ @org.jetbrains.annotations.Nullable
|
|
+ default java.util.List<Packet<?>> getExtraPackets() {
|
|
+ return null;
|
|
+ }
|
|
+ // Paper end
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
|
index 96355e1da8feb6687ea0069dda4a82fcd7e25e8a..a08d9aa6e420f691795df9b627a9cd5b5c0112c5 100644
|
|
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
|
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
|
@@ -63,10 +63,12 @@ public class ServerConnectionListener {
|
|
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
|
|
// Paper start - prevent blocking on adding a new connection while the server is ticking
|
|
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
|
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
|
|
private final void addPending() {
|
|
Connection connection;
|
|
while ((connection = pending.poll()) != null) {
|
|
connections.add(connection);
|
|
+ connection.isPending = false; // Paper - Optimize network
|
|
}
|
|
}
|
|
// Paper end - prevent blocking on adding a new connection while the server is ticking
|
|
@@ -112,6 +114,7 @@ public class ServerConnectionListener {
|
|
;
|
|
}
|
|
|
|
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
|
ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30));
|
|
|
|
if (ServerConnectionListener.this.server.repliesToStatus()) {
|