Fold BukkitExecutors into ProtocolLib (#721)

Fixes #721
This commit is contained in:
Dan Mulloy 2020-06-06 13:49:26 -04:00
parent c203fda391
commit 7ac4ac696f
No known key found for this signature in database
GPG Key ID: 2B62F7DACFF133E8
11 changed files with 1034 additions and 10 deletions

View File

@ -298,14 +298,6 @@
<scope>compile</scope>
</dependency>
<!-- TODO fold BukkitExecutors into ProtocolLib -->
<dependency>
<groupId>com.comphenix.executors</groupId>
<artifactId>BukkitExecutors</artifactId>
<version>1.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!-- Testing dependencies -->
<dependency>
<groupId>junit</groupId>

View File

@ -33,7 +33,7 @@ import org.bukkit.plugin.Plugin;
import org.bukkit.plugin.PluginManager;
import org.bukkit.plugin.java.JavaPlugin;
import com.comphenix.executors.BukkitExecutors;
import com.comphenix.protocol.executors.BukkitExecutors;
import com.comphenix.protocol.async.AsyncFilterManager;
import com.comphenix.protocol.error.BasicErrorReporter;
import com.comphenix.protocol.error.DelegatedErrorReporter;

View File

@ -0,0 +1,142 @@
package com.comphenix.protocol.executors;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import org.bukkit.scheduler.BukkitTask;
abstract class AbstractBukkitService
extends AbstractListeningService implements BukkitScheduledExecutorService {
private static final long MILLISECONDS_PER_TICK = 50;
private static final long NANOSECONDS_PER_TICK = 1000000 * MILLISECONDS_PER_TICK;
private volatile boolean shutdown;
private final PendingTasks tasks;
public AbstractBukkitService(PendingTasks tasks) {
this.tasks = tasks;
}
@Override
protected <T> RunnableAbstractFuture<T> newTaskFor(Runnable runnable, T value) {
return newTaskFor(Executors.callable(runnable, value));
}
@Override
protected <T> RunnableAbstractFuture<T> newTaskFor(final Callable<T> callable) {
validateState();
return new CallableTask<T>(callable);
}
@Override
public void execute(Runnable command) {
validateState();
if (command instanceof RunnableFuture) {
tasks.add(getTask(command), (Future<?>) command);
} else {
// Submit it first
submit(command);
}
}
// Bridge to Bukkit
protected abstract BukkitTask getTask(Runnable command);
protected abstract BukkitTask getLaterTask(Runnable task, long ticks);
protected abstract BukkitTask getTimerTask(long ticksInitial, long ticksDelay, Runnable task);
@Override
public List<Runnable> shutdownNow() {
shutdown();
tasks.cancel();
// We don't support this
return Collections.emptyList();
}
@Override
public void shutdown() {
shutdown = true;
}
private void validateState() {
if (shutdown) {
throw new RejectedExecutionException("Executor service has shut down. Cannot start new tasks.");
}
}
private long toTicks(long delay, TimeUnit unit) {
return Math.round(unit.toMillis(delay) / (double)MILLISECONDS_PER_TICK);
}
@Override
public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return schedule(Executors.callable(command), delay, unit);
}
@Override
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
long ticks = toTicks(delay, unit);
// Construct future task and Bukkit task
CallableTask<V> task = new CallableTask<V>(callable);
BukkitTask bukkitTask = getLaterTask(task, ticks);
tasks.add(bukkitTask, task);
return task.getScheduledFuture(System.nanoTime() + delay * NANOSECONDS_PER_TICK, 0);
}
@Override
public ListenableScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
long ticksInitial = toTicks(initialDelay, unit);
long ticksDelay = toTicks(period, unit);
// Construct future task and Bukkit task
CallableTask<?> task = new CallableTask<Object>(Executors.callable(command)) {
protected void compute() {
// Do nothing more. This future can only be finished by cancellation
try {
compute.call();
} catch (Exception e) {
// Let Bukkit handle this
throw Throwables.propagate(e);
}
}
};
BukkitTask bukkitTask = getTimerTask(ticksInitial, ticksDelay, task);
tasks.add(bukkitTask, task);
return task.getScheduledFuture(
System.nanoTime() + ticksInitial * NANOSECONDS_PER_TICK,
ticksDelay * NANOSECONDS_PER_TICK);
}
// Not supported!
@Deprecated
@Override
public ListenableScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduleAtFixedRate(command, initialDelay, delay, unit);
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return tasks.awaitTermination(timeout, unit);
}
@Override
public boolean isShutdown() {
return shutdown;
}
@Override
public boolean isTerminated() {
return tasks.isTerminated();
}
}

