diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java index a1b21c0d..125bb00e 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java @@ -2,8 +2,6 @@ package com.comphenix.protocol.injector.netty.channel; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -11,9 +9,9 @@ import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Set; import java.util.WeakHashMap; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; import com.comphenix.protocol.PacketType; import com.comphenix.protocol.PacketType.Protocol; @@ -101,9 +99,10 @@ public class NettyChannelInjector implements Injector { private final FieldAccessor channelField; - private final Set skippedPackets = Collections.synchronizedSet(new HashSet<>()); - private final Collection processedPackets = Collections.synchronizedList(new ArrayList<>()); + // packet marking private final Map savedMarkers = new WeakHashMap<>(16, 0.9f); + private final Set skippedPackets = Collections.synchronizedSet(new HashSet<>()); + protected final ThreadLocal processedPackets = ThreadLocal.withInitial(() -> Boolean.FALSE); // status of this injector private volatile boolean closed = false; @@ -467,15 +466,15 @@ public class NettyChannelInjector implements Injector { } // the field is not correct, rewrite now to our handler - Channel ch = new NettyChannelProxy(this.wrappedChannel, new NettyEventLoopProxy(this.wrappedChannel.eventLoop()) { + Channel ch = new NettyChannelProxy(this.wrappedChannel, new NettyEventLoopProxy(this.wrappedChannel.eventLoop(), this) { @Override - protected Runnable proxyRunnable(Runnable original) { - return NettyChannelInjector.this.processOutbound(original, true); + protected Runnable doProxyRunnable(Runnable original) { + return NettyChannelInjector.this.processOutbound(original); } @Override - protected Callable proxyCallable(Callable original) { - return NettyChannelInjector.this.processOutbound(original, true); + protected Callable doProxyCallable(Callable original) { + return NettyChannelInjector.this.processOutbound(original); } }, this); this.channelField.set(this.networkManager, ch); @@ -524,7 +523,7 @@ public class NettyChannelInjector implements Injector { } } - T processOutbound(T action, boolean markSeen) { + T processOutbound(T action) { // get the accessor to the packet field // if we are unable to look up the accessor then just return the runnable, probably nothing of our business FieldAccessor packetAccessor = this.lookupPacketAccessor(action); @@ -539,21 +538,23 @@ public class NettyChannelInjector implements Injector { } // filter out all packets which were explicitly send to not be processed by any event - NetworkMarker marker = this.savedMarkers.remove(packet); - if (this.skippedPackets.remove(packet)) { + // pre-checking isEmpty will reduce the need of hashing packets which don't override the + // hashCode method; this presents calls to the very slow identityHashCode default implementation + NetworkMarker marker = this.savedMarkers.isEmpty() ? null : this.savedMarkers.remove(packet); + if (!this.skippedPackets.isEmpty() && this.skippedPackets.remove(packet)) { // if a marker was set there might be scheduled packets to execute after the packet send // for this to work we need to proxy the input action to provide access to them if (marker != null) { - return this.markProcessed(packet, this.proxyAction(action, null, marker), markSeen); + return this.proxyAction(action, null, marker); } // nothing special, just no processing - return this.markProcessed(packet, action, markSeen); + return action; } // no listener and no marker - no magic :) if (!this.channelListener.hasListener(packet.getClass()) && marker == null) { - return this.markProcessed(packet, action, markSeen); + return action; } // ensure that we are on the main thread if we need to @@ -569,7 +570,7 @@ public class NettyChannelInjector implements Injector { // null indicates that no listener was affected by the packet, meaning that we can directly send the original packet PacketEvent event = this.channelListener.onPacketSending(this, packet, marker); if (event == null) { - return this.markProcessed(packet, action, markSeen); + return action; } // if the event wasn't cancelled by this action we must recheck if the packet changed during the method call @@ -585,11 +586,11 @@ public class NettyChannelInjector implements Injector { // if the marker is null we can just schedule the action as we don't need to do anything after the packet was sent NetworkMarker eventMarker = NetworkMarker.getNetworkMarker(event); if (eventMarker == null) { - return this.markProcessed(interceptedPacket, action, markSeen); + return action; } // we need to wrap the action to call the listeners set in the marker - return this.markProcessed(interceptedPacket, this.proxyAction(action, event, eventMarker), markSeen); + return this.proxyAction(action, event, eventMarker); } // return null if the event was cancelled to schedule a no-op event @@ -603,12 +604,18 @@ public class NettyChannelInjector implements Injector { if (action instanceof Runnable) { // easier thing to do - just wrap the runnable in a new one return (T) (Runnable) () -> { + // notify the outbound handler that the packets are processed + this.processedPackets.set(Boolean.TRUE); + // execute the action & invoke the post event ((Runnable) action).run(); this.networkProcessor.invokePostEvent(event, marker); }; } else if (action instanceof Callable) { // okay this is a bit harder now - we need to wrap the action and return the value of it return (T) (Callable) () -> { + // notify the outbound handler that the packets are processed + this.processedPackets.set(Boolean.TRUE); + // execute the action & invoke the post event Object value = ((Callable) action).call(); this.networkProcessor.invokePostEvent(event, marker); return value; @@ -618,20 +625,6 @@ public class NettyChannelInjector implements Injector { } } - private T markProcessed(Object packet, T actualAction, boolean shouldMarkPackets) { - if (shouldMarkPackets) { - // tiny hack to prevent duplicate packet processing, on main thread and async - this.processedPackets.add(packet); - } - - // return the requested action - return actualAction; - } - - boolean wasProcessedBefore(Object packet) { - return this.processedPackets.remove(packet); - } - private FieldAccessor lookupPacketAccessor(Object action) { return PACKET_ACCESSORS.computeIfAbsent(action.getClass(), clazz -> { try { diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java index 668e7267..86d03511 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelProxy.java @@ -244,11 +244,26 @@ final class NettyChannelProxy implements Channel { } private boolean isPacketEventCallNeeded(Object msg) { - return MinecraftReflection.isPacketClass(msg) && !this.injector.wasProcessedBefore(msg); + if (MinecraftReflection.isPacketClass(msg)) { + // check if any packet was marked as processed before during the current execution + // then reset the thread local as there will always be only one packet per write op (if needed) + Boolean hasProcessedPacket = this.injector.processedPackets.get(); + if (hasProcessedPacket == Boolean.TRUE) { + // at least one packet was processed before, so no need for us to process as well + this.injector.processedPackets.set(Boolean.FALSE); + return false; + } else { + // no packet was processed in the current context, we need to process + return true; + } + } else { + // not a packet, just ignore + return false; + } } private void processPacketOutbound(Object packet, Consumer delegateActionHandler) { - Runnable action = this.injector.processOutbound(() -> delegateActionHandler.accept(packet), false); + Runnable action = this.injector.processOutbound(() -> delegateActionHandler.accept(packet)); if (action != null) { action.run(); } diff --git a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyEventLoopProxy.java b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyEventLoopProxy.java index 1e5e33fa..a894e983 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyEventLoopProxy.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/channel/NettyEventLoopProxy.java @@ -31,9 +31,41 @@ abstract class NettyEventLoopProxy implements EventLoop { }; private final EventLoop delegate; + private final NettyChannelInjector injector; - public NettyEventLoopProxy(EventLoop delegate) { + public NettyEventLoopProxy(EventLoop delegate, NettyChannelInjector injector) { this.delegate = delegate; + this.injector = injector; + } + + private Runnable proxyRunnable(Runnable original) { + // execute the proxy and check if we need to do anything + Runnable proxied = this.doProxyRunnable(original); + if (proxied != null && proxied == original) { + // was not changed, we need to mark the packet as processed manually + return () -> { + this.injector.processedPackets.set(Boolean.TRUE); + original.run(); + }; + } else { + // either the action was not executed, or the proxy will set the packet as processes + return proxied; + } + } + + private Callable proxyCallable(Callable original) { + // execute the proxy and check if we need to do anything + Callable proxied = this.doProxyCallable(original); + if (proxied != null && proxied == original) { + // was not changed, we need to mark the packet as processed manually + return () -> { + this.injector.processedPackets.set(Boolean.TRUE); + return proxied.call(); + }; + } else { + // either the action was not executed, or the proxy will set the packet as processes + return proxied; + } } /** @@ -43,7 +75,7 @@ abstract class NettyEventLoopProxy implements EventLoop { * @param original the runnable to proxy. * @return the runnable to execute instead, null to execute no action. */ - protected abstract Runnable proxyRunnable(Runnable original); + protected abstract Runnable doProxyRunnable(Runnable original); /** * Proxies the given callable. The returned callable will be executed instead of the original. If this method returns @@ -54,7 +86,7 @@ abstract class NettyEventLoopProxy implements EventLoop { * @param the return type of the original callable. * @return the callable to execute instead of the original, null to use a no-op callable instead. */ - protected abstract Callable proxyCallable(Callable original); + protected abstract Callable doProxyCallable(Callable original); @Override public EventLoopGroup parent() {