Rewrite scheduler. Fixes BUKKIT-1831, and BUKKIT-845

The new scheduler uses a non-blocking methodology. Combining volatile
references to make a linked reference chain, with the atomic reference
handling the tail, tasks are queued without waiting for locks. The main
thread will no longer limit the length of time spend for scheduled tasks,
but no task will run twice in the same tick. Scheduling a new task inside of
a synchronous task will always run the new task during the same tick,
assuming there is no supplied delay > 0.

Asynchronous tasks are now run using a thread pool. Any thread-local
implemenation should now account for threads being reused between
executions.

Race conditions were carefully examined and the order of logic is now very
important. Each task is placed in a secondary collection before removal from
primary collections. Thus, by reading tasks from the collections in the same
order they travel, it retains state-safety. This does make modifications
less responsive in some situations, as the task may be transitioning before
the modifier accesses it. This cost outweighs the requirement to synchronize
on the scheduler; previously any conflict would be first-come-first-serve,
with the main thread backing out arbitrarily.
This commit is contained in:
Wesley Wolfe 2012-08-14 06:09:51 -05:00 committed by feildmaster
parent 8fdb006143
commit dcd01bf0c0
7 changed files with 563 additions and 648 deletions

View File

@ -0,0 +1,98 @@
package org.bukkit.craftbukkit.scheduler;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.lang.UnhandledException;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitWorker;
class CraftAsyncTask extends CraftTask {
private final LinkedList<BukkitWorker> workers = new LinkedList<BukkitWorker>();
private final Map<Integer, CraftTask> runners;
CraftAsyncTask(final Map<Integer, CraftTask> runners, final Plugin plugin, final Runnable task, final int id, final long delay) {
super(plugin, task, id, delay);
this.runners = runners;
}
@Override
public boolean isSync() {
return false;
}
@Override
public void run() {
final Thread thread = Thread.currentThread();
synchronized(workers) {
if (getPeriod() == -2) {
// Never continue running after cancelled.
// Checking this with the lock is important!
return;
}
workers.add(
new BukkitWorker() {
public Thread getThread() {
return thread;
}
public int getTaskId() {
return CraftAsyncTask.this.getTaskId();
}
public Plugin getOwner() {
return CraftAsyncTask.this.getOwner();
}
});
}
Throwable thrown = null;
try {
super.run();
} catch (final Throwable t) {
thrown = t;
throw new UnhandledException(
String.format(
"Plugin %s generated an exception while executing task %s",
getOwner().getDescription().getFullName(),
getTaskId()),
thrown);
} finally {
// Cleanup is important for any async task, otherwise ghost tasks are everywhere
synchronized(workers) {
try {
final Iterator<BukkitWorker> workers = this.workers.iterator();
boolean removed = false;
while (workers.hasNext()) {
if (workers.next().getThread() == thread) {
workers.remove();
removed = true; // Don't throw exception
break;
}
}
if (!removed) {
throw new IllegalStateException(
String.format(
"Unable to remove worker %s on task %s for %s",
thread.getName(),
getTaskId(),
getOwner().getDescription().getFullName()),
thrown); // We don't want to lose the original exception, if any
}
} finally {
if (getPeriod() < 0 && workers.isEmpty()) {
// At this spot, we know we are the final async task being executed!
// Because we have the lock, nothing else is running or will run because delay < 0
runners.remove(getTaskId());
}
}
}
}
}
LinkedList<BukkitWorker> getWorkers() {
return workers;
}
}

View File

