From db8db1fba198fe1426641eabdca3a8e7b8f03e38 Mon Sep 17 00:00:00 2001 From: "Kristian S. Stangeland" Date: Thu, 4 Oct 2012 05:14:17 +0200 Subject: [PATCH] Make it possible to identify the current worker. --- .../protocol/async/AsyncListenerHandler.java | 20 ++++++--- .../comphenix/protocol/async/AsyncMarker.java | 44 ++++++++++++++++++- .../protocol/async/AsyncRunnable.java | 6 +++ 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java index f5385fad..137327b8 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncListenerHandler.java @@ -30,6 +30,9 @@ public class AsyncListenerHandler { */ private static final PacketEvent WAKEUP_PACKET = new PacketEvent(new Object()); + // Unique worker ID + private static final AtomicInteger nextID = new AtomicInteger(); + // Default queue capacity private static int DEFAULT_CAPACITY = 1024; @@ -39,9 +42,6 @@ public class AsyncListenerHandler { // Number of worker threads private final AtomicInteger started = new AtomicInteger(); - // Unique local worker ID - private final AtomicInteger nextID = new AtomicInteger(); - // The packet listener private PacketListener listener; @@ -132,12 +132,17 @@ public class AsyncListenerHandler { *

* Warning: Never call the run() method in the main thread. */ - public Runnable getListenerLoop() { + public AsyncRunnable getListenerLoop() { return new AsyncRunnable() { private final AtomicBoolean running = new AtomicBoolean(); private volatile int id; + @Override + public int getID() { + return id; + } + @Override public void run() { // Careful now @@ -271,7 +276,7 @@ public class AsyncListenerHandler { } // DO NOT call this method from the main thread - private void listenerLoop(int taskID) { + private void listenerLoop(int workerID) { // Danger, danger! if (Thread.currentThread().getId() == mainThread.getId()) @@ -302,7 +307,7 @@ public class AsyncListenerHandler { // This is a bit slow, but it should be safe synchronized (stopLock) { // Are we the one who is supposed to stop? - if (stoppedTasks.contains(taskID)) + if (stoppedTasks.contains(workerID)) return; if (waitForStops()) return; @@ -311,6 +316,9 @@ public class AsyncListenerHandler { // Here's the core of the asynchronous processing try { + marker.setListenerHandler(this); + marker.setWorkerID(workerID); + if (packet.isServerPacket()) listener.onPacketSending(packet); else diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java index 8cecb1c8..a5e15f0e 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncMarker.java @@ -67,6 +67,10 @@ public class AsyncMarker implements Serializable, Comparable { // Whether or not the asynchronous processing itself should be cancelled private volatile boolean asyncCancelled; + // Used to identify the asynchronous worker + private AsyncListenerHandler listenerHandler; + private int workerID; + // Determine if Minecraft processes this packet asynchronously private static Method isMinecraftAsync; private static boolean alwaysSync; @@ -214,12 +218,48 @@ public class AsyncMarker implements Serializable, Comparable { public void setAsyncCancelled(boolean asyncCancelled) { this.asyncCancelled = asyncCancelled; } - + + /** + * Retrieve the current asynchronous listener handler. + * @return Asychronous listener handler, or NULL if this packet is not asynchronous. + */ + public AsyncListenerHandler getListenerHandler() { + return listenerHandler; + } + + /** + * Set the current asynchronous listener handler. + *

+ * Used by the worker to update the value. + * @param listenerHandler - new listener handler. + */ + void setListenerHandler(AsyncListenerHandler listenerHandler) { + this.listenerHandler = listenerHandler; + } + + /** + * Retrieve the current worker ID. + * @return Current worker ID. + */ + public int getWorkerID() { + return workerID; + } + + /** + * Set the current worker ID. + *

+ * Used by the worker. + * @param workerID - new worker ID. + */ + void setWorkerID(int workerID) { + this.workerID = workerID; + } + /** * Retrieve iterator for the next listener in line. * @return Next async packet listener iterator. */ - public Iterator> getListenerTraversal() { + Iterator> getListenerTraversal() { return listenerTraversal; } diff --git a/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java b/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java index 281b71be..7e56a6b6 100644 --- a/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java +++ b/ProtocolLib/src/com/comphenix/protocol/async/AsyncRunnable.java @@ -7,6 +7,12 @@ package com.comphenix.protocol.async; */ public interface AsyncRunnable extends Runnable { + /** + * Retrieve a unique worker ID. + * @return Unique worker ID. + */ + public int getID(); + /** * Stop the given runnable. *