View File

@ -0,0 +1,288 @@
package com.comphenix.protocol.executors;
/*
* This file is a modified version of
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.35
* which contained the following notice:
*
* Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the
* public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/
*
* Rationale for copying:
* Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected
* customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of
* AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningExecutorService;
/**
* Provides default implementations of {@link ListeningExecutorService}
* execution methods. This class implements the <tt>submit</tt>,
* <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
* {@link ListenableFutureTask} returned by <tt>newTaskFor</tt>. For example,
* the implementation of <tt>submit(Runnable)</tt> creates an associated
* <tt>ListenableFutureTask</tt> that is executed and returned.
*
* @author Doug Lea
*/
abstract class AbstractListeningService implements ListeningExecutorService {
/**
* Represents a runnable abstract listenable future task.
*
* @author Kristian
* @param <T>
*/
public static abstract class RunnableAbstractFuture<T>
extends AbstractFuture<T> implements RunnableFuture<T> {
}
/**
* Returns a <tt>ListenableFutureTask</tt> for the given runnable and
* default value.
*
* @param runnable - the runnable task being wrapped
* @param value - the default value for the returned future
* @return a <tt>ListenableFutureTask</tt> which when run will run the
* underlying runnable and which, as a <tt>Future</tt>, will yield
* the given value as its result and provide for cancellation of the
* underlying task.
*/
protected abstract <T> RunnableAbstractFuture<T> newTaskFor(Runnable runnable, T value);
/**
* Returns a <tt>ListenableFutureTask</tt> for the given callable task.
*
* @param callable - the callable task being wrapped
* @return a <tt>ListenableFutureTask</tt> which when run will call the
* underlying callable and which, as a <tt>Future</tt>, will yield
* the callable's result as its result and provide for cancellation
* of the underlying task.
*/
protected abstract <T> RunnableAbstractFuture<T> newTaskFor(Callable<T> callable);
@Override
public ListenableFuture<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
RunnableAbstractFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException();
}
RunnableAbstractFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
if (task == null) {
throw new NullPointerException();
}
RunnableAbstractFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**
* The main mechanics of invokeAny.
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null) {
throw new NullPointerException();
}
int ntasks = tasks.size();
if (ntasks == 0) {
throw new IllegalArgumentException();
}
List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
} else if (active == 0) {
break;
} else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null) {
throw new TimeoutException();
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
} else {
f = ecs.take();
}
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null) {
ee = new ExecutionException(null);
}
throw ee;
} finally {
for (Future<T> f : futures)
f.cancel(true);
}
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,
ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
// assert false;
return null;
}
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null) {
throw new NullPointerException();
}
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableAbstractFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException | ExecutionException ignore) { }
}
}
done = true;
return futures;
} finally {
if (!done) {
for (Future<T> f : futures)
f.cancel(true);
}
}
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException {
if (tasks == null || unit == null) {
throw new NullPointerException();
}
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (Future<T> future : futures) {
execute((Runnable) future);
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
return futures;
}
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0) {
return futures;
}
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException | ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done) {
for (Future<T> f : futures)
f.cancel(true);
}
}
}
}

View File

