Execute onPacketSending() on the main thread for monitor listeners.

A special-case occurs when a plugin sends a packet to a client 
with filters set to FALSE (that is, bypassing most packet listeners) - 
a new packet event is constructed solely for all MONITOR listeners, as
they are informed regardless of the value of FILTER.

Unfortunately, the sending method may be invoked on a thread other 
than the main thread, which will invoke onPacketSending() 
asynchronously. This violate the assumed thread affinity of 
onPacketSending(), so we will now schedule the packet sending on 
the main thread to correct this - but only if there are monitor
listeners, and they have not specified ListenerOptions.ASYNC (which 
means onPacketSending() is thread safe).
This commit is contained in:
Kristian S. Stangeland 2014-08-02 23:39:29 +02:00
parent b3cda21fe5
commit c443fc3da6
5 changed files with 58 additions and 8 deletions

View File

@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -33,6 +34,7 @@ import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy; import net.sf.cglib.proxy.MethodProxy;
import org.bukkit.Bukkit;
import org.bukkit.Location; import org.bukkit.Location;
import org.bukkit.Server; import org.bukkit.Server;
import org.bukkit.World; import org.bukkit.World;
@ -172,6 +174,9 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
// The current server // The current server
private Server server; private Server server;
// The current ProtocolLib library
private Plugin library;
// The async packet handler // The async packet handler
private AsyncFilterManager asyncFilterManager; private AsyncFilterManager asyncFilterManager;
@ -292,6 +297,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
buildInjector(); buildInjector();
} }
this.asyncFilterManager = builder.getAsyncManager(); this.asyncFilterManager = builder.getAsyncManager();
this.library = builder.getLibrary();
// Attempt to load the list of server and client packets // Attempt to load the list of server and client packets
try { try {
@ -771,21 +777,39 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
} }
@Override @Override
public void sendServerPacket(Player receiver, PacketContainer packet, NetworkMarker marker, boolean filters) throws InvocationTargetException { public void sendServerPacket(final Player receiver, final PacketContainer packet, NetworkMarker marker, final boolean filters) throws InvocationTargetException {
if (receiver == null) if (receiver == null)
throw new IllegalArgumentException("receiver cannot be NULL."); throw new IllegalArgumentException("receiver cannot be NULL.");
if (packet == null) if (packet == null)
throw new IllegalArgumentException("packet cannot be NULL."); throw new IllegalArgumentException("packet cannot be NULL.");
// We may have to enable player injection indefinitely after this // We may have to enable player injection indefinitely after this
if (packetCreation.compareAndSet(false, true)) if (packetCreation.compareAndSet(false, true))
incrementPhases(GamePhase.PLAYING); incrementPhases(GamePhase.PLAYING);
// Inform the MONITOR packets
if (!filters) { if (!filters) {
PacketEvent event = PacketEvent.fromServer(this, packet, marker, receiver, false); // We may have to delay the packet due to non-asynchronous monitor listeners
if (!filters && !Bukkit.isPrimaryThread() && playerInjection.hasMainThreadListener(packet.getType())) {
final NetworkMarker copy = marker;
sendingListeners.invokePacketSending( server.getScheduler().scheduleSyncDelayedTask(library, new Runnable() {
reporter, event, ListenerPriority.MONITOR); @Override
public void run() {
try {
// Prevent infinite loops
if (!Bukkit.isPrimaryThread())
throw new IllegalStateException("Scheduled task was not executed on the main thread!");
sendServerPacket(receiver, packet, copy, filters);
} catch (Exception e) {
reporter.reportMinimal(library, "sendServerPacket-run()", e);
}
}
});
return;
}
PacketEvent event = PacketEvent.fromServer(this, packet, marker, receiver, false);
sendingListeners.invokePacketSending(reporter, event, ListenerPriority.MONITOR);
marker = NetworkMarker.getNetworkMarker(event); marker = NetworkMarker.getNetworkMarker(event);
} }
playerInjection.sendServerPacket(receiver, packet, marker, filters); playerInjection.sendServerPacket(receiver, packet, marker, filters);

View File

@ -342,6 +342,11 @@ public class NettyProtocolInjector implements ChannelListener {
sendServerPacket(packet.getHandle(), marker, filters); sendServerPacket(packet.getHandle(), marker, filters);
} }
@Override
public boolean hasMainThreadListener(PacketType type) {
return mainThreadFilters.contains(type);
}
@Override @Override
public void recieveClientPacket(Player player, Object mcPacket) throws IllegalAccessException, InvocationTargetException { public void recieveClientPacket(Player player, Object mcPacket) throws IllegalAccessException, InvocationTargetException {
injectionFactory.fromPlayer(player, listener). injectionFactory.fromPlayer(player, listener).

View File

@ -181,4 +181,13 @@ public interface PlayerInjectionHandler {
* Close any lingering proxy injections. * Close any lingering proxy injections.
*/ */
public abstract void close(); public abstract void close();
/**
* Determine if we have packet listeners with the given type that must be executed on the main thread.
* <p>
* This only applies for onPacketSending(), as it makes certain guarantees.
* @param type - the packet type.
* @return TRUE if we do, FALSE otherwise.
*/
public abstract boolean hasMainThreadListener(PacketType type);
} }

View File

@ -174,6 +174,11 @@ class ProxyPlayerInjectionHandler implements PlayerInjectionHandler {
} }
} }
@Override
public boolean hasMainThreadListener(PacketType type) {
return sendingFilters.contains(type.getLegacyId());
}
/** /**
* Sets how the server packets are read. * Sets how the server packets are read.
* @param playerHook - the new injection method for reading server packets. * @param playerHook - the new injection method for reading server packets.

View File

@ -3,8 +3,10 @@ package com.comphenix.protocol.injector.spigot;
import java.io.InputStream; import java.io.InputStream;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import com.comphenix.protocol.PacketType;
import com.comphenix.protocol.concurrency.PacketTypeSet; import com.comphenix.protocol.concurrency.PacketTypeSet;
import com.comphenix.protocol.events.NetworkMarker; import com.comphenix.protocol.events.NetworkMarker;
import com.comphenix.protocol.events.PacketContainer; import com.comphenix.protocol.events.PacketContainer;
@ -50,6 +52,11 @@ class DummyPlayerHandler extends AbstractPlayerHandler {
injector.injectPlayer(player); injector.injectPlayer(player);
} }
@Override
public boolean hasMainThreadListener(PacketType type) {
return sendingFilters.contains(type);
}
@Override @Override
public void handleDisconnect(Player player) { public void handleDisconnect(Player player) {
// Just ignore // Just ignore