From 2de5f2fd3a6a7ad5d0300137b1497fc81e234f78 Mon Sep 17 00:00:00 2001 From: asofold Date: Mon, 10 Apr 2017 14:29:37 +0200 Subject: [PATCH] IQueueRORA as interface, add an implementation using a Lock. + Use in AbstractLogNodeDispatcher. --- .../details/AbstractLogNodeDispatcher.java | 53 ++++++------ .../utilities/ds/corw/IQueueRORA.java | 48 +++++++++++ .../utilities/ds/corw/QueueRORA.java | 55 +++++------- .../utilities/ds/corw/QueueRORAWithLock.java | 85 +++++++++++++++++++ 4 files changed, 179 insertions(+), 62 deletions(-) create mode 100644 NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/IQueueRORA.java create mode 100644 NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORAWithLock.java diff --git a/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/logging/details/AbstractLogNodeDispatcher.java b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/logging/details/AbstractLogNodeDispatcher.java index 31e62a95..555dde1d 100644 --- a/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/logging/details/AbstractLogNodeDispatcher.java +++ b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/logging/details/AbstractLogNodeDispatcher.java @@ -17,6 +17,7 @@ package fr.neatmonster.nocheatplus.logging.details; import java.util.List; import java.util.logging.Level; +import fr.neatmonster.nocheatplus.utilities.ds.corw.IQueueRORA; import fr.neatmonster.nocheatplus.utilities.ds.corw.QueueRORA; /** @@ -25,19 +26,19 @@ import fr.neatmonster.nocheatplus.utilities.ds.corw.QueueRORA; * */ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { // TODO: Name - + // TODO: Queues might need a drop policy with thresholds. // TODO: Allow multiple tasks for logging, e.g. per file, also thinking of SQL logging. Could pool + round-robin. - + /** * This queue has to be processed in a task within the primary thread with calling runLogsPrimary. */ - protected final QueueRORA> queuePrimary = new QueueRORA>(); - protected final QueueRORA> queueAsynchronous = new QueueRORA>(); - + protected final IQueueRORA> queuePrimary = new QueueRORA>(); + protected final IQueueRORA> queueAsynchronous = new QueueRORA>(); + /** Once a queue reaches this size, it will be reduced (loss of content). */ protected int maxQueueSize = 5000; - + /** * Task id, -1 means the asynchronous task is not running. Synchronize over * queueAsynchronous. Must be maintained. @@ -48,7 +49,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / * taskAsynchronousID, synchronized over queueAsynchronous. */ protected final Runnable taskAsynchronous = new Runnable() { - + @Override public void run() { // TODO: A more sophisticated System to allow "wake up on burst"? @@ -86,9 +87,9 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / } } } - + }; - + /** * Optional init logger to log errors. Should log to the init stream, no queuing. */ @@ -102,16 +103,16 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / scheduleLog(node, level, content); } } - + protected boolean runLogsPrimary() { return runLogs(queuePrimary); } - + protected boolean runLogsAsynchronous() { return runLogs(queueAsynchronous); } - - private boolean runLogs(final QueueRORA> queue) { + + private boolean runLogs(final IQueueRORA> queue) { // TODO: Consider allowYield + msYield, calling yield after 5 ms if async. final List> records = queue.removeAll(); if (records.isEmpty()) { @@ -122,7 +123,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / } return true; } - + @Override public void flush(long ms) { if (!isPrimaryThread()) { @@ -130,7 +131,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / throw new IllegalStateException("Must only be called from within the primary thread."); } // TODO: Note that all streams should be emptied here, except the fallback logger. - + // Cancel task. synchronized (queueAsynchronous) { if (taskAsynchronousID != -1) { @@ -150,12 +151,12 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / // Ignore. } } - + // Log the rest (from here logging should be done via the appropriate direct-only stream). runLogsPrimary(); runLogsAsynchronous(); } - + protected boolean isWithinContext(LogNode node) { switch (node.options.callContext) { case PRIMARY_THREAD_DIRECT: @@ -174,7 +175,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / return false; // Force scheduling. } } - + protected void scheduleLog(LogNode node, Level level, C content) { final LogRecord record = new LogRecord(node, level, content); // TODO: parameters. switch (node.options.callContext) { @@ -201,12 +202,12 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / throw new IllegalArgumentException("Bad CallContext: " + node.options.callContext); } } - + /** * Hard reduce the queue (heavy locking!). * @param queue */ - private void reduceQueue(final QueueRORA> queue) { + private void reduceQueue(final IQueueRORA> queue) { // TODO: Different dropping strategies (drop first, last, alternate). final int dropped; synchronized (queue) { @@ -220,17 +221,17 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / } logINIT(Level.WARNING, "Dropped " + dropped + " log entries from the " + (queue == queueAsynchronous ? "asynchronous" : "primary thread") + " queue."); } - + @Override public void setMaxQueueSize(int maxQueueSize) { this.maxQueueSize = maxQueueSize; } - + @Override public void setInitLogger(ContentLogger logger) { this.initLogger = logger; } - + protected void logINIT(final Level level, final String message) { if (initLogger != null) { initLogger.log(level, message); @@ -238,9 +239,9 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { / } protected abstract boolean isPrimaryThread(); - + protected abstract void scheduleAsynchronous(); - + protected abstract void cancelTask(int taskId); - + } diff --git a/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/IQueueRORA.java b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/IQueueRORA.java new file mode 100644 index 00000000..bf5cefe1 --- /dev/null +++ b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/IQueueRORA.java @@ -0,0 +1,48 @@ +package fr.neatmonster.nocheatplus.utilities.ds.corw; + +import java.util.List; + +/** + * Thread-safe queue - a replace-on-read-all queue-thing, supposedly exchanging + * the internally stored list by a new empty one under lock, for a very small + * locking time, so it is not really a typical copy-on-read. All methods use + * locking, implementation-specific. + * + * @author asofold + * + */ +public interface IQueueRORA { + + /** + * Add to list. + * + * @param element + * @return Size of queue after adding. + */ + public int add(final E element); + + /** + * + * @return An ordinary List containing all elements. This should be the + * previously internally stored list, to keep locking time minimal. + */ + public List removeAll(); + + /** + * Remove oldest entries until maxSize is reached. + * + * @param maxSize + * @return + */ + public int reduce(final int maxSize); + + + public void clear(); + + + public boolean isEmpty(); + + + public int size(); + +} diff --git a/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORA.java b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORA.java index da84624d..7f412473 100644 --- a/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORA.java +++ b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORA.java @@ -19,23 +19,18 @@ import java.util.List; /** - * A replace-on-read-all queue-thing, exchanging the internal list under lock by - * a new empty one, for a small locking time, so it is not really a typical - * copy-on-read. All methods use locking, the QueueRORA instance is used for - * locking. + * IQueueRORA implementation using the synchronized keyword for locking, with a + * LinkedList inside. * - * @author dev1mc + * @author asofold * + * @param */ -public class QueueRORA { - +public class QueueRORA implements IQueueRORA { + private LinkedList elements = new LinkedList(); - - /** - * Add to list (synchronized). - * @param element - * @return Size of queue after adding. - */ + + @Override public int add(final E element) { final int size; synchronized (this) { @@ -44,11 +39,8 @@ public class QueueRORA { } return size; } - - /** - * - * @return An ordinary (linked) List containing all elements. - */ + + @Override public List removeAll() { final List result; synchronized (this) { @@ -57,12 +49,8 @@ public class QueueRORA { } return result; } - - /** - * Remove oldest entries until maxSize is reached. - * @param maxSize - * @return - */ + + @Override public int reduce(final int maxSize) { int dropped = 0; synchronized (this) { @@ -77,15 +65,13 @@ public class QueueRORA { } return dropped; } - + + @Override public void clear() { removeAll(); } - - /** - * - * @return - */ + + @Override public boolean isEmpty() { final boolean isEmpty; synchronized (this) { @@ -93,11 +79,8 @@ public class QueueRORA { } return isEmpty; } - - /** - * - * @return - */ + + @Override public int size() { final int size; synchronized (this) { @@ -105,5 +88,5 @@ public class QueueRORA { } return size; } - + } diff --git a/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORAWithLock.java b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORAWithLock.java new file mode 100644 index 00000000..322112c0 --- /dev/null +++ b/NCPCommons/src/main/java/fr/neatmonster/nocheatplus/utilities/ds/corw/QueueRORAWithLock.java @@ -0,0 +1,85 @@ +package fr.neatmonster.nocheatplus.utilities.ds.corw; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * IQueueRORA implementation using an external Lock or a ReentrantLock for + * locking, with a LinkedList inside. + * + * @author asofold + * + * @param + */ +public class QueueRORAWithLock implements IQueueRORA { + + private final Lock lock; + private LinkedList elements = new LinkedList(); + + public QueueRORAWithLock() { + this(new ReentrantLock()); + } + + public QueueRORAWithLock(Lock lock) { + this.lock = lock; + } + + @Override + public int add(final E element) { + lock.lock(); + elements.add(element); + final int size = elements.size(); + lock.unlock(); + return size; + } + + @Override + public List removeAll() { + lock.lock(); + final List result = elements; + elements = new LinkedList(); + lock.unlock(); + return result; + } + + @Override + public int reduce(final int maxSize) { + int dropped = 0; + lock.lock(); + final int size = elements.size(); + if (size <= maxSize) { + return dropped; + } + while (dropped < size - maxSize) { + elements.removeFirst(); + dropped ++; + } + lock.unlock(); + return dropped; + } + + @Override + public void clear() { + removeAll(); + } + + @Override + public boolean isEmpty() { + lock.lock(); + final boolean isEmpty = elements.isEmpty(); + lock.unlock(); + return isEmpty; + } + + @Override + public int size() { + // TODO: Could maintain an int, simply. + lock.lock(); + final int size = elements.size(); + lock.unlock(); + return size; + } + +}