Remove concurrent "processed" and "ignore" hash maps.

We can rely on the fact that channel handlers are processed in a single 
thread, and simply enable/disable a flag.
This commit is contained in:
Kristian S. Stangeland 2014-02-23 18:55:41 +01:00
parent 6e0a44f9df
commit 659f01cc63
5 changed files with 144 additions and 63 deletions

View File

@ -252,7 +252,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
// Use the correct injection type // Use the correct injection type
if (MinecraftReflection.isUsingNetty()) { if (MinecraftReflection.isUsingNetty()) {
this.nettyInjector = new NettyProtocolInjector(this, reporter); this.nettyInjector = new NettyProtocolInjector(builder.getLibrary(), this, reporter);
this.playerInjection = nettyInjector.getPlayerInjector(); this.playerInjection = nettyInjector.getPlayerInjector();
this.packetInjector = nettyInjector.getPacketInjector(); this.packetInjector = nettyInjector.getPacketInjector();

View File

@ -3,10 +3,8 @@ package com.comphenix.protocol.injector.netty;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -22,6 +20,7 @@ import net.minecraft.util.io.netty.util.concurrent.GenericFutureListener;
import net.minecraft.util.io.netty.util.internal.TypeParameterMatcher; import net.minecraft.util.io.netty.util.internal.TypeParameterMatcher;
import net.sf.cglib.proxy.Factory; import net.sf.cglib.proxy.Factory;
import org.bukkit.Bukkit;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import com.comphenix.protocol.PacketType; import com.comphenix.protocol.PacketType;
@ -85,10 +84,22 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
private ConcurrentMap<NetworkMarker, PacketEvent> markerEvent = new MapMaker().weakKeys().makeMap(); private ConcurrentMap<NetworkMarker, PacketEvent> markerEvent = new MapMaker().weakKeys().makeMap();
// Packets we have processed before // Packets we have processed before
private Set<Object> processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().<Object, Boolean>makeMap()); //private Set<Object> processedPackets = Collections.newSetFromMap(new MapMaker().weakKeys().<Object, Boolean>makeMap());
// Packets to ignore // Packets to ignore
private Set<Object> ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().<Object, Boolean>makeMap()); //private Set<Object> ignoredPackets = Collections.newSetFromMap(new MapMaker().weakKeys().<Object, Boolean>makeMap());
/**
* Indicate that this packet will be ignored.
* <p>
* This must never be set outside the channel pipeline's thread.
*/
private boolean processPackets = true;
/**
* A flag set by the main thread to indiciate that a packet should not be processed.
*/
private boolean scheduleProcessPackets = true;
// Other handlers // Other handlers
private ByteToMessageDecoder vanillaDecoder; private ByteToMessageDecoder vanillaDecoder;
@ -174,14 +185,50 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
// Intercept all write methods // Intercept all write methods
channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) { channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) {
@Override @Override
protected Object onMessageScheduled(Object message) { protected <T> Callable<T> onMessageScheduled(final Callable<T> callable, FieldAccessor packetAccessor) {
Object result = processSending(message); if (handleScheduled(callable, packetAccessor)) {
return new Callable<T>() {
@Override
public T call() throws Exception {
T result = null;
// We have now processed this packet once already // This field must only be updated in the pipeline thread
if (result != null) { processPackets = false;
processedPackets.add(result); result = callable.call();
processPackets = true;
return result;
}
};
} }
return result; return null;
}
@Override
protected Runnable onMessageScheduled(final Runnable runnable, FieldAccessor packetAccessor) {
if (handleScheduled(runnable, packetAccessor)) {
return new Runnable() {
@Override
public void run() {
processPackets = false;
runnable.run();
processPackets = true;
}
};
}
return null;
}
protected boolean handleScheduled(Object instance, FieldAccessor accessor) {
Object original = accessor.get(instance);
Object result = scheduleProcessPackets ? processSending(original) : original;
if (result != null) {
// Change packet to be scheduled
if (original != result)
accessor.set(instance, result);
return true;
}
return false;
} }
}); });
@ -196,7 +243,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
* @return The resulting message/packet. * @return The resulting message/packet.
*/ */
private Object processSending(Object message) { private Object processSending(Object message) {
return channelListener.onPacketSending(ChannelInjector.this, message, packetMarker.get(message)); return processSending(message, packetMarker.get(message));
}
/**
* Process a given message on the packet listeners.
* @param message - the message/packet.
* @return The resulting message/packet.
*/
private Object processSending(Object message, NetworkMarker marker) {
return channelListener.onPacketSending(ChannelInjector.this, message, marker);
} }
/** /**
@ -226,11 +282,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
*/ */
protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception { protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
try { try {
NetworkMarker marker = getMarker(packet); NetworkMarker marker = getMarker(packet);;
PacketEvent event = markerEvent.remove(marker); PacketEvent event = markerEvent.remove(marker);
// Try again, in case this packet was sent directly in the event loop // This packet has not been seen by the main thread
if (event == null && !processedPackets.remove(packet)) { if (processPackets) {
Class<?> clazz = packet.getClass(); Class<?> clazz = packet.getClass();
// Schedule the transmission on the main thread instead // Schedule the transmission on the main thread instead
@ -240,7 +296,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
packet = null; packet = null;
} else { } else {
packet = processSending(packet); packet = processSending(packet, marker);
marker = getMarker(packet); marker = getMarker(packet);
event = markerEvent.remove(marker); event = markerEvent.remove(marker);
} }
@ -271,10 +327,8 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
} }
private void scheduleMainThread(final Object packetCopy) { private void scheduleMainThread(final Object packetCopy) {
// Do not process this packet agai // Don't use BukkitExecutors for this - it has a bit of overhead
processedPackets.add(packetCopy); Bukkit.getScheduler().scheduleSyncDelayedTask(factory.getPlugin(), new Runnable() {
ProtocolLibrary.getExecutorSync().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
invokeSendPacket(packetCopy); invokeSendPacket(packetCopy);
@ -386,15 +440,10 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
@Override @Override
public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) { public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
saveMarker(packet, marker); saveMarker(packet, marker);
processedPackets.remove(packet);
// Record if this packet should be ignored by most listeners scheduleProcessPackets = filtered;
if (!filtered) {
ignoredPackets.add(packet);
} else {
ignoredPackets.remove(packet);
}
invokeSendPacket(packet); invokeSendPacket(packet);
scheduleProcessPackets = true;
} }
/** /**
@ -415,20 +464,31 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
} }
@Override @Override
public void recieveClientPacket(Object packet, NetworkMarker marker, boolean filtered) { public void recieveClientPacket(final Object packet, final NetworkMarker marker, final boolean filtered) {
saveMarker(packet, marker); // Execute this in the channel thread
processedPackets.remove(packet); Runnable action = new Runnable() {
@Override
public void run() {
Object result = filtered ? channelListener.onPacketReceiving(ChannelInjector.this, packet, marker) : packet;
if (!filtered) { // See if the packet has been cancelled
ignoredPackets.add(packet); if (result == null)
return;
try {
MinecraftMethods.getNetworkManagerReadPacketMethod().invoke(networkManager, null, result);
} catch (Exception e) {
// Inform the user
ProtocolLibrary.getErrorReporter().reportMinimal(factory.getPlugin(), "recieveClientPacket", e);
}
}
};
// Execute in the worker thread
if (originalChannel.eventLoop().inEventLoop()) {
action.run();
} else { } else {
ignoredPackets.remove(packet); originalChannel.eventLoop().execute(action);
}
try {
MinecraftMethods.getNetworkManagerReadPacketMethod().invoke(networkManager, null, packet);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to receive client packet " + packet, e);
} }
} }
@ -454,12 +514,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
@Override @Override
public boolean unignorePacket(Object packet) { public boolean unignorePacket(Object packet) {
return ignoredPackets.remove(packet); // NOP
return false;
} }
@Override @Override
public boolean ignorePacket(Object packet) { public boolean ignorePacket(Object packet) {
return ignoredPackets.add(packet); // NOP
return false;
} }
@Override @Override

View File

@ -46,10 +46,19 @@ abstract class ChannelProxy implements Channel {
/** /**
* Invoked when a packet is scheduled for transmission in the event loop. * Invoked when a packet is scheduled for transmission in the event loop.
* @param message - the packet to schedule. * @param callable - callable to schedule for execution.
* @return The object to transmit, or NULL to cancel. * @param packetAccessor - accessor for modifying the packet in the callable.
* @return The callable that will be scheduled, or NULL to cancel.
*/ */
protected abstract Object onMessageScheduled(Object message); protected abstract <T> Callable<T> onMessageScheduled(Callable<T> callable, FieldAccessor packetAccessor);
/**
* Invoked when a packet is scheduled for transmission in the event loop.
* @param runnable - the runnable that contains a packet to be scheduled.
* @param packetAccessor - accessor for modifying the packet in the runnable.
* @return The runnable that will be scheduled, or NULL to cancel.
*/
protected abstract Runnable onMessageScheduled(Runnable runnable, FieldAccessor packetAccessor);
public <T> Attribute<T> attr(AttributeKey<T> paramAttributeKey) { public <T> Attribute<T> attr(AttributeKey<T> paramAttributeKey) {
return delegate.attr(paramAttributeKey); return delegate.attr(paramAttributeKey);
@ -84,16 +93,12 @@ abstract class ChannelProxy implements Channel {
} }
@Override @Override
protected Runnable schedulingRunnable(Runnable runnable) { protected Runnable schedulingRunnable(final Runnable runnable) {
FieldAccessor accessor = getMessageAccessor(runnable); final FieldAccessor accessor = getMessageAccessor(runnable);
if (accessor != null) { if (accessor != null) {
Object packet = onMessageScheduled(accessor.get(runnable)); Runnable result = onMessageScheduled(runnable, accessor);;
return result != null ? result : getEmptyRunnable();
if (packet != null)
accessor.set(runnable, packet);
else
return getEmptyRunnable();
} }
return runnable; return runnable;
} }
@ -103,12 +108,8 @@ abstract class ChannelProxy implements Channel {
FieldAccessor accessor = getMessageAccessor(callable); FieldAccessor accessor = getMessageAccessor(callable);
if (accessor != null) { if (accessor != null) {
Object packet = onMessageScheduled(accessor.get(callable)); Callable<T> result = onMessageScheduled(callable, accessor);;
return result != null ? result : EventLoopProxy.<T>getEmptyCallable();
if (packet != null)
accessor.set(callable, packet);
else
return getEmptyCallable();
} }
return callable; return callable;
} }

View File

@ -6,6 +6,7 @@ import javax.annotation.Nonnull;
import net.minecraft.util.io.netty.channel.Channel; import net.minecraft.util.io.netty.channel.Channel;
import org.bukkit.Bukkit; import org.bukkit.Bukkit;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.injector.netty.ChannelInjector.ChannelSocketInjector; import com.comphenix.protocol.injector.netty.ChannelInjector.ChannelSocketInjector;
import com.comphenix.protocol.injector.server.SocketInjector; import com.comphenix.protocol.injector.server.SocketInjector;
@ -29,6 +30,21 @@ class InjectionFactory {
// Whether or not the factory is closed // Whether or not the factory is closed
private volatile boolean closed; private volatile boolean closed;
// The current plugin
private final Plugin plugin;
public InjectionFactory(Plugin plugin) {
this.plugin = plugin;
}
/**
* Retrieve the main plugin associated with this injection factory.
* @return The main plugin.
*/
public Plugin getPlugin() {
return plugin;
}
/** /**
* Construct or retrieve a channel injector from an existing Bukkit player. * Construct or retrieve a channel injector from an existing Bukkit player.
* @param player - the existing Bukkit player. * @param player - the existing Bukkit player.

View File

@ -8,6 +8,7 @@ import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
import net.minecraft.util.io.netty.channel.Channel; import net.minecraft.util.io.netty.channel.Channel;
import net.minecraft.util.io.netty.channel.ChannelFuture; import net.minecraft.util.io.netty.channel.ChannelFuture;
@ -47,7 +48,7 @@ public class NettyProtocolInjector implements ChannelListener {
private List<VolatileField> bootstrapFields = Lists.newArrayList(); private List<VolatileField> bootstrapFields = Lists.newArrayList();
// The channel injector factory // The channel injector factory
private InjectionFactory injectionFactory = new InjectionFactory(); private InjectionFactory injectionFactory;
// List of network managers // List of network managers
private volatile List<Object> networkManagers; private volatile List<Object> networkManagers;
@ -67,7 +68,8 @@ public class NettyProtocolInjector implements ChannelListener {
private ErrorReporter reporter; private ErrorReporter reporter;
private boolean debug; private boolean debug;
public NettyProtocolInjector(ListenerInvoker invoker, ErrorReporter reporter) { public NettyProtocolInjector(Plugin plugin, ListenerInvoker invoker, ErrorReporter reporter) {
this.injectionFactory = new InjectionFactory(plugin);
this.invoker = invoker; this.invoker = invoker;
this.reporter = reporter; this.reporter = reporter;
} }