Improve the thread provider api

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-09-12 22:55:37 +02:00
parent c31aa8a7ec
commit 06d8586f7f
11 changed files with 363 additions and 442 deletions

View File

@ -7,8 +7,7 @@ import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager;
import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.thread.SingleThreadProvider;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;
@ -24,14 +23,14 @@ import java.util.function.LongConsumer;
/**
* Manager responsible for the server ticks.
* <p>
* The {@link ThreadProvider} manages the multi-thread aspect of chunk ticks.
* The {@link ThreadDispatcher} manages the multi-thread aspect of chunk ticks.
*/
public final class UpdateManager {
private volatile boolean stopRequested;
// TODO make configurable
private ThreadProvider threadProvider = new SingleThreadProvider();
private ThreadDispatcher threadDispatcher = ThreadDispatcher.singleThread();
private final Queue<LongConsumer> tickStartCallbacks = new ConcurrentLinkedQueue<>();
private final Queue<LongConsumer> tickEndCallbacks = new ConcurrentLinkedQueue<>();
@ -96,7 +95,7 @@ public final class UpdateManager {
MinecraftServer.getExceptionManager().handleException(e);
}
}
this.threadProvider.shutdown();
this.threadDispatcher.shutdown();
}, MinecraftServer.THREAD_NAME_TICK_SCHEDULER).start();
}
@ -115,11 +114,11 @@ public final class UpdateManager {
}
});
// Tick all chunks (and entities inside)
this.threadProvider.updateAndAwait(tickStart);
this.threadDispatcher.updateAndAwait(tickStart);
// Clear removed entities & update threads
long tickTime = System.currentTimeMillis() - tickStart;
this.threadProvider.refreshThreads(tickTime);
final long tickTime = System.currentTimeMillis() - tickStart;
this.threadDispatcher.refreshThreads(tickTime);
}
/**
@ -138,56 +137,56 @@ public final class UpdateManager {
}
/**
* Gets the current {@link ThreadProvider}.
* Gets the current {@link ThreadDispatcher}.
*
* @return the current thread provider
*/
public @NotNull ThreadProvider getThreadProvider() {
return threadProvider;
public @NotNull ThreadDispatcher getThreadProvider() {
return threadDispatcher;
}
/**
* Signals the {@link ThreadProvider} that an instance has been created.
* Signals the {@link ThreadDispatcher} that an instance has been created.
* <p>
* WARNING: should be automatically done by the {@link InstanceManager}.
*
* @param instance the instance
*/
public void signalInstanceCreate(Instance instance) {
this.threadProvider.onInstanceCreate(instance);
this.threadDispatcher.onInstanceCreate(instance);
}
/**
* Signals the {@link ThreadProvider} that an instance has been deleted.
* Signals the {@link ThreadDispatcher} that an instance has been deleted.
* <p>
* WARNING: should be automatically done by the {@link InstanceManager}.
*
* @param instance the instance
*/
public void signalInstanceDelete(Instance instance) {
this.threadProvider.onInstanceDelete(instance);
this.threadDispatcher.onInstanceDelete(instance);
}
/**
* Signals the {@link ThreadProvider} that a chunk has been loaded.
* Signals the {@link ThreadDispatcher} that a chunk has been loaded.
* <p>
* WARNING: should be automatically done by the {@link Instance} implementation.
*
* @param chunk the loaded chunk
*/
public void signalChunkLoad(@NotNull Chunk chunk) {
this.threadProvider.onChunkLoad(chunk);
this.threadDispatcher.onChunkLoad(chunk);
}
/**
* Signals the {@link ThreadProvider} that a chunk has been unloaded.
* Signals the {@link ThreadDispatcher} that a chunk has been unloaded.
* <p>
* WARNING: should be automatically done by the {@link Instance} implementation.
*
* @param chunk the unloaded chunk
*/
public void signalChunkUnload(@NotNull Chunk chunk) {
this.threadProvider.onChunkUnload(chunk);
this.threadDispatcher.onChunkUnload(chunk);
}
/**

View File

@ -1,7 +1,7 @@
package net.minestom.server.acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.thread.TickThread;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.ApiStatus;
@ -36,7 +36,7 @@ public interface Acquirable<T> {
* @param entries the new chunk entries
*/
@ApiStatus.Internal
static void refreshEntries(@NotNull Collection<ThreadProvider.ChunkEntry> entries) {
static void refreshEntries(@NotNull Collection<ThreadDispatcher.ChunkEntry> entries) {
AcquirableImpl.ENTRIES.set(entries);
}
@ -157,19 +157,19 @@ public interface Acquirable<T> {
@NotNull Handler getHandler();
final class Handler {
private volatile ThreadProvider.ChunkEntry chunkEntry;
private volatile ThreadDispatcher.ChunkEntry chunkEntry;
public ThreadProvider.ChunkEntry getChunkEntry() {
public ThreadDispatcher.ChunkEntry getChunkEntry() {
return chunkEntry;
}
@ApiStatus.Internal
public void refreshChunkEntry(@NotNull ThreadProvider.ChunkEntry chunkEntry) {
public void refreshChunkEntry(@NotNull ThreadDispatcher.ChunkEntry chunkEntry) {
this.chunkEntry = chunkEntry;
}
public TickThread getTickThread() {
final ThreadProvider.ChunkEntry entry = this.chunkEntry;
final ThreadDispatcher.ChunkEntry entry = this.chunkEntry;
return entry != null ? entry.getThread() : null;
}
}

View File

@ -1,6 +1,6 @@
package net.minestom.server.acquirable;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.thread.TickThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
final class AcquirableImpl<T> implements Acquirable<T> {
static final ThreadLocal<Collection<ThreadProvider.ChunkEntry>> ENTRIES = ThreadLocal.withInitial(Collections::emptySet);
static final ThreadLocal<Collection<ThreadDispatcher.ChunkEntry>> ENTRIES = ThreadLocal.withInitial(Collections::emptySet);
static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong();
/**

View File

@ -33,7 +33,6 @@ import net.minestom.server.potion.PotionEffect;
import net.minestom.server.potion.TimedPotion;
import net.minestom.server.tag.Tag;
import net.minestom.server.tag.TagHandler;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.async.AsyncUtils;
import net.minestom.server.utils.block.BlockIterator;
@ -164,7 +163,6 @@ public class Entity implements Viewable, Tickable, TagHandler, PermissionHandler
/**
* Schedules a task to be run during the next entity tick.
* It ensures that the task will be executed in the same thread as the entity (depending of the {@link ThreadProvider}).
*
* @param callback the task to execute during the next entity tick
*/

View File

@ -26,7 +26,6 @@ import net.minestom.server.network.packet.server.play.BlockActionPacket;
import net.minestom.server.network.packet.server.play.TimeUpdatePacket;
import net.minestom.server.tag.Tag;
import net.minestom.server.tag.TagHandler;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.chunk.ChunkUtils;
import net.minestom.server.utils.entity.EntityUtils;
@ -129,8 +128,6 @@ public abstract class Instance implements BlockGetter, BlockSetter, Tickable, Ta
/**
* Schedules a task to be run during the next instance tick.
* It ensures that the task will be executed in the same thread as the instance
* and its chunks/entities (depending of the {@link ThreadProvider}).
*
* @param callback the task to execute during the next instance tick
*/

View File

@ -1,28 +0,0 @@
package net.minestom.server.thread;
import net.minestom.server.instance.Chunk;
import org.jetbrains.annotations.NotNull;
/**
* Each {@link Chunk} gets assigned to a random thread.
*/
public class PerChunkThreadProvider extends ThreadProvider {
public PerChunkThreadProvider(int threadCount) {
super(threadCount);
}
public PerChunkThreadProvider() {
super();
}
@Override
public int findThread(@NotNull Chunk chunk) {
return chunk.hashCode();
}
@Override
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
}

View File

@ -1,29 +0,0 @@
package net.minestom.server.thread;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import org.jetbrains.annotations.NotNull;
/**
* Each {@link Instance} gets assigned to a random thread.
*/
public class PerInstanceThreadProvider extends ThreadProvider {
public PerInstanceThreadProvider(int threadCount) {
super(threadCount);
}
public PerInstanceThreadProvider() {
super();
}
@Override
public int findThread(@NotNull Chunk chunk) {
return chunk.getInstance().hashCode();
}
@Override
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
}

View File

@ -1,24 +0,0 @@
package net.minestom.server.thread;
import net.minestom.server.instance.Chunk;
import org.jetbrains.annotations.NotNull;
/**
* Uses a single thread for all chunks.
*/
public class SingleThreadProvider extends ThreadProvider {
public SingleThreadProvider() {
super(1);
}
@Override
public int findThread(@NotNull Chunk chunk) {
return 0;
}
@Override
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
}

View File

@ -0,0 +1,325 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.acquirable.Acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.chunk.ChunkUtils;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
* Used to link chunks into multiple groups.
* Then executed into a thread pool.
*/
public final class ThreadDispatcher {
private final ThreadProvider provider;
private final List<TickThread> threads;
private final Map<TickThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>();
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
private final ArrayDeque<Chunk> chunkUpdateQueue = new ArrayDeque<>();
private final Queue<Chunk> chunkLoadRequests = new ConcurrentLinkedQueue<>();
private final Queue<Chunk> chunkUnloadRequests = new ConcurrentLinkedQueue<>();
private final Queue<Entity> entityUpdateRequests = new ConcurrentLinkedQueue<>();
private final Queue<Entity> entityRemovalRequests = new ConcurrentLinkedQueue<>();
private final Phaser phaser = new Phaser(1);
private ThreadDispatcher(ThreadProvider provider, int threadCount) {
this.provider = provider;
this.threads = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
final TickThread.BatchRunnable batchRunnable = new TickThread.BatchRunnable();
final TickThread tickThread = new TickThread(batchRunnable, i);
this.threads.add(tickThread);
tickThread.start();
}
}
public static @NotNull ThreadDispatcher of(@NotNull ThreadProvider provider, int threadCount) {
return new ThreadDispatcher(provider, threadCount);
}
public static @NotNull ThreadDispatcher singleThread() {
return of(ThreadProvider.SINGLE, 1);
}
/**
* Represents the maximum percentage of tick time that can be spent refreshing chunks thread.
* <p>
* Percentage based on {@link MinecraftServer#TICK_MS}.
*
* @return the refresh percentage
*/
public float getRefreshPercentage() {
return 0.3f;
}
/**
* Minimum time used to refresh chunks and entities thread.
*
* @return the minimum refresh time in milliseconds
*/
public int getMinimumRefreshTime() {
return 3;
}
/**
* Maximum time used to refresh chunks and entities thread.
*
* @return the maximum refresh time in milliseconds
*/
public int getMaximumRefreshTime() {
return (int) (MinecraftServer.TICK_MS * 0.3);
}
/**
* Prepares the update by creating the {@link TickThread} tasks.
*
* @param time the tick time in milliseconds
*/
public void updateAndAwait(long time) {
for (var entry : threadChunkMap.entrySet()) {
final TickThread thread = entry.getKey();
final Set<ChunkEntry> chunkEntries = entry.getValue();
if (chunkEntries == null || chunkEntries.isEmpty()) {
// Nothing to tick
continue;
}
// Execute tick
this.phaser.register();
thread.runnable.startTick(phaser, () -> {
Acquirable.refreshEntries(chunkEntries);
final ReentrantLock lock = thread.getLock();
lock.lock();
for (ChunkEntry chunkEntry : chunkEntries) {
final Chunk chunk = chunkEntry.chunk;
if (!ChunkUtils.isLoaded(chunk)) return;
try {
chunk.tick(time);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
final List<Entity> entities = chunkEntry.entities;
if (!entities.isEmpty()) {
for (Entity entity : entities) {
if (lock.hasQueuedThreads()) {
lock.unlock();
// #acquire() callbacks should be called here
lock.lock();
}
try {
entity.tick(time);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}
}
}
lock.unlock();
// #acquire() callbacks
});
}
this.phaser.arriveAndAwaitAdvance();
}
/**
* Called at the end of each tick to clear removed entities,
* refresh the chunk linked to an entity, and chunk threads based on {@link ThreadProvider#findThread(Chunk)}.
*
* @param tickTime the duration of the tick in ms,
* used to ensure that the refresh does not take more time than the tick itself
*/
public void refreshThreads(long tickTime) {
processLoadedChunks();
processUnloadedChunks();
processRemovedEntities();
processUpdatedEntities();
if (provider.getChunkRefreshType() == ThreadProvider.RefreshType.NEVER)
return;
final int timeOffset = MathUtils.clamp((int) ((double) tickTime * getRefreshPercentage()),
getMinimumRefreshTime(), getMaximumRefreshTime());
final long endTime = System.currentTimeMillis() + timeOffset;
final int size = chunkUpdateQueue.size();
int counter = 0;
while (true) {
final Chunk chunk = chunkUpdateQueue.pollFirst();
if (!ChunkUtils.isLoaded(chunk)) {
removeChunk(chunk);
continue;
}
// Update chunk threads
switchChunk(chunk);
// Add back to the deque
chunkUpdateQueue.addLast(chunk);
if (++counter > size || System.currentTimeMillis() >= endTime)
break;
}
}
/**
* Shutdowns all the {@link TickThread tick threads}.
* <p>
* Action is irreversible.
*/
public void shutdown() {
this.threads.forEach(TickThread::shutdown);
}
public void onInstanceCreate(@NotNull Instance instance) {
instance.getChunks().forEach(this::onChunkLoad);
}
public void onInstanceDelete(@NotNull Instance instance) {
instance.getChunks().forEach(this::onChunkUnload);
}
public void onChunkLoad(Chunk chunk) {
this.chunkLoadRequests.add(chunk);
}
public void onChunkUnload(Chunk chunk) {
this.chunkUnloadRequests.add(chunk);
}
public void updateEntity(@NotNull Entity entity) {
this.entityUpdateRequests.add(entity);
}
public void removeEntity(@NotNull Entity entity) {
this.entityRemovalRequests.add(entity);
}
private void switchChunk(@NotNull Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry == null) return;
Set<ChunkEntry> chunks = threadChunkMap.get(chunkEntry.thread);
if (chunks == null || chunks.isEmpty()) return;
chunks.remove(chunkEntry);
setChunkThread(chunk, tickThread -> {
chunkEntry.thread = tickThread;
return chunkEntry;
});
}
private @NotNull ChunkEntry setChunkThread(@NotNull Chunk chunk,
@NotNull Function<TickThread, ChunkEntry> chunkEntrySupplier) {
final int threadId = Math.abs(provider.findThread(chunk)) % threads.size();
TickThread thread = threads.get(threadId);
Set<ChunkEntry> chunks = threadChunkMap.computeIfAbsent(thread, tickThread -> new HashSet<>());
ChunkEntry chunkEntry = chunkEntrySupplier.apply(thread);
chunks.add(chunkEntry);
return chunkEntry;
}
private void removeChunk(Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) {
TickThread thread = chunkEntry.thread;
Set<ChunkEntry> chunks = threadChunkMap.get(thread);
if (chunks != null) {
chunks.remove(chunkEntry);
}
chunkEntryMap.remove(chunk);
}
this.chunkUpdateQueue.remove(chunk);
}
private void processLoadedChunks() {
Chunk chunk;
while ((chunk = chunkLoadRequests.poll()) != null) {
Chunk finalChunk = chunk;
ChunkEntry chunkEntry = setChunkThread(chunk, (thread) -> new ChunkEntry(thread, finalChunk));
this.chunkEntryMap.put(chunk, chunkEntry);
this.chunkUpdateQueue.add(chunk);
}
}
private void processUnloadedChunks() {
Chunk chunk;
while ((chunk = chunkUnloadRequests.poll()) != null) {
removeChunk(chunk);
}
}
private void processRemovedEntities() {
Entity entity;
while ((entity = entityRemovalRequests.poll()) != null) {
var acquirableEntity = entity.getAcquirable();
ChunkEntry chunkEntry = acquirableEntity.getHandler().getChunkEntry();
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
}
}
}
private void processUpdatedEntities() {
Entity entity;
while ((entity = entityUpdateRequests.poll()) != null) {
ChunkEntry chunkEntry;
var acquirableEntity = entity.getAcquirable();
chunkEntry = acquirableEntity.getHandler().getChunkEntry();
// Remove from previous list
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
}
// Add to new list
chunkEntry = chunkEntryMap.get(entity.getChunk());
if (chunkEntry != null) {
chunkEntry.entities.add(entity);
acquirableEntity.getHandler().refreshChunkEntry(chunkEntry);
}
}
}
public static final class ChunkEntry {
private volatile TickThread thread;
private final Chunk chunk;
private final List<Entity> entities = new ArrayList<>();
private ChunkEntry(TickThread thread, Chunk chunk) {
this.thread = thread;
this.chunk = chunk;
}
public @NotNull TickThread getThread() {
return thread;
}
public @NotNull Chunk getChunk() {
return chunk;
}
public @NotNull List<Entity> getEntities() {
return entities;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkEntry that = (ChunkEntry) o;
return chunk.equals(that.chunk);
}
@Override
public int hashCode() {
return Objects.hash(chunk);
}
}
}

View File

@ -1,317 +1,36 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.acquirable.Acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.chunk.ChunkUtils;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
* Used to link chunks into multiple groups.
* Then executed into a thread pool.
*/
public abstract class ThreadProvider {
private final List<TickThread> threads;
private final Map<TickThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>();
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
// Iterated over to refresh the thread used to update entities & chunks
private final ArrayDeque<Chunk> chunks = new ArrayDeque<>();
private final Set<Entity> updatableEntities = ConcurrentHashMap.newKeySet();
private final Set<Entity> removedEntities = ConcurrentHashMap.newKeySet();
private final Phaser phaser = new Phaser(1);
public ThreadProvider(int threadCount) {
this.threads = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
final TickThread.BatchRunnable batchRunnable = new TickThread.BatchRunnable();
final TickThread tickThread = new TickThread(batchRunnable, i);
this.threads.add(tickThread);
tickThread.start();
}
}
public ThreadProvider() {
this(Runtime.getRuntime().availableProcessors());
}
public synchronized void onInstanceCreate(@NotNull Instance instance) {
instance.getChunks().forEach(this::addChunk);
}
public synchronized void onInstanceDelete(@NotNull Instance instance) {
instance.getChunks().forEach(this::removeChunk);
}
public synchronized void onChunkLoad(Chunk chunk) {
addChunk(chunk);
}
public synchronized void onChunkUnload(Chunk chunk) {
removeChunk(chunk);
}
@FunctionalInterface
@ApiStatus.Experimental
public interface ThreadProvider {
ThreadProvider PER_CHUNk = Object::hashCode;
ThreadProvider PER_INSTANCE = chunk -> chunk.getInstance().hashCode();
ThreadProvider SINGLE = chunk -> 0;
/**
* Performs a server tick for all chunks based on their linked thread.
*
* @param chunk the chunk
*/
public abstract int findThread(@NotNull Chunk chunk);
int findThread(@NotNull Chunk chunk);
/**
* Defines how often chunks thread should be updated.
*
* @return the refresh type
*/
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.CONSTANT;
}
/**
* Represents the maximum percentage of tick time that can be spent refreshing chunks thread.
* <p>
* Percentage based on {@link MinecraftServer#TICK_MS}.
*
* @return the refresh percentage
*/
public float getRefreshPercentage() {
return 0.3f;
}
/**
* Minimum time used to refresh chunks and entities thread.
*
* @return the minimum refresh time in milliseconds
*/
public int getMinimumRefreshTime() {
return 3;
}
/**
* Maximum time used to refresh chunks and entities thread.
*
* @return the maximum refresh time in milliseconds
*/
public int getMaximumRefreshTime() {
return (int) (MinecraftServer.TICK_MS * 0.3);
}
/**
* Prepares the update by creating the {@link TickThread} tasks.
*
* @param time the tick time in milliseconds
*/
public void updateAndAwait(long time) {
for (var entry : threadChunkMap.entrySet()) {
final TickThread thread = entry.getKey();
final var chunkEntries = entry.getValue();
if (chunkEntries == null || chunkEntries.isEmpty()) {
// Nothing to tick
continue;
}
// Execute tick
this.phaser.register();
thread.runnable.startTick(phaser, () -> {
Acquirable.refreshEntries(chunkEntries);
final ReentrantLock lock = thread.getLock();
lock.lock();
for (ChunkEntry chunkEntry : chunkEntries) {
final Chunk chunk = chunkEntry.chunk;
if (!ChunkUtils.isLoaded(chunk)) return;
try {
chunk.tick(time);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
final List<Entity> entities = chunkEntry.entities;
if (!entities.isEmpty()) {
for (Entity entity : entities) {
if (lock.hasQueuedThreads()) {
lock.unlock();
// #acquire() callbacks should be called here
lock.lock();
}
try {
entity.tick(time);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}
}
}
lock.unlock();
// #acquire() callbacks
});
}
this.phaser.arriveAndAwaitAdvance();
}
/**
* Called at the end of each tick to clear removed entities,
* refresh the chunk linked to an entity, and chunk threads based on {@link #findThread(Chunk)}.
*
* @param tickTime the duration of the tick in ms,
* used to ensure that the refresh does not take more time than the tick itself
*/
public synchronized void refreshThreads(long tickTime) {
// Clear removed entities
processRemovedEntities();
// Update entities chunks
processUpdatedEntities();
if (getChunkRefreshType() == RefreshType.NEVER)
return;
final int timeOffset = MathUtils.clamp((int) ((double) tickTime * getRefreshPercentage()),
getMinimumRefreshTime(), getMaximumRefreshTime());
final long endTime = System.currentTimeMillis() + timeOffset;
final int size = chunks.size();
int counter = 0;
while (true) {
final Chunk chunk = chunks.pollFirst();
if (!ChunkUtils.isLoaded(chunk)) {
removeChunk(chunk);
continue;
}
// Update chunk threads
switchChunk(chunk);
// Add back to the deque
chunks.addLast(chunk);
if (++counter > size)
break;
if (System.currentTimeMillis() >= endTime)
break;
}
}
/**
* Add an entity into the waiting list to get assigned in a thread.
* <p>
* Called when entering a new chunk.
*
* @param entity the entity to add
*/
public void updateEntity(@NotNull Entity entity) {
this.updatableEntities.add(entity);
}
/**
* Add an entity into the waiting list to get removed from its thread.
* <p>
* Called in {@link Entity#remove()}.
*
* @param entity the entity to remove
*/
public void removeEntity(@NotNull Entity entity) {
this.removedEntities.add(entity);
}
/**
* Shutdowns all the {@link TickThread tick threads}.
* <p>
* Action is irreversible.
*/
public void shutdown() {
this.threads.forEach(TickThread::shutdown);
}
private void addChunk(@NotNull Chunk chunk) {
ChunkEntry chunkEntry = setChunkThread(chunk, (thread) -> new ChunkEntry(thread, chunk));
this.chunkEntryMap.put(chunk, chunkEntry);
this.chunks.add(chunk);
}
private void switchChunk(@NotNull Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry == null)
return;
var chunks = threadChunkMap.get(chunkEntry.thread);
if (chunks == null || chunks.isEmpty())
return;
chunks.remove(chunkEntry);
setChunkThread(chunk, tickThread -> {
chunkEntry.thread = tickThread;
return chunkEntry;
});
}
private @NotNull ChunkEntry setChunkThread(@NotNull Chunk chunk,
@NotNull Function<TickThread, ChunkEntry> chunkEntrySupplier) {
final int threadId = Math.abs(findThread(chunk)) % threads.size();
TickThread thread = threads.get(threadId);
var chunks = threadChunkMap.computeIfAbsent(thread, tickThread -> ConcurrentHashMap.newKeySet());
ChunkEntry chunkEntry = chunkEntrySupplier.apply(thread);
chunks.add(chunkEntry);
return chunkEntry;
}
private void removeChunk(Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) {
TickThread thread = chunkEntry.thread;
var chunks = threadChunkMap.get(thread);
if (chunks != null) {
chunks.remove(chunkEntry);
}
chunkEntryMap.remove(chunk);
}
this.chunks.remove(chunk);
}
private void processRemovedEntities() {
if (removedEntities.isEmpty())
return;
for (Entity entity : removedEntities) {
var acquirableEntity = entity.getAcquirable();
ChunkEntry chunkEntry = acquirableEntity.getHandler().getChunkEntry();
// Remove from list
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
}
}
this.removedEntities.clear();
}
private void processUpdatedEntities() {
if (updatableEntities.isEmpty())
return;
for (Entity entity : updatableEntities) {
var acquirableEntity = entity.getAcquirable();
ChunkEntry handlerChunkEntry = acquirableEntity.getHandler().getChunkEntry();
// Remove from previous list
if (handlerChunkEntry != null) {
handlerChunkEntry.entities.remove(entity);
}
// Add to new list
ChunkEntry chunkEntry = chunkEntryMap.get(entity.getChunk());
if (chunkEntry != null) {
chunkEntry.entities.add(entity);
acquirableEntity.getHandler().refreshChunkEntry(chunkEntry);
}
}
this.updatableEntities.clear();
default @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
/**
* Defines how often chunks thread should be refreshed.
*/
public enum RefreshType {
enum RefreshType {
/**
* Chunk thread is constant after being defined.
*/
@ -325,40 +44,4 @@ public abstract class ThreadProvider {
*/
RARELY
}
public static class ChunkEntry {
private volatile TickThread thread;
private final Chunk chunk;
private final List<Entity> entities = new ArrayList<>();
private ChunkEntry(TickThread thread, Chunk chunk) {
this.thread = thread;
this.chunk = chunk;
}
public @NotNull TickThread getThread() {
return thread;
}
public @NotNull Chunk getChunk() {
return chunk;
}
public @NotNull List<Entity> getEntities() {
return entities;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkEntry that = (ChunkEntry) o;
return chunk.equals(that.chunk);
}
@Override
public int hashCode() {
return Objects.hash(chunk);
}
}
}

View File

@ -12,7 +12,7 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* Thread responsible for ticking {@link net.minestom.server.instance.Chunk chunks} and {@link net.minestom.server.entity.Entity entities}.
* <p>
* Created in {@link ThreadProvider}, and awaken every tick with a task to execute.
* Created in {@link ThreadDispatcher}, and awaken every tick with a task to execute.
*/
public class TickThread extends Thread {