From da69526f49f7878049c9bc8cc8a8868e6c6f8d2e Mon Sep 17 00:00:00 2001 From: TheMode Date: Sat, 1 Jan 2022 16:04:20 +0100 Subject: [PATCH] Dispatcher testing (#570) --- .../net/minestom/server/UpdateManager.java | 12 +- .../server/acquirable/Acquirable.java | 16 +- .../net/minestom/server/entity/Entity.java | 5 +- .../server/thread/DispatchUpdate.java | 23 -- .../server/thread/ThreadDispatcher.java | 268 +++++++++--------- .../server/thread/ThreadProvider.java | 51 ++-- .../minestom/server/thread/TickThread.java | 38 ++- .../java/thread/ThreadDispatcherTest.java | 176 ++++++++++++ 8 files changed, 368 insertions(+), 221 deletions(-) delete mode 100644 src/main/java/net/minestom/server/thread/DispatchUpdate.java create mode 100644 src/test/java/thread/ThreadDispatcherTest.java diff --git a/src/main/java/net/minestom/server/UpdateManager.java b/src/main/java/net/minestom/server/UpdateManager.java index 728e1dd03..0e0189577 100644 --- a/src/main/java/net/minestom/server/UpdateManager.java +++ b/src/main/java/net/minestom/server/UpdateManager.java @@ -7,7 +7,6 @@ import net.minestom.server.instance.InstanceManager; import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.socket.Worker; -import net.minestom.server.thread.DispatchUpdate; import net.minestom.server.thread.MinestomThread; import net.minestom.server.thread.ThreadDispatcher; import net.minestom.server.timer.SchedulerManager; @@ -31,7 +30,7 @@ public final class UpdateManager { private volatile boolean stopRequested; // TODO make configurable - private final ThreadDispatcher threadDispatcher = ThreadDispatcher.singleThread(); + private final ThreadDispatcher threadDispatcher = ThreadDispatcher.singleThread(); private final Queue tickStartCallbacks = new ConcurrentLinkedQueue<>(); private final Queue tickEndCallbacks = new ConcurrentLinkedQueue<>(); @@ -55,7 +54,7 @@ public final class UpdateManager { * * @return the current thread provider */ - public @NotNull ThreadDispatcher getThreadProvider() { + public @NotNull ThreadDispatcher getThreadProvider() { return threadDispatcher; } @@ -89,7 +88,7 @@ public final class UpdateManager { * @param chunk the loaded chunk */ public void signalChunkLoad(@NotNull Chunk chunk) { - this.threadDispatcher.signalUpdate(new DispatchUpdate.ChunkLoad(chunk)); + this.threadDispatcher.createPartition(chunk); } /** @@ -100,7 +99,7 @@ public final class UpdateManager { * @param chunk the unloaded chunk */ public void signalChunkUnload(@NotNull Chunk chunk) { - this.threadDispatcher.signalUpdate(new DispatchUpdate.ChunkUnload(chunk)); + this.threadDispatcher.deletePartition(chunk); } /** @@ -237,8 +236,7 @@ public final class UpdateManager { this.threadDispatcher.updateAndAwait(tickStart); // Clear removed entities & update threads - final long tickTime = System.currentTimeMillis() - tickStart; - this.threadDispatcher.refreshThreads(tickTime); + this.threadDispatcher.refreshThreads(); } private void doTickCallback(Queue callbacks, long value) { diff --git a/src/main/java/net/minestom/server/acquirable/Acquirable.java b/src/main/java/net/minestom/server/acquirable/Acquirable.java index d3c8ceac0..d68c3cc31 100644 --- a/src/main/java/net/minestom/server/acquirable/Acquirable.java +++ b/src/main/java/net/minestom/server/acquirable/Acquirable.java @@ -27,7 +27,9 @@ public interface Acquirable { final Thread currentThread = Thread.currentThread(); if (currentThread instanceof TickThread) { return ((TickThread) currentThread).entries().stream() - .flatMap(chunkEntry -> chunkEntry.entities().stream()); + .flatMap(partitionEntry -> partitionEntry.elements().stream()) + .filter(tickable -> tickable instanceof Entity) + .map(tickable -> (Entity) tickable); } return Stream.empty(); } @@ -149,19 +151,19 @@ public interface Acquirable { @NotNull Handler getHandler(); final class Handler { - private volatile ThreadDispatcher.ChunkEntry chunkEntry; + private volatile ThreadDispatcher.Partition partition; - public ThreadDispatcher.ChunkEntry getChunkEntry() { - return chunkEntry; + public ThreadDispatcher.Partition getChunkEntry() { + return partition; } @ApiStatus.Internal - public void refreshChunkEntry(@NotNull ThreadDispatcher.ChunkEntry chunkEntry) { - this.chunkEntry = chunkEntry; + public void refreshChunkEntry(@NotNull ThreadDispatcher.Partition partition) { + this.partition = partition; } public TickThread getTickThread() { - final ThreadDispatcher.ChunkEntry entry = this.chunkEntry; + final ThreadDispatcher.Partition entry = this.partition; return entry != null ? entry.thread() : null; } } diff --git a/src/main/java/net/minestom/server/entity/Entity.java b/src/main/java/net/minestom/server/entity/Entity.java index 30806e3ce..8ccbd70f3 100644 --- a/src/main/java/net/minestom/server/entity/Entity.java +++ b/src/main/java/net/minestom/server/entity/Entity.java @@ -37,7 +37,6 @@ import net.minestom.server.potion.PotionEffect; import net.minestom.server.potion.TimedPotion; import net.minestom.server.tag.Tag; import net.minestom.server.tag.TagHandler; -import net.minestom.server.thread.DispatchUpdate; import net.minestom.server.timer.Schedulable; import net.minestom.server.timer.Scheduler; import net.minestom.server.timer.TaskSchedule; @@ -785,7 +784,7 @@ public class Entity implements Viewable, Tickable, Schedulable, TagHandler, Perm @ApiStatus.Internal protected void refreshCurrentChunk(Chunk currentChunk) { this.currentChunk = currentChunk; - MinecraftServer.getUpdateManager().getThreadProvider().signalUpdate(new DispatchUpdate.EntityUpdate(this)); + MinecraftServer.getUpdateManager().getThreadProvider().updateElement(this, currentChunk); } /** @@ -1424,7 +1423,7 @@ public class Entity implements Viewable, Tickable, Schedulable, TagHandler, Perm if (!passengers.isEmpty()) passengers.forEach(this::removePassenger); final Entity vehicle = this.vehicle; if (vehicle != null) vehicle.removePassenger(this); - MinecraftServer.getUpdateManager().getThreadProvider().signalUpdate(new DispatchUpdate.EntityRemove(this)); + MinecraftServer.getUpdateManager().getThreadProvider().removeElement(this); this.removed = true; Entity.ENTITY_BY_ID.remove(id); Entity.ENTITY_BY_UUID.remove(uuid); diff --git a/src/main/java/net/minestom/server/thread/DispatchUpdate.java b/src/main/java/net/minestom/server/thread/DispatchUpdate.java deleted file mode 100644 index 89cd3b0f6..000000000 --- a/src/main/java/net/minestom/server/thread/DispatchUpdate.java +++ /dev/null @@ -1,23 +0,0 @@ -package net.minestom.server.thread; - -import net.minestom.server.entity.Entity; -import net.minestom.server.instance.Chunk; -import org.jetbrains.annotations.ApiStatus; -import org.jetbrains.annotations.NotNull; - -@ApiStatus.Internal -public sealed interface DispatchUpdate permits - DispatchUpdate.ChunkLoad, DispatchUpdate.ChunkUnload, - DispatchUpdate.EntityUpdate, DispatchUpdate.EntityRemove { - record ChunkLoad(@NotNull Chunk chunk) implements DispatchUpdate { - } - - record ChunkUnload(@NotNull Chunk chunk) implements DispatchUpdate { - } - - record EntityUpdate(@NotNull Entity entity) implements DispatchUpdate { - } - - record EntityRemove(@NotNull Entity entity) implements DispatchUpdate { - } -} diff --git a/src/main/java/net/minestom/server/thread/ThreadDispatcher.java b/src/main/java/net/minestom/server/thread/ThreadDispatcher.java index 76618d13b..758bb1cdd 100644 --- a/src/main/java/net/minestom/server/thread/ThreadDispatcher.java +++ b/src/main/java/net/minestom/server/thread/ThreadDispatcher.java @@ -1,12 +1,12 @@ package net.minestom.server.thread; -import net.minestom.server.MinecraftServer; -import net.minestom.server.entity.Entity; -import net.minestom.server.instance.Chunk; -import net.minestom.server.utils.MathUtils; +import net.minestom.server.Tickable; +import net.minestom.server.acquirable.Acquirable; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscUnboundedArrayQueue; +import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Unmodifiable; import java.util.*; import java.util.concurrent.Phaser; @@ -15,21 +15,23 @@ import java.util.concurrent.Phaser; * Used to link chunks into multiple groups. * Then executed into a thread pool. */ -public final class ThreadDispatcher { - private final ThreadProvider provider; +public final class ThreadDispatcher

{ + private final ThreadProvider

provider; private final List threads; - // Chunk -> ChunkEntry mapping - private final Map chunkEntryMap = new HashMap<>(); + // Partition -> dispatching context + // Defines how computation is dispatched to the threads + private final Map partitions = new WeakHashMap<>(); + // Cache to retrieve the threading context from a tickable element + private final Map elements = new WeakHashMap<>(); // Queue to update chunks linked thread - private final ArrayDeque chunkUpdateQueue = new ArrayDeque<>(); + private final ArrayDeque

partitionUpdateQueue = new ArrayDeque<>(); // Requests consumed at the end of each tick - private final MessagePassingQueue updates = new MpscUnboundedArrayQueue<>(1024); - + private final MessagePassingQueue> updates = new MpscUnboundedArrayQueue<>(1024); private final Phaser phaser = new Phaser(1); - private ThreadDispatcher(ThreadProvider provider, int threadCount) { + private ThreadDispatcher(ThreadProvider

provider, int threadCount) { this.provider = provider; TickThread[] threads = new TickThread[threadCount]; Arrays.setAll(threads, i -> new TickThread(phaser, i)); @@ -37,41 +39,17 @@ public final class ThreadDispatcher { this.threads.forEach(Thread::start); } - public static @NotNull ThreadDispatcher of(@NotNull ThreadProvider provider, int threadCount) { - return new ThreadDispatcher(provider, threadCount); + public static

@NotNull ThreadDispatcher

of(@NotNull ThreadProvider

provider, int threadCount) { + return new ThreadDispatcher<>(provider, threadCount); } - public static @NotNull ThreadDispatcher singleThread() { - return of(ThreadProvider.SINGLE, 1); + public static

@NotNull ThreadDispatcher

singleThread() { + return of(ThreadProvider.counter(), 1); } - /** - * Represents the maximum percentage of tick time that can be spent refreshing chunks thread. - *

- * Percentage based on {@link MinecraftServer#TICK_MS}. - * - * @return the refresh percentage - */ - public float getRefreshPercentage() { - return 0.3f; - } - - /** - * Minimum time used to refresh chunks and entities thread. - * - * @return the minimum refresh time in milliseconds - */ - public int getMinimumRefreshTime() { - return 3; - } - - /** - * Maximum time used to refresh chunks and entities thread. - * - * @return the maximum refresh time in milliseconds - */ - public int getMaximumRefreshTime() { - return (int) (MinecraftServer.TICK_MS * 0.3); + @Unmodifiable + public @NotNull List<@NotNull TickThread> threads() { + return threads; } /** @@ -80,55 +58,79 @@ public final class ThreadDispatcher { * @param time the tick time in milliseconds */ public void updateAndAwait(long time) { - for (TickThread thread : threads) thread.startTick(time); - this.phaser.arriveAndAwaitAdvance(); // Update dispatcher this.updates.drain(update -> { - if (update instanceof DispatchUpdate.ChunkLoad chunkUpdate) { - processLoadedChunk(chunkUpdate.chunk()); - } else if (update instanceof DispatchUpdate.ChunkUnload chunkUnload) { - processUnloadedChunk(chunkUnload.chunk()); - } else if (update instanceof DispatchUpdate.EntityUpdate entityUpdate) { - processUpdatedEntity(entityUpdate.entity()); - } else if (update instanceof DispatchUpdate.EntityRemove entityRemove) { - processRemovedEntity(entityRemove.entity()); + if (update instanceof DispatchUpdate.PartitionLoad

chunkUpdate) { + processLoadedChunk(chunkUpdate.partition()); + } else if (update instanceof DispatchUpdate.PartitionUnload

partitionUnload) { + processUnloadedChunk(partitionUnload.partition()); + } else if (update instanceof DispatchUpdate.ElementUpdate

elementUpdate) { + processUpdatedElement(elementUpdate.tickable(), elementUpdate.partition()); + } else if (update instanceof DispatchUpdate.ElementRemove elementRemove) { + processRemovedEntity(elementRemove.tickable()); } else { throw new IllegalStateException("Unknown update type: " + update.getClass().getSimpleName()); } }); + // Tick all partitions + for (TickThread thread : threads) thread.startTick(time); + this.phaser.arriveAndAwaitAdvance(); } /** * Called at the end of each tick to clear removed entities, - * refresh the chunk linked to an entity, and chunk threads based on {@link ThreadProvider#findThread(Chunk)}. + * refresh the chunk linked to an entity, and chunk threads based on {@link ThreadProvider#findThread(Object)}. * - * @param tickTime the duration of the tick in ms, - * used to ensure that the refresh does not take more time than the tick itself + * @param nanoTimeout max time in nanoseconds to update partitions */ - public void refreshThreads(long tickTime) { - final ThreadProvider.RefreshType refreshType = provider.getChunkRefreshType(); - if (refreshType == ThreadProvider.RefreshType.NEVER) - return; - - final int timeOffset = MathUtils.clamp((int) ((double) tickTime * getRefreshPercentage()), - getMinimumRefreshTime(), getMaximumRefreshTime()); - final long endTime = System.currentTimeMillis() + timeOffset; - final int size = chunkUpdateQueue.size(); - int counter = 0; - while (true) { - final Chunk chunk = chunkUpdateQueue.pollFirst(); - if (chunk == null) break; - // Update chunk's thread - ChunkEntry chunkEntry = chunkEntryMap.get(chunk); - if (chunkEntry != null) chunkEntry.thread = retrieveThread(chunk); - this.chunkUpdateQueue.addLast(chunk); - if (++counter > size || System.currentTimeMillis() >= endTime) - break; + public void refreshThreads(long nanoTimeout) { + switch (provider.refreshType()) { + case NEVER -> { + // Do nothing + } + case ALWAYS -> { + final long currentTime = System.nanoTime(); + int counter = partitionUpdateQueue.size(); + while (true) { + final P partition = partitionUpdateQueue.pollFirst(); + if (partition == null) break; + // Update chunk's thread + Partition partitionEntry = partitions.get(partition); + assert partitionEntry != null; + final TickThread previous = partitionEntry.thread; + final TickThread next = retrieveThread(partition); + if (next != previous) { + partitionEntry.thread = next; + previous.entries().remove(partitionEntry); + next.entries().add(partitionEntry); + } + this.partitionUpdateQueue.addLast(partition); + if (--counter <= 0 || System.nanoTime() - currentTime >= nanoTimeout) { + break; + } + } + } } } - public void signalUpdate(@NotNull DispatchUpdate update) { - this.updates.relaxedOffer(update); + public void refreshThreads() { + refreshThreads(Long.MAX_VALUE); + } + + public void createPartition(P partition) { + signalUpdate(new DispatchUpdate.PartitionLoad<>(partition)); + } + + public void deletePartition(P partition) { + signalUpdate(new DispatchUpdate.PartitionUnload<>(partition)); + } + + public void updateElement(Tickable tickable, P partition) { + signalUpdate(new DispatchUpdate.ElementUpdate<>(tickable, partition)); + } + + public void removeElement(Tickable tickable) { + signalUpdate(new DispatchUpdate.ElementRemove<>(tickable)); } /** @@ -140,86 +142,94 @@ public final class ThreadDispatcher { this.threads.forEach(TickThread::shutdown); } - private TickThread retrieveThread(Chunk chunk) { - final int threadId = Math.abs(provider.findThread(chunk)) % threads.size(); - return threads.get(threadId); + private TickThread retrieveThread(P partition) { + final int threadId = provider.findThread(partition); + final int index = Math.abs(threadId) % threads.size(); + return threads.get(index); } - private void processLoadedChunk(Chunk chunk) { - final TickThread thread = retrieveThread(chunk); - final ChunkEntry chunkEntry = new ChunkEntry(thread, chunk); - thread.entries().add(chunkEntry); - this.chunkEntryMap.put(chunk, chunkEntry); - this.chunkUpdateQueue.add(chunk); + private void signalUpdate(@NotNull DispatchUpdate

update) { + this.updates.relaxedOffer(update); } - private void processUnloadedChunk(Chunk chunk) { - final ChunkEntry chunkEntry = chunkEntryMap.remove(chunk); - if (chunkEntry != null) { - TickThread thread = chunkEntry.thread; - thread.entries().remove(chunkEntry); - } - this.chunkUpdateQueue.remove(chunk); - } - - private void processRemovedEntity(Entity entity) { - var acquirableEntity = entity.getAcquirable(); - ChunkEntry chunkEntry = acquirableEntity.getHandler().getChunkEntry(); - if (chunkEntry != null) { - chunkEntry.entities.remove(entity); + private void processLoadedChunk(P partition) { + if (partitions.containsKey(partition)) return; + final TickThread thread = retrieveThread(partition); + final Partition partitionEntry = new Partition(thread); + thread.entries().add(partitionEntry); + this.partitions.put(partition, partitionEntry); + this.partitionUpdateQueue.add(partition); + if (partition instanceof Tickable tickable) { + processUpdatedElement(tickable, partition); } } - private void processUpdatedEntity(Entity entity) { - ChunkEntry chunkEntry; + private void processUnloadedChunk(P partition) { + final Partition partitionEntry = partitions.remove(partition); + if (partitionEntry != null) { + TickThread thread = partitionEntry.thread; + thread.entries().remove(partitionEntry); + } + this.partitionUpdateQueue.remove(partition); + } - var acquirableEntity = entity.getAcquirable(); - chunkEntry = acquirableEntity.getHandler().getChunkEntry(); + private void processRemovedEntity(Tickable tickable) { + Partition partition = elements.get(tickable); + if (partition != null) { + partition.elements.remove(tickable); + } + } + + private void processUpdatedElement(Tickable tickable, P partition) { + Partition partitionEntry; + + partitionEntry = elements.get(tickable); // Remove from previous list - if (chunkEntry != null) { - chunkEntry.entities.remove(entity); + if (partitionEntry != null) { + partitionEntry.elements.remove(tickable); } // Add to new list - chunkEntry = chunkEntryMap.get(entity.getChunk()); - if (chunkEntry != null) { - chunkEntry.entities.add(entity); - acquirableEntity.getHandler().refreshChunkEntry(chunkEntry); + partitionEntry = partitions.get(partition); + if (partitionEntry != null) { + this.elements.put(tickable, partitionEntry); + partitionEntry.elements.add(tickable); + if (tickable instanceof Acquirable acquirable) { + acquirable.getHandler().refreshChunkEntry(partitionEntry); + } } } - public static final class ChunkEntry { - private volatile TickThread thread; - private final Chunk chunk; - private final List entities = new ArrayList<>(); + public static final class Partition { + private TickThread thread; + private final List elements = new ArrayList<>(); - private ChunkEntry(TickThread thread, Chunk chunk) { + private Partition(TickThread thread) { this.thread = thread; - this.chunk = chunk; } public @NotNull TickThread thread() { return thread; } - public @NotNull Chunk chunk() { - return chunk; + public @NotNull List elements() { + return elements; + } + } + + @ApiStatus.Internal + sealed interface DispatchUpdate

permits + DispatchUpdate.PartitionLoad, DispatchUpdate.PartitionUnload, + DispatchUpdate.ElementUpdate, DispatchUpdate.ElementRemove { + record PartitionLoad

(@NotNull P partition) implements DispatchUpdate

{ } - public @NotNull List entities() { - return entities; + record PartitionUnload

(@NotNull P partition) implements DispatchUpdate

{ } - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ChunkEntry that = (ChunkEntry) o; - return chunk.equals(that.chunk); + record ElementUpdate

(@NotNull Tickable tickable, P partition) implements DispatchUpdate

{ } - @Override - public int hashCode() { - return Objects.hash(chunk); + record ElementRemove

(@NotNull Tickable tickable) implements DispatchUpdate

{ } } } diff --git a/src/main/java/net/minestom/server/thread/ThreadProvider.java b/src/main/java/net/minestom/server/thread/ThreadProvider.java index 71300185a..1830d2f56 100644 --- a/src/main/java/net/minestom/server/thread/ThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/ThreadProvider.java @@ -2,8 +2,6 @@ package net.minestom.server.thread; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import net.minestom.server.instance.Chunk; -import net.minestom.server.instance.Instance; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; @@ -11,39 +9,32 @@ import java.util.concurrent.atomic.AtomicInteger; @FunctionalInterface @ApiStatus.Experimental -public interface ThreadProvider { - ThreadProvider PER_CHUNk = new ThreadProvider() { - private final AtomicInteger counter = new AtomicInteger(); +public interface ThreadProvider { + static @NotNull ThreadProvider counter() { + return new ThreadProvider<>() { + private final Cache cache = Caffeine.newBuilder().weakKeys().build(); + private final AtomicInteger counter = new AtomicInteger(); - @Override - public int findThread(@NotNull Chunk chunk) { - return counter.getAndIncrement(); - } - }; - ThreadProvider PER_INSTANCE = new ThreadProvider() { - private final Cache cache = Caffeine.newBuilder().weakKeys().build(); - private final AtomicInteger counter = new AtomicInteger(); - - @Override - public int findThread(@NotNull Chunk chunk) { - return cache.get(chunk.getInstance(), i -> counter.getAndIncrement()); - } - }; - ThreadProvider SINGLE = chunk -> 0; + @Override + public int findThread(@NotNull T partition) { + return cache.get(partition, i -> counter.getAndIncrement()); + } + }; + } /** * Performs a server tick for all chunks based on their linked thread. * - * @param chunk the chunk + * @param partition the partition */ - int findThread(@NotNull Chunk chunk); + int findThread(@NotNull T partition); /** * Defines how often chunks thread should be updated. * * @return the refresh type */ - default @NotNull RefreshType getChunkRefreshType() { + default @NotNull RefreshType refreshType() { return RefreshType.NEVER; } @@ -52,16 +43,16 @@ public interface ThreadProvider { */ enum RefreshType { /** - * Chunk thread is constant after being defined. + * Thread never change after being defined once. + *

+ * Means that {@link #findThread(Object)} will only be called once for each partition. */ NEVER, /** - * Chunk thread should be recomputed as often as possible. + * Thread is updated as often as possible. + *

+ * Means that {@link #findThread(Object)} may be called multiple time for each partition. */ - CONSTANT, - /** - * Chunk thread should be recomputed, but not continuously. - */ - RARELY + ALWAYS } } diff --git a/src/main/java/net/minestom/server/thread/TickThread.java b/src/main/java/net/minestom/server/thread/TickThread.java index 470efcaf3..770b026a8 100644 --- a/src/main/java/net/minestom/server/thread/TickThread.java +++ b/src/main/java/net/minestom/server/thread/TickThread.java @@ -1,6 +1,7 @@ package net.minestom.server.thread; import net.minestom.server.MinecraftServer; +import net.minestom.server.Tickable; import net.minestom.server.entity.Entity; import net.minestom.server.instance.Chunk; import org.jetbrains.annotations.ApiStatus; @@ -25,7 +26,7 @@ public final class TickThread extends MinestomThread { private volatile boolean stop; private long tickTime; - private final List entries = new ArrayList<>(); + private final List entries = new ArrayList<>(); public TickThread(Phaser phaser, int number) { super(MinecraftServer.THREAD_NAME_TICK + "-" + number); @@ -50,26 +51,19 @@ public final class TickThread extends MinestomThread { } private void tick() { - for (ThreadDispatcher.ChunkEntry entry : entries) { - final Chunk chunk = entry.chunk(); - try { - chunk.tick(tickTime); - } catch (Throwable e) { - MinecraftServer.getExceptionManager().handleException(e); - } - final List entities = entry.entities(); - if (!entities.isEmpty()) { - for (Entity entity : entities) { - if (lock.hasQueuedThreads()) { - this.lock.unlock(); - // #acquire() callbacks should be called here - this.lock.lock(); - } - try { - entity.tick(tickTime); - } catch (Throwable e) { - MinecraftServer.getExceptionManager().handleException(e); - } + for (ThreadDispatcher.Partition entry : entries) { + final List elements = entry.elements(); + if (elements.isEmpty()) continue; + for (Tickable element : elements) { + if (lock.hasQueuedThreads()) { + this.lock.unlock(); + // #acquire() callbacks should be called here + this.lock.lock(); + } + try { + element.tick(tickTime); + } catch (Throwable e) { + MinecraftServer.getExceptionManager().handleException(e); } } } @@ -84,7 +78,7 @@ public final class TickThread extends MinestomThread { LockSupport.unpark(this); } - public Collection entries() { + public Collection entries() { return entries; } diff --git a/src/test/java/thread/ThreadDispatcherTest.java b/src/test/java/thread/ThreadDispatcherTest.java new file mode 100644 index 000000000..6b8312c01 --- /dev/null +++ b/src/test/java/thread/ThreadDispatcherTest.java @@ -0,0 +1,176 @@ +package thread; + +import net.minestom.server.Tickable; +import net.minestom.server.thread.ThreadDispatcher; +import net.minestom.server.thread.ThreadProvider; +import net.minestom.server.thread.TickThread; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; + +public class ThreadDispatcherTest { + + @Test + public void elementTick() { + final AtomicInteger counter = new AtomicInteger(); + ThreadDispatcher dispatcher = ThreadDispatcher.singleThread(); + assertEquals(1, dispatcher.threads().size()); + assertThrows(Exception.class, () -> dispatcher.threads().add(new TickThread(new Phaser(), 1))); + + var partition = new Object(); + Tickable element = (time) -> counter.incrementAndGet(); + dispatcher.createPartition(partition); + dispatcher.updateElement(element, partition); + assertEquals(0, counter.get()); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + dispatcher.updateElement(element, partition); // Should be ignored + dispatcher.createPartition(partition); // Ignored too + assertEquals(1, counter.get()); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + assertEquals(2, counter.get()); + + dispatcher.removeElement(element); + dispatcher.updateAndAwait(System.currentTimeMillis()); + assertEquals(2, counter.get()); + + dispatcher.shutdown(); + } + + @Test + public void partitionTick() { + // Partitions implementing Tickable should be ticked same as elements + final AtomicInteger counter1 = new AtomicInteger(); + final AtomicInteger counter2 = new AtomicInteger(); + ThreadDispatcher dispatcher = ThreadDispatcher.singleThread(); + assertEquals(1, dispatcher.threads().size()); + + Tickable partition = (time) -> counter1.incrementAndGet(); + Tickable element = (time) -> counter2.incrementAndGet(); + dispatcher.createPartition(partition); + dispatcher.updateElement(element, partition); + assertEquals(0, counter1.get()); + assertEquals(0, counter2.get()); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + assertEquals(1, counter1.get()); + assertEquals(1, counter2.get()); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + assertEquals(2, counter1.get()); + assertEquals(2, counter2.get()); + + dispatcher.deletePartition(partition); + dispatcher.updateAndAwait(System.currentTimeMillis()); + assertEquals(2, counter1.get()); + assertEquals(2, counter2.get()); + + dispatcher.shutdown(); + } + + @Test + public void uniqueThread() { + // Ensure that partitions are properly dispatched across threads + final int threadCount = 10; + ThreadDispatcher dispatcher = ThreadDispatcher.of(ThreadProvider.counter(), threadCount); + assertEquals(threadCount, dispatcher.threads().size()); + + final AtomicInteger counter = new AtomicInteger(); + Set threads = new CopyOnWriteArraySet<>(); + Set partitions = IntStream.range(0, threadCount) + .mapToObj(value -> (Tickable) (time) -> { + final Thread thread = Thread.currentThread(); + assertInstanceOf(TickThread.class, thread); + assertEquals(1, ((TickThread) thread).entries().size()); + assertTrue(threads.add(thread)); + counter.getAndIncrement(); + }) + .collect(Collectors.toUnmodifiableSet()); + assertEquals(threadCount, partitions.size()); + + partitions.forEach(dispatcher::createPartition); + assertEquals(0, counter.get()); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + assertEquals(threadCount, counter.get()); + + dispatcher.shutdown(); + } + + @Test + public void threadUpdate() { + // Ensure that partitions threads are properly updated every tick + // when RefreshType.ALWAYS is used + interface Updater extends Tickable { + int getValue(); + } + + final int threadCount = 10; + ThreadDispatcher dispatcher = ThreadDispatcher.of(new ThreadProvider<>() { + @Override + public int findThread(@NotNull Updater partition) { + return partition.getValue(); + } + + @Override + public @NotNull RefreshType refreshType() { + return RefreshType.ALWAYS; + } + }, threadCount); + assertEquals(threadCount, dispatcher.threads().size()); + + Map threads = new ConcurrentHashMap<>(); + Map threads2 = new ConcurrentHashMap<>(); + Set partitions = IntStream.range(0, threadCount) + .mapToObj(value -> new Updater() { + private int v = value; + + @Override + public int getValue() { + return v; + } + + @Override + public void tick(long time) { + final Thread currentThread = Thread.currentThread(); + assertInstanceOf(TickThread.class, currentThread); + if (threads.putIfAbsent(this, currentThread) == null) { + this.v = value + 1; + } else { + assertEquals(value + 1, v); + threads2.putIfAbsent(this, currentThread); + } + } + }).collect(Collectors.toUnmodifiableSet()); + assertEquals(threadCount, partitions.size()); + + partitions.forEach(dispatcher::createPartition); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + + dispatcher.refreshThreads(); + + dispatcher.updateAndAwait(System.currentTimeMillis()); + + assertEquals(threads2.size(), threads.size()); + assertNotEquals(threads, threads2, "Threads have not been updated at all"); + for (var entry : threads.entrySet()) { + final Thread thread1 = entry.getValue(); + final Thread thread2 = threads2.get(entry.getKey()); + assertNotEquals(thread1, thread2); + } + + dispatcher.shutdown(); + } +}