Dispatcher testing (#570)

This commit is contained in:
TheMode 2022-01-01 16:04:20 +01:00
parent b79054f8e8
commit da69526f49
8 changed files with 368 additions and 221 deletions

View File

@ -7,7 +7,6 @@ import net.minestom.server.instance.InstanceManager;
import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.socket.Worker;
import net.minestom.server.thread.DispatchUpdate;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.timer.SchedulerManager;
@ -31,7 +30,7 @@ public final class UpdateManager {
private volatile boolean stopRequested;
// TODO make configurable
private final ThreadDispatcher threadDispatcher = ThreadDispatcher.singleThread();
private final ThreadDispatcher<Chunk> threadDispatcher = ThreadDispatcher.singleThread();
private final Queue<LongConsumer> tickStartCallbacks = new ConcurrentLinkedQueue<>();
private final Queue<LongConsumer> tickEndCallbacks = new ConcurrentLinkedQueue<>();
@ -55,7 +54,7 @@ public final class UpdateManager {
*
* @return the current thread provider
*/
public @NotNull ThreadDispatcher getThreadProvider() {
public @NotNull ThreadDispatcher<Chunk> getThreadProvider() {
return threadDispatcher;
}
@ -89,7 +88,7 @@ public final class UpdateManager {
* @param chunk the loaded chunk
*/
public void signalChunkLoad(@NotNull Chunk chunk) {
this.threadDispatcher.signalUpdate(new DispatchUpdate.ChunkLoad(chunk));
this.threadDispatcher.createPartition(chunk);
}
/**
@ -100,7 +99,7 @@ public final class UpdateManager {
* @param chunk the unloaded chunk
*/
public void signalChunkUnload(@NotNull Chunk chunk) {
this.threadDispatcher.signalUpdate(new DispatchUpdate.ChunkUnload(chunk));
this.threadDispatcher.deletePartition(chunk);
}
/**
@ -237,8 +236,7 @@ public final class UpdateManager {
this.threadDispatcher.updateAndAwait(tickStart);
// Clear removed entities & update threads
final long tickTime = System.currentTimeMillis() - tickStart;
this.threadDispatcher.refreshThreads(tickTime);
this.threadDispatcher.refreshThreads();
}
private void doTickCallback(Queue<LongConsumer> callbacks, long value) {

View File

@ -27,7 +27,9 @@ public interface Acquirable<T> {
final Thread currentThread = Thread.currentThread();
if (currentThread instanceof TickThread) {
return ((TickThread) currentThread).entries().stream()
.flatMap(chunkEntry -> chunkEntry.entities().stream());
.flatMap(partitionEntry -> partitionEntry.elements().stream())
.filter(tickable -> tickable instanceof Entity)
.map(tickable -> (Entity) tickable);
}
return Stream.empty();
}
@ -149,19 +151,19 @@ public interface Acquirable<T> {
@NotNull Handler getHandler();
final class Handler {
private volatile ThreadDispatcher.ChunkEntry chunkEntry;
private volatile ThreadDispatcher.Partition partition;
public ThreadDispatcher.ChunkEntry getChunkEntry() {
return chunkEntry;
public ThreadDispatcher.Partition getChunkEntry() {
return partition;
}
@ApiStatus.Internal
public void refreshChunkEntry(@NotNull ThreadDispatcher.ChunkEntry chunkEntry) {
this.chunkEntry = chunkEntry;
public void refreshChunkEntry(@NotNull ThreadDispatcher.Partition partition) {
this.partition = partition;
}
public TickThread getTickThread() {
final ThreadDispatcher.ChunkEntry entry = this.chunkEntry;
final ThreadDispatcher.Partition entry = this.partition;
return entry != null ? entry.thread() : null;
}
}

View File

@ -37,7 +37,6 @@ import net.minestom.server.potion.PotionEffect;
import net.minestom.server.potion.TimedPotion;
import net.minestom.server.tag.Tag;
import net.minestom.server.tag.TagHandler;
import net.minestom.server.thread.DispatchUpdate;
import net.minestom.server.timer.Schedulable;
import net.minestom.server.timer.Scheduler;
import net.minestom.server.timer.TaskSchedule;
@ -785,7 +784,7 @@ public class Entity implements Viewable, Tickable, Schedulable, TagHandler, Perm
@ApiStatus.Internal
protected void refreshCurrentChunk(Chunk currentChunk) {
this.currentChunk = currentChunk;
MinecraftServer.getUpdateManager().getThreadProvider().signalUpdate(new DispatchUpdate.EntityUpdate(this));
MinecraftServer.getUpdateManager().getThreadProvider().updateElement(this, currentChunk);
}
/**
@ -1424,7 +1423,7 @@ public class Entity implements Viewable, Tickable, Schedulable, TagHandler, Perm
if (!passengers.isEmpty()) passengers.forEach(this::removePassenger);
final Entity vehicle = this.vehicle;
if (vehicle != null) vehicle.removePassenger(this);
MinecraftServer.getUpdateManager().getThreadProvider().signalUpdate(new DispatchUpdate.EntityRemove(this));
MinecraftServer.getUpdateManager().getThreadProvider().removeElement(this);
this.removed = true;
Entity.ENTITY_BY_ID.remove(id);
Entity.ENTITY_BY_UUID.remove(uuid);

View File

@ -1,23 +0,0 @@
package net.minestom.server.thread;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
@ApiStatus.Internal
public sealed interface DispatchUpdate permits
DispatchUpdate.ChunkLoad, DispatchUpdate.ChunkUnload,
DispatchUpdate.EntityUpdate, DispatchUpdate.EntityRemove {
record ChunkLoad(@NotNull Chunk chunk) implements DispatchUpdate {
}
record ChunkUnload(@NotNull Chunk chunk) implements DispatchUpdate {
}
record EntityUpdate(@NotNull Entity entity) implements DispatchUpdate {
}
record EntityRemove(@NotNull Entity entity) implements DispatchUpdate {
}
}

View File

@ -1,12 +1,12 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.Tickable;
import net.minestom.server.acquirable.Acquirable;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Unmodifiable;
import java.util.*;
import java.util.concurrent.Phaser;
@ -15,21 +15,23 @@ import java.util.concurrent.Phaser;
* Used to link chunks into multiple groups.
* Then executed into a thread pool.
*/
public final class ThreadDispatcher {
private final ThreadProvider provider;
public final class ThreadDispatcher<P> {
private final ThreadProvider<P> provider;
private final List<TickThread> threads;
// Chunk -> ChunkEntry mapping
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
// Partition -> dispatching context
// Defines how computation is dispatched to the threads
private final Map<P, Partition> partitions = new WeakHashMap<>();
// Cache to retrieve the threading context from a tickable element
private final Map<Tickable, Partition> elements = new WeakHashMap<>();
// Queue to update chunks linked thread
private final ArrayDeque<Chunk> chunkUpdateQueue = new ArrayDeque<>();
private final ArrayDeque<P> partitionUpdateQueue = new ArrayDeque<>();
// Requests consumed at the end of each tick
private final MessagePassingQueue<DispatchUpdate> updates = new MpscUnboundedArrayQueue<>(1024);
private final MessagePassingQueue<DispatchUpdate<P>> updates = new MpscUnboundedArrayQueue<>(1024);
private final Phaser phaser = new Phaser(1);
private ThreadDispatcher(ThreadProvider provider, int threadCount) {
private ThreadDispatcher(ThreadProvider<P> provider, int threadCount) {
this.provider = provider;
TickThread[] threads = new TickThread[threadCount];
Arrays.setAll(threads, i -> new TickThread(phaser, i));
@ -37,41 +39,17 @@ public final class ThreadDispatcher {
this.threads.forEach(Thread::start);
}
public static @NotNull ThreadDispatcher of(@NotNull ThreadProvider provider, int threadCount) {
return new ThreadDispatcher(provider, threadCount);
public static <P> @NotNull ThreadDispatcher<P> of(@NotNull ThreadProvider<P> provider, int threadCount) {
return new ThreadDispatcher<>(provider, threadCount);
}
public static @NotNull ThreadDispatcher singleThread() {
return of(ThreadProvider.SINGLE, 1);
public static <P> @NotNull ThreadDispatcher<P> singleThread() {
return of(ThreadProvider.counter(), 1);
}
/**
* Represents the maximum percentage of tick time that can be spent refreshing chunks thread.
* <p>
* Percentage based on {@link MinecraftServer#TICK_MS}.
*
* @return the refresh percentage
*/
public float getRefreshPercentage() {
return 0.3f;
}
/**
* Minimum time used to refresh chunks and entities thread.
*
* @return the minimum refresh time in milliseconds
*/
public int getMinimumRefreshTime() {
return 3;
}
/**
* Maximum time used to refresh chunks and entities thread.
*
* @return the maximum refresh time in milliseconds
*/
public int getMaximumRefreshTime() {
return (int) (MinecraftServer.TICK_MS * 0.3);
@Unmodifiable
public @NotNull List<@NotNull TickThread> threads() {
return threads;
}
/**
@ -80,55 +58,79 @@ public final class ThreadDispatcher {
* @param time the tick time in milliseconds
*/
public void updateAndAwait(long time) {
for (TickThread thread : threads) thread.startTick(time);
this.phaser.arriveAndAwaitAdvance();
// Update dispatcher
this.updates.drain(update -> {
if (update instanceof DispatchUpdate.ChunkLoad chunkUpdate) {
processLoadedChunk(chunkUpdate.chunk());
} else if (update instanceof DispatchUpdate.ChunkUnload chunkUnload) {
processUnloadedChunk(chunkUnload.chunk());
} else if (update instanceof DispatchUpdate.EntityUpdate entityUpdate) {
processUpdatedEntity(entityUpdate.entity());
} else if (update instanceof DispatchUpdate.EntityRemove entityRemove) {
processRemovedEntity(entityRemove.entity());
if (update instanceof DispatchUpdate.PartitionLoad<P> chunkUpdate) {
processLoadedChunk(chunkUpdate.partition());
} else if (update instanceof DispatchUpdate.PartitionUnload<P> partitionUnload) {
processUnloadedChunk(partitionUnload.partition());
} else if (update instanceof DispatchUpdate.ElementUpdate<P> elementUpdate) {
processUpdatedElement(elementUpdate.tickable(), elementUpdate.partition());
} else if (update instanceof DispatchUpdate.ElementRemove elementRemove) {
processRemovedEntity(elementRemove.tickable());
} else {
throw new IllegalStateException("Unknown update type: " + update.getClass().getSimpleName());
}
});
// Tick all partitions
for (TickThread thread : threads) thread.startTick(time);
this.phaser.arriveAndAwaitAdvance();
}
/**
* Called at the end of each tick to clear removed entities,
* refresh the chunk linked to an entity, and chunk threads based on {@link ThreadProvider#findThread(Chunk)}.
* refresh the chunk linked to an entity, and chunk threads based on {@link ThreadProvider#findThread(Object)}.
*
* @param tickTime the duration of the tick in ms,
* used to ensure that the refresh does not take more time than the tick itself
* @param nanoTimeout max time in nanoseconds to update partitions
*/
public void refreshThreads(long tickTime) {
final ThreadProvider.RefreshType refreshType = provider.getChunkRefreshType();
if (refreshType == ThreadProvider.RefreshType.NEVER)
return;
final int timeOffset = MathUtils.clamp((int) ((double) tickTime * getRefreshPercentage()),
getMinimumRefreshTime(), getMaximumRefreshTime());
final long endTime = System.currentTimeMillis() + timeOffset;
final int size = chunkUpdateQueue.size();
int counter = 0;
public void refreshThreads(long nanoTimeout) {
switch (provider.refreshType()) {
case NEVER -> {
// Do nothing
}
case ALWAYS -> {
final long currentTime = System.nanoTime();
int counter = partitionUpdateQueue.size();
while (true) {
final Chunk chunk = chunkUpdateQueue.pollFirst();
if (chunk == null) break;
final P partition = partitionUpdateQueue.pollFirst();
if (partition == null) break;
// Update chunk's thread
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) chunkEntry.thread = retrieveThread(chunk);
this.chunkUpdateQueue.addLast(chunk);
if (++counter > size || System.currentTimeMillis() >= endTime)
Partition partitionEntry = partitions.get(partition);
assert partitionEntry != null;
final TickThread previous = partitionEntry.thread;
final TickThread next = retrieveThread(partition);
if (next != previous) {
partitionEntry.thread = next;
previous.entries().remove(partitionEntry);
next.entries().add(partitionEntry);
}
this.partitionUpdateQueue.addLast(partition);
if (--counter <= 0 || System.nanoTime() - currentTime >= nanoTimeout) {
break;
}
}
}
}
}
public void signalUpdate(@NotNull DispatchUpdate update) {
this.updates.relaxedOffer(update);
public void refreshThreads() {
refreshThreads(Long.MAX_VALUE);
}
public void createPartition(P partition) {
signalUpdate(new DispatchUpdate.PartitionLoad<>(partition));
}
public void deletePartition(P partition) {
signalUpdate(new DispatchUpdate.PartitionUnload<>(partition));
}
public void updateElement(Tickable tickable, P partition) {
signalUpdate(new DispatchUpdate.ElementUpdate<>(tickable, partition));
}
public void removeElement(Tickable tickable) {
signalUpdate(new DispatchUpdate.ElementRemove<>(tickable));
}
/**
@ -140,86 +142,94 @@ public final class ThreadDispatcher {
this.threads.forEach(TickThread::shutdown);
}
private TickThread retrieveThread(Chunk chunk) {
final int threadId = Math.abs(provider.findThread(chunk)) % threads.size();
return threads.get(threadId);
private TickThread retrieveThread(P partition) {
final int threadId = provider.findThread(partition);
final int index = Math.abs(threadId) % threads.size();
return threads.get(index);
}
private void processLoadedChunk(Chunk chunk) {
final TickThread thread = retrieveThread(chunk);
final ChunkEntry chunkEntry = new ChunkEntry(thread, chunk);
thread.entries().add(chunkEntry);
this.chunkEntryMap.put(chunk, chunkEntry);
this.chunkUpdateQueue.add(chunk);
private void signalUpdate(@NotNull DispatchUpdate<P> update) {
this.updates.relaxedOffer(update);
}
private void processUnloadedChunk(Chunk chunk) {
final ChunkEntry chunkEntry = chunkEntryMap.remove(chunk);
if (chunkEntry != null) {
TickThread thread = chunkEntry.thread;
thread.entries().remove(chunkEntry);
}
this.chunkUpdateQueue.remove(chunk);
}
private void processRemovedEntity(Entity entity) {
var acquirableEntity = entity.getAcquirable();
ChunkEntry chunkEntry = acquirableEntity.getHandler().getChunkEntry();
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
private void processLoadedChunk(P partition) {
if (partitions.containsKey(partition)) return;
final TickThread thread = retrieveThread(partition);
final Partition partitionEntry = new Partition(thread);
thread.entries().add(partitionEntry);
this.partitions.put(partition, partitionEntry);
this.partitionUpdateQueue.add(partition);
if (partition instanceof Tickable tickable) {
processUpdatedElement(tickable, partition);
}
}
private void processUpdatedEntity(Entity entity) {
ChunkEntry chunkEntry;
private void processUnloadedChunk(P partition) {
final Partition partitionEntry = partitions.remove(partition);
if (partitionEntry != null) {
TickThread thread = partitionEntry.thread;
thread.entries().remove(partitionEntry);
}
this.partitionUpdateQueue.remove(partition);
}
var acquirableEntity = entity.getAcquirable();
chunkEntry = acquirableEntity.getHandler().getChunkEntry();
private void processRemovedEntity(Tickable tickable) {
Partition partition = elements.get(tickable);
if (partition != null) {
partition.elements.remove(tickable);
}
}
private void processUpdatedElement(Tickable tickable, P partition) {
Partition partitionEntry;
partitionEntry = elements.get(tickable);
// Remove from previous list
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
if (partitionEntry != null) {
partitionEntry.elements.remove(tickable);
}
// Add to new list
chunkEntry = chunkEntryMap.get(entity.getChunk());
if (chunkEntry != null) {
chunkEntry.entities.add(entity);
acquirableEntity.getHandler().refreshChunkEntry(chunkEntry);
partitionEntry = partitions.get(partition);
if (partitionEntry != null) {
this.elements.put(tickable, partitionEntry);
partitionEntry.elements.add(tickable);
if (tickable instanceof Acquirable<?> acquirable) {
acquirable.getHandler().refreshChunkEntry(partitionEntry);
}
}
}
public static final class ChunkEntry {
private volatile TickThread thread;
private final Chunk chunk;
private final List<Entity> entities = new ArrayList<>();
public static final class Partition {
private TickThread thread;
private final List<Tickable> elements = new ArrayList<>();
private ChunkEntry(TickThread thread, Chunk chunk) {
private Partition(TickThread thread) {
this.thread = thread;
this.chunk = chunk;
}
public @NotNull TickThread thread() {
return thread;
}
public @NotNull Chunk chunk() {
return chunk;
public @NotNull List<Tickable> elements() {
return elements;
}
}
public @NotNull List<Entity> entities() {
return entities;
@ApiStatus.Internal
sealed interface DispatchUpdate<P> permits
DispatchUpdate.PartitionLoad, DispatchUpdate.PartitionUnload,
DispatchUpdate.ElementUpdate, DispatchUpdate.ElementRemove {
record PartitionLoad<P>(@NotNull P partition) implements DispatchUpdate<P> {
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkEntry that = (ChunkEntry) o;
return chunk.equals(that.chunk);
record PartitionUnload<P>(@NotNull P partition) implements DispatchUpdate<P> {
}
@Override
public int hashCode() {
return Objects.hash(chunk);
record ElementUpdate<P>(@NotNull Tickable tickable, P partition) implements DispatchUpdate<P> {
}
record ElementRemove<P>(@NotNull Tickable tickable) implements DispatchUpdate<P> {
}
}
}

View File

@ -2,8 +2,6 @@ package net.minestom.server.thread;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
@ -11,39 +9,32 @@ import java.util.concurrent.atomic.AtomicInteger;
@FunctionalInterface
@ApiStatus.Experimental
public interface ThreadProvider {
ThreadProvider PER_CHUNk = new ThreadProvider() {
public interface ThreadProvider<T> {
static <T> @NotNull ThreadProvider<T> counter() {
return new ThreadProvider<>() {
private final Cache<T, Integer> cache = Caffeine.newBuilder().weakKeys().build();
private final AtomicInteger counter = new AtomicInteger();
@Override
public int findThread(@NotNull Chunk chunk) {
return counter.getAndIncrement();
public int findThread(@NotNull T partition) {
return cache.get(partition, i -> counter.getAndIncrement());
}
};
ThreadProvider PER_INSTANCE = new ThreadProvider() {
private final Cache<Instance, Integer> cache = Caffeine.newBuilder().weakKeys().build();
private final AtomicInteger counter = new AtomicInteger();
@Override
public int findThread(@NotNull Chunk chunk) {
return cache.get(chunk.getInstance(), i -> counter.getAndIncrement());
}
};
ThreadProvider SINGLE = chunk -> 0;
/**
* Performs a server tick for all chunks based on their linked thread.
*
* @param chunk the chunk
* @param partition the partition
*/
int findThread(@NotNull Chunk chunk);
int findThread(@NotNull T partition);
/**
* Defines how often chunks thread should be updated.
*
* @return the refresh type
*/
default @NotNull RefreshType getChunkRefreshType() {
default @NotNull RefreshType refreshType() {
return RefreshType.NEVER;
}
@ -52,16 +43,16 @@ public interface ThreadProvider {
*/
enum RefreshType {
/**
* Chunk thread is constant after being defined.
* Thread never change after being defined once.
* <p>
* Means that {@link #findThread(Object)} will only be called once for each partition.
*/
NEVER,
/**
* Chunk thread should be recomputed as often as possible.
* Thread is updated as often as possible.
* <p>
* Means that {@link #findThread(Object)} may be called multiple time for each partition.
*/
CONSTANT,
/**
* Chunk thread should be recomputed, but not continuously.
*/
RARELY
ALWAYS
}
}

View File

@ -1,6 +1,7 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.Tickable;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import org.jetbrains.annotations.ApiStatus;
@ -25,7 +26,7 @@ public final class TickThread extends MinestomThread {
private volatile boolean stop;
private long tickTime;
private final List<ThreadDispatcher.ChunkEntry> entries = new ArrayList<>();
private final List<ThreadDispatcher.Partition> entries = new ArrayList<>();
public TickThread(Phaser phaser, int number) {
super(MinecraftServer.THREAD_NAME_TICK + "-" + number);
@ -50,30 +51,23 @@ public final class TickThread extends MinestomThread {
}
private void tick() {
for (ThreadDispatcher.ChunkEntry entry : entries) {
final Chunk chunk = entry.chunk();
try {
chunk.tick(tickTime);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
final List<Entity> entities = entry.entities();
if (!entities.isEmpty()) {
for (Entity entity : entities) {
for (ThreadDispatcher.Partition entry : entries) {
final List<Tickable> elements = entry.elements();
if (elements.isEmpty()) continue;
for (Tickable element : elements) {
if (lock.hasQueuedThreads()) {
this.lock.unlock();
// #acquire() callbacks should be called here
this.lock.lock();
}
try {
entity.tick(tickTime);
element.tick(tickTime);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}
}
}
}
void startTick(long tickTime) {
if (entries.isEmpty())
@ -84,7 +78,7 @@ public final class TickThread extends MinestomThread {
LockSupport.unpark(this);
}
public Collection<ThreadDispatcher.ChunkEntry> entries() {
public Collection<ThreadDispatcher.Partition> entries() {
return entries;
}

View File

@ -0,0 +1,176 @@
package thread;
import net.minestom.server.Tickable;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.thread.TickThread;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.*;
public class ThreadDispatcherTest {
@Test
public void elementTick() {
final AtomicInteger counter = new AtomicInteger();
ThreadDispatcher<Object> dispatcher = ThreadDispatcher.singleThread();
assertEquals(1, dispatcher.threads().size());
assertThrows(Exception.class, () -> dispatcher.threads().add(new TickThread(new Phaser(), 1)));
var partition = new Object();
Tickable element = (time) -> counter.incrementAndGet();
dispatcher.createPartition(partition);
dispatcher.updateElement(element, partition);
assertEquals(0, counter.get());
dispatcher.updateAndAwait(System.currentTimeMillis());
dispatcher.updateElement(element, partition); // Should be ignored
dispatcher.createPartition(partition); // Ignored too
assertEquals(1, counter.get());
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(2, counter.get());
dispatcher.removeElement(element);
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(2, counter.get());
dispatcher.shutdown();
}
@Test
public void partitionTick() {
// Partitions implementing Tickable should be ticked same as elements
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
ThreadDispatcher<Tickable> dispatcher = ThreadDispatcher.singleThread();
assertEquals(1, dispatcher.threads().size());
Tickable partition = (time) -> counter1.incrementAndGet();
Tickable element = (time) -> counter2.incrementAndGet();
dispatcher.createPartition(partition);
dispatcher.updateElement(element, partition);
assertEquals(0, counter1.get());
assertEquals(0, counter2.get());
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(1, counter1.get());
assertEquals(1, counter2.get());
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(2, counter1.get());
assertEquals(2, counter2.get());
dispatcher.deletePartition(partition);
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(2, counter1.get());
assertEquals(2, counter2.get());
dispatcher.shutdown();
}
@Test
public void uniqueThread() {
// Ensure that partitions are properly dispatched across threads
final int threadCount = 10;
ThreadDispatcher<Tickable> dispatcher = ThreadDispatcher.of(ThreadProvider.counter(), threadCount);
assertEquals(threadCount, dispatcher.threads().size());
final AtomicInteger counter = new AtomicInteger();
Set<Thread> threads = new CopyOnWriteArraySet<>();
Set<Tickable> partitions = IntStream.range(0, threadCount)
.mapToObj(value -> (Tickable) (time) -> {
final Thread thread = Thread.currentThread();
assertInstanceOf(TickThread.class, thread);
assertEquals(1, ((TickThread) thread).entries().size());
assertTrue(threads.add(thread));
counter.getAndIncrement();
})
.collect(Collectors.toUnmodifiableSet());
assertEquals(threadCount, partitions.size());
partitions.forEach(dispatcher::createPartition);
assertEquals(0, counter.get());
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(threadCount, counter.get());
dispatcher.shutdown();
}
@Test
public void threadUpdate() {
// Ensure that partitions threads are properly updated every tick
// when RefreshType.ALWAYS is used
interface Updater extends Tickable {
int getValue();
}
final int threadCount = 10;
ThreadDispatcher<Updater> dispatcher = ThreadDispatcher.of(new ThreadProvider<>() {
@Override
public int findThread(@NotNull Updater partition) {
return partition.getValue();
}
@Override
public @NotNull RefreshType refreshType() {
return RefreshType.ALWAYS;
}
}, threadCount);
assertEquals(threadCount, dispatcher.threads().size());
Map<Updater, Thread> threads = new ConcurrentHashMap<>();
Map<Updater, Thread> threads2 = new ConcurrentHashMap<>();
Set<Updater> partitions = IntStream.range(0, threadCount)
.mapToObj(value -> new Updater() {
private int v = value;
@Override
public int getValue() {
return v;
}
@Override
public void tick(long time) {
final Thread currentThread = Thread.currentThread();
assertInstanceOf(TickThread.class, currentThread);
if (threads.putIfAbsent(this, currentThread) == null) {
this.v = value + 1;
} else {
assertEquals(value + 1, v);
threads2.putIfAbsent(this, currentThread);
}
}
}).collect(Collectors.toUnmodifiableSet());
assertEquals(threadCount, partitions.size());
partitions.forEach(dispatcher::createPartition);
dispatcher.updateAndAwait(System.currentTimeMillis());
dispatcher.refreshThreads();
dispatcher.updateAndAwait(System.currentTimeMillis());
assertEquals(threads2.size(), threads.size());
assertNotEquals(threads, threads2, "Threads have not been updated at all");
for (var entry : threads.entrySet()) {
final Thread thread1 = entry.getValue();
final Thread thread2 = threads2.get(entry.getKey());
assertNotEquals(thread1, thread2);
}
dispatcher.shutdown();
}
}