From 4db1e39ac76743bf1775249c1749161dd5f4909e Mon Sep 17 00:00:00 2001 From: Pasqual Koschmieder Date: Sat, 11 Jun 2022 18:36:46 +0200 Subject: [PATCH] fix packet listener calling when processed in event loop (#1621) --- .../netty/channel/NettyChannelInjector.java | 46 +++++++++++++------ .../netty/channel/NettyChannelProxy.java | 39 ++++++++++++++-- .../InjectionChannelInboundHandler.java | 6 +-- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java index aa3cc484..1c0164ab 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java @@ -27,7 +27,7 @@ import io.netty.channel.ChannelHandler; import io.netty.util.AttributeKey; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.LinkedHashSet; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; @@ -95,7 +95,8 @@ public class NettyChannelInjector implements Injector { private final FieldAccessor channelField; - private final Set skippedPackets = new LinkedHashSet<>(); + private final Set skippedPackets = new HashSet<>(); + private final Set processedPackets = new HashSet<>(); private final Map savedMarkers = new WeakHashMap<>(16, 0.9f); // status of this injector @@ -463,14 +464,14 @@ public class NettyChannelInjector implements Injector { Channel ch = new NettyChannelProxy(this.wrappedChannel, new NettyEventLoopProxy(this.wrappedChannel.eventLoop()) { @Override protected Runnable proxyRunnable(Runnable original) { - return NettyChannelInjector.this.processOutbound(original); + return NettyChannelInjector.this.processOutbound(original, true); } @Override protected Callable proxyCallable(Callable original) { - return NettyChannelInjector.this.processOutbound(original); + return NettyChannelInjector.this.processOutbound(original, true); } - }); + }, this); this.channelField.set(this.networkManager, ch); } @@ -478,7 +479,7 @@ public class NettyChannelInjector implements Injector { this.wrappedChannel.eventLoop().execute(runnable); } - private T processOutbound(T action) { + T processOutbound(T action, boolean markSeen) { // get the accessor to the packet field // if we are unable to look up the accessor then just return the runnable, probably nothing of our business FieldAccessor packetAccessor = this.lookupPacketAccessor(action); @@ -498,16 +499,16 @@ public class NettyChannelInjector implements Injector { // if a marker was set there might be scheduled packets to execute after the packet send // for this to work we need to proxy the input action to provide access to them if (marker != null) { - return this.proxyAction(action, null, marker); + return this.markProcessed(packet, this.proxyAction(action, null, marker), markSeen); } // nothing special, just no processing - return action; + return this.markProcessed(packet, action, markSeen); } // no listener and no marker - no magic :) if (!this.channelListener.hasListener(packet.getClass()) && marker == null) { - return action; + return this.markProcessed(packet, action, markSeen); } // ensure that we are on the main thread if we need to @@ -523,7 +524,7 @@ public class NettyChannelInjector implements Injector { // null indicates that no listener was affected by the packet, meaning that we can directly send the original packet PacketEvent event = this.channelListener.onPacketSending(this, packet, marker); if (event == null) { - return action; + return this.markProcessed(packet, action, markSeen); } // if the event wasn't cancelled by this action we must recheck if the packet changed during the method call @@ -539,11 +540,11 @@ public class NettyChannelInjector implements Injector { // if the marker is null we can just schedule the action as we don't need to do anything after the packet was sent NetworkMarker eventMarker = NetworkMarker.getNetworkMarker(event); if (eventMarker == null) { - return action; + return this.markProcessed(packet, action, markSeen); } // we need to wrap the action to call the listeners set in the marker - return this.proxyAction(action, event, eventMarker); + return this.markProcessed(packet, this.proxyAction(action, event, eventMarker), markSeen); } // return null if the event was cancelled to schedule a no-op event @@ -572,10 +573,29 @@ public class NettyChannelInjector implements Injector { } } + private T markProcessed(Object packet, T actualAction, boolean shouldMarkPackets) { + if (shouldMarkPackets) { + // tiny hack to prevent duplicate packet processing, on main thread and async + this.processedPackets.add(packet); + } + + // return the requested action + return actualAction; + } + + boolean wasProcessedBefore(Object packet) { + return this.processedPackets.remove(packet); + } + private FieldAccessor lookupPacketAccessor(Object action) { return PACKET_ACCESSORS.computeIfAbsent(action.getClass(), clazz -> { try { - return Accessors.getFieldAccessor(action.getClass(), MinecraftReflection.getPacketClass(), true); + // find the field + Field packetField = FuzzyReflection.fromClass(clazz, true).getField(FuzzyFieldContract + .newBuilder() + .typeSuperOf(MinecraftReflection.getPacketClass()) + .build()); + return Accessors.getFieldAccessor(packetField, true); } catch (IllegalArgumentException exception) { // no such field found :( return NO_OP_ACCESSOR; diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java index 42f85cd3..668e7267 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java @@ -1,5 +1,6 @@ package com.comphenix.protocol.injector.netty.channel; +import com.comphenix.protocol.utility.MinecraftReflection; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; @@ -12,6 +13,7 @@ import io.netty.channel.EventLoop; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import java.net.SocketAddress; +import java.util.function.Consumer; import org.jetbrains.annotations.NotNull; /** @@ -23,10 +25,12 @@ final class NettyChannelProxy implements Channel { private final Channel delegate; private final EventLoop eventLoop; + private final NettyChannelInjector injector; - public NettyChannelProxy(Channel delegate, EventLoop eventLoop) { + public NettyChannelProxy(Channel delegate, EventLoop eventLoop, NettyChannelInjector injector) { this.delegate = delegate; this.eventLoop = eventLoop; + this.injector = injector; } @Override @@ -192,12 +196,19 @@ final class NettyChannelProxy implements Channel { @Override public ChannelFuture write(Object msg) { - return this.delegate.write(msg); + return this.write(msg, this.newPromise()); } @Override public ChannelFuture write(Object msg, ChannelPromise promise) { - return this.delegate.write(msg, promise); + // only need to do our special handling if we are in the event loop + if (this.isPacketEventCallNeeded(msg)) { + this.processPacketOutbound(msg, packet -> this.delegate.write(packet, promise)); + return promise; + } else { + // no special handling needed, just write the packet + return this.delegate.write(msg, promise); + } } @Override @@ -207,12 +218,19 @@ final class NettyChannelProxy implements Channel { @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - return this.delegate.writeAndFlush(msg, promise); + // only need to do our special handling if we are in the event loop + if (this.isPacketEventCallNeeded(msg)) { + this.processPacketOutbound(msg, packet -> this.delegate.writeAndFlush(packet, promise)); + return promise; + } else { + // no special handling needed, just write the packet + return this.delegate.writeAndFlush(msg, promise); + } } @Override public ChannelFuture writeAndFlush(Object msg) { - return this.delegate.writeAndFlush(msg); + return this.writeAndFlush(msg, this.newPromise()); } @Override @@ -224,4 +242,15 @@ final class NettyChannelProxy implements Channel { public int compareTo(@NotNull Channel o) { return this.delegate.compareTo(o); } + + private boolean isPacketEventCallNeeded(Object msg) { + return MinecraftReflection.isPacketClass(msg) && !this.injector.wasProcessedBefore(msg); + } + + private void processPacketOutbound(Object packet, Consumer delegateActionHandler) { + Runnable action = this.injector.processOutbound(() -> delegateActionHandler.accept(packet), false); + if (action != null) { + action.run(); + } + } } diff --git a/src/main/java/com/comphenix/protocol/injector/netty/manager/InjectionChannelInboundHandler.java b/src/main/java/com/comphenix/protocol/injector/netty/manager/InjectionChannelInboundHandler.java index 71c861be..fd27d06e 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/manager/InjectionChannelInboundHandler.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/manager/InjectionChannelInboundHandler.java @@ -27,6 +27,9 @@ final class InjectionChannelInboundHandler extends ChannelInboundHandlerAdapter @Override public void channelActive(ChannelHandlerContext ctx) { + // call the minecraft channelActive method first + ctx.fireChannelActive(); + // the channel is now active, at this point minecraft has eventually prepared everything in the connection // of the player so that we can come in and hook as we are after the minecraft handler try { @@ -40,9 +43,6 @@ final class InjectionChannelInboundHandler extends ChannelInboundHandlerAdapter // remove this handler from the pipeline now to prevent multiple injections ctx.channel().pipeline().remove(this); - - // fire it down the pipeline in case someone else needs it - ctx.fireChannelActive(); } @Override