Adding the ability to use multiple worker threads.

This commit is contained in:
Kristian S. Stangeland 2012-10-03 23:10:35 +02:00
parent eb8ac33a3e
commit c6fb01e1e1

View File

@ -28,7 +28,7 @@ public class AsyncListenerHandler {
private volatile boolean cancelled; private volatile boolean cancelled;
// If we've started the listener loop before // If we've started the listener loop before
private volatile boolean started; private volatile int started;
// The packet listener // The packet listener
private PacketListener listener; private PacketListener listener;
@ -78,18 +78,26 @@ public class AsyncListenerHandler {
return nullPacketListener; return nullPacketListener;
} }
private String getPluginName() {
return PacketAdapter.getPluginName(listener);
}
/**
* Retrieve the plugin associated with this async listener.
* @return The plugin.
*/
public Plugin getPlugin() {
return listener != null ? listener.getPlugin() : null;
}
/** /**
* Cancel the handler. * Cancel the handler.
*/ */
public void cancel() { public void cancel() {
// Remove the listener as quickly as possible // Remove the listener as quickly as possible
close(); close();
// Poison Pill Shutdown
queuedPackets.clear();
queuedPackets.add(INTERUPT_PACKET);
} }
/** /**
* Queue a packet for processing. * Queue a packet for processing.
* @param packet - a packet for processing. * @param packet - a packet for processing.
@ -116,19 +124,36 @@ public class AsyncListenerHandler {
}; };
} }
/**
* Start a singler worker thread handling the asynchronous.
*/
public void start() {
if (listener.getPlugin() == null)
throw new IllegalArgumentException("Cannot start task without a valid plugin.");
filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop());
}
/**
* Start multiple worker threads for this listener.
* @param count - number of worker threads to start.
*/
public void start(int count) {
for (int i = 0; i < count; i++)
start();
}
// DO NOT call this method from the main thread // DO NOT call this method from the main thread
private void listenerLoop() { private void listenerLoop() {
// Danger, danger! // Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId()) if (Thread.currentThread().getId() == mainThread.getId())
throw new IllegalStateException("Do not call this method from the main thread."); throw new IllegalStateException("Do not call this method from the main thread.");
if (started)
throw new IllegalStateException("A listener cannot be run by multiple threads. Create a new listener instead.");
if (cancelled) if (cancelled)
throw new IllegalStateException("Listener has been cancelled. Create a new listener instead."); throw new IllegalStateException("Listener has been cancelled. Create a new listener instead.");
// Proceed // Proceed
started = true; started++;
try { try {
mainLoop: mainLoop:
@ -171,40 +196,32 @@ public class AsyncListenerHandler {
} catch (InterruptedException e) { } catch (InterruptedException e) {
// We're done // We're done
} finally {
// Clean up
started--;
close();
} }
// Clean up
close();
} }
private void close() { private synchronized void close() {
// Remove the listener itself // Remove the listener itself
if (!cancelled) { if (!cancelled) {
filterManager.unregisterAsyncHandlerInternal(this); filterManager.unregisterAsyncHandlerInternal(this);
cancelled = true; cancelled = true;
started = false;
// Tell every uncancelled thread to end
stopThreads();
} }
} }
private String getPluginName() {
return PacketAdapter.getPluginName(listener);
}
/** /**
* Retrieve the plugin associated with this async listener. * Use the poision pill method to stop every worker thread.
* @return The plugin.
*/ */
public Plugin getPlugin() { private void stopThreads() {
return listener != null ? listener.getPlugin() : null; // Poison Pill Shutdown
} queuedPackets.clear();
/**
* Start the asynchronous listener using the Bukkit scheduler.
*/
public void start() {
if (listener.getPlugin() == null)
throw new IllegalArgumentException("Cannot start task without a valid plugin.");
filterManager.scheduleAsyncTask(listener.getPlugin(), getListenerLoop()); for (int i = 0; i < started; i++)
queuedPackets.add(INTERUPT_PACKET);
} }
} }