From 356150847ec9479fd9a179ace2a01defeaaf032e Mon Sep 17 00:00:00 2001 From: TheMode Date: Wed, 14 Apr 2021 20:32:02 +0200 Subject: [PATCH] Per-chunk batch management --- .../net/minestom/server/UpdateManager.java | 30 +++-- .../thread/PerInstanceThreadProvider.java | 60 ++++----- .../server/thread/SingleThreadProvider.java | 56 -------- .../server/thread/ThreadProvider.java | 13 +- .../server/thread/batch/BatchHandler.java | 127 +++++++----------- .../server/thread/batch/BatchInfo.java | 2 +- .../thread/batch/BatchSetupHandler.java | 95 ------------- 7 files changed, 96 insertions(+), 287 deletions(-) delete mode 100644 src/main/java/net/minestom/server/thread/SingleThreadProvider.java delete mode 100644 src/main/java/net/minestom/server/thread/batch/BatchSetupHandler.java diff --git a/src/main/java/net/minestom/server/UpdateManager.java b/src/main/java/net/minestom/server/UpdateManager.java index 23b0020b0..1c724ecab 100644 --- a/src/main/java/net/minestom/server/UpdateManager.java +++ b/src/main/java/net/minestom/server/UpdateManager.java @@ -4,6 +4,7 @@ import com.google.common.collect.Queues; import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; import net.minestom.server.instance.InstanceManager; +import net.minestom.server.lock.Acquisition; import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.player.NettyPlayerConnection; @@ -40,7 +41,7 @@ public final class UpdateManager { { // DEFAULT THREAD PROVIDER //threadProvider = new PerGroupChunkProvider(); - threadProvider = new PerInstanceThreadProvider(); + threadProvider = new PerInstanceThreadProvider(2); } /** @@ -85,9 +86,13 @@ public final class UpdateManager { // Monitoring if (!tickMonitors.isEmpty()) { + // TODO use value + final double acquisitionTimeMs = Acquisition.getCurrentWaitMonitoring() / 1e6D; final double tickTimeMs = tickTime / 1e6D; final TickMonitor tickMonitor = new TickMonitor(tickTimeMs); this.tickMonitors.forEach(consumer -> consumer.accept(tickMonitor)); + + Acquisition.resetWaitMonitoring(); } // Flush all waiting packets @@ -108,21 +113,28 @@ public final class UpdateManager { * @param tickStart the time of the tick in milliseconds */ private void serverTick(long tickStart) { - List> futures; + + // Tick all instances + MinecraftServer.getInstanceManager().getInstances().forEach(instance -> + instance.tick(tickStart)); // Server tick (instance/chunk/entity) // Synchronize with the update manager instance, like the signal for chunk load/unload synchronized (this) { - futures = threadProvider.update(tickStart); + this.threadProvider.prepareUpdate(tickStart); } - for (final Future future : futures) { - try { - future.get(); - } catch (Throwable e) { - MinecraftServer.getExceptionManager().handleException(e); - } + CountDownLatch countDownLatch = threadProvider.notifyThreads(); + + // Wait tick end + try { + countDownLatch.await(); + } catch (InterruptedException e) { + MinecraftServer.getExceptionManager().handleException(e); } + + // Reset thread cost count + this.threadProvider.cleanup(); } /** diff --git a/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java b/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java index d1b3a9065..07b8687c7 100644 --- a/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java @@ -1,64 +1,50 @@ package net.minestom.server.thread; +import net.minestom.server.MinecraftServer; import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; +import net.minestom.server.instance.InstanceManager; import org.jetbrains.annotations.NotNull; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; - -/** - * Separates work between instance (1 instance = 1 thread execution). - */ public class PerInstanceThreadProvider extends ThreadProvider { - private final Map> instanceChunkMap = new ConcurrentHashMap<>(); + private static final InstanceManager INSTANCE_MANAGER = MinecraftServer.getInstanceManager(); + + public PerInstanceThreadProvider(int threadCount) { + super(threadCount); + } @Override public void onInstanceCreate(@NotNull Instance instance) { - this.instanceChunkMap.putIfAbsent(instance, ConcurrentHashMap.newKeySet()); + } @Override public void onInstanceDelete(@NotNull Instance instance) { - this.instanceChunkMap.remove(instance); + } @Override - public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) { - // Add the loaded chunk to the instance chunks list - Set chunks = getChunks(instance); - chunks.add(chunk); + public void onChunkLoad(@NotNull Instance instance, Chunk chunk) { + } @Override - public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) { - Set chunks = getChunks(instance); - chunks.remove(chunk); + public void onChunkUnload(@NotNull Instance instance, Chunk chunk) { + } - @NotNull @Override - public List> update(long time) { - List> futures = new ArrayList<>(); + public void update(long time) { + for (Instance instance : INSTANCE_MANAGER.getInstances()) { + createBatch(batchHandler -> { - instanceChunkMap.forEach((instance, chunks) -> futures.add(pool.submit(() -> { - // Tick instance - updateInstance(instance, time); - // Tick chunks - for (Chunk chunk : chunks) { - processChunkTick(instance, chunk, time); - } - }))); - return futures; + for (Chunk chunk : instance.getChunks()) { + // Tick chunks & entities + batchHandler.updateChunk(chunk, time); + } + + }, time); + } } - - private Set getChunks(Instance instance) { - return instanceChunkMap.computeIfAbsent(instance, inst -> ConcurrentHashMap.newKeySet()); - } - } diff --git a/src/main/java/net/minestom/server/thread/SingleThreadProvider.java b/src/main/java/net/minestom/server/thread/SingleThreadProvider.java deleted file mode 100644 index d1c02082c..000000000 --- a/src/main/java/net/minestom/server/thread/SingleThreadProvider.java +++ /dev/null @@ -1,56 +0,0 @@ -package net.minestom.server.thread; - -import net.minestom.server.instance.Chunk; -import net.minestom.server.instance.Instance; -import org.jetbrains.annotations.NotNull; - -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Future; - -/** - * Simple thread provider implementation using a single thread to update all the instances and chunks. - */ -public class SingleThreadProvider extends ThreadProvider { - - { - setThreadCount(1); - } - - private final Set instances = new CopyOnWriteArraySet<>(); - - @Override - public void onInstanceCreate(@NotNull Instance instance) { - this.instances.add(instance); - } - - @Override - public void onInstanceDelete(@NotNull Instance instance) { - this.instances.remove(instance); - } - - @Override - public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) { - - } - - @Override - public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) { - - } - - @NotNull - @Override - public List> update(long time) { - return Collections.singletonList(pool.submit(() -> { - for (Instance instance : instances) { - updateInstance(instance, time); - for (Chunk chunk : instance.getChunks()) { - processChunkTick(instance, chunk, time); - } - } - })); - } -} diff --git a/src/main/java/net/minestom/server/thread/ThreadProvider.java b/src/main/java/net/minestom/server/thread/ThreadProvider.java index fbee25987..5018b988a 100644 --- a/src/main/java/net/minestom/server/thread/ThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/ThreadProvider.java @@ -4,7 +4,6 @@ import net.minestom.server.UpdateManager; import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; import net.minestom.server.thread.batch.BatchHandler; -import net.minestom.server.thread.batch.BatchSetupHandler; import net.minestom.server.utils.time.Cooldown; import net.minestom.server.utils.time.TimeUnit; import net.minestom.server.utils.time.UpdateOption; @@ -28,7 +27,7 @@ public abstract class ThreadProvider { private final Set threads; - private final List batchHandlers = new ArrayList<>(); + private final List batchHandlers = new ArrayList<>(); private UpdateOption batchesRefreshCooldown; private long lastBatchRefreshTime; @@ -86,13 +85,13 @@ public abstract class ThreadProvider { public abstract void update(long time); public void createBatch(@NotNull Consumer consumer, long time) { - BatchSetupHandler batchSetupHandler = new BatchSetupHandler(); + BatchHandler batchHandler = new BatchHandler(); - consumer.accept(batchSetupHandler); + consumer.accept(batchHandler); - this.batchHandlers.add(batchSetupHandler); + this.batchHandlers.add(batchHandler); - batchSetupHandler.pushTask(threads, time); + batchHandler.pushTask(threads, time); } /** @@ -112,7 +111,7 @@ public abstract class ThreadProvider { update(time); } else { // Push the tasks - for (BatchSetupHandler batchHandler : batchHandlers) { + for (BatchHandler batchHandler : batchHandlers) { batchHandler.pushTask(threads, time); } } diff --git a/src/main/java/net/minestom/server/thread/batch/BatchHandler.java b/src/main/java/net/minestom/server/thread/batch/BatchHandler.java index 32e7674a2..195a82861 100644 --- a/src/main/java/net/minestom/server/thread/batch/BatchHandler.java +++ b/src/main/java/net/minestom/server/thread/batch/BatchHandler.java @@ -1,100 +1,63 @@ package net.minestom.server.thread.batch; -import net.minestom.server.entity.Entity; import net.minestom.server.instance.Chunk; -import net.minestom.server.instance.Instance; -import net.minestom.server.instance.InstanceContainer; -import net.minestom.server.instance.SharedInstance; -import net.minestom.server.utils.callback.validator.EntityValidator; -import net.minestom.server.utils.chunk.ChunkUtils; +import net.minestom.server.lock.Acquirable; +import net.minestom.server.thread.BatchThread; +import net.minestom.server.utils.validate.Check; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import java.util.function.Consumer; +import java.util.ArrayList; +import java.util.Set; -public interface BatchHandler { +public class BatchHandler { - // INSTANCE UPDATE + private final BatchInfo batchInfo = new BatchInfo(); - /** - * Executes an instance tick. - * - * @param instance the instance - * @param time the current time in ms - */ - void updateInstance(@NotNull Instance instance, long time); + private final ArrayList chunks = new ArrayList<>(); + private int estimatedCost; - /** - * Executes a chunk tick (blocks update). - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms - */ - void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time); + public void updateChunk(@NotNull Chunk chunk, long time) { + // Set the BatchInfo field + //Acquirable.Handler handler = acquirable.getHandler(); + //handler.refreshBatchInfo(batchInfo); - /** - * Processes a whole tick for a chunk. - * - * @param instance the instance of the chunk - * @param chunkIndex the index of the chunk {@link ChunkUtils#getChunkIndex(int, int)} - * @param time the time of the update in milliseconds - */ - default void updateChunk(@NotNull Instance instance, long chunkIndex, long time) { - final int chunkX = ChunkUtils.getChunkCoordX(chunkIndex); - final int chunkZ = ChunkUtils.getChunkCoordZ(chunkIndex); - - final Chunk chunk = instance.getChunk(chunkX, chunkZ); - updateChunk(instance, chunk, time); + this.chunks.add(chunk); + this.estimatedCost++; } - // ENTITY UPDATE + public void pushTask(@NotNull Set threads, long time) { + BatchThread fitThread = null; + int minCost = Integer.MAX_VALUE; - /** - * Executes an entity tick (all entities type creatures/objects/players) in an instance's chunk. - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms - */ - default void updateEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - conditionalEntityUpdate(instance, chunk, time, null); - } - - /** - * Executes an entity tick in an instance's chunk if condition is verified. - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms - * @param condition the condition which confirm if the update happens or not - */ - void conditionalEntityUpdate(@NotNull Instance instance, - @NotNull Chunk chunk, long time, - @Nullable EntityValidator condition); - - default boolean shouldTick(@NotNull Entity entity, @Nullable EntityValidator condition) { - return condition == null || condition.isValid(entity); - } - - /** - * If {@code instance} is an {@link InstanceContainer}, run a callback for all of its - * {@link SharedInstance}. - * - * @param instance the instance - * @param callback the callback to run for all the {@link SharedInstance} - */ - default void updateSharedInstances(@NotNull Instance instance, @NotNull Consumer callback) { - if (instance instanceof InstanceContainer) { - final InstanceContainer instanceContainer = (InstanceContainer) instance; - - if (!instanceContainer.hasSharedInstances()) - return; - - for (SharedInstance sharedInstance : instanceContainer.getSharedInstances()) { - callback.accept(sharedInstance); + // Find the thread with the lowest number of tasks + for (BatchThread thread : threads) { + final boolean switchThread = fitThread == null || thread.getCost() < minCost; + if (switchThread) { + fitThread = thread; + minCost = thread.getCost(); } } + + Check.notNull(fitThread, "The task thread returned null, something went terribly wrong."); + + // The thread has been decided + this.batchInfo.refreshThread(fitThread); + + // Create the runnable and send it to the thread for execution in the next tick + final Runnable runnable = createRunnable(time); + fitThread.addRunnable(runnable, estimatedCost); + } + + @NotNull + private Runnable createRunnable(long time) { + return () -> { + for (Chunk chunk : chunks) { + chunk.tick(time); + chunk.getInstance().getEntities().forEach(entity -> { + entity.tick(time); + }); + } + }; } } \ No newline at end of file diff --git a/src/main/java/net/minestom/server/thread/batch/BatchInfo.java b/src/main/java/net/minestom/server/thread/batch/BatchInfo.java index 3e0779a41..7dff4d525 100644 --- a/src/main/java/net/minestom/server/thread/batch/BatchInfo.java +++ b/src/main/java/net/minestom/server/thread/batch/BatchInfo.java @@ -22,7 +22,7 @@ public class BatchInfo { /** * Specifies in which thread this element will be updated. - * Currently defined before every tick for all game elements in {@link BatchSetupHandler#pushTask(Set, long)}. + * Currently defined before every tick for all game elements in {@link BatchHandler#pushTask(Set, long)}. * * @param batchThread the thread where this element will be updated */ diff --git a/src/main/java/net/minestom/server/thread/batch/BatchSetupHandler.java b/src/main/java/net/minestom/server/thread/batch/BatchSetupHandler.java deleted file mode 100644 index 5bbb61ece..000000000 --- a/src/main/java/net/minestom/server/thread/batch/BatchSetupHandler.java +++ /dev/null @@ -1,95 +0,0 @@ -package net.minestom.server.thread.batch; - -import net.minestom.server.Tickable; -import net.minestom.server.entity.Entity; -import net.minestom.server.instance.Chunk; -import net.minestom.server.instance.Instance; -import net.minestom.server.lock.Acquirable; -import net.minestom.server.thread.BatchThread; -import net.minestom.server.utils.callback.validator.EntityValidator; -import net.minestom.server.utils.validate.Check; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import java.util.ArrayList; -import java.util.Set; - -public class BatchSetupHandler implements BatchHandler { - - private static final int INSTANCE_COST = 5; - private static final int CHUNK_COST = 5; - private static final int ENTITY_COST = 5; - - private final BatchInfo batchInfo = new BatchInfo(); - - private final ArrayList> elements = new ArrayList<>(); - private int estimatedCost; - - @Override - public void updateInstance(@NotNull Instance instance, long time) { - addAcquirable(instance.getAcquiredElement(), INSTANCE_COST); - } - - @Override - public void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - addAcquirable(chunk.getAcquiredElement(), CHUNK_COST); - } - - @Override - public void conditionalEntityUpdate(@NotNull Instance instance, @NotNull Chunk chunk, long time, - @Nullable EntityValidator condition) { - final Set entities = instance.getChunkEntities(chunk); - - for (Entity entity : entities) { - if (shouldTick(entity, condition)) { - addAcquirable(entity.getAcquiredElement(), ENTITY_COST); - } - } - } - - public void pushTask(@NotNull Set threads, long time) { - BatchThread fitThread = null; - int minCost = Integer.MAX_VALUE; - - // Find the thread with the lowest number of tasks - for (BatchThread thread : threads) { - final boolean switchThread = fitThread == null || thread.getCost() < minCost; - if (switchThread) { - fitThread = thread; - minCost = thread.getCost(); - } - } - - Check.notNull(fitThread, "The task thread returned null, something went terribly wrong."); - - // The thread has been decided - this.batchInfo.refreshThread(fitThread); - - // Create the runnable and send it to the thread for execution in the next tick - final Runnable runnable = createRunnable(time); - fitThread.addRunnable(runnable, estimatedCost); - } - - @NotNull - private Runnable createRunnable(long time) { - return () -> { - for (Acquirable element : elements) { - final Object unwrapElement = element.unwrap(); - - if (unwrapElement instanceof Tickable) { - ((Tickable) unwrapElement).tick(time); - } - } - }; - } - - private void addAcquirable(Acquirable acquirable, int estimatedCost) { - // Set the BatchInfo field - Acquirable.Handler handler = acquirable.getHandler(); - handler.refreshBatchInfo(batchInfo); - - this.elements.add(acquirable); - this.estimatedCost += estimatedCost; - } - -} \ No newline at end of file