mirror of
https://github.com/PaperMC/Paper.git
synced 2025-01-13 03:40:57 +01:00
392 lines
19 KiB
Diff
392 lines
19 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Aikar <aikar@aikar.co>
|
|
Date: Wed, 6 May 2020 04:53:35 -0400
|
|
Subject: [PATCH] Optimize Network Manager and add advanced packet support
|
|
|
|
Adds ability for 1 packet to bundle other packets to follow it
|
|
Adds ability for a packet to delay sending more packets until a state is ready.
|
|
|
|
Removes synchronization from sending packets
|
|
Removes processing packet queue off of main thread
|
|
- for the few cases where it is allowed, order is not necessary nor
|
|
should it even be happening concurrently in first place (handshaking/login/status)
|
|
|
|
Ensures packets sent asynchronously are dispatched on main thread
|
|
|
|
This helps ensure safety for ProtocolLib as packet listeners
|
|
are commonly accessing world state. This will allow you to schedule
|
|
a packet to be sent async, but itll be dispatched sync for packet
|
|
listeners to process.
|
|
|
|
This should solve some deadlock risks
|
|
|
|
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
|
|
|
|
Also avoids spamming closed channel exception by rechecking closed state in dispatch
|
|
and then catch exceptions and close if they fire.
|
|
|
|
Part of this commit was authored by: Spottedleaf, sandtechnology
|
|
|
|
diff --git a/net/minecraft/network/Connection.java b/net/minecraft/network/Connection.java
|
|
index 42f44c7cb0bd55ddfacd18acb0e596e7a953870e..ad8f8428b75e37097487cdfbd0db2421ee4cbe37 100644
|
|
--- a/net/minecraft/network/Connection.java
|
|
+++ b/net/minecraft/network/Connection.java
|
|
@@ -85,7 +85,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
private static final ProtocolInfo<ServerHandshakePacketListener> INITIAL_PROTOCOL = HandshakeProtocols.SERVERBOUND;
|
|
private final PacketFlow receiving;
|
|
private volatile boolean sendLoginDisconnect = true;
|
|
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
|
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue(); // Paper - Optimize network
|
|
public Channel channel;
|
|
public SocketAddress address;
|
|
// Spigot start
|
|
@@ -145,6 +145,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
// Paper end - packet limiter
|
|
@Nullable public SocketAddress haProxyAddress; // Paper - Add API to get player's proxy address
|
|
+ // Paper start - Optimize network
|
|
+ public boolean isPending = true;
|
|
+ public boolean queueImmunity;
|
|
+ // Paper end - Optimize network
|
|
|
|
public Connection(PacketFlow receiving) {
|
|
this.receiving = receiving;
|
|
@@ -423,11 +427,38 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
public void send(Packet<?> packet, @Nullable PacketSendListener listener, boolean flush) {
|
|
- if (this.isConnected()) {
|
|
- this.flushQueue();
|
|
+ // Paper start - Optimize network: Handle oversized packets better
|
|
+ final boolean connected = this.isConnected();
|
|
+ if (!connected && !this.preparing) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ packet.onPacketDispatch(this.getPlayer());
|
|
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
|
|
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
|
|
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
|
|
this.sendPacket(packet, listener, flush);
|
|
} else {
|
|
- this.pendingActions.add(connection -> connection.sendPacket(packet, listener, flush));
|
|
+ // Write the packets to the queue, then flush - antixray hooks there already
|
|
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
|
|
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
|
+ if (!hasExtraPackets) {
|
|
+ this.pendingActions.add(new PacketSendAction(packet, listener, flush));
|
|
+ } else {
|
|
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
|
|
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
|
|
+
|
|
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
|
+ final Packet<?> extraPacket = extraPackets.get(i);
|
|
+ final boolean end = ++i == len;
|
|
+ actions.add(new PacketSendAction(extraPacket, end ? listener : null, end)); // Append listener to the end
|
|
+ }
|
|
+
|
|
+ this.pendingActions.addAll(actions);
|
|
+ }
|
|
+
|
|
+ this.flushQueue();
|
|
+ // Paper end - Optimize network
|
|
}
|
|
}
|
|
|
|
@@ -436,7 +467,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
this.flushQueue();
|
|
action.accept(this);
|
|
} else {
|
|
- this.pendingActions.add(action);
|
|
+ this.pendingActions.add(new WrappedConsumer(action)); // Paper - Optimize network
|
|
}
|
|
}
|
|
|
|
@@ -450,6 +481,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener sendListener, boolean flush) {
|
|
+ // Paper start - Optimize network
|
|
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
|
|
+ if (!this.isConnected()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ return;
|
|
+ }
|
|
+ try {
|
|
+ // Paper end - Optimize network
|
|
ChannelFuture channelFuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
|
if (sendListener != null) {
|
|
channelFuture.addListener(future -> {
|
|
@@ -465,14 +504,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
});
|
|
}
|
|
|
|
+ // Paper start - Optimize network
|
|
+ if (packet.hasFinishListener()) {
|
|
+ channelFuture.addListener((ChannelFutureListener) future -> packet.onPacketDispatchFinish(player, future));
|
|
+ }
|
|
channelFuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
|
+ } catch (final Exception e) {
|
|
+ LOGGER.error("NetworkException: {}", player, e);
|
|
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ // Paper end - Optimize network
|
|
}
|
|
|
|
public void flushChannel() {
|
|
if (this.isConnected()) {
|
|
this.flush();
|
|
} else {
|
|
- this.pendingActions.add(Connection::flush);
|
|
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
|
|
}
|
|
}
|
|
|
|
@@ -484,16 +533,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
}
|
|
|
|
- private void flushQueue() {
|
|
- if (this.channel != null && this.channel.isOpen()) {
|
|
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
|
|
+ private boolean flushQueue() {
|
|
+ if (!this.isConnected()) {
|
|
+ return true;
|
|
+ }
|
|
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
|
+ return this.processQueue();
|
|
+ } else if (this.isPending) {
|
|
+ // Should only happen during login/status stages
|
|
synchronized (this.pendingActions) {
|
|
- Consumer<Connection> consumer;
|
|
- while ((consumer = this.pendingActions.poll()) != null) {
|
|
- consumer.accept(this);
|
|
+ return this.processQueue();
|
|
+ }
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ private boolean processQueue() {
|
|
+ if (this.pendingActions.isEmpty()) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
|
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
|
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
|
|
+ while (iterator.hasNext()) {
|
|
+ final WrappedConsumer queued = iterator.next(); // poll -> peek
|
|
+
|
|
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
|
+ if (queued == null) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ if (queued.isConsumed()) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ if (queued instanceof PacketSendAction packetSendAction) {
|
|
+ final Packet<?> packet = packetSendAction.packet;
|
|
+ if (!packet.isReady()) {
|
|
+ return false;
|
|
}
|
|
}
|
|
+
|
|
+ iterator.remove();
|
|
+ if (queued.tryMarkConsumed()) {
|
|
+ queued.accept(this);
|
|
+ }
|
|
}
|
|
+ return true;
|
|
}
|
|
+ // Paper end - Optimize network
|
|
|
|
private static final int MAX_PER_TICK = io.papermc.paper.configuration.GlobalConfiguration.get().misc.maxJoinsPerTick; // Paper - Buffer joins to world
|
|
private static int joinAttemptsThisTick; // Paper - Buffer joins to world
|
|
@@ -563,6 +653,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
|
|
public void disconnect(DisconnectionDetails disconnectionDetails) {
|
|
this.preparing = false; // Spigot
|
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
|
if (this.channel == null) {
|
|
this.delayedDisconnect = disconnectionDetails;
|
|
}
|
|
@@ -751,7 +842,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void handleDisconnection() {
|
|
if (this.channel != null && !this.channel.isOpen()) {
|
|
if (this.disconnectionHandled) {
|
|
- LOGGER.warn("handleDisconnection() called twice");
|
|
+ // LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
|
} else {
|
|
this.disconnectionHandled = true;
|
|
PacketListener packetListener = this.getPacketListener();
|
|
@@ -762,7 +853,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
);
|
|
packetListener1.onDisconnect(disconnectionDetails);
|
|
}
|
|
- this.pendingActions.clear(); // Free up packet queue.
|
|
+ this.clearPacketQueue(); // Paper - Optimize network
|
|
// Paper start - Add PlayerConnectionCloseEvent
|
|
if (packetListener instanceof net.minecraft.server.network.ServerCommonPacketListenerImpl commonPacketListener) {
|
|
/* Player was logged in, either game listener or configuration listener */
|
|
@@ -797,4 +888,93 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void setBandwidthLogger(LocalSampleLogger bandwithLogger) {
|
|
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(bandwithLogger);
|
|
}
|
|
+
|
|
+ // Paper start - Optimize network
|
|
+ public void clearPacketQueue() {
|
|
+ final net.minecraft.server.level.ServerPlayer player = getPlayer();
|
|
+ for (final Consumer<Connection> queuedAction : this.pendingActions) {
|
|
+ if (queuedAction instanceof PacketSendAction packetSendAction) {
|
|
+ final Packet<?> packet = packetSendAction.packet;
|
|
+ if (packet.hasFinishListener()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ this.pendingActions.clear();
|
|
+ }
|
|
+
|
|
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
|
|
+
|
|
+ @Nullable
|
|
+ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
|
|
+ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
|
|
+ if (extra == null || extra.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
|
|
+ buildExtraPackets0(extra, ret);
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
|
|
+ for (final Packet<?> extra : extraPackets) {
|
|
+ into.add(extra);
|
|
+ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
|
|
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
|
+ buildExtraPackets0(extraExtra, into);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
|
|
+ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
|
|
+ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSoundEntityPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundStopSoundPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundLevelParticlesPacket ||
|
|
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static class WrappedConsumer implements Consumer<Connection> {
|
|
+ private final Consumer<Connection> delegate;
|
|
+ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
|
|
+
|
|
+ private WrappedConsumer(final Consumer<Connection> delegate) {
|
|
+ this.delegate = delegate;
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void accept(final Connection connection) {
|
|
+ this.delegate.accept(connection);
|
|
+ }
|
|
+
|
|
+ public boolean tryMarkConsumed() {
|
|
+ return consumed.compareAndSet(false, true);
|
|
+ }
|
|
+
|
|
+ public boolean isConsumed() {
|
|
+ return consumed.get();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class PacketSendAction extends WrappedConsumer {
|
|
+ private final Packet<?> packet;
|
|
+
|
|
+ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
|
|
+ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
|
|
+ this.packet = packet;
|
|
+ }
|
|
+ }
|
|
+ // Paper end - Optimize network
|
|
}
|
|
diff --git a/net/minecraft/network/protocol/Packet.java b/net/minecraft/network/protocol/Packet.java
|
|
index 65ff8b9112ec76eeac48c679044fc02ae7d4ffeb..e4789584cbe43959681a8522c66eab58369bebd0 100644
|
|
--- a/net/minecraft/network/protocol/Packet.java
|
|
+++ b/net/minecraft/network/protocol/Packet.java
|
|
@@ -35,4 +35,32 @@ public interface Packet<T extends PacketListener> {
|
|
static <B extends ByteBuf, T extends Packet<?>> StreamCodec<B, T> codec(StreamMemberEncoder<B, T> encoder, StreamDecoder<B, T> decoder) {
|
|
return StreamCodec.ofMember(encoder, decoder);
|
|
}
|
|
+
|
|
+ // Paper start
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ */
|
|
+ default void onPacketDispatch(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player) {
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ * @param future Can be null if packet was cancelled
|
|
+ */
|
|
+ default void onPacketDispatchFinish(@org.jetbrains.annotations.Nullable net.minecraft.server.level.ServerPlayer player, @org.jetbrains.annotations.Nullable io.netty.channel.ChannelFuture future) {
|
|
+ }
|
|
+
|
|
+ default boolean hasFinishListener() {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ default boolean isReady() {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ @org.jetbrains.annotations.Nullable
|
|
+ default java.util.List<Packet<?>> getExtraPackets() {
|
|
+ return null;
|
|
+ }
|
|
+ // Paper end
|
|
}
|
|
diff --git a/net/minecraft/server/network/ServerConnectionListener.java b/net/minecraft/server/network/ServerConnectionListener.java
|
|
index b873af7183d9b1aabc87e63d254a4326f17b21c9..7de11ba404f0b60e7f7b7c16954811a343688219 100644
|
|
--- a/net/minecraft/server/network/ServerConnectionListener.java
|
|
+++ b/net/minecraft/server/network/ServerConnectionListener.java
|
|
@@ -66,11 +66,13 @@ public class ServerConnectionListener {
|
|
|
|
// Paper start - prevent blocking on adding a new connection while the server is ticking
|
|
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
|
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
|
|
|
|
private final void addPending() {
|
|
Connection connection;
|
|
while ((connection = this.pending.poll()) != null) {
|
|
this.connections.add(connection);
|
|
+ connection.isPending = false; // Paper - Optimize network
|
|
}
|
|
}
|
|
// Paper end - prevent blocking on adding a new connection while the server is ticking
|
|
@@ -120,6 +122,7 @@ public class ServerConnectionListener {
|
|
} catch (ChannelException var5) {
|
|
}
|
|
|
|
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
|
ChannelPipeline channelPipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30));
|
|
if (ServerConnectionListener.this.server.repliesToStatus()) {
|
|
channelPipeline.addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
|