@ -0,0 +1,93 @@
package com.comphenix.protocol.executors;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
import org.bukkit.scheduler.BukkitTask;
import com.comphenix.protocol.executors.AbstractBukkitService;
import com.comphenix.protocol.executors.BukkitScheduledExecutorService;
import com.comphenix.protocol.executors.PendingTasks;
import com.comphenix.protocol.executors.PluginDisabledListener;
import com.google.common.base.Preconditions;
public class BukkitExecutors {
private BukkitExecutors() {
// Don't make it constructable
}
/**
* Retrieves a scheduled executor service for running tasks on the main thread.
* @param plugin - plugin that is executing the given tasks.
* @return Executor service.
*/
public static BukkitScheduledExecutorService newSynchronous(final Plugin plugin) {
// Bridge destination
final BukkitScheduler scheduler = getScheduler(plugin);
Preconditions.checkNotNull(plugin, "plugin cannot be NULL");
BukkitScheduledExecutorService service = new com.comphenix.protocol.executors.AbstractBukkitService(new PendingTasks(plugin, scheduler)) {
@Override
protected BukkitTask getTask(Runnable command) {
return scheduler.runTask(plugin, command);
}
@Override
protected BukkitTask getLaterTask(Runnable task, long ticks) {
return scheduler.runTaskLater(plugin, task, ticks);
}
@Override
protected BukkitTask getTimerTask(long ticksInitial, long ticksDelay, Runnable task) {
return scheduler.runTaskTimer(plugin, task, ticksInitial, ticksDelay);
}
};
PluginDisabledListener.getListener(plugin).addService(service);
return service;
}
/**
* Retrieves a scheduled executor service for running asynchronous tasks.
* @param plugin - plugin that is executing the given tasks.
* @return Asynchronous executor service.
*/
public static BukkitScheduledExecutorService newAsynchronous(final Plugin plugin) {
// Bridge destination
final BukkitScheduler scheduler = getScheduler(plugin);
Preconditions.checkNotNull(plugin, "plugin cannot be NULL");
BukkitScheduledExecutorService service = new com.comphenix.protocol.executors.AbstractBukkitService(new PendingTasks(plugin, scheduler)) {
@Override
protected BukkitTask getTask(Runnable command) {
return scheduler.runTaskAsynchronously(plugin, command);
}
@Override
protected BukkitTask getLaterTask(Runnable task, long ticks) {
return scheduler.runTaskLaterAsynchronously(plugin, task, ticks);
}
@Override
protected BukkitTask getTimerTask(long ticksInitial, long ticksDelay, Runnable task) {
return scheduler.runTaskTimerAsynchronously(plugin, task, ticksInitial, ticksDelay);
}
};
PluginDisabledListener.getListener(plugin).addService(service);
return service;
}
/**
* Retrieve the current Bukkit scheduler.
* @return Current scheduler.
*/
private static BukkitScheduler getScheduler(Plugin plugin) {
BukkitScheduler scheduler = plugin.getServer().getScheduler();
if (scheduler != null) {
return scheduler;
} else {
throw new IllegalStateException("Unable to retrieve scheduler.");
}
}
}

View File

@ -0,0 +1,103 @@
package com.comphenix.protocol.executors;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.bukkit.event.*;
import org.bukkit.plugin.EventExecutor;
import org.bukkit.plugin.IllegalPluginAccessException;
import org.bukkit.plugin.Plugin;
import org.bukkit.plugin.RegisteredListener;
public class BukkitFutures {
// Represents empty classes
private static Listener EMPTY_LISTENER = new Listener() {};
/**
* Retrieve a future representing the next invocation of the given event.
* @param plugin - owner plugin.
* @return Future event invocation.
*/
public static <TEvent extends Event> ListenableFuture<TEvent> nextEvent(Plugin plugin, Class<TEvent> eventClass) {
return BukkitFutures.nextEvent(plugin, eventClass, EventPriority.NORMAL, false);
}
/**
* Retrieve a future representing the next invocation of the given event.
* @param plugin - owner plugin.
* @return Future event invocation.
*/
public static <TEvent extends Event> ListenableFuture<TEvent> nextEvent(
Plugin plugin, Class<TEvent> eventClass, EventPriority priority, boolean ignoreCancelled) {
// Event and future
final HandlerList list = getHandlerList(eventClass);
final SettableFuture<TEvent> future = SettableFuture.create();
EventExecutor executor = new EventExecutor() {
private final AtomicBoolean once = new AtomicBoolean();
@SuppressWarnings("unchecked")
@Override
public void execute(Listener listener, Event event) throws EventException {
// Fire the future
if (!future.isCancelled() && !once.getAndSet(true)) {
future.set((TEvent) event);
}
}
};
RegisteredListener listener = new RegisteredListener(EMPTY_LISTENER, executor, priority, plugin, ignoreCancelled) {
@Override
public void callEvent(Event event) throws EventException {
super.callEvent(event);
list.unregister(this);
}
};
// Ensure that the future is cleaned up when the plugin is disabled
PluginDisabledListener.getListener(plugin).addFuture(future);
// Add the listener
list.register(listener);
return future;
}
/**
* Register a given event executor.
* @param plugin - the owner plugin.
* @param eventClass - the event to register.
* @param priority - the event priority.
* @param executor - the event executor.
*/
public static void registerEventExecutor(Plugin plugin, Class<? extends Event> eventClass, EventPriority priority, EventExecutor executor) {
getHandlerList(eventClass).register(
new RegisteredListener(EMPTY_LISTENER, executor, priority, plugin, false)
);
}
/**
* Retrieve the handler list associated with the given class.
* @param clazz - given event class.
* @return Associated handler list.
*/
private static HandlerList getHandlerList(Class<? extends Event> clazz) {
// Class must have Event as its superclass
while (clazz.getSuperclass() != null && Event.class.isAssignableFrom(clazz.getSuperclass())) {
try {
Method method = clazz.getDeclaredMethod("getHandlerList");
method.setAccessible(true);
return (HandlerList) method.invoke(null);
} catch (NoSuchMethodException e) {
// Keep on searching
clazz = clazz.getSuperclass().asSubclass(Event.class);
} catch (Exception e) {
throw new IllegalPluginAccessException(e.getMessage());
}
}
throw new IllegalPluginAccessException("Unable to find handler list for event "
+ clazz.getName());
}
}

