Monitored Threads.

This commit is contained in:
Brianna OKeefe 2024-03-21 17:09:07 -05:00
parent 60931f0ba3
commit 5d9afba0b2
5 changed files with 270 additions and 59 deletions

View File

@ -1,59 +0,0 @@
package com.craftaro.core.task;
import com.craftaro.core.SongodaPlugin;
import org.bukkit.scheduler.BukkitRunnable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public abstract class TaskScheduler {
private final SongodaPlugin plugin;
private final Map<Runnable, Long> tasks = new ConcurrentHashMap<>();
private BukkitRunnable runnable;
public TaskScheduler(SongodaPlugin plugin) {
this.plugin = plugin;
}
private void startScheduler() {
if (runnable == null || runnable.isCancelled()) {
runnable = new BukkitRunnable() {
@Override
public void run() {
executeTasks();
}
};
runnable.runTaskTimerAsynchronously(plugin, 20L, 20L);
}
}
private void stopScheduler() {
if (runnable != null && !runnable.isCancelled()) {
runnable.cancel();
}
}
private void executeTasks() {
if (tasks.isEmpty()) {
stopScheduler();
return;
}
long currentTime = System.currentTimeMillis();
Iterator<Map.Entry<Runnable, Long>> iterator = tasks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Runnable, Long> entry = iterator.next();
if (entry.getValue() <= currentTime) {
entry.getKey().run();
iterator.remove();
}
}
}
public synchronized void addTask(Runnable task, long delay) {
tasks.put(task, System.currentTimeMillis() + delay);
startScheduler();
}
}

View File

@ -0,0 +1,84 @@
package com.craftaro.core.thread;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class MonitoredThread {
private final String name;
private final int timeout;
private final TimeUnit timeUnit;
private ScheduledExecutorService executor;
private Instant started = null;
private StackTraceElement[] trace = null;
private boolean nonDisruptable = false;
public MonitoredThread(String name, int timeout, TimeUnit timeUnit) {
this.name = name;
this.timeout = timeout;
this.timeUnit = timeUnit;
System.out.println("Thread '" + name + "' was started...");
start();
}
public void execute(Runnable runnable, boolean nonDisruptable) {
this.nonDisruptable = nonDisruptable;
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
executor.execute(() -> {
started = Instant.now();
this.trace = trace;
try {
runnable.run();
} catch (Exception e) {
StackTraceElement[] newTrace = new StackTraceElement[e.getStackTrace().length + trace.length];
System.arraycopy(e.getStackTrace(), 0, newTrace, 0, e.getStackTrace().length);
System.arraycopy(trace, 0, newTrace, e.getStackTrace().length, trace.length);
e.setStackTrace(newTrace);
System.out.println("Thread '" + name + "' failed with exception: " + e.getMessage());
e.printStackTrace();
}
started = null;
});
}
public MonitoredThread start() {
if (executor != null) {
executor.shutdown();
System.out.println("Thread '" + name + "' was restarted due to a stall. Stack trace:");
for (StackTraceElement element : this.trace)
System.out.println(" " + element.toString());
}
executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, name));
return this;
}
public String getName() {
return name;
}
public boolean isStalled() {
return !nonDisruptable && started != null && started.plusMillis(timeUnit.toMillis(timeout)).isBefore(Instant.now())
|| started != null && started.plusMillis(TimeUnit.HOURS.toMillis(1)).isBefore(Instant.now());
}
public boolean isRunning() {
return started != null;
}
public Instant getStarted() {
return started;
}
public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
return executor.schedule(runnable, delay, timeUnit);
}
public void destroy() {
executor.shutdownNow();
}
}

View File

