Replace synced async threading with queued async threading.

This commit is contained in:
Brianna 2020-04-18 05:10:54 -04:00
parent 1510d6ec80
commit cb0614019f
2 changed files with 26 additions and 39 deletions

View File

@ -404,8 +404,6 @@ public class SongodaCore {
if (pi != null) { if (pi != null) {
registeredPlugins.remove(pi); registeredPlugins.remove(pi);
} }
// Terminate all active threads
DataManagerAbstract.terminateAllThreads();
if (event.getPlugin() == piggybackedPlugin) { if (event.getPlugin() == piggybackedPlugin) {
// uh-oh! Abandon ship!! // uh-oh! Abandon ship!!
Bukkit.getServicesManager().unregisterAll(piggybackedPlugin); Bukkit.getServicesManager().unregisterAll(piggybackedPlugin);

View File

@ -8,17 +8,17 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.function.Consumer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DataManagerAbstract { public class DataManagerAbstract {
protected final DatabaseConnector databaseConnector; protected final DatabaseConnector databaseConnector;
protected final Plugin plugin; protected final Plugin plugin;
private static Map<String, ScheduledExecutorService> threads = new HashMap<>(); private static Map<String, LinkedList<Runnable>> queues = new HashMap<>();
public DataManagerAbstract(DatabaseConnector databaseConnector, Plugin plugin) { public DataManagerAbstract(DatabaseConnector databaseConnector, Plugin plugin) {
this.databaseConnector = databaseConnector; this.databaseConnector = databaseConnector;
@ -62,7 +62,6 @@ public class DataManagerAbstract {
/** /**
* Queue a task to be run asynchronously. <br> * Queue a task to be run asynchronously. <br>
* TODO: This needs to be separated from BukkitScheduler
* *
* @param runnable task to run * @param runnable task to run
*/ */
@ -80,43 +79,33 @@ public class DataManagerAbstract {
} }
/** /**
* Queue a task to be run synchronously on a new thread. * Queue tasks to be ran asynchronously.
* *
* @param runnable task to run on the next server tick * @param runnable task to put into queue.
* @param threadKey the thread key to run on. * @param queueKey the queue key to add the runnable to.
*/ */
public void sync(Runnable runnable, String threadKey) { public void queueAsync(Runnable runnable, String queueKey) {
threads.computeIfAbsent(threadKey.toUpperCase(), if (queueKey == null) return;
t -> Executors.newSingleThreadScheduledExecutor()).execute(runnable); List<Runnable> queue = queues.computeIfAbsent(queueKey, t -> new LinkedList<>());
queue.add(runnable);
if (queue.size() == 1) runQueue(queueKey);
} }
/** private void runQueue(String queueKey) {
* Terminate thread once all tasks have been completed. doQueue(queueKey, (s) -> {
* if (!queues.get(queueKey).isEmpty())
* @param threadKey the thread key to terminate. runQueue(queueKey);
*/ });
public static void terminateThread(String threadKey) {
ScheduledExecutorService service = threads.get(threadKey);
if (service != null) {
threads.remove(threadKey);
try {
service.awaitTermination(0, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }
/** private void doQueue(String queueKey, Consumer<Boolean> callback) {
* Terminate all active threads. Runnable runnable = queues.get(queueKey).getFirst();
*/ async(() -> {
public static void terminateAllThreads() { runnable.run();
for (ScheduledExecutorService service : threads.values()) { sync(() -> {
try { queues.get(queueKey).remove(runnable);
service.awaitTermination(0, TimeUnit.SECONDS); callback.accept(true);
} catch (InterruptedException e) { });
e.printStackTrace(); });
}
}
} }
} }