View File

@ -0,0 +1,34 @@
package com.comphenix.protocol.executors;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
/**
* Represents a listening scheduler service that returns {@link ListenableScheduledFuture} instead of {@link ScheduledFuture}.
* @author Kristian
*/
public interface BukkitScheduledExecutorService extends ListeningScheduledExecutorService {
@Override
public ListenableScheduledFuture<?> schedule(
Runnable command, long delay, TimeUnit unit);
@Override
public <V> ListenableScheduledFuture<V> schedule(
Callable<V> callable, long delay, TimeUnit unit);
@Override
public ListenableScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* This is not supported by the underlying Bukkit scheduler.
*/
@Override
@Deprecated
public ListenableScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit);
}

View File

@ -0,0 +1,98 @@
package com.comphenix.protocol.executors;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.comphenix.protocol.executors.AbstractListeningService.RunnableAbstractFuture;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableScheduledFuture;
class CallableTask<T> extends RunnableAbstractFuture<T> {
protected final Callable<T> compute;
public CallableTask(Callable<T> compute) {
Preconditions.checkNotNull(compute, "compute cannot be NULL");
this.compute = compute;
}
public ListenableScheduledFuture<T> getScheduledFuture(final long startTime, final long nextDelay) {
return new ListenableScheduledFuture<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return CallableTask.this.cancel(mayInterruptIfRunning);
}
@Override
public T get() throws InterruptedException, ExecutionException {
return CallableTask.this.get();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return CallableTask.this.get(timeout, unit);
}
@Override
public boolean isCancelled() {
return CallableTask.this.isCancelled();
}
@Override
public boolean isDone() {
return CallableTask.this.isDone();
}
@Override
public void addListener(Runnable listener, Executor executor) {
CallableTask.this.addListener(listener, executor);
}
@Override
public int compareTo(Delayed o) {
return Long.valueOf(getDelay(TimeUnit.NANOSECONDS))
.compareTo(o.getDelay(TimeUnit.NANOSECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
long current = System.nanoTime();
// Calculate the correct delay
if (current < startTime || !isPeriodic())
return unit.convert(startTime - current, TimeUnit.NANOSECONDS);
else
return unit.convert(((current - startTime) % nextDelay), TimeUnit.NANOSECONDS);
}
// @Override
public boolean isPeriodic() {
return nextDelay > 0;
}
};
}
/**
* Invoked by the thread responsible for computing this future.
*/
protected void compute() {
try {
// Save result
if (!isCancelled()) {
set(compute.call());
}
} catch (Throwable e) {
setException(e);
}
}
@Override
public void run() {
compute();
}
}

View File

@ -0,0 +1,140 @@
package com.comphenix.protocol.executors;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
import org.bukkit.scheduler.BukkitTask;
class PendingTasks {
/**
* Represents a wrapper for a cancelable task.
*
* @author Kristian
*/
private interface CancelableFuture {
void cancel();
boolean isTaskCancelled();
}
// Every pending task
private final Set<CancelableFuture> pending = new HashSet<>();
private final Object pendingLock = new Object();
// Handle arbitrary cancelation
private final Plugin plugin;
private final BukkitScheduler scheduler;
private BukkitTask cancellationTask;
public PendingTasks(Plugin plugin, BukkitScheduler scheduler) {
this.plugin = plugin;
this.scheduler = scheduler;
}
public void add(final BukkitTask task, final Future<?> future) {
add(new CancelableFuture() {
@Override
public boolean isTaskCancelled() {
// If completed, check its cancellation state
if (future.isDone())
return future.isCancelled();
return !(scheduler.isCurrentlyRunning(task.getTaskId()) ||
scheduler.isQueued(task.getTaskId()));
}
@Override
public void cancel() {
// Make sure
task.cancel();
future.cancel(true);
}
});
}
private CancelableFuture add(CancelableFuture task) {
synchronized (pendingLock) {
pending.add(task);
pendingLock.notifyAll();
beginCancellationTask();
return task;
}
}
private void beginCancellationTask() {
if (cancellationTask == null) {
cancellationTask = scheduler.runTaskTimer(plugin, () -> {
// Check for cancellations
synchronized (pendingLock) {
boolean changed = false;
for (Iterator<CancelableFuture> it = pending.iterator(); it.hasNext(); ) {
CancelableFuture future = it.next();
// Remove cancelled tasks
if (future.isTaskCancelled()) {
future.cancel();
it.remove();
changed = true;
}
}
// Notify waiting threads
if (changed) {
pendingLock.notifyAll();
}
}
// Stop if we are out of tasks
if (isTerminated()) {
cancellationTask.cancel();
cancellationTask = null;
}
}, 1, 1);
}
}
/**
* Cancel all pending tasks.
*/
public void cancel() {
for (CancelableFuture task : pending) {
task.cancel();
}
}
/**
* Wait until all pending tasks have completed.
* @param timeout - the current timeout.
* @param unit - unit of the timeout.
* @return TRUE if every pending task has terminated, FALSE if we reached the timeout.
* @throws InterruptedException
*/
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long expire = System.nanoTime() + unit.toNanos(timeout);
synchronized (pendingLock) {
// Wait until the tasks have all terminated
while (!isTerminated()) {
// Check timeout
if (expire < System.nanoTime())
return false;
unit.timedWait(pendingLock, timeout);
}
}
// Timeout!
return false;
}
/**
* Determine if all tasks have completed executing.
* @return TRUE if they have, FALSE otherwise.
*/
public boolean isTerminated() {
return pending.isEmpty();
}
}

