Run scheduled tasks from other plugins using API in sequence

This prevents two transactions executing at the same time.

Affects issues:
- Fixed #3147
This commit is contained in:
Aurora Lahtela 2023-09-24 09:40:48 +03:00
parent 8bda0ff1c9
commit 55c13d1875
2 changed files with 22 additions and 5 deletions

View File

@ -38,6 +38,7 @@ public class Processing implements SubSystem {
private final ErrorLogger errorLogger;
private ExecutorService nonCriticalExecutor;
private ExecutorService nonCriticalSingleThreadExecutor;
private ExecutorService criticalExecutor;
@Inject
@ -50,6 +51,7 @@ public class Processing implements SubSystem {
this.logger = logger;
this.errorLogger = errorLogger;
nonCriticalExecutor = createExecutor(6, "Plan Non critical-pool-%d");
nonCriticalSingleThreadExecutor = createExecutor(1, "Plan Non critical-pool-single-threaded-%d");
criticalExecutor = createExecutor(2, "Plan Critical-pool-%d");
}
@ -71,13 +73,18 @@ public class Processing implements SubSystem {
}
public CompletableFuture<Boolean> submitNonCritical(Runnable runnable) {
if (runnable == null || nonCriticalExecutor.isShutdown()) {
return submitNonCritical(runnable, false);
}
public CompletableFuture<Boolean> submitNonCritical(Runnable runnable, boolean singleThreaded) {
ExecutorService executorService = singleThreaded ? nonCriticalSingleThreadExecutor : nonCriticalExecutor;
if (runnable == null || executorService.isShutdown()) {
return null;
}
return CompletableFuture.supplyAsync(() -> {
runnable.run();
return true;
}, nonCriticalExecutor).handle(this::exceptionHandlerNonCritical);
}, executorService).handle(this::exceptionHandlerNonCritical);
}
public CompletableFuture<Boolean> submitCritical(Runnable runnable) {
@ -138,6 +145,9 @@ public class Processing implements SubSystem {
if (nonCriticalExecutor.isShutdown()) {
nonCriticalExecutor = createExecutor(6, "Plan Non critical-pool-%d");
}
if (nonCriticalSingleThreadExecutor.isShutdown()) {
nonCriticalSingleThreadExecutor = createExecutor(1, "Plan Non critical-pool-single-threaded-%d");
}
if (criticalExecutor.isShutdown()) {
criticalExecutor = createExecutor(2, "Plan Critical-pool-%d");
}
@ -145,14 +155,15 @@ public class Processing implements SubSystem {
@Override
public void disable() {
shutdownNonCriticalExecutor();
shutdownNonCriticalExecutors();
shutdownCriticalExecutor();
ensureShutdown();
logger.info(locale.get().getString(PluginLang.DISABLED_PROCESSING_COMPLETE));
}
private void shutdownNonCriticalExecutor() {
private void shutdownNonCriticalExecutors() {
nonCriticalExecutor.shutdownNow();
nonCriticalSingleThreadExecutor.shutdownNow();
}
private void shutdownCriticalExecutor() {
@ -184,12 +195,16 @@ public class Processing implements SubSystem {
if (!nonCriticalExecutor.isTerminated()) {
nonCriticalExecutor.shutdownNow();
}
if (!nonCriticalSingleThreadExecutor.isTerminated()) {
nonCriticalSingleThreadExecutor.shutdownNow();
}
if (!criticalExecutor.isTerminated() && !criticalExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
criticalExecutor.shutdownNow();
}
} catch (InterruptedException e) {
logger.error("Processing shutdown thread interrupted: " + e.getMessage());
nonCriticalExecutor.shutdownNow();
nonCriticalSingleThreadExecutor.shutdownNow();
criticalExecutor.shutdownNow();
Thread.currentThread().interrupt();
}

View File

@ -24,6 +24,8 @@ import javax.inject.Singleton;
@Singleton
public class SchedulerSvc implements SchedulerService {
private static final boolean SINGLE_THREADED = true;
private final Processing processing;
@Inject
@ -33,7 +35,7 @@ public class SchedulerSvc implements SchedulerService {
@Override
public void runAsync(Runnable runnable) {
processing.submitNonCritical(runnable);
processing.submitNonCritical(runnable, SINGLE_THREADED);
}
public void register() {