WIP per-chunk thread

This commit is contained in:
TheMode 2021-04-14 21:29:37 +02:00
parent 356150847e
commit a55ea6d0c2
7 changed files with 59 additions and 212 deletions

View File

@ -41,7 +41,7 @@ public final class UpdateManager {
{ {
// DEFAULT THREAD PROVIDER // DEFAULT THREAD PROVIDER
//threadProvider = new PerGroupChunkProvider(); //threadProvider = new PerGroupChunkProvider();
threadProvider = new PerInstanceThreadProvider(2); threadProvider = new PerInstanceThreadProvider(4);
} }
/** /**
@ -132,9 +132,6 @@ public final class UpdateManager {
} catch (InterruptedException e) { } catch (InterruptedException e) {
MinecraftServer.getExceptionManager().handleException(e); MinecraftServer.getExceptionManager().handleException(e);
} }
// Reset thread cost count
this.threadProvider.cleanup();
} }
/** /**
@ -202,13 +199,12 @@ public final class UpdateManager {
* <p> * <p>
* WARNING: should be automatically done by the {@link Instance} implementation. * WARNING: should be automatically done by the {@link Instance} implementation.
* *
* @param instance the instance of the chunk
* @param chunk the loaded chunk * @param chunk the loaded chunk
*/ */
public synchronized void signalChunkLoad(Instance instance, @NotNull Chunk chunk) { public synchronized void signalChunkLoad(@NotNull Chunk chunk) {
if (this.threadProvider == null) if (this.threadProvider == null)
return; return;
this.threadProvider.onChunkLoad(instance, chunk); this.threadProvider.onChunkLoad(chunk);
} }
/** /**
@ -216,13 +212,12 @@ public final class UpdateManager {
* <p> * <p>
* WARNING: should be automatically done by the {@link Instance} implementation. * WARNING: should be automatically done by the {@link Instance} implementation.
* *
* @param instance the instance of the chunk
* @param chunk the unloaded chunk * @param chunk the unloaded chunk
*/ */
public synchronized void signalChunkUnload(Instance instance, @NotNull Chunk chunk) { public synchronized void signalChunkUnload(@NotNull Chunk chunk) {
if (this.threadProvider == null) if (this.threadProvider == null)
return; return;
this.threadProvider.onChunkUnload(instance, chunk); this.threadProvider.onChunkUnload(chunk);
} }
/** /**

View File

@ -506,7 +506,7 @@ public class InstanceContainer extends Instance {
protected void retrieveChunk(int chunkX, int chunkZ, @Nullable ChunkCallback callback) { protected void retrieveChunk(int chunkX, int chunkZ, @Nullable ChunkCallback callback) {
final boolean loaded = chunkLoader.loadChunk(this, chunkX, chunkZ, chunk -> { final boolean loaded = chunkLoader.loadChunk(this, chunkX, chunkZ, chunk -> {
cacheChunk(chunk); cacheChunk(chunk);
UPDATE_MANAGER.signalChunkLoad(this, chunk); UPDATE_MANAGER.signalChunkLoad(chunk);
// Execute callback and event in the instance thread // Execute callback and event in the instance thread
scheduleNextTick(instance -> { scheduleNextTick(instance -> {
callChunkLoadEvent(chunkX, chunkZ); callChunkLoadEvent(chunkX, chunkZ);
@ -544,7 +544,7 @@ public class InstanceContainer extends Instance {
OptionalCallback.execute(callback, chunk); OptionalCallback.execute(callback, chunk);
} }
UPDATE_MANAGER.signalChunkLoad(this, chunk); UPDATE_MANAGER.signalChunkLoad(chunk);
callChunkLoadEvent(chunkX, chunkZ); callChunkLoadEvent(chunkX, chunkZ);
} }
@ -641,7 +641,7 @@ public class InstanceContainer extends Instance {
final Chunk copiedChunk = chunk.copy(copiedInstance, chunkX, chunkZ); final Chunk copiedChunk = chunk.copy(copiedInstance, chunkX, chunkZ);
copiedInstance.cacheChunk(copiedChunk); copiedInstance.cacheChunk(copiedChunk);
UPDATE_MANAGER.signalChunkLoad(copiedInstance, copiedChunk); UPDATE_MANAGER.signalChunkLoad(copiedChunk);
} }
return copiedInstance; return copiedInstance;
@ -682,7 +682,7 @@ public class InstanceContainer extends Instance {
* Adds a {@link Chunk} to the internal instance map. * Adds a {@link Chunk} to the internal instance map.
* <p> * <p>
* WARNING: the chunk will not automatically be sent to players and * WARNING: the chunk will not automatically be sent to players and
* {@link net.minestom.server.UpdateManager#signalChunkLoad(Instance, Chunk)} must be called manually. * {@link net.minestom.server.UpdateManager#signalChunkLoad(Chunk)} must be called manually.
* *
* @param chunk the chunk to cache * @param chunk the chunk to cache
*/ */
@ -823,7 +823,7 @@ public class InstanceContainer extends Instance {
chunk.unload(); chunk.unload();
UPDATE_MANAGER.signalChunkUnload(this, chunk); UPDATE_MANAGER.signalChunkUnload(chunk);
} }
this.scheduledChunksToRemove.clear(); this.scheduledChunksToRemove.clear();
} }