@ -7,100 +7,88 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
public class CraftFuture<T> implements Runnable, Future<T> { import org.bukkit.plugin.Plugin;
class CraftFuture<T> extends CraftTask implements Future<T> {
private final CraftScheduler craftScheduler;
private final Callable<T> callable; private final Callable<T> callable;
private final ObjectContainer<T> returnStore = new ObjectContainer<T>(); private T value;
private boolean done = false; private Exception exception = null;
private boolean running = false;
private boolean cancelled = false;
private Exception e = null;
private int taskId = -1;
CraftFuture(CraftScheduler craftScheduler, Callable<T> callable) { CraftFuture(final Callable<T> callable, final Plugin plugin, final int id) {
super(plugin, null, id, -1l);
this.callable = callable; this.callable = callable;
this.craftScheduler = craftScheduler;
} }
public void run() { public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
synchronized (this) { if (getPeriod() != -1l) {
if (cancelled) { return false;
return;
}
running = true;
}
try {
returnStore.setObject(callable.call());
} catch (Exception e) {
this.e = e;
}
synchronized (this) {
running = false;
done = true;
this.notify();
}
}
public T get() throws InterruptedException, ExecutionException {
try {
return get(0L, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {}
return null;
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
synchronized (this) {
if (isDone()) {
return getResult();
}
this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
return getResult();
}
}
public T getResult() throws ExecutionException {
if (cancelled) {
throw new CancellationException();
}
if (e != null) {
throw new ExecutionException(e);
}
return returnStore.getObject();
}
public boolean isDone() {
synchronized (this) {
return done;
} }
setPeriod(-2l);
return true;
} }
public boolean isCancelled() { public boolean isCancelled() {
synchronized (this) { return getPeriod() == -2l;
return cancelled; }
public boolean isDone() {
final long period = this.getPeriod();
return period != -1l && period != -3l;
}
public T get() throws CancellationException, InterruptedException, ExecutionException {
try {
return get(0, TimeUnit.MILLISECONDS);
} catch (final TimeoutException e) {
throw new Error(e);
} }
} }
public boolean cancel(boolean mayInterruptIfRunning) { public synchronized T get(long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
synchronized (this) { timeout = unit.toMillis(timeout);
if (cancelled) { long period = this.getPeriod();
return false; while (true) {
if (period == -1l || period == -3l) {
this.wait(unit.toMillis(timeout));
period = this.getPeriod();
if (period == -1l || period == -3l) {
if (timeout == 0l) {
continue;
}
throw new TimeoutException();
}
} }
cancelled = true; if (period == -2l) {
if (taskId != -1) { throw new CancellationException();
craftScheduler.cancelTask(taskId);
} }
if (!running && !done) { if (period == -4l) {
return true; if (exception == null) {
} else { return value;
return false; }
throw new ExecutionException(exception);
} }
throw new IllegalStateException("Expected " + -1l + " to " + -4l + ", got " + period);
} }
} }
public void setTaskId(int taskId) { @Override
public void run() {
synchronized (this) { synchronized (this) {
this.taskId = taskId; if (getPeriod() == -2l) {
return;
}
setPeriod(-3l);
}
try {
value = callable.call();
} catch (final Exception e) {
exception = e;
} finally {
synchronized (this) {
setPeriod(-4l);
this.notifyAll();
}
} }
} }
} }

View File

@ -1,385 +1,421 @@
package org.bukkit.craftbukkit.scheduler; package org.bukkit.craftbukkit.scheduler;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.TreeMap; import java.util.PriorityQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.Validate;
import org.bukkit.plugin.Plugin; import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler; import org.bukkit.scheduler.BukkitScheduler;
import org.bukkit.scheduler.BukkitTask; import org.bukkit.scheduler.BukkitTask;
import org.bukkit.scheduler.BukkitWorker; import org.bukkit.scheduler.BukkitWorker;
public class CraftScheduler implements BukkitScheduler, Runnable { /**
* The fundamental concepts for this implementation:
* <li>Main thread owns {@link #head} and {@link #currentTick}, but it may be read from any thread</li>
* <li>Main thread exclusively controls {@link #temp} and {@link #pending}.
* They are never to be accessed outside of the main thread; alternatives exist to prevent locking.</li>
* <li>{@link #head} to {@link #tail} act as a linked list/queue, with 1 consumer and infinite producers.
* Adding to the tail is atomic and very efficient; utility method is {@link #handle(CraftTask, long)} or {@link #addTask(CraftTask)}. </li>
* <li>Changing the period on a task is delicate.
* Any future task needs to notify waiting threads.
* Async tasks must be synchronized to make sure that any thread that's finishing will remove itself from {@link #runners}.
* Another utility method is provided for this, {@link #cancelTask(CraftTask)}</li>
* <li>{@link #runners} provides a moderately up-to-date view of active tasks.
* If the linked head to tail set is read, all remaining tasks that were active at the time execution started will be located in runners.</li>
* <li>Async tasks are responsible for removing themselves from runners</li>
* <li>Sync tasks are only to be removed from runners on the main thread when coupled with a removal from pending and temp.</li>
* <li>Most of the design in this scheduler relies on queuing special tasks to perform any data changes on the main thread.
* When executed from inside a synchronous method, the scheduler will be updated before next execution by virtue of the frequent {@link #parsePending()} calls.</li>
*/
public class CraftScheduler implements BukkitScheduler {
private static final Logger logger = Logger.getLogger("Minecraft"); /**
* Counter for IDs. Order doesn't matter, only uniqueness.
*/
private final AtomicInteger ids = new AtomicInteger(1);
/**
* Current head of linked-list. This reference is always stale, {@link CraftTask#next} is the live reference.
*/
private volatile CraftTask head = new CraftTask();
/**
* Tail of a linked-list. AtomicReference only matters when adding to queue
*/
private final AtomicReference<CraftTask> tail = new AtomicReference<CraftTask>(head);
/**
* Main thread logic only
*/
private final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10,
new Comparator<CraftTask>() {
public int compare(final CraftTask o1, final CraftTask o2) {
return (int) (o1.getNextRun() - o2.getNextRun());
}
});
/**
* Main thread logic only
*/
private final List<CraftTask> temp = new ArrayList<CraftTask>();
/**
* These are tasks that are currently active. It's provided for 'viewing' the current state.
*/
private final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>();
private volatile int currentTick = -1;
private final Executor executor = Executors.newCachedThreadPool();
private final CraftThreadManager craftThreadManager = new CraftThreadManager(); public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) {
return this.scheduleSyncDelayedTask(plugin, task, 0l);
private final LinkedList<CraftTask> mainThreadQueue = new LinkedList<CraftTask>();
private final LinkedList<CraftTask> syncedTasks = new LinkedList<CraftTask>();
private final TreeMap<CraftTask, Boolean> schedulerQueue = new TreeMap<CraftTask, Boolean>();
private Long currentTick = 0L;
// This lock locks the mainThreadQueue and the currentTick value
private final Lock mainThreadLock = new ReentrantLock();
private final Lock syncedTasksLock = new ReentrantLock();
public CraftScheduler() {
Thread t = new Thread(this);
t.start();
} }
public void run() { public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task) {
return this.scheduleAsyncDelayedTask(plugin, task, 0l);
}
while (true) { public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) {
boolean stop = false; return this.scheduleSyncRepeatingTask(plugin, task, delay, -1l);
long firstTick = -1; }
long currentTick = -1;
CraftTask first = null;
do {
synchronized (schedulerQueue) {
first = null;
if (!schedulerQueue.isEmpty()) {
first = schedulerQueue.firstKey();
if (first != null) {
currentTick = getCurrentTick();
firstTick = first.getExecutionTick(); public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) {
return this.scheduleAsyncRepeatingTask(plugin, task, delay, -1l);
}
if (currentTick >= firstTick) { public int scheduleSyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) {
schedulerQueue.remove(first); validate(plugin, runnable);
processTask(first); if (delay < 0l) {
if (first.getPeriod() >= 0) { delay = 0;
first.updateExecution(); }
schedulerQueue.put(first, first.isSync()); if (period == 0l) {
period = 1l;
} else if (period < -1l) {
period = -1l;
}
return handle(new CraftTask(plugin, runnable, nextId(), period), delay);
}
public int scheduleAsyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) {
validate(plugin, runnable);
if (delay < 0l) {
delay = 0;
}
if (period == 0l) {
period = 1l;
} else if (period < -1l) {
period = -1l;
}
return handle(new CraftAsyncTask(runners, plugin, runnable, nextId(), period), delay);
}
public <T> Future<T> callSyncMethod(final Plugin plugin, final Callable<T> task) {
validate(plugin, task);
final CraftFuture<T> future = new CraftFuture<T>(task, plugin, nextId());
handle(future, 0l);
return future;
}
public void cancelTask(final int taskId) {
if (taskId <= 0) {
return;
}
CraftTask task = runners.get(taskId);
if (task != null) {
cancelTask(task);
}
task = new CraftTask(
new Runnable() {
public void run() {
if (!check(CraftScheduler.this.temp)) {
check(CraftScheduler.this.pending);
}
}
private boolean check(final Iterable<CraftTask> collection) {
final Iterator<CraftTask> tasks = collection.iterator();
while (tasks.hasNext()) {
final CraftTask task = tasks.next();
if (task.getTaskId() == taskId) {
cancelTask(task);
tasks.remove();
if (task.isSync()) {
runners.remove(taskId);
} }
} else { return true;
stop = true;
} }
} else {
stop = true;
} }
} else { return false;
stop = true; }});
} handle(task, 0l);
} for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) {
} while (!stop); if (taskPending == task) {
return;
long sleepTime = 0;
if (first == null) {
sleepTime = 60000L;
} else {
currentTick = getCurrentTick();
sleepTime = (firstTick - currentTick) * 50 + 25;
} }
if (taskPending.getTaskId() == taskId) {
if (sleepTime < 50L) { cancelTask(taskPending);
sleepTime = 50L;
} else if (sleepTime > 60000L) {
sleepTime = 60000L;
}
synchronized (schedulerQueue) {
try {
schedulerQueue.wait(sleepTime);
} catch (InterruptedException ie) {}
} }
} }
} }
void processTask(CraftTask task) { public void cancelTasks(final Plugin plugin) {
if (task.isSync()) { Validate.notNull(plugin, "Cannot cancel tasks of null plugin");
addToMainThreadQueue(task); final CraftTask task = new CraftTask(
} else { new Runnable() {
craftThreadManager.executeTask(task.getTask(), task.getOwner(), task.getIdNumber()); public void run() {
} check(CraftScheduler.this.pending);
} check(CraftScheduler.this.temp);
// If the main thread cannot obtain the lock, it doesn't wait
public void mainThreadHeartbeat(long currentTick) {
if (syncedTasksLock.tryLock()) {
try {
if (mainThreadLock.tryLock()) {
try {
this.currentTick = currentTick;
while (!mainThreadQueue.isEmpty()) {
syncedTasks.addLast(mainThreadQueue.removeFirst());
}
} finally {
mainThreadLock.unlock();
} }
} void check(final Iterable<CraftTask> collection) {
long breakTime = System.currentTimeMillis() + 35; // max time spent in loop = 35ms final Iterator<CraftTask> tasks = collection.iterator();
while (!syncedTasks.isEmpty() && System.currentTimeMillis() <= breakTime) { while (tasks.hasNext()) {
CraftTask task = syncedTasks.removeFirst(); final CraftTask task = tasks.next();
try { if (task.getOwner().equals(plugin)) {
task.getTask().run(); cancelTask(task);
} catch (Throwable t) { tasks.remove();
// Bad plugin! if (task.isSync()) {
logger.log(Level.WARNING, "Task of '" + task.getOwner().getDescription().getName() + "' generated an exception", t); runners.remove(task.getTaskId());
synchronized (schedulerQueue) { }
schedulerQueue.remove(task); break;
}
} }
} }
} });
} finally { handle(task, 0l);
syncedTasksLock.unlock(); for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) {
if (taskPending == task) {
return;
}
if (taskPending.getTaskId() != -1 && taskPending.getOwner().equals(plugin)) {
cancelTask(taskPending);
} }
} }
} for (CraftTask runner : runners.values()) {
if (runner.getOwner().equals(plugin)) {
long getCurrentTick() { cancelTask(runner);
mainThreadLock.lock();
long tempTick = 0;
try {
tempTick = currentTick;
} finally {
mainThreadLock.unlock();
}
return tempTick;
}
void addToMainThreadQueue(CraftTask task) {
mainThreadLock.lock();
try {
mainThreadQueue.addLast(task);
} finally {
mainThreadLock.unlock();
}
}
void wipeSyncedTasks() {
syncedTasksLock.lock();
try {
syncedTasks.clear();
} finally {
syncedTasksLock.unlock();
}
}
void wipeMainThreadQueue() {
mainThreadLock.lock();
try {
mainThreadQueue.clear();
} finally {
mainThreadLock.unlock();
}
}
public int scheduleSyncDelayedTask(Plugin plugin, Runnable task, long delay) {
return scheduleSyncRepeatingTask(plugin, task, delay, -1);
}
public int scheduleSyncDelayedTask(Plugin plugin, Runnable task) {
return scheduleSyncDelayedTask(plugin, task, 0L);
}
public int scheduleSyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) {
if (plugin == null) {
throw new IllegalArgumentException("Plugin cannot be null");
}
if (task == null) {
throw new IllegalArgumentException("Task cannot be null");
}
if (delay < 0) {
throw new IllegalArgumentException("Delay cannot be less than 0");
}
CraftTask newTask = new CraftTask(plugin, task, true, getCurrentTick() + delay, period);
synchronized (schedulerQueue) {
schedulerQueue.put(newTask, true);
schedulerQueue.notify();
}
return newTask.getIdNumber();
}
public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task, long delay) {
return scheduleAsyncRepeatingTask(plugin, task, delay, -1);
}
public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task) {
return scheduleAsyncDelayedTask(plugin, task, 0L);
}
public int scheduleAsyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) {
if (plugin == null) {
throw new IllegalArgumentException("Plugin cannot be null");
}
if (task == null) {
throw new IllegalArgumentException("Task cannot be null");
}
if (delay < 0) {
throw new IllegalArgumentException("Delay cannot be less than 0");
}
CraftTask newTask = new CraftTask(plugin, task, false, getCurrentTick() + delay, period);
synchronized (schedulerQueue) {
schedulerQueue.put(newTask, false);
schedulerQueue.notify();
}
return newTask.getIdNumber();
}
public <T> Future<T> callSyncMethod(Plugin plugin, Callable<T> task) {
CraftFuture<T> craftFuture = new CraftFuture<T>(this, task);
synchronized (craftFuture) {
int taskId = scheduleSyncDelayedTask(plugin, craftFuture);
craftFuture.setTaskId(taskId);
}
return craftFuture;
}
public void cancelTask(int taskId) {
syncedTasksLock.lock();
try {
synchronized (schedulerQueue) {
mainThreadLock.lock();
try {
Iterator<CraftTask> itr = schedulerQueue.keySet().iterator();
while (itr.hasNext()) {
CraftTask current = itr.next();
if (current.getIdNumber() == taskId) {
itr.remove();
}
}
itr = mainThreadQueue.iterator();
while (itr.hasNext()) {
CraftTask current = itr.next();
if (current.getIdNumber() == taskId) {
itr.remove();
}
}
itr = syncedTasks.iterator();
while (itr.hasNext()) {
CraftTask current = itr.next();
if (current.getIdNumber() == taskId) {
itr.remove();
}
}
} finally {
mainThreadLock.unlock();
}
} }
} finally {
syncedTasksLock.unlock();
} }
craftThreadManager.interruptTask(taskId);
}
public void cancelTasks(Plugin plugin) {
syncedTasksLock.lock();
try {
synchronized (schedulerQueue) {
mainThreadLock.lock();
try {
Iterator<CraftTask> itr = schedulerQueue.keySet().iterator();
while (itr.hasNext()) {
CraftTask current = itr.next();
if (current.getOwner().equals(plugin)) {
itr.remove();
}
}
itr = mainThreadQueue.iterator();
while (itr.hasNext()) {
CraftTask current = itr.next();
if (current.getOwner().equals(plugin)) {
itr.remove();
}
}
itr = syncedTasks.iterator();
while (itr.hasNext()) {
CraftTask current = itr.next();
if (current.getOwner().equals(plugin)) {
itr.remove();
}
}
} finally {
mainThreadLock.unlock();
}
}
} finally {
syncedTasksLock.unlock();
}
craftThreadManager.interruptTasks(plugin);
} }
public void cancelAllTasks() { public void cancelAllTasks() {
synchronized (schedulerQueue) { final CraftTask task = new CraftTask(
schedulerQueue.clear(); new Runnable() {
} public void run() {
wipeMainThreadQueue(); Iterator<CraftTask> it = CraftScheduler.this.runners.values().iterator();
wipeSyncedTasks(); while (it.hasNext()) {
CraftTask task = it.next();
craftThreadManager.interruptAllTasks(); cancelTask(task);
} if (task.isSync()) {
it.remove();
public boolean isCurrentlyRunning(int taskId) { }
return craftThreadManager.isAlive(taskId); }
} CraftScheduler.this.pending.clear();
CraftScheduler.this.temp.clear();
public boolean isQueued(int taskId) { }
synchronized (schedulerQueue) { });
Iterator<CraftTask> itr = schedulerQueue.keySet().iterator(); handle(task, 0l);
while (itr.hasNext()) { for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) {
CraftTask current = itr.next(); if (taskPending == task) {
if (current.getIdNumber() == taskId) { break;
return true;
}
} }
cancelTask(taskPending);
}
for (CraftTask runner : runners.values()) {
cancelTask(runner);
}
}
public boolean isCurrentlyRunning(final int taskId) {
final CraftTask task = runners.get(taskId);
if (task == null || task.isSync()) {
return false; return false;
} }
final CraftAsyncTask asyncTask = (CraftAsyncTask) task;
synchronized (asyncTask.getWorkers()) {
return asyncTask.getWorkers().isEmpty();
}
}
public boolean isQueued(final int taskId) {
if (taskId <= 0) {
return false;
}
for (CraftTask task = head.getNext(); task != null; task = task.getNext()) {
if (task.getTaskId() == taskId) {
return task.getPeriod() >= -1l; // The task will run
}
}
CraftTask task = runners.get(taskId);
return task != null && task.getPeriod() >= -1l;
} }
public List<BukkitWorker> getActiveWorkers() { public List<BukkitWorker> getActiveWorkers() {
synchronized (craftThreadManager.workers) { final ArrayList<BukkitWorker> workers = new ArrayList<BukkitWorker>();
List<BukkitWorker> workerList = new ArrayList<BukkitWorker>(craftThreadManager.workers.size()); for (final CraftTask taskObj : runners.values()) {
Iterator<CraftWorker> itr = craftThreadManager.workers.iterator(); // Iterator will be a best-effort (may fail to grab very new values) if called from an async thread
if (taskObj.isSync()) {
while (itr.hasNext()) { continue;
workerList.add((BukkitWorker) itr.next()); }
final CraftAsyncTask task = (CraftAsyncTask) taskObj;
synchronized (task.getWorkers()) {
// This will never have an issue with stale threads; it's state-safe
workers.addAll(task.getWorkers());
} }
return workerList;
} }
return workers;
} }
public List<BukkitTask> getPendingTasks() { public List<BukkitTask> getPendingTasks() {
List<CraftTask> taskList = null; final ArrayList<CraftTask> truePending = new ArrayList<CraftTask>();
syncedTasksLock.lock(); for (CraftTask task = head.getNext(); task != null; task = task.getNext()) {
try { if (task.getTaskId() != -1) {
synchronized (schedulerQueue) { // -1 is special code
mainThreadLock.lock(); truePending.add(task);
try {
taskList = new ArrayList<CraftTask>(mainThreadQueue.size() + syncedTasks.size() + schedulerQueue.size());
taskList.addAll(mainThreadQueue);
taskList.addAll(syncedTasks);
taskList.addAll(schedulerQueue.keySet());
} finally {
mainThreadLock.unlock();
}
} }
} finally {
syncedTasksLock.unlock();
} }
List<BukkitTask> newTaskList = new ArrayList<BukkitTask>(taskList.size());
for (CraftTask craftTask : taskList) { final ArrayList<BukkitTask> pending = new ArrayList<BukkitTask>();
newTaskList.add((BukkitTask) craftTask); final Iterator<CraftTask> it = runners.values().iterator();
while (it.hasNext()) {
final CraftTask task = it.next();
if (task.getPeriod() >= -1l) {
pending.add(task);
}
} }
return newTaskList;
for (final CraftTask task : truePending) {
if (task.getPeriod() >= -1l && !pending.contains(task)) {
pending.add(task);
}
}
return pending;
} }
/**
* This method is designed to never block or wait for locks; an immediate execution of all current tasks.
*/
public void mainThreadHeartbeat(final int currentTick) {
this.currentTick = currentTick;
final List<CraftTask> temp = this.temp;
parsePending();
while (isReady(currentTick)) {
final CraftTask task = pending.remove();
if (task.getPeriod() < -1l) {
if (task.isSync()) {
runners.remove(task.getTaskId(), task);
}
parsePending();
continue;
}
if (task.isSync()) {
try {
task.run();
} catch (final Throwable throwable) {
task.getOwner().getLogger().log(
Level.WARNING,
String.format(
"Task #%s for %s generated an exception",
task.getTaskId(),
task.getOwner().getDescription().getFullName()),
throwable);
}
parsePending();
} else {
executor.execute(task);
// We don't need to parse pending
// (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
}
final long period = task.getPeriod(); // State consistency
if (period > 0) {
task.setNextRun(currentTick + period);
temp.add(task);
} else if (task.isSync()) {
runners.remove(task.getTaskId());
}
}
pending.addAll(temp);
temp.clear();
}
private void addTask(final CraftTask task) {
final AtomicReference<CraftTask> tail = this.tail;
CraftTask tailTask = tail.get();
while (!tail.compareAndSet(tailTask, task)) {
tailTask = tail.get();
}
tailTask.setNext(task);
}
private int handle(final CraftTask task, final long delay) {
task.setNextRun(currentTick + delay);
addTask(task);
return task.getTaskId();
}
private static void validate(final Plugin plugin, final Object task) {
Validate.notNull(plugin, "Plugin cannot be null");
Validate.notNull(task, "Task cannot be null");
}
private int nextId() {
return ids.incrementAndGet();
}
private void parsePending() {
CraftTask head = this.head;
CraftTask task = head.getNext();
CraftTask lastTask = head;
for (; task != null; task = (lastTask = task).getNext()) {
if (task.getTaskId() == -1) {
task.run();
} else if (task.getPeriod() >= -1l) {
pending.add(task);
runners.put(task.getTaskId(), task);
}
}
// We split this because of the way things are ordered for all of the async calls in CraftScheduler
// (it prevents race-conditions)
for (task = head; task != lastTask; task = head) {
head = task.getNext();
task.setNext(null);
}
this.head = lastTask;
}
private boolean isReady(final int currentTick) {
return !pending.isEmpty() && pending.peek().getNextRun() <= currentTick;
}
/**
* This method is important to make sure the code is consistent everywhere.
* Synchronizing is needed for future and async to prevent race conditions,
* main thread or otherwise.
* @return True if cancelled
*/
private boolean cancelTask(final CraftTask task) {
if (task.isSync()) {
if (task instanceof CraftFuture) {
synchronized (task) {
if (task.getPeriod() != -1l) {
return false;
}
// This needs to be set INSIDE of the synchronized block
task.setPeriod(-2l);
task.notifyAll();
}
} else {
task.setPeriod(-2l);
}
} else {
synchronized (((CraftAsyncTask) task).getWorkers()) {
// Synchronizing here prevents race condition for a completing task
task.setPeriod(-2l);
}
}
return true;
}
} }

