From a9f6d9f02b0e90e0936a2c684e17cbbfd3264dd5 Mon Sep 17 00:00:00 2001 From: DeidaraMC <117625071+DeidaraMC@users.noreply.github.com> Date: Sat, 30 Mar 2024 23:51:08 -0400 Subject: [PATCH] feat: add TICK_END execution type to scheduler api (#2045) * feat: implement TICK_END scheduler ExecutionType * feat: add TICK_END scheduler ExecutionType * fix: call processTickEnd on entity/instances, SchedulerImpl code cleanup * deprecate ExecutionType#SYNC, replace with TICK_START * chore: update scheduler test * chore: scheduleEndOfTick cleanup --------- Co-authored-by: iam --- .../minestom/server/ServerProcessImpl.java | 2 + .../net/minestom/server/entity/Entity.java | 2 + .../minestom/server/instance/Instance.java | 2 + .../minestom/server/timer/ExecutionType.java | 17 ++++ .../net/minestom/server/timer/Scheduler.java | 19 ++++- .../minestom/server/timer/SchedulerImpl.java | 49 ++++++++---- .../server/timer/SchedulerManager.java | 5 ++ .../java/net/minestom/server/timer/Task.java | 2 +- .../minestom/server/timer/TestScheduler.java | 77 ++++++++++++++++++- 9 files changed, 156 insertions(+), 19 deletions(-) diff --git a/src/main/java/net/minestom/server/ServerProcessImpl.java b/src/main/java/net/minestom/server/ServerProcessImpl.java index 1b9f713ba..3bca56ef0 100644 --- a/src/main/java/net/minestom/server/ServerProcessImpl.java +++ b/src/main/java/net/minestom/server/ServerProcessImpl.java @@ -273,6 +273,8 @@ final class ServerProcessImpl implements ServerProcess { // Server tick (chunks/entities) serverTick(msTime); + scheduler().processTickEnd(); + // Flush all waiting packets PacketUtils.flush(); diff --git a/src/main/java/net/minestom/server/entity/Entity.java b/src/main/java/net/minestom/server/entity/Entity.java index ddaa6f36d..2cad94a1e 100644 --- a/src/main/java/net/minestom/server/entity/Entity.java +++ b/src/main/java/net/minestom/server/entity/Entity.java @@ -581,6 +581,8 @@ public class Entity implements Viewable, Tickable, Schedulable, Snapshotable, Ev if (vehicle == null && ticks >= nextSynchronizationTick) { synchronizePosition(); } + // End of tick scheduled tasks + this.scheduler.processTickEnd(); } @ApiStatus.Internal diff --git a/src/main/java/net/minestom/server/instance/Instance.java b/src/main/java/net/minestom/server/instance/Instance.java index b8cbd45dd..3cfe29d21 100644 --- a/src/main/java/net/minestom/server/instance/Instance.java +++ b/src/main/java/net/minestom/server/instance/Instance.java @@ -690,6 +690,8 @@ public abstract class Instance implements Block.Getter, Block.Setter, this.lastTickAge = time; } this.worldBorder.update(); + // End of tick scheduled tasks + this.scheduler.processTickEnd(); } /** diff --git a/src/main/java/net/minestom/server/timer/ExecutionType.java b/src/main/java/net/minestom/server/timer/ExecutionType.java index 0a1c560c0..5dfc17de1 100644 --- a/src/main/java/net/minestom/server/timer/ExecutionType.java +++ b/src/main/java/net/minestom/server/timer/ExecutionType.java @@ -1,6 +1,23 @@ package net.minestom.server.timer; public enum ExecutionType { + /** + * Schedule tasks to execute at the beginning of the {@link Schedulable} tick + */ + TICK_START, + /** + * Schedule tasks to execute at the end of the {@link Schedulable} tick + */ + TICK_END, + /** + * @deprecated use {@link ExecutionType#TICK_START} + * to be removed in 1.20.5 + */ + @Deprecated() SYNC, + /** + * to be removed in 1.20.5 + */ + @Deprecated() ASYNC } diff --git a/src/main/java/net/minestom/server/timer/Scheduler.java b/src/main/java/net/minestom/server/timer/Scheduler.java index d28e12466..2092b1bb8 100644 --- a/src/main/java/net/minestom/server/timer/Scheduler.java +++ b/src/main/java/net/minestom/server/timer/Scheduler.java @@ -30,6 +30,13 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu */ void processTick(); + /** + * Execute tasks set to run at the end of this tick. + *

+ * This method is not thread-safe. + */ + void processTickEnd(); + /** * Submits a new task with custom scheduling logic. *

@@ -44,7 +51,7 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu @NotNull Task submitTask(@NotNull Supplier task, @NotNull ExecutionType executionType); default @NotNull Task submitTask(@NotNull Supplier task) { - return submitTask(task, ExecutionType.SYNC); + return submitTask(task, ExecutionType.TICK_START); } default @NotNull Task.Builder buildTask(@NotNull Runnable task) { @@ -58,7 +65,7 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu } default @NotNull Task scheduleTask(@NotNull Runnable task, @NotNull TaskSchedule delay, @NotNull TaskSchedule repeat) { - return scheduleTask(task, delay, repeat, ExecutionType.SYNC); + return scheduleTask(task, delay, repeat, ExecutionType.TICK_START); } default @NotNull Task scheduleNextTick(@NotNull Runnable task, @NotNull ExecutionType executionType) { @@ -66,7 +73,11 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu } default @NotNull Task scheduleNextTick(@NotNull Runnable task) { - return scheduleNextTick(task, ExecutionType.SYNC); + return scheduleNextTick(task, ExecutionType.TICK_START); + } + + default @NotNull Task scheduleEndOfTick(@NotNull Runnable task) { + return scheduleNextProcess(task, ExecutionType.TICK_END); } default @NotNull Task scheduleNextProcess(@NotNull Runnable task, @NotNull ExecutionType executionType) { @@ -74,7 +85,7 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu } default @NotNull Task scheduleNextProcess(@NotNull Runnable task) { - return scheduleNextProcess(task, ExecutionType.SYNC); + return scheduleNextProcess(task, ExecutionType.TICK_START); } /** diff --git a/src/main/java/net/minestom/server/timer/SchedulerImpl.java b/src/main/java/net/minestom/server/timer/SchedulerImpl.java index 76b47a0d4..1064cb5c8 100644 --- a/src/main/java/net/minestom/server/timer/SchedulerImpl.java +++ b/src/main/java/net/minestom/server/timer/SchedulerImpl.java @@ -24,9 +24,11 @@ final class SchedulerImpl implements Scheduler { }); private static final ForkJoinPool EXECUTOR = ForkJoinPool.commonPool(); - private final MpscUnboundedArrayQueue taskQueue = new MpscUnboundedArrayQueue<>(64); - // Tasks scheduled on a certain tick - private final Int2ObjectAVLTreeMap> tickTaskQueue = new Int2ObjectAVLTreeMap<>(); + private final MpscUnboundedArrayQueue tasksToExecute = new MpscUnboundedArrayQueue<>(64); + private final MpscUnboundedArrayQueue tickEndTasksToExecute = new MpscUnboundedArrayQueue<>(64); + // Tasks scheduled on a certain tick/tick end + private final Int2ObjectAVLTreeMap> tickStartTaskQueue = new Int2ObjectAVLTreeMap<>(); + private final Int2ObjectAVLTreeMap> tickEndTaskQueue = new Int2ObjectAVLTreeMap<>(); private int tickState; @@ -41,20 +43,33 @@ final class SchedulerImpl implements Scheduler { } private void processTick(int tickDelta) { + processTickTasks(tickStartTaskQueue, tasksToExecute, tickDelta); + } + + @Override + public void processTickEnd() { + processTickTasks(tickEndTaskQueue, tickEndTasksToExecute, 0); + } + + private void processTickTasks(Int2ObjectAVLTreeMap> targetTaskQueue, MpscUnboundedArrayQueue targetTasksToExecute, int tickDelta) { synchronized (this) { this.tickState += tickDelta; int tickToProcess; - while (!tickTaskQueue.isEmpty() && (tickToProcess = tickTaskQueue.firstIntKey()) <= tickState) { - final List tickScheduledTasks = tickTaskQueue.remove(tickToProcess); - if (tickScheduledTasks != null) tickScheduledTasks.forEach(taskQueue::relaxedOffer); + while (!targetTaskQueue.isEmpty() && (tickToProcess = targetTaskQueue.firstIntKey()) <= tickState) { + final List tickScheduledTasks = targetTaskQueue.remove(tickToProcess); + if (tickScheduledTasks != null) tickScheduledTasks.forEach(targetTasksToExecute::relaxedOffer); } } + runTasks(targetTasksToExecute); + } + + private void runTasks(MpscUnboundedArrayQueue targetQueue) { // Run all tasks lock-free, either in the current thread or pool - if (!taskQueue.isEmpty()) { - this.taskQueue.drain(task -> { + if (!targetQueue.isEmpty()) { + targetQueue.drain(task -> { if (!task.isAlive()) return; switch (task.executionType()) { - case SYNC -> handleTask(task); + case TICK_START, TICK_END, SYNC -> handleTask(task); case ASYNC -> EXECUTOR.submit(() -> handleTask(task)); } }); @@ -72,14 +87,15 @@ final class SchedulerImpl implements Scheduler { void unparkTask(TaskImpl task) { if (task.tryUnpark()) - this.taskQueue.relaxedOffer(task); + this.tasksToExecute.relaxedOffer(task); } private void safeExecute(TaskImpl task) { // Prevent the task from being executed in the current thread // By either adding the task to the execution queue or submitting it to the pool switch (task.executionType()) { - case SYNC -> taskQueue.offer(task); + case TICK_START, SYNC -> tasksToExecute.offer(task); + case TICK_END -> tickEndTasksToExecute.offer(task); case ASYNC -> EXECUTOR.submit(() -> { if (!task.isAlive()) { return; @@ -104,7 +120,11 @@ final class SchedulerImpl implements Scheduler { } else if (schedule instanceof TaskScheduleImpl.TickSchedule tickSchedule) { synchronized (this) { final int target = tickState + tickSchedule.tick(); - this.tickTaskQueue.computeIfAbsent(target, i -> new ArrayList<>()).add(task); + var targetTaskQueue = switch (task.executionType()) { + case TICK_START, SYNC, ASYNC -> tickStartTaskQueue; + case TICK_END -> tickEndTaskQueue; + }; + targetTaskQueue.computeIfAbsent(target, i -> new ArrayList<>()).add(task); } } else if (schedule instanceof TaskScheduleImpl.FutureSchedule futureSchedule) { futureSchedule.future().thenRun(() -> safeExecute(task)); @@ -113,7 +133,10 @@ final class SchedulerImpl implements Scheduler { } else if (schedule instanceof TaskScheduleImpl.Stop) { task.cancel(); } else if (schedule instanceof TaskScheduleImpl.Immediate) { - this.taskQueue.relaxedOffer(task); + if (task.executionType() == ExecutionType.TICK_END) { + tickEndTasksToExecute.relaxedOffer(task); + } + else tasksToExecute.relaxedOffer(task); } } } diff --git a/src/main/java/net/minestom/server/timer/SchedulerManager.java b/src/main/java/net/minestom/server/timer/SchedulerManager.java index bd694fbe1..54dc4035c 100644 --- a/src/main/java/net/minestom/server/timer/SchedulerManager.java +++ b/src/main/java/net/minestom/server/timer/SchedulerManager.java @@ -19,6 +19,11 @@ public final class SchedulerManager implements Scheduler { this.scheduler.processTick(); } + @Override + public void processTickEnd() { + this.scheduler.processTickEnd(); + } + @Override public @NotNull Task submitTask(@NotNull Supplier task, @NotNull ExecutionType executionType) { diff --git a/src/main/java/net/minestom/server/timer/Task.java b/src/main/java/net/minestom/server/timer/Task.java index b2422c397..4ba61fbeb 100644 --- a/src/main/java/net/minestom/server/timer/Task.java +++ b/src/main/java/net/minestom/server/timer/Task.java @@ -27,7 +27,7 @@ public sealed interface Task permits TaskImpl { final class Builder { private final Scheduler scheduler; private final Runnable runnable; - private ExecutionType executionType = ExecutionType.SYNC; + private ExecutionType executionType = ExecutionType.TICK_START; private TaskSchedule delay = TaskSchedule.immediate(); private TaskSchedule repeat = TaskSchedule.stop(); diff --git a/src/test/java/net/minestom/server/timer/TestScheduler.java b/src/test/java/net/minestom/server/timer/TestScheduler.java index 079967dc1..c825ce57e 100644 --- a/src/test/java/net/minestom/server/timer/TestScheduler.java +++ b/src/test/java/net/minestom/server/timer/TestScheduler.java @@ -17,11 +17,13 @@ public class TestScheduler { Scheduler scheduler = Scheduler.newScheduler(); AtomicBoolean result = new AtomicBoolean(false); Task task = scheduler.scheduleNextTick(() -> result.set(true)); - assertEquals(task.executionType(), ExecutionType.SYNC, "Tasks default execution type should be sync"); + assertEquals(task.executionType(), ExecutionType.TICK_START, "Tasks default execution type should be tick start"); assertFalse(result.get(), "Tick task should not be executed after scheduling"); scheduler.process(); assertFalse(result.get(), "Tick task should not be executed after process"); + scheduler.processTickEnd(); + assertFalse(result.get(), "Tick task should not be executed after processTickEnd"); scheduler.processTick(); assertTrue(result.get(), "Tick task must be executed after tick process"); assertFalse(task.isAlive(), "Tick task should be cancelled after execution"); @@ -48,6 +50,8 @@ public class TestScheduler { AtomicBoolean result = new AtomicBoolean(false); scheduler.scheduleNextProcess(() -> result.set(true)); assertFalse(result.get()); + scheduler.processTickEnd(); + assertFalse(result.get(), "processTickEnd should never execute immediate tasks unless it is of type TICK_END"); scheduler.process(); assertTrue(result.get()); @@ -161,4 +165,75 @@ public class TestScheduler { assertDoesNotThrow(scheduler::processTick); assertEquals(100, executed.get()); } + + @Test + public void scheduleEndOfTick() { + Scheduler scheduler = Scheduler.newScheduler(); + AtomicBoolean result = new AtomicBoolean(false); + scheduler.scheduleEndOfTick(() -> result.set(true)); + assertFalse(result.get(), "End of tick tasks should not be executed immediately upon submission"); + scheduler.processTick(); + assertFalse(result.get(), "End of tick tasks should not be executed by processTick()"); + scheduler.processTickEnd(); + assertTrue(result.get(), "scheduleEndOfTick(...) tasks should be executed after the next call to processTickEnd()"); + + result.set(false); + scheduler.scheduleEndOfTick(() -> result.set(true)); + scheduler.processTickEnd(); + assertTrue(result.get(), "scheduleEndOfTick(...) tasks should always execute on the very next processTickEnd()"); + } + + @Test + public void delayedEndOfTick() { + Scheduler scheduler = Scheduler.newScheduler(); + AtomicBoolean result = new AtomicBoolean(false); + scheduler.buildTask(() -> result.set(true)).delay(TaskSchedule.tick(1)) + .executionType(ExecutionType.TICK_END).schedule(); + + scheduler.processTickEnd(); scheduler.processTickEnd(); + assertFalse(result.get(), "processTickEnd() should not increment the scheduler's internal tick counter"); + scheduler.processTick(); + scheduler.processTickEnd(); + assertTrue(result.get(), "processTick() should increment the current tick counter processTickEnd() uses"); + } + + @Test + public void repeatingEndOfTick() { + Scheduler scheduler = Scheduler.newScheduler(); + AtomicInteger result = new AtomicInteger(0); + Task task = scheduler.scheduleTask(result::getAndIncrement, TaskSchedule.immediate(), TaskSchedule.tick(1), ExecutionType.TICK_END); + assertEquals(0, result.get(), "TICK_END tasks should not be executed immediately upon submission"); + scheduler.processTickEnd(); + assertEquals(1, result.get(), "processTickEnd() should always execute TaskSchedule.immediate() TICK_END tasks"); + scheduler.processTickEnd(); + assertEquals(1, result.get(), "task should not executed on processTickEnd() again until processTick() is called"); + scheduler.processTick(); + assertEquals(1, result.get(), "processTick() should never execute TICK_END tasks"); + scheduler.processTickEnd(); + assertEquals(2, result.get(), "processTickEnd() should execute this task"); + + task.cancel(); + scheduler.processTick(); + scheduler.processTickEnd(); + assertEquals(2, result.get(), "this task should have been cancelled"); + } + + @Test + public void durationEndOfTick() throws InterruptedException { + Scheduler scheduler = Scheduler.newScheduler(); + AtomicBoolean result = new AtomicBoolean(false); + scheduler.buildTask(() -> result.set(true)) + .delay(TaskSchedule.seconds(1)) + .executionType(ExecutionType.TICK_END) + .schedule(); + Thread.sleep(100); + scheduler.process(); + scheduler.processTickEnd(); + assertFalse(result.get(), "900ms remaining"); + Thread.sleep(1200); + scheduler.process(); + assertFalse(result.get(), "process() should never execute TICK_END tasks"); + scheduler.processTickEnd(); + assertTrue(result.get(), "Tick end task must be executed after 1 second"); + } }