View File

@ -29,7 +29,7 @@ public final class Acquisition {
// The goal of the contention service it is manage the situation where two threads are waiting for each other // The goal of the contention service it is manage the situation where two threads are waiting for each other
ACQUISITION_CONTENTION_SERVICE.scheduleAtFixedRate(() -> { ACQUISITION_CONTENTION_SERVICE.scheduleAtFixedRate(() -> {
final Set<BatchThread> threads = MinecraftServer.getUpdateManager().getThreadProvider().getThreads(); final List<BatchThread> threads = MinecraftServer.getUpdateManager().getThreadProvider().getThreads();
for (BatchThread batchThread : threads) { for (BatchThread batchThread : threads) {
final BatchThread waitingThread = (BatchThread) batchThread.getQueue().getWaitingThread(); final BatchThread waitingThread = (BatchThread) batchThread.getQueue().getWaitingThread();

View File

@ -15,8 +15,6 @@ public class BatchThread extends Thread {
private final BatchQueue queue; private final BatchQueue queue;
private int cost;
public BatchThread(@NotNull BatchRunnable runnable, int number) { public BatchThread(@NotNull BatchRunnable runnable, int number) {
super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number); super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number);
this.runnable = runnable; this.runnable = runnable;
@ -25,14 +23,6 @@ public class BatchThread extends Thread {
this.runnable.setLinkedThread(this); this.runnable.setLinkedThread(this);
} }
public int getCost() {
return cost;
}
public void setCost(int cost) {
this.cost = cost;
}
@NotNull @NotNull
public BatchRunnable getMainRunnable() { public BatchRunnable getMainRunnable() {
return runnable; return runnable;
@ -43,9 +33,8 @@ public class BatchThread extends Thread {
return queue; return queue;
} }
public void addRunnable(@NotNull Runnable runnable, int cost) { public void addRunnable(@NotNull Runnable runnable) {
this.runnable.queue.add(runnable); this.runnable.queue.add(runnable);
this.cost += cost;
} }
public void shutdown() { public void shutdown() {

View File

@ -1,50 +1,18 @@
package net.minestom.server.thread; package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
public class PerInstanceThreadProvider extends ThreadProvider { import java.util.concurrent.ThreadLocalRandom;
private static final InstanceManager INSTANCE_MANAGER = MinecraftServer.getInstanceManager(); public class PerInstanceThreadProvider extends ThreadProvider {
public PerInstanceThreadProvider(int threadCount) { public PerInstanceThreadProvider(int threadCount) {
super(threadCount); super(threadCount);
} }
@Override @Override
public void onInstanceCreate(@NotNull Instance instance) { public int findThread(@NotNull Chunk chunk) {
return ThreadLocalRandom.current().nextInt(getThreads().size());
}
@Override
public void onInstanceDelete(@NotNull Instance instance) {
}
@Override
public void onChunkLoad(@NotNull Instance instance, Chunk chunk) {
}
@Override
public void onChunkUnload(@NotNull Instance instance, Chunk chunk) {
}
@Override
public void update(long time) {
for (Instance instance : INSTANCE_MANAGER.getInstances()) {
createBatch(batchHandler -> {
for (Chunk chunk : instance.getChunks()) {
// Tick chunks & entities
batchHandler.updateChunk(chunk, time);
}
}, time);
}
} }
} }

View File

@ -3,19 +3,10 @@ package net.minestom.server.thread;
import net.minestom.server.UpdateManager; import net.minestom.server.UpdateManager;
import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance; import net.minestom.server.instance.Instance;
import net.minestom.server.thread.batch.BatchHandler;
import net.minestom.server.utils.time.Cooldown;
import net.minestom.server.utils.time.TimeUnit;
import net.minestom.server.utils.time.UpdateOption;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
/** /**
* Used to link chunks into multiple groups. * Used to link chunks into multiple groups.
@ -25,16 +16,13 @@ import java.util.function.Consumer;
*/ */
public abstract class ThreadProvider { public abstract class ThreadProvider {
private final Set<BatchThread> threads; private final List<BatchThread> threads;
private final List<BatchHandler> batchHandlers = new ArrayList<>(); private final Map<Integer, List<Chunk>> threadChunkMap = new HashMap<>();
private final ArrayDeque<Chunk> batchHandlers = new ArrayDeque<>();
private UpdateOption batchesRefreshCooldown;
private long lastBatchRefreshTime;
public ThreadProvider(int threadCount) { public ThreadProvider(int threadCount) {
this.threads = new HashSet<>(threadCount); this.threads = new ArrayList<>(threadCount);
this.batchesRefreshCooldown = new UpdateOption(500, TimeUnit.MILLISECOND);
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
final BatchThread.BatchRunnable batchRunnable = new BatchThread.BatchRunnable(); final BatchThread.BatchRunnable batchRunnable = new BatchThread.BatchRunnable();
@ -45,76 +33,61 @@ public abstract class ThreadProvider {
} }
} }
/** public void onInstanceCreate(@NotNull Instance instance) {
* Called when an {@link Instance} is registered. instance.getChunks().forEach(this::addChunk);
* }
* @param instance the newly create {@link Instance}
*/
public abstract void onInstanceCreate(@NotNull Instance instance);
/** public void onInstanceDelete(@NotNull Instance instance) {
* Called when an {@link Instance} is unregistered. instance.getChunks().forEach(this::removeChunk);
* }
* @param instance the deleted {@link Instance}
*/
public abstract void onInstanceDelete(@NotNull Instance instance);
/** public void onChunkLoad(Chunk chunk) {
* Called when a chunk is loaded. addChunk(chunk);
* <p> }
* Be aware that this is possible for an instance to load chunks before being registered.
*
* @param instance the instance of the chunk
* @param chunk the chunk
*/
public abstract void onChunkLoad(@NotNull Instance instance, Chunk chunk);
/** public void onChunkUnload(Chunk chunk) {
* Called when a chunk is unloaded. removeChunk(chunk);
* }
* @param instance the instance of the chunk
* @param chunk the chunk
*/
public abstract void onChunkUnload(@NotNull Instance instance, Chunk chunk);
/** /**
* Performs a server tick for all chunks based on their linked thread. * Performs a server tick for all chunks based on their linked thread.
* *
* @param time the update time in milliseconds * @param chunk the chunk
*/ */
public abstract void update(long time); public abstract int findThread(@NotNull Chunk chunk);
public void createBatch(@NotNull Consumer<BatchHandler> consumer, long time) { protected void addChunk(Chunk chunk) {
BatchHandler batchHandler = new BatchHandler(); int thread = findThread(chunk);
var chunks = threadChunkMap.computeIfAbsent(thread, ArrayList::new);
chunks.add(chunk);
}
consumer.accept(batchHandler); protected void removeChunk(Chunk chunk) {
int thread = findThread(chunk);
this.batchHandlers.add(batchHandler); var chunks = threadChunkMap.get(thread);
if (chunks != null) {
batchHandler.pushTask(threads, time); chunks.remove(chunk);
}
} }
/** /**
* Prepares the update. * Prepares the update.
* <p>
* {@link #update(long)} is called based on its cooldown to limit the overhead.
* The cooldown can be modified using {@link #setBatchesRefreshCooldown(UpdateOption)}.
* *
* @param time the tick time in milliseconds * @param time the tick time in milliseconds
*/ */
public void prepareUpdate(long time) { public void prepareUpdate(long time) {
// Verify if the batches should be updated this.threadChunkMap.forEach((threadId, chunks) -> {
if (batchesRefreshCooldown == null || BatchThread thread = threads.get(threadId);
!Cooldown.hasCooldown(time, lastBatchRefreshTime, batchesRefreshCooldown)) { var chunksCopy = new ArrayList<>(chunks);
this.lastBatchRefreshTime = time; thread.addRunnable(() -> {
this.batchHandlers.clear(); for (Chunk chunk : chunksCopy) {
update(time); chunk.tick(time);
} else { chunk.getInstance().getChunkEntities(chunk).forEach(entity -> {
// Push the tasks entity.tick(time);
for (BatchHandler batchHandler : batchHandlers) { });
batchHandler.pushTask(threads, time); }
} });
} });
} }
@NotNull @NotNull
@ -127,27 +100,12 @@ public abstract class ThreadProvider {
return countDownLatch; return countDownLatch;
} }
public void cleanup() {
for (BatchThread thread : threads) {
thread.setCost(0);
}
}
public void shutdown() { public void shutdown() {
this.threads.forEach(BatchThread::shutdown); this.threads.forEach(BatchThread::shutdown);
} }
@NotNull @NotNull
public Set<BatchThread> getThreads() { public List<BatchThread> getThreads() {
return threads; return threads;
} }
@Nullable
public UpdateOption getBatchesRefreshCooldown() {
return batchesRefreshCooldown;
}
public void setBatchesRefreshCooldown(@Nullable UpdateOption batchesRefreshCooldown) {
this.batchesRefreshCooldown = batchesRefreshCooldown;
}
} }

View File

@ -1,63 +0,0 @@
package net.minestom.server.thread.batch;
import net.minestom.server.instance.Chunk;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.Set;
public class BatchHandler {
private final BatchInfo batchInfo = new BatchInfo();
private final ArrayList<Chunk> chunks = new ArrayList<>();
private int estimatedCost;
public void updateChunk(@NotNull Chunk chunk, long time) {
// Set the BatchInfo field
//Acquirable.Handler handler = acquirable.getHandler();
//handler.refreshBatchInfo(batchInfo);
this.chunks.add(chunk);
this.estimatedCost++;
}
public void pushTask(@NotNull Set<BatchThread> threads, long time) {
BatchThread fitThread = null;
int minCost = Integer.MAX_VALUE;
// Find the thread with the lowest number of tasks
for (BatchThread thread : threads) {
final boolean switchThread = fitThread == null || thread.getCost() < minCost;
if (switchThread) {
fitThread = thread;
minCost = thread.getCost();
}
}
Check.notNull(fitThread, "The task thread returned null, something went terribly wrong.");
// The thread has been decided
this.batchInfo.refreshThread(fitThread);
// Create the runnable and send it to the thread for execution in the next tick
final Runnable runnable = createRunnable(time);
fitThread.addRunnable(runnable, estimatedCost);
}
@NotNull
private Runnable createRunnable(long time) {
return () -> {
for (Chunk chunk : chunks) {
chunk.tick(time);
chunk.getInstance().getEntities().forEach(entity -> {
entity.tick(time);
});
}
};
}
}