Added synchronous packet processing.

Client packets are typically processed asynchronously (in a client's
reader thread), and should never access the Bukkit API directly,
with a few exceptions. This is problematic if you need to cancel a
packet as a response to the Bukkit API, such as the permission system.

Currently, you will have to either cancel the packet - which is 
discuraged - sync with the main thread and then re-transmit it outside
the filters, or use an asynchronous thread with callSyncMethod and 
wait on the returned future. A better method is needed.

Synchronous processing allows you to run light-weight packet listeners
on the main thread without having to deal with synchronization, 
concurrency or the overhead of an additional thread. It can also
process multiple packets per tick with a configurable timeout.

This, along with 7b9d971238, makes it
easy to delay light-weight packets to be synchronously processed.
This commit is contained in:
Kristian S. Stangeland 2012-11-21 00:15:53 +01:00
parent dd9cb30d25
commit 36f867cafa
2 changed files with 202 additions and 60 deletions

View File

@ -56,29 +56,36 @@ public class AsyncFilterManager implements AsynchronousManager {
private PacketProcessingQueue clientProcessingQueue; private PacketProcessingQueue clientProcessingQueue;
// Sending queues // Sending queues
private PlayerSendingHandler playerSendingHandler; private final PlayerSendingHandler playerSendingHandler;
// Report exceptions // Report exceptions
private ErrorReporter reporter; private final ErrorReporter reporter;
// The likely main thread // The likely main thread
private Thread mainThread; private final Thread mainThread;
// Default scheduler // Default scheduler
private BukkitScheduler scheduler; private final BukkitScheduler scheduler;
// Our protocol manager // Our protocol manager
private ProtocolManager manager; private final ProtocolManager manager;
// Current packet index // Current packet index
private AtomicInteger currentSendingIndex = new AtomicInteger(); private final AtomicInteger currentSendingIndex = new AtomicInteger();
/**
* Initialize a asynchronous filter manager.
* <p>
* <b>Internal method</b>. Retrieve the global asynchronous manager from the protocol manager instead.
* @param reporter - desired error reporter.
* @param scheduler - task scheduler.
* @param manager - protocol manager.
*/
public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) { public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) {
// Initialize timeout listeners // Initialize timeout listeners
serverTimeoutListeners = new SortedPacketListenerList(); this.serverTimeoutListeners = new SortedPacketListenerList();
clientTimeoutListeners = new SortedPacketListenerList(); this.clientTimeoutListeners = new SortedPacketListenerList();
timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap<PacketListener, Boolean>()); this.timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap<PacketListener, Boolean>());
this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners); this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners);
this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler); this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
@ -263,12 +270,11 @@ public class AsyncFilterManager implements AsynchronousManager {
} }
/** /**
* Used to create a default asynchronous task. * Retrieve the current task scheduler.
* @param plugin - the calling plugin. * @return Current task scheduler.
* @param runnable - the runnable.
*/ */
public void scheduleAsyncTask(Plugin plugin, Runnable runnable) { public BukkitScheduler getScheduler() {
scheduler.scheduleAsyncDelayedTask(plugin, runnable); return scheduler;
} }
@Override @Override

View File

