Make it possible for threads to delay packet transmission.

Threads can now increment a shared counter indicating that a packet
should not be transmitted after the default packet listener 
processing. This can be useful if a packet listener needs information
from additional packets before it can complete.

Packet listeners that whish to use this method begin by calling
incrementPacketDelay(). It is then responsible for calling
signalPacketTransmission() when it's done waiting. All processing
on PacketEvents outside packet listeners must be synchronized
with getProcessingLock().
This commit is contained in:
Kristian S. Stangeland 2012-10-10 04:41:07 +02:00
parent 20792aa09a
commit 7ed0bc82dd
6 changed files with 98 additions and 14 deletions

View File

@ -15,7 +15,6 @@ import com.comphenix.protocol.events.PacketListener;
* @author Kristian * @author Kristian
*/ */
public interface AsynchronousManager { public interface AsynchronousManager {
/** /**
* Registers an asynchronous packet handler. * Registers an asynchronous packet handler.
* <p> * <p>
@ -74,4 +73,13 @@ public interface AsynchronousManager {
* Remove listeners, close threads and transmit every delayed packet. * Remove listeners, close threads and transmit every delayed packet.
*/ */
public abstract void cleanupAll(); public abstract void cleanupAll();
/**
* Signal that a packet is ready to be transmitted.
* <p>
* This should only be called if {@link com.comphenix.protocol.async.AsyncMarker#incrementProcessingDelay() AsyncMarker.incrementProcessingDelay()}
* has been called previously.
* @param packet - packet to signal.
*/
public abstract void signalPacketTransmission(PacketEvent packet);
} }

View File

@ -256,14 +256,27 @@ public class AsyncFilterManager implements AsynchronousManager {
serverQueue.cleanupAll(); serverQueue.cleanupAll();
} }
@Override
public void signalPacketTransmission(PacketEvent packet) {
signalPacketTransmission(packet, onMainThread());
}
/** /**
* Signal that a packet is ready to be transmitted. * Signal that a packet is ready to be transmitted.
* @param packet - packet to signal. * @param packet - packet to signal.
* @param onMainThread - whether or not this method was run by the main thread.
*/ */
public void signalPacketUpdate(PacketEvent packet) { private void signalPacketTransmission(PacketEvent packet, boolean onMainThread) {
getSendingQueue(packet).signalPacketUpdate(packet, onMainThread()); if (packet.getAsyncMarker() == null)
throw new IllegalArgumentException(
"A sync packet cannot be transmitted by the asynchronous manager.");
// Only send if the packet is ready
if (packet.getAsyncMarker().decrementProcessingDelay() == 0) {
getSendingQueue(packet).signalPacketUpdate(packet, onMainThread);
}
} }
/** /**
* Retrieve the sending queue this packet belongs to. * Retrieve the sending queue this packet belongs to.
* @param packet - the packet. * @param packet - the packet.
@ -277,7 +290,7 @@ public class AsyncFilterManager implements AsynchronousManager {
* Signal that a packet has finished processing. * Signal that a packet has finished processing.
* @param packet - packet to signal. * @param packet - packet to signal.
*/ */
public void signalProcessingDone(PacketEvent packet) { public void signalFreeProcessingSlot(PacketEvent packet) {
getProcessingQueue(packet).signalProcessingDone(); getProcessingQueue(packet).signalProcessingDone();
} }

View File

