mirror of https://github.com/Minestom/Minestom.git
feat: add TICK_END execution type to scheduler api (#2045)
* feat: implement TICK_END scheduler ExecutionType * feat: add TICK_END scheduler ExecutionType * fix: call processTickEnd on entity/instances, SchedulerImpl code cleanup * deprecate ExecutionType#SYNC, replace with TICK_START * chore: update scheduler test * chore: scheduleEndOfTick cleanup --------- Co-authored-by: iam <iam4722202468@users.noreply.github.com>
This commit is contained in:
parent
154059468e
commit
a9f6d9f02b
|
@ -273,6 +273,8 @@ final class ServerProcessImpl implements ServerProcess {
|
||||||
// Server tick (chunks/entities)
|
// Server tick (chunks/entities)
|
||||||
serverTick(msTime);
|
serverTick(msTime);
|
||||||
|
|
||||||
|
scheduler().processTickEnd();
|
||||||
|
|
||||||
// Flush all waiting packets
|
// Flush all waiting packets
|
||||||
PacketUtils.flush();
|
PacketUtils.flush();
|
||||||
|
|
||||||
|
|
|
@ -581,6 +581,8 @@ public class Entity implements Viewable, Tickable, Schedulable, Snapshotable, Ev
|
||||||
if (vehicle == null && ticks >= nextSynchronizationTick) {
|
if (vehicle == null && ticks >= nextSynchronizationTick) {
|
||||||
synchronizePosition();
|
synchronizePosition();
|
||||||
}
|
}
|
||||||
|
// End of tick scheduled tasks
|
||||||
|
this.scheduler.processTickEnd();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiStatus.Internal
|
@ApiStatus.Internal
|
||||||
|
|
|
@ -690,6 +690,8 @@ public abstract class Instance implements Block.Getter, Block.Setter,
|
||||||
this.lastTickAge = time;
|
this.lastTickAge = time;
|
||||||
}
|
}
|
||||||
this.worldBorder.update();
|
this.worldBorder.update();
|
||||||
|
// End of tick scheduled tasks
|
||||||
|
this.scheduler.processTickEnd();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,6 +1,23 @@
|
||||||
package net.minestom.server.timer;
|
package net.minestom.server.timer;
|
||||||
|
|
||||||
public enum ExecutionType {
|
public enum ExecutionType {
|
||||||
|
/**
|
||||||
|
* Schedule tasks to execute at the beginning of the {@link Schedulable} tick
|
||||||
|
*/
|
||||||
|
TICK_START,
|
||||||
|
/**
|
||||||
|
* Schedule tasks to execute at the end of the {@link Schedulable} tick
|
||||||
|
*/
|
||||||
|
TICK_END,
|
||||||
|
/**
|
||||||
|
* @deprecated use {@link ExecutionType#TICK_START}
|
||||||
|
* to be removed in 1.20.5
|
||||||
|
*/
|
||||||
|
@Deprecated()
|
||||||
SYNC,
|
SYNC,
|
||||||
|
/**
|
||||||
|
* to be removed in 1.20.5
|
||||||
|
*/
|
||||||
|
@Deprecated()
|
||||||
ASYNC
|
ASYNC
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,13 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu
|
||||||
*/
|
*/
|
||||||
void processTick();
|
void processTick();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute tasks set to run at the end of this tick.
|
||||||
|
* <p>
|
||||||
|
* This method is not thread-safe.
|
||||||
|
*/
|
||||||
|
void processTickEnd();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submits a new task with custom scheduling logic.
|
* Submits a new task with custom scheduling logic.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -44,7 +51,7 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu
|
||||||
@NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task, @NotNull ExecutionType executionType);
|
@NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task, @NotNull ExecutionType executionType);
|
||||||
|
|
||||||
default @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task) {
|
default @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task) {
|
||||||
return submitTask(task, ExecutionType.SYNC);
|
return submitTask(task, ExecutionType.TICK_START);
|
||||||
}
|
}
|
||||||
|
|
||||||
default @NotNull Task.Builder buildTask(@NotNull Runnable task) {
|
default @NotNull Task.Builder buildTask(@NotNull Runnable task) {
|
||||||
|
@ -58,7 +65,7 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu
|
||||||
}
|
}
|
||||||
|
|
||||||
default @NotNull Task scheduleTask(@NotNull Runnable task, @NotNull TaskSchedule delay, @NotNull TaskSchedule repeat) {
|
default @NotNull Task scheduleTask(@NotNull Runnable task, @NotNull TaskSchedule delay, @NotNull TaskSchedule repeat) {
|
||||||
return scheduleTask(task, delay, repeat, ExecutionType.SYNC);
|
return scheduleTask(task, delay, repeat, ExecutionType.TICK_START);
|
||||||
}
|
}
|
||||||
|
|
||||||
default @NotNull Task scheduleNextTick(@NotNull Runnable task, @NotNull ExecutionType executionType) {
|
default @NotNull Task scheduleNextTick(@NotNull Runnable task, @NotNull ExecutionType executionType) {
|
||||||
|
@ -66,7 +73,11 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu
|
||||||
}
|
}
|
||||||
|
|
||||||
default @NotNull Task scheduleNextTick(@NotNull Runnable task) {
|
default @NotNull Task scheduleNextTick(@NotNull Runnable task) {
|
||||||
return scheduleNextTick(task, ExecutionType.SYNC);
|
return scheduleNextTick(task, ExecutionType.TICK_START);
|
||||||
|
}
|
||||||
|
|
||||||
|
default @NotNull Task scheduleEndOfTick(@NotNull Runnable task) {
|
||||||
|
return scheduleNextProcess(task, ExecutionType.TICK_END);
|
||||||
}
|
}
|
||||||
|
|
||||||
default @NotNull Task scheduleNextProcess(@NotNull Runnable task, @NotNull ExecutionType executionType) {
|
default @NotNull Task scheduleNextProcess(@NotNull Runnable task, @NotNull ExecutionType executionType) {
|
||||||
|
@ -74,7 +85,7 @@ public sealed interface Scheduler extends Executor permits SchedulerImpl, Schedu
|
||||||
}
|
}
|
||||||
|
|
||||||
default @NotNull Task scheduleNextProcess(@NotNull Runnable task) {
|
default @NotNull Task scheduleNextProcess(@NotNull Runnable task) {
|
||||||
return scheduleNextProcess(task, ExecutionType.SYNC);
|
return scheduleNextProcess(task, ExecutionType.TICK_START);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,9 +24,11 @@ final class SchedulerImpl implements Scheduler {
|
||||||
});
|
});
|
||||||
private static final ForkJoinPool EXECUTOR = ForkJoinPool.commonPool();
|
private static final ForkJoinPool EXECUTOR = ForkJoinPool.commonPool();
|
||||||
|
|
||||||
private final MpscUnboundedArrayQueue<TaskImpl> taskQueue = new MpscUnboundedArrayQueue<>(64);
|
private final MpscUnboundedArrayQueue<TaskImpl> tasksToExecute = new MpscUnboundedArrayQueue<>(64);
|
||||||
// Tasks scheduled on a certain tick
|
private final MpscUnboundedArrayQueue<TaskImpl> tickEndTasksToExecute = new MpscUnboundedArrayQueue<>(64);
|
||||||
private final Int2ObjectAVLTreeMap<List<TaskImpl>> tickTaskQueue = new Int2ObjectAVLTreeMap<>();
|
// Tasks scheduled on a certain tick/tick end
|
||||||
|
private final Int2ObjectAVLTreeMap<List<TaskImpl>> tickStartTaskQueue = new Int2ObjectAVLTreeMap<>();
|
||||||
|
private final Int2ObjectAVLTreeMap<List<TaskImpl>> tickEndTaskQueue = new Int2ObjectAVLTreeMap<>();
|
||||||
|
|
||||||
private int tickState;
|
private int tickState;
|
||||||
|
|
||||||
|
@ -41,20 +43,33 @@ final class SchedulerImpl implements Scheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processTick(int tickDelta) {
|
private void processTick(int tickDelta) {
|
||||||
|
processTickTasks(tickStartTaskQueue, tasksToExecute, tickDelta);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void processTickEnd() {
|
||||||
|
processTickTasks(tickEndTaskQueue, tickEndTasksToExecute, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processTickTasks(Int2ObjectAVLTreeMap<List<TaskImpl>> targetTaskQueue, MpscUnboundedArrayQueue<TaskImpl> targetTasksToExecute, int tickDelta) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
this.tickState += tickDelta;
|
this.tickState += tickDelta;
|
||||||
int tickToProcess;
|
int tickToProcess;
|
||||||
while (!tickTaskQueue.isEmpty() && (tickToProcess = tickTaskQueue.firstIntKey()) <= tickState) {
|
while (!targetTaskQueue.isEmpty() && (tickToProcess = targetTaskQueue.firstIntKey()) <= tickState) {
|
||||||
final List<TaskImpl> tickScheduledTasks = tickTaskQueue.remove(tickToProcess);
|
final List<TaskImpl> tickScheduledTasks = targetTaskQueue.remove(tickToProcess);
|
||||||
if (tickScheduledTasks != null) tickScheduledTasks.forEach(taskQueue::relaxedOffer);
|
if (tickScheduledTasks != null) tickScheduledTasks.forEach(targetTasksToExecute::relaxedOffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
runTasks(targetTasksToExecute);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runTasks(MpscUnboundedArrayQueue<TaskImpl> targetQueue) {
|
||||||
// Run all tasks lock-free, either in the current thread or pool
|
// Run all tasks lock-free, either in the current thread or pool
|
||||||
if (!taskQueue.isEmpty()) {
|
if (!targetQueue.isEmpty()) {
|
||||||
this.taskQueue.drain(task -> {
|
targetQueue.drain(task -> {
|
||||||
if (!task.isAlive()) return;
|
if (!task.isAlive()) return;
|
||||||
switch (task.executionType()) {
|
switch (task.executionType()) {
|
||||||
case SYNC -> handleTask(task);
|
case TICK_START, TICK_END, SYNC -> handleTask(task);
|
||||||
case ASYNC -> EXECUTOR.submit(() -> handleTask(task));
|
case ASYNC -> EXECUTOR.submit(() -> handleTask(task));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -72,14 +87,15 @@ final class SchedulerImpl implements Scheduler {
|
||||||
|
|
||||||
void unparkTask(TaskImpl task) {
|
void unparkTask(TaskImpl task) {
|
||||||
if (task.tryUnpark())
|
if (task.tryUnpark())
|
||||||
this.taskQueue.relaxedOffer(task);
|
this.tasksToExecute.relaxedOffer(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void safeExecute(TaskImpl task) {
|
private void safeExecute(TaskImpl task) {
|
||||||
// Prevent the task from being executed in the current thread
|
// Prevent the task from being executed in the current thread
|
||||||
// By either adding the task to the execution queue or submitting it to the pool
|
// By either adding the task to the execution queue or submitting it to the pool
|
||||||
switch (task.executionType()) {
|
switch (task.executionType()) {
|
||||||
case SYNC -> taskQueue.offer(task);
|
case TICK_START, SYNC -> tasksToExecute.offer(task);
|
||||||
|
case TICK_END -> tickEndTasksToExecute.offer(task);
|
||||||
case ASYNC -> EXECUTOR.submit(() -> {
|
case ASYNC -> EXECUTOR.submit(() -> {
|
||||||
if (!task.isAlive()) {
|
if (!task.isAlive()) {
|
||||||
return;
|
return;
|
||||||
|
@ -104,7 +120,11 @@ final class SchedulerImpl implements Scheduler {
|
||||||
} else if (schedule instanceof TaskScheduleImpl.TickSchedule tickSchedule) {
|
} else if (schedule instanceof TaskScheduleImpl.TickSchedule tickSchedule) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
final int target = tickState + tickSchedule.tick();
|
final int target = tickState + tickSchedule.tick();
|
||||||
this.tickTaskQueue.computeIfAbsent(target, i -> new ArrayList<>()).add(task);
|
var targetTaskQueue = switch (task.executionType()) {
|
||||||
|
case TICK_START, SYNC, ASYNC -> tickStartTaskQueue;
|
||||||
|
case TICK_END -> tickEndTaskQueue;
|
||||||
|
};
|
||||||
|
targetTaskQueue.computeIfAbsent(target, i -> new ArrayList<>()).add(task);
|
||||||
}
|
}
|
||||||
} else if (schedule instanceof TaskScheduleImpl.FutureSchedule futureSchedule) {
|
} else if (schedule instanceof TaskScheduleImpl.FutureSchedule futureSchedule) {
|
||||||
futureSchedule.future().thenRun(() -> safeExecute(task));
|
futureSchedule.future().thenRun(() -> safeExecute(task));
|
||||||
|
@ -113,7 +133,10 @@ final class SchedulerImpl implements Scheduler {
|
||||||
} else if (schedule instanceof TaskScheduleImpl.Stop) {
|
} else if (schedule instanceof TaskScheduleImpl.Stop) {
|
||||||
task.cancel();
|
task.cancel();
|
||||||
} else if (schedule instanceof TaskScheduleImpl.Immediate) {
|
} else if (schedule instanceof TaskScheduleImpl.Immediate) {
|
||||||
this.taskQueue.relaxedOffer(task);
|
if (task.executionType() == ExecutionType.TICK_END) {
|
||||||
|
tickEndTasksToExecute.relaxedOffer(task);
|
||||||
|
}
|
||||||
|
else tasksToExecute.relaxedOffer(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,11 @@ public final class SchedulerManager implements Scheduler {
|
||||||
this.scheduler.processTick();
|
this.scheduler.processTick();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void processTickEnd() {
|
||||||
|
this.scheduler.processTickEnd();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task,
|
public @NotNull Task submitTask(@NotNull Supplier<TaskSchedule> task,
|
||||||
@NotNull ExecutionType executionType) {
|
@NotNull ExecutionType executionType) {
|
||||||
|
|
|
@ -27,7 +27,7 @@ public sealed interface Task permits TaskImpl {
|
||||||
final class Builder {
|
final class Builder {
|
||||||
private final Scheduler scheduler;
|
private final Scheduler scheduler;
|
||||||
private final Runnable runnable;
|
private final Runnable runnable;
|
||||||
private ExecutionType executionType = ExecutionType.SYNC;
|
private ExecutionType executionType = ExecutionType.TICK_START;
|
||||||
private TaskSchedule delay = TaskSchedule.immediate();
|
private TaskSchedule delay = TaskSchedule.immediate();
|
||||||
private TaskSchedule repeat = TaskSchedule.stop();
|
private TaskSchedule repeat = TaskSchedule.stop();
|
||||||
|
|
||||||
|
|
|
@ -17,11 +17,13 @@ public class TestScheduler {
|
||||||
Scheduler scheduler = Scheduler.newScheduler();
|
Scheduler scheduler = Scheduler.newScheduler();
|
||||||
AtomicBoolean result = new AtomicBoolean(false);
|
AtomicBoolean result = new AtomicBoolean(false);
|
||||||
Task task = scheduler.scheduleNextTick(() -> result.set(true));
|
Task task = scheduler.scheduleNextTick(() -> result.set(true));
|
||||||
assertEquals(task.executionType(), ExecutionType.SYNC, "Tasks default execution type should be sync");
|
assertEquals(task.executionType(), ExecutionType.TICK_START, "Tasks default execution type should be tick start");
|
||||||
|
|
||||||
assertFalse(result.get(), "Tick task should not be executed after scheduling");
|
assertFalse(result.get(), "Tick task should not be executed after scheduling");
|
||||||
scheduler.process();
|
scheduler.process();
|
||||||
assertFalse(result.get(), "Tick task should not be executed after process");
|
assertFalse(result.get(), "Tick task should not be executed after process");
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertFalse(result.get(), "Tick task should not be executed after processTickEnd");
|
||||||
scheduler.processTick();
|
scheduler.processTick();
|
||||||
assertTrue(result.get(), "Tick task must be executed after tick process");
|
assertTrue(result.get(), "Tick task must be executed after tick process");
|
||||||
assertFalse(task.isAlive(), "Tick task should be cancelled after execution");
|
assertFalse(task.isAlive(), "Tick task should be cancelled after execution");
|
||||||
|
@ -48,6 +50,8 @@ public class TestScheduler {
|
||||||
AtomicBoolean result = new AtomicBoolean(false);
|
AtomicBoolean result = new AtomicBoolean(false);
|
||||||
scheduler.scheduleNextProcess(() -> result.set(true));
|
scheduler.scheduleNextProcess(() -> result.set(true));
|
||||||
assertFalse(result.get());
|
assertFalse(result.get());
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertFalse(result.get(), "processTickEnd should never execute immediate tasks unless it is of type TICK_END");
|
||||||
scheduler.process();
|
scheduler.process();
|
||||||
assertTrue(result.get());
|
assertTrue(result.get());
|
||||||
|
|
||||||
|
@ -161,4 +165,75 @@ public class TestScheduler {
|
||||||
assertDoesNotThrow(scheduler::processTick);
|
assertDoesNotThrow(scheduler::processTick);
|
||||||
assertEquals(100, executed.get());
|
assertEquals(100, executed.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void scheduleEndOfTick() {
|
||||||
|
Scheduler scheduler = Scheduler.newScheduler();
|
||||||
|
AtomicBoolean result = new AtomicBoolean(false);
|
||||||
|
scheduler.scheduleEndOfTick(() -> result.set(true));
|
||||||
|
assertFalse(result.get(), "End of tick tasks should not be executed immediately upon submission");
|
||||||
|
scheduler.processTick();
|
||||||
|
assertFalse(result.get(), "End of tick tasks should not be executed by processTick()");
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertTrue(result.get(), "scheduleEndOfTick(...) tasks should be executed after the next call to processTickEnd()");
|
||||||
|
|
||||||
|
result.set(false);
|
||||||
|
scheduler.scheduleEndOfTick(() -> result.set(true));
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertTrue(result.get(), "scheduleEndOfTick(...) tasks should always execute on the very next processTickEnd()");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void delayedEndOfTick() {
|
||||||
|
Scheduler scheduler = Scheduler.newScheduler();
|
||||||
|
AtomicBoolean result = new AtomicBoolean(false);
|
||||||
|
scheduler.buildTask(() -> result.set(true)).delay(TaskSchedule.tick(1))
|
||||||
|
.executionType(ExecutionType.TICK_END).schedule();
|
||||||
|
|
||||||
|
scheduler.processTickEnd(); scheduler.processTickEnd();
|
||||||
|
assertFalse(result.get(), "processTickEnd() should not increment the scheduler's internal tick counter");
|
||||||
|
scheduler.processTick();
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertTrue(result.get(), "processTick() should increment the current tick counter processTickEnd() uses");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void repeatingEndOfTick() {
|
||||||
|
Scheduler scheduler = Scheduler.newScheduler();
|
||||||
|
AtomicInteger result = new AtomicInteger(0);
|
||||||
|
Task task = scheduler.scheduleTask(result::getAndIncrement, TaskSchedule.immediate(), TaskSchedule.tick(1), ExecutionType.TICK_END);
|
||||||
|
assertEquals(0, result.get(), "TICK_END tasks should not be executed immediately upon submission");
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertEquals(1, result.get(), "processTickEnd() should always execute TaskSchedule.immediate() TICK_END tasks");
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertEquals(1, result.get(), "task should not executed on processTickEnd() again until processTick() is called");
|
||||||
|
scheduler.processTick();
|
||||||
|
assertEquals(1, result.get(), "processTick() should never execute TICK_END tasks");
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertEquals(2, result.get(), "processTickEnd() should execute this task");
|
||||||
|
|
||||||
|
task.cancel();
|
||||||
|
scheduler.processTick();
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertEquals(2, result.get(), "this task should have been cancelled");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void durationEndOfTick() throws InterruptedException {
|
||||||
|
Scheduler scheduler = Scheduler.newScheduler();
|
||||||
|
AtomicBoolean result = new AtomicBoolean(false);
|
||||||
|
scheduler.buildTask(() -> result.set(true))
|
||||||
|
.delay(TaskSchedule.seconds(1))
|
||||||
|
.executionType(ExecutionType.TICK_END)
|
||||||
|
.schedule();
|
||||||
|
Thread.sleep(100);
|
||||||
|
scheduler.process();
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertFalse(result.get(), "900ms remaining");
|
||||||
|
Thread.sleep(1200);
|
||||||
|
scheduler.process();
|
||||||
|
assertFalse(result.get(), "process() should never execute TICK_END tasks");
|
||||||
|
scheduler.processTickEnd();
|
||||||
|
assertTrue(result.get(), "Tick end task must be executed after 1 second");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue