From 6a858c9c30ab2d84f12c6994d4b03f9096329a38 Mon Sep 17 00:00:00 2001 From: Christian Koop Date: Tue, 26 Oct 2021 21:59:25 +0200 Subject: [PATCH] DataManagerAbstract: Add single threaded task queue (experimental) We had some reports about server outright crashing because EpicFurnaces (maybe others too) because there are too many async tasks queue at once. Every async task scheduled to spigot (as of 1.17) creates a new thread This should help a bit and the `FIXME` has to be taken care of when more time is at hand... The whole data storing stuff is just bad which shows as soon as servers (or the plugin use e.g. many furnaces) get bigger... (am a bit annoyed :shrug:) --- .../java/com/songoda/core/SongodaPlugin.java | 42 ++++++++++- .../core/database/DataManagerAbstract.java | 75 ++++++++++++++++++- 2 files changed, 112 insertions(+), 5 deletions(-) diff --git a/Core/src/main/java/com/songoda/core/SongodaPlugin.java b/Core/src/main/java/com/songoda/core/SongodaPlugin.java index e7c8ebc5..de953492 100644 --- a/Core/src/main/java/com/songoda/core/SongodaPlugin.java +++ b/Core/src/main/java/com/songoda/core/SongodaPlugin.java @@ -1,6 +1,7 @@ package com.songoda.core; import com.songoda.core.configuration.Config; +import com.songoda.core.database.DataManagerAbstract; import com.songoda.core.locale.Locale; import com.songoda.core.utils.Metrics; import org.bukkit.Bukkit; @@ -10,6 +11,7 @@ import org.bukkit.configuration.file.FileConfiguration; import org.bukkit.plugin.java.JavaPlugin; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; /** @@ -19,7 +21,6 @@ import java.util.logging.Level; * @author jascotty2 */ public abstract class SongodaPlugin extends JavaPlugin { - protected Locale locale; protected Config config = new Config(this); protected long dataLoadDelay = 20L; @@ -36,7 +37,7 @@ public abstract class SongodaPlugin extends JavaPlugin { public abstract void onDataLoad(); /** - * Called after reloadConfig​() is called + * Called after reloadConfig() is called */ public abstract void onConfigReload(); @@ -175,4 +176,41 @@ public abstract class SongodaPlugin extends JavaPlugin { } } } + + protected void shutdownDataManager(DataManagerAbstract dataManager) { + // 3 minutes is overkill, but we just want to make sure + shutdownDataManager(dataManager, 15, TimeUnit.MINUTES.toSeconds(3)); + } + + protected void shutdownDataManager(DataManagerAbstract dataManager, int reportInterval, long secondsUntilForceShutdown) { + dataManager.shutdownTaskQueue(); + + while (!dataManager.isTaskQueueTerminated() && secondsUntilForceShutdown > 0) { + long secondsToWait = Math.min(reportInterval, secondsUntilForceShutdown); + + try { + if (dataManager.waitForShutdown(secondsToWait, TimeUnit.SECONDS)) { + break; + } + + getLogger().info(String.format("A DataManager is currently working on %d tasks... " + + "We are giving him another %d seconds until we forcefully shut him down " + + "(continuing to report in %d second intervals)", + dataManager.getTaskQueueSize(), secondsUntilForceShutdown, reportInterval)); + } catch (InterruptedException ignore) { + } finally { + secondsUntilForceShutdown -= secondsToWait; + } + } + + if (!dataManager.isTaskQueueTerminated()) { + int unfinishedTasks = dataManager.forceShutdownTaskQueue().size(); + + if (unfinishedTasks > 0) { + getLogger().log(Level.WARNING, + String.format("A DataManager has been forcefully terminated with %d unfinished tasks - " + + "This can be a serious problem, please report it to us (Songoda)!", unfinishedTasks)); + } + } + } } diff --git a/Core/src/main/java/com/songoda/core/database/DataManagerAbstract.java b/Core/src/main/java/com/songoda/core/database/DataManagerAbstract.java index 540b68a6..d0b3b7a8 100644 --- a/Core/src/main/java/com/songoda/core/database/DataManagerAbstract.java +++ b/Core/src/main/java/com/songoda/core/database/DataManagerAbstract.java @@ -11,14 +11,20 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class DataManagerAbstract { - protected final DatabaseConnector databaseConnector; protected final Plugin plugin; - private static Map> queues = new HashMap<>(); + protected final ExecutorService asyncPool = Executors.newSingleThreadExecutor(); + + @Deprecated + private static final Map> queues = new HashMap<>(); public DataManagerAbstract(DatabaseConnector databaseConnector, Plugin plugin) { this.databaseConnector = databaseConnector; @@ -33,13 +39,17 @@ public class DataManagerAbstract { } /** - * Deprecated because it is often times not accurate to its use case. + * Deprecated because it is often times not accurate to its use case. (+race-conditions) */ @Deprecated protected int lastInsertedId(Connection connection) { return lastInsertedId(connection, null); } + /** + * Deprecated because it is often times not accurate to its use case. (+race-conditions) + */ + @Deprecated protected int lastInsertedId(Connection connection, String table) { String select = "SELECT * FROM " + this.getTablePrefix() + table + " ORDER BY id DESC LIMIT 1"; String query; @@ -65,6 +75,7 @@ public class DataManagerAbstract { * * @param runnable task to run */ + @Deprecated public void async(Runnable runnable) { Bukkit.getScheduler().runTaskAsynchronously(this.plugin, runnable); } @@ -78,12 +89,66 @@ public class DataManagerAbstract { Bukkit.getScheduler().runTask(this.plugin, runnable); } + public void runAsync(Runnable runnable) { + runAsync(runnable, null); + } + + // FIXME: The problem with a single threaded async queue is that the database implementations and this queue + // are **not** thread-safe in any way. The connection is not pooled or anything... + // So the actual problem is that plugins just queue way too much tasks on bulk which it just shouldn't need to do... + public void runAsync(Runnable task, Consumer callback) { + this.asyncPool.execute(() -> { + try { + task.run(); + + if (callback != null) { + callback.accept(null); + } + } catch (Throwable th) { + if (callback != null) { + callback.accept(th); + return; + } + + th.printStackTrace(); + } + }); + } + + public void shutdownTaskQueue() { + this.asyncPool.shutdown(); + } + + public List forceShutdownTaskQueue() { + return this.asyncPool.shutdownNow(); + } + + public boolean isTaskQueueTerminated() { + return this.asyncPool.isTerminated(); + } + + public long getTaskQueueSize() { + if (this.asyncPool instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor) this.asyncPool).getTaskCount(); + } + + return -1; + } + + /** + * @see ExecutorService#awaitTermination(long, TimeUnit) + */ + public boolean waitForShutdown(long timeout, TimeUnit unit) throws InterruptedException { + return this.asyncPool.awaitTermination(timeout, unit); + } + /** * Queue tasks to be ran asynchronously. * * @param runnable task to put into queue. * @param queueKey the queue key to add the runnable to. */ + @Deprecated public void queueAsync(Runnable runnable, String queueKey) { if (queueKey == null) return; List queue = queues.computeIfAbsent(queueKey, t -> new LinkedList<>()); @@ -91,6 +156,7 @@ public class DataManagerAbstract { if (queue.size() == 1) runQueue(queueKey); } + @Deprecated private void runQueue(String queueKey) { doQueue(queueKey, (s) -> { if (!queues.get(queueKey).isEmpty()) @@ -98,10 +164,13 @@ public class DataManagerAbstract { }); } + @Deprecated private void doQueue(String queueKey, Consumer callback) { Runnable runnable = queues.get(queueKey).getFirst(); + async(() -> { runnable.run(); + sync(() -> { queues.get(queueKey).remove(runnable); callback.accept(true);