@ -371,10 +371,12 @@ public class AsyncListenerHandler {
marker.setListenerHandler(this); marker.setListenerHandler(this);
marker.setWorkerID(workerID); marker.setWorkerID(workerID);
if (packet.isServerPacket()) synchronized (marker.getProcessingLock()) {
listener.onPacketSending(packet); if (packet.isServerPacket())
else listener.onPacketSending(packet);
listener.onPacketReceiving(packet); else
listener.onPacketReceiving(packet);
}
} catch (Throwable e) { } catch (Throwable e) {
// Minecraft doesn't want your Exception. // Minecraft doesn't want your Exception.
@ -393,8 +395,10 @@ public class AsyncListenerHandler {
} }
// There are no more listeners - queue the packet for transmission // There are no more listeners - queue the packet for transmission
filterManager.signalPacketUpdate(packet); filterManager.signalFreeProcessingSlot(packet);
filterManager.signalProcessingDone(packet);
// Note that listeners can opt to delay the packet transmission
filterManager.signalPacketTransmission(packet);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -6,6 +6,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.minecraft.server.Packet; import net.minecraft.server.Packet;
@ -61,12 +62,18 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
// Whether or not the packet has been processed by the listeners // Whether or not the packet has been processed by the listeners
private volatile boolean processed; private volatile boolean processed;
// Whether or not to delay processing
private AtomicInteger processingDelay = new AtomicInteger();
// Whether or not the packet has been sent // Whether or not the packet has been sent
private volatile boolean transmitted; private volatile boolean transmitted;
// Whether or not the asynchronous processing itself should be cancelled // Whether or not the asynchronous processing itself should be cancelled
private volatile boolean asyncCancelled; private volatile boolean asyncCancelled;
// Used to synchronize processing on the shared PacketEvent
private Object processingLock = new Object();
// Used to identify the asynchronous worker // Used to identify the asynchronous worker
private AsyncListenerHandler listenerHandler; private AsyncListenerHandler listenerHandler;
private int workerID; private int workerID;
@ -178,6 +185,55 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
this.processed = processed; this.processed = processed;
} }
/**
* Increment the number of times this packet must be signalled as done before its transmitted.
* <p>
* This is useful if an asynchronous listener is waiting for further information before the
* packet can be sent to the user. A packet listener <b>MUST</b> eventually call signalPacketUpdate,
* even if the packet is cancelled, after this method is called.
* <p>
* It is recommended that processing outside a packet listener is wrapped in a synchronized block
* using the {@link #getProcessingLock()} method.
* <p>
* To decrement the processing delay, call signalPacketUpdate. A thread that calls this method
* multiple times must call signalPacketUpdate at least that many times.
* @return The new processing delay.
*/
public int incrementProcessingDelay() {
return processingDelay.incrementAndGet();
}
/**
* Decrement the number of times this packet must be signalled as done before it's transmitted.
* @return The new processing delay. If zero, the packet should be sent.
*/
int decrementProcessingDelay() {
return processingDelay.decrementAndGet();
}
/**
* Retrieve the number of times a packet must be signalled to be done before it's sent.
* @return Number of processing delays.
*/
public int getProcessingDelay() {
return processingDelay.get();
}
/**
* Processing lock used to synchronize access to the parent PacketEvent and PacketContainer.
* <p>
* This lock is automatically acquired for every asynchronous packet listener. It should only be
* used to synchronize access to a PacketEvent if it's processing has been delayed.
* @return A processing lock.
*/
public Object getProcessingLock() {
return processingLock;
}
public void setProcessingLock(Object processingLock) {
this.processingLock = processingLock;
}
/** /**
* Retrieve whether or not this packet has already been sent. * Retrieve whether or not this packet has already been sent.
* @return TRUE if it has been sent before, FALSE otherwise. * @return TRUE if it has been sent before, FALSE otherwise.
@ -276,7 +332,7 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
* @param event - the packet to send. * @param event - the packet to send.
* @throws IOException If the packet couldn't be sent. * @throws IOException If the packet couldn't be sent.
*/ */
public void sendPacket(PacketEvent event) throws IOException { void sendPacket(PacketEvent event) throws IOException {
try { try {
if (event.isServerPacket()) { if (event.isServerPacket()) {
packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false); packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false);

View File

@ -100,6 +100,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
AsyncMarker marker = packet.getAsyncMarker(); AsyncMarker marker = packet.getAsyncMarker();
Collection<PrioritizedListener<AsyncListenerHandler>> list = getListener(packet.getPacketID()); Collection<PrioritizedListener<AsyncListenerHandler>> list = getListener(packet.getPacketID());
marker.incrementProcessingDelay();
// Yes, removing the marker will cause the chain to stop // Yes, removing the marker will cause the chain to stop
if (list != null) { if (list != null) {
Iterator<PrioritizedListener<AsyncListenerHandler>> iterator = list.iterator(); Iterator<PrioritizedListener<AsyncListenerHandler>> iterator = list.iterator();
@ -112,7 +114,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
} }
// The packet has no further listeners. Just send it. // The packet has no further listeners. Just send it.
sendingQueue.signalPacketUpdate(packet, onMainThread); if (marker.decrementProcessingDelay() == 0)
sendingQueue.signalPacketUpdate(packet, onMainThread);
signalProcessingDone(); signalProcessingDone();
} else { } else {

View File

@ -43,7 +43,7 @@ class PacketSendingQueue {
/** /**
* Enqueue a packet for sending. * Enqueue a packet for sending.
* @param packet * @param packet - packet to queue.
*/ */
public void enqueue(PacketEvent packet) { public void enqueue(PacketEvent packet) {
sendingQueue.add(new PacketEventHolder(packet)); sendingQueue.add(new PacketEventHolder(packet));