diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java index e1b2dc1c..522069a1 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java @@ -252,7 +252,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok // Use the correct injection type if (MinecraftReflection.isUsingNetty()) { - this.nettyInjector = new NettyProtocolInjector(this, reporter); + this.nettyInjector = new NettyProtocolInjector(builder.getLibrary(), this, reporter); this.playerInjection = nettyInjector.getPlayerInjector(); this.packetInjector = nettyInjector.getPacketInjector(); diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java index 32062b61..64e2b935 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java @@ -3,10 +3,8 @@ package com.comphenix.protocol.injector.netty; import java.lang.reflect.InvocationTargetException; import java.net.Socket; import java.net.SocketAddress; -import java.util.Collections; import java.util.List; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; @@ -22,6 +20,7 @@ import net.minecraft.util.io.netty.util.concurrent.GenericFutureListener; import net.minecraft.util.io.netty.util.internal.TypeParameterMatcher; import net.sf.cglib.proxy.Factory; +import org.bukkit.Bukkit; import org.bukkit.entity.Player; import com.comphenix.protocol.PacketType; @@ -85,10 +84,22 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { private ConcurrentMap markerEvent = new MapMaker().weakKeys().makeMap(); // Packets we have processed before - private Set processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); + //private Set processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); // Packets to ignore - private Set ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); + //private Set ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap()); + + /** + * Indicate that this packet will be ignored. + *

