Connect the asynchronous listeners to the system.

This commit is contained in:
Kristian S. Stangeland 2012-09-29 20:30:22 +02:00
parent 81321383d5
commit fad6a0a99c
7 changed files with 202 additions and 35 deletions

View File

@ -6,6 +6,7 @@ import java.util.logging.Logger;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.PacketStream;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
@ -16,8 +17,11 @@ import com.comphenix.protocol.events.PacketListener;
*/
public class AsyncFilterManager {
private PacketProcessingQueue processingQueue;
private PacketSendingQueue sendingQueue;
private PacketProcessingQueue serverProcessingQueue;
private PacketSendingQueue serverQueue;
private PacketProcessingQueue clientProcessingQueue;
private PacketSendingQueue clientQueue;
private PacketStream packetStream;
private Logger logger;
@ -29,8 +33,10 @@ public class AsyncFilterManager {
private AtomicInteger currentSendingIndex = new AtomicInteger();
public AsyncFilterManager(Logger logger, PacketStream packetStream) {
this.sendingQueue = new PacketSendingQueue();
this.processingQueue = new PacketProcessingQueue(sendingQueue);
this.serverQueue = new PacketSendingQueue();
this.clientQueue = new PacketSendingQueue();
this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
this.packetStream = packetStream;
this.logger = logger;
@ -40,10 +46,23 @@ public class AsyncFilterManager {
public ListenerToken registerAsyncHandler(Plugin plugin, PacketListener listener) {
ListenerToken token = new ListenerToken(plugin, mainThread, this, listener);
processingQueue.addListener(token, listener.getSendingWhitelist());
// Add listener to either or both processing queue
if (hasValidWhitelist(listener.getSendingWhitelist()))
serverProcessingQueue.addListener(token, listener.getSendingWhitelist());
if (hasValidWhitelist(listener.getReceivingWhitelist()))
clientProcessingQueue.addListener(token, listener.getReceivingWhitelist());
return token;
}
private boolean hasValidWhitelist(ListeningWhitelist whitelist) {
return whitelist != null && whitelist.getWhitelist().size() > 0;
}
/**
* Unregisters and closes the given asynchronous handler.
* @param listenerToken - asynchronous handler.
*/
public void unregisterAsyncHandler(ListenerToken listenerToken) {
if (listenerToken == null)
throw new IllegalArgumentException("listenerToken cannot be NULL");
@ -53,8 +72,14 @@ public class AsyncFilterManager {
// Called by ListenerToken
void unregisterAsyncHandlerInternal(ListenerToken listenerToken) {
// Just remove it from the queue
processingQueue.removeListener(listenerToken, listenerToken.getAsyncListener().getSendingWhitelist());
PacketListener listener = listenerToken.getAsyncListener();
// Just remove it from the queue(s)
if (hasValidWhitelist(listener.getSendingWhitelist()))
serverProcessingQueue.removeListener(listenerToken, listener.getSendingWhitelist());
if (hasValidWhitelist(listener.getReceivingWhitelist()))
clientProcessingQueue.removeListener(listenerToken, listener.getReceivingWhitelist());
}
/**
@ -66,8 +91,25 @@ public class AsyncFilterManager {
PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker);
// Start the process
sendingQueue.enqueue(newEvent);
processingQueue.enqueuePacket(newEvent);
getSendingQueue(syncPacket).enqueue(newEvent);
getProcessingQueue(syncPacket).enqueue(newEvent);
}
/**
* Determine if a given synchronous packet has asynchronous listeners.
* @param packet - packet to test.
* @return TRUE if it does, FALSE otherwise.
*/
public boolean hasAsynchronousListeners(PacketEvent packet) {
return getProcessingQueue(packet).getListener(packet.getPacketID()).size() > 0;
}
/**
* Construct a asynchronous marker with all the default values.
* @return Asynchronous marker.
*/
public AsyncMarker createAsyncMarker() {
return createAsyncMarker(AsyncMarker.DEFAULT_SENDING_DELTA, AsyncMarker.DEFAULT_TIMEOUT_DELTA);
}
/**
@ -86,27 +128,61 @@ public class AsyncFilterManager {
return new AsyncMarker(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
}
/**
* Retrieve the default packet stream.
* @return Default packet stream.
*/
public PacketStream getPacketStream() {
return packetStream;
}
/**
* Retrieve the default error logger.
* @return Default logger.
*/
public Logger getLogger() {
return logger;
}
PacketProcessingQueue getProcessingQueue() {
return processingQueue;
}
PacketSendingQueue getSendingQueue() {
return sendingQueue;
}
/**
* Remove listeners, close threads and transmit every delayed packet.
*/
public void cleanupAll() {
processingQueue.cleanupAll();
sendingQueue.cleanupAll();
serverProcessingQueue.cleanupAll();
serverQueue.cleanupAll();
}
/**
* Signal that a packet is ready to be transmitted.
* @param packet - packet to signal.
*/
public void signalPacketUpdate(PacketEvent packet) {
getSendingQueue(packet).signalPacketUpdate(packet);
}
/**
* Retrieve the sending queue this packet belongs to.
* @param packet - the packet.
* @return The server or client sending queue the packet belongs to.
*/
private PacketSendingQueue getSendingQueue(PacketEvent packet) {
return packet.isServerPacket() ? serverQueue : clientQueue;
}
/**
* Signal that a packet has finished processing.
* @param packet - packet to signal.
*/
public void signalProcessingDone(PacketEvent packet) {
getProcessingQueue(packet).signalProcessingDone();
}
/**
* Retrieve the processing queue this packet belongs to.
* @param packet - the packet.
* @return The server or client sending processing the packet belongs to.
*/
private PacketProcessingQueue getProcessingQueue(PacketEvent packet) {
return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue;
}
}

View File

@ -27,6 +27,11 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
*/
public static final int DEFAULT_TIMEOUT_DELTA = 60000;
/**
* Default number of packets to skip.
*/
public static final int DEFAULT_SENDING_DELTA = 0;
/**
* The packet stream responsible for transmitting the packet when it's done processing.
*/
@ -48,8 +53,11 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
// Whether or not the packet has been processed by the listeners
private volatile boolean processed;
// Whethre or not the packet has been sent
// Whether or not the packet has been sent
private volatile boolean transmitted;
// Whether or not the asynchronous processing itself should be cancelled
private volatile boolean asyncCancelled;
/**
* Create a container for asyncronous packets.
@ -86,6 +94,14 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
return timeout;
}
/**
* Set the time the packet will be forcefully rejected.
* @param timeout - time to reject the packet, in milliseconds since 01.01.1970 00:00.
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* Retrieve the order the packet was originally transmitted.
* @return The original packet index.
@ -154,6 +170,39 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
return transmitted;
}
/**
* Determine if this packet has expired.
* @return TRUE if it has, FALSE otherwise.
*/
public boolean hasExpired() {
return hasExpired(System.currentTimeMillis());
}
/**
* Determine if this packet has expired given this time.
* @param currentTime - the current time in milliseconds since 01.01.1970 00:00.
* @return TRUE if it has, FALSE otherwise.
*/
public boolean hasExpired(long currentTime) {
return timeout < currentTime;
}
/**
* Determine if the asynchronous handling should be cancelled.
* @return TRUE if it should, FALSE otherwise.
*/
public boolean isAsyncCancelled() {
return asyncCancelled;
}
/**
* Set whether or not the asynchronous handling should be cancelled.
* @param asyncCancelled - TRUE to cancel it, FALSE otherwise.
*/
public void setAsyncCancelled(boolean asyncCancelled) {
this.asyncCancelled = asyncCancelled;
}
/**
* Retrieve iterator for the next listener in line.
* @return Next async packet listener iterator.

View File

@ -126,8 +126,8 @@ class ListenerToken {
}
// There are no more listeners - queue the packet for transmission
filterManager.getSendingQueue().signalPacketUpdate(packet);
filterManager.getProcessingQueue().signalProcessingDone();
filterManager.signalPacketUpdate(packet);
filterManager.signalProcessingDone(packet);
}
} catch (InterruptedException e) {

View File

@ -55,7 +55,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
* @param packet - packet to process.
* @return TRUE if we sucessfully queued the packet, FALSE if the queue ran out if space.
*/
public boolean enqueuePacket(PacketEvent packet) {
public boolean enqueue(PacketEvent packet) {
try {
processingQueue.add(packet);

View File

@ -32,12 +32,20 @@ class PacketSendingQueue {
while (true) {
PacketEvent current = sendingQueue.peek();
if (current != null && current.getAsyncMarker().isProcessed()) {
sendPacket(current);
sendingQueue.poll();
} else {
break;
if (current != null) {
AsyncMarker marker = current.getAsyncMarker();
if (marker.isProcessed() || marker.hasExpired()) {
if (marker.isProcessed())
sendPacket(current);
sendingQueue.poll();
continue;
}
}
// Only repeat when packets are removed
break;
}
}

View File

@ -158,18 +158,22 @@ public class PacketEvent extends EventObject implements Cancellable {
* <p>
* If the packet is synchronous, this marker will be used to schedule an asynchronous event. In this
* asynchronous event, the marker is used to correctly pass the packet around to the different threads.
* @return The current asynchronous marker.
* <p>
* Note that if there are no asynchronous events that can receive this packet, the marker is NULL.
* @return The current asynchronous marker, or NULL.
*/
public AsyncMarker getAsyncMarker() {
return asyncMarker;
}
/**
* Set the asynchronous marker.
* <p>
* If the marker is non-null at the end of an synchronous event processing, the packet will be scheduled
* to be processed asynchronously with the given settings.
* @param asyncMarker - the new asynchronous marker.
* to be processed asynchronously with the given settings.
* <p>
* Note that if there are no asynchronous events that can receive this packet, the marker should be NULL.
* @param asyncMarker - the new asynchronous marker, or NULL.
*/
public void setAsyncMarker(AsyncMarker asyncMarker) {
this.asyncMarker = asyncMarker;
@ -182,7 +186,7 @@ public class PacketEvent extends EventObject implements Cancellable {
public boolean isAsynchronous() {
return asynchronous;
}
private void writeObject(ObjectOutputStream output) throws IOException {
// Default serialization
output.defaultWriteObject();

View File

@ -47,6 +47,7 @@ import org.bukkit.plugin.PluginManager;
import com.comphenix.protocol.ProtocolManager;
import com.comphenix.protocol.async.AsyncFilterManager;
import com.comphenix.protocol.async.AsyncMarker;
import com.comphenix.protocol.events.*;
import com.comphenix.protocol.reflect.FieldAccessException;
import com.comphenix.protocol.reflect.FuzzyReflection;
@ -284,7 +285,7 @@ public final class PacketFilterManager implements ProtocolManager {
* @param event - the packet event to invoke.
*/
public void invokePacketRecieving(PacketEvent event) {
recievedListeners.invokePacketRecieving(logger, event);
handlePacket(recievedListeners, event);
}
/**
@ -292,7 +293,36 @@ public final class PacketFilterManager implements ProtocolManager {
* @param event - the packet event to invoke.
*/
public void invokePacketSending(PacketEvent event) {
sendingListeners.invokePacketSending(logger, event);
handlePacket(sendingListeners, event);
}
/**
* Handle a packet sending or receiving event.
* <p>
* Note that we also handle asynchronous events.
* @param packetListeners - packet listeners that will receive this event.
* @param event - the evnet to broadcast.
*/
private void handlePacket(SortedPacketListenerList packetListeners, PacketEvent event) {
// By default, asynchronous packets are queued for processing
if (asyncFilterManager.hasAsynchronousListeners(event)) {
event.setAsyncMarker(asyncFilterManager.createAsyncMarker());
}
// Process synchronous events
packetListeners.invokePacketRecieving(logger, event);
// To cancel the asynchronous processing, use the async marker
if (!event.isCancelled() && !hasAsyncCancelled(event.getAsyncMarker())) {
asyncFilterManager.enqueueSyncPacket(event, event.getAsyncMarker());
event.setCancelled(true);
}
}
// NULL marker mean we're dealing with no asynchronous listeners
private boolean hasAsyncCancelled(AsyncMarker marker) {
return marker == null || marker.isAsyncCancelled();
}
/**