add support for sync & async receiving listeners (#1815)

This commit is contained in:
Pasqual Koschmieder 2022-08-10 22:50:33 +02:00 committed by GitHub
parent 575174580e
commit 8876ce323b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 51 deletions

View File

@ -1,8 +1,5 @@
package com.comphenix.protocol.injector.netty.channel;
import com.comphenix.protocol.events.NetworkMarker;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.NetworkProcessor;
import com.comphenix.protocol.injector.netty.ChannelListener;
import com.comphenix.protocol.utility.MinecraftReflection;
import io.netty.channel.ChannelHandlerContext;
@ -12,47 +9,36 @@ final class InboundPacketInterceptor extends ChannelInboundHandlerAdapter {
private final NettyChannelInjector injector;
private final ChannelListener channelListener;
private final NetworkProcessor networkProcessor;
public InboundPacketInterceptor(NettyChannelInjector injector, ChannelListener listener, NetworkProcessor processor) {
public InboundPacketInterceptor(NettyChannelInjector injector, ChannelListener listener) {
this.injector = injector;
this.channelListener = listener;
this.networkProcessor = processor;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (this.shouldInterceptMessage(msg)) {
Class<?> messageClass = msg.getClass();
if (this.shouldInterceptMessage(messageClass)) {
// process the login if the packet is one before posting the packet to any handler to provide "real" data
// the method invocation will do nothing if the packet is not a login packet
this.injector.tryProcessLogin(msg);
// call packet handlers, a null result indicates that we shouldn't change anything
PacketEvent interceptionResult = this.channelListener.onPacketReceiving(this.injector, msg, null);
if (interceptionResult == null) {
// check if there are any listeners bound for the packet - if not just post the packet down the pipeline
if (!this.channelListener.hasListener(messageClass)) {
ctx.fireChannelRead(msg);
return;
}
// fire the intercepted packet down the pipeline if it wasn't cancelled
if (!interceptionResult.isCancelled()) {
ctx.fireChannelRead(interceptionResult.getPacket().getHandle());
// check if there were any post events added the packet after we fired it down the pipeline
// we use this way as we don't want to construct a new network manager accidentally
NetworkMarker marker = NetworkMarker.getNetworkMarker(interceptionResult);
if (marker != null) {
this.networkProcessor.invokePostEvent(interceptionResult, marker);
}
}
// call all inbound listeners
this.injector.processInboundPacket(ctx, msg, messageClass);
} else {
// just pass the message down the pipeline
ctx.fireChannelRead(msg);
}
}
private boolean shouldInterceptMessage(Object msg) {
private boolean shouldInterceptMessage(Class<?> messageClass) {
// only intercept minecraft packets and no garbage from other stuff in the channel
return MinecraftReflection.getPacketClass().isAssignableFrom(msg.getClass());
return MinecraftReflection.getPacketClass().isAssignableFrom(messageClass);
}
}

View File

@ -1,5 +1,19 @@
package com.comphenix.protocol.injector.netty.channel;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.PacketType.Protocol;
import com.comphenix.protocol.error.ErrorReporter;
@ -23,20 +37,9 @@ import com.comphenix.protocol.utility.MinecraftVersion;
import com.comphenix.protocol.wrappers.WrappedGameProfile;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import org.bukkit.Server;
import org.bukkit.entity.Player;
@ -211,7 +214,7 @@ public class NettyChannelInjector implements Injector {
this.wrappedChannel.pipeline().addAfter(
"decoder",
INTERCEPTOR_NAME,
new InboundPacketInterceptor(this, this.channelListener, this.networkProcessor));
new InboundPacketInterceptor(this, this.channelListener));
this.injected = true;
return true;
@ -478,7 +481,57 @@ public class NettyChannelInjector implements Injector {
}
private void ensureInEventLoop(Runnable runnable) {
this.wrappedChannel.eventLoop().execute(runnable);
this.ensureInEventLoop(this.wrappedChannel.eventLoop(), runnable);
}
private void ensureInEventLoop(EventLoop eventLoop, Runnable runnable) {
if (eventLoop.inEventLoop()) {
runnable.run();
} else {
eventLoop.execute(runnable);
}
}
void processInboundPacket(ChannelHandlerContext ctx, Object packet, Class<?> packetClass) {
if (this.channelListener.hasMainThreadListener(packetClass) && !this.server.isPrimaryThread()) {
// not on the main thread but we are required to be - re-schedule the packet on the main thread
this.server.getScheduler().runTask(
this.injectionFactory.getPlugin(),
() -> this.processInboundPacket(ctx, packet, packetClass));
return;
}
if (ctx.channel().eventLoop().inEventLoop()) {
// we're in a netty event loop - prevent that from happening as it slows down netty
// in normal cases netty only has 4 processing threads available which is *really* bad when we're
// then blocking these (or more specifically a plugin) to process the incoming packet
// See https://twitter.com/fbrasisil/status/1163974576511995904 for a reference what can happen
this.server.getScheduler().runTaskAsynchronously(
this.injectionFactory.getPlugin(),
() -> this.processInboundPacket(ctx, packet, packetClass));
return;
}
// call packet handlers, a null result indicates that we shouldn't change anything
PacketEvent interceptionResult = this.channelListener.onPacketReceiving(this, packet, null);
if (interceptionResult == null) {
this.ensureInEventLoop(ctx.channel().eventLoop(), () -> ctx.fireChannelRead(packet));
return;
}
// fire the intercepted packet down the pipeline if it wasn't cancelled
if (!interceptionResult.isCancelled()) {
this.ensureInEventLoop(
ctx.channel().eventLoop(),
() -> ctx.fireChannelRead(interceptionResult.getPacket().getHandle()));
// check if there were any post events added the packet after we fired it down the pipeline
// we use this way as we don't want to construct a new network manager accidentally
NetworkMarker marker = NetworkMarker.getNetworkMarker(interceptionResult);
if (marker != null) {
this.networkProcessor.invokePostEvent(interceptionResult, marker);
}
}
}
<T> T processOutbound(T action, boolean markSeen) {

View File

@ -1,5 +1,12 @@
package com.comphenix.protocol.injector.netty.manager;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.ProtocolLogger;
import com.comphenix.protocol.concurrency.PacketTypeSet;
@ -23,12 +30,6 @@ import com.comphenix.protocol.reflect.fuzzy.FuzzyMethodContract;
import com.comphenix.protocol.utility.MinecraftReflection;
import com.comphenix.protocol.wrappers.Pair;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.bukkit.Server;
import org.bukkit.plugin.Plugin;
@ -78,7 +79,11 @@ public class NetworkManagerInjector implements ChannelListener {
this,
this.injectionFactory,
this.mainThreadListeners);
this.packetInjector = new NetworkManagerPacketInjector(this.inboundListeners, this.listenerInvoker, this);
this.packetInjector = new NetworkManagerPacketInjector(
this.inboundListeners,
this.listenerInvoker,
this,
this.mainThreadListeners);
}
@Override

View File

@ -1,23 +1,49 @@
package com.comphenix.protocol.injector.netty.manager;
import java.util.Set;
import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.concurrency.PacketTypeSet;
import com.comphenix.protocol.events.ListenerOptions;
import com.comphenix.protocol.events.PacketContainer;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.ListenerInvoker;
import com.comphenix.protocol.injector.packet.AbstractPacketInjector;
import com.comphenix.protocol.injector.netty.ChannelListener;
import com.comphenix.protocol.injector.packet.AbstractPacketInjector;
import org.bukkit.entity.Player;
final class NetworkManagerPacketInjector extends AbstractPacketInjector {
private final ListenerInvoker invoker;
private final ChannelListener channelListener;
private final PacketTypeSet mainThreadListeners;
public NetworkManagerPacketInjector(PacketTypeSet inboundFilters, ListenerInvoker invoker, ChannelListener listener) {
public NetworkManagerPacketInjector(
PacketTypeSet inboundFilters,
ListenerInvoker invoker,
ChannelListener listener,
PacketTypeSet mainThreadListeners
) {
super(inboundFilters);
this.invoker = invoker;
this.channelListener = listener;
this.mainThreadListeners = mainThreadListeners;
}
@Override
public boolean addPacketHandler(PacketType type, Set<ListenerOptions> options) {
if (!type.isAsyncForced() && (options == null || !options.contains(ListenerOptions.ASYNC))) {
this.mainThreadListeners.addType(type);
}
return super.addPacketHandler(type, options);
}
@Override
public boolean removePacketHandler(PacketType type) {
this.mainThreadListeners.removeType(type);
return super.removePacketHandler(type);
}
@Override
@ -27,4 +53,9 @@ final class NetworkManagerPacketInjector extends AbstractPacketInjector {
return event;
}
@Override
public boolean hasMainThreadListener(PacketType type) {
return this.mainThreadListeners.contains(type);
}
}

View File

@ -1,10 +1,11 @@
package com.comphenix.protocol.injector.packet;
import java.util.Set;
import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.events.ListenerOptions;
import com.comphenix.protocol.events.PacketContainer;
import com.comphenix.protocol.events.PacketEvent;
import java.util.Set;
import org.bukkit.entity.Player;
/**
@ -55,6 +56,14 @@ public interface PacketInjector {
*/
PacketEvent packetReceived(PacketContainer packet, Player client);
/**
* Determine if we have packet listeners with the given type that must be executed on the main thread.
*
* @param type - the packet type.
* @return TRUE if we do, FALSE otherwise.
*/
boolean hasMainThreadListener(PacketType type);
/**
* Perform any necessary cleanup before unloading ProtocolLib.
*/

View File

@ -1,12 +1,13 @@
package com.comphenix.protocol.injector.player;
import java.util.Set;
import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.events.ListenerOptions;
import com.comphenix.protocol.events.NetworkMarker;
import com.comphenix.protocol.events.PacketContainer;
import com.comphenix.protocol.events.PacketListener;
import io.netty.channel.Channel;
import java.util.Set;
import org.bukkit.entity.Player;
public interface PlayerInjectionHandler {
@ -121,8 +122,6 @@ public interface PlayerInjectionHandler {
/**
* Determine if we have packet listeners with the given type that must be executed on the main thread.
* <p>
* This only applies for onPacketSending(), as it makes certain guarantees.
*
* @param type - the packet type.
* @return TRUE if we do, FALSE otherwise.