Still a work in progress.

I'm considering removing the async listener and use the packet listener
for both sync and async processing.
This commit is contained in:
Kristian S. Stangeland 2012-09-29 18:05:08 +02:00
parent 23e676533a
commit 6f02e79802
12 changed files with 522 additions and 111 deletions

View File

@ -0,0 +1,55 @@
package com.comphenix.protocol;
import java.lang.reflect.InvocationTargetException;
import org.bukkit.entity.Player;
import com.comphenix.protocol.events.PacketContainer;
/**
* Represents a object capable of sending or receiving packets.
*
* @author Kristian
*/
public interface PacketStream {
/**
* Send a packet to the given player.
* @param reciever - the reciever.
* @param packet - packet to send.
* @throws InvocationTargetException - if an error occured when sending the packet.
*/
public void sendServerPacket(Player reciever, PacketContainer packet)
throws InvocationTargetException;
/**
* Send a packet to the given player.
* @param reciever - the reciever.
* @param packet - packet to send.
* @param filters - whether or not to invoke any packet filters.
* @throws InvocationTargetException - if an error occured when sending the packet.
*/
public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters)
throws InvocationTargetException;
/**
* Simulate recieving a certain packet from a given player.
* @param sender - the sender.
* @param packet - the packet that was sent.
* @throws InvocationTargetException If the reflection machinery failed.
* @throws IllegalAccessException If the underlying method caused an error.
*/
public void recieveClientPacket(Player sender, PacketContainer packet)
throws IllegalAccessException, InvocationTargetException;
/**
* Simulate recieving a certain packet from a given player.
* @param sender - the sender.
* @param packet - the packet that was sent.
* @param filters - whether or not to invoke any packet filters.
* @throws InvocationTargetException If the reflection machinery failed.
* @throws IllegalAccessException If the underlying method caused an error.
*/
public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters)
throws IllegalAccessException, InvocationTargetException;
}

View File

@ -17,7 +17,6 @@
package com.comphenix.protocol; package com.comphenix.protocol;
import java.lang.reflect.InvocationTargetException;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -35,7 +34,7 @@ import com.google.common.collect.ImmutableSet;
* Represents an API for accessing the Minecraft protocol. * Represents an API for accessing the Minecraft protocol.
* @author Kristian * @author Kristian
*/ */
public interface ProtocolManager { public interface ProtocolManager extends PacketStream {
/** /**
* Retrieves a list of every registered packet listener. * Retrieves a list of every registered packet listener.
@ -67,46 +66,6 @@ public interface ProtocolManager {
*/ */
public void removePacketListeners(Plugin plugin); public void removePacketListeners(Plugin plugin);
/**
* Send a packet to the given player.
* @param reciever - the reciever.
* @param packet - packet to send.
* @throws InvocationTargetException - if an error occured when sending the packet.
*/
public void sendServerPacket(Player reciever, PacketContainer packet)
throws InvocationTargetException;
/**
* Send a packet to the given player.
* @param reciever - the reciever.
* @param packet - packet to send.
* @param filters - whether or not to invoke any packet filters.
* @throws InvocationTargetException - if an error occured when sending the packet.
*/
public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters)
throws InvocationTargetException;
/**
* Simulate recieving a certain packet from a given player.
* @param sender - the sender.
* @param packet - the packet that was sent.
* @throws InvocationTargetException If the reflection machinery failed.
* @throws IllegalAccessException If the underlying method caused an error.
*/
public void recieveClientPacket(Player sender, PacketContainer packet)
throws IllegalAccessException, InvocationTargetException;
/**
* Simulate recieving a certain packet from a given player.
* @param sender - the sender.
* @param packet - the packet that was sent.
* @param filters - whether or not to invoke any packet filters.
* @throws InvocationTargetException If the reflection machinery failed.
* @throws IllegalAccessException If the underlying method caused an error.
*/
public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters)
throws IllegalAccessException, InvocationTargetException;
/** /**
* Constructs a new encapsulated Minecraft packet with the given ID. * Constructs a new encapsulated Minecraft packet with the given ID.
* @param id - packet ID. * @param id - packet ID.

View File

@ -1,6 +1,12 @@
package com.comphenix.protocol.async; package com.comphenix.protocol.async;
import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.PacketStream;
import com.comphenix.protocol.events.PacketEvent;
/** /**
* Represents a filter manager for asynchronous packets. * Represents a filter manager for asynchronous packets.
@ -9,9 +15,77 @@ import java.util.concurrent.Future;
*/ */
public class AsyncFilterManager { public class AsyncFilterManager {
private PacketProcessingQueue processingQueue;
private PacketSendingQueue sendingQueue;
public Future<Void> registerAsyncHandler() { private PacketStream packetStream;
private Logger logger;
// The likely main thread
private Thread mainThread;
// Current packet index
private AtomicInteger currentSendingIndex = new AtomicInteger();
public AsyncFilterManager(Logger logger, PacketStream packetStream) {
this.sendingQueue = new PacketSendingQueue();
this.processingQueue = new PacketProcessingQueue(sendingQueue);
this.packetStream = packetStream;
this.logger = logger;
this.mainThread = Thread.currentThread();
} }
public ListenerToken registerAsyncHandler(Plugin plugin, AsyncListener listener) {
ListenerToken token = new ListenerToken(plugin, mainThread, this, listener);
processingQueue.addListener(token, listener.getSendingWhitelist());
return token;
}
public void unregisterAsyncHandler(ListenerToken listenerToken) {
if (listenerToken == null)
throw new IllegalArgumentException("listenerToken cannot be NULL");
listenerToken.cancel();
}
// Called by ListenerToken
void unregisterAsyncHandlerInternal(ListenerToken listenerToken) {
// Just remove it from the queue
processingQueue.removeListener(listenerToken, listenerToken.getAsyncListener().getSendingWhitelist());
}
public void enqueueSyncPacket(PacketEvent syncPacket, int sendingDelta, long timeoutDelta) {
AsyncPacket asyncPacket = new AsyncPacket(packetStream, syncPacket,
currentSendingIndex.getAndIncrement() + sendingDelta,
System.currentTimeMillis(),
timeoutDelta);
// Start the process
sendingQueue.enqueue(asyncPacket);
processingQueue.enqueuePacket(asyncPacket);
}
public PacketStream getPacketStream() {
return packetStream;
}
public Logger getLogger() {
return logger;
}
PacketProcessingQueue getProcessingQueue() {
return processingQueue;
}
PacketSendingQueue getSendingQueue() {
return sendingQueue;
}
public void cleanupAll() {
// Remove all listeners
// We don't necessarily remove packets, as this might be a part of a server reload
}
} }