View File

@ -1,112 +1,78 @@
package org.bukkit.craftbukkit.scheduler; package org.bukkit.craftbukkit.scheduler;
import java.lang.Comparable;
import org.bukkit.plugin.Plugin; import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitTask; import org.bukkit.scheduler.BukkitTask;
public class CraftTask implements Comparable<Object>, BukkitTask {
class CraftTask implements BukkitTask, Runnable {
private volatile CraftTask next = null;
/**
* -1 means no repeating <br>
* -2 means cancel <br>
* -3 means processing for Future <br>
* -4 means done for Future <br>
* Never 0 <br>
* >0 means number of ticks to wait between each execution
*/
private volatile long period;
private long nextRun;
private final Runnable task; private final Runnable task;
private final boolean syncTask; private final Plugin plugin;
private long executionTick; private final int id;
private final long period;
private final Plugin owner;
private final int idNumber;
private static Integer idCounter = 1; CraftTask() {
private static Object idCounterSync = new Object(); this(null, null, -1, -1);
CraftTask(Plugin owner, Runnable task, boolean syncTask) {
this(owner, task, syncTask, -1, -1);
} }
CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick) { CraftTask(final Runnable task) {
this(owner, task, syncTask, executionTick, -1); this(null, task, -1, -1);
} }
CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick, long period) { CraftTask(final Plugin plugin, final Runnable task, final int id, final long period) {
this.plugin = plugin;
this.task = task; this.task = task;
this.syncTask = syncTask; this.id = id;
this.executionTick = executionTick;
this.period = period; this.period = period;
this.owner = owner;
this.idNumber = CraftTask.getNextId();
} }
static int getNextId() { public final int getTaskId() {
synchronized (idCounterSync) { return id;
idCounter++;
return idCounter;
}
} }
Runnable getTask() { public final Plugin getOwner() {
return task; return plugin;
} }
public boolean isSync() { public boolean isSync() {
return syncTask; return true;
} }
long getExecutionTick() { public void run() {
return executionTick; task.run();
} }
long getPeriod() { long getPeriod() {
return period; return period;
} }
public Plugin getOwner() { void setPeriod(long period) {
return owner; this.period = period;
} }
void updateExecution() { long getNextRun() {
executionTick += period; return nextRun;
} }
public int getTaskId() { void setNextRun(long nextRun) {
return getIdNumber(); this.nextRun = nextRun;
} }
int getIdNumber() { CraftTask getNext() {
return idNumber; return next;
} }
public int compareTo(Object other) { void setNext(CraftTask next) {
if (!(other instanceof CraftTask)) { this.next = next;
return 0;
} else {
CraftTask o = (CraftTask) other;
long timeDiff = executionTick - o.getExecutionTick();
if (timeDiff > 0) {
return 1;
} else if (timeDiff < 0) {
return -1;
} else {
CraftTask otherCraftTask = (CraftTask) other;
return getIdNumber() - otherCraftTask.getIdNumber();
}
}
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (!(other instanceof CraftTask)) {
return false;
}
CraftTask otherCraftTask = (CraftTask) other;
return otherCraftTask.getIdNumber() == getIdNumber();
}
@Override
public int hashCode() {
return getIdNumber();
} }
} }

