DataManagerAbstract: Add single threaded task queue (experimental)

We had some reports about server outright crashing because EpicFurnaces (maybe others too) because there are too many async tasks queue at once. Every async task scheduled to spigot (as of 1.17) creates a new thread

This should help a bit and the `FIXME` has to be taken care of when more time is at hand... The whole data storing stuff is just bad which shows as soon as servers (or the plugin use e.g. many furnaces) get bigger... (am a bit annoyed 🤷)
This commit is contained in:
Christian Koop 2021-10-26 21:59:25 +02:00
parent 0bdb0af890
commit 6a858c9c30
No known key found for this signature in database
GPG Key ID: 89A8181384E010A3
2 changed files with 112 additions and 5 deletions

View File

@ -1,6 +1,7 @@
package com.songoda.core;
import com.songoda.core.configuration.Config;
import com.songoda.core.database.DataManagerAbstract;
import com.songoda.core.locale.Locale;
import com.songoda.core.utils.Metrics;
import org.bukkit.Bukkit;
@ -10,6 +11,7 @@ import org.bukkit.configuration.file.FileConfiguration;
import org.bukkit.plugin.java.JavaPlugin;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
/**
@ -19,7 +21,6 @@ import java.util.logging.Level;
* @author jascotty2
*/
public abstract class SongodaPlugin extends JavaPlugin {
protected Locale locale;
protected Config config = new Config(this);
protected long dataLoadDelay = 20L;
@ -36,7 +37,7 @@ public abstract class SongodaPlugin extends JavaPlugin {
public abstract void onDataLoad();
/**
* Called after reloadConfig() is called
* Called after reloadConfig() is called
*/
public abstract void onConfigReload();
@ -175,4 +176,41 @@ public abstract class SongodaPlugin extends JavaPlugin {
}
}
}
protected void shutdownDataManager(DataManagerAbstract dataManager) {
// 3 minutes is overkill, but we just want to make sure
shutdownDataManager(dataManager, 15, TimeUnit.MINUTES.toSeconds(3));
}
protected void shutdownDataManager(DataManagerAbstract dataManager, int reportInterval, long secondsUntilForceShutdown) {
dataManager.shutdownTaskQueue();
while (!dataManager.isTaskQueueTerminated() && secondsUntilForceShutdown > 0) {
long secondsToWait = Math.min(reportInterval, secondsUntilForceShutdown);
try {
if (dataManager.waitForShutdown(secondsToWait, TimeUnit.SECONDS)) {
break;
}
getLogger().info(String.format("A DataManager is currently working on %d tasks... " +
"We are giving him another %d seconds until we forcefully shut him down " +
"(continuing to report in %d second intervals)",
dataManager.getTaskQueueSize(), secondsUntilForceShutdown, reportInterval));
} catch (InterruptedException ignore) {
} finally {
secondsUntilForceShutdown -= secondsToWait;
}
}
if (!dataManager.isTaskQueueTerminated()) {
int unfinishedTasks = dataManager.forceShutdownTaskQueue().size();
if (unfinishedTasks > 0) {
getLogger().log(Level.WARNING,
String.format("A DataManager has been forcefully terminated with %d unfinished tasks - " +
"This can be a serious problem, please report it to us (Songoda)!", unfinishedTasks));
}
}
}
}

View File

@ -11,14 +11,20 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class DataManagerAbstract {
protected final DatabaseConnector databaseConnector;
protected final Plugin plugin;
private static Map<String, LinkedList<Runnable>> queues = new HashMap<>();
protected final ExecutorService asyncPool = Executors.newSingleThreadExecutor();
@Deprecated
private static final Map<String, LinkedList<Runnable>> queues = new HashMap<>();
public DataManagerAbstract(DatabaseConnector databaseConnector, Plugin plugin) {
this.databaseConnector = databaseConnector;
@ -33,13 +39,17 @@ public class DataManagerAbstract {
}
/**
* Deprecated because it is often times not accurate to its use case.
* Deprecated because it is often times not accurate to its use case. (+race-conditions)
*/
@Deprecated
protected int lastInsertedId(Connection connection) {
return lastInsertedId(connection, null);
}
/**
* Deprecated because it is often times not accurate to its use case. (+race-conditions)
*/
@Deprecated
protected int lastInsertedId(Connection connection, String table) {
String select = "SELECT * FROM " + this.getTablePrefix() + table + " ORDER BY id DESC LIMIT 1";
String query;
@ -65,6 +75,7 @@ public class DataManagerAbstract {
*
* @param runnable task to run
*/
@Deprecated
public void async(Runnable runnable) {
Bukkit.getScheduler().runTaskAsynchronously(this.plugin, runnable);
}
@ -78,12 +89,66 @@ public class DataManagerAbstract {
Bukkit.getScheduler().runTask(this.plugin, runnable);
}
public void runAsync(Runnable runnable) {
runAsync(runnable, null);
}
// FIXME: The problem with a single threaded async queue is that the database implementations and this queue
// are **not** thread-safe in any way. The connection is not pooled or anything...
// So the actual problem is that plugins just queue way too much tasks on bulk which it just shouldn't need to do...
public void runAsync(Runnable task, Consumer<Throwable> callback) {
this.asyncPool.execute(() -> {
try {
task.run();
if (callback != null) {
callback.accept(null);
}
} catch (Throwable th) {
if (callback != null) {
callback.accept(th);
return;
}
th.printStackTrace();
}
});
}
public void shutdownTaskQueue() {
this.asyncPool.shutdown();
}
public List<Runnable> forceShutdownTaskQueue() {
return this.asyncPool.shutdownNow();
}
public boolean isTaskQueueTerminated() {
return this.asyncPool.isTerminated();
}
public long getTaskQueueSize() {
if (this.asyncPool instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) this.asyncPool).getTaskCount();
}
return -1;
}
/**
* @see ExecutorService#awaitTermination(long, TimeUnit)
*/
public boolean waitForShutdown(long timeout, TimeUnit unit) throws InterruptedException {
return this.asyncPool.awaitTermination(timeout, unit);
}
/**
* Queue tasks to be ran asynchronously.
*
* @param runnable task to put into queue.
* @param queueKey the queue key to add the runnable to.
*/
@Deprecated
public void queueAsync(Runnable runnable, String queueKey) {
if (queueKey == null) return;
List<Runnable> queue = queues.computeIfAbsent(queueKey, t -> new LinkedList<>());
@ -91,6 +156,7 @@ public class DataManagerAbstract {
if (queue.size() == 1) runQueue(queueKey);
}
@Deprecated
private void runQueue(String queueKey) {
doQueue(queueKey, (s) -> {
if (!queues.get(queueKey).isEmpty())
@ -98,10 +164,13 @@ public class DataManagerAbstract {
});
}
@Deprecated
private void doQueue(String queueKey, Consumer<Boolean> callback) {
Runnable runnable = queues.get(queueKey).getFirst();
async(() -> {
runnable.run();
sync(() -> {
queues.get(queueKey).remove(runnable);
callback.accept(true);