Improve AbstractJavaScheduler (#3326)

This commit is contained in:
Luck 2022-03-19 23:30:31 +00:00
parent 405c5c3cf9
commit fb56189f0a
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
4 changed files with 53 additions and 48 deletions

View File

@ -34,6 +34,7 @@ public class BukkitSchedulerAdapter extends AbstractJavaScheduler implements Sch
private final Executor sync;
public BukkitSchedulerAdapter(LPBukkitBootstrap bootstrap) {
super(bootstrap);
this.sync = r -> bootstrap.getServer().getScheduler().scheduleSyncDelayedTask(bootstrap.getLoader(), r);
}

View File

@ -25,40 +25,44 @@
package me.lucko.luckperms.common.plugin.scheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.NonNull;
import me.lucko.luckperms.common.plugin.bootstrap.LuckPermsBootstrap;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Abstract implementation of {@link SchedulerAdapter} using a {@link ScheduledExecutorService}.
*/
public abstract class AbstractJavaScheduler implements SchedulerAdapter {
private static final int PARALLELISM = 16;
private final LuckPermsBootstrap bootstrap;
private final ScheduledThreadPoolExecutor scheduler;
private final ErrorReportingExecutor schedulerWorkerPool;
private final ForkJoinPool worker;
public AbstractJavaScheduler() {
this.scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("luckperms-scheduler")
.build()
);
public AbstractJavaScheduler(LuckPermsBootstrap bootstrap) {
this.bootstrap = bootstrap;
this.scheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setName("luckperms-scheduler");
return thread;
});
this.scheduler.setRemoveOnCancelPolicy(true);
this.schedulerWorkerPool = new ErrorReportingExecutor(Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("luckperms-scheduler-worker-%d")
.build()
));
this.worker = new ForkJoinPool(32, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> e.printStackTrace(), false);
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.worker = new ForkJoinPool(PARALLELISM, new WorkerThreadFactory(), new ExceptionHandler(), false);
}
@Override
@ -68,13 +72,13 @@ public abstract class AbstractJavaScheduler implements SchedulerAdapter {
@Override
public SchedulerTask asyncLater(Runnable task, long delay, TimeUnit unit) {
ScheduledFuture<?> future = this.scheduler.schedule(() -> this.schedulerWorkerPool.execute(task), delay, unit);
ScheduledFuture<?> future = this.scheduler.schedule(() -> this.worker.execute(task), delay, unit);
return () -> future.cancel(false);
}
@Override
public SchedulerTask asyncRepeating(Runnable task, long interval, TimeUnit unit) {
ScheduledFuture<?> future = this.scheduler.scheduleAtFixedRate(() -> this.schedulerWorkerPool.execute(task), interval, interval, unit);
ScheduledFuture<?> future = this.scheduler.scheduleAtFixedRate(() -> this.worker.execute(task), interval, interval, unit);
return () -> future.cancel(false);
}
@ -82,7 +86,10 @@ public abstract class AbstractJavaScheduler implements SchedulerAdapter {
public void shutdownScheduler() {
this.scheduler.shutdown();
try {
this.scheduler.awaitTermination(1, TimeUnit.MINUTES);
if (!this.scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
this.bootstrap.getPluginLogger().severe("Timed out waiting for the LuckPerms scheduler to terminate");
reportRunningTasks(thread -> thread.getName().equals("luckperms-scheduler"));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -90,48 +97,43 @@ public abstract class AbstractJavaScheduler implements SchedulerAdapter {
@Override
public void shutdownExecutor() {
this.schedulerWorkerPool.delegate.shutdown();
try {
this.schedulerWorkerPool.delegate.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.worker.shutdown();
try {
this.worker.awaitTermination(1, TimeUnit.MINUTES);
if (!this.worker.awaitTermination(1, TimeUnit.MINUTES)) {
this.bootstrap.getPluginLogger().severe("Timed out waiting for the LuckPerms worker thread pool to terminate");
reportRunningTasks(thread -> thread.getName().startsWith("luckperms-worker-"));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static final class ErrorReportingExecutor implements Executor {
private final ExecutorService delegate;
private ErrorReportingExecutor(ExecutorService delegate) {
this.delegate = delegate;
private void reportRunningTasks(Predicate<Thread> predicate) {
Thread.getAllStackTraces().forEach((thread, stack) -> {
if (predicate.test(thread)) {
this.bootstrap.getPluginLogger().warn("Thread " + thread.getName() + " is blocked, and may be the reason for the slow shutdown!\n" +
Arrays.stream(stack).map(el -> " " + el).collect(Collectors.joining("\n"))
);
}
});
}
private static final class WorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
private static final AtomicInteger COUNT = new AtomicInteger(0);
@Override
public void execute(@NonNull Runnable command) {
this.delegate.execute(new ErrorReportingRunnable(command));
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setDaemon(true);
thread.setName("luckperms-worker-" + COUNT.getAndIncrement());
return thread;
}
}
private static final class ErrorReportingRunnable implements Runnable {
private final Runnable delegate;
private ErrorReportingRunnable(Runnable delegate) {
this.delegate = delegate;
}
private final class ExceptionHandler implements UncaughtExceptionHandler {
@Override
public void run() {
try {
this.delegate.run();
} catch (Exception e) {
e.printStackTrace();
}
public void uncaughtException(Thread t, Throwable e) {
AbstractJavaScheduler.this.bootstrap.getPluginLogger().warn("Thread " + t.getName() + " threw an uncaught exception", e);
}
}
}

View File

@ -33,6 +33,7 @@ public class FabricSchedulerAdapter extends AbstractJavaScheduler {
private final Executor sync;
public FabricSchedulerAdapter(LPFabricBootstrap bootstrap) {
super(bootstrap);
this.sync = r -> bootstrap.getServer().orElseThrow(() -> new IllegalStateException("Server not ready")).submitAndJoin(r);
}

View File

@ -34,6 +34,7 @@ public class NukkitSchedulerAdapter extends AbstractJavaScheduler implements Sch
private final Executor sync;
public NukkitSchedulerAdapter(LPNukkitBootstrap bootstrap) {
super(bootstrap);
this.sync = r -> bootstrap.getServer().getScheduler().scheduleTask(bootstrap.getLoader(), r, false);
}