View File

@ -1,5 +1,17 @@
package com.comphenix.protocol.async; package com.comphenix.protocol.async;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.events.ListeningWhitelist;
public interface AsyncListener { public interface AsyncListener {
public void onAsyncPacket(AsyncPacket packet); public void onAsyncPacket(AsyncPacket packet);
public ListeningWhitelist getSendingWhitelist();
/**
* Retrieve the plugin that created this async packet listener.
* @return The plugin, or NULL if not available.
*/
public Plugin getPlugin();
} }

View File

@ -1,8 +1,12 @@
package com.comphenix.protocol.async; package com.comphenix.protocol.async;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import com.comphenix.protocol.PacketStream;
import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.PrioritizedListener;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
/** /**
@ -12,6 +16,11 @@ import com.google.common.primitives.Longs;
*/ */
public class AsyncPacket implements Serializable, Comparable<AsyncPacket> { public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
/**
* Signal an end to the packet processing.
*/
static final AsyncPacket INTERUPT_PACKET = new AsyncPacket();
/** /**
* Generated by Eclipse. * Generated by Eclipse.
*/ */
@ -20,13 +29,23 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
/** /**
* Default number of milliseconds until a packet will rejected. * Default number of milliseconds until a packet will rejected.
*/ */
public static final int DEFAULT_TIMEOUT_DETLA = 60000; public static final int DEFAULT_TIMEOUT_DELTA = 60000;
/** /**
* The original synchronized packet. * The original synchronized packet.
*/ */
private PacketEvent packetEvent; private PacketEvent packetEvent;
/**
* The packet stream responsible for transmitting the packet when it's done processing.
*/
private transient PacketStream packetStream;
/**
* Current list of async packet listeners.
*/
private transient Iterator<PrioritizedListener<ListenerToken>> listenerTraversal;
// Timeout handling // Timeout handling
private long initialTime; private long initialTime;
private long timeout; private long timeout;
@ -35,18 +54,39 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
private long originalSendingIndex; private long originalSendingIndex;
private long newSendingIndex; private long newSendingIndex;
// Whether or not the packet has been processed by the listeners
private volatile boolean processed;
private AsyncPacket() {
// Used by the poision pill pattern
}
/**
* Determine whether or not this is a signal for the async listener to interrupt processing.
* @return Interrupt packet processing.
*/
boolean isInteruptPacket() {
// This is only possble if we're dealing with the poision pill packet
return packetEvent == null || packetStream == null;
}
/** /**
* Create a container for asyncronous packets. * Create a container for asyncronous packets.
* @param packetEvent - the synchronous packet event. * @param packetEvent - the synchronous packet event.
* @param initialTime - the current time in milliseconds since 01.01.1970 00:00. * @param initialTime - the current time in milliseconds since 01.01.1970 00:00.
*/ */
public AsyncPacket(PacketEvent packetEvent, long sendingIndex, long initialTime) { public AsyncPacket(PacketStream packetStream, PacketEvent packetEvent, long sendingIndex, long initialTime, long timeoutDelta) {
if (packetEvent == null)
throw new IllegalArgumentException("packetEvent cannot be NULL");
if (packetStream == null)
throw new IllegalArgumentException("packetStream cannot be NULL");
this.packetStream = packetStream;
this.packetEvent = packetEvent; this.packetEvent = packetEvent;
// Timeout // Timeout
this.initialTime = initialTime; this.initialTime = initialTime;
this.timeout = initialTime + DEFAULT_TIMEOUT_DETLA; this.timeout = initialTime + timeoutDelta;
// Sending index // Sending index
this.originalSendingIndex = sendingIndex; this.originalSendingIndex = sendingIndex;
@ -69,14 +109,6 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
return timeout; return timeout;
} }
/**
* Sets the time the packet will be forcefully rejected.
* @param timeout - the time to reject the packet, in milliseconds since 01.01.1970 00:00.
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/** /**
* Retrieve the order the packet was originally transmitted. * Retrieve the order the packet was originally transmitted.
* @return The original packet index. * @return The original packet index.
@ -113,6 +145,74 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
return packetEvent; return packetEvent;
} }
/**
* Retrieve the packet ID of the underlying packet.
* @return Packet ID.
*/
public int getPacketID() {
return packetEvent.getPacketID();
}
/**
* Retrieve the packet stream responsible for transmitting this packet.
* @return The packet stream.
*/
public PacketStream getPacketStream() {
return packetStream;
}
/**
* Sets the output packet stream responsible for transmitting this packet.
* @param packetStream - new output packet stream.
*/
public void setPacketStream(PacketStream packetStream) {
this.packetStream = packetStream;
}
/**
* Retrieve whether or not this packet has been processed by the async listeners.
* @return TRUE if it has been processed, FALSE otherwise.
*/
boolean isProcessed() {
return processed;
}
/**
* Sets whether or not this packet has been processed by the async listeners.
* @param processed - TRUE if it has, FALSE otherwise.
*/
void setProcessed(boolean processed) {
this.processed = processed;
}
/**
* Retrieve iterator for the next listener in line.
* @return Next async packet listener iterator.
*/
Iterator<PrioritizedListener<ListenerToken>> getListenerTraversal() {
return listenerTraversal;
}
/**
* We're done processing. Send the packet.
*/
void sendPacket() {
try {
// We only support server packets at this stage
packetStream.sendServerPacket(packetEvent.getPlayer(), packetEvent.getPacket(), false);
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
/**
* Set the iterator for the next listener.
* @param listenerTraversal - the new async packet listener iterator.
*/
void setListenerTraversal(Iterator<PrioritizedListener<ListenerToken>> listenerTraversal) {
this.listenerTraversal = listenerTraversal;
}
@Override @Override
public int compareTo(AsyncPacket o) { public int compareTo(AsyncPacket o) {
if (o == null) if (o == null)

View File

@ -1,38 +1,139 @@
package com.comphenix.protocol.async; package com.comphenix.protocol.async;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import org.bukkit.plugin.Plugin;
public class ListenerToken { public class ListenerToken {
// Default queue capacity
private static int DEFAULT_CAPACITY = 1024;
// Cancel the async handler // Cancel the async handler
private volatile boolean cancelled; private volatile boolean cancelled;
// The packet listener
private AsyncListener listener;
// The original plugin
private Plugin plugin;
// The filter manager
private AsyncFilterManager filterManager;
// List of queued packets
private ArrayBlockingQueue<AsyncPacket> queuedPackets = new ArrayBlockingQueue<AsyncPacket>(DEFAULT_CAPACITY);
// Minecraft main thread
private Thread mainThread;
public ListenerToken(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, AsyncListener listener) {
if (filterManager == null)
throw new IllegalArgumentException("filterManager cannot be NULL");
if (listener == null)
throw new IllegalArgumentException("listener cannot be NULL");
this.plugin = plugin;
this.mainThread = mainThread;
this.filterManager = filterManager;
this.listener = listener;
}
public boolean isCancelled() { public boolean isCancelled() {
return cancelled; return cancelled;
} }
public AsyncListener getAsyncListener() {
return listener;
}
/** /**
* Cancel the handler. * Cancel the handler.
*/ */
public void cancel() { public void cancel() {
cancelled = true; // Remove the listener as quickly as possible
close();
// Poison Pill Shutdown
queuedPackets.clear();
queuedPackets.add(AsyncPacket.INTERUPT_PACKET);
} }
/**
* Queue a packet for processing.
* @param packet - a packet for processing.
* @throws IllegalStateException If the underlying packet queue is full.
*/
public void enqueuePacket(AsyncPacket packet) {
if (packet == null)
throw new IllegalArgumentException("packet is NULL");
public void beginListener(AsyncListener asyncListener) { queuedPackets.add(packet);
}
/**
* Entry point for the background thread that will be processing the packet asynchronously.
* <p>
* <b>WARNING:</b>
* Never call this method from the main thread. Doing so will block Minecraft.
*/
public void listenerLoop() {
// Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId())
throw new IllegalStateException("Do not call this method from the main thread.");
try { try {
AsyncPacket packet = processingQueue.take(); mainLoop:
while (!cancelled) {
AsyncPacket packet = queuedPackets.take();
// Now, // Handle cancel requests
asyncListener.onAsyncPacket(packet); if (packet == null || packet.isInteruptPacket()) {
break;
}
// Here's the core of the asynchronous processing
try {
listener.onAsyncPacket(packet);
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
filterManager.getLogger().log(Level.SEVERE,
"Unhandled exception occured in onAsyncPacket() for " + getPluginName(), e);
}
// Now, get the next non-cancelled listener
for (; packet.getListenerTraversal().hasNext(); ) {
ListenerToken token = packet.getListenerTraversal().next().getListener();
if (!token.isCancelled()) {
token.enqueuePacket(packet);
continue mainLoop;
}
}
// There are no more listeners - queue the packet for transmission
filterManager.getSendingQueue().signalPacketUpdate(packet);
filterManager.getProcessingQueue().signalProcessingDone();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// We're done
} }
// Clean up
close();
}
private void close() {
// Remove the listener itself
if (!cancelled) {
filterManager.unregisterAsyncHandlerInternal(this);
cancelled = true;
}
}
private String getPluginName() {
return plugin != null ? plugin.getName() : "UNKNOWN";
} }
} }

View File

@ -1,17 +1,19 @@
package com.comphenix.protocol.async; package com.comphenix.protocol.async;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray; import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
import com.comphenix.protocol.injector.PrioritizedListener;
/** /**
* Handles the processing of a certain packet type. * Handles the processing of every packet type.
* *
* @author Kristian * @author Kristian
*/ */
class PacketProcessingQueue { class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerToken> {
/** /**
* Default maximum number of packets to process concurrently. * Default maximum number of packets to process concurrently.
@ -32,43 +34,86 @@ class PacketProcessingQueue {
// Queued packets for being processed // Queued packets for being processed
private ArrayBlockingQueue<AsyncPacket> processingQueue; private ArrayBlockingQueue<AsyncPacket> processingQueue;
// Packet listeners // Packets for sending
private SortedCopyOnWriteArray<> private PacketSendingQueue sendingQueue;
public PacketProcessingQueue() { public PacketProcessingQueue(PacketSendingQueue sendingQueue) {
this(DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY); this(sendingQueue, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
} }
public PacketProcessingQueue(int queueLimit, int maximumConcurrency) { public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) {
super();
this.processingQueue = new ArrayBlockingQueue<AsyncPacket>(queueLimit); this.processingQueue = new ArrayBlockingQueue<AsyncPacket>(queueLimit);
this.maximumConcurrency = maximumConcurrency; this.maximumConcurrency = maximumConcurrency;
this.concurrentProcessing = new Semaphore(maximumConcurrency); this.concurrentProcessing = new Semaphore(maximumConcurrency);
this.sendingQueue = sendingQueue;
} }
public boolean queuePacket(AsyncPacket packet) { /**
* Enqueue a packet for processing by the asynchronous listeners.
* @param packet - packet to process.
* @return TRUE if we sucessfully queued the packet, FALSE if the queue ran out if space.
*/
public boolean enqueuePacket(AsyncPacket packet) {
try { try {
processingQueue.add(packet); processingQueue.add(packet);
// Begin processing packets // Begin processing packets
processPacket(); signalBeginProcessing();
return true; return true;
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
return false; return false;
} }
} }
public void processPacket() { /**
if (concurrentProcessing.tryAcquire()) { * Called by the current method and each thread to signal that a packet might be ready for processing.
*/
public void signalBeginProcessing() {
while (concurrentProcessing.tryAcquire()) {
AsyncPacket packet = processingQueue.poll(); AsyncPacket packet = processingQueue.poll();
// Any packet queued? // Any packet queued?
if (packet != null) { if (packet != null) {
Collection<PrioritizedListener<ListenerToken>> list = getListener(packet.getPacketID());
if (list != null) {
Iterator<PrioritizedListener<ListenerToken>> iterator = list.iterator();
if (iterator.hasNext()) {
packet.setListenerTraversal(iterator);
iterator.next().getListener().enqueuePacket(packet);
continue;
}
}
// The packet has no listeners. Just send it.
sendingQueue.signalPacketUpdate(packet);
signalProcessingDone();
} else {
// No more queued packets.
return;
} }
} }
} }
/**
* Called when a packet has been processed.
*/
public void signalProcessingDone() {
concurrentProcessing.release();
}
/**
* Retrieve the maximum number of packets to process at any given time.
* @return Number of simultaneous packet to process.
*/
public int getMaximumConcurrency() { public int getMaximumConcurrency() {
return maximumConcurrency; return maximumConcurrency;
} }
public void removeListeners() {
for (PrioritizedListener<ListenerToken> token : )
}
} }

View File

@ -0,0 +1,41 @@
package com.comphenix.protocol.async;
import java.util.concurrent.PriorityBlockingQueue;
/**
* Represents packets ready to be transmitted to a client.
* @author Kristian
*/
class PacketSendingQueue {
private PriorityBlockingQueue<AsyncPacket> sendingQueue;
/**
* Enqueue a packet for sending.
* @param packet
*/
public void enqueue(AsyncPacket packet) {
sendingQueue.add(packet);
}
/**
* Invoked when one of the packets have finished processing.
*/
public synchronized void signalPacketUpdate(AsyncPacket packetUpdated) {
// Mark this packet as finished
packetUpdated.setProcessed(true);
// Transmit as many packets as we can
while (true) {
AsyncPacket current = sendingQueue.peek();
if (current != null && current.isProcessed()) {
current.sendPacket();
sendingQueue.poll();
} else {
break;
}
}
}
}

View File

@ -1,6 +1,7 @@
package com.comphenix.protocol.concurrency; package com.comphenix.protocol.concurrency;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -16,7 +17,7 @@ import com.comphenix.protocol.injector.PrioritizedListener;
public abstract class AbstractConcurrentListenerMultimap<TListener> { public abstract class AbstractConcurrentListenerMultimap<TListener> {
// The core of our map // The core of our map
protected ConcurrentMap<Integer, SortedCopyOnWriteArray<PrioritizedListener<TListener>>> listeners = private ConcurrentMap<Integer, SortedCopyOnWriteArray<PrioritizedListener<TListener>>> listeners =
new ConcurrentHashMap<Integer, SortedCopyOnWriteArray<PrioritizedListener<TListener>>>(); new ConcurrentHashMap<Integer, SortedCopyOnWriteArray<PrioritizedListener<TListener>>>();
/** /**
@ -91,4 +92,15 @@ public abstract class AbstractConcurrentListenerMultimap<TListener> {
return removedPackets; return removedPackets;
} }
/**
* Retrieve the registered listeners, in order from the lowest to the highest priority.
* <p>
* The returned list is thread-safe and doesn't require synchronization.
* @param packetID - packet ID.
* @return Registered listeners.
*/
public Collection<PrioritizedListener<TListener>> getListener(int packetID) {
return listeners.get(packetID);
}
} }

View File

@ -46,6 +46,7 @@ import org.bukkit.plugin.Plugin;
import org.bukkit.plugin.PluginManager; import org.bukkit.plugin.PluginManager;
import com.comphenix.protocol.ProtocolManager; import com.comphenix.protocol.ProtocolManager;
import com.comphenix.protocol.async.AsyncFilterManager;
import com.comphenix.protocol.events.*; import com.comphenix.protocol.events.*;
import com.comphenix.protocol.reflect.FieldAccessException; import com.comphenix.protocol.reflect.FieldAccessException;
import com.comphenix.protocol.reflect.FuzzyReflection; import com.comphenix.protocol.reflect.FuzzyReflection;
@ -112,6 +113,9 @@ public final class PacketFilterManager implements ProtocolManager {
// Error logger // Error logger
private Logger logger; private Logger logger;
// The async packet handler
private AsyncFilterManager asyncFilterManager;
/** /**
* Only create instances of this class if protocol lib is disabled. * Only create instances of this class if protocol lib is disabled.
*/ */
@ -126,11 +130,20 @@ public final class PacketFilterManager implements ProtocolManager {
this.classLoader = classLoader; this.classLoader = classLoader;
this.logger = logger; this.logger = logger;
this.packetInjector = new PacketInjector(classLoader, this, connectionLookup); this.packetInjector = new PacketInjector(classLoader, this, connectionLookup);
this.asyncFilterManager = new AsyncFilterManager(logger, this);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
logger.log(Level.SEVERE, "Unable to initialize packet injector.", e); logger.log(Level.SEVERE, "Unable to initialize packet injector.", e);
} }
} }
/**
* Retrieve the current async packet filter manager.
* @return Async filter manager.
*/
public AsyncFilterManager getAsyncFilterManager() {
return asyncFilterManager;
}
/** /**
* Retrieves how the server packets are read. * Retrieves how the server packets are read.
* @return Injection method for reading server packets. * @return Injection method for reading server packets.
@ -656,6 +669,9 @@ public final class PacketFilterManager implements ProtocolManager {
injection.cleanupAll(); injection.cleanupAll();
} }
// Clean up async handlers
asyncFilterManager.cleanupAll();
// Remove packet handlers // Remove packet handlers
if (packetInjector != null) if (packetInjector != null)
packetInjector.cleanupAll(); packetInjector.cleanupAll();

View File

@ -1,16 +1,16 @@
package com.comphenix.protocol.injector; package com.comphenix.protocol.injector;
import java.util.Collection;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap; import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray;
import com.comphenix.protocol.events.PacketAdapter; import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener; import com.comphenix.protocol.events.PacketListener;
/** /**
* A thread-safe implementation of a listener multimap. * Registry of synchronous packet listeners.
* *
* @author Kristian * @author Kristian
*/ */
@ -22,13 +22,12 @@ class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<Packet
* @param event - the packet event to invoke. * @param event - the packet event to invoke.
*/ */
public void invokePacketRecieving(Logger logger, PacketEvent event) { public void invokePacketRecieving(Logger logger, PacketEvent event) {
SortedCopyOnWriteArray<PrioritizedListener<PacketListener>> list = listeners.get(event.getPacketID()); Collection<PrioritizedListener<PacketListener>> list = getListener(event.getPacketID());
if (list == null) if (list == null)
return; return;
// We have to be careful. Cannot modify the underlying list when sending notifications. // The returned list is thread-safe
synchronized (list) {
for (PrioritizedListener<PacketListener> element : list) { for (PrioritizedListener<PacketListener> element : list) {
try { try {
element.getListener().onPacketReceiving(event); element.getListener().onPacketReceiving(event);
@ -40,7 +39,6 @@ class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<Packet
} }
} }
} }
}
/** /**
* Invokes the given packet event for every registered listener. * Invokes the given packet event for every registered listener.
@ -48,12 +46,11 @@ class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<Packet
* @param event - the packet event to invoke. * @param event - the packet event to invoke.
*/ */
public void invokePacketSending(Logger logger, PacketEvent event) { public void invokePacketSending(Logger logger, PacketEvent event) {
SortedCopyOnWriteArray<PrioritizedListener<PacketListener>> list = listeners.get(event.getPacketID()); Collection<PrioritizedListener<PacketListener>> list = getListener(event.getPacketID());
if (list == null) if (list == null)
return; return;
synchronized (list) {
for (PrioritizedListener<PacketListener> element : list) { for (PrioritizedListener<PacketListener> element : list) {
try { try {
element.getListener().onPacketSending(event); element.getListener().onPacketSending(event);
@ -65,6 +62,5 @@ class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<Packet
} }
} }
} }
}
} }