IQueueRORA as interface, add an implementation using a Lock.

+ Use in AbstractLogNodeDispatcher.
This commit is contained in:
asofold 2017-04-10 14:29:37 +02:00
parent 73a62a1e13
commit 2de5f2fd3a
4 changed files with 179 additions and 62 deletions

View File

@ -17,6 +17,7 @@ package fr.neatmonster.nocheatplus.logging.details;
import java.util.List;
import java.util.logging.Level;
import fr.neatmonster.nocheatplus.utilities.ds.corw.IQueueRORA;
import fr.neatmonster.nocheatplus.utilities.ds.corw.QueueRORA;
/**
@ -25,19 +26,19 @@ import fr.neatmonster.nocheatplus.utilities.ds.corw.QueueRORA;
*
*/
public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { // TODO: Name
// TODO: Queues might need a drop policy with thresholds.
// TODO: Allow multiple tasks for logging, e.g. per file, also thinking of SQL logging. Could pool + round-robin.
/**
* This queue has to be processed in a task within the primary thread with calling runLogsPrimary.
*/
protected final QueueRORA<LogRecord<?>> queuePrimary = new QueueRORA<LogRecord<?>>();
protected final QueueRORA<LogRecord<?>> queueAsynchronous = new QueueRORA<LogRecord<?>>();
protected final IQueueRORA<LogRecord<?>> queuePrimary = new QueueRORA<LogRecord<?>>();
protected final IQueueRORA<LogRecord<?>> queueAsynchronous = new QueueRORA<LogRecord<?>>();
/** Once a queue reaches this size, it will be reduced (loss of content). */
protected int maxQueueSize = 5000;
/**
* Task id, -1 means the asynchronous task is not running. Synchronize over
* queueAsynchronous. Must be maintained.
@ -48,7 +49,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
* taskAsynchronousID, synchronized over queueAsynchronous.
*/
protected final Runnable taskAsynchronous = new Runnable() {
@Override
public void run() {
// TODO: A more sophisticated System to allow "wake up on burst"?
@ -86,9 +87,9 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
}
}
}
};
/**
* Optional init logger to log errors. Should log to the init stream, no queuing.
*/
@ -102,16 +103,16 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
scheduleLog(node, level, content);
}
}
protected boolean runLogsPrimary() {
return runLogs(queuePrimary);
}
protected boolean runLogsAsynchronous() {
return runLogs(queueAsynchronous);
}
private boolean runLogs(final QueueRORA<LogRecord<?>> queue) {
private boolean runLogs(final IQueueRORA<LogRecord<?>> queue) {
// TODO: Consider allowYield + msYield, calling yield after 5 ms if async.
final List<LogRecord<?>> records = queue.removeAll();
if (records.isEmpty()) {
@ -122,7 +123,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
}
return true;
}
@Override
public void flush(long ms) {
if (!isPrimaryThread()) {
@ -130,7 +131,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
throw new IllegalStateException("Must only be called from within the primary thread.");
}
// TODO: Note that all streams should be emptied here, except the fallback logger.
// Cancel task.
synchronized (queueAsynchronous) {
if (taskAsynchronousID != -1) {
@ -150,12 +151,12 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
// Ignore.
}
}
// Log the rest (from here logging should be done via the appropriate direct-only stream).
runLogsPrimary();
runLogsAsynchronous();
}
protected <C> boolean isWithinContext(LogNode<C> node) {
switch (node.options.callContext) {
case PRIMARY_THREAD_DIRECT:
@ -174,7 +175,7 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
return false; // Force scheduling.
}
}
protected <C> void scheduleLog(LogNode<C> node, Level level, C content) {
final LogRecord<C> record = new LogRecord<C>(node, level, content); // TODO: parameters.
switch (node.options.callContext) {
@ -201,12 +202,12 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
throw new IllegalArgumentException("Bad CallContext: " + node.options.callContext);
}
}
/**
* Hard reduce the queue (heavy locking!).
* @param queue
*/
private void reduceQueue(final QueueRORA<LogRecord<?>> queue) {
private void reduceQueue(final IQueueRORA<LogRecord<?>> queue) {
// TODO: Different dropping strategies (drop first, last, alternate).
final int dropped;
synchronized (queue) {
@ -220,17 +221,17 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
}
logINIT(Level.WARNING, "Dropped " + dropped + " log entries from the " + (queue == queueAsynchronous ? "asynchronous" : "primary thread") + " queue.");
}
@Override
public void setMaxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
@Override
public void setInitLogger(ContentLogger<String> logger) {
this.initLogger = logger;
}
protected void logINIT(final Level level, final String message) {
if (initLogger != null) {
initLogger.log(level, message);
@ -238,9 +239,9 @@ public abstract class AbstractLogNodeDispatcher implements LogNodeDispatcher { /
}
protected abstract boolean isPrimaryThread();
protected abstract void scheduleAsynchronous();
protected abstract void cancelTask(int taskId);
}

View File

@ -0,0 +1,48 @@
package fr.neatmonster.nocheatplus.utilities.ds.corw;
import java.util.List;
/**
* Thread-safe queue - a replace-on-read-all queue-thing, supposedly exchanging
* the internally stored list by a new empty one under lock, for a very small
* locking time, so it is not really a typical copy-on-read. All methods use
* locking, implementation-specific.
*
* @author asofold
*
*/
public interface IQueueRORA<E> {
/**
* Add to list.
*
* @param element
* @return Size of queue after adding.
*/
public int add(final E element);
/**
*
* @return An ordinary List containing all elements. This should be the
* previously internally stored list, to keep locking time minimal.
*/
public List<E> removeAll();
/**
* Remove oldest entries until maxSize is reached.
*
* @param maxSize
* @return
*/
public int reduce(final int maxSize);
public void clear();
public boolean isEmpty();
public int size();
}

View File

@ -19,23 +19,18 @@ import java.util.List;
/**
* A replace-on-read-all queue-thing, exchanging the internal list under lock by
* a new empty one, for a small locking time, so it is not really a typical
* copy-on-read. All methods use locking, the QueueRORA instance is used for
* locking.
* IQueueRORA implementation using the synchronized keyword for locking, with a
* LinkedList inside.
*
* @author dev1mc
* @author asofold
*
* @param <E>
*/
public class QueueRORA<E> {
public class QueueRORA<E> implements IQueueRORA<E> {
private LinkedList<E> elements = new LinkedList<E>();
/**
* Add to list (synchronized).
* @param element
* @return Size of queue after adding.
*/
@Override
public int add(final E element) {
final int size;
synchronized (this) {
@ -44,11 +39,8 @@ public class QueueRORA<E> {
}
return size;
}
/**
*
* @return An ordinary (linked) List containing all elements.
*/
@Override
public List<E> removeAll() {
final List<E> result;
synchronized (this) {
@ -57,12 +49,8 @@ public class QueueRORA<E> {
}
return result;
}
/**
* Remove oldest entries until maxSize is reached.
* @param maxSize
* @return
*/
@Override
public int reduce(final int maxSize) {
int dropped = 0;
synchronized (this) {
@ -77,15 +65,13 @@ public class QueueRORA<E> {
}
return dropped;
}
@Override
public void clear() {
removeAll();
}
/**
*
* @return
*/
@Override
public boolean isEmpty() {
final boolean isEmpty;
synchronized (this) {
@ -93,11 +79,8 @@ public class QueueRORA<E> {
}
return isEmpty;
}
/**
*
* @return
*/
@Override
public int size() {
final int size;
synchronized (this) {
@ -105,5 +88,5 @@ public class QueueRORA<E> {
}
return size;
}
}

View File

@ -0,0 +1,85 @@
package fr.neatmonster.nocheatplus.utilities.ds.corw;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* IQueueRORA implementation using an external Lock or a ReentrantLock for
* locking, with a LinkedList inside.
*
* @author asofold
*
* @param <E>
*/
public class QueueRORAWithLock<E> implements IQueueRORA<E> {
private final Lock lock;
private LinkedList<E> elements = new LinkedList<E>();
public QueueRORAWithLock() {
this(new ReentrantLock());
}
public QueueRORAWithLock(Lock lock) {
this.lock = lock;
}
@Override
public int add(final E element) {
lock.lock();
elements.add(element);
final int size = elements.size();
lock.unlock();
return size;
}
@Override
public List<E> removeAll() {
lock.lock();
final List<E> result = elements;
elements = new LinkedList<E>();
lock.unlock();
return result;
}
@Override
public int reduce(final int maxSize) {
int dropped = 0;
lock.lock();
final int size = elements.size();
if (size <= maxSize) {
return dropped;
}
while (dropped < size - maxSize) {
elements.removeFirst();
dropped ++;
}
lock.unlock();
return dropped;
}
@Override
public void clear() {
removeAll();
}
@Override
public boolean isEmpty() {
lock.lock();
final boolean isEmpty = elements.isEmpty();
lock.unlock();
return isEmpty;
}
@Override
public int size() {
// TODO: Could maintain an int, simply.
lock.lock();
final int size = elements.size();
lock.unlock();
return size;
}
}