Processed packets is now sorted by the sending index.

This commit is contained in:
Kristian S. Stangeland 2012-10-01 21:47:19 +02:00
parent 7b35dd954c
commit fe55bb2e56
5 changed files with 285 additions and 29 deletions

View File

@ -53,6 +53,7 @@ public class AsyncFilterManager implements AsynchronousManager {
// Server packets are synchronized already
this.serverQueue = new PacketSendingQueue(false);
// Client packets must be synchronized
this.clientQueue = new PacketSendingQueue(true);

View File

@ -0,0 +1,40 @@
package com.comphenix.protocol.async;
import com.comphenix.protocol.events.PacketEvent;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
/**
* Provides a comparable to a packet event.
*
* @author Kristian
*/
class PacketEventHolder implements Comparable<PacketEventHolder> {
private PacketEvent event;
/**
* A wrapper that ensures the packet event is ordered by sending index.
* @param event - packet event to wrap.
*/
public PacketEventHolder(PacketEvent event) {
this.event = Preconditions.checkNotNull(event, "Event must be non-null");
}
/**
* Retrieve the stored event.
* @return The stored event.
*/
public PacketEvent getEvent() {
return event;
}
@Override
public int compareTo(PacketEventHolder other) {
AsyncMarker marker = other != null ? other.getEvent().getAsyncMarker() : null;
return ComparisonChain.start().
compare(event.getAsyncMarker(), marker).
result();
}
}

View File

@ -2,12 +2,14 @@ package com.comphenix.protocol.async;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.PrioritizedListener;
import com.google.common.collect.MinMaxPriorityQueue;
/**
* Handles the processing of every packet type.
@ -16,6 +18,9 @@ import com.comphenix.protocol.injector.PrioritizedListener;
*/
class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncListenerHandler> {
// Initial number of elements
public static final int INITIAL_CAPACITY = 64;
/**
* Default maximum number of packets to process concurrently.
*/
@ -33,18 +38,23 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
private Semaphore concurrentProcessing;
// Queued packets for being processed
private ArrayBlockingQueue<PacketEvent> processingQueue;
private Queue<PacketEventHolder> processingQueue;
// Packets for sending
private PacketSendingQueue sendingQueue;
public PacketProcessingQueue(PacketSendingQueue sendingQueue) {
this(sendingQueue, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
this(sendingQueue, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
}
public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) {
public PacketProcessingQueue(PacketSendingQueue sendingQueue, int initialSize, int maximumSize, int maximumConcurrency) {
super();
this.processingQueue = new ArrayBlockingQueue<PacketEvent>(queueLimit);
this.processingQueue = Synchronization.queue(MinMaxPriorityQueue.
expectedSize(initialSize).
maximumSize(maximumSize).
<PacketEventHolder>create(), null);
this.maximumConcurrency = maximumConcurrency;
this.concurrentProcessing = new Semaphore(maximumConcurrency);
this.sendingQueue = sendingQueue;
@ -58,7 +68,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
*/
public boolean enqueue(PacketEvent packet, boolean onMainThread) {
try {
processingQueue.add(packet);
processingQueue.add(new PacketEventHolder(packet));
// Begin processing packets
signalBeginProcessing(onMainThread);
@ -74,12 +84,13 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
*/
public void signalBeginProcessing(boolean onMainThread) {
while (concurrentProcessing.tryAcquire()) {
PacketEvent packet = processingQueue.poll();
PacketEventHolder holder = processingQueue.poll();
// Any packet queued?
if (packet != null) {
Collection<PrioritizedListener<AsyncListenerHandler>> list = getListener(packet.getPacketID());
if (holder != null) {
PacketEvent packet = holder.getEvent();
AsyncMarker marker = packet.getAsyncMarker();
Collection<PrioritizedListener<AsyncListenerHandler>> list = getListener(packet.getPacketID());
// Yes, removing the marker will cause the chain to stop
if (list != null) {

View File

@ -1,7 +1,6 @@
package com.comphenix.protocol.async;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -9,7 +8,6 @@ import java.util.concurrent.PriorityBlockingQueue;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.reflect.FieldAccessException;
import com.google.common.collect.ComparisonChain;
/**
* Represents packets ready to be transmitted to a client.
@ -17,9 +15,9 @@ import com.google.common.collect.ComparisonChain;
*/
class PacketSendingQueue {
private static final int INITIAL_CAPACITY = 64;
public static final int INITIAL_CAPACITY = 64;
private PriorityBlockingQueue<PacketEvent> sendingQueue;
private PriorityBlockingQueue<PacketEventHolder> sendingQueue;
// Whether or not packet transmission can only occur on the main thread
private final boolean synchronizeMain;
@ -29,16 +27,8 @@ class PacketSendingQueue {
* @param synchronizeMain - whether or not to synchronize with the main thread.
*/
public PacketSendingQueue(boolean synchronizeMain) {
this.sendingQueue = new PriorityBlockingQueue<PacketEventHolder>(INITIAL_CAPACITY);
this.synchronizeMain = synchronizeMain;
this.sendingQueue = new PriorityBlockingQueue<PacketEvent>(INITIAL_CAPACITY, new Comparator<PacketEvent>() {
// Compare using the async marker
@Override
public int compare(PacketEvent o1, PacketEvent o2) {
return ComparisonChain.start().
compare(o1.getAsyncMarker(), o2.getAsyncMarker()).
result();
}
});
}
/**
@ -46,7 +36,7 @@ class PacketSendingQueue {
* @param packet
*/
public void enqueue(PacketEvent packet) {
sendingQueue.add(packet);
sendingQueue.add(new PacketEventHolder(packet));
}
/**
@ -70,7 +60,9 @@ class PacketSendingQueue {
Set<Integer> lookup = new HashSet<Integer>(packetsRemoved);
// Note that this is O(n), so it might be expensive
for (PacketEvent event : sendingQueue) {
for (PacketEventHolder holder : sendingQueue) {
PacketEvent event = holder.getEvent();
if (lookup.contains(event.getPacketID())) {
event.getAsyncMarker().setProcessed(true);
}
@ -88,9 +80,10 @@ class PacketSendingQueue {
// Transmit as many packets as we can
while (true) {
PacketEvent current = sendingQueue.peek();
PacketEventHolder holder = sendingQueue.peek();
if (current != null) {
if (holder != null) {
PacketEvent current = holder.getEvent();
AsyncMarker marker = current.getAsyncMarker();
// Abort if we're not on the main thread
@ -129,10 +122,10 @@ class PacketSendingQueue {
*/
private void forceSend() {
while (true) {
PacketEvent current = sendingQueue.poll();
PacketEventHolder holder = sendingQueue.poll();
if (current != null) {
sendPacket(current);
if (holder != null) {
sendPacket(holder.getEvent());
} else {
break;
}

View File

@ -0,0 +1,211 @@
package com.comphenix.protocol.async;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
/**
* Synchronization views copied from Google Guava.
*
* @author Kristian
*/
class Synchronization {
/**
* Create a synchronized wrapper for the given queue.
* <p>
* This wrapper cannot synchronize the iterator(). Callers are expected
* to synchronize iterators manually.
* @param queue - the queue to synchronize.
* @param mutex - synchronization mutex, or NULL to use the queue.
* @return A synchronization wrapper.
*/
public static <E> Queue<E> queue(Queue<E> queue, @Nullable Object mutex) {
return (queue instanceof SynchronizedQueue) ?
queue :
new SynchronizedQueue<E>(queue, mutex);
}
private static class SynchronizedObject implements Serializable {
private static final long serialVersionUID = -4408866092364554628L;
final Object delegate;
final Object mutex;
SynchronizedObject(Object delegate, @Nullable Object mutex) {
this.delegate = Preconditions.checkNotNull(delegate);
this.mutex = (mutex == null) ? this : mutex;
}
Object delegate() {
return delegate;
}
// No equals and hashCode; see ForwardingObject for details.
@Override
public String toString() {
synchronized (mutex) {
return delegate.toString();
}
}
}
private static class SynchronizedCollection<E> extends SynchronizedObject implements Collection<E> {
private static final long serialVersionUID = 5440572373531285692L;
private SynchronizedCollection(Collection<E> delegate,
@Nullable Object mutex) {
super(delegate, mutex);
}
@SuppressWarnings("unchecked")
@Override
Collection<E> delegate() {
return (Collection<E>) super.delegate();
}
@Override
public boolean add(E e) {
synchronized (mutex) {
return delegate().add(e);
}
}
@Override
public boolean addAll(Collection<? extends E> c) {
synchronized (mutex) {
return delegate().addAll(c);
}
}
@Override
public void clear() {
synchronized (mutex) {
delegate().clear();
}
}
@Override
public boolean contains(Object o) {
synchronized (mutex) {
return delegate().contains(o);
}
}
@Override
public boolean containsAll(Collection<?> c) {
synchronized (mutex) {
return delegate().containsAll(c);
}
}
@Override
public boolean isEmpty() {
synchronized (mutex) {
return delegate().isEmpty();
}
}
@Override
public Iterator<E> iterator() {
return delegate().iterator(); // manually synchronized
}
@Override
public boolean remove(Object o) {
synchronized (mutex) {
return delegate().remove(o);
}
}
@Override
public boolean removeAll(Collection<?> c) {
synchronized (mutex) {
return delegate().removeAll(c);
}
}
@Override
public boolean retainAll(Collection<?> c) {
synchronized (mutex) {
return delegate().retainAll(c);
}
}
@Override
public int size() {
synchronized (mutex) {
return delegate().size();
}
}
@Override
public Object[] toArray() {
synchronized (mutex) {
return delegate().toArray();
}
}
@Override
public <T> T[] toArray(T[] a) {
synchronized (mutex) {
return delegate().toArray(a);
}
}
}
private static class SynchronizedQueue<E> extends SynchronizedCollection<E> implements Queue<E> {
private static final long serialVersionUID = 1961791630386791902L;
SynchronizedQueue(Queue<E> delegate, @Nullable Object mutex) {
super(delegate, mutex);
}
@Override
Queue<E> delegate() {
return (Queue<E>) super.delegate();
}
@Override
public E element() {
synchronized (mutex) {
return delegate().element();
}
}
@Override
public boolean offer(E e) {
synchronized (mutex) {
return delegate().offer(e);
}
}
@Override
public E peek() {
synchronized (mutex) {
return delegate().peek();
}
}
@Override
public E poll() {
synchronized (mutex) {
return delegate().poll();
}
}
@Override
public E remove() {
synchronized (mutex) {
return delegate().remove();
}
}
}
}