Make it possible to identify the current worker.

This commit is contained in:
Kristian S. Stangeland 2012-10-04 05:14:17 +02:00
parent e729583d74
commit db8db1fba1
3 changed files with 62 additions and 8 deletions

View File

@ -30,6 +30,9 @@ public class AsyncListenerHandler {
*/ */
private static final PacketEvent WAKEUP_PACKET = new PacketEvent(new Object()); private static final PacketEvent WAKEUP_PACKET = new PacketEvent(new Object());
// Unique worker ID
private static final AtomicInteger nextID = new AtomicInteger();
// Default queue capacity // Default queue capacity
private static int DEFAULT_CAPACITY = 1024; private static int DEFAULT_CAPACITY = 1024;
@ -39,9 +42,6 @@ public class AsyncListenerHandler {
// Number of worker threads // Number of worker threads
private final AtomicInteger started = new AtomicInteger(); private final AtomicInteger started = new AtomicInteger();
// Unique local worker ID
private final AtomicInteger nextID = new AtomicInteger();
// The packet listener // The packet listener
private PacketListener listener; private PacketListener listener;
@ -132,12 +132,17 @@ public class AsyncListenerHandler {
* <p> * <p>
* <b>Warning</b>: Never call the run() method in the main thread. * <b>Warning</b>: Never call the run() method in the main thread.
*/ */
public Runnable getListenerLoop() { public AsyncRunnable getListenerLoop() {
return new AsyncRunnable() { return new AsyncRunnable() {
private final AtomicBoolean running = new AtomicBoolean(); private final AtomicBoolean running = new AtomicBoolean();
private volatile int id; private volatile int id;
@Override
public int getID() {
return id;
}
@Override @Override
public void run() { public void run() {
// Careful now // Careful now
@ -271,7 +276,7 @@ public class AsyncListenerHandler {
} }
// DO NOT call this method from the main thread // DO NOT call this method from the main thread
private void listenerLoop(int taskID) { private void listenerLoop(int workerID) {
// Danger, danger! // Danger, danger!
if (Thread.currentThread().getId() == mainThread.getId()) if (Thread.currentThread().getId() == mainThread.getId())
@ -302,7 +307,7 @@ public class AsyncListenerHandler {
// This is a bit slow, but it should be safe // This is a bit slow, but it should be safe
synchronized (stopLock) { synchronized (stopLock) {
// Are we the one who is supposed to stop? // Are we the one who is supposed to stop?
if (stoppedTasks.contains(taskID)) if (stoppedTasks.contains(workerID))
return; return;
if (waitForStops()) if (waitForStops())
return; return;
@ -311,6 +316,9 @@ public class AsyncListenerHandler {
// Here's the core of the asynchronous processing // Here's the core of the asynchronous processing
try { try {
marker.setListenerHandler(this);
marker.setWorkerID(workerID);
if (packet.isServerPacket()) if (packet.isServerPacket())
listener.onPacketSending(packet); listener.onPacketSending(packet);
else else

View File

@ -67,6 +67,10 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
// Whether or not the asynchronous processing itself should be cancelled // Whether or not the asynchronous processing itself should be cancelled
private volatile boolean asyncCancelled; private volatile boolean asyncCancelled;
// Used to identify the asynchronous worker
private AsyncListenerHandler listenerHandler;
private int workerID;
// Determine if Minecraft processes this packet asynchronously // Determine if Minecraft processes this packet asynchronously
private static Method isMinecraftAsync; private static Method isMinecraftAsync;
private static boolean alwaysSync; private static boolean alwaysSync;
@ -214,12 +218,48 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
public void setAsyncCancelled(boolean asyncCancelled) { public void setAsyncCancelled(boolean asyncCancelled) {
this.asyncCancelled = asyncCancelled; this.asyncCancelled = asyncCancelled;
} }
/**
* Retrieve the current asynchronous listener handler.
* @return Asychronous listener handler, or NULL if this packet is not asynchronous.
*/
public AsyncListenerHandler getListenerHandler() {
return listenerHandler;
}
/**
* Set the current asynchronous listener handler.
* <p>
* Used by the worker to update the value.
* @param listenerHandler - new listener handler.
*/
void setListenerHandler(AsyncListenerHandler listenerHandler) {
this.listenerHandler = listenerHandler;
}
/**
* Retrieve the current worker ID.
* @return Current worker ID.
*/
public int getWorkerID() {
return workerID;
}
/**
* Set the current worker ID.
* <p>
* Used by the worker.
* @param workerID - new worker ID.
*/
void setWorkerID(int workerID) {
this.workerID = workerID;
}
/** /**
* Retrieve iterator for the next listener in line. * Retrieve iterator for the next listener in line.
* @return Next async packet listener iterator. * @return Next async packet listener iterator.
*/ */
public Iterator<PrioritizedListener<AsyncListenerHandler>> getListenerTraversal() { Iterator<PrioritizedListener<AsyncListenerHandler>> getListenerTraversal() {
return listenerTraversal; return listenerTraversal;
} }

View File

@ -7,6 +7,12 @@ package com.comphenix.protocol.async;
*/ */
public interface AsyncRunnable extends Runnable { public interface AsyncRunnable extends Runnable {
/**
* Retrieve a unique worker ID.
* @return Unique worker ID.
*/
public int getID();
/** /**
* Stop the given runnable. * Stop the given runnable.
* <p> * <p>