We have to keep the setAsyncMarker method.

This commit is contained in:
Kristian S. Stangeland 2012-09-29 22:12:30 +02:00
parent 65b5a0e8ec
commit e86c3d3a6e
4 changed files with 60 additions and 1 deletions

View File

@ -1,5 +1,6 @@
package com.comphenix.protocol.async;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@ -95,6 +96,24 @@ public class AsyncFilterManager {
getProcessingQueue(syncPacket).enqueue(newEvent);
}
/**
* Retrieves a immutable set containing the ID of the sent server packets that will be
* observed by the asynchronous listeners.
* @return Every filtered server packet.
*/
public Set<Integer> getSendingFilters() {
return serverProcessingQueue.keySet();
}
/**
* Retrieves a immutable set containing the ID of the recieved client packets that will be
* observed by the asynchronous listeners.
* @return Every filtered client packet.
*/
public Set<Integer> getReceivingFilters() {
return clientProcessingQueue.keySet();
}
/**
* Determine if a given synchronous packet has asynchronous listeners.
* @param packet - packet to test.

View File

@ -1,9 +1,11 @@
package com.comphenix.protocol.async;
import java.io.IOException;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import com.comphenix.protocol.events.PacketEvent;
import com.google.common.collect.ComparisonChain;
/**
* Represents packets ready to be transmitted to a client.
@ -11,8 +13,22 @@ import com.comphenix.protocol.events.PacketEvent;
*/
class PacketSendingQueue {
private static final int INITIAL_CAPACITY = 64;
private PriorityBlockingQueue<PacketEvent> sendingQueue;
public PacketSendingQueue() {
sendingQueue = new PriorityBlockingQueue<PacketEvent>(INITIAL_CAPACITY, new Comparator<PacketEvent>() {
// Compare using the async marker
@Override
public int compare(PacketEvent o1, PacketEvent o2) {
return ComparisonChain.start().
compare(o1.getAsyncMarker(), o2.getAsyncMarker()).
result();
}
});
}
/**
* Enqueue a packet for sending.
* @param packet

View File

@ -3,6 +3,7 @@ package com.comphenix.protocol.concurrency;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -109,10 +110,18 @@ public abstract class AbstractConcurrentListenerMultimap<TListener> {
* Retrieve every listener.
* @return Every listener.
*/
protected Iterable<PrioritizedListener<TListener>> values() {
public Iterable<PrioritizedListener<TListener>> values() {
return Iterables.concat(listeners.values());
}
/**
* Retrieve every registered packet ID:
* @return Registered packet ID.
*/
public Set<Integer> keySet() {
return listeners.keySet();
}
/**
* Remove all packet listeners.
*/

View File

@ -166,6 +166,21 @@ public class PacketEvent extends EventObject implements Cancellable {
public AsyncMarker getAsyncMarker() {
return asyncMarker;
}
/**
* Set the asynchronous marker.
* <p>
* If the marker is non-null at the end of an synchronous event processing, the packet will be scheduled
* to be processed asynchronously with the given settings.
* <p>
* Note that if there are no asynchronous events that can receive this packet, the marker should be NULL.
* @param asyncMarker - the new asynchronous marker, or NULL.
* @throws IllegalStateException If the current event is asynchronous.
*/
public void setAsyncMarker(AsyncMarker asyncMarker) {
if (isAsynchronous())
throw new IllegalStateException("The marker is immutable for asynchronous events");
this.asyncMarker = asyncMarker;
}
/**
* Determine if the packet event has been executed asynchronously or not.