View File

@ -1,68 +0,0 @@
package org.bukkit.craftbukkit.scheduler;
import java.util.HashSet;
import java.util.Iterator;
import org.bukkit.plugin.Plugin;
public class CraftThreadManager {
final HashSet<CraftWorker> workers = new HashSet<CraftWorker>();
void executeTask(Runnable task, Plugin owner, int taskId) {
CraftWorker craftWorker = new CraftWorker(this, task, owner, taskId);
synchronized (workers) {
workers.add(craftWorker);
}
}
void interruptTask(int taskId) {
synchronized (workers) {
Iterator<CraftWorker> itr = workers.iterator();
while (itr.hasNext()) {
CraftWorker craftWorker = itr.next();
if (craftWorker.getTaskId() == taskId) {
craftWorker.interrupt();
}
}
}
}
void interruptTasks(Plugin owner) {
synchronized (workers) {
Iterator<CraftWorker> itr = workers.iterator();
while (itr.hasNext()) {
CraftWorker craftWorker = itr.next();
if (craftWorker.getOwner().equals(owner)) {
craftWorker.interrupt();
}
}
}
}
void interruptAllTasks() {
synchronized (workers) {
Iterator<CraftWorker> itr = workers.iterator();
while (itr.hasNext()) {
CraftWorker craftWorker = itr.next();
craftWorker.interrupt();
}
}
}
boolean isAlive(int taskId) {
synchronized (workers) {
Iterator<CraftWorker> itr = workers.iterator();
while (itr.hasNext()) {
CraftWorker craftWorker = itr.next();
if (craftWorker.getTaskId() == taskId) {
return craftWorker.isAlive();
}
}
// didn't find it, so it must have been removed
return false;
}
}
}

