Revamped scheduling API (#535)

This commit is contained in:
TheMode 2021-12-16 00:15:55 +01:00
parent df071d4bfb
commit 9717e54aac
21 changed files with 592 additions and 661 deletions

View File

@ -66,7 +66,8 @@ public abstract class GLFWCapableBuffer {
synchronized(GLFWCapableBuffer.class) {
if(threadBindingPool == null) {
threadBindingPool = new ThreadBindingExecutor(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER);
threadBindingPool = new ThreadBindingExecutor(Runtime.getRuntime().availableProcessors()/2,
"GLFWCapableBuffer-ThreadBindingPool");
}
}
}

View File

@ -67,10 +67,6 @@ public final class MinecraftServer {
public static final int THREAD_COUNT_BLOCK_BATCH = getThreadCount("minestom.block-thread-count",
Runtime.getRuntime().availableProcessors() / 2);
public static final String THREAD_NAME_SCHEDULER = "Ms-SchedulerPool";
public static final int THREAD_COUNT_SCHEDULER = getThreadCount("minestom.scheduler-thread-count",
Runtime.getRuntime().availableProcessors() / 2);
public static final String THREAD_NAME_PARALLEL_CHUNK_SAVING = "Ms-ParallelChunkSaving";
public static final int THREAD_COUNT_PARALLEL_CHUNK_SAVING = getThreadCount("minestom.save-thread-count", 2);

View File

@ -9,6 +9,7 @@ import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.socket.Worker;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.timer.SchedulerManager;
import net.minestom.server.utils.PacketUtils;
import org.jetbrains.annotations.NotNull;
@ -166,6 +167,7 @@ public final class UpdateManager {
@Override
public void run() {
final ConnectionManager connectionManager = MinecraftServer.getConnectionManager();
final SchedulerManager schedulerManager = MinecraftServer.getSchedulerManager();
final List<Worker> workers = MinecraftServer.getServer().workers();
while (!stopRequested) {
try {
@ -175,6 +177,8 @@ public final class UpdateManager {
// Tick start callbacks
doTickCallback(tickStartCallbacks, tickStart);
schedulerManager.processTick();
// Waiting players update (newly connected clients waiting to get into the server)
connectionManager.updateWaitingPlayers();

View File

@ -36,6 +36,8 @@ import net.minestom.server.potion.PotionEffect;
import net.minestom.server.potion.TimedPotion;
import net.minestom.server.tag.Tag;
import net.minestom.server.tag.TagHandler;
import net.minestom.server.timer.Schedulable;
import net.minestom.server.timer.Scheduler;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.ViewEngine;
import net.minestom.server.utils.async.AsyncUtils;
@ -50,13 +52,15 @@ import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jglrxavpok.hephaistos.nbt.NBTCompound;
import org.jglrxavpok.hephaistos.nbt.mutable.MutableNBTCompound;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -67,7 +71,7 @@ import java.util.function.UnaryOperator;
* <p>
* To create your own entity you probably want to extends {@link LivingEntity} or {@link EntityCreature} instead.
*/
public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler, HoverEventSource<ShowEntity>, Sound.Emitter {
public class Entity implements Viewable, Tickable, Schedulable, TagHandler, PermissionHandler, HoverEventSource<ShowEntity>, Sound.Emitter {
private static final Map<Integer, Entity> ENTITY_BY_ID = new ConcurrentHashMap<>();
private static final Map<UUID, Entity> ENTITY_BY_UUID = new ConcurrentHashMap<>();
@ -145,6 +149,7 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
this instanceof Player player ? entity -> entity.viewEngine.viewableOption.removal.accept(player) : null);
protected final Set<Player> viewers = viewEngine.asSet();
private final MutableNBTCompound nbtCompound = new MutableNBTCompound();
private final Scheduler scheduler = Scheduler.newScheduler();
private final Set<Permission> permissions = new CopyOnWriteArraySet<>();
protected UUID uuid;
@ -166,9 +171,6 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
private final List<TimedPotion> effects = new CopyOnWriteArrayList<>();
// list of scheduled tasks to be executed during the next entity tick
protected final Queue<Consumer<Entity>> nextTick = new ConcurrentLinkedQueue<>();
// Tick related
private long ticks;
@ -203,7 +205,7 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
* @param callback the task to execute during the next entity tick
*/
public void scheduleNextTick(@NotNull Consumer<Entity> callback) {
this.nextTick.add(callback);
this.scheduler.scheduleNextTick(() -> callback.accept(this));
}
/**
@ -531,13 +533,8 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
}
// scheduled tasks
{
Consumer<Entity> callback;
while ((callback = nextTick.poll()) != null) {
callback.accept(this);
}
if (isRemoved()) return;
}
this.scheduler.processTick();
if (isRemoved()) return;
// Entity tick
{
@ -1574,6 +1571,11 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
tag.write(nbtCompound, value);
}
@Override
public @NotNull Scheduler scheduler() {
return scheduler;
}
/**
* Applies knockback to the entity
*

View File

@ -22,6 +22,8 @@ import net.minestom.server.network.packet.server.play.BlockActionPacket;
import net.minestom.server.network.packet.server.play.TimeUpdatePacket;
import net.minestom.server.tag.Tag;
import net.minestom.server.tag.TagHandler;
import net.minestom.server.timer.Schedulable;
import net.minestom.server.timer.Scheduler;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.chunk.ChunkUtils;
import net.minestom.server.utils.time.Cooldown;
@ -31,13 +33,11 @@ import net.minestom.server.world.DimensionType;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jglrxavpok.hephaistos.nbt.NBTCompound;
import org.jglrxavpok.hephaistos.nbt.mutable.MutableNBTCompound;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -52,7 +52,7 @@ import java.util.stream.Collectors;
* you need to be sure to signal the {@link UpdateManager} of the changes using
* {@link UpdateManager#signalChunkLoad(Chunk)} and {@link UpdateManager#signalChunkUnload(Chunk)}.
*/
public abstract class Instance implements Block.Getter, Block.Setter, Tickable, TagHandler, PacketGroupingAudience {
public abstract class Instance implements Block.Getter, Block.Setter, Tickable, Schedulable, TagHandler, PacketGroupingAudience {
protected static final BlockManager BLOCK_MANAGER = MinecraftServer.getBlockManager();
protected static final UpdateManager UPDATE_MANAGER = MinecraftServer.getUpdateManager();
@ -80,13 +80,12 @@ public abstract class Instance implements Block.Getter, Block.Setter, Tickable,
// the uuid of this instance
protected UUID uniqueId;
// list of scheduled tasks to be executed during the next instance tick
protected final Queue<Consumer<Instance>> nextTick = new ConcurrentLinkedQueue<>();
// instance custom data
private final Object nbtLock = new Object();
private final MutableNBTCompound nbt = new MutableNBTCompound();
private final Scheduler scheduler = Scheduler.newScheduler();
// the explosion supplier
private ExplosionSupplier explosionSupplier;
@ -121,7 +120,7 @@ public abstract class Instance implements Block.Getter, Block.Setter, Tickable,
* @param callback the task to execute during the next instance tick
*/
public void scheduleNextTick(@NotNull Consumer<Instance> callback) {
this.nextTick.add(callback);
this.scheduler.scheduleNextTick(() -> callback.accept(this));
}
@ApiStatus.Internal
@ -574,12 +573,7 @@ public abstract class Instance implements Block.Getter, Block.Setter, Tickable,
@Override
public void tick(long time) {
// Scheduled tasks
{
Consumer<Instance> callback;
while ((callback = nextTick.poll()) != null) {
callback.accept(this);
}
}
this.scheduler.processTick();
// Time
{
this.worldAge++;
@ -615,6 +609,11 @@ public abstract class Instance implements Block.Getter, Block.Setter, Tickable,
}
}
@Override
public @NotNull Scheduler scheduler() {
return scheduler;
}
/**
* Creates an explosion at the given position with the given strength.
* The algorithm used to compute damages is provided by {@link #getExplosionSupplier()}.

View File

@ -36,7 +36,6 @@ public final class BenchmarkManager {
static {
THREADS.add(THREAD_NAME_BLOCK_BATCH);
THREADS.add(THREAD_NAME_SCHEDULER);
THREADS.add(THREAD_NAME_TICK_SCHEDULER);
THREADS.add(THREAD_NAME_TICK);
}

View File

@ -0,0 +1,6 @@
package net.minestom.server.timer;
public enum ExecutionType {
SYNC,
ASYNC
}

View File

@ -0,0 +1,7 @@
package net.minestom.server.timer;
import org.jetbrains.annotations.NotNull;
public interface Schedulable {
@NotNull Scheduler scheduler();
}

View File

@ -0,0 +1,70 @@
package net.minestom.server.timer;
import org.jetbrains.annotations.NotNull;
import java.util.function.Supplier;
/**
* Represents a scheduler that will execute tasks with a precision based on its ticking rate.
* If precision is important, consider using a JDK executor service or any third party library.
* <p>
* Tasks are by default executed in the caller thread.
*/
public sealed interface Scheduler permits SchedulerImpl, SchedulerManager {
static @NotNull Scheduler newScheduler() {
return new SchedulerImpl();
}
/**
* Process scheduled tasks based on time to increase scheduling precision.
* <p>
* This method is not thread-safe.
*/
void process();
/**
* Advance 1 tick and call {@link #process()}.
* <p>
* This method is not thread-safe.
*/
void processTick();
/**
* Submits a new task with custom scheduling logic.
* <p>
* This is the primitive method used by all scheduling shortcuts,
* {@code task} is immediately executed in the caller thread to retrieve its scheduling state
* and the task will stay alive as long as {@link TaskSchedule#stop()} is not returned (or {@link Task#cancel()} is called).
*
* @param task the task to be directly executed in the caller thread
* @param executionType the execution type
* @return the created task
*/
@NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task, @NotNull ExecutionType executionType);
default @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task) {
return submitTask(task, ExecutionType.SYNC);
}
default @NotNull Task.Builder buildTask(@NotNull Runnable task) {
return new Task.Builder(this, task);
}
default @NotNull Task scheduleTask(@NotNull Runnable task,
@NotNull TaskSchedule delay, @NotNull TaskSchedule repeat,
@NotNull ExecutionType executionType) {
return buildTask(task).delay(delay).repeat(repeat).executionType(executionType).schedule();
}
default @NotNull Task scheduleTask(@NotNull Runnable task, @NotNull TaskSchedule delay, @NotNull TaskSchedule repeat) {
return scheduleTask(task, delay, repeat, ExecutionType.SYNC);
}
default @NotNull Task scheduleNextTick(@NotNull Runnable task, @NotNull ExecutionType executionType) {
return buildTask(task).delay(TaskSchedule.nextTick()).executionType(executionType).schedule();
}
default @NotNull Task scheduleNextTick(@NotNull Runnable task) {
return scheduleNextTick(task, ExecutionType.SYNC);
}
}

View File

@ -0,0 +1,129 @@
package net.minestom.server.timer;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import org.jctools.queues.MpscGrowableArrayQueue;
import org.jetbrains.annotations.NotNull;
import org.roaringbitmap.RoaringBitmap;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
final class SchedulerImpl implements Scheduler {
private static final AtomicInteger TASK_COUNTER = new AtomicInteger();
private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
});
private static final ForkJoinPool EXECUTOR = ForkJoinPool.commonPool();
private final Set<TaskImpl> tasks = new HashSet<>();
private final RoaringBitmap bitSet = new RoaringBitmap();
private final Int2ObjectAVLTreeMap<List<TaskImpl>> tickTaskQueue = new Int2ObjectAVLTreeMap<>();
private final MpscGrowableArrayQueue<TaskImpl> taskQueue = new MpscGrowableArrayQueue<>(64);
private final Set<TaskImpl> parkedTasks = ConcurrentHashMap.newKeySet();
private int tickState;
@Override
public void process() {
processTick(0);
}
@Override
public void processTick() {
processTick(1);
}
private void processTick(int tickDelta) {
synchronized (this) {
this.tickState += tickDelta;
int tickToProcess;
while (!tickTaskQueue.isEmpty() && (tickToProcess = tickTaskQueue.firstIntKey()) <= tickState) {
final List<TaskImpl> tickScheduledTasks = tickTaskQueue.remove(tickToProcess);
if (tickScheduledTasks != null) tickScheduledTasks.forEach(taskQueue::relaxedOffer);
}
}
// Run all tasks lock-free, either in the current thread or pool
this.taskQueue.drain(task -> {
if (!task.isAlive()) return;
switch (task.executionType()) {
case SYNC -> handleTask(task);
case ASYNC -> EXECUTOR.submit(() -> handleTask(task));
}
});
}
@Override
public @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task,
@NotNull ExecutionType executionType) {
final TaskImpl taskRef = register(task, executionType);
handleTask(taskRef);
return taskRef;
}
void unparkTask(TaskImpl task) {
if (parkedTasks.remove(task)) {
this.taskQueue.relaxedOffer(task);
}
}
boolean isTaskParked(TaskImpl task) {
return parkedTasks.contains(task);
}
synchronized void cancelTask(TaskImpl task) {
this.bitSet.remove(task.id());
if (!tasks.remove(task)) throw new IllegalStateException("Task is not scheduled");
}
synchronized boolean isTaskAlive(TaskImpl task) {
return bitSet.contains(task.id());
}
private synchronized TaskImpl register(@NotNull Supplier<TaskSchedule> task,
@NotNull ExecutionType executionType) {
TaskImpl taskRef = new TaskImpl(TASK_COUNTER.getAndIncrement(), task,
executionType, this);
this.bitSet.add(taskRef.id());
this.tasks.add(taskRef);
return taskRef;
}
private void safeExecute(TaskImpl task) {
// Prevent the task from being executed in the current thread
// By either adding the task to the execution queue or submitting it to the pool
switch (task.executionType()) {
case SYNC -> taskQueue.offer(task);
case ASYNC -> EXECUTOR.submit(() -> handleTask(task));
}
}
private void handleTask(TaskImpl task) {
final TaskSchedule schedule = task.task().get();
if (schedule instanceof TaskScheduleImpl.DurationSchedule durationSchedule) {
final Duration duration = durationSchedule.duration();
SCHEDULER.schedule(() -> safeExecute(task), duration.toMillis(), TimeUnit.MILLISECONDS);
} else if (schedule instanceof TaskScheduleImpl.TickSchedule tickSchedule) {
synchronized (this) {
final int target = tickState + tickSchedule.tick();
this.tickTaskQueue.computeIfAbsent(target, i -> new ArrayList<>()).add(task);
}
} else if (schedule instanceof TaskScheduleImpl.FutureSchedule futureSchedule) {
futureSchedule.future().thenRun(() -> safeExecute(task));
} else if (schedule instanceof TaskScheduleImpl.Park) {
this.parkedTasks.add(task);
} else if (schedule instanceof TaskScheduleImpl.Stop) {
cancelTask(task);
} else if (schedule instanceof TaskScheduleImpl.Immediate) {
this.taskQueue.relaxedOffer(task);
}
}
}

View File

@ -1,259 +1,35 @@
package net.minestom.server.timer;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import net.minestom.server.MinecraftServer;
import net.minestom.server.extensions.Extension;
import net.minestom.server.extensions.IExtensionObserver;
import net.minestom.server.thread.MinestomThreadPool;
import org.jctools.queues.MpmcUnboundedXaddArrayQueue;
import org.jetbrains.annotations.NotNull;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
* An object which manages all the {@link Task}'s.
* <p>
* {@link Task} first need to be built with {@link #buildTask(Runnable)}, you can then specify a delay with as example
* {@link TaskBuilder#delay(long, java.time.temporal.TemporalUnit)}
* or {@link TaskBuilder#repeat(long, java.time.temporal.TemporalUnit)},
* and to finally schedule: {@link TaskBuilder#schedule()}.
* <p>
* Shutdown tasks are built with {@link #buildShutdownTask(Runnable)} and are executed, as the name implies, when the server stops.
*/
public final class SchedulerManager implements IExtensionObserver {
public final class SchedulerManager implements Scheduler {
private final Scheduler scheduler = Scheduler.newScheduler();
private final MpmcUnboundedXaddArrayQueue<Runnable> shutdownTasks = new MpmcUnboundedXaddArrayQueue<>(1024);
private static boolean instanced;
// A counter for all normal tasks
private final AtomicInteger counter;
// A counter for all shutdown tasks
private final AtomicInteger shutdownCounter;
//A threaded execution
private final ExecutorService batchesPool;
// A single threaded scheduled execution
private final ScheduledExecutorService timerExecutionService;
// All the registered tasks (task id = task)
protected final Int2ObjectMap<Task> tasks;
// All the registered shutdown tasks (task id = task)
protected final Int2ObjectMap<Task> shutdownTasks;
/**
* Tasks scheduled through extensions
*/
private final Map<String, List<Task>> extensionTasks = new ConcurrentHashMap<>();
/**
* Shutdown tasks scheduled through extensions
*/
private final Map<String, List<Task>> extensionShutdownTasks = new ConcurrentHashMap<>();
/**
* Default constructor
*/
public SchedulerManager() {
if (instanced) {
throw new IllegalStateException("You cannot instantiate a SchedulerManager," +
" use MinecraftServer.getSchedulerManager()");
}
SchedulerManager.instanced = true;
this.counter = new AtomicInteger();
this.shutdownCounter = new AtomicInteger();
this.batchesPool = new MinestomThreadPool(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER);
this.timerExecutionService = Executors.newSingleThreadScheduledExecutor();
this.tasks = new Int2ObjectOpenHashMap<>();
this.shutdownTasks = new Int2ObjectOpenHashMap<>();
}
/**
* Initializes a new {@link TaskBuilder} for creating a {@link Task}.
*
* @param runnable The {@link Task} to run when scheduled
* @return the {@link TaskBuilder}
*/
@NotNull
public TaskBuilder buildTask(@NotNull Runnable runnable) {
return new TaskBuilder(this, runnable);
}
/**
* Initializes a new {@link TaskBuilder} for creating a shutdown {@link Task}.
*
* @param runnable The shutdown {@link Task} to run when scheduled
* @return the {@link TaskBuilder}
*/
@NotNull
public TaskBuilder buildShutdownTask(@NotNull Runnable runnable) {
return new TaskBuilder(this, runnable, true);
}
/**
* Removes/Forces the end of a {@link Task}.
* <p>
* {@link Task#cancel()} can also be used instead.
*
* @param task The {@link Task} to remove
*/
public void removeTask(@NotNull Task task) {
task.cancel();
}
/**
* Shutdowns all normal tasks and call the registered shutdown tasks.
*/
public void shutdown() {
MinecraftServer.LOGGER.info("Executing all shutdown tasks..");
for (Task task : this.getShutdownTasks()) {
task.runRunnable();
}
MinecraftServer.LOGGER.info("Shutting down the scheduled execution service and batches pool.");
this.timerExecutionService.shutdown();
this.batchesPool.shutdown();
try {
batchesPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}
/**
* Increments the current counter value.
*
* @return the updated counter value
*/
protected int getCounterIdentifier() {
return this.counter.incrementAndGet();
}
/**
* Increments the current shutdown counter value.
*
* @return the updated shutdown counter value
*/
protected int getShutdownCounterIdentifier() {
return this.shutdownCounter.incrementAndGet();
}
/**
* Gets a {@link Collection} with all the registered {@link Task}.
* <p>
* Be aware that the collection is not thread-safe.
*
* @return a {@link Collection} with all the registered {@link Task}
*/
public @NotNull Collection<Task> getTasks() {
return tasks.values();
}
/**
* Returns the task associated with this task id
*
* @param id the id of the task
* @return task the task itself
*/
public Task getTask(int id) {
return tasks.get(id);
}
/**
* Gets a {@link Collection} with all the registered shutdown {@link Task}.
*
* @return a {@link Collection} with all the registered shutdown {@link Task}
*/
public @NotNull Collection<Task> getShutdownTasks() {
return shutdownTasks.values();
}
/**
* Returns the shutdown task associated with this task id
*
* @param id the id of the task
* @return task the shutdown task itself
*/
public Task getShutdownTask(int id) {
return shutdownTasks.get(id);
}
/**
* Gets the execution service for all the registered {@link Task}.
*
* @return the execution service for all the registered {@link Task}
*/
@NotNull
public ExecutorService getBatchesPool() {
return batchesPool;
}
/**
* Gets the scheduled execution service for all the registered {@link Task}.
*
* @return the scheduled execution service for all the registered {@link Task}
*/
@NotNull
public ScheduledExecutorService getTimerExecutionService() {
return timerExecutionService;
}
/**
* Called when a Task from an extension is scheduled.
*
* @param owningExtension the name of the extension which scheduled the task
* @param task the task that has been scheduled
*/
void onScheduleFromExtension(String owningExtension, Task task) {
List<Task> scheduledForThisExtension = extensionTasks.computeIfAbsent(owningExtension, s -> new CopyOnWriteArrayList<>());
scheduledForThisExtension.add(task);
Extension ext = MinecraftServer.getExtensionManager().getExtension(owningExtension);
ext.observe(this);
}
/**
* Called when a Task from an extension is scheduled for server shutdown.
*
* @param owningExtension the name of the extension which scheduled the task
* @param task the task that has been scheduled
*/
void onScheduleShutdownFromExtension(String owningExtension, Task task) {
List<Task> scheduledForThisExtension = extensionShutdownTasks.computeIfAbsent(owningExtension, s -> new CopyOnWriteArrayList<>());
scheduledForThisExtension.add(task);
Extension ext = MinecraftServer.getExtensionManager().getExtension(owningExtension);
ext.observe(this);
}
/**
* Unschedules all non-transient tasks ({@link Task#isTransient()}) from this scheduler. Tasks are allowed to complete
*
* @param extension the name of the extension to unschedule tasks from
* @see Task#isTransient()
*/
public void removeExtensionTasksOnUnload(String extension) {
List<Task> scheduledForThisExtension = extensionTasks.get(extension);
if (scheduledForThisExtension != null) {
List<Task> toCancel = scheduledForThisExtension.stream()
.filter(t -> !t.isTransient()).toList();
toCancel.forEach(Task::cancel);
scheduledForThisExtension.removeAll(toCancel);
}
List<Task> shutdownScheduledForThisExtension = extensionShutdownTasks.get(extension);
if (shutdownScheduledForThisExtension != null) {
List<Task> toCancel = shutdownScheduledForThisExtension.stream()
.filter(t -> !t.isTransient()).toList();
toCancel.forEach(Task::cancel);
shutdownScheduledForThisExtension.removeAll(toCancel);
shutdownTasks.values().removeAll(toCancel);
}
@Override
public void process() {
this.scheduler.process();
}
@Override
public void onExtensionUnload(String extensionName) {
removeExtensionTasksOnUnload(extensionName);
public void processTick() {
this.scheduler.processTick();
}
@Override
public @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task,
@NotNull ExecutionType executionType) {
return scheduler.submitTask(task, executionType);
}
public void shutdown() {
this.shutdownTasks.drain(Runnable::run);
}
public void buildShutdownTask(@NotNull Runnable runnable) {
this.shutdownTasks.relaxedOffer(runnable);
}
}

View File

@ -1,203 +1,91 @@
package net.minestom.server.timer;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import net.minestom.server.MinecraftServer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.function.Supplier;
/**
* An Object that represents a task that is scheduled for execution on the application.
* <p>
* Tasks are built in {@link SchedulerManager} and scheduled by a {@link TaskBuilder}.
*/
public class Task implements Runnable {
public sealed interface Task permits TaskImpl {
int id();
// Manages all tasks
private final SchedulerManager schedulerManager;
// The task logic
private final Runnable runnable;
// Task identifier
private final int id;
// True if the task planned for the application shutdown
private final boolean shutdown;
// Delay value for the task execution
private final long delay;
// Repeat value for the task execution
private final long repeat;
@NotNull ExecutionType executionType();
/** Extension which owns this task, or null if none */
private final String owningExtension;
/**
* If this task is owned by an extension, should it survive the unloading of said extension?
* May be useful for delay tasks, but it can prevent the extension classes from being unloaded, and preventing a full
* reload of that extension.
*/
private final boolean isTransient;
// Task completion/execution
private ScheduledFuture<?> future;
// The thread of the task
private volatile Thread currentThreadTask;
@NotNull Scheduler owner();
/**
* Creates a task.
*
* @param schedulerManager The manager for the task
* @param runnable The task to run when scheduled
* @param shutdown Defines whether the task is a shutdown task
* @param delay The time to delay
* @param repeat The time until the repetition
* Unpark the tasks to be executed during next processing.
*/
public Task(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable, boolean shutdown, long delay, long repeat, boolean isTransient, @Nullable String owningExtension) {
this.schedulerManager = schedulerManager;
this.runnable = runnable;
this.shutdown = shutdown;
this.id = shutdown ? this.schedulerManager.getShutdownCounterIdentifier() : this.schedulerManager.getCounterIdentifier();
this.delay = delay;
this.repeat = repeat;
this.isTransient = isTransient;
this.owningExtension = owningExtension;
}
void unpark();
/**
* Executes the task.
*/
@Override
public void run() {
this.schedulerManager.getBatchesPool().execute(() -> {
this.currentThreadTask = Thread.currentThread();
try {
this.runnable.run();
} catch (Exception e) {
System.err.printf(
"An exception in %s task %s is occurred! (%s)%n",
this.shutdown ? "shutdown" : "",
this.id,
e.getMessage()
);
MinecraftServer.getExceptionManager().handleException(e);
} finally {
if (this.repeat == 0) this.finish();
this.currentThreadTask = null;
}
});
}
boolean isParked();
/**
* Executes the internal runnable.
* <p>
* Should probably use {@link #schedule()} instead.
*/
public void runRunnable() {
this.runnable.run();
}
void cancel();
/**
* Sets up the task for correct execution.
*/
public void schedule() {
if(this.shutdown) {
Int2ObjectMap<Task> shutdownTasks = this.schedulerManager.shutdownTasks;
synchronized (shutdownTasks) {
shutdownTasks.put(getId(), this);
}
if (owningExtension != null) {
this.schedulerManager.onScheduleShutdownFromExtension(owningExtension, this);
}
} else {
Int2ObjectMap<Task> tasks = this.schedulerManager.tasks;
synchronized (tasks) {
tasks.put(getId(), this);
}
if (owningExtension != null) {
this.schedulerManager.onScheduleFromExtension(owningExtension, this);
}
this.future = this.repeat == 0L ?
this.schedulerManager.getTimerExecutionService().schedule(this, this.delay, TimeUnit.MILLISECONDS) :
this.schedulerManager.getTimerExecutionService().scheduleAtFixedRate(this, this.delay, this.repeat, TimeUnit.MILLISECONDS);
boolean isAlive();
final class Builder {
private final Scheduler scheduler;
private final Runnable runnable;
private ExecutionType executionType = ExecutionType.SYNC;
private TaskSchedule delay = TaskSchedule.immediate();
private TaskSchedule repeat = TaskSchedule.stop();
Builder(Scheduler scheduler, Runnable runnable) {
this.scheduler = scheduler;
this.runnable = runnable;
}
public @NotNull Builder executionType(@NotNull ExecutionType executionType) {
this.executionType = executionType;
return this;
}
public @NotNull Builder delay(@NotNull TaskSchedule schedule) {
this.delay = schedule;
return this;
}
public @NotNull Builder repeat(@NotNull TaskSchedule schedule) {
this.repeat = schedule;
return this;
}
public @NotNull Task schedule() {
var runnable = this.runnable;
var delay = this.delay;
var repeat = this.repeat;
var executionType = this.executionType;
return scheduler.submitTask(new Supplier<>() {
boolean first = true;
@Override
public TaskSchedule get() {
if (first) {
first = false;
return delay;
}
runnable.run();
return repeat;
}
}, executionType);
}
public @NotNull Builder delay(@NotNull Duration duration) {
return delay(TaskSchedule.duration(duration));
}
public @NotNull Builder delay(long time, @NotNull TemporalUnit unit) {
return delay(Duration.of(time, unit));
}
public @NotNull Builder repeat(@NotNull Duration duration) {
return repeat(TaskSchedule.duration(duration));
}
public @NotNull Builder repeat(long time, @NotNull TemporalUnit unit) {
return repeat(Duration.of(time, unit));
}
}
/**
* Gets the current status of the task.
*
* @return the current stats of the task
*/
@NotNull
public TaskStatus getStatus() {
if (this.future == null) return TaskStatus.SCHEDULED;
if (this.future.isCancelled()) return TaskStatus.CANCELLED;
if (this.future.isDone()) return TaskStatus.FINISHED;
return TaskStatus.SCHEDULED;
}
/**
* Cancels this task. If the task is already running, the thread in which it is running is interrupted.
* If the task is not currently running, Minestom will safely terminate it.
*/
public void cancel() {
if (this.future != null) {
this.future.cancel(false);
Thread current = this.currentThreadTask;
if (current != null) current.interrupt();
this.finish();
}
}
/**
* Gets the id of this task.
*
* @return the task id
*/
public int getId() {
return id;
}
/**
* Removes the task from the {@link SchedulerManager} map.
*/
private void finish() {
Int2ObjectMap<Task> taskMap = shutdown ?
this.schedulerManager.shutdownTasks :
this.schedulerManager.tasks;
synchronized (taskMap) {
taskMap.remove(getId());
}
}
@Override
public boolean equals(Object object) {
if (this == object) return true;
if (object == null || getClass() != object.getClass()) return false;
Task task = (Task) object;
return id == task.id;
}
/**
* If this task is owned by an extension, should it survive the unloading of said extension?
* May be useful for delay tasks, but it can prevent the extension classes from being unloaded, and preventing a full
* reload of that extension.
*/
public boolean isTransient() {
return isTransient;
}
/**
* Extension which owns this task, or null if none
*/
public String getOwningExtension() {
return owningExtension;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}

View File

@ -1,167 +0,0 @@
package net.minestom.server.timer;
import net.minestom.server.extras.selfmodification.MinestomRootClassLoader;
import org.jetbrains.annotations.NotNull;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
/**
* A builder which represents a fluent Object to schedule tasks.
* <p>
* You can specify a delay with {@link #delay(long, TemporalUnit)} or {@link #repeat(long, TemporalUnit)}
* and then schedule the {@link Task} with {@link #schedule()}.
*/
public class TaskBuilder {
// Manager for the tasks
private final SchedulerManager schedulerManager;
// The logic behind every task
private final Runnable runnable;
// True if the task planned for the application shutdown
private final boolean shutdown;
/**
* Extension which owns this task, or null if none
*/
private final String owningExtension;
// Delay value for the task execution
private long delay;
// Repeat value for the task execution
private long repeat;
/**
* If this task is owned by an extension, should it survive the unloading of said extension?
* May be useful for delay tasks, but it can prevent the extension classes from being unloaded, and preventing a full
* reload of that extension.
*/
private boolean isTransient;
/**
* Creates a task builder.
* <br>
* <b>Note:</b> The task builder creates a normal task.
*
* @param schedulerManager The manager for the tasks
* @param runnable The task to run when scheduled
*/
public TaskBuilder(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable) {
this(schedulerManager, runnable, false);
}
/**
* Creates a task builder.
*
* @param schedulerManager The manager for the tasks
* @param runnable The task to run when scheduled
* @param shutdown Defines whether the task is a shutdown task
*/
public TaskBuilder(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable, boolean shutdown) {
this.schedulerManager = schedulerManager;
this.runnable = runnable;
this.shutdown = shutdown;
this.isTransient = false;
this.owningExtension = MinestomRootClassLoader.findExtensionObjectOwner(runnable);
}
/**
* Specifies that the {@link Task} should delay its execution by the specified amount of time.
*
* @param time The time to delay
* @param unit The unit of time for {@code time}
* @return this builder, for chaining
*/
public @NotNull TaskBuilder delay(long time, @NotNull TemporalUnit unit) {
return delay(Duration.of(time, unit));
}
/**
* Specifies that the {@link Task} should delay its execution by the specified amount of time.
*
* @param duration the Duration for this builder.
* @return this builder, for chaining
*/
public @NotNull TaskBuilder delay(@NotNull Duration duration) {
this.delay = duration.toMillis();
return this;
}
/**
* Specifies that the {@link Task} should continue to run after waiting for the specified value until it is terminated.
*
* @param time The time until the repetition
* @param unit The {@link TemporalUnit} for {@code time}
* @return this builder, for chaining
*/
public @NotNull TaskBuilder repeat(long time, @NotNull TemporalUnit unit) {
return repeat(Duration.of(time, unit));
}
/**
* Specifies that the {@link Task} should continue to run after waiting for the specified value until it is terminated.
*
* @param duration the Duration for this builder.
* @return this builder, for chaining
*/
public @NotNull TaskBuilder repeat(@NotNull Duration duration) {
this.repeat = duration.toMillis();
return this;
}
/**
* Clears the delay interval of the {@link Task}.
*
* @return this builder, for chaining
*/
public @NotNull TaskBuilder clearDelay() {
this.delay = 0L;
return this;
}
/**
* Clears the repeat interval of the {@link Task}.
*
* @return this builder, for chaining
*/
public @NotNull TaskBuilder clearRepeat() {
this.repeat = 0L;
return this;
}
/**
* If this task is owned by an extension, should it survive the unloading of said extension?
* May be useful for delay tasks, but it can prevent the extension classes from being unloaded, and preventing a full
* reload of that extension.
*/
public TaskBuilder makeTransient() {
isTransient = true;
return this;
}
/**
* Builds a {@link Task}.
*
* @return the built {@link Task}
*/
@NotNull
public Task build() {
return new Task(
this.schedulerManager,
this.runnable,
this.shutdown,
this.delay,
this.repeat,
this.isTransient,
this.owningExtension);
}
/**
* Schedules this {@link Task} for execution.
*
* @return the scheduled {@link Task}
*/
@NotNull
public Task schedule() {
Task task = build();
task.schedule();
return task;
}
}

View File

@ -0,0 +1,30 @@
package net.minestom.server.timer;
import org.jetbrains.annotations.NotNull;
import java.util.function.Supplier;
record TaskImpl(int id,
@NotNull Supplier<TaskSchedule> task,
@NotNull ExecutionType executionType,
@NotNull SchedulerImpl owner) implements Task {
@Override
public void unpark() {
this.owner.unparkTask(this);
}
@Override
public boolean isParked() {
return owner.isTaskParked(this);
}
@Override
public void cancel() {
this.owner.cancelTask(this);
}
@Override
public boolean isAlive() {
return owner.isTaskAlive(this);
}
}

View File

@ -0,0 +1,65 @@
package net.minestom.server.timer;
import org.jetbrains.annotations.NotNull;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.CompletableFuture;
public sealed interface TaskSchedule permits
TaskScheduleImpl.DurationSchedule,
TaskScheduleImpl.FutureSchedule,
TaskScheduleImpl.Immediate,
TaskScheduleImpl.Park,
TaskScheduleImpl.Stop,
TaskScheduleImpl.TickSchedule {
static @NotNull TaskSchedule duration(@NotNull Duration duration) {
return new TaskScheduleImpl.DurationSchedule(duration);
}
static @NotNull TaskSchedule tick(int tick) {
return new TaskScheduleImpl.TickSchedule(tick);
}
static @NotNull TaskSchedule future(@NotNull CompletableFuture<?> future) {
return new TaskScheduleImpl.FutureSchedule(future);
}
static @NotNull TaskSchedule park() {
return TaskScheduleImpl.PARK;
}
static @NotNull TaskSchedule stop() {
return TaskScheduleImpl.STOP;
}
static @NotNull TaskSchedule immediate() {
return TaskScheduleImpl.IMMEDIATE;
}
// Shortcuts
static @NotNull TaskSchedule duration(long amount, @NotNull TemporalUnit unit) {
return duration(Duration.of(amount, unit));
}
static @NotNull TaskSchedule nextTick() {
return TaskScheduleImpl.NEXT_TICK;
}
static @NotNull TaskSchedule hours(long hours) {
return duration(Duration.ofHours(hours));
}
static @NotNull TaskSchedule minutes(long minutes) {
return duration(Duration.ofMinutes(minutes));
}
static @NotNull TaskSchedule seconds(long seconds) {
return duration(Duration.ofSeconds(seconds));
}
static @NotNull TaskSchedule millis(long millis) {
return duration(Duration.ofMillis(millis));
}
}

View File

@ -0,0 +1,35 @@
package net.minestom.server.timer;
import org.jetbrains.annotations.NotNull;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
final class TaskScheduleImpl {
static TaskSchedule NEXT_TICK = new TickSchedule(1);
static TaskSchedule PARK = new Park();
static TaskSchedule STOP = new Stop();
static TaskSchedule IMMEDIATE = new Immediate();
record DurationSchedule(@NotNull Duration duration) implements TaskSchedule {
}
record TickSchedule(int tick) implements TaskSchedule {
public TickSchedule {
if (tick <= 0)
throw new IllegalArgumentException("Tick must be greater than 0 (" + tick + ")");
}
}
record FutureSchedule(CompletableFuture<?> future) implements TaskSchedule {
}
record Park() implements TaskSchedule {
}
record Stop() implements TaskSchedule {
}
record Immediate() implements TaskSchedule {
}
}

View File

@ -1,21 +0,0 @@
package net.minestom.server.timer;
/**
* An enumeration that representing all available statuses for a {@link Task}
*/
public enum TaskStatus {
/**
* The task is execution and is currently running
*/
SCHEDULED,
/**
* The task was cancelled with {@link Task#cancel()}
*/
CANCELLED,
/**
* The task has been completed. This only applies to tasks without repetition
*/
FINISHED,
}

View File

@ -57,7 +57,7 @@ public class Main {
MinecraftServer.getBenchmarkManager().enable(Duration.of(10, TimeUnit.SECOND));
MinecraftServer.getSchedulerManager().buildShutdownTask(() -> System.out.println("Good night")).schedule();
MinecraftServer.getSchedulerManager().buildShutdownTask(() -> System.out.println("Good night"));
MinecraftServer.getGlobalEventHandler().addListener(ServerListPingEvent.class, event -> {
ResponseData responseData = event.getResponseData();

View File

@ -62,7 +62,7 @@ public class UnloadCallbacksExtension extends Extension {
// this callback will NOT be cancelled
MinecraftServer.getSchedulerManager().buildTask(() -> {
tickedScheduledTransient = true;
}).repeat(100L, TimeUnit.MILLISECOND).makeTransient().schedule();
}).repeat(100L, TimeUnit.MILLISECOND).schedule();
try {
Assertions.assertNotNull(MinestomRootClassLoader.findExtensionObjectOwner(callback));
@ -103,7 +103,7 @@ public class UnloadCallbacksExtension extends Extension {
}).delay(100L, TimeUnit.MILLISECOND).schedule();
// this shutdown tasks will not be executed because it is not transient
MinecraftServer.getSchedulerManager().buildShutdownTask(() -> Assertions.fail("This shutdown task should be unloaded when the extension is")).schedule();
MinecraftServer.getSchedulerManager().buildShutdownTask(() -> Assertions.fail("This shutdown task should be unloaded when the extension is"));
MinecraftServer.getSchedulerManager().buildTask(() -> {
// Make sure callbacks are disabled
@ -120,6 +120,6 @@ public class UnloadCallbacksExtension extends Extension {
e.printStackTrace();
}
MinecraftServer.stopCleanly(); // TODO: fix deadlock which happens because stopCleanly waits on completion of scheduler tasks
}).delay(1L, TimeUnit.SECOND).makeTransient().schedule();
}).delay(1L, TimeUnit.SECOND).schedule();
}
}

View File

@ -14,9 +14,9 @@ public class UnloadExtensionOnStop extends Extension {
MinecraftServer.getSchedulerManager().buildShutdownTask(() -> {
Assertions.assertTrue(terminated, "Extension should have been terminated on shutdown.");
System.out.println("All tests passed.");
}).makeTransient().schedule();
});
MinecraftServer.getSchedulerManager().buildTask(MinecraftServer::stopCleanly).makeTransient().delay(1L, TimeUnit.SECOND).schedule();
MinecraftServer.getSchedulerManager().buildTask(MinecraftServer::stopCleanly).delay(1L, TimeUnit.SECOND).schedule();
}
@Override

View File

@ -0,0 +1,112 @@
package timer;
import net.minestom.server.timer.ExecutionType;
import net.minestom.server.timer.Scheduler;
import net.minestom.server.timer.Task;
import net.minestom.server.timer.TaskSchedule;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;
public class TestScheduler {
@Test
public void tickTask() {
Scheduler scheduler = Scheduler.newScheduler();
AtomicBoolean result = new AtomicBoolean(false);
Task task = scheduler.scheduleNextTick(() -> result.set(true));
assertEquals(task.executionType(), ExecutionType.SYNC, "Tasks default execution type should be sync");
assertFalse(result.get(), "Tick task should not be executed after scheduling");
scheduler.process();
assertFalse(result.get(), "Tick task should not be executed after process");
scheduler.processTick();
assertTrue(result.get(), "Tick task must be executed after tick process");
assertFalse(task.isAlive(), "Tick task should be cancelled after execution");
}
@Test
public void durationTask() throws InterruptedException {
Scheduler scheduler = Scheduler.newScheduler();
AtomicBoolean result = new AtomicBoolean(false);
scheduler.buildTask(() -> result.set(true))
.delay(TaskSchedule.seconds(1))
.schedule();
Thread.sleep(100);
scheduler.process();
assertFalse(result.get(), "900ms remaining");
Thread.sleep(1200);
scheduler.process();
assertTrue(result.get(), "Tick task must be executed after 1 second");
}
@Test
public void cancelTask() {
Scheduler scheduler = Scheduler.newScheduler();
AtomicBoolean result = new AtomicBoolean(false);
var task = scheduler.buildTask(() -> result.set(true))
.schedule();
assertTrue(task.isAlive(), "Task should still be alive");
task.cancel();
assertFalse(task.isAlive(), "Task should not be alive anymore");
scheduler.process();
assertFalse(result.get(), "Task should be cancelled");
}
@Test
public void parkTask() {
Scheduler scheduler = Scheduler.newScheduler();
// Ignored parked task
scheduler.buildTask(() -> fail("This parked task should never be executed"))
.executionType(ExecutionType.SYNC)
.delay(TaskSchedule.park())
.schedule();
// Unpark task
AtomicBoolean result = new AtomicBoolean(false);
var task = scheduler.buildTask(() -> result.set(true))
.delay(TaskSchedule.park())
.schedule();
assertFalse(result.get(), "Task hasn't been unparked yet");
task.unpark();
assertFalse(result.get(), "Tasks must be processed first");
scheduler.process();
assertTrue(result.get(), "Parked task should be executed");
}
@Test
public void futureTask() {
Scheduler scheduler = Scheduler.newScheduler();
CompletableFuture<Void> future = new CompletableFuture<>();
AtomicBoolean result = new AtomicBoolean(false);
scheduler.buildTask(() -> result.set(true))
.delay(TaskSchedule.future(future))
.schedule();
assertFalse(result.get(), "Future is not completed yet");
future.complete(null);
assertFalse(result.get(), "Tasks must be processed first");
scheduler.process();
assertTrue(result.get(), "Future should be completed");
}
@Test
public void asyncTask() throws InterruptedException {
final Thread currentThread = Thread.currentThread();
Scheduler scheduler = Scheduler.newScheduler();
AtomicBoolean result = new AtomicBoolean(false);
scheduler.buildTask(() -> {
assertNotEquals(currentThread, Thread.currentThread(),
"Task should be executed in a different thread");
result.set(true);
})
.executionType(ExecutionType.ASYNC)
.schedule();
assertFalse(result.get(), "Async task should only be executed after process()");
scheduler.process();
Thread.sleep(250);
assertTrue(result.get(), "Async task didn't get executed");
}
}