@ -20,6 +20,7 @@ package com.comphenix.protocol.async;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -75,10 +76,19 @@ public class AsyncListenerHandler {
private final Set<Integer> stoppedTasks = new HashSet<Integer>(); private final Set<Integer> stoppedTasks = new HashSet<Integer>();
private final Object stopLock = new Object(); private final Object stopLock = new Object();
// Processing task on the main thread
private int syncTask = -1;
// Minecraft main thread // Minecraft main thread
private Thread mainThread; private Thread mainThread;
public AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) { /**
* Construct a manager for an asynchronous packet handler.
* @param mainThread - the main game thread.
* @param filterManager - the parent filter manager.
* @param listener - the current packet listener.
*/
AsyncListenerHandler(Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
if (filterManager == null) if (filterManager == null)
throw new IllegalArgumentException("filterManager cannot be NULL"); throw new IllegalArgumentException("filterManager cannot be NULL");
if (listener == null) if (listener == null)
@ -89,10 +99,18 @@ public class AsyncListenerHandler {
this.listener = listener; this.listener = listener;
} }
/**
* Determine whether or not this asynchronous handler has been cancelled.
* @return TRUE if it has been cancelled/stopped, FALSE otherwise.
*/
public boolean isCancelled() { public boolean isCancelled() {
return cancelled; return cancelled;
} }
/**
* Retrieve the current asynchronous packet listener.
* @return Current packet listener.
*/
public PacketListener getAsyncListener() { public PacketListener getAsyncListener() {
return listener; return listener;
} }
@ -223,7 +241,7 @@ public class AsyncListenerHandler {
final AsyncRunnable listenerLoop = getListenerLoop(); final AsyncRunnable listenerLoop = getListenerLoop();
filterManager.scheduleAsyncTask(listener.getPlugin(), new Runnable() { filterManager.getScheduler().scheduleAsyncDelayedTask(listener.getPlugin(), new Runnable() {
@Override @Override
public void run() { public void run() {
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
@ -271,7 +289,7 @@ public class AsyncListenerHandler {
final AsyncRunnable listenerLoop = getListenerLoop(); final AsyncRunnable listenerLoop = getListenerLoop();
final Function<AsyncRunnable, Void> delegateCopy = executor; final Function<AsyncRunnable, Void> delegateCopy = executor;
filterManager.scheduleAsyncTask(listener.getPlugin(), new Runnable() { filterManager.getScheduler().scheduleAsyncDelayedTask(listener.getPlugin(), new Runnable() {
@Override @Override
public void run() { public void run() {
delegateCopy.apply(listenerLoop); delegateCopy.apply(listenerLoop);
@ -308,6 +326,104 @@ public class AsyncListenerHandler {
return Joiner.on(", ").join(whitelist.getWhitelist()); return Joiner.on(", ").join(whitelist.getWhitelist());
} }
/**
* Start processing packets on the main thread.
* <p>
* This is useful if you need to synchronize with the main thread in your packet listener, but
* you're not performing any expensive processing.
* <p>
* <b>Note</b>: Use a asynchronous worker if the packet listener may use more than 0.5 ms
* of processing time on a single packet. Do as much as possible on the worker thread, and schedule synchronous tasks
* to use the Bukkit API instead.
* @return TRUE if the synchronized processing was successfully started, FALSE if it's already running.
* @throws IllegalStateException If we couldn't start the underlying task.
*/
public synchronized boolean syncStart() {
return syncStart(500, TimeUnit.MICROSECONDS);
}
/**
* Start processing packets on the main thread.
* <p>
* This is useful if you need to synchronize with the main thread in your packet listener, but
* you're not performing any expensive processing.
* <p>
* The processing time parameter gives the upper bound for the amount of time spent processing pending packets.
* It should be set to a fairly low number, such as 0.5 ms or 1% of a game tick - to reduce the impact
* on the main thread. Never go beyond 50 milliseconds.
* <p>
* <b>Note</b>: Use a asynchronous worker if the packet listener may exceed the ideal processing time
* on a single packet. Do as much as possible on the worker thread, and schedule synchronous tasks
* to use the Bukkit API instead.
*
* @param time - the amount of processing time alloted per game tick (20 ticks per second).
* @param unit - the unit of the processingTime argument.
* @return TRUE if the synchronized processing was successfully started, FALSE if it's already running.
* @throws IllegalStateException If we couldn't start the underlying task.
*/
public synchronized boolean syncStart(final long time, final TimeUnit unit) {
if (time <= 0)
throw new IllegalArgumentException("Time must be greater than zero.");
if (unit == null)
throw new IllegalArgumentException("TimeUnit cannot be NULL.");
final long tickDelay = 1;
final int workerID = nextID.incrementAndGet();
if (syncTask < 0) {
syncTask = filterManager.getScheduler().scheduleSyncRepeatingTask(getPlugin(), new Runnable() {
@Override
public void run() {
long stopTime = System.nanoTime() + unit.convert(time, TimeUnit.NANOSECONDS);
while (!cancelled) {
PacketEvent packet = queuedPackets.poll();
if (packet == INTERUPT_PACKET || packet == WAKEUP_PACKET) {
// Sorry, asynchronous threads!
queuedPackets.add(packet);
// Try again next tick
break;
} else if (packet != null && packet.getAsyncMarker() != null) {
processPacket(workerID, packet, "onSyncPacket()");
} else {
// No more packets left - wait a tick
break;
}
// Check time here, ensuring that we at least process one packet
if (System.nanoTime() < stopTime)
break;
}
}
}, tickDelay, tickDelay);
// This is very bad - force the caller to handle it
if (syncTask < 0)
throw new IllegalStateException("Cannot start synchronous task.");
else
return true;
} else {
return false;
}
}
/**
* Stop processing packets on the main thread.
* @return TRUE if we stopped any processing tasks, FALSE if it has already been stopped.
*/
public synchronized boolean syncStop() {
if (syncTask > 0) {
filterManager.getScheduler().cancelTask(syncTask);
syncTask = -1;
return true;
} else {
return false;
}
}
/** /**
* Start multiple worker threads for this listener. * Start multiple worker threads for this listener.
* @param count - number of worker threads to start. * @param count - number of worker threads to start.
@ -386,9 +502,13 @@ public class AsyncListenerHandler {
} }
} }
// DO NOT call this method from the main thread /**
* The main processing loop of asynchronous threads.
* <p>
* Note: DO NOT call this method from the main thread
* @param workerID - the current worker ID.
*/
private void listenerLoop(int workerID) { private void listenerLoop(int workerID) {
// Danger, danger! // Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId()) if (Thread.currentThread().getId() == mainThread.getId())
throw new IllegalStateException("Do not call this method from the main thread."); throw new IllegalStateException("Do not call this method from the main thread.");
@ -403,16 +523,11 @@ public class AsyncListenerHandler {
// Proceed // Proceed
started.incrementAndGet(); started.incrementAndGet();
mainLoop:
while (!cancelled) { while (!cancelled) {
PacketEvent packet = queuedPackets.take(); PacketEvent packet = queuedPackets.take();
AsyncMarker marker = packet.getAsyncMarker();
// Handle cancel requests // Handle cancel requests
if (packet == null || marker == null || packet == INTERUPT_PACKET) { if (packet == WAKEUP_PACKET) {
return;
} else if (packet == WAKEUP_PACKET) {
// This is a bit slow, but it should be safe // This is a bit slow, but it should be safe
synchronized (stopLock) { synchronized (stopLock) {
// Are we the one who is supposed to stop? // Are we the one who is supposed to stop?
@ -421,42 +536,13 @@ public class AsyncListenerHandler {
if (waitForStops()) if (waitForStops())
return; return;
} }
} else if (packet == INTERUPT_PACKET) {
return;
} }
// Here's the core of the asynchronous processing if (packet != null && packet.getAsyncMarker() == null) {
try { processPacket(workerID, packet, "onAsyncPacket()");
marker.setListenerHandler(this);
marker.setWorkerID(workerID);
synchronized (marker.getProcessingLock()) {
if (packet.isServerPacket())
listener.onPacketSending(packet);
else
listener.onPacketReceiving(packet);
}
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
filterManager.getErrorReporter().reportMinimal(listener.getPlugin(), "onAsyncPacket()", e);
} }
// Now, get the next non-cancelled listener
if (!marker.hasExpired()) {
for (; marker.getListenerTraversal().hasNext(); ) {
AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener();
if (!handler.isCancelled()) {
handler.enqueuePacket(packet);
continue mainLoop;
}
}
}
// There are no more listeners - queue the packet for transmission
filterManager.signalFreeProcessingSlot(packet);
// Note that listeners can opt to delay the packet transmission
filterManager.signalPacketTransmission(packet);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -464,16 +550,66 @@ public class AsyncListenerHandler {
} finally { } finally {
// Clean up // Clean up
started.decrementAndGet(); started.decrementAndGet();
close();
} }
} }
/**
* Called when a packet is scheduled for processing.
* @param workerID - the current worker ID.
* @param packet - the current packet.
* @param methodName - name of the method.
*/
private void processPacket(int workerID, PacketEvent packet, String methodName) {
AsyncMarker marker = packet.getAsyncMarker();
// Here's the core of the asynchronous processing
try {
synchronized (marker.getProcessingLock()) {
marker.setListenerHandler(this);
marker.setWorkerID(workerID);
if (packet.isServerPacket())
listener.onPacketSending(packet);
else
listener.onPacketReceiving(packet);
}
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
filterManager.getErrorReporter().reportMinimal(listener.getPlugin(), methodName, e);
}
// Now, get the next non-cancelled listener
if (!marker.hasExpired()) {
for (; marker.getListenerTraversal().hasNext(); ) {
AsyncListenerHandler handler = marker.getListenerTraversal().next().getListener();
if (!handler.isCancelled()) {
handler.enqueuePacket(packet);
return;
}
}
}
// There are no more listeners - queue the packet for transmission
filterManager.signalFreeProcessingSlot(packet);
// Note that listeners can opt to delay the packet transmission
filterManager.signalPacketTransmission(packet);
}
/**
* Close all worker threads and the handler itself.
*/
private synchronized void close() { private synchronized void close() {
// Remove the listener itself // Remove the listener itself
if (!cancelled) { if (!cancelled) {
filterManager.unregisterAsyncHandlerInternal(this); filterManager.unregisterAsyncHandlerInternal(this);
cancelled = true; cancelled = true;
// Close processing tasks
syncStop();
// Tell every uncancelled thread to end // Tell every uncancelled thread to end
stopThreads(); stopThreads();
} }