Properly clean up after async listeners.

This commit is contained in:
Kristian S. Stangeland 2012-09-29 23:21:09 +02:00
parent 15b33925c0
commit d58ff1c4c1
6 changed files with 96 additions and 21 deletions

View File

@ -24,7 +24,7 @@ public interface AsynchronousManager {
* @param listener - the packet listener that will recieve these asynchronous events.
* @return An asynchrouns handler.
*/
public abstract AsyncListenerHandler registerAsyncHandler(Plugin plugin, PacketListener listener);
public abstract AsyncListenerHandler registerAsyncHandler(PacketListener listener);
/**
* Unregisters and closes the given asynchronous handler.
@ -32,6 +32,12 @@ public interface AsynchronousManager {
*/
public abstract void unregisterAsyncHandler(AsyncListenerHandler handler);
/**
* Unregisters every asynchronous handler associated with this plugin.
* @param plugin - the original plugin.
*/
public void unregisterAsyncHandlers(Plugin plugin);
/**
* Retrieves a immutable set containing the ID of the sent server packets that will be
* observed by the asynchronous listeners.

View File

@ -1,5 +1,6 @@
package com.comphenix.protocol.async;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@ -11,6 +12,9 @@ import com.comphenix.protocol.PacketStream;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
import com.comphenix.protocol.injector.PacketFilterManager;
import com.comphenix.protocol.injector.PrioritizedListener;
import com.google.common.base.Objects;
/**
* Represents a filter manager for asynchronous packets.
@ -34,6 +38,9 @@ public class AsyncFilterManager implements AsynchronousManager {
// Current packet index
private AtomicInteger currentSendingIndex = new AtomicInteger();
// Whether or not we're currently cleaning up
private volatile boolean cleaningUp;
public AsyncFilterManager(Logger logger, PacketStream packetStream) {
this.serverQueue = new PacketSendingQueue();
this.clientQueue = new PacketSendingQueue();
@ -46,14 +53,21 @@ public class AsyncFilterManager implements AsynchronousManager {
}
@Override
public AsyncListenerHandler registerAsyncHandler(Plugin plugin, PacketListener listener) {
AsyncListenerHandler handler = new AsyncListenerHandler(plugin, mainThread, this, listener);
public AsyncListenerHandler registerAsyncHandler(PacketListener listener) {
AsyncListenerHandler handler = new AsyncListenerHandler(mainThread, this, listener);
ListeningWhitelist sendingWhitelist = listener.getSendingWhitelist();
ListeningWhitelist receivingWhitelist = listener.getReceivingWhitelist();
// Add listener to either or both processing queue
if (hasValidWhitelist(listener.getSendingWhitelist()))
serverProcessingQueue.addListener(handler, listener.getSendingWhitelist());
if (hasValidWhitelist(listener.getReceivingWhitelist()))
clientProcessingQueue.addListener(handler, listener.getReceivingWhitelist());
if (hasValidWhitelist(sendingWhitelist)) {
PacketFilterManager.verifyWhitelist(listener, sendingWhitelist);
serverProcessingQueue.addListener(handler, sendingWhitelist);
}
if (hasValidWhitelist(receivingWhitelist)) {
PacketFilterManager.verifyWhitelist(listener, receivingWhitelist);
clientProcessingQueue.addListener(handler, receivingWhitelist);
}
return handler;
}
@ -76,10 +90,36 @@ public class AsyncFilterManager implements AsynchronousManager {
PacketListener listener = handler.getAsyncListener();
// Just remove it from the queue(s)
if (hasValidWhitelist(listener.getSendingWhitelist()))
serverProcessingQueue.removeListener(handler, listener.getSendingWhitelist());
if (hasValidWhitelist(listener.getReceivingWhitelist()))
clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
if (hasValidWhitelist(listener.getSendingWhitelist())) {
List<Integer> removed = serverProcessingQueue.removeListener(handler, listener.getSendingWhitelist());
// We're already taking care of this, so don't do anything
if (!cleaningUp)
serverQueue.signalPacketUpdate(removed);
}
if (hasValidWhitelist(listener.getReceivingWhitelist())) {
List<Integer> removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
if (!cleaningUp)
clientQueue.signalPacketUpdate(removed);
}
}
@Override
public void unregisterAsyncHandlers(Plugin plugin) {
unregisterAsyncHandlers(serverProcessingQueue, plugin);
unregisterAsyncHandlers(clientProcessingQueue, plugin);
}
private void unregisterAsyncHandlers(PacketProcessingQueue processingQueue, Plugin plugin) {
// Iterate through every packet listener
for (PrioritizedListener<AsyncListenerHandler> listener : processingQueue.values()) {
// Remove the listener
if (Objects.equal(listener.getListener().getPlugin(), plugin)) {
unregisterAsyncHandler(listener.getListener());
}
}
}
/**
@ -146,6 +186,7 @@ public class AsyncFilterManager implements AsynchronousManager {
@Override
public void cleanupAll() {
cleaningUp = true;
serverProcessingQueue.cleanupAll();
serverQueue.cleanupAll();
}

View File

@ -5,6 +5,7 @@ import java.util.logging.Level;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
@ -31,10 +32,7 @@ public class AsyncListenerHandler {
// The packet listener
private PacketListener listener;
// The original plugin
private Plugin plugin;
// The filter manager
private AsyncFilterManager filterManager;
@ -44,13 +42,12 @@ public class AsyncListenerHandler {
// Minecraft main thread
private Thread mainThread;
public AsyncListenerHandler(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
public AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
if (filterManager == null)
throw new IllegalArgumentException("filterManager cannot be NULL");
if (listener == null)
throw new IllegalArgumentException("listener cannot be NULL");
this.plugin = plugin;
this.mainThread = mainThread;
this.filterManager = filterManager;
this.listener = listener;
@ -173,6 +170,14 @@ public class AsyncListenerHandler {
}
private String getPluginName() {
return plugin != null ? plugin.getName() : "UNKNOWN";
return PacketAdapter.getPluginName(listener);
}
/**
* Retrieve the plugin associated with this async listener.
* @return The plugin.
*/
public Plugin getPlugin() {
return listener != null ? listener.getPlugin() : null;
}
}

