Per-chunk batch management

This commit is contained in:
TheMode 2021-04-14 20:32:02 +02:00
parent 9b8dd6e768
commit 356150847e
7 changed files with 96 additions and 287 deletions

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Queues;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager;
import net.minestom.server.lock.Acquisition;
import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.player.NettyPlayerConnection;
@ -40,7 +41,7 @@ public final class UpdateManager {
{
// DEFAULT THREAD PROVIDER
//threadProvider = new PerGroupChunkProvider();
threadProvider = new PerInstanceThreadProvider();
threadProvider = new PerInstanceThreadProvider(2);
}
/**
@ -85,9 +86,13 @@ public final class UpdateManager {
// Monitoring
if (!tickMonitors.isEmpty()) {
// TODO use value
final double acquisitionTimeMs = Acquisition.getCurrentWaitMonitoring() / 1e6D;
final double tickTimeMs = tickTime / 1e6D;
final TickMonitor tickMonitor = new TickMonitor(tickTimeMs);
this.tickMonitors.forEach(consumer -> consumer.accept(tickMonitor));
Acquisition.resetWaitMonitoring();
}
// Flush all waiting packets
@ -108,21 +113,28 @@ public final class UpdateManager {
* @param tickStart the time of the tick in milliseconds
*/
private void serverTick(long tickStart) {
List<Future<?>> futures;
// Tick all instances
MinecraftServer.getInstanceManager().getInstances().forEach(instance ->
instance.tick(tickStart));
// Server tick (instance/chunk/entity)
// Synchronize with the update manager instance, like the signal for chunk load/unload
synchronized (this) {
futures = threadProvider.update(tickStart);
this.threadProvider.prepareUpdate(tickStart);
}
for (final Future<?> future : futures) {
try {
future.get();
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
CountDownLatch countDownLatch = threadProvider.notifyThreads();
// Wait tick end
try {
countDownLatch.await();
} catch (InterruptedException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
// Reset thread cost count
this.threadProvider.cleanup();
}
/**

View File

@ -1,64 +1,50 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* Separates work between instance (1 instance = 1 thread execution).
*/
public class PerInstanceThreadProvider extends ThreadProvider {
private final Map<Instance, Set<Chunk>> instanceChunkMap = new ConcurrentHashMap<>();
private static final InstanceManager INSTANCE_MANAGER = MinecraftServer.getInstanceManager();
public PerInstanceThreadProvider(int threadCount) {
super(threadCount);
}
@Override
public void onInstanceCreate(@NotNull Instance instance) {
this.instanceChunkMap.putIfAbsent(instance, ConcurrentHashMap.newKeySet());
}
@Override
public void onInstanceDelete(@NotNull Instance instance) {
this.instanceChunkMap.remove(instance);
}
@Override
public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) {
// Add the loaded chunk to the instance chunks list
Set<Chunk> chunks = getChunks(instance);
chunks.add(chunk);
public void onChunkLoad(@NotNull Instance instance, Chunk chunk) {
}
@Override
public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) {
Set<Chunk> chunks = getChunks(instance);
chunks.remove(chunk);
public void onChunkUnload(@NotNull Instance instance, Chunk chunk) {
}
@NotNull
@Override
public List<Future<?>> update(long time) {
List<Future<?>> futures = new ArrayList<>();
public void update(long time) {
for (Instance instance : INSTANCE_MANAGER.getInstances()) {
createBatch(batchHandler -> {
instanceChunkMap.forEach((instance, chunks) -> futures.add(pool.submit(() -> {
// Tick instance
updateInstance(instance, time);
// Tick chunks
for (Chunk chunk : chunks) {
processChunkTick(instance, chunk, time);
}
})));
return futures;
for (Chunk chunk : instance.getChunks()) {
// Tick chunks & entities
batchHandler.updateChunk(chunk, time);
}
}, time);
}
}
private Set<Chunk> getChunks(Instance instance) {
return instanceChunkMap.computeIfAbsent(instance, inst -> ConcurrentHashMap.newKeySet());
}
}

View File

@ -1,56 +0,0 @@
package net.minestom.server.thread;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import org.jetbrains.annotations.NotNull;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
/**
* Simple thread provider implementation using a single thread to update all the instances and chunks.
*/
public class SingleThreadProvider extends ThreadProvider {
{
setThreadCount(1);
}
private final Set<Instance> instances = new CopyOnWriteArraySet<>();
@Override
public void onInstanceCreate(@NotNull Instance instance) {
this.instances.add(instance);
}
@Override
public void onInstanceDelete(@NotNull Instance instance) {
this.instances.remove(instance);
}
@Override
public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) {
}
@Override
public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) {
}
@NotNull
@Override
public List<Future<?>> update(long time) {
return Collections.singletonList(pool.submit(() -> {
for (Instance instance : instances) {
updateInstance(instance, time);
for (Chunk chunk : instance.getChunks()) {
processChunkTick(instance, chunk, time);
}
}
}));
}
}

View File

@ -4,7 +4,6 @@ import net.minestom.server.UpdateManager;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.thread.batch.BatchHandler;
import net.minestom.server.thread.batch.BatchSetupHandler;
import net.minestom.server.utils.time.Cooldown;
import net.minestom.server.utils.time.TimeUnit;
import net.minestom.server.utils.time.UpdateOption;
@ -28,7 +27,7 @@ public abstract class ThreadProvider {
private final Set<BatchThread> threads;
private final List<BatchSetupHandler> batchHandlers = new ArrayList<>();
private final List<BatchHandler> batchHandlers = new ArrayList<>();
private UpdateOption batchesRefreshCooldown;
private long lastBatchRefreshTime;
@ -86,13 +85,13 @@ public abstract class ThreadProvider {
public abstract void update(long time);
public void createBatch(@NotNull Consumer<BatchHandler> consumer, long time) {
BatchSetupHandler batchSetupHandler = new BatchSetupHandler();
BatchHandler batchHandler = new BatchHandler();
consumer.accept(batchSetupHandler);
consumer.accept(batchHandler);
this.batchHandlers.add(batchSetupHandler);
this.batchHandlers.add(batchHandler);
batchSetupHandler.pushTask(threads, time);
batchHandler.pushTask(threads, time);
}
/**
@ -112,7 +111,7 @@ public abstract class ThreadProvider {
update(time);
} else {
// Push the tasks
for (BatchSetupHandler batchHandler : batchHandlers) {
for (BatchHandler batchHandler : batchHandlers) {
batchHandler.pushTask(threads, time);
}
}

View File

@ -1,100 +1,63 @@
package net.minestom.server.thread.batch;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceContainer;
import net.minestom.server.instance.SharedInstance;
import net.minestom.server.utils.callback.validator.EntityValidator;
import net.minestom.server.utils.chunk.ChunkUtils;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.function.Consumer;
import java.util.ArrayList;
import java.util.Set;
public interface BatchHandler {
public class BatchHandler {
// INSTANCE UPDATE
private final BatchInfo batchInfo = new BatchInfo();
/**
* Executes an instance tick.
*
* @param instance the instance
* @param time the current time in ms
*/
void updateInstance(@NotNull Instance instance, long time);
private final ArrayList<Chunk> chunks = new ArrayList<>();
private int estimatedCost;
/**
* Executes a chunk tick (blocks update).
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time);
public void updateChunk(@NotNull Chunk chunk, long time) {
// Set the BatchInfo field
//Acquirable.Handler handler = acquirable.getHandler();
//handler.refreshBatchInfo(batchInfo);
/**
* Processes a whole tick for a chunk.
*
* @param instance the instance of the chunk
* @param chunkIndex the index of the chunk {@link ChunkUtils#getChunkIndex(int, int)}
* @param time the time of the update in milliseconds
*/
default void updateChunk(@NotNull Instance instance, long chunkIndex, long time) {
final int chunkX = ChunkUtils.getChunkCoordX(chunkIndex);
final int chunkZ = ChunkUtils.getChunkCoordZ(chunkIndex);
final Chunk chunk = instance.getChunk(chunkX, chunkZ);
updateChunk(instance, chunk, time);
this.chunks.add(chunk);
this.estimatedCost++;
}
// ENTITY UPDATE
public void pushTask(@NotNull Set<BatchThread> threads, long time) {
BatchThread fitThread = null;
int minCost = Integer.MAX_VALUE;
/**
* Executes an entity tick (all entities type creatures/objects/players) in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
default void updateEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, null);
}
/**
* Executes an entity tick in an instance's chunk if condition is verified.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @param condition the condition which confirm if the update happens or not
*/
void conditionalEntityUpdate(@NotNull Instance instance,
@NotNull Chunk chunk, long time,
@Nullable EntityValidator condition);
default boolean shouldTick(@NotNull Entity entity, @Nullable EntityValidator condition) {
return condition == null || condition.isValid(entity);
}
/**
* If {@code instance} is an {@link InstanceContainer}, run a callback for all of its
* {@link SharedInstance}.
*
* @param instance the instance
* @param callback the callback to run for all the {@link SharedInstance}
*/
default void updateSharedInstances(@NotNull Instance instance, @NotNull Consumer<SharedInstance> callback) {
if (instance instanceof InstanceContainer) {
final InstanceContainer instanceContainer = (InstanceContainer) instance;
if (!instanceContainer.hasSharedInstances())
return;
for (SharedInstance sharedInstance : instanceContainer.getSharedInstances()) {
callback.accept(sharedInstance);
// Find the thread with the lowest number of tasks
for (BatchThread thread : threads) {
final boolean switchThread = fitThread == null || thread.getCost() < minCost;
if (switchThread) {
fitThread = thread;
minCost = thread.getCost();
}
}
Check.notNull(fitThread, "The task thread returned null, something went terribly wrong.");
// The thread has been decided
this.batchInfo.refreshThread(fitThread);
// Create the runnable and send it to the thread for execution in the next tick
final Runnable runnable = createRunnable(time);
fitThread.addRunnable(runnable, estimatedCost);
}
@NotNull
private Runnable createRunnable(long time) {
return () -> {
for (Chunk chunk : chunks) {
chunk.tick(time);
chunk.getInstance().getEntities().forEach(entity -> {
entity.tick(time);
});
}
};
}
}

View File

@ -22,7 +22,7 @@ public class BatchInfo {
/**
* Specifies in which thread this element will be updated.
* Currently defined before every tick for all game elements in {@link BatchSetupHandler#pushTask(Set, long)}.
* Currently defined before every tick for all game elements in {@link BatchHandler#pushTask(Set, long)}.
*
* @param batchThread the thread where this element will be updated
*/

View File

@ -1,95 +0,0 @@
package net.minestom.server.thread.batch;
import net.minestom.server.Tickable;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.utils.callback.validator.EntityValidator;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.Set;
public class BatchSetupHandler implements BatchHandler {
private static final int INSTANCE_COST = 5;
private static final int CHUNK_COST = 5;
private static final int ENTITY_COST = 5;
private final BatchInfo batchInfo = new BatchInfo();
private final ArrayList<Acquirable<?>> elements = new ArrayList<>();
private int estimatedCost;
@Override
public void updateInstance(@NotNull Instance instance, long time) {
addAcquirable(instance.getAcquiredElement(), INSTANCE_COST);
}
@Override
public void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
addAcquirable(chunk.getAcquiredElement(), CHUNK_COST);
}
@Override
public void conditionalEntityUpdate(@NotNull Instance instance, @NotNull Chunk chunk, long time,
@Nullable EntityValidator condition) {
final Set<Entity> entities = instance.getChunkEntities(chunk);
for (Entity entity : entities) {
if (shouldTick(entity, condition)) {
addAcquirable(entity.getAcquiredElement(), ENTITY_COST);
}
}
}
public void pushTask(@NotNull Set<BatchThread> threads, long time) {
BatchThread fitThread = null;
int minCost = Integer.MAX_VALUE;
// Find the thread with the lowest number of tasks
for (BatchThread thread : threads) {
final boolean switchThread = fitThread == null || thread.getCost() < minCost;
if (switchThread) {
fitThread = thread;
minCost = thread.getCost();
}
}
Check.notNull(fitThread, "The task thread returned null, something went terribly wrong.");
// The thread has been decided
this.batchInfo.refreshThread(fitThread);
// Create the runnable and send it to the thread for execution in the next tick
final Runnable runnable = createRunnable(time);
fitThread.addRunnable(runnable, estimatedCost);
}
@NotNull
private Runnable createRunnable(long time) {
return () -> {
for (Acquirable<?> element : elements) {
final Object unwrapElement = element.unwrap();
if (unwrapElement instanceof Tickable) {
((Tickable) unwrapElement).tick(time);
}
}
};
}
private void addAcquirable(Acquirable<?> acquirable, int estimatedCost) {
// Set the BatchInfo field
Acquirable.Handler handler = acquirable.getHandler();
handler.refreshBatchInfo(batchInfo);
this.elements.add(acquirable);
this.estimatedCost += estimatedCost;
}
}