Intercept server packets when they're scheduled in the main thread.

Note that some server packets, such as LOGIN and STATUS, will still be 
run asynchronously.
This commit is contained in:
Kristian S. Stangeland 2013-12-06 01:21:35 +01:00
parent 29d71e05e4
commit 6c3a3e0ba2
5 changed files with 392 additions and 88 deletions

View File

@ -22,6 +22,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.ref.WeakReference;
import java.util.EventObject;
import org.bukkit.Bukkit;
import org.bukkit.entity.Player;
import org.bukkit.event.Cancellable;
@ -137,6 +139,19 @@ public class PacketEvent extends EventObject implements Cancellable {
return new PacketEvent(event, marker);
}
/**
* Determine if we are executing the packet event in an asynchronous thread.
* <p>
* If so, you must synchronize all calls to the Bukkit API.
* <p>
* Generally, most server packets are executed on the main thread, whereas client packets
* are all executed asynchronously.
* @return TRUE if we are, FALSE otherwise.
*/
public boolean isAsync() {
return !Bukkit.isPrimaryThread();
}
/**
* Retrieves the packet that will be sent to the player.
* @return Packet to send to the player.

View File

@ -118,6 +118,9 @@ class ChannelInjector extends ByteToMessageDecoder {
private ConcurrentMap<Object, NetworkMarker> packetMarker = new MapMaker().weakKeys().makeMap();
private ConcurrentMap<NetworkMarker, PacketEvent> markerEvent = new MapMaker().weakKeys().makeMap();
// Packets we have processed before
private Set<Object> processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().<Object, Boolean>makeMap());
// Packets to ignore
private Set<Object> ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().<Object, Boolean>makeMap());
@ -235,32 +238,11 @@ class ChannelInjector extends ByteToMessageDecoder {
ENCODE_BUFFER = FuzzyReflection.getMethodAccessor(vanillaEncoder.getClass(),
"encode", ChannelHandlerContext.class, Object.class, ByteBuf.class);
// Intercept sent packets
protocolEncoder = new MessageToByteEncoder<Object>() {
@Override
protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
try {
NetworkMarker marker = getMarker(output);
PacketEvent event = markerEvent.remove(marker);
if (event != null && NetworkMarker.hasOutputHandlers(marker)) {
ByteBuf packetBuffer = ctx.alloc().buffer();
ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer);
byte[] data = getBytes(packetBuffer);
for (PacketOutputHandler handler : marker.getOutputHandlers()) {
handler.handle(event, data);
}
// Write the result
output.writeBytes(data);
return;
}
} catch (Exception e) {
channelListener.getReporter().reportDetailed(this,
Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build());
} finally {
// Attempt to handle the packet nevertheless
ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output);
}
ChannelInjector.this.encode(ctx, packet, output);
}
public void exceptionCaught(ChannelHandlerContext channelhandlercontext, Throwable throwable) {
@ -273,10 +255,16 @@ class ChannelInjector extends ByteToMessageDecoder {
originalChannel.pipeline().addAfter("encoder", "protocol_lib_encoder", protocolEncoder);
// Intercept all write methods
channelField.setValue(new ChannelProxy(originalChannel) {
channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) {
@Override
protected Object onMessageWritten(Object message) {
return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message));
protected Object onMessageScheduled(Object message) {
Object result = processSending(message);
// We have now processed this packet once already
if (result != null) {
processedPackets.add(result);
}
return result;
}
});
@ -285,6 +273,15 @@ class ChannelInjector extends ByteToMessageDecoder {
}
}
/**
* Process a given message on the packet listeners.
* @param message - the message/packet.
* @return The resulting message/packet.
*/
private Object processSending(Object message) {
return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message));
}
/**
* This method patches the encoder so that it skips already created packets.
* @param encoder - the encoder to patch.
@ -316,6 +313,49 @@ class ChannelInjector extends ByteToMessageDecoder {
}
}
/**
* Encode a packet to a byte buffer, taking over for the standard Minecraft encoder.
* @param ctx - the current context.
* @param packet - the packet to encode to a byte array.
* @param output - the output byte array.
* @throws Exception If anything went wrong.
*/
protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
try {
NetworkMarker marker = getMarker(packet);
PacketEvent event = markerEvent.remove(marker);
// Try again, in case this packet was sent directly in the event loop
if (event == null && !processedPackets.remove(packet)) {
packet = processSending(packet);
marker = getMarker(packet);
event = markerEvent.remove(marker);
}
// Process output handler
if (packet != null && event != null && NetworkMarker.hasOutputHandlers(marker)) {
ByteBuf packetBuffer = ctx.alloc().buffer();
ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, packetBuffer);
byte[] data = getBytes(packetBuffer);
for (PacketOutputHandler handler : marker.getOutputHandlers()) {
handler.handle(event, data);
}
// Write the result
output.writeBytes(data);
return;
}
} catch (Exception e) {
channelListener.getReporter().reportDetailed(this,
Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build());
} finally {
// Attempt to handle the packet nevertheless
if (packet != null) {
ENCODE_BUFFER.invoke(vanillaEncoder, ctx, packet, output);
}
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuffer, List<Object> packets) throws Exception {
try {

View File

@ -1,8 +1,13 @@
package com.comphenix.protocol.injector.netty;
import java.lang.reflect.Constructor;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.Callable;
import com.comphenix.protocol.reflect.FuzzyReflection;
import com.comphenix.protocol.reflect.FuzzyReflection.FieldAccessor;
import net.minecraft.util.com.google.common.collect.Maps;
import net.minecraft.util.io.netty.buffer.ByteBufAllocator;
import net.minecraft.util.io.netty.channel.Channel;
import net.minecraft.util.io.netty.channel.ChannelConfig;
@ -14,48 +19,35 @@ import net.minecraft.util.io.netty.channel.ChannelPromise;
import net.minecraft.util.io.netty.channel.EventLoop;
import net.minecraft.util.io.netty.util.Attribute;
import net.minecraft.util.io.netty.util.AttributeKey;
import net.minecraft.util.io.netty.util.concurrent.EventExecutor;
abstract class ChannelProxy implements Channel {
private static Constructor<? extends ChannelFuture> FUTURE_CONSTRUCTOR;
// Mark that a certain object does not contain a message field
private static final FieldAccessor MARK_NO_MESSAGE = new FieldAccessor() {
public void set(Object instance, Object value) { }
public Object get(Object instance) { return null; }
};
// Looking up packets in inner classes
private static Map<Class<?>, FieldAccessor> MESSAGE_LOOKUP = Maps.newConcurrentMap();
// The underlying channel
private Channel delegate;
public ChannelProxy(Channel delegate) {
this.delegate = delegate;
}
/**
* Invoked when a packet is being transmitted.
* @param message - the packet to transmit.
* @return The object to transmit.
*/
protected abstract Object onMessageWritten(Object message);
private Class<?> messageClass;
/**
* The future we return when packets are being cancelled.
* @return A succeeded future.
*/
protected ChannelFuture getSucceededFuture() {
try {
if (FUTURE_CONSTRUCTOR == null) {
@SuppressWarnings("unchecked")
Class<? extends ChannelFuture> succededFuture =
(Class<? extends ChannelFuture>) ChannelProxy.class.getClassLoader().
loadClass("net.minecraft.util.io.netty.channel.SucceededChannelFuture");
FUTURE_CONSTRUCTOR = succededFuture.getDeclaredConstructor(Channel.class, EventExecutor.class);
FUTURE_CONSTRUCTOR.setAccessible(true);
}
return FUTURE_CONSTRUCTOR.newInstance(this, null);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Cannot get succeeded future.", e);
} catch (Exception e) {
throw new RuntimeException("Cannot construct completed future.", e);
}
// Event loop proxy
private transient EventLoopProxy loopProxy;
public ChannelProxy(Channel delegate, Class<?> messageClass) {
this.delegate = delegate;
this.messageClass = messageClass;
}
/**
* Invoked when a packet is scheduled for transmission in the event loop.
* @param message - the packet to schedule.
* @return The object to transmit, or NULL to cancel.
*/
protected abstract Object onMessageScheduled(Object message);
public <T> Attribute<T> attr(AttributeKey<T> paramAttributeKey) {
return delegate.attr(paramAttributeKey);
@ -82,7 +74,66 @@ abstract class ChannelProxy implements Channel {
}
public EventLoop eventLoop() {
return delegate.eventLoop();
if (loopProxy == null) {
loopProxy = new EventLoopProxy() {
@Override
protected EventLoop getDelegate() {
return delegate.eventLoop();
}
@Override
protected Runnable schedulingRunnable(Runnable runnable) {
FieldAccessor accessor = getMessageAccessor(runnable);
if (accessor != null) {
Object packet = onMessageScheduled(accessor.get(runnable));
if (packet != null)
accessor.set(runnable, packet);
else
getEmptyRunnable();
}
return runnable;
}
@Override
protected <T> Callable<T> schedulingCallable(Callable<T> callable) {
FieldAccessor accessor = getMessageAccessor(callable);
if (accessor != null) {
Object packet = onMessageScheduled(accessor.get(callable));
if (packet != null)
accessor.set(callable, packet);
else
getEmptyCallable();
}
return callable;
}
};
}
return loopProxy;
}
/**
* Retrieve a way to access the packet field of an object.
* @param value - the object.
* @return The packet field accessor, or NULL if not found.
*/
private FieldAccessor getMessageAccessor(Object value) {
Class<?> clazz = value.getClass();
FieldAccessor accessor = MESSAGE_LOOKUP.get(clazz);
if (accessor == null) {
try {
accessor = FuzzyReflection.getFieldAccessor(clazz, messageClass, true);
} catch (IllegalArgumentException e) {
accessor = MARK_NO_MESSAGE;
}
// Save the result
MESSAGE_LOOKUP.put(clazz, accessor);
}
return accessor != MARK_NO_MESSAGE ? accessor : null;
}
public ChannelFuture connect(SocketAddress paramSocketAddress1,
@ -198,37 +249,21 @@ abstract class ChannelProxy implements Channel {
public ChannelFuture deregister(ChannelPromise paramChannelPromise) {
return delegate.deregister(paramChannelPromise);
}
public ChannelFuture write(Object message) {
Object result = onMessageWritten(message);
if (result != null)
return delegate.write(result);
return getSucceededFuture();
public ChannelFuture write(Object paramObject) {
return delegate.write(paramObject);
}
public ChannelFuture write(Object message, ChannelPromise paramChannelPromise) {
Object result = onMessageWritten(message);
if (result != null)
return delegate.write(message, paramChannelPromise);
return getSucceededFuture();
public ChannelFuture write(Object paramObject, ChannelPromise paramChannelPromise) {
return delegate.write(paramObject, paramChannelPromise);
}
public ChannelFuture writeAndFlush(Object message, ChannelPromise paramChannelPromise) {
Object result = onMessageWritten(message);
if (result != null)
return delegate.writeAndFlush(message, paramChannelPromise);
return getSucceededFuture();
public ChannelFuture writeAndFlush(Object paramObject, ChannelPromise paramChannelPromise) {
return delegate.writeAndFlush(paramObject, paramChannelPromise);
}
public ChannelFuture writeAndFlush(Object message) {
Object result = onMessageWritten(message);
if (result != null)
return delegate.writeAndFlush(message);
return getSucceededFuture();
public ChannelFuture writeAndFlush(Object paramObject) {
return delegate.writeAndFlush(paramObject);
}
public int compareTo(Channel o) {

View File

@ -0,0 +1,210 @@
package com.comphenix.protocol.injector.netty;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.minecraft.util.io.netty.channel.Channel;
import net.minecraft.util.io.netty.channel.ChannelFuture;
import net.minecraft.util.io.netty.channel.ChannelPromise;
import net.minecraft.util.io.netty.channel.EventLoop;
import net.minecraft.util.io.netty.channel.EventLoopGroup;
import net.minecraft.util.io.netty.util.concurrent.EventExecutor;
import net.minecraft.util.io.netty.util.concurrent.ProgressivePromise;
import net.minecraft.util.io.netty.util.concurrent.Promise;
import net.minecraft.util.io.netty.util.concurrent.ScheduledFuture;
/**
* An event loop proxy.
* @author Kristian.
*/
abstract class EventLoopProxy implements EventLoop {
private static final Runnable EMPTY_RUNNABLE = new Runnable() {
public void run() {
// Do nothing
}
};
private static final Callable<?> EMPTY_CALLABLE = new Callable<Object>() {
public Object call() throws Exception {
return null;
};
};
/**
* Retrieve the underlying event loop.
* @return The event loop.
*/
protected abstract EventLoop getDelegate();
/**
* Retrieve a callable that does nothing but return NULL.
* @return The empty callable.
*/
@SuppressWarnings("unchecked")
public static <T> Callable<T> getEmptyCallable() {
return (Callable<T>) EMPTY_CALLABLE;
}
/**
* Retrieve a runnable that does nothing.
* @return A NO-OP runnable.
*/
public static Runnable getEmptyRunnable() {
return EMPTY_RUNNABLE;
}
/**
* Invoked when a runnable is being scheduled.
* @param runnable - the runnable that is scheduling.
* @return The runnable to schedule instead. Cannot be NULL.
*/
protected abstract Runnable schedulingRunnable(Runnable runnable);
/**
* Invoked when a callable is being scheduled.
* @param runnable - the callable that is scheduling.
* @return The callable to schedule instead. Cannot be NULL.
*/
protected abstract <T> Callable<T> schedulingCallable(Callable<T> callable);
public void execute(Runnable command) {
getDelegate().execute(schedulingRunnable(command));
}
public <T> net.minecraft.util.io.netty.util.concurrent.Future<T> submit(Callable<T> action) {
return getDelegate().submit(schedulingCallable(action));
}
public <T> net.minecraft.util.io.netty.util.concurrent.Future<T> submit(Runnable action, T arg1) {
return getDelegate().submit(schedulingRunnable(action), arg1);
}
public net.minecraft.util.io.netty.util.concurrent.Future<?> submit(Runnable action) {
return getDelegate().submit(schedulingRunnable(action));
}
public <V> ScheduledFuture<V> schedule(Callable<V> action, long arg1, TimeUnit arg2) {
return getDelegate().schedule(schedulingCallable(action), arg1, arg2);
}
public ScheduledFuture<?> schedule(Runnable action, long arg1, TimeUnit arg2) {
return getDelegate().schedule(schedulingRunnable(action), arg1, arg2);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable action, long arg1, long arg2, TimeUnit arg3) {
return getDelegate().scheduleAtFixedRate(schedulingRunnable(action), arg1, arg2, arg3);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable action, long arg1, long arg2, TimeUnit arg3) {
return getDelegate().scheduleWithFixedDelay(schedulingRunnable(action), arg1, arg2, arg3);
}
// Boiler plate:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return getDelegate().awaitTermination(timeout, unit);
}
public boolean inEventLoop() {
return getDelegate().inEventLoop();
}
public boolean inEventLoop(Thread arg0) {
return getDelegate().inEventLoop(arg0);
}
public boolean isShutdown() {
return getDelegate().isShutdown();
}
public boolean isTerminated() {
return getDelegate().isTerminated();
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return getDelegate().invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException {
return getDelegate().invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
ExecutionException {
return getDelegate().invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return getDelegate().invokeAny(tasks, timeout, unit);
}
public boolean isShuttingDown() {
return getDelegate().isShuttingDown();
}
public Iterator<EventExecutor> iterator() {
return getDelegate().iterator();
}
public <V> net.minecraft.util.io.netty.util.concurrent.Future<V> newFailedFuture(Throwable arg0) {
return getDelegate().newFailedFuture(arg0);
}
@Override
public EventLoop next() {
return getDelegate().next();
}
public <V> ProgressivePromise<V> newProgressivePromise() {
return getDelegate().newProgressivePromise();
}
public <V> Promise<V> newPromise() {
return getDelegate().newPromise();
}
public <V> net.minecraft.util.io.netty.util.concurrent.Future<V> newSucceededFuture(V arg0) {
return getDelegate().newSucceededFuture(arg0);
}
public EventLoopGroup parent() {
return getDelegate().parent();
}
public ChannelFuture register(Channel arg0, ChannelPromise arg1) {
return getDelegate().register(arg0, arg1);
}
public ChannelFuture register(Channel arg0) {
return getDelegate().register(arg0);
}
public net.minecraft.util.io.netty.util.concurrent.Future<?> shutdownGracefully() {
return getDelegate().shutdownGracefully();
}
public net.minecraft.util.io.netty.util.concurrent.Future<?> shutdownGracefully(long arg0, long arg1, TimeUnit arg2) {
return getDelegate().shutdownGracefully(arg0, arg1, arg2);
}
public net.minecraft.util.io.netty.util.concurrent.Future<?> terminationFuture() {
return getDelegate().terminationFuture();
}
@Deprecated
public void shutdown() {
getDelegate().shutdown();
}
@Deprecated
public List<Runnable> shutdownNow() {
return getDelegate().shutdownNow();
}
}

View File

@ -175,6 +175,10 @@ public class NettyProtocolInjector implements ChannelListener {
public Object onPacketSending(ChannelInjector injector, Object packet, NetworkMarker marker) {
Class<?> clazz = packet.getClass();
if (!Bukkit.isPrimaryThread()) {
System.out.println("FUCK ME: " + packet);
}
if (queuedFilters.contains(clazz)) {
// Check for ignored packets
if (injector.unignorePacket(packet)) {