View File

@ -0,0 +1,134 @@
package com.comphenix.protocol.executors;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.bukkit.event.EventPriority;
import org.bukkit.event.Listener;
import org.bukkit.event.server.PluginDisableEvent;
import org.bukkit.plugin.Plugin;
class PluginDisabledListener implements Listener {
private static final ConcurrentMap<Plugin, PluginDisabledListener> LISTENERS = new MapMaker().weakKeys().makeMap();
// Objects that must be disabled
private final Set<Future<?>> futures = Collections.newSetFromMap(new WeakHashMap<>());
private final Set<ExecutorService> services = Collections.newSetFromMap(new WeakHashMap<>());
private final Object setLock = new Object();
// The plugin we're looking for
private final Plugin plugin;
private boolean disabled;
private PluginDisabledListener(Plugin plugin) {
this.plugin = plugin;
}
/**
* Retrieve the associated disabled listener.
* @param plugin - the plugin.
* @return Associated listener.
*/
public static PluginDisabledListener getListener(final Plugin plugin) {
PluginDisabledListener result = LISTENERS.get(plugin);
if (result == null) {
final PluginDisabledListener created = new PluginDisabledListener(plugin);
result = LISTENERS.putIfAbsent(plugin, created);
if (result == null) {
// Register listener - we can't use the normal method as the plugin might not be enabled yet
BukkitFutures.registerEventExecutor(plugin, PluginDisableEvent.class, EventPriority.NORMAL,
(listener, event) -> {
if (event instanceof PluginDisableEvent) {
created.onPluginDisabled((PluginDisableEvent) event);
}
});
result = created;
}
}
return result;
}
/**
* Ensure that the given future will be cancelled when the plugin is disabled.
* @param future - the future to cancel.
*/
public void addFuture(final ListenableFuture<?> future) {
synchronized (setLock) {
if (disabled) {
processFuture(future);
} else {
futures.add(future);
}
}
// Remove the future when it has computed
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onSuccess(Object value) {
synchronized (setLock) {
futures.remove(future);
}
}
@Override
public void onFailure(Throwable ex) {
synchronized (setLock) {
futures.remove(future);
}
}
});
}
/**
* Ensure that a given service is shutdown when the plugin is disabled.
* @param service - the service.
*/
public void addService(ExecutorService service) {
synchronized (setLock) {
if (disabled) {
processService(service);
} else {
services.add(service);
}
}
}
// Will be registered manually
public void onPluginDisabled(PluginDisableEvent e) {
if (e.getPlugin().equals(plugin)) {
synchronized (setLock) {
disabled = true;
// Cancel all unfinished futures
for (Future<?> future : futures) {
processFuture(future);
}
for (ExecutorService service : services) {
processService(service);
}
}
}
}
private void processFuture(Future<?> future) {
if (!future.isDone()) {
future.cancel(true);
}
}
private void processService(ExecutorService service) {
service.shutdownNow();
}
}

View File

@ -6,7 +6,7 @@ import org.bukkit.Server;
import org.bukkit.event.world.WorldInitEvent;
import org.bukkit.plugin.Plugin;
import com.comphenix.executors.BukkitFutures;
import com.comphenix.protocol.executors.BukkitFutures;
import com.comphenix.protocol.async.AsyncFilterManager;
import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.error.Report;