From 7ac4ac696f5cb47d4c3d3f3c1ad786b066294d2c Mon Sep 17 00:00:00 2001 From: Dan Mulloy Date: Sat, 6 Jun 2020 13:49:26 -0400 Subject: [PATCH] Fold BukkitExecutors into ProtocolLib (#721) Fixes #721 --- pom.xml | 8 - .../com/comphenix/protocol/ProtocolLib.java | 2 +- .../executors/AbstractBukkitService.java | 142 +++++++++ .../executors/AbstractListeningService.java | 288 ++++++++++++++++++ .../protocol/executors/BukkitExecutors.java | 93 ++++++ .../protocol/executors/BukkitFutures.java | 103 +++++++ .../BukkitScheduledExecutorService.java | 34 +++ .../protocol/executors/CallableTask.java | 98 ++++++ .../protocol/executors/PendingTasks.java | 140 +++++++++ .../executors/PluginDisabledListener.java | 134 ++++++++ .../injector/PacketFilterBuilder.java | 2 +- 11 files changed, 1034 insertions(+), 10 deletions(-) create mode 100644 src/main/java/com/comphenix/protocol/executors/AbstractBukkitService.java create mode 100644 src/main/java/com/comphenix/protocol/executors/AbstractListeningService.java create mode 100644 src/main/java/com/comphenix/protocol/executors/BukkitExecutors.java create mode 100644 src/main/java/com/comphenix/protocol/executors/BukkitFutures.java create mode 100644 src/main/java/com/comphenix/protocol/executors/BukkitScheduledExecutorService.java create mode 100644 src/main/java/com/comphenix/protocol/executors/CallableTask.java create mode 100644 src/main/java/com/comphenix/protocol/executors/PendingTasks.java create mode 100644 src/main/java/com/comphenix/protocol/executors/PluginDisabledListener.java diff --git a/pom.xml b/pom.xml index 39313315..5c8c024f 100644 --- a/pom.xml +++ b/pom.xml @@ -298,14 +298,6 @@ compile - - - com.comphenix.executors - BukkitExecutors - 1.1-SNAPSHOT - compile - - junit diff --git a/src/main/java/com/comphenix/protocol/ProtocolLib.java b/src/main/java/com/comphenix/protocol/ProtocolLib.java index 46dc854a..e6b6781b 100644 --- a/src/main/java/com/comphenix/protocol/ProtocolLib.java +++ b/src/main/java/com/comphenix/protocol/ProtocolLib.java @@ -33,7 +33,7 @@ import org.bukkit.plugin.Plugin; import org.bukkit.plugin.PluginManager; import org.bukkit.plugin.java.JavaPlugin; -import com.comphenix.executors.BukkitExecutors; +import com.comphenix.protocol.executors.BukkitExecutors; import com.comphenix.protocol.async.AsyncFilterManager; import com.comphenix.protocol.error.BasicErrorReporter; import com.comphenix.protocol.error.DelegatedErrorReporter; diff --git a/src/main/java/com/comphenix/protocol/executors/AbstractBukkitService.java b/src/main/java/com/comphenix/protocol/executors/AbstractBukkitService.java new file mode 100644 index 00000000..216d7b1b --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/AbstractBukkitService.java @@ -0,0 +1,142 @@ +package com.comphenix.protocol.executors; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableScheduledFuture; + +import org.bukkit.scheduler.BukkitTask; + +abstract class AbstractBukkitService + extends AbstractListeningService implements BukkitScheduledExecutorService { + + private static final long MILLISECONDS_PER_TICK = 50; + private static final long NANOSECONDS_PER_TICK = 1000000 * MILLISECONDS_PER_TICK; + + private volatile boolean shutdown; + private final PendingTasks tasks; + + public AbstractBukkitService(PendingTasks tasks) { + this.tasks = tasks; + } + + @Override + protected RunnableAbstractFuture newTaskFor(Runnable runnable, T value) { + return newTaskFor(Executors.callable(runnable, value)); + } + + @Override + protected RunnableAbstractFuture newTaskFor(final Callable callable) { + validateState(); + return new CallableTask(callable); + } + + @Override + public void execute(Runnable command) { + validateState(); + + if (command instanceof RunnableFuture) { + tasks.add(getTask(command), (Future) command); + } else { + // Submit it first + submit(command); + } + } + + // Bridge to Bukkit + protected abstract BukkitTask getTask(Runnable command); + protected abstract BukkitTask getLaterTask(Runnable task, long ticks); + protected abstract BukkitTask getTimerTask(long ticksInitial, long ticksDelay, Runnable task); + + @Override + public List shutdownNow() { + shutdown(); + tasks.cancel(); + + // We don't support this + return Collections.emptyList(); + } + + @Override + public void shutdown() { + shutdown = true; + } + + private void validateState() { + if (shutdown) { + throw new RejectedExecutionException("Executor service has shut down. Cannot start new tasks."); + } + } + + private long toTicks(long delay, TimeUnit unit) { + return Math.round(unit.toMillis(delay) / (double)MILLISECONDS_PER_TICK); + } + + @Override + public ListenableScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return schedule(Executors.callable(command), delay, unit); + } + + @Override + public ListenableScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + long ticks = toTicks(delay, unit); + + // Construct future task and Bukkit task + CallableTask task = new CallableTask(callable); + BukkitTask bukkitTask = getLaterTask(task, ticks); + + tasks.add(bukkitTask, task); + return task.getScheduledFuture(System.nanoTime() + delay * NANOSECONDS_PER_TICK, 0); + } + + @Override + public ListenableScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, + long period, TimeUnit unit) { + + long ticksInitial = toTicks(initialDelay, unit); + long ticksDelay = toTicks(period, unit); + + // Construct future task and Bukkit task + CallableTask task = new CallableTask(Executors.callable(command)) { + protected void compute() { + // Do nothing more. This future can only be finished by cancellation + try { + compute.call(); + } catch (Exception e) { + // Let Bukkit handle this + throw Throwables.propagate(e); + } + } + }; + BukkitTask bukkitTask = getTimerTask(ticksInitial, ticksDelay, task); + + tasks.add(bukkitTask, task); + return task.getScheduledFuture( + System.nanoTime() + ticksInitial * NANOSECONDS_PER_TICK, + ticksDelay * NANOSECONDS_PER_TICK); + } + + // Not supported! + @Deprecated + @Override + public ListenableScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduleAtFixedRate(command, initialDelay, delay, unit); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return tasks.awaitTermination(timeout, unit); + } + + @Override + public boolean isShutdown() { + return shutdown; + } + + @Override + public boolean isTerminated() { + return tasks.isTerminated(); + } +} \ No newline at end of file diff --git a/src/main/java/com/comphenix/protocol/executors/AbstractListeningService.java b/src/main/java/com/comphenix/protocol/executors/AbstractListeningService.java new file mode 100644 index 00000000..4fd2d19d --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/AbstractListeningService.java @@ -0,0 +1,288 @@ +package com.comphenix.protocol.executors; + +/* + * This file is a modified version of + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.35 + * which contained the following notice: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the + * public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/ + * + * Rationale for copying: + * Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected + * customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of + * AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods. + */ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.ListeningExecutorService; + +/** + * Provides default implementations of {@link ListeningExecutorService} + * execution methods. This class implements the submit, + * invokeAny and invokeAll methods using a + * {@link ListenableFutureTask} returned by newTaskFor. For example, + * the implementation of submit(Runnable) creates an associated + * ListenableFutureTask that is executed and returned. + * + * @author Doug Lea + */ +abstract class AbstractListeningService implements ListeningExecutorService { + /** + * Represents a runnable abstract listenable future task. + * + * @author Kristian + * @param + */ + public static abstract class RunnableAbstractFuture + extends AbstractFuture implements RunnableFuture { + + } + + /** + * Returns a ListenableFutureTask for the given runnable and + * default value. + * + * @param runnable - the runnable task being wrapped + * @param value - the default value for the returned future + * @return a ListenableFutureTask which when run will run the + * underlying runnable and which, as a Future, will yield + * the given value as its result and provide for cancellation of the + * underlying task. + */ + protected abstract RunnableAbstractFuture newTaskFor(Runnable runnable, T value); + + /** + * Returns a ListenableFutureTask for the given callable task. + * + * @param callable - the callable task being wrapped + * @return a ListenableFutureTask which when run will call the + * underlying callable and which, as a Future, will yield + * the callable's result as its result and provide for cancellation + * of the underlying task. + */ + protected abstract RunnableAbstractFuture newTaskFor(Callable callable); + + @Override + public ListenableFuture submit(Runnable task) { + if (task == null) { + throw new NullPointerException(); + } + RunnableAbstractFuture ftask = newTaskFor(task, null); + execute(ftask); + return ftask; + } + + @Override + public ListenableFuture submit(Runnable task, T result) { + if (task == null) { + throw new NullPointerException(); + } + RunnableAbstractFuture ftask = newTaskFor(task, result); + execute(ftask); + return ftask; + } + + @Override + public ListenableFuture submit(Callable task) { + if (task == null) { + throw new NullPointerException(); + } + RunnableAbstractFuture ftask = newTaskFor(task); + execute(ftask); + return ftask; + } + + /** + * The main mechanics of invokeAny. + */ + private T doInvokeAny(Collection> tasks, boolean timed, long nanos) + throws InterruptedException, ExecutionException, TimeoutException { + if (tasks == null) { + throw new NullPointerException(); + } + int ntasks = tasks.size(); + if (ntasks == 0) { + throw new IllegalArgumentException(); + } + List> futures = new ArrayList>(ntasks); + ExecutorCompletionService ecs = new ExecutorCompletionService(this); + + // For efficiency, especially in executors with limited + // parallelism, check to see if previously submitted tasks are + // done before submitting more of them. This interleaving + // plus the exception mechanics account for messiness of main + // loop. + + try { + // Record exceptions so that if we fail to obtain any + // result, we can throw the last exception we got. + ExecutionException ee = null; + long lastTime = timed ? System.nanoTime() : 0; + Iterator> it = tasks.iterator(); + + // Start one task for sure; the rest incrementally + futures.add(ecs.submit(it.next())); + --ntasks; + int active = 1; + + for (;;) { + Future f = ecs.poll(); + if (f == null) { + if (ntasks > 0) { + --ntasks; + futures.add(ecs.submit(it.next())); + ++active; + } else if (active == 0) { + break; + } else if (timed) { + f = ecs.poll(nanos, TimeUnit.NANOSECONDS); + if (f == null) { + throw new TimeoutException(); + } + long now = System.nanoTime(); + nanos -= now - lastTime; + lastTime = now; + } else { + f = ecs.take(); + } + } + if (f != null) { + --active; + try { + return f.get(); + } catch (ExecutionException eex) { + ee = eex; + } catch (RuntimeException rex) { + ee = new ExecutionException(rex); + } + } + } + + if (ee == null) { + ee = new ExecutionException(null); + } + throw ee; + + } finally { + for (Future f : futures) + f.cancel(true); + } + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, + ExecutionException { + try { + return doInvokeAny(tasks, false, 0); + } catch (TimeoutException cannotHappen) { + // assert false; + return null; + } + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return doInvokeAny(tasks, true, unit.toNanos(timeout)); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + if (tasks == null) { + throw new NullPointerException(); + } + List> futures = new ArrayList>(tasks.size()); + boolean done = false; + try { + for (Callable t : tasks) { + RunnableAbstractFuture f = newTaskFor(t); + futures.add(f); + execute(f); + } + for (Future f : futures) { + if (!f.isDone()) { + try { + f.get(); + } catch (CancellationException | ExecutionException ignore) { } + } + } + done = true; + return futures; + } finally { + if (!done) { + for (Future f : futures) + f.cancel(true); + } + } + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + if (tasks == null || unit == null) { + throw new NullPointerException(); + } + long nanos = unit.toNanos(timeout); + List> futures = new ArrayList>(tasks.size()); + boolean done = false; + try { + for (Callable t : tasks) + futures.add(newTaskFor(t)); + + long lastTime = System.nanoTime(); + + // Interleave time checks and calls to execute in case + // executor doesn't have any/much parallelism. + for (Future future : futures) { + execute((Runnable) future); + long now = System.nanoTime(); + nanos -= now - lastTime; + lastTime = now; + if (nanos <= 0) { + return futures; + } + } + + for (Future f : futures) { + if (!f.isDone()) { + if (nanos <= 0) { + return futures; + } + try { + f.get(nanos, TimeUnit.NANOSECONDS); + } catch (CancellationException | ExecutionException ignore) { + } catch (TimeoutException toe) { + return futures; + } + long now = System.nanoTime(); + nanos -= now - lastTime; + lastTime = now; + } + } + done = true; + return futures; + } finally { + if (!done) { + for (Future f : futures) + f.cancel(true); + } + } + } +} diff --git a/src/main/java/com/comphenix/protocol/executors/BukkitExecutors.java b/src/main/java/com/comphenix/protocol/executors/BukkitExecutors.java new file mode 100644 index 00000000..1729349f --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/BukkitExecutors.java @@ -0,0 +1,93 @@ +package com.comphenix.protocol.executors; + +import org.bukkit.plugin.Plugin; +import org.bukkit.scheduler.BukkitScheduler; +import org.bukkit.scheduler.BukkitTask; + +import com.comphenix.protocol.executors.AbstractBukkitService; +import com.comphenix.protocol.executors.BukkitScheduledExecutorService; +import com.comphenix.protocol.executors.PendingTasks; +import com.comphenix.protocol.executors.PluginDisabledListener; +import com.google.common.base.Preconditions; + +public class BukkitExecutors { + private BukkitExecutors() { + // Don't make it constructable + } + + /** + * Retrieves a scheduled executor service for running tasks on the main thread. + * @param plugin - plugin that is executing the given tasks. + * @return Executor service. + */ + public static BukkitScheduledExecutorService newSynchronous(final Plugin plugin) { + // Bridge destination + final BukkitScheduler scheduler = getScheduler(plugin); + Preconditions.checkNotNull(plugin, "plugin cannot be NULL"); + + BukkitScheduledExecutorService service = new com.comphenix.protocol.executors.AbstractBukkitService(new PendingTasks(plugin, scheduler)) { + @Override + protected BukkitTask getTask(Runnable command) { + return scheduler.runTask(plugin, command); + } + + @Override + protected BukkitTask getLaterTask(Runnable task, long ticks) { + return scheduler.runTaskLater(plugin, task, ticks); + } + + @Override + protected BukkitTask getTimerTask(long ticksInitial, long ticksDelay, Runnable task) { + return scheduler.runTaskTimer(plugin, task, ticksInitial, ticksDelay); + } + }; + + PluginDisabledListener.getListener(plugin).addService(service); + return service; + } + + /** + * Retrieves a scheduled executor service for running asynchronous tasks. + * @param plugin - plugin that is executing the given tasks. + * @return Asynchronous executor service. + */ + public static BukkitScheduledExecutorService newAsynchronous(final Plugin plugin) { + // Bridge destination + final BukkitScheduler scheduler = getScheduler(plugin); + Preconditions.checkNotNull(plugin, "plugin cannot be NULL"); + + BukkitScheduledExecutorService service = new com.comphenix.protocol.executors.AbstractBukkitService(new PendingTasks(plugin, scheduler)) { + @Override + protected BukkitTask getTask(Runnable command) { + return scheduler.runTaskAsynchronously(plugin, command); + } + + @Override + protected BukkitTask getLaterTask(Runnable task, long ticks) { + return scheduler.runTaskLaterAsynchronously(plugin, task, ticks); + } + + @Override + protected BukkitTask getTimerTask(long ticksInitial, long ticksDelay, Runnable task) { + return scheduler.runTaskTimerAsynchronously(plugin, task, ticksInitial, ticksDelay); + } + }; + + PluginDisabledListener.getListener(plugin).addService(service); + return service; + } + + /** + * Retrieve the current Bukkit scheduler. + * @return Current scheduler. + */ + private static BukkitScheduler getScheduler(Plugin plugin) { + BukkitScheduler scheduler = plugin.getServer().getScheduler(); + + if (scheduler != null) { + return scheduler; + } else { + throw new IllegalStateException("Unable to retrieve scheduler."); + } + } +} diff --git a/src/main/java/com/comphenix/protocol/executors/BukkitFutures.java b/src/main/java/com/comphenix/protocol/executors/BukkitFutures.java new file mode 100644 index 00000000..de741e3b --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/BukkitFutures.java @@ -0,0 +1,103 @@ +package com.comphenix.protocol.executors; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import org.bukkit.event.*; +import org.bukkit.plugin.EventExecutor; +import org.bukkit.plugin.IllegalPluginAccessException; +import org.bukkit.plugin.Plugin; +import org.bukkit.plugin.RegisteredListener; + +public class BukkitFutures { + // Represents empty classes + private static Listener EMPTY_LISTENER = new Listener() {}; + + /** + * Retrieve a future representing the next invocation of the given event. + * @param plugin - owner plugin. + * @return Future event invocation. + */ + public static ListenableFuture nextEvent(Plugin plugin, Class eventClass) { + return BukkitFutures.nextEvent(plugin, eventClass, EventPriority.NORMAL, false); + } + + /** + * Retrieve a future representing the next invocation of the given event. + * @param plugin - owner plugin. + * @return Future event invocation. + */ + public static ListenableFuture nextEvent( + Plugin plugin, Class eventClass, EventPriority priority, boolean ignoreCancelled) { + + // Event and future + final HandlerList list = getHandlerList(eventClass); + final SettableFuture future = SettableFuture.create(); + + EventExecutor executor = new EventExecutor() { + private final AtomicBoolean once = new AtomicBoolean(); + + @SuppressWarnings("unchecked") + @Override + public void execute(Listener listener, Event event) throws EventException { + // Fire the future + if (!future.isCancelled() && !once.getAndSet(true)) { + future.set((TEvent) event); + } + } + }; + RegisteredListener listener = new RegisteredListener(EMPTY_LISTENER, executor, priority, plugin, ignoreCancelled) { + @Override + public void callEvent(Event event) throws EventException { + super.callEvent(event); + list.unregister(this); + } + }; + + // Ensure that the future is cleaned up when the plugin is disabled + PluginDisabledListener.getListener(plugin).addFuture(future); + + // Add the listener + list.register(listener); + return future; + } + + /** + * Register a given event executor. + * @param plugin - the owner plugin. + * @param eventClass - the event to register. + * @param priority - the event priority. + * @param executor - the event executor. + */ + public static void registerEventExecutor(Plugin plugin, Class eventClass, EventPriority priority, EventExecutor executor) { + getHandlerList(eventClass).register( + new RegisteredListener(EMPTY_LISTENER, executor, priority, plugin, false) + ); + } + + /** + * Retrieve the handler list associated with the given class. + * @param clazz - given event class. + * @return Associated handler list. + */ + private static HandlerList getHandlerList(Class clazz) { + // Class must have Event as its superclass + while (clazz.getSuperclass() != null && Event.class.isAssignableFrom(clazz.getSuperclass())) { + try { + Method method = clazz.getDeclaredMethod("getHandlerList"); + method.setAccessible(true); + return (HandlerList) method.invoke(null); + } catch (NoSuchMethodException e) { + // Keep on searching + clazz = clazz.getSuperclass().asSubclass(Event.class); + } catch (Exception e) { + throw new IllegalPluginAccessException(e.getMessage()); + } + } + throw new IllegalPluginAccessException("Unable to find handler list for event " + + clazz.getName()); + } +} diff --git a/src/main/java/com/comphenix/protocol/executors/BukkitScheduledExecutorService.java b/src/main/java/com/comphenix/protocol/executors/BukkitScheduledExecutorService.java new file mode 100644 index 00000000..43cdd32f --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/BukkitScheduledExecutorService.java @@ -0,0 +1,34 @@ +package com.comphenix.protocol.executors; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ListenableScheduledFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; + +/** + * Represents a listening scheduler service that returns {@link ListenableScheduledFuture} instead of {@link ScheduledFuture}. + * @author Kristian + */ +public interface BukkitScheduledExecutorService extends ListeningScheduledExecutorService { + @Override + public ListenableScheduledFuture schedule( + Runnable command, long delay, TimeUnit unit); + + @Override + public ListenableScheduledFuture schedule( + Callable callable, long delay, TimeUnit unit); + + @Override + public ListenableScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit); + + /** + * This is not supported by the underlying Bukkit scheduler. + */ + @Override + @Deprecated + public ListenableScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit); +} \ No newline at end of file diff --git a/src/main/java/com/comphenix/protocol/executors/CallableTask.java b/src/main/java/com/comphenix/protocol/executors/CallableTask.java new file mode 100644 index 00000000..8f6bf2d9 --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/CallableTask.java @@ -0,0 +1,98 @@ +package com.comphenix.protocol.executors; + +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.comphenix.protocol.executors.AbstractListeningService.RunnableAbstractFuture; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableScheduledFuture; + +class CallableTask extends RunnableAbstractFuture { + protected final Callable compute; + + public CallableTask(Callable compute) { + Preconditions.checkNotNull(compute, "compute cannot be NULL"); + + this.compute = compute; + } + + public ListenableScheduledFuture getScheduledFuture(final long startTime, final long nextDelay) { + return new ListenableScheduledFuture() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return CallableTask.this.cancel(mayInterruptIfRunning); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return CallableTask.this.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + return CallableTask.this.get(timeout, unit); + } + + @Override + public boolean isCancelled() { + return CallableTask.this.isCancelled(); + } + + @Override + public boolean isDone() { + return CallableTask.this.isDone(); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + CallableTask.this.addListener(listener, executor); + } + + @Override + public int compareTo(Delayed o) { + return Long.valueOf(getDelay(TimeUnit.NANOSECONDS)) + .compareTo(o.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public long getDelay(TimeUnit unit) { + long current = System.nanoTime(); + + // Calculate the correct delay + if (current < startTime || !isPeriodic()) + return unit.convert(startTime - current, TimeUnit.NANOSECONDS); + else + return unit.convert(((current - startTime) % nextDelay), TimeUnit.NANOSECONDS); + } + + // @Override + public boolean isPeriodic() { + return nextDelay > 0; + } + }; + } + + /** + * Invoked by the thread responsible for computing this future. + */ + protected void compute() { + try { + // Save result + if (!isCancelled()) { + set(compute.call()); + } + } catch (Throwable e) { + setException(e); + } + } + + @Override + public void run() { + compute(); + } +} \ No newline at end of file diff --git a/src/main/java/com/comphenix/protocol/executors/PendingTasks.java b/src/main/java/com/comphenix/protocol/executors/PendingTasks.java new file mode 100644 index 00000000..6890d728 --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/PendingTasks.java @@ -0,0 +1,140 @@ +package com.comphenix.protocol.executors; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.bukkit.plugin.Plugin; +import org.bukkit.scheduler.BukkitScheduler; +import org.bukkit.scheduler.BukkitTask; + +class PendingTasks { + /** + * Represents a wrapper for a cancelable task. + * + * @author Kristian + */ + private interface CancelableFuture { + void cancel(); + boolean isTaskCancelled(); + } + + // Every pending task + private final Set pending = new HashSet<>(); + private final Object pendingLock = new Object(); + + // Handle arbitrary cancelation + private final Plugin plugin; + private final BukkitScheduler scheduler; + private BukkitTask cancellationTask; + + public PendingTasks(Plugin plugin, BukkitScheduler scheduler) { + this.plugin = plugin; + this.scheduler = scheduler; + } + + public void add(final BukkitTask task, final Future future) { + add(new CancelableFuture() { + @Override + public boolean isTaskCancelled() { + // If completed, check its cancellation state + if (future.isDone()) + return future.isCancelled(); + + return !(scheduler.isCurrentlyRunning(task.getTaskId()) || + scheduler.isQueued(task.getTaskId())); + } + + @Override + public void cancel() { + // Make sure + task.cancel(); + future.cancel(true); + } + }); + } + + private CancelableFuture add(CancelableFuture task) { + synchronized (pendingLock) { + pending.add(task); + pendingLock.notifyAll(); + beginCancellationTask(); + return task; + } + } + + private void beginCancellationTask() { + if (cancellationTask == null) { + cancellationTask = scheduler.runTaskTimer(plugin, () -> { + // Check for cancellations + synchronized (pendingLock) { + boolean changed = false; + + for (Iterator it = pending.iterator(); it.hasNext(); ) { + CancelableFuture future = it.next(); + + // Remove cancelled tasks + if (future.isTaskCancelled()) { + future.cancel(); + it.remove(); + changed = true; + } + } + + // Notify waiting threads + if (changed) { + pendingLock.notifyAll(); + } + } + + // Stop if we are out of tasks + if (isTerminated()) { + cancellationTask.cancel(); + cancellationTask = null; + } + }, 1, 1); + } + } + + /** + * Cancel all pending tasks. + */ + public void cancel() { + for (CancelableFuture task : pending) { + task.cancel(); + } + } + + /** + * Wait until all pending tasks have completed. + * @param timeout - the current timeout. + * @param unit - unit of the timeout. + * @return TRUE if every pending task has terminated, FALSE if we reached the timeout. + * @throws InterruptedException + */ + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long expire = System.nanoTime() + unit.toNanos(timeout); + + synchronized (pendingLock) { + // Wait until the tasks have all terminated + while (!isTerminated()) { + // Check timeout + if (expire < System.nanoTime()) + return false; + unit.timedWait(pendingLock, timeout); + } + } + // Timeout! + return false; + } + + /** + * Determine if all tasks have completed executing. + * @return TRUE if they have, FALSE otherwise. + */ + public boolean isTerminated() { + return pending.isEmpty(); + } +} diff --git a/src/main/java/com/comphenix/protocol/executors/PluginDisabledListener.java b/src/main/java/com/comphenix/protocol/executors/PluginDisabledListener.java new file mode 100644 index 00000000..0a165a35 --- /dev/null +++ b/src/main/java/com/comphenix/protocol/executors/PluginDisabledListener.java @@ -0,0 +1,134 @@ +package com.comphenix.protocol.executors; + +import java.util.Collections; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import org.bukkit.event.EventPriority; +import org.bukkit.event.Listener; +import org.bukkit.event.server.PluginDisableEvent; +import org.bukkit.plugin.Plugin; + +class PluginDisabledListener implements Listener { + private static final ConcurrentMap LISTENERS = new MapMaker().weakKeys().makeMap(); + + // Objects that must be disabled + private final Set> futures = Collections.newSetFromMap(new WeakHashMap<>()); + private final Set services = Collections.newSetFromMap(new WeakHashMap<>()); + private final Object setLock = new Object(); + + // The plugin we're looking for + private final Plugin plugin; + private boolean disabled; + + private PluginDisabledListener(Plugin plugin) { + this.plugin = plugin; + } + + /** + * Retrieve the associated disabled listener. + * @param plugin - the plugin. + * @return Associated listener. + */ + public static PluginDisabledListener getListener(final Plugin plugin) { + PluginDisabledListener result = LISTENERS.get(plugin); + + if (result == null) { + final PluginDisabledListener created = new PluginDisabledListener(plugin); + result = LISTENERS.putIfAbsent(plugin, created); + + if (result == null) { + // Register listener - we can't use the normal method as the plugin might not be enabled yet + BukkitFutures.registerEventExecutor(plugin, PluginDisableEvent.class, EventPriority.NORMAL, + (listener, event) -> { + if (event instanceof PluginDisableEvent) { + created.onPluginDisabled((PluginDisableEvent) event); + } + }); + + result = created; + } + } + return result; + } + + /** + * Ensure that the given future will be cancelled when the plugin is disabled. + * @param future - the future to cancel. + */ + public void addFuture(final ListenableFuture future) { + synchronized (setLock) { + if (disabled) { + processFuture(future); + } else { + futures.add(future); + } + } + + // Remove the future when it has computed + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(Object value) { + synchronized (setLock) { + futures.remove(future); + } + } + + @Override + public void onFailure(Throwable ex) { + synchronized (setLock) { + futures.remove(future); + } + } + }); + } + + /** + * Ensure that a given service is shutdown when the plugin is disabled. + * @param service - the service. + */ + public void addService(ExecutorService service) { + synchronized (setLock) { + if (disabled) { + processService(service); + } else { + services.add(service); + } + } + } + + // Will be registered manually + public void onPluginDisabled(PluginDisableEvent e) { + if (e.getPlugin().equals(plugin)) { + synchronized (setLock) { + disabled = true; + + // Cancel all unfinished futures + for (Future future : futures) { + processFuture(future); + } + for (ExecutorService service : services) { + processService(service); + } + } + } + } + + private void processFuture(Future future) { + if (!future.isDone()) { + future.cancel(true); + } + } + + private void processService(ExecutorService service) { + service.shutdownNow(); + } +} diff --git a/src/main/java/com/comphenix/protocol/injector/PacketFilterBuilder.java b/src/main/java/com/comphenix/protocol/injector/PacketFilterBuilder.java index 25e42559..d21d9bcc 100644 --- a/src/main/java/com/comphenix/protocol/injector/PacketFilterBuilder.java +++ b/src/main/java/com/comphenix/protocol/injector/PacketFilterBuilder.java @@ -6,7 +6,7 @@ import org.bukkit.Server; import org.bukkit.event.world.WorldInitEvent; import org.bukkit.plugin.Plugin; -import com.comphenix.executors.BukkitFutures; +import com.comphenix.protocol.executors.BukkitFutures; import com.comphenix.protocol.async.AsyncFilterManager; import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.error.Report;