diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/InboundPacketInterceptor.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/InboundPacketInterceptor.java index b5474fd3..3660ed41 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/InboundPacketInterceptor.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/InboundPacketInterceptor.java @@ -1,8 +1,5 @@ package com.comphenix.protocol.injector.netty.channel; -import com.comphenix.protocol.events.NetworkMarker; -import com.comphenix.protocol.events.PacketEvent; -import com.comphenix.protocol.injector.NetworkProcessor; import com.comphenix.protocol.injector.netty.ChannelListener; import com.comphenix.protocol.utility.MinecraftReflection; import io.netty.channel.ChannelHandlerContext; @@ -12,47 +9,36 @@ final class InboundPacketInterceptor extends ChannelInboundHandlerAdapter { private final NettyChannelInjector injector; private final ChannelListener channelListener; - private final NetworkProcessor networkProcessor; - public InboundPacketInterceptor(NettyChannelInjector injector, ChannelListener listener, NetworkProcessor processor) { + public InboundPacketInterceptor(NettyChannelInjector injector, ChannelListener listener) { this.injector = injector; this.channelListener = listener; - this.networkProcessor = processor; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (this.shouldInterceptMessage(msg)) { + Class messageClass = msg.getClass(); + if (this.shouldInterceptMessage(messageClass)) { // process the login if the packet is one before posting the packet to any handler to provide "real" data // the method invocation will do nothing if the packet is not a login packet this.injector.tryProcessLogin(msg); - // call packet handlers, a null result indicates that we shouldn't change anything - PacketEvent interceptionResult = this.channelListener.onPacketReceiving(this.injector, msg, null); - if (interceptionResult == null) { + // check if there are any listeners bound for the packet - if not just post the packet down the pipeline + if (!this.channelListener.hasListener(messageClass)) { ctx.fireChannelRead(msg); return; } - // fire the intercepted packet down the pipeline if it wasn't cancelled - if (!interceptionResult.isCancelled()) { - ctx.fireChannelRead(interceptionResult.getPacket().getHandle()); - - // check if there were any post events added the packet after we fired it down the pipeline - // we use this way as we don't want to construct a new network manager accidentally - NetworkMarker marker = NetworkMarker.getNetworkMarker(interceptionResult); - if (marker != null) { - this.networkProcessor.invokePostEvent(interceptionResult, marker); - } - } + // call all inbound listeners + this.injector.processInboundPacket(ctx, msg, messageClass); } else { // just pass the message down the pipeline ctx.fireChannelRead(msg); } } - private boolean shouldInterceptMessage(Object msg) { + private boolean shouldInterceptMessage(Class messageClass) { // only intercept minecraft packets and no garbage from other stuff in the channel - return MinecraftReflection.getPacketClass().isAssignableFrom(msg.getClass()); + return MinecraftReflection.getPacketClass().isAssignableFrom(messageClass); } } diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java index 129ba9b9..b28688b8 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java @@ -1,5 +1,19 @@ package com.comphenix.protocol.injector.netty.channel; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; + import com.comphenix.protocol.PacketType; import com.comphenix.protocol.PacketType.Protocol; import com.comphenix.protocol.error.ErrorReporter; @@ -23,20 +37,9 @@ import com.comphenix.protocol.utility.MinecraftVersion; import com.comphenix.protocol.wrappers.WrappedGameProfile; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoop; import io.netty.util.AttributeKey; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import org.bukkit.Server; import org.bukkit.entity.Player; @@ -211,7 +214,7 @@ public class NettyChannelInjector implements Injector { this.wrappedChannel.pipeline().addAfter( "decoder", INTERCEPTOR_NAME, - new InboundPacketInterceptor(this, this.channelListener, this.networkProcessor)); + new InboundPacketInterceptor(this, this.channelListener)); this.injected = true; return true; @@ -478,7 +481,57 @@ public class NettyChannelInjector implements Injector { } private void ensureInEventLoop(Runnable runnable) { - this.wrappedChannel.eventLoop().execute(runnable); + this.ensureInEventLoop(this.wrappedChannel.eventLoop(), runnable); + } + + private void ensureInEventLoop(EventLoop eventLoop, Runnable runnable) { + if (eventLoop.inEventLoop()) { + runnable.run(); + } else { + eventLoop.execute(runnable); + } + } + + void processInboundPacket(ChannelHandlerContext ctx, Object packet, Class packetClass) { + if (this.channelListener.hasMainThreadListener(packetClass) && !this.server.isPrimaryThread()) { + // not on the main thread but we are required to be - re-schedule the packet on the main thread + this.server.getScheduler().runTask( + this.injectionFactory.getPlugin(), + () -> this.processInboundPacket(ctx, packet, packetClass)); + return; + } + + if (ctx.channel().eventLoop().inEventLoop()) { + // we're in a netty event loop - prevent that from happening as it slows down netty + // in normal cases netty only has 4 processing threads available which is *really* bad when we're + // then blocking these (or more specifically a plugin) to process the incoming packet + // See https://twitter.com/fbrasisil/status/1163974576511995904 for a reference what can happen + this.server.getScheduler().runTaskAsynchronously( + this.injectionFactory.getPlugin(), + () -> this.processInboundPacket(ctx, packet, packetClass)); + return; + } + + // call packet handlers, a null result indicates that we shouldn't change anything + PacketEvent interceptionResult = this.channelListener.onPacketReceiving(this, packet, null); + if (interceptionResult == null) { + this.ensureInEventLoop(ctx.channel().eventLoop(), () -> ctx.fireChannelRead(packet)); + return; + } + + // fire the intercepted packet down the pipeline if it wasn't cancelled + if (!interceptionResult.isCancelled()) { + this.ensureInEventLoop( + ctx.channel().eventLoop(), + () -> ctx.fireChannelRead(interceptionResult.getPacket().getHandle())); + + // check if there were any post events added the packet after we fired it down the pipeline + // we use this way as we don't want to construct a new network manager accidentally + NetworkMarker marker = NetworkMarker.getNetworkMarker(interceptionResult); + if (marker != null) { + this.networkProcessor.invokePostEvent(interceptionResult, marker); + } + } } T processOutbound(T action, boolean markSeen) { diff --git a/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerInjector.java b/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerInjector.java index 9fe00997..dce7b13a 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerInjector.java @@ -1,5 +1,12 @@ package com.comphenix.protocol.injector.netty.manager; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import com.comphenix.protocol.PacketType; import com.comphenix.protocol.ProtocolLogger; import com.comphenix.protocol.concurrency.PacketTypeSet; @@ -23,12 +30,6 @@ import com.comphenix.protocol.reflect.fuzzy.FuzzyMethodContract; import com.comphenix.protocol.utility.MinecraftReflection; import com.comphenix.protocol.wrappers.Pair; import io.netty.channel.ChannelFuture; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import org.bukkit.Server; import org.bukkit.plugin.Plugin; @@ -78,7 +79,11 @@ public class NetworkManagerInjector implements ChannelListener { this, this.injectionFactory, this.mainThreadListeners); - this.packetInjector = new NetworkManagerPacketInjector(this.inboundListeners, this.listenerInvoker, this); + this.packetInjector = new NetworkManagerPacketInjector( + this.inboundListeners, + this.listenerInvoker, + this, + this.mainThreadListeners); } @Override diff --git a/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerPacketInjector.java b/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerPacketInjector.java index fe4d85fb..2cc75452 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerPacketInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/manager/NetworkManagerPacketInjector.java @@ -1,23 +1,49 @@ package com.comphenix.protocol.injector.netty.manager; +import java.util.Set; + +import com.comphenix.protocol.PacketType; import com.comphenix.protocol.concurrency.PacketTypeSet; +import com.comphenix.protocol.events.ListenerOptions; import com.comphenix.protocol.events.PacketContainer; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.injector.ListenerInvoker; -import com.comphenix.protocol.injector.packet.AbstractPacketInjector; import com.comphenix.protocol.injector.netty.ChannelListener; +import com.comphenix.protocol.injector.packet.AbstractPacketInjector; import org.bukkit.entity.Player; final class NetworkManagerPacketInjector extends AbstractPacketInjector { private final ListenerInvoker invoker; private final ChannelListener channelListener; + private final PacketTypeSet mainThreadListeners; - public NetworkManagerPacketInjector(PacketTypeSet inboundFilters, ListenerInvoker invoker, ChannelListener listener) { + public NetworkManagerPacketInjector( + PacketTypeSet inboundFilters, + ListenerInvoker invoker, + ChannelListener listener, + PacketTypeSet mainThreadListeners + ) { super(inboundFilters); this.invoker = invoker; this.channelListener = listener; + this.mainThreadListeners = mainThreadListeners; + } + + @Override + public boolean addPacketHandler(PacketType type, Set options) { + if (!type.isAsyncForced() && (options == null || !options.contains(ListenerOptions.ASYNC))) { + this.mainThreadListeners.addType(type); + } + + return super.addPacketHandler(type, options); + } + + @Override + public boolean removePacketHandler(PacketType type) { + this.mainThreadListeners.removeType(type); + return super.removePacketHandler(type); } @Override @@ -27,4 +53,9 @@ final class NetworkManagerPacketInjector extends AbstractPacketInjector { return event; } + + @Override + public boolean hasMainThreadListener(PacketType type) { + return this.mainThreadListeners.contains(type); + } } diff --git a/src/main/java/com/comphenix/protocol/injector/packet/PacketInjector.java b/src/main/java/com/comphenix/protocol/injector/packet/PacketInjector.java index 4bbb66a7..258999cb 100644 --- a/src/main/java/com/comphenix/protocol/injector/packet/PacketInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/packet/PacketInjector.java @@ -1,10 +1,11 @@ package com.comphenix.protocol.injector.packet; +import java.util.Set; + import com.comphenix.protocol.PacketType; import com.comphenix.protocol.events.ListenerOptions; import com.comphenix.protocol.events.PacketContainer; import com.comphenix.protocol.events.PacketEvent; -import java.util.Set; import org.bukkit.entity.Player; /** @@ -55,6 +56,14 @@ public interface PacketInjector { */ PacketEvent packetReceived(PacketContainer packet, Player client); + /** + * Determine if we have packet listeners with the given type that must be executed on the main thread. + * + * @param type - the packet type. + * @return TRUE if we do, FALSE otherwise. + */ + boolean hasMainThreadListener(PacketType type); + /** * Perform any necessary cleanup before unloading ProtocolLib. */ diff --git a/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java b/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java index 62fd0f29..783829a3 100644 --- a/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java +++ b/src/main/java/com/comphenix/protocol/injector/player/PlayerInjectionHandler.java @@ -1,12 +1,13 @@ package com.comphenix.protocol.injector.player; +import java.util.Set; + import com.comphenix.protocol.PacketType; import com.comphenix.protocol.events.ListenerOptions; import com.comphenix.protocol.events.NetworkMarker; import com.comphenix.protocol.events.PacketContainer; import com.comphenix.protocol.events.PacketListener; import io.netty.channel.Channel; -import java.util.Set; import org.bukkit.entity.Player; public interface PlayerInjectionHandler { @@ -121,8 +122,6 @@ public interface PlayerInjectionHandler { /** * Determine if we have packet listeners with the given type that must be executed on the main thread. - *

- * This only applies for onPacketSending(), as it makes certain guarantees. * * @param type - the packet type. * @return TRUE if we do, FALSE otherwise.