mirror of
https://github.com/dmulloy2/ProtocolLib.git
synced 2025-02-08 16:42:04 +01:00
fix packet listener calling when processed in event loop (#1621)
This commit is contained in:
parent
e202503c09
commit
4db1e39ac7
@ -27,7 +27,7 @@ import io.netty.channel.ChannelHandler;
|
|||||||
import io.netty.util.AttributeKey;
|
import io.netty.util.AttributeKey;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.lang.reflect.Modifier;
|
import java.lang.reflect.Modifier;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
@ -95,7 +95,8 @@ public class NettyChannelInjector implements Injector {
|
|||||||
|
|
||||||
private final FieldAccessor channelField;
|
private final FieldAccessor channelField;
|
||||||
|
|
||||||
private final Set<Object> skippedPackets = new LinkedHashSet<>();
|
private final Set<Object> skippedPackets = new HashSet<>();
|
||||||
|
private final Set<Object> processedPackets = new HashSet<>();
|
||||||
private final Map<Object, NetworkMarker> savedMarkers = new WeakHashMap<>(16, 0.9f);
|
private final Map<Object, NetworkMarker> savedMarkers = new WeakHashMap<>(16, 0.9f);
|
||||||
|
|
||||||
// status of this injector
|
// status of this injector
|
||||||
@ -463,14 +464,14 @@ public class NettyChannelInjector implements Injector {
|
|||||||
Channel ch = new NettyChannelProxy(this.wrappedChannel, new NettyEventLoopProxy(this.wrappedChannel.eventLoop()) {
|
Channel ch = new NettyChannelProxy(this.wrappedChannel, new NettyEventLoopProxy(this.wrappedChannel.eventLoop()) {
|
||||||
@Override
|
@Override
|
||||||
protected Runnable proxyRunnable(Runnable original) {
|
protected Runnable proxyRunnable(Runnable original) {
|
||||||
return NettyChannelInjector.this.processOutbound(original);
|
return NettyChannelInjector.this.processOutbound(original, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected <T> Callable<T> proxyCallable(Callable<T> original) {
|
protected <T> Callable<T> proxyCallable(Callable<T> original) {
|
||||||
return NettyChannelInjector.this.processOutbound(original);
|
return NettyChannelInjector.this.processOutbound(original, true);
|
||||||
}
|
}
|
||||||
});
|
}, this);
|
||||||
this.channelField.set(this.networkManager, ch);
|
this.channelField.set(this.networkManager, ch);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -478,7 +479,7 @@ public class NettyChannelInjector implements Injector {
|
|||||||
this.wrappedChannel.eventLoop().execute(runnable);
|
this.wrappedChannel.eventLoop().execute(runnable);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T processOutbound(T action) {
|
<T> T processOutbound(T action, boolean markSeen) {
|
||||||
// get the accessor to the packet field
|
// get the accessor to the packet field
|
||||||
// if we are unable to look up the accessor then just return the runnable, probably nothing of our business
|
// if we are unable to look up the accessor then just return the runnable, probably nothing of our business
|
||||||
FieldAccessor packetAccessor = this.lookupPacketAccessor(action);
|
FieldAccessor packetAccessor = this.lookupPacketAccessor(action);
|
||||||
@ -498,16 +499,16 @@ public class NettyChannelInjector implements Injector {
|
|||||||
// if a marker was set there might be scheduled packets to execute after the packet send
|
// if a marker was set there might be scheduled packets to execute after the packet send
|
||||||
// for this to work we need to proxy the input action to provide access to them
|
// for this to work we need to proxy the input action to provide access to them
|
||||||
if (marker != null) {
|
if (marker != null) {
|
||||||
return this.proxyAction(action, null, marker);
|
return this.markProcessed(packet, this.proxyAction(action, null, marker), markSeen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// nothing special, just no processing
|
// nothing special, just no processing
|
||||||
return action;
|
return this.markProcessed(packet, action, markSeen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// no listener and no marker - no magic :)
|
// no listener and no marker - no magic :)
|
||||||
if (!this.channelListener.hasListener(packet.getClass()) && marker == null) {
|
if (!this.channelListener.hasListener(packet.getClass()) && marker == null) {
|
||||||
return action;
|
return this.markProcessed(packet, action, markSeen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure that we are on the main thread if we need to
|
// ensure that we are on the main thread if we need to
|
||||||
@ -523,7 +524,7 @@ public class NettyChannelInjector implements Injector {
|
|||||||
// null indicates that no listener was affected by the packet, meaning that we can directly send the original packet
|
// null indicates that no listener was affected by the packet, meaning that we can directly send the original packet
|
||||||
PacketEvent event = this.channelListener.onPacketSending(this, packet, marker);
|
PacketEvent event = this.channelListener.onPacketSending(this, packet, marker);
|
||||||
if (event == null) {
|
if (event == null) {
|
||||||
return action;
|
return this.markProcessed(packet, action, markSeen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if the event wasn't cancelled by this action we must recheck if the packet changed during the method call
|
// if the event wasn't cancelled by this action we must recheck if the packet changed during the method call
|
||||||
@ -539,11 +540,11 @@ public class NettyChannelInjector implements Injector {
|
|||||||
// if the marker is null we can just schedule the action as we don't need to do anything after the packet was sent
|
// if the marker is null we can just schedule the action as we don't need to do anything after the packet was sent
|
||||||
NetworkMarker eventMarker = NetworkMarker.getNetworkMarker(event);
|
NetworkMarker eventMarker = NetworkMarker.getNetworkMarker(event);
|
||||||
if (eventMarker == null) {
|
if (eventMarker == null) {
|
||||||
return action;
|
return this.markProcessed(packet, action, markSeen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// we need to wrap the action to call the listeners set in the marker
|
// we need to wrap the action to call the listeners set in the marker
|
||||||
return this.proxyAction(action, event, eventMarker);
|
return this.markProcessed(packet, this.proxyAction(action, event, eventMarker), markSeen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// return null if the event was cancelled to schedule a no-op event
|
// return null if the event was cancelled to schedule a no-op event
|
||||||
@ -572,10 +573,29 @@ public class NettyChannelInjector implements Injector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T> T markProcessed(Object packet, T actualAction, boolean shouldMarkPackets) {
|
||||||
|
if (shouldMarkPackets) {
|
||||||
|
// tiny hack to prevent duplicate packet processing, on main thread and async
|
||||||
|
this.processedPackets.add(packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the requested action
|
||||||
|
return actualAction;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean wasProcessedBefore(Object packet) {
|
||||||
|
return this.processedPackets.remove(packet);
|
||||||
|
}
|
||||||
|
|
||||||
private FieldAccessor lookupPacketAccessor(Object action) {
|
private FieldAccessor lookupPacketAccessor(Object action) {
|
||||||
return PACKET_ACCESSORS.computeIfAbsent(action.getClass(), clazz -> {
|
return PACKET_ACCESSORS.computeIfAbsent(action.getClass(), clazz -> {
|
||||||
try {
|
try {
|
||||||
return Accessors.getFieldAccessor(action.getClass(), MinecraftReflection.getPacketClass(), true);
|
// find the field
|
||||||
|
Field packetField = FuzzyReflection.fromClass(clazz, true).getField(FuzzyFieldContract
|
||||||
|
.newBuilder()
|
||||||
|
.typeSuperOf(MinecraftReflection.getPacketClass())
|
||||||
|
.build());
|
||||||
|
return Accessors.getFieldAccessor(packetField, true);
|
||||||
} catch (IllegalArgumentException exception) {
|
} catch (IllegalArgumentException exception) {
|
||||||
// no such field found :(
|
// no such field found :(
|
||||||
return NO_OP_ACCESSOR;
|
return NO_OP_ACCESSOR;
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.comphenix.protocol.injector.netty.channel;
|
package com.comphenix.protocol.injector.netty.channel;
|
||||||
|
|
||||||
|
import com.comphenix.protocol.utility.MinecraftReflection;
|
||||||
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelConfig;
|
import io.netty.channel.ChannelConfig;
|
||||||
@ -12,6 +13,7 @@ import io.netty.channel.EventLoop;
|
|||||||
import io.netty.util.Attribute;
|
import io.netty.util.Attribute;
|
||||||
import io.netty.util.AttributeKey;
|
import io.netty.util.AttributeKey;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
import java.util.function.Consumer;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -23,10 +25,12 @@ final class NettyChannelProxy implements Channel {
|
|||||||
|
|
||||||
private final Channel delegate;
|
private final Channel delegate;
|
||||||
private final EventLoop eventLoop;
|
private final EventLoop eventLoop;
|
||||||
|
private final NettyChannelInjector injector;
|
||||||
|
|
||||||
public NettyChannelProxy(Channel delegate, EventLoop eventLoop) {
|
public NettyChannelProxy(Channel delegate, EventLoop eventLoop, NettyChannelInjector injector) {
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
this.eventLoop = eventLoop;
|
this.eventLoop = eventLoop;
|
||||||
|
this.injector = injector;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -192,13 +196,20 @@ final class NettyChannelProxy implements Channel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture write(Object msg) {
|
public ChannelFuture write(Object msg) {
|
||||||
return this.delegate.write(msg);
|
return this.write(msg, this.newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
||||||
|
// only need to do our special handling if we are in the event loop
|
||||||
|
if (this.isPacketEventCallNeeded(msg)) {
|
||||||
|
this.processPacketOutbound(msg, packet -> this.delegate.write(packet, promise));
|
||||||
|
return promise;
|
||||||
|
} else {
|
||||||
|
// no special handling needed, just write the packet
|
||||||
return this.delegate.write(msg, promise);
|
return this.delegate.write(msg, promise);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Channel flush() {
|
public Channel flush() {
|
||||||
@ -207,12 +218,19 @@ final class NettyChannelProxy implements Channel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||||
|
// only need to do our special handling if we are in the event loop
|
||||||
|
if (this.isPacketEventCallNeeded(msg)) {
|
||||||
|
this.processPacketOutbound(msg, packet -> this.delegate.writeAndFlush(packet, promise));
|
||||||
|
return promise;
|
||||||
|
} else {
|
||||||
|
// no special handling needed, just write the packet
|
||||||
return this.delegate.writeAndFlush(msg, promise);
|
return this.delegate.writeAndFlush(msg, promise);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture writeAndFlush(Object msg) {
|
public ChannelFuture writeAndFlush(Object msg) {
|
||||||
return this.delegate.writeAndFlush(msg);
|
return this.writeAndFlush(msg, this.newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -224,4 +242,15 @@ final class NettyChannelProxy implements Channel {
|
|||||||
public int compareTo(@NotNull Channel o) {
|
public int compareTo(@NotNull Channel o) {
|
||||||
return this.delegate.compareTo(o);
|
return this.delegate.compareTo(o);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isPacketEventCallNeeded(Object msg) {
|
||||||
|
return MinecraftReflection.isPacketClass(msg) && !this.injector.wasProcessedBefore(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processPacketOutbound(Object packet, Consumer<Object> delegateActionHandler) {
|
||||||
|
Runnable action = this.injector.processOutbound(() -> delegateActionHandler.accept(packet), false);
|
||||||
|
if (action != null) {
|
||||||
|
action.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,9 @@ final class InjectionChannelInboundHandler extends ChannelInboundHandlerAdapter
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelActive(ChannelHandlerContext ctx) {
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
// call the minecraft channelActive method first
|
||||||
|
ctx.fireChannelActive();
|
||||||
|
|
||||||
// the channel is now active, at this point minecraft has eventually prepared everything in the connection
|
// the channel is now active, at this point minecraft has eventually prepared everything in the connection
|
||||||
// of the player so that we can come in and hook as we are after the minecraft handler
|
// of the player so that we can come in and hook as we are after the minecraft handler
|
||||||
try {
|
try {
|
||||||
@ -40,9 +43,6 @@ final class InjectionChannelInboundHandler extends ChannelInboundHandlerAdapter
|
|||||||
|
|
||||||
// remove this handler from the pipeline now to prevent multiple injections
|
// remove this handler from the pipeline now to prevent multiple injections
|
||||||
ctx.channel().pipeline().remove(this);
|
ctx.channel().pipeline().remove(this);
|
||||||
|
|
||||||
// fire it down the pipeline in case someone else needs it
|
|
||||||
ctx.fireChannelActive();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user