From 6f02e798027545bf4e61fc2e2113a1d8387f318a Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Sat, 29 Sep 2012 18:05:08 +0200 Subject: [PATCH] Still a work in progress. I'm considering removing the async listener and use the packet listener for both sync and async processing. --- .../com/comphenix/protocol/PacketStream.java | 55 ++++++++ .../comphenix/protocol/ProtocolLibrary.java | 2 +- .../comphenix/protocol/ProtocolManager.java | 43 +----- .../protocol/async/AsyncFilterManager.java | 80 +++++++++++- .../protocol/async/AsyncListener.java | 12 ++ .../comphenix/protocol/async/AsyncPacket.java | 122 +++++++++++++++-- .../protocol/async/ListenerToken.java | 123 ++++++++++++++++-- .../protocol/async/PacketProcessingQueue.java | 73 +++++++++-- .../protocol/async/PacketSendingQueue.java | 41 ++++++ .../AbstractConcurrentListenerMultimap.java | 16 ++- .../injector/PacketFilterManager.java | 16 +++ .../injector/SortedPacketListenerList.java | 50 ++++--- 12 files changed, 522 insertions(+), 111 deletions(-) create mode 100644 ProtocolLib/src/com/comphenix/protocol/PacketStream.java create mode 100644 ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java diff --git a/ProtocolLib/src/com/comphenix/protocol/PacketStream.java b/ProtocolLib/src/com/comphenix/protocol/PacketStream.java new file mode 100644 index 00000000..464f6f5c --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/PacketStream.java @@ -0,0 +1,55 @@ +package com.comphenix.protocol; + +import java.lang.reflect.InvocationTargetException; + +import org.bukkit.entity.Player; + +import com.comphenix.protocol.events.PacketContainer; + +/** + * Represents a object capable of sending or receiving packets. + * + * @author Kristian + */ +public interface PacketStream { + + /** + * Send a packet to the given player. + * @param reciever - the reciever. + * @param packet - packet to send. + * @throws InvocationTargetException - if an error occured when sending the packet. + */ + public void sendServerPacket(Player reciever, PacketContainer packet) + throws InvocationTargetException; + + /** + * Send a packet to the given player. + * @param reciever - the reciever. + * @param packet - packet to send. + * @param filters - whether or not to invoke any packet filters. + * @throws InvocationTargetException - if an error occured when sending the packet. + */ + public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters) + throws InvocationTargetException; + + /** + * Simulate recieving a certain packet from a given player. + * @param sender - the sender. + * @param packet - the packet that was sent. + * @throws InvocationTargetException If the reflection machinery failed. + * @throws IllegalAccessException If the underlying method caused an error. + */ + public void recieveClientPacket(Player sender, PacketContainer packet) + throws IllegalAccessException, InvocationTargetException; + + /** + * Simulate recieving a certain packet from a given player. + * @param sender - the sender. + * @param packet - the packet that was sent. + * @param filters - whether or not to invoke any packet filters. + * @throws InvocationTargetException If the reflection machinery failed. + * @throws IllegalAccessException If the underlying method caused an error. + */ + public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters) + throws IllegalAccessException, InvocationTargetException; +} diff --git a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java index a0d3ce00..f41491ca 100644 --- a/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java +++ b/ProtocolLib/src/com/comphenix/protocol/ProtocolLibrary.java @@ -59,7 +59,7 @@ public class ProtocolLibrary extends JavaPlugin { backgroundCompiler = new BackgroundCompiler(getClassLoader()); BackgroundCompiler.setInstance(backgroundCompiler); } - + // Notify server managers of incompatible plugins checkForIncompatibility(manager); diff --git a/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java b/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java index 87062685..2fa911e3 100644 --- a/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/ProtocolManager.java @@ -17,7 +17,6 @@ package com.comphenix.protocol; -import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Set; @@ -35,7 +34,7 @@ import com.google.common.collect.ImmutableSet; * Represents an API for accessing the Minecraft protocol. * @author Kristian */ -public interface ProtocolManager { +public interface ProtocolManager extends PacketStream { /** * Retrieves a list of every registered packet listener. @@ -66,46 +65,6 @@ public interface ProtocolManager { * @param plugin - the plugin to unload. */ public void removePacketListeners(Plugin plugin); - - /** - * Send a packet to the given player. - * @param reciever - the reciever. - * @param packet - packet to send. - * @throws InvocationTargetException - if an error occured when sending the packet. - */ - public void sendServerPacket(Player reciever, PacketContainer packet) - throws InvocationTargetException; - - /** - * Send a packet to the given player. - * @param reciever - the reciever. - * @param packet - packet to send. - * @param filters - whether or not to invoke any packet filters. - * @throws InvocationTargetException - if an error occured when sending the packet. - */ - public void sendServerPacket(Player reciever, PacketContainer packet, boolean filters) - throws InvocationTargetException; - - /** - * Simulate recieving a certain packet from a given player. - * @param sender - the sender. - * @param packet - the packet that was sent. - * @throws InvocationTargetException If the reflection machinery failed. - * @throws IllegalAccessException If the underlying method caused an error. - */ - public void recieveClientPacket(Player sender, PacketContainer packet) - throws IllegalAccessException, InvocationTargetException; - - /** - * Simulate recieving a certain packet from a given player. - * @param sender - the sender. - * @param packet - the packet that was sent. - * @param filters - whether or not to invoke any packet filters. - * @throws InvocationTargetException If the reflection machinery failed. - * @throws IllegalAccessException If the underlying method caused an error. - */ - public void recieveClientPacket(Player sender, PacketContainer packet, boolean filters) - throws IllegalAccessException, InvocationTargetException; /** * Constructs a new encapsulated Minecraft packet with the given ID. diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java index dc8e972a..86f9d789 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncFilterManager.java @@ -1,6 +1,12 @@ package com.comphenix.protocol.async; -import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +import org.bukkit.plugin.Plugin; + +import com.comphenix.protocol.PacketStream; +import com.comphenix.protocol.events.PacketEvent; /** * Represents a filter manager for asynchronous packets. @@ -9,9 +15,77 @@ import java.util.concurrent.Future; */ public class AsyncFilterManager { - - public Future registerAsyncHandler() { + private PacketProcessingQueue processingQueue; + private PacketSendingQueue sendingQueue; + + private PacketStream packetStream; + private Logger logger; + + // The likely main thread + private Thread mainThread; + + // Current packet index + private AtomicInteger currentSendingIndex = new AtomicInteger(); + + public AsyncFilterManager(Logger logger, PacketStream packetStream) { + this.sendingQueue = new PacketSendingQueue(); + this.processingQueue = new PacketProcessingQueue(sendingQueue); + this.packetStream = packetStream; + this.logger = logger; + this.mainThread = Thread.currentThread(); } + public ListenerToken registerAsyncHandler(Plugin plugin, AsyncListener listener) { + ListenerToken token = new ListenerToken(plugin, mainThread, this, listener); + + processingQueue.addListener(token, listener.getSendingWhitelist()); + return token; + } + + public void unregisterAsyncHandler(ListenerToken listenerToken) { + if (listenerToken == null) + throw new IllegalArgumentException("listenerToken cannot be NULL"); + + listenerToken.cancel(); + } + + // Called by ListenerToken + void unregisterAsyncHandlerInternal(ListenerToken listenerToken) { + // Just remove it from the queue + processingQueue.removeListener(listenerToken, listenerToken.getAsyncListener().getSendingWhitelist()); + } + + public void enqueueSyncPacket(PacketEvent syncPacket, int sendingDelta, long timeoutDelta) { + AsyncPacket asyncPacket = new AsyncPacket(packetStream, syncPacket, + currentSendingIndex.getAndIncrement() + sendingDelta, + System.currentTimeMillis(), + timeoutDelta); + + // Start the process + sendingQueue.enqueue(asyncPacket); + processingQueue.enqueuePacket(asyncPacket); + } + + public PacketStream getPacketStream() { + return packetStream; + } + + public Logger getLogger() { + return logger; + } + + PacketProcessingQueue getProcessingQueue() { + return processingQueue; + } + + PacketSendingQueue getSendingQueue() { + return sendingQueue; + } + + public void cleanupAll() { + // Remove all listeners + + // We don't necessarily remove packets, as this might be a part of a server reload + } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java index 01ef4aba..49342139 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListener.java @@ -1,5 +1,17 @@ package com.comphenix.protocol.async; +import org.bukkit.plugin.Plugin; + +import com.comphenix.protocol.events.ListeningWhitelist; + public interface AsyncListener { public void onAsyncPacket(AsyncPacket packet); + + public ListeningWhitelist getSendingWhitelist(); + + /** + * Retrieve the plugin that created this async packet listener. + * @return The plugin, or NULL if not available. + */ + public Plugin getPlugin(); } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java index e14d4073..9bcc2e53 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncPacket.java @@ -1,8 +1,12 @@ package com.comphenix.protocol.async; import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.util.Iterator; +import com.comphenix.protocol.PacketStream; import com.comphenix.protocol.events.PacketEvent; +import com.comphenix.protocol.injector.PrioritizedListener; import com.google.common.primitives.Longs; /** @@ -12,6 +16,11 @@ import com.google.common.primitives.Longs; */ public class AsyncPacket implements Serializable, Comparable { + /** + * Signal an end to the packet processing. + */ + static final AsyncPacket INTERUPT_PACKET = new AsyncPacket(); + /** * Generated by Eclipse. */ @@ -20,13 +29,23 @@ public class AsyncPacket implements Serializable, Comparable { /** * Default number of milliseconds until a packet will rejected. */ - public static final int DEFAULT_TIMEOUT_DETLA = 60000; + public static final int DEFAULT_TIMEOUT_DELTA = 60000; /** * The original synchronized packet. */ private PacketEvent packetEvent; + /** + * The packet stream responsible for transmitting the packet when it's done processing. + */ + private transient PacketStream packetStream; + + /** + * Current list of async packet listeners. + */ + private transient Iterator> listenerTraversal; + // Timeout handling private long initialTime; private long timeout; @@ -35,18 +54,39 @@ public class AsyncPacket implements Serializable, Comparable { private long originalSendingIndex; private long newSendingIndex; + // Whether or not the packet has been processed by the listeners + private volatile boolean processed; + + private AsyncPacket() { + // Used by the poision pill pattern + } + + /** + * Determine whether or not this is a signal for the async listener to interrupt processing. + * @return Interrupt packet processing. + */ + boolean isInteruptPacket() { + // This is only possble if we're dealing with the poision pill packet + return packetEvent == null || packetStream == null; + } /** * Create a container for asyncronous packets. * @param packetEvent - the synchronous packet event. * @param initialTime - the current time in milliseconds since 01.01.1970 00:00. */ - public AsyncPacket(PacketEvent packetEvent, long sendingIndex, long initialTime) { + public AsyncPacket(PacketStream packetStream, PacketEvent packetEvent, long sendingIndex, long initialTime, long timeoutDelta) { + if (packetEvent == null) + throw new IllegalArgumentException("packetEvent cannot be NULL"); + if (packetStream == null) + throw new IllegalArgumentException("packetStream cannot be NULL"); + + this.packetStream = packetStream; this.packetEvent = packetEvent; // Timeout this.initialTime = initialTime; - this.timeout = initialTime + DEFAULT_TIMEOUT_DETLA; + this.timeout = initialTime + timeoutDelta; // Sending index this.originalSendingIndex = sendingIndex; @@ -69,14 +109,6 @@ public class AsyncPacket implements Serializable, Comparable { return timeout; } - /** - * Sets the time the packet will be forcefully rejected. - * @param timeout - the time to reject the packet, in milliseconds since 01.01.1970 00:00. - */ - public void setTimeout(long timeout) { - this.timeout = timeout; - } - /** * Retrieve the order the packet was originally transmitted. * @return The original packet index. @@ -112,6 +144,74 @@ public class AsyncPacket implements Serializable, Comparable { public PacketEvent getPacketEvent() { return packetEvent; } + + /** + * Retrieve the packet ID of the underlying packet. + * @return Packet ID. + */ + public int getPacketID() { + return packetEvent.getPacketID(); + } + + /** + * Retrieve the packet stream responsible for transmitting this packet. + * @return The packet stream. + */ + public PacketStream getPacketStream() { + return packetStream; + } + + /** + * Sets the output packet stream responsible for transmitting this packet. + * @param packetStream - new output packet stream. + */ + public void setPacketStream(PacketStream packetStream) { + this.packetStream = packetStream; + } + + /** + * Retrieve whether or not this packet has been processed by the async listeners. + * @return TRUE if it has been processed, FALSE otherwise. + */ + boolean isProcessed() { + return processed; + } + + /** + * Sets whether or not this packet has been processed by the async listeners. + * @param processed - TRUE if it has, FALSE otherwise. + */ + void setProcessed(boolean processed) { + this.processed = processed; + } + + /** + * Retrieve iterator for the next listener in line. + * @return Next async packet listener iterator. + */ + Iterator> getListenerTraversal() { + return listenerTraversal; + } + + /** + * We're done processing. Send the packet. + */ + void sendPacket() { + try { + // We only support server packets at this stage + packetStream.sendServerPacket(packetEvent.getPlayer(), packetEvent.getPacket(), false); + } catch (InvocationTargetException e) { + e.printStackTrace(); + } + } + + /** + * Set the iterator for the next listener. + * @param listenerTraversal - the new async packet listener iterator. + */ + void setListenerTraversal(Iterator> listenerTraversal) { + this.listenerTraversal = listenerTraversal; + } @Override public int compareTo(AsyncPacket o) { diff --git a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java index c3a59b3d..b932b252 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/ListenerToken.java @@ -1,38 +1,139 @@ package com.comphenix.protocol.async; import java.util.concurrent.ArrayBlockingQueue; +import java.util.logging.Level; + +import org.bukkit.plugin.Plugin; public class ListenerToken { + // Default queue capacity + private static int DEFAULT_CAPACITY = 1024; + // Cancel the async handler private volatile boolean cancelled; + + // The packet listener + private AsyncListener listener; + + // The original plugin + private Plugin plugin; + + // The filter manager + private AsyncFilterManager filterManager; + + // List of queued packets + private ArrayBlockingQueue queuedPackets = new ArrayBlockingQueue(DEFAULT_CAPACITY); + // Minecraft main thread + private Thread mainThread; + + public ListenerToken(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, AsyncListener listener) { + if (filterManager == null) + throw new IllegalArgumentException("filterManager cannot be NULL"); + if (listener == null) + throw new IllegalArgumentException("listener cannot be NULL"); + + this.plugin = plugin; + this.mainThread = mainThread; + this.filterManager = filterManager; + this.listener = listener; + } + public boolean isCancelled() { return cancelled; } + public AsyncListener getAsyncListener() { + return listener; + } + /** * Cancel the handler. */ public void cancel() { - cancelled = true; + // Remove the listener as quickly as possible + close(); + + // Poison Pill Shutdown + queuedPackets.clear(); + queuedPackets.add(AsyncPacket.INTERUPT_PACKET); } - - public void beginListener(AsyncListener asyncListener) { + /** + * Queue a packet for processing. + * @param packet - a packet for processing. + * @throws IllegalStateException If the underlying packet queue is full. + */ + public void enqueuePacket(AsyncPacket packet) { + if (packet == null) + throw new IllegalArgumentException("packet is NULL"); + + queuedPackets.add(packet); + } + + /** + * Entry point for the background thread that will be processing the packet asynchronously. + *

+ * WARNING: + * Never call this method from the main thread. Doing so will block Minecraft. + */ + public void listenerLoop() { + // Danger, danger! + if (Thread.currentThread().getId() == mainThread.getId()) + throw new IllegalStateException("Do not call this method from the main thread."); try { - AsyncPacket packet = processingQueue.take(); - - // Now, - asyncListener.onAsyncPacket(packet); - + mainLoop: + while (!cancelled) { + AsyncPacket packet = queuedPackets.take(); + + // Handle cancel requests + if (packet == null || packet.isInteruptPacket()) { + break; + } + + // Here's the core of the asynchronous processing + try { + listener.onAsyncPacket(packet); + } catch (Throwable e) { + // Minecraft doesn't want your Exception. + filterManager.getLogger().log(Level.SEVERE, + "Unhandled exception occured in onAsyncPacket() for " + getPluginName(), e); + } + + // Now, get the next non-cancelled listener + for (; packet.getListenerTraversal().hasNext(); ) { + ListenerToken token = packet.getListenerTraversal().next().getListener(); + + if (!token.isCancelled()) { + token.enqueuePacket(packet); + continue mainLoop; + } + } + + // There are no more listeners - queue the packet for transmission + filterManager.getSendingQueue().signalPacketUpdate(packet); + filterManager.getProcessingQueue().signalProcessingDone(); + } } catch (InterruptedException e) { - + // We're done } - - + // Clean up + close(); + } + + private void close() { + // Remove the listener itself + if (!cancelled) { + filterManager.unregisterAsyncHandlerInternal(this); + cancelled = true; + } + } + + private String getPluginName() { + return plugin != null ? plugin.getName() : "UNKNOWN"; } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java index 620e9b3a..9061f96f 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketProcessingQueue.java @@ -1,17 +1,19 @@ package com.comphenix.protocol.async; +import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Semaphore; -import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray; +import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap; +import com.comphenix.protocol.injector.PrioritizedListener; /** - * Handles the processing of a certain packet type. + * Handles the processing of every packet type. * * @author Kristian */ -class PacketProcessingQueue { - +class PacketProcessingQueue extends AbstractConcurrentListenerMultimap { /** * Default maximum number of packets to process concurrently. @@ -31,44 +33,87 @@ class PacketProcessingQueue { // Queued packets for being processed private ArrayBlockingQueue processingQueue; - - // Packet listeners - private SortedCopyOnWriteArray<> - public PacketProcessingQueue() { - this(DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY); + // Packets for sending + private PacketSendingQueue sendingQueue; + + public PacketProcessingQueue(PacketSendingQueue sendingQueue) { + this(sendingQueue, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY); } - public PacketProcessingQueue(int queueLimit, int maximumConcurrency) { + public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) { + super(); this.processingQueue = new ArrayBlockingQueue(queueLimit); this.maximumConcurrency = maximumConcurrency; this.concurrentProcessing = new Semaphore(maximumConcurrency); + this.sendingQueue = sendingQueue; } - public boolean queuePacket(AsyncPacket packet) { + /** + * Enqueue a packet for processing by the asynchronous listeners. + * @param packet - packet to process. + * @return TRUE if we sucessfully queued the packet, FALSE if the queue ran out if space. + */ + public boolean enqueuePacket(AsyncPacket packet) { try { processingQueue.add(packet); // Begin processing packets - processPacket(); + signalBeginProcessing(); return true; } catch (IllegalStateException e) { return false; } } - public void processPacket() { - if (concurrentProcessing.tryAcquire()) { + /** + * Called by the current method and each thread to signal that a packet might be ready for processing. + */ + public void signalBeginProcessing() { + while (concurrentProcessing.tryAcquire()) { AsyncPacket packet = processingQueue.poll(); // Any packet queued? if (packet != null) { + Collection> list = getListener(packet.getPacketID()); + if (list != null) { + Iterator> iterator = list.iterator(); + + if (iterator.hasNext()) { + packet.setListenerTraversal(iterator); + iterator.next().getListener().enqueuePacket(packet); + continue; + } + } + + // The packet has no listeners. Just send it. + sendingQueue.signalPacketUpdate(packet); + signalProcessingDone(); + + } else { + // No more queued packets. + return; } } } + + /** + * Called when a packet has been processed. + */ + public void signalProcessingDone() { + concurrentProcessing.release(); + } + /** + * Retrieve the maximum number of packets to process at any given time. + * @return Number of simultaneous packet to process. + */ public int getMaximumConcurrency() { return maximumConcurrency; } + + public void removeListeners() { + for (PrioritizedListener token : ) + } } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java new file mode 100644 index 00000000..db2223f4 --- /dev/null +++ b/ProtocolLib/src/com/comphenix/protocol/async/PacketSendingQueue.java @@ -0,0 +1,41 @@ +package com.comphenix.protocol.async; + +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Represents packets ready to be transmitted to a client. + * @author Kristian + */ +class PacketSendingQueue { + + private PriorityBlockingQueue sendingQueue; + + /** + * Enqueue a packet for sending. + * @param packet + */ + public void enqueue(AsyncPacket packet) { + sendingQueue.add(packet); + } + + /** + * Invoked when one of the packets have finished processing. + */ + public synchronized void signalPacketUpdate(AsyncPacket packetUpdated) { + + // Mark this packet as finished + packetUpdated.setProcessed(true); + + // Transmit as many packets as we can + while (true) { + AsyncPacket current = sendingQueue.peek(); + + if (current != null && current.isProcessed()) { + current.sendPacket(); + sendingQueue.poll(); + } else { + break; + } + } + } +} diff --git a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java index 2fd5a1f3..ae61cf9b 100644 --- a/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java +++ b/ProtocolLib/src/com/comphenix/protocol/concurrency/AbstractConcurrentListenerMultimap.java @@ -1,6 +1,7 @@ package com.comphenix.protocol.concurrency; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -16,8 +17,8 @@ import com.comphenix.protocol.injector.PrioritizedListener; public abstract class AbstractConcurrentListenerMultimap { // The core of our map - protected ConcurrentMap>> listeners = - new ConcurrentHashMap>>(); + private ConcurrentMap>> listeners = + new ConcurrentHashMap>>(); /** * Adds a listener to its requested list of packet recievers. @@ -91,4 +92,15 @@ public abstract class AbstractConcurrentListenerMultimap { return removedPackets; } + + /** + * Retrieve the registered listeners, in order from the lowest to the highest priority. + *

+ * The returned list is thread-safe and doesn't require synchronization. + * @param packetID - packet ID. + * @return Registered listeners. + */ + public Collection> getListener(int packetID) { + return listeners.get(packetID); + } } diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java index f4084dbb..85329a2e 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/PacketFilterManager.java @@ -46,6 +46,7 @@ import org.bukkit.plugin.Plugin; import org.bukkit.plugin.PluginManager; import com.comphenix.protocol.ProtocolManager; +import com.comphenix.protocol.async.AsyncFilterManager; import com.comphenix.protocol.events.*; import com.comphenix.protocol.reflect.FieldAccessException; import com.comphenix.protocol.reflect.FuzzyReflection; @@ -112,6 +113,9 @@ public final class PacketFilterManager implements ProtocolManager { // Error logger private Logger logger; + // The async packet handler + private AsyncFilterManager asyncFilterManager; + /** * Only create instances of this class if protocol lib is disabled. */ @@ -126,11 +130,20 @@ public final class PacketFilterManager implements ProtocolManager { this.classLoader = classLoader; this.logger = logger; this.packetInjector = new PacketInjector(classLoader, this, connectionLookup); + this.asyncFilterManager = new AsyncFilterManager(logger, this); } catch (IllegalAccessException e) { logger.log(Level.SEVERE, "Unable to initialize packet injector.", e); } } + /** + * Retrieve the current async packet filter manager. + * @return Async filter manager. + */ + public AsyncFilterManager getAsyncFilterManager() { + return asyncFilterManager; + } + /** * Retrieves how the server packets are read. * @return Injection method for reading server packets. @@ -656,6 +669,9 @@ public final class PacketFilterManager implements ProtocolManager { injection.cleanupAll(); } + // Clean up async handlers + asyncFilterManager.cleanupAll(); + // Remove packet handlers if (packetInjector != null) packetInjector.cleanupAll(); diff --git a/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java b/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java index 41c80e70..27df4585 100644 --- a/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java +++ b/ProtocolLib/src/com/comphenix/protocol/injector/SortedPacketListenerList.java @@ -1,16 +1,16 @@ package com.comphenix.protocol.injector; +import java.util.Collection; import java.util.logging.Level; import java.util.logging.Logger; import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap; -import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray; import com.comphenix.protocol.events.PacketAdapter; import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketListener; /** - * A thread-safe implementation of a listener multimap. + * Registry of synchronous packet listeners. * * @author Kristian */ @@ -22,22 +22,20 @@ class SortedPacketListenerList extends AbstractConcurrentListenerMultimap> list = listeners.get(event.getPacketID()); + Collection> list = getListener(event.getPacketID()); if (list == null) return; - - // We have to be careful. Cannot modify the underlying list when sending notifications. - synchronized (list) { - for (PrioritizedListener element : list) { - try { - element.getListener().onPacketReceiving(event); - } catch (Throwable e) { - // Minecraft doesn't want your Exception. - logger.log(Level.SEVERE, - "Exception occured in onPacketReceiving() for " + - PacketAdapter.getPluginName(element.getListener()), e); - } + + // The returned list is thread-safe + for (PrioritizedListener element : list) { + try { + element.getListener().onPacketReceiving(event); + } catch (Throwable e) { + // Minecraft doesn't want your Exception. + logger.log(Level.SEVERE, + "Exception occured in onPacketReceiving() for " + + PacketAdapter.getPluginName(element.getListener()), e); } } } @@ -48,23 +46,21 @@ class SortedPacketListenerList extends AbstractConcurrentListenerMultimap> list = listeners.get(event.getPacketID()); + Collection> list = getListener(event.getPacketID()); if (list == null) return; - synchronized (list) { - for (PrioritizedListener element : list) { - try { - element.getListener().onPacketSending(event); - } catch (Throwable e) { - // Minecraft doesn't want your Exception. - logger.log(Level.SEVERE, - "Exception occured in onPacketReceiving() for " + - PacketAdapter.getPluginName(element.getListener()), e); - } + for (PrioritizedListener element : list) { + try { + element.getListener().onPacketSending(event); + } catch (Throwable e) { + // Minecraft doesn't want your Exception. + logger.log(Level.SEVERE, + "Exception occured in onPacketReceiving() for " + + PacketAdapter.getPluginName(element.getListener()), e); } - } + } } }