diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java b/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java
index 86aa1035..69c2e96d 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/events/PacketEvent.java
@@ -22,6 +22,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.ref.WeakReference;
import java.util.EventObject;
+
+import org.bukkit.Bukkit;
import org.bukkit.entity.Player;
import org.bukkit.event.Cancellable;
@@ -137,6 +139,19 @@ public class PacketEvent extends EventObject implements Cancellable {
return new PacketEvent(event, marker);
}
+ /**
+ * Determine if we are executing the packet event in an asynchronous thread.
+ *
+ * If so, you must synchronize all calls to the Bukkit API.
+ *
+ * Generally, most server packets are executed on the main thread, whereas client packets
+ * are all executed asynchronously.
+ * @return TRUE if we are, FALSE otherwise.
+ */
+ public boolean isAsync() {
+ return !Bukkit.isPrimaryThread();
+ }
+
/**
* Retrieves the packet that will be sent to the player.
* @return Packet to send to the player.
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java
index 09c44c51..9d6c7260 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java
@@ -118,6 +118,9 @@ class ChannelInjector extends ByteToMessageDecoder {
private ConcurrentMap packetMarker = new MapMaker().weakKeys().makeMap();
private ConcurrentMap markerEvent = new MapMaker().weakKeys().makeMap();
+ // Packets we have processed before
+ private Set processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap());
+
// Packets to ignore
private Set ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().makeMap());
@@ -235,32 +238,11 @@ class ChannelInjector extends ByteToMessageDecoder {
ENCODE_BUFFER = FuzzyReflection.getMethodAccessor(vanillaEncoder.getClass(),
"encode", ChannelHandlerContext.class, Object.class, ByteBuf.class);
+ // Intercept sent packets
protocolEncoder = new MessageToByteEncoder() {
@Override
protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
- try {
- NetworkMarker marker = getMarker(output);
- PacketEvent event = markerEvent.remove(marker);
-
- if (event != null && NetworkMarker.hasOutputHandlers(marker)) {
- ByteBuf packetBuffer = ctx.alloc().buffer();
- ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer);
- byte[] data = getBytes(packetBuffer);
-
- for (PacketOutputHandler handler : marker.getOutputHandlers()) {
- handler.handle(event, data);
- }
- // Write the result
- output.writeBytes(data);
- return;
- }
- } catch (Exception e) {
- channelListener.getReporter().reportDetailed(this,
- Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build());
- } finally {
- // Attempt to handle the packet nevertheless
- ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output);
- }
+ ChannelInjector.this.encode(ctx, packet, output);
}
public void exceptionCaught(ChannelHandlerContext channelhandlercontext, Throwable throwable) {
@@ -273,10 +255,16 @@ class ChannelInjector extends ByteToMessageDecoder {
originalChannel.pipeline().addAfter("encoder", "protocol_lib_encoder", protocolEncoder);
// Intercept all write methods
- channelField.setValue(new ChannelProxy(originalChannel) {
+ channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) {
@Override
- protected Object onMessageWritten(Object message) {
- return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message));
+ protected Object onMessageScheduled(Object message) {
+ Object result = processSending(message);
+
+ // We have now processed this packet once already
+ if (result != null) {
+ processedPackets.add(result);
+ }
+ return result;
}
});
@@ -285,6 +273,15 @@ class ChannelInjector extends ByteToMessageDecoder {
}
}
+ /**
+ * Process a given message on the packet listeners.
+ * @param message - the message/packet.
+ * @return The resulting message/packet.
+ */
+ private Object processSending(Object message) {
+ return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message));
+ }
+
/**
* This method patches the encoder so that it skips already created packets.
* @param encoder - the encoder to patch.
@@ -316,6 +313,49 @@ class ChannelInjector extends ByteToMessageDecoder {
}
}
+ /**
+ * Encode a packet to a byte buffer, taking over for the standard Minecraft encoder.
+ * @param ctx - the current context.
+ * @param packet - the packet to encode to a byte array.
+ * @param output - the output byte array.
+ * @throws Exception If anything went wrong.
+ */
+ protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
+ try {
+ NetworkMarker marker = getMarker(packet);
+ PacketEvent event = markerEvent.remove(marker);
+
+ // Try again, in case this packet was sent directly in the event loop
+ if (event == null && !processedPackets.remove(packet)) {
+ packet = processSending(packet);
+ marker = getMarker(packet);
+ event = markerEvent.remove(marker);
+ }
+
+ // Process output handler
+ if (packet != null && event != null && NetworkMarker.hasOutputHandlers(marker)) {
+ ByteBuf packetBuffer = ctx.alloc().buffer();
+ ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer);
+ byte[] data = getBytes(packetBuffer);
+
+ for (PacketOutputHandler handler : marker.getOutputHandlers()) {
+ handler.handle(event, data);
+ }
+ // Write the result
+ output.writeBytes(data);
+ return;
+ }
+ } catch (Exception e) {
+ channelListener.getReporter().reportDetailed(this,
+ Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build());
+ } finally {
+ // Attempt to handle the packet nevertheless
+ if (packet != null) {
+ ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output);
+ }
+ }
+ }
+
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuffer, List packets) throws Exception {
try {
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java
index e2244e55..892a1c04 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/ChannelProxy.java
@@ -1,8 +1,13 @@
package com.comphenix.protocol.injector.netty;
-import java.lang.reflect.Constructor;
import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import com.comphenix.protocol.reflect.FuzzyReflection;
+import com.comphenix.protocol.reflect.FuzzyReflection.FieldAccessor;
+
+import net.minecraft.util.com.google.common.collect.Maps;
import net.minecraft.util.io.netty.buffer.ByteBufAllocator;
import net.minecraft.util.io.netty.channel.Channel;
import net.minecraft.util.io.netty.channel.ChannelConfig;
@@ -14,48 +19,35 @@ import net.minecraft.util.io.netty.channel.ChannelPromise;
import net.minecraft.util.io.netty.channel.EventLoop;
import net.minecraft.util.io.netty.util.Attribute;
import net.minecraft.util.io.netty.util.AttributeKey;
-import net.minecraft.util.io.netty.util.concurrent.EventExecutor;
abstract class ChannelProxy implements Channel {
- private static Constructor extends ChannelFuture> FUTURE_CONSTRUCTOR;
+ // Mark that a certain object does not contain a message field
+ private static final FieldAccessor MARK_NO_MESSAGE = new FieldAccessor() {
+ public void set(Object instance, Object value) { }
+ public Object get(Object instance) { return null; }
+ };
+
+ // Looking up packets in inner classes
+ private static Map, FieldAccessor> MESSAGE_LOOKUP = Maps.newConcurrentMap();
// The underlying channel
private Channel delegate;
-
- public ChannelProxy(Channel delegate) {
- this.delegate = delegate;
- }
-
- /**
- * Invoked when a packet is being transmitted.
- * @param message - the packet to transmit.
- * @return The object to transmit.
- */
- protected abstract Object onMessageWritten(Object message);
+ private Class> messageClass;
- /**
- * The future we return when packets are being cancelled.
- * @return A succeeded future.
- */
- protected ChannelFuture getSucceededFuture() {
- try {
- if (FUTURE_CONSTRUCTOR == null) {
- @SuppressWarnings("unchecked")
- Class extends ChannelFuture> succededFuture =
- (Class extends ChannelFuture>) ChannelProxy.class.getClassLoader().
- loadClass("net.minecraft.util.io.netty.channel.SucceededChannelFuture");
-
- FUTURE_CONSTRUCTOR = succededFuture.getDeclaredConstructor(Channel.class, EventExecutor.class);
- FUTURE_CONSTRUCTOR.setAccessible(true);
- }
- return FUTURE_CONSTRUCTOR.newInstance(this, null);
-
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot get succeeded future.", e);
- } catch (Exception e) {
- throw new RuntimeException("Cannot construct completed future.", e);
- }
+ // Event loop proxy
+ private transient EventLoopProxy loopProxy;
+
+ public ChannelProxy(Channel delegate, Class> messageClass) {
+ this.delegate = delegate;
+ this.messageClass = messageClass;
}
+
+ /**
+ * Invoked when a packet is scheduled for transmission in the event loop.
+ * @param message - the packet to schedule.
+ * @return The object to transmit, or NULL to cancel.
+ */
+ protected abstract Object onMessageScheduled(Object message);
public Attribute attr(AttributeKey paramAttributeKey) {
return delegate.attr(paramAttributeKey);
@@ -82,7 +74,66 @@ abstract class ChannelProxy implements Channel {
}
public EventLoop eventLoop() {
- return delegate.eventLoop();
+ if (loopProxy == null) {
+ loopProxy = new EventLoopProxy() {
+ @Override
+ protected EventLoop getDelegate() {
+ return delegate.eventLoop();
+ }
+
+ @Override
+ protected Runnable schedulingRunnable(Runnable runnable) {
+ FieldAccessor accessor = getMessageAccessor(runnable);
+
+ if (accessor != null) {
+ Object packet = onMessageScheduled(accessor.get(runnable));
+
+ if (packet != null)
+ accessor.set(runnable, packet);
+ else
+ getEmptyRunnable();
+ }
+ return runnable;
+ }
+
+ @Override
+ protected Callable schedulingCallable(Callable callable) {
+ FieldAccessor accessor = getMessageAccessor(callable);
+
+ if (accessor != null) {
+ Object packet = onMessageScheduled(accessor.get(callable));
+
+ if (packet != null)
+ accessor.set(callable, packet);
+ else
+ getEmptyCallable();
+ }
+ return callable;
+ }
+ };
+ }
+ return loopProxy;
+ }
+
+ /**
+ * Retrieve a way to access the packet field of an object.
+ * @param value - the object.
+ * @return The packet field accessor, or NULL if not found.
+ */
+ private FieldAccessor getMessageAccessor(Object value) {
+ Class> clazz = value.getClass();
+ FieldAccessor accessor = MESSAGE_LOOKUP.get(clazz);
+
+ if (accessor == null) {
+ try {
+ accessor = FuzzyReflection.getFieldAccessor(clazz, messageClass, true);
+ } catch (IllegalArgumentException e) {
+ accessor = MARK_NO_MESSAGE;
+ }
+ // Save the result
+ MESSAGE_LOOKUP.put(clazz, accessor);
+ }
+ return accessor != MARK_NO_MESSAGE ? accessor : null;
}
public ChannelFuture connect(SocketAddress paramSocketAddress1,
@@ -198,37 +249,21 @@ abstract class ChannelProxy implements Channel {
public ChannelFuture deregister(ChannelPromise paramChannelPromise) {
return delegate.deregister(paramChannelPromise);
}
-
- public ChannelFuture write(Object message) {
- Object result = onMessageWritten(message);
-
- if (result != null)
- return delegate.write(result);
- return getSucceededFuture();
+
+ public ChannelFuture write(Object paramObject) {
+ return delegate.write(paramObject);
}
- public ChannelFuture write(Object message, ChannelPromise paramChannelPromise) {
- Object result = onMessageWritten(message);
-
- if (result != null)
- return delegate.write(message, paramChannelPromise);
- return getSucceededFuture();
+ public ChannelFuture write(Object paramObject, ChannelPromise paramChannelPromise) {
+ return delegate.write(paramObject, paramChannelPromise);
}
- public ChannelFuture writeAndFlush(Object message, ChannelPromise paramChannelPromise) {
- Object result = onMessageWritten(message);
-
- if (result != null)
- return delegate.writeAndFlush(message, paramChannelPromise);
- return getSucceededFuture();
+ public ChannelFuture writeAndFlush(Object paramObject, ChannelPromise paramChannelPromise) {
+ return delegate.writeAndFlush(paramObject, paramChannelPromise);
}
- public ChannelFuture writeAndFlush(Object message) {
- Object result = onMessageWritten(message);
-
- if (result != null)
- return delegate.writeAndFlush(message);
- return getSucceededFuture();
+ public ChannelFuture writeAndFlush(Object paramObject) {
+ return delegate.writeAndFlush(paramObject);
}
public int compareTo(Channel o) {
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java
new file mode 100644
index 00000000..627a6c09
--- /dev/null
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/EventLoopProxy.java
@@ -0,0 +1,210 @@
+package com.comphenix.protocol.injector.netty;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.minecraft.util.io.netty.channel.Channel;
+import net.minecraft.util.io.netty.channel.ChannelFuture;
+import net.minecraft.util.io.netty.channel.ChannelPromise;
+import net.minecraft.util.io.netty.channel.EventLoop;
+import net.minecraft.util.io.netty.channel.EventLoopGroup;
+import net.minecraft.util.io.netty.util.concurrent.EventExecutor;
+import net.minecraft.util.io.netty.util.concurrent.ProgressivePromise;
+import net.minecraft.util.io.netty.util.concurrent.Promise;
+import net.minecraft.util.io.netty.util.concurrent.ScheduledFuture;
+
+/**
+ * An event loop proxy.
+ * @author Kristian.
+ */
+abstract class EventLoopProxy implements EventLoop {
+ private static final Runnable EMPTY_RUNNABLE = new Runnable() {
+ public void run() {
+ // Do nothing
+ }
+ };
+ private static final Callable> EMPTY_CALLABLE = new Callable() {
+ public Object call() throws Exception {
+ return null;
+ };
+ };
+
+ /**
+ * Retrieve the underlying event loop.
+ * @return The event loop.
+ */
+ protected abstract EventLoop getDelegate();
+
+ /**
+ * Retrieve a callable that does nothing but return NULL.
+ * @return The empty callable.
+ */
+ @SuppressWarnings("unchecked")
+ public static Callable getEmptyCallable() {
+ return (Callable) EMPTY_CALLABLE;
+ }
+
+ /**
+ * Retrieve a runnable that does nothing.
+ * @return A NO-OP runnable.
+ */
+ public static Runnable getEmptyRunnable() {
+ return EMPTY_RUNNABLE;
+ }
+
+ /**
+ * Invoked when a runnable is being scheduled.
+ * @param runnable - the runnable that is scheduling.
+ * @return The runnable to schedule instead. Cannot be NULL.
+ */
+ protected abstract Runnable schedulingRunnable(Runnable runnable);
+
+ /**
+ * Invoked when a callable is being scheduled.
+ * @param runnable - the callable that is scheduling.
+ * @return The callable to schedule instead. Cannot be NULL.
+ */
+ protected abstract Callable schedulingCallable(Callable callable);
+
+ public void execute(Runnable command) {
+ getDelegate().execute(schedulingRunnable(command));
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future submit(Callable action) {
+ return getDelegate().submit(schedulingCallable(action));
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future submit(Runnable action, T arg1) {
+ return getDelegate().submit(schedulingRunnable(action), arg1);
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future> submit(Runnable action) {
+ return getDelegate().submit(schedulingRunnable(action));
+ }
+
+ public ScheduledFuture schedule(Callable action, long arg1, TimeUnit arg2) {
+ return getDelegate().schedule(schedulingCallable(action), arg1, arg2);
+ }
+
+ public ScheduledFuture> schedule(Runnable action, long arg1, TimeUnit arg2) {
+ return getDelegate().schedule(schedulingRunnable(action), arg1, arg2);
+ }
+
+ public ScheduledFuture> scheduleAtFixedRate(Runnable action, long arg1, long arg2, TimeUnit arg3) {
+ return getDelegate().scheduleAtFixedRate(schedulingRunnable(action), arg1, arg2, arg3);
+ }
+
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable action, long arg1, long arg2, TimeUnit arg3) {
+ return getDelegate().scheduleWithFixedDelay(schedulingRunnable(action), arg1, arg2, arg3);
+ }
+
+ // Boiler plate:
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return getDelegate().awaitTermination(timeout, unit);
+ }
+
+ public boolean inEventLoop() {
+ return getDelegate().inEventLoop();
+ }
+
+ public boolean inEventLoop(Thread arg0) {
+ return getDelegate().inEventLoop(arg0);
+ }
+
+ public boolean isShutdown() {
+ return getDelegate().isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return getDelegate().isTerminated();
+ }
+
+ public List> invokeAll(Collection extends Callable> tasks)
+ throws InterruptedException {
+ return getDelegate().invokeAll(tasks);
+ }
+
+ public List> invokeAll(Collection extends Callable> tasks, long timeout,
+ TimeUnit unit) throws InterruptedException {
+ return getDelegate().invokeAll(tasks, timeout, unit);
+ }
+
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException,
+ ExecutionException {
+ return getDelegate().invokeAny(tasks);
+ }
+
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return getDelegate().invokeAny(tasks, timeout, unit);
+ }
+
+ public boolean isShuttingDown() {
+ return getDelegate().isShuttingDown();
+ }
+
+ public Iterator iterator() {
+ return getDelegate().iterator();
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future newFailedFuture(Throwable arg0) {
+ return getDelegate().newFailedFuture(arg0);
+ }
+
+ @Override
+ public EventLoop next() {
+ return getDelegate().next();
+ }
+
+ public ProgressivePromise newProgressivePromise() {
+ return getDelegate().newProgressivePromise();
+ }
+
+ public Promise newPromise() {
+ return getDelegate().newPromise();
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future newSucceededFuture(V arg0) {
+ return getDelegate().newSucceededFuture(arg0);
+ }
+
+ public EventLoopGroup parent() {
+ return getDelegate().parent();
+ }
+
+ public ChannelFuture register(Channel arg0, ChannelPromise arg1) {
+ return getDelegate().register(arg0, arg1);
+ }
+
+ public ChannelFuture register(Channel arg0) {
+ return getDelegate().register(arg0);
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future> shutdownGracefully() {
+ return getDelegate().shutdownGracefully();
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future> shutdownGracefully(long arg0, long arg1, TimeUnit arg2) {
+ return getDelegate().shutdownGracefully(arg0, arg1, arg2);
+ }
+
+ public net.minecraft.util.io.netty.util.concurrent.Future> terminationFuture() {
+ return getDelegate().terminationFuture();
+ }
+
+ @Deprecated
+ public void shutdown() {
+ getDelegate().shutdown();
+ }
+
+ @Deprecated
+ public List shutdownNow() {
+ return getDelegate().shutdownNow();
+ }
+}
diff --git a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java
index a7f18d45..d8274e21 100644
--- a/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java
+++ b/ProtocolLib/src/main/java/com/comphenix/protocol/injector/netty/NettyProtocolInjector.java
@@ -175,6 +175,10 @@ public class NettyProtocolInjector implements ChannelListener {
public Object onPacketSending(ChannelInjector injector, Object packet, NetworkMarker marker) {
Class> clazz = packet.getClass();
+ if (!Bukkit.isPrimaryThread()) {
+ System.out.println("FUCK ME: " + packet);
+ }
+
if (queuedFilters.contains(clazz)) {
// Check for ignored packets
if (injector.unignorePacket(packet)) {