@ -0,0 +1,75 @@
package com.craftaro.core.thread;
import java.util.HashSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class MonitoredThreadPool {
private final HashSet<MonitoredThread> threads = new HashSet<>();
private final String name;
private final int size;
private int latestThread = 1;
private final int threadTimeout;
private final TimeUnit threadTimeoutUnit;
public MonitoredThreadPool(String name, int size, int timeout, TimeUnit timeUnit) {
this.name = name;
this.size = size;
this.threadTimeout = timeout;
this.threadTimeoutUnit = timeUnit;
for (int i = 0; i < size; i++)
createThread(name);
}
public MonitoredThread createThread(String name) {
MonitoredThread thread = new MonitoredThread((name + "-" + latestThread++).toLowerCase(), threadTimeout, threadTimeoutUnit);
threads.add(thread);
return thread;
}
public void execute(Runnable runnable) {
execute(runnable, false);
}
public void execute(Runnable runnable, boolean nonDisruptable) {
MonitoredThread thread = getHealthyThread();
if (thread != null)
thread.execute(runnable, nonDisruptable);
}
public ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
MonitoredThread thread = getHealthyThread();
return thread == null ? null : thread.schedule(runnable, delay, timeUnit);
}
public ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
return schedule(runnable, delay, timeUnit);
}
private MonitoredThread getHealthyThread() {
for (MonitoredThread thread : threads) {
if (!thread.isRunning()) {
return thread;
} else if (thread.isStalled()) {
thread.start();
onStall();
return thread;
}
}
return null;
}
public int getRunningThreads() {
int runningThreads = 0;
for (MonitoredThread thread : threads)
if (thread.isRunning())
runningThreads++;
return runningThreads;
}
public void onStall() {
// Must be overridden if you want to do something when a thread stalls
}
}

View File

@ -0,0 +1,11 @@
package com.craftaro.core.thread;
import java.util.concurrent.TimeUnit;
public class SingleMonitoredThread extends MonitoredThreadPool {
public SingleMonitoredThread(String name, int timeout, TimeUnit timeUnit) {
super(name, 1, timeout, timeUnit);
}
}

View File

@ -0,0 +1,100 @@
package com.craftaro.core.thread;
import com.craftaro.core.SongodaPlugin;
import org.bukkit.scheduler.BukkitRunnable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public abstract class TaskScheduler {
private final SongodaPlugin plugin;
private final Map<TaskWrapper, Long> tasks = new ConcurrentHashMap<>();
private BukkitRunnable runnable;
public TaskScheduler(SongodaPlugin plugin) {
this.plugin = plugin;
}
private void startScheduler() {
if (runnable == null || runnable.isCancelled()) {
runnable = new BukkitRunnable() {
@Override
public void run() {
executeTasks();
}
};
runnable.runTaskTimerAsynchronously(plugin, 20L, 20L);
}
}
private void stopScheduler() {
if (runnable != null && !runnable.isCancelled()) {
runnable.cancel();
}
}
private void executeTasks() {
if (tasks.isEmpty()) {
stopScheduler();
return;
}
long currentTime = System.currentTimeMillis();
Iterator<Map.Entry<TaskWrapper, Long>> iterator = tasks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<TaskWrapper, Long> entry = iterator.next();
if (entry.getValue() <= currentTime) {
TaskWrapper taskWrapper = entry.getKey();
if (taskWrapper.isAsync()) {
// Run the task asynchronously
new BukkitRunnable() {
@Override
public void run() {
taskWrapper.getTask().run();
}
}.runTaskAsynchronously(plugin);
} else {
// Run the task synchronously
new BukkitRunnable() {
@Override
public void run() {
taskWrapper.getTask().run();
}
}.runTask(plugin);
}
iterator.remove();
}
}
}
public synchronized void addTask(Runnable task, long delay, boolean async) {
tasks.put(new TaskWrapper(task, async), System.currentTimeMillis() + delay);
startScheduler();
}
public synchronized void addTask(Runnable task, long delay) {
addTask(task, delay, false);
}
private static class TaskWrapper {
private final Runnable task;
private final boolean async;
public TaskWrapper(Runnable task, boolean async) {
this.task = task;
this.async = async;
}
public Runnable getTask() {
return task;
}
public boolean isAsync() {
return async;
}
}
}