View File

@ -1,90 +0,0 @@
package org.bukkit.craftbukkit.scheduler;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitWorker;
public class CraftWorker implements Runnable, BukkitWorker {
private static int hashIdCounter = 1;
private static Object hashIdCounterSync = new Object();
private final int hashId;
private final Plugin owner;
private final int taskId;
private final Thread t;
private final CraftThreadManager parent;
private final Runnable task;
CraftWorker(CraftThreadManager parent, Runnable task, Plugin owner, int taskId) {
this.parent = parent;
this.taskId = taskId;
this.task = task;
this.owner = owner;
this.hashId = CraftWorker.getNextHashId();
t = new Thread(this);
t.start();
}
public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
}
synchronized (parent.workers) {
parent.workers.remove(this);
}
}
public int getTaskId() {
return taskId;
}
public Plugin getOwner() {
return owner;
}
public Thread getThread() {
return t;
}
public void interrupt() {
t.interrupt();
}
public boolean isAlive() {
return t.isAlive();
}
private static int getNextHashId() {
synchronized (hashIdCounterSync) {
return hashIdCounter++;
}
}
@Override
public int hashCode() {
return hashId;
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (!(other instanceof CraftWorker)) {
return false;
}
CraftWorker otherCraftWorker = (CraftWorker) other;
return otherCraftWorker.hashCode() == hashId;
}
}

View File

@ -1,15 +0,0 @@
package org.bukkit.craftbukkit.scheduler;
public class ObjectContainer<T> {
T object;
public void setObject(T object) {
this.object = object;
}
public T getObject() {
return object;
}
}