Beginning to add support for asynchronous packet listeners.

This commit is contained in:
Kristian S. Stangeland 2012-09-29 01:00:12 +02:00
parent 2012698275
commit 23e676533a
10 changed files with 482 additions and 196 deletions

View File

@ -0,0 +1,17 @@
package com.comphenix.protocol.async;
import java.util.concurrent.Future;
/**
* Represents a filter manager for asynchronous packets.
*
* @author Kristian
*/
public class AsyncFilterManager {
public Future<Void> registerAsyncHandler() {
}
}

View File

@ -0,0 +1,5 @@
package com.comphenix.protocol.async;
public interface AsyncListener {
public void onAsyncPacket(AsyncPacket packet);
}

View File

@ -0,0 +1,123 @@
package com.comphenix.protocol.async;
import java.io.Serializable;
import com.comphenix.protocol.events.PacketEvent;
import com.google.common.primitives.Longs;
/**
* Represents a packet that is being processed by asynchronous listeners.
*
* @author Kristian
*/
public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
/**
* Generated by Eclipse.
*/
private static final long serialVersionUID = -2621498096616187384L;
/**
* Default number of milliseconds until a packet will rejected.
*/
public static final int DEFAULT_TIMEOUT_DETLA = 60000;
/**
* The original synchronized packet.
*/
private PacketEvent packetEvent;
// Timeout handling
private long initialTime;
private long timeout;
// Packet order
private long originalSendingIndex;
private long newSendingIndex;
/**
* Create a container for asyncronous packets.
* @param packetEvent - the synchronous packet event.
* @param initialTime - the current time in milliseconds since 01.01.1970 00:00.
*/
public AsyncPacket(PacketEvent packetEvent, long sendingIndex, long initialTime) {
this.packetEvent = packetEvent;
// Timeout
this.initialTime = initialTime;
this.timeout = initialTime + DEFAULT_TIMEOUT_DETLA;
// Sending index
this.originalSendingIndex = sendingIndex;
this.newSendingIndex = sendingIndex;
}
/**
* Retrieve the time the packet was initially queued for asynchronous processing.
* @return The initial time in number of milliseconds since 01.01.1970 00:00.
*/
public long getInitialTime() {
return initialTime;
}
/**
* Retrieve the time the packet will be forcefully rejected.
* @return The time to reject the packet, in milliseconds since 01.01.1970 00:00.
*/
public long getTimeout() {
return timeout;
}
/**
* Sets the time the packet will be forcefully rejected.
* @param timeout - the time to reject the packet, in milliseconds since 01.01.1970 00:00.
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* Retrieve the order the packet was originally transmitted.
* @return The original packet index.
*/
public long getOriginalSendingIndex() {
return originalSendingIndex;
}
/**
* Retrieve the desired sending order after processing has completed.
* <p>
* Higher sending order means lower priority.
* @return Desired sending order.
*/
public long getNewSendingIndex() {
return newSendingIndex;
}
/**
* Sets the desired sending order after processing has completed.
* <p>
* Higher sending order means lower priority.
* @param newSendingIndex - new packet send index.
*/
public void setNewSendingIndex(long newSendingIndex) {
this.newSendingIndex = newSendingIndex;
}
/**
* Retrieve the original synchronous packet event.
* @return The original packet event.
*/
public PacketEvent getPacketEvent() {
return packetEvent;
}
@Override
public int compareTo(AsyncPacket o) {
if (o == null)
return 1;
else
return Longs.compare(getNewSendingIndex(), o.getNewSendingIndex());
}
}

View File

@ -0,0 +1,38 @@
package com.comphenix.protocol.async;
import java.util.concurrent.ArrayBlockingQueue;
public class ListenerToken {
// Cancel the async handler
private volatile boolean cancelled;
public boolean isCancelled() {
return cancelled;
}
/**
* Cancel the handler.
*/
public void cancel() {
cancelled = true;
}
public void beginListener(AsyncListener asyncListener) {
try {
AsyncPacket packet = processingQueue.take();
// Now,
asyncListener.onAsyncPacket(packet);
} catch (InterruptedException e) {
}
}
}

View File

