Make the manual asynchronous worker closable.

Added a good deal of synchronization to deal with closing a 
specific worker. This overhead can be avoided by simply closing 
any given worker.
This commit is contained in:
Kristian S. Stangeland 2012-10-04 03:20:09 +02:00
parent 8a5e5e849b
commit e729583d74
2 changed files with 185 additions and 17 deletions

View File

@ -1,6 +1,9 @@
package com.comphenix.protocol.async;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@ -18,18 +21,26 @@ import com.comphenix.protocol.events.PacketListener;
public class AsyncListenerHandler {
/**
* Signal an end to the packet processing.
* Signal an end to packet processing.
*/
private static final PacketEvent INTERUPT_PACKET = new PacketEvent(new Object());
/**
* Called when the threads have to wake up for something important.
*/
private static final PacketEvent WAKEUP_PACKET = new PacketEvent(new Object());
// Default queue capacity
private static int DEFAULT_CAPACITY = 1024;
// Cancel the async handler
private volatile boolean cancelled;
// If we've started the listener loop before
private AtomicInteger started = new AtomicInteger();
// Number of worker threads
private final AtomicInteger started = new AtomicInteger();
// Unique local worker ID
private final AtomicInteger nextID = new AtomicInteger();
// The packet listener
private PacketListener listener;
@ -41,6 +52,10 @@ public class AsyncListenerHandler {
// List of queued packets
private ArrayBlockingQueue<PacketEvent> queuedPackets = new ArrayBlockingQueue<PacketEvent>(DEFAULT_CAPACITY);
// List of cancelled tasks
private final Set<Integer> stoppedTasks = new HashSet<Integer>();
private final Object stopLock = new Object();
// Minecraft main thread
private Thread mainThread;
@ -112,15 +127,57 @@ public class AsyncListenerHandler {
}
/**
* Create a runnable that will initiate the listener loop.
* Create a worker that will initiate the listener loop. Note that using stop() to
* close a specific worker is less efficient than stopping an arbitrary worker.
* <p>
* <b>Warning</b>: Never call the run() method in the main thread.
*/
public Runnable getListenerLoop() {
return new Runnable() {
return new AsyncRunnable() {
private final AtomicBoolean running = new AtomicBoolean();
private volatile int id;
@Override
public void run() {
listenerLoop();
// Careful now
if (running.compareAndSet(false, true)) {
id = nextID.incrementAndGet();
listenerLoop(id);
synchronized (stopLock) {
stoppedTasks.remove(id);
notifyAll();
running.set(false);
}
} else {
throw new IllegalStateException(
"This listener loop has already been started. Create a new instead.");
}
}
@Override
public boolean stop() throws InterruptedException {
synchronized (stopLock) {
if (!running.get())
return false;
stoppedTasks.add(id);
// Wake up threads - we have a listener to stop
for (int i = 0; i < getWorkers(); i++) {
queuedPackets.offer(WAKEUP_PACKET);
}
waitForStops();
return true;
}
}
@Override
public boolean isRunning() {
return running.get();
}
};
}
@ -128,9 +185,11 @@ public class AsyncListenerHandler {
/**
* Start a singler worker thread handling the asynchronous.
*/
public void start() {
public synchronized void start() {
if (listener.getPlugin() == null)
throw new IllegalArgumentException("Cannot start task without a valid plugin.");
if (cancelled)
throw new IllegalStateException("Cannot start a worker when the listener is closing.");
filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop());
}
@ -139,32 +198,115 @@ public class AsyncListenerHandler {
* Start multiple worker threads for this listener.
* @param count - number of worker threads to start.
*/
public void start(int count) {
public synchronized void start(int count) {
for (int i = 0; i < count; i++)
start();
}
/**
* Stop a worker thread.
*/
public synchronized void stop() {
queuedPackets.add(INTERUPT_PACKET);
}
/**
* Stop the given amount of worker threads.
* @param count - number of threads to stop.
*/
public synchronized void stop(int count) {
for (int i = 0; i < count; i++)
stop();
}
/**
* Set the current number of workers.
* <p>
* This method can only be called with a count of zero when the listener is closing.
* @param count - new number of workers.
*/
public synchronized void setWorkers(int count) {
if (count < 0)
throw new IllegalArgumentException("Number of workers cannot be less than zero.");
if (count > DEFAULT_CAPACITY)
throw new IllegalArgumentException("Cannot initiate more than " + DEFAULT_CAPACITY + " workers");
if (cancelled && count > 0)
throw new IllegalArgumentException("Cannot add workers when the listener is closing.");
long time = System.currentTimeMillis();
// Try to get to the correct count
while (started.get() != count) {
if (started.get() < count)
start();
else
stop();
// May happen if another thread is doing something similar to "setWorkers"
if ((System.currentTimeMillis() - time) > 1000)
throw new RuntimeException("Failed to set worker count.");
}
}
/**
* Retrieve the current number of registered workers.
* <p>
* Note that the returned value may be out of data.
* @return Number of registered workers.
*/
public synchronized int getWorkers() {
return started.get();
}
/**
* Wait until every tasks scheduled to stop has actually stopped.
* @return TRUE if the current listener should stop, FALSE otherwise.
* @throws InterruptedException - If the current thread was interrupted.
*/
private boolean waitForStops() throws InterruptedException {
while (stoppedTasks.size() > 0 && !cancelled) {
wait();
}
return cancelled;
}
// DO NOT call this method from the main thread
private void listenerLoop() {
private void listenerLoop(int taskID) {
// Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId())
throw new IllegalStateException("Do not call this method from the main thread.");
if (cancelled)
throw new IllegalStateException("Listener has been cancelled. Create a new listener instead.");
// Proceed
started.incrementAndGet();
try {
// Wait if certain threads are stopping
synchronized (stopLock) {
if (waitForStops())
return;
}
// Proceed
started.incrementAndGet();
mainLoop:
while (!cancelled) {
PacketEvent packet = queuedPackets.take();
AsyncMarker marker = packet.getAsyncMarker();
// Handle cancel requests
if (packet == null || marker == null || !packet.isAsynchronous()) {
break;
if (packet == null || marker == null || packet == INTERUPT_PACKET) {
return;
} else if (packet == WAKEUP_PACKET) {
// This is a bit slow, but it should be safe
synchronized (stopLock) {
// Are we the one who is supposed to stop?
if (stoppedTasks.contains(taskID))
return;
if (waitForStops())
return;
}
}
// Here's the core of the asynchronous processing
@ -221,8 +363,11 @@ public class AsyncListenerHandler {
private void stopThreads() {
// Poison Pill Shutdown
queuedPackets.clear();
stop(started.get());
for (int i = 0; i < started.get(); i++)
queuedPackets.add(INTERUPT_PACKET);
// Individual shut down is irrelevant now
synchronized (stopLock) {
notifyAll();
}
}
}

View File

@ -0,0 +1,23 @@
package com.comphenix.protocol.async;
/**
* A runnable representing a asynchronous event listener.
*
* @author Kristian
*/
public interface AsyncRunnable extends Runnable {
/**
* Stop the given runnable.
* <p>
* This may not occur right away.
* @return TRUE if the thread was stopped, FALSE if it was already stopped.
*/
public boolean stop() throws InterruptedException;
/**
* Determine if we're running or not.
* @return TRUE if we're running, FALSE otherwise.
*/
public boolean isRunning();
}