Ensure that asynchronous listeners are run, even if they have no

accompanying synchronous listener.
This commit is contained in:
Kristian S. Stangeland 2012-09-30 00:51:59 +02:00
parent c77103246e
commit 0cff9d1243
3 changed files with 113 additions and 5 deletions

View File

@ -11,6 +11,7 @@ import org.bukkit.scheduler.BukkitScheduler;
import com.comphenix.protocol.AsynchronousManager;
import com.comphenix.protocol.PacketStream;
import com.comphenix.protocol.ProtocolManager;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
@ -31,7 +32,6 @@ public class AsyncFilterManager implements AsynchronousManager {
private PacketProcessingQueue clientProcessingQueue;
private PacketSendingQueue clientQueue;
private PacketStream packetStream;
private Logger logger;
// The likely main thread
@ -40,13 +40,16 @@ public class AsyncFilterManager implements AsynchronousManager {
// Default scheduler
private BukkitScheduler scheduler;
// Our protocol manager
private ProtocolManager manager;
// Current packet index
private AtomicInteger currentSendingIndex = new AtomicInteger();
// Whether or not we're currently cleaning up
private volatile boolean cleaningUp;
public AsyncFilterManager(Logger logger, BukkitScheduler scheduler, PacketStream packetStream) {
public AsyncFilterManager(Logger logger, BukkitScheduler scheduler, ProtocolManager manager) {
// Server packets are synchronized already
this.serverQueue = new PacketSendingQueue(false);
@ -56,8 +59,8 @@ public class AsyncFilterManager implements AsynchronousManager {
this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
this.packetStream = packetStream;
this.scheduler = scheduler;
this.manager = manager;
this.logger = logger;
this.mainThread = Thread.currentThread();
@ -70,19 +73,39 @@ public class AsyncFilterManager implements AsynchronousManager {
ListeningWhitelist sendingWhitelist = listener.getSendingWhitelist();
ListeningWhitelist receivingWhitelist = listener.getReceivingWhitelist();
// We need a synchronized listener to get the ball rolling
boolean needNoOp = true;
// Add listener to either or both processing queue
if (hasValidWhitelist(sendingWhitelist)) {
PacketFilterManager.verifyWhitelist(listener, sendingWhitelist);
serverProcessingQueue.addListener(handler, sendingWhitelist);
needNoOp &= hasPacketListener(sendingWhitelist);
}
if (hasValidWhitelist(receivingWhitelist)) {
PacketFilterManager.verifyWhitelist(listener, receivingWhitelist);
clientProcessingQueue.addListener(handler, receivingWhitelist);
needNoOp &= hasPacketListener(receivingWhitelist);
}
if (needNoOp) {
handler.setNullPacketListener(new NullPacketListener(listener));
manager.addPacketListener(handler.getNullPacketListener());
}
return handler;
}
/**
* Determine if the given packets are represented.
* @param whitelist - list of packets.
* @return TRUE if they are all registered, FALSE otherwise.
*/
private boolean hasPacketListener(ListeningWhitelist whitelist) {
return manager.getSendingFilters().containsAll(whitelist.getWhitelist());
}
private boolean hasValidWhitelist(ListeningWhitelist whitelist) {
return whitelist != null && whitelist.getWhitelist().size() > 0;
}
@ -101,6 +124,11 @@ public class AsyncFilterManager implements AsynchronousManager {
PacketListener listener = handler.getAsyncListener();
boolean synchronusOK = onMainThread();
// Unregister null packet listeners
if (handler.getNullPacketListener() != null) {
manager.removePacketListener(handler.getNullPacketListener());
}
// Just remove it from the queue(s)
if (hasValidWhitelist(listener.getSendingWhitelist())) {
List<Integer> removed = serverProcessingQueue.removeListener(handler, listener.getSendingWhitelist());
@ -109,6 +137,7 @@ public class AsyncFilterManager implements AsynchronousManager {
if (!cleaningUp)
serverQueue.signalPacketUpdate(removed, synchronusOK);
}
if (hasValidWhitelist(listener.getReceivingWhitelist())) {
List<Integer> removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
@ -203,12 +232,12 @@ public class AsyncFilterManager implements AsynchronousManager {
// Helper method
private AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) {
return new AsyncMarker(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
return new AsyncMarker(manager, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
}
@Override
public PacketStream getPacketStream() {
return packetStream;
return manager;
}
@Override

View File

@ -35,6 +35,7 @@ public class AsyncListenerHandler {
// The filter manager
private AsyncFilterManager filterManager;
private NullPacketListener nullPacketListener;
// List of queued packets
private ArrayBlockingQueue<PacketEvent> queuedPackets = new ArrayBlockingQueue<PacketEvent>(DEFAULT_CAPACITY);
@ -61,6 +62,22 @@ public class AsyncListenerHandler {
return listener;
}
/**
* Set the synchronized listener that has been automatically created.
* @param nullPacketListener - automatically created listener.
*/
void setNullPacketListener(NullPacketListener nullPacketListener) {
this.nullPacketListener = nullPacketListener;
}
/**
* Retrieve the synchronized listener that was automatically created.
* @return Automatically created listener.
*/
PacketListener getNullPacketListener() {
return nullPacketListener;
}
/**
* Cancel the handler.
*/

View File

@ -0,0 +1,62 @@
package com.comphenix.protocol.async;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.events.ListenerPriority;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
/**
* Represents a NO OPERATION listener.
*
* @author Kristian
*/
class NullPacketListener implements PacketListener {
private ListeningWhitelist sendingWhitelist;
private ListeningWhitelist receivingWhitelist;
private Plugin plugin;
/**
* Create a no-op listener with the same whitelist and plugin as the given listener.
* @param original - the packet listener to copy.
*/
public NullPacketListener(PacketListener original) {
this.sendingWhitelist = cloneWhitelist(ListenerPriority.LOW, original.getSendingWhitelist());
this.receivingWhitelist = cloneWhitelist(ListenerPriority.LOW, original.getReceivingWhitelist());
this.plugin = original.getPlugin();
}
@Override
public void onPacketSending(PacketEvent event) {
// NULL
}
@Override
public void onPacketReceiving(PacketEvent event) {
// NULL
}
@Override
public ListeningWhitelist getSendingWhitelist() {
return sendingWhitelist;
}
@Override
public ListeningWhitelist getReceivingWhitelist() {
return receivingWhitelist;
}
private ListeningWhitelist cloneWhitelist(ListenerPriority priority, ListeningWhitelist whitelist) {
if (whitelist != null)
return new ListeningWhitelist(priority, whitelist.getWhitelist());
else
return null;
}
@Override
public Plugin getPlugin() {
return plugin;
}
}