+ * This must never be set outside the channel pipeline's thread. + */ + private boolean processPackets = true; + + /** + * A flag set by the main thread to indiciate that a packet should not be processed. + */ + private boolean scheduleProcessPackets = true; // Other handlers private ByteToMessageDecoder vanillaDecoder; @@ -174,14 +185,50 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { // Intercept all write methods channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) { @Override - protected Object onMessageScheduled(Object message) { - Object result = processSending(message); - - // We have now processed this packet once already - if (result != null) { - processedPackets.add(result); + protected Callable onMessageScheduled(final Callable callable, FieldAccessor packetAccessor) { + if (handleScheduled(callable, packetAccessor)) { + return new Callable() { + @Override + public T call() throws Exception { + T result = null; + + // This field must only be updated in the pipeline thread + processPackets = false; + result = callable.call(); + processPackets = true; + return result; + } + }; } - return result; + return null; + } + + @Override + protected Runnable onMessageScheduled(final Runnable runnable, FieldAccessor packetAccessor) { + if (handleScheduled(runnable, packetAccessor)) { + return new Runnable() { + @Override + public void run() { + processPackets = false; + runnable.run(); + processPackets = true; + } + }; + } + return null; + } + + protected boolean handleScheduled(Object instance, FieldAccessor accessor) { + Object original = accessor.get(instance); + Object result = scheduleProcessPackets ? processSending(original) : original; + + if (result != null) { + // Change packet to be scheduled + if (original != result) + accessor.set(instance, result); + return true; + } + return false; } }); @@ -196,7 +243,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { * @return The resulting message/packet. */ private Object processSending(Object message) { - return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message)); + return processSending(message, packetMarker.get(message)); + } + + /** + * Process a given message on the packet listeners. + * @param message - the message/packet. + * @return The resulting message/packet. + */ + private Object processSending(Object message, NetworkMarker marker) { + return channelListener.onPacketSending(ChannelInjector.this, message, marker); } /** @@ -226,11 +282,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { */ protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception { try { - NetworkMarker marker = getMarker(packet); + NetworkMarker marker = getMarker(packet);; PacketEvent event = markerEvent.remove(marker); - // Try again, in case this packet was sent directly in the event loop - if (event == null && !processedPackets.remove(packet)) { + // This packet has not been seen by the main thread + if (processPackets) { Class clazz = packet.getClass(); // Schedule the transmission on the main thread instead @@ -240,7 +296,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { packet = null; } else { - packet = processSending(packet); + packet = processSending(packet, marker); marker = getMarker(packet); event = markerEvent.remove(marker); } @@ -271,10 +327,8 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } private void scheduleMainThread(final Object packetCopy) { - // Do not process this packet agai - processedPackets.add(packetCopy); - - ProtocolLibrary.getExecutorSync().execute(new Runnable() { + // Don't use BukkitExecutors for this - it has a bit of overhead + Bukkit.getScheduler().scheduleSyncDelayedTask(factory.getPlugin(), new Runnable() { @Override public void run() { invokeSendPacket(packetCopy); @@ -386,15 +440,10 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { @Override public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) { saveMarker(packet, marker); - processedPackets.remove(packet); - // Record if this packet should be ignored by most listeners - if (!filtered) { - ignoredPackets.add(packet); - } else { - ignoredPackets.remove(packet); - } + scheduleProcessPackets = filtered; invokeSendPacket(packet); + scheduleProcessPackets = true; } /** @@ -415,20 +464,31 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { } @Override - public void recieveClientPacket(Object packet, NetworkMarker marker, boolean filtered) { - saveMarker(packet, marker); - processedPackets.remove(packet); - - if (!filtered) { - ignoredPackets.add(packet); - } else { - ignoredPackets.remove(packet); - } + public void recieveClientPacket(final Object packet, final NetworkMarker marker, final boolean filtered) { + // Execute this in the channel thread + Runnable action = new Runnable() { + @Override + public void run() { + Object result = filtered ? channelListener.onPacketReceiving(ChannelInjector.this, packet, marker) : packet; + + // See if the packet has been cancelled + if (result == null) + return; + + try { + MinecraftMethods.getNetworkManagerReadPacketMethod().invoke(networkManager, null, result); + } catch (Exception e) { + // Inform the user + ProtocolLibrary.getErrorReporter().reportMinimal(factory.getPlugin(), "recieveClientPacket", e); + } + } + }; - try { - MinecraftMethods.getNetworkManagerReadPacketMethod().invoke(networkManager, null, packet); - } catch (Exception e) { - throw new IllegalArgumentException("Unable to receive client packet " + packet, e); + // Execute in the worker thread + if (originalChannel.eventLoop().inEventLoop()) { + action.run(); + } else { + originalChannel.eventLoop().execute(action); } } @@ -454,12 +514,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector { @Override public boolean unignorePacket(Object packet) { - return ignoredPackets.remove(packet); + // NOP + return false; } @Override public boolean ignorePacket(Object packet) { - return ignoredPackets.add(packet); + // NOP + return false; } @Override diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java index 4efe126d..250a68ae 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java @@ -28,7 +28,7 @@ abstract class ChannelProxy implements Channel { public Object get(Object instance) { return null; } public Field getField() { return null; }; }; - + // Looking up packets in inner classes private static Map, FieldAccessor> MESSAGE_LOOKUP = Maps.newConcurrentMap(); @@ -46,10 +46,19 @@ abstract class ChannelProxy implements Channel { /** * Invoked when a packet is scheduled for transmission in the event loop. - * @param message - the packet to schedule. - * @return The object to transmit, or NULL to cancel. + * @param callable - callable to schedule for execution. + * @param packetAccessor - accessor for modifying the packet in the callable. + * @return The callable that will be scheduled, or NULL to cancel. */ - protected abstract Object onMessageScheduled(Object message); + protected abstract Callable onMessageScheduled(Callable callable, FieldAccessor packetAccessor); + + /** + * Invoked when a packet is scheduled for transmission in the event loop. + * @param runnable - the runnable that contains a packet to be scheduled. + * @param packetAccessor - accessor for modifying the packet in the runnable. + * @return The runnable that will be scheduled, or NULL to cancel. + */ + protected abstract Runnable onMessageScheduled(Runnable runnable, FieldAccessor packetAccessor); public Attribute attr(AttributeKey paramAttributeKey) { return delegate.attr(paramAttributeKey); @@ -84,16 +93,12 @@ abstract class ChannelProxy implements Channel { } @Override - protected Runnable schedulingRunnable(Runnable runnable) { - FieldAccessor accessor = getMessageAccessor(runnable); + protected Runnable schedulingRunnable(final Runnable runnable) { + final FieldAccessor accessor = getMessageAccessor(runnable); if (accessor != null) { - Object packet = onMessageScheduled(accessor.get(runnable)); - - if (packet != null) - accessor.set(runnable, packet); - else - return getEmptyRunnable(); + Runnable result = onMessageScheduled(runnable, accessor);; + return result != null ? result : getEmptyRunnable(); } return runnable; } @@ -103,12 +108,8 @@ abstract class ChannelProxy implements Channel { FieldAccessor accessor = getMessageAccessor(callable); if (accessor != null) { - Object packet = onMessageScheduled(accessor.get(callable)); - - if (packet != null) - accessor.set(callable, packet); - else - return getEmptyCallable(); + Callable result = onMessageScheduled(callable, accessor);; + return result != null ? result : EventLoopProxy.getEmptyCallable(); } return callable; } diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/InjectionFactory.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/InjectionFactory.java index 7f5da749..9f25f9e6 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/InjectionFactory.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/InjectionFactory.java @@ -6,6 +6,7 @@ import javax.annotation.Nonnull; import net.minecraft.util.io.netty.channel.Channel; import org.bukkit.Bukkit; import org.bukkit.entity.Player; +import org.bukkit.plugin.Plugin; import com.comphenix.protocol.injector.netty.ChannelInjector.ChannelSocketInjector; import com.comphenix.protocol.injector.server.SocketInjector; @@ -29,6 +30,21 @@ class InjectionFactory { // Whether or not the factory is closed private volatile boolean closed; + // The current plugin + private final Plugin plugin; + + public InjectionFactory(Plugin plugin) { + this.plugin = plugin; + } + + /** + * Retrieve the main plugin associated with this injection factory. + * @return The main plugin. + */ + public Plugin getPlugin() { + return plugin; + } + /** * Construct or retrieve a channel injector from an existing Bukkit player. * @param player - the existing Bukkit player. diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java index b211e994..b39bf798 100644 --- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java +++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java @@ -8,6 +8,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Set; import org.bukkit.entity.Player; +import org.bukkit.plugin.Plugin; import net.minecraft.util.io.netty.channel.Channel; import net.minecraft.util.io.netty.channel.ChannelFuture; @@ -47,7 +48,7 @@ public class NettyProtocolInjector implements ChannelListener { private List bootstrapFields = Lists.newArrayList(); // The channel injector factory - private InjectionFactory injectionFactory = new InjectionFactory(); + private InjectionFactory injectionFactory; // List of network managers private volatile List networkManagers; @@ -67,7 +68,8 @@ public class NettyProtocolInjector implements ChannelListener { private ErrorReporter reporter; private boolean debug; - public NettyProtocolInjector(ListenerInvoker invoker, ErrorReporter reporter) { + public NettyProtocolInjector(Plugin plugin, ListenerInvoker invoker, ErrorReporter reporter) { + this.injectionFactory = new InjectionFactory(plugin); this.invoker = invoker; this.reporter = reporter; }