View File

@ -2,6 +2,9 @@ package com.comphenix.protocol.async;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import com.comphenix.protocol.events.PacketEvent;
@ -46,6 +49,21 @@ class PacketSendingQueue {
trySendPackets();
}
public synchronized void signalPacketUpdate(List<Integer> packetsRemoved) {
Set<Integer> lookup = new HashSet<Integer>(packetsRemoved);
// Note that this is O(n), so it might be expensive
for (PacketEvent event : sendingQueue) {
if (lookup.contains(event.getPacketID())) {
event.getAsyncMarker().setProcessed(true);
}
}
// This is likely to have changed the situation a bit
trySendPackets();
}
/**
* Attempt to send any remaining packets.
*/

View File

@ -212,9 +212,11 @@ public final class PacketFilterManager implements ProtocolManager {
/**
* Determine if the packet IDs in a whitelist is valid.
* @param listener - the listener that will be mentioned in the error.
* @param whitelist - whitelist of packet IDs.
* @throws IllegalArgumentException If the whitelist is illegal.
*/
private void verifyWhitelist(PacketListener listener, ListeningWhitelist whitelist) {
public static void verifyWhitelist(PacketListener listener, ListeningWhitelist whitelist) {
for (Integer id : whitelist.getWhitelist()) {
if (id >= 256 || id < 0) {
throw new IllegalArgumentException(String.format("Invalid packet id %s in listener %s.",
@ -276,6 +278,9 @@ public final class PacketFilterManager implements ProtocolManager {
removePacketListener(listener);
}
}
// Do the same for the asynchronous events
asyncFilterManager.unregisterAsyncHandlers(plugin);
}
/**

View File

@ -104,7 +104,7 @@ public final class StructureCompiler {
}
// Used to load classes
private static Method defineMethod;
private volatile static Method defineMethod;
@SuppressWarnings("rawtypes")
private Map<StructureKey, Class> compiledCache = new HashMap<StructureKey, Class>();