@ -0,0 +1,74 @@
package com.comphenix.protocol.async;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray;
/**
* Handles the processing of a certain packet type.
*
* @author Kristian
*/
class PacketProcessingQueue {
/**
* Default maximum number of packets to process concurrently.
*/
public static final int DEFAULT_MAXIMUM_CONCURRENCY = 5;
/**
* Default maximum number of packets to queue for processing.
*/
public static final int DEFAULT_QUEUE_LIMIT = 1024 * 60;
/**
* Number of packets we're processing concurrently.
*/
private final int maximumConcurrency;
private Semaphore concurrentProcessing;
// Queued packets for being processed
private ArrayBlockingQueue<AsyncPacket> processingQueue;
// Packet listeners
private SortedCopyOnWriteArray<>
public PacketProcessingQueue() {
this(DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
}
public PacketProcessingQueue(int queueLimit, int maximumConcurrency) {
this.processingQueue = new ArrayBlockingQueue<AsyncPacket>(queueLimit);
this.maximumConcurrency = maximumConcurrency;
this.concurrentProcessing = new Semaphore(maximumConcurrency);
}
public boolean queuePacket(AsyncPacket packet) {
try {
processingQueue.add(packet);
// Begin processing packets
processPacket();
return true;
} catch (IllegalStateException e) {
return false;
}
}
public void processPacket() {
if (concurrentProcessing.tryAcquire()) {
AsyncPacket packet = processingQueue.poll();
// Any packet queued?
if (packet != null) {
}
}
}
public int getMaximumConcurrency() {
return maximumConcurrency;
}
}

View File

@ -0,0 +1,94 @@
package com.comphenix.protocol.concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.injector.PrioritizedListener;
/**
* A thread-safe implementation of a listener multimap.
*
* @author Kristian
*/
public abstract class AbstractConcurrentListenerMultimap<TListener> {
// The core of our map
protected ConcurrentMap<Integer, SortedCopyOnWriteArray<PrioritizedListener<TListener>>> listeners =
new ConcurrentHashMap<Integer, SortedCopyOnWriteArray<PrioritizedListener<TListener>>>();
/**
* Adds a listener to its requested list of packet recievers.
* @param listener - listener with a list of packets to recieve notifcations for.
* @param whitelist - the packet whitelist to use.
*/
public void addListener(TListener listener, ListeningWhitelist whitelist) {
PrioritizedListener<TListener> prioritized = new PrioritizedListener<TListener>(listener, whitelist.getPriority());
for (Integer packetID : whitelist.getWhitelist()) {
addListener(packetID, prioritized);
}
}
// Add the listener to a specific packet notifcation list
private void addListener(Integer packetID, PrioritizedListener<TListener> listener) {
SortedCopyOnWriteArray<PrioritizedListener<TListener>> list = listeners.get(packetID);
// We don't want to create this for every lookup
if (list == null) {
// It would be nice if we could use a PriorityBlockingQueue, but it doesn't preseve iterator order,
// which is a essential feature for our purposes.
final SortedCopyOnWriteArray<PrioritizedListener<TListener>> value = new SortedCopyOnWriteArray<PrioritizedListener<TListener>>();
list = listeners.putIfAbsent(packetID, value);
// We may end up creating multiple multisets, but we'll agree
// on the one to use.
if (list == null) {
list = value;
}
}
// Thread safe
list.add(listener);
}
/**
* Removes the given listener from the packet event list.
* @param listener - listener to remove.
* @param whitelist - the packet whitelist that was used.
* @return Every packet ID that was removed due to no listeners.
*/
public List<Integer> removeListener(TListener listener, ListeningWhitelist whitelist) {
List<Integer> removedPackets = new ArrayList<Integer>();
// Again, not terribly efficient. But adding or removing listeners should be a rare event.
for (Integer packetID : whitelist.getWhitelist()) {
SortedCopyOnWriteArray<PrioritizedListener<TListener>> list = listeners.get(packetID);
// Remove any listeners
if (list != null) {
// Don't remove from newly created lists
if (list.size() > 0) {
// Remove this listener. Note that priority is generally ignored.
list.remove(new PrioritizedListener<TListener>(listener, whitelist.getPriority()));
if (list.size() == 0) {
listeners.remove(packetID);
removedPackets.add(packetID);
}
}
}
// Move on to the next
}
return removedPackets;
}
}

View File

@ -1,194 +0,0 @@
package com.comphenix.protocol.injector;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray;
import com.comphenix.protocol.events.ListenerPriority;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
import com.google.common.base.Objects;
import com.google.common.primitives.Ints;
/**
* A thread-safe implementation of a listener multimap.
*
* @author Kristian
*/
public class ConcurrentListenerMultimap {
// The core of our map
protected ConcurrentMap<Integer, SortedCopyOnWriteArray<PrioritizedListener>> listeners =
new ConcurrentHashMap<Integer, SortedCopyOnWriteArray<PrioritizedListener>>();
/**
* Adds a listener to its requested list of packet recievers.
* @param listener - listener with a list of packets to recieve notifcations for.
* @param whitelist - the packet whitelist to use.
*/
public void addListener(PacketListener listener, ListeningWhitelist whitelist) {
PrioritizedListener prioritized = new PrioritizedListener(listener, whitelist.getPriority());
for (Integer packetID : whitelist.getWhitelist()) {
addListener(packetID, prioritized);
}
}
// Add the listener to a specific packet notifcation list
private void addListener(Integer packetID, PrioritizedListener listener) {
SortedCopyOnWriteArray<PrioritizedListener> list = listeners.get(packetID);
// We don't want to create this for every lookup
if (list == null) {
// It would be nice if we could use a PriorityBlockingQueue, but it doesn't preseve iterator order,
// which is a essential feature for our purposes.
final SortedCopyOnWriteArray<PrioritizedListener> value = new SortedCopyOnWriteArray<PrioritizedListener>();
list = listeners.putIfAbsent(packetID, value);
// We may end up creating multiple multisets, but we'll agree
// on the one to use.
if (list == null) {
list = value;
}
}
// Thread safe
list.add(listener);
}
/**
* Removes the given listener from the packet event list.
* @param listener - listener to remove.
* @param whitelist - the packet whitelist that was used.
* @return Every packet ID that was removed due to no listeners.
*/
public List<Integer> removeListener(PacketListener listener, ListeningWhitelist whitelist) {
List<Integer> removedPackets = new ArrayList<Integer>();
// Again, not terribly efficient. But adding or removing listeners should be a rare event.
for (Integer packetID : whitelist.getWhitelist()) {
SortedCopyOnWriteArray<PrioritizedListener> list = listeners.get(packetID);
// Remove any listeners
if (list != null) {
// Don't remove from newly created lists
if (list.size() > 0) {
// Remove this listener. Note that priority is generally ignored.
list.remove(new PrioritizedListener(listener, whitelist.getPriority()));
if (list.size() == 0) {
listeners.remove(packetID);
removedPackets.add(packetID);
}
}
}
// Move on to the next
}
return removedPackets;
}
/**
* Invokes the given packet event for every registered listener.
* @param logger - the logger that will be used to inform about listener exceptions.
* @param event - the packet event to invoke.
*/
public void invokePacketRecieving(Logger logger, PacketEvent event) {
SortedCopyOnWriteArray<PrioritizedListener> list = listeners.get(event.getPacketID());
if (list == null)
return;
// We have to be careful. Cannot modify the underlying list when sending notifications.
synchronized (list) {
for (PrioritizedListener element : list) {
try {
element.getListener().onPacketReceiving(event);
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
logger.log(Level.SEVERE,
"Exception occured in onPacketReceiving() for " +
PacketAdapter.getPluginName(element.getListener()), e);
}
}
}
}
/**
* Invokes the given packet event for every registered listener.
* @param logger - the logger that will be used to inform about listener exceptions.
* @param event - the packet event to invoke.
*/
public void invokePacketSending(Logger logger, PacketEvent event) {
SortedCopyOnWriteArray<PrioritizedListener> list = listeners.get(event.getPacketID());
if (list == null)
return;
synchronized (list) {
for (PrioritizedListener element : list) {
try {
element.getListener().onPacketSending(event);
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
logger.log(Level.SEVERE,
"Exception occured in onPacketReceiving() for " +
PacketAdapter.getPluginName(element.getListener()), e);
}
}
}
}
/**
* A listener with an associated priority.
*/
private class PrioritizedListener implements Comparable<PrioritizedListener> {
private PacketListener listener;
private ListenerPriority priority;
public PrioritizedListener(PacketListener listener, ListenerPriority priority) {
this.listener = listener;
this.priority = priority;
}
@Override
public int compareTo(PrioritizedListener other) {
// This ensures that lower priority listeners are executed first
return Ints.compare(this.getPriority().getSlot(),
other.getPriority().getSlot());
}
// Note that this equals() method is NOT consistent with compareTo().
// But, it's a private class so who cares.
@Override
public boolean equals(Object obj) {
// We only care about the listener - priority itself should not make a difference
if(obj instanceof PrioritizedListener){
final PrioritizedListener other = (PrioritizedListener) obj;
return Objects.equal(listener, other.listener);
} else {
return false;
}
}
public PacketListener getListener() {
return listener;
}
public ListenerPriority getPriority() {
return priority;
}
}
}

View File

@ -97,8 +97,8 @@ public final class PacketFilterManager implements ProtocolManager {
private Set<Integer> sendingFilters = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
// The two listener containers
private ConcurrentListenerMultimap recievedListeners = new ConcurrentListenerMultimap();
private ConcurrentListenerMultimap sendingListeners = new ConcurrentListenerMultimap();
private SortedPacketListenerList recievedListeners = new SortedPacketListenerList();
private SortedPacketListenerList sendingListeners = new SortedPacketListenerList();
// Whether or not this class has been closed
private boolean hasClosed;

View File

@ -0,0 +1,59 @@
package com.comphenix.protocol.injector;
import com.comphenix.protocol.events.ListenerPriority;
import com.google.common.base.Objects;
import com.google.common.primitives.Ints;
/**
* Represents a listener with a priority.
*
* @author Kristian
*/
public class PrioritizedListener<TListener> implements Comparable<PrioritizedListener<TListener>> {
private TListener listener;
private ListenerPriority priority;
public PrioritizedListener(TListener listener, ListenerPriority priority) {
this.listener = listener;
this.priority = priority;
}
@Override
public int compareTo(PrioritizedListener<TListener> other) {
// This ensures that lower priority listeners are executed first
return Ints.compare(
this.getPriority().getSlot(),
other.getPriority().getSlot());
}
// Note that this equals() method is NOT consistent with compareTo().
// But, it's a private class so who cares.
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
// We only care about the listener - priority itself should not make a difference
if(obj instanceof PrioritizedListener){
final PrioritizedListener<TListener> other = (PrioritizedListener<TListener>) obj;
return Objects.equal(listener, other.listener);
} else {
return false;
}
}
/**
* Retrieve the underlying listener.
* @return Underlying listener.
*/
public TListener getListener() {
return listener;
}
/**
* Retrieve the priority of this listener.
* @return Listener priority.
*/
public ListenerPriority getPriority() {
return priority;
}
}

View File

@ -0,0 +1,70 @@
package com.comphenix.protocol.injector;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
import com.comphenix.protocol.concurrency.SortedCopyOnWriteArray;
import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
/**
* A thread-safe implementation of a listener multimap.
*
* @author Kristian
*/
class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<PacketListener> {
/**
* Invokes the given packet event for every registered listener.
* @param logger - the logger that will be used to inform about listener exceptions.
* @param event - the packet event to invoke.
*/
public void invokePacketRecieving(Logger logger, PacketEvent event) {
SortedCopyOnWriteArray<PrioritizedListener<PacketListener>> list = listeners.get(event.getPacketID());
if (list == null)
return;
// We have to be careful. Cannot modify the underlying list when sending notifications.
synchronized (list) {
for (PrioritizedListener<PacketListener> element : list) {
try {
element.getListener().onPacketReceiving(event);
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
logger.log(Level.SEVERE,
"Exception occured in onPacketReceiving() for " +
PacketAdapter.getPluginName(element.getListener()), e);
}
}
}
}
/**
* Invokes the given packet event for every registered listener.
* @param logger - the logger that will be used to inform about listener exceptions.
* @param event - the packet event to invoke.
*/
public void invokePacketSending(Logger logger, PacketEvent event) {
SortedCopyOnWriteArray<PrioritizedListener<PacketListener>> list = listeners.get(event.getPacketID());
if (list == null)
return;
synchronized (list) {
for (PrioritizedListener<PacketListener> element : list) {
try {
element.getListener().onPacketSending(event);
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
logger.log(Level.SEVERE,
"Exception occured in onPacketReceiving() for " +
PacketAdapter.getPluginName(element.getListener()), e);
}
}
}
}
}