Initial acquirable API commit

This commit is contained in:
TheMode 2021-04-14 20:12:56 +02:00
parent dc5b764732
commit ccab205a54
11 changed files with 944 additions and 192 deletions

View File

@ -2,7 +2,6 @@ package net.minestom.server;
import net.minestom.server.advancements.AdvancementManager;
import net.minestom.server.adventure.bossbar.BossBarManager;
import net.minestom.server.monitoring.BenchmarkManager;
import net.minestom.server.command.CommandManager;
import net.minestom.server.data.DataManager;
import net.minestom.server.data.DataType;
@ -25,6 +24,7 @@ import net.minestom.server.instance.block.rule.BlockPlacementRule;
import net.minestom.server.item.Enchantment;
import net.minestom.server.item.Material;
import net.minestom.server.listener.manager.PacketListenerManager;
import net.minestom.server.monitoring.BenchmarkManager;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.PacketProcessor;
import net.minestom.server.network.netty.NettyServer;
@ -136,6 +136,7 @@ public final class MinecraftServer {
private static int compressionThreshold = 256;
private static boolean packetCaching = true;
private static boolean groupedPacket = true;
private static boolean waitMonitoring = false;
private static ResponseDataConsumer responseDataConsumer;
private static String brandName = "Minestom";
private static Difficulty difficulty = Difficulty.NORMAL;
@ -613,6 +614,14 @@ public final class MinecraftServer {
MinecraftServer.groupedPacket = groupedPacket;
}
public static boolean hasWaitMonitoring() {
return waitMonitoring;
}
public static void setWaitMonitoring(boolean waitMonitoring) {
MinecraftServer.waitMonitoring = waitMonitoring;
}
/**
* Gets the consumer executed to show server-list data.
*

View File

@ -0,0 +1,106 @@
package net.minestom.server.lock;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.batch.BatchInfo;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.function.Consumer;
/**
* Represents an element which can be acquired.
* Used for synchronization purpose.
* <p>
* Implementations of this class are recommended to be immutable (or at least thread-safe).
* The default one is {@link AcquirableImpl}.
*
* @param <T> the acquirable object type
*/
public interface Acquirable<T> {
/**
* Blocks the current thread until 'this' can be acquired,
* and execute {@code consumer} as a callback with the acquired object.
*
* @param consumer the consumer of the acquired object
* @return true if the acquisition happened without synchonization, false otherwise
*/
default boolean acquire(@NotNull Consumer<T> consumer) {
final Thread currentThread = Thread.currentThread();
Acquisition.AcquisitionData data = new Acquisition.AcquisitionData();
final Handler handler = getHandler();
final BatchInfo batchInfo = handler.getBatchInfo();
final BatchThread elementThread = batchInfo != null ? batchInfo.getBatchThread() : null;
final boolean sameThread = Acquisition.acquire(currentThread, elementThread, data);
final T unwrap = unwrap();
if (sameThread) {
consumer.accept(unwrap);
} else {
synchronized (unwrap) {
consumer.accept(unwrap);
}
// Remove the previously acquired thread from the local list
List<Thread> acquiredThreads = data.getAcquiredThreads();
if (acquiredThreads != null) {
acquiredThreads.remove(elementThread);
}
// Notify the end of the task if required
Phaser phaser = data.getPhaser();
if (phaser != null) {
phaser.arriveAndDeregister();
}
}
return sameThread;
}
/**
* Signals the acquisition manager to acquire 'this' at the end of the thread tick.
* <p>
* Thread-safety is guaranteed but not the order.
*
* @param consumer the consumer of the acquired object
*/
default void scheduledAcquire(@NotNull Consumer<T> consumer) {
Acquisition.scheduledAcquireRequest(this, consumer);
}
@NotNull
T unwrap();
@NotNull
Handler getHandler();
class Handler {
private volatile BatchInfo batchInfo;
@Nullable
public BatchInfo getBatchInfo() {
return batchInfo;
}
public void refreshBatchInfo(@NotNull BatchInfo batchInfo) {
this.batchInfo = batchInfo;
}
/**
* Executed during this element tick to empty the current thread acquisition queue.
*/
public void acquisitionTick() {
final BatchThread batchThread = batchInfo.getBatchThread();
if (batchThread == null)
return;
Acquisition.processQueue(batchThread.getQueue());
}
}
}

View File

@ -0,0 +1,33 @@
package net.minestom.server.lock;
import org.jetbrains.annotations.NotNull;
/**
* Basic implementation of {@link Acquirable}.
* <p>
* Class is immutable.
*
* @param <T> the object type which can be acquired
*/
public class AcquirableImpl<T> implements Acquirable<T> {
private final T value;
private final Handler handler;
public AcquirableImpl(@NotNull T value) {
this.value = value;
this.handler = new Handler();
}
@NotNull
@Override
public T unwrap() {
return value;
}
@NotNull
@Override
public Handler getHandler() {
return handler;
}
}

View File

@ -0,0 +1,296 @@
package net.minestom.server.lock;
import net.minestom.server.MinecraftServer;
import net.minestom.server.thread.BatchQueue;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.batch.BatchInfo;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
public final class Acquisition {
private static final ScheduledExecutorService ACQUISITION_CONTENTION_SERVICE = Executors.newSingleThreadScheduledExecutor();
private static final ThreadLocal<List<Thread>> ACQUIRED_THREADS = ThreadLocal.withInitial(ArrayList::new);
private static final ThreadLocal<ScheduledAcquisition> SCHEDULED_ACQUISITION = ThreadLocal.withInitial(ScheduledAcquisition::new);
private static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong();
static {
// The goal of the contention service it is manage the situation where two threads are waiting for each other
ACQUISITION_CONTENTION_SERVICE.scheduleAtFixedRate(() -> {
final Set<BatchThread> threads = MinecraftServer.getUpdateManager().getThreadProvider().getThreads();
for (BatchThread batchThread : threads) {
final BatchThread waitingThread = (BatchThread) batchThread.getQueue().getWaitingThread();
if (waitingThread != null) {
if (waitingThread.getState() == Thread.State.WAITING &&
batchThread.getState() == Thread.State.WAITING) {
processQueue(waitingThread.getQueue());
}
}
}
}, 3, 3, TimeUnit.MILLISECONDS);
}
public static <E, T extends Acquirable<E>> void acquireCollection(@NotNull Collection<T> collection,
@NotNull Supplier<Collection<E>> collectionSupplier,
@NotNull Consumer<Collection<E>> consumer) {
final Thread currentThread = Thread.currentThread();
Collection<E> result = collectionSupplier.get();
Map<BatchThread, List<E>> threadCacheMap = retrieveThreadMap(collection, currentThread, result::add);
// Acquire all the threads
{
List<Phaser> phasers = new ArrayList<>();
for (Map.Entry<BatchThread, List<E>> entry : threadCacheMap.entrySet()) {
final BatchThread batchThread = entry.getKey();
final List<E> elements = entry.getValue();
AcquisitionData data = new AcquisitionData();
acquire(currentThread, batchThread, data);
// Retrieve all elements
result.addAll(elements);
final Phaser phaser = data.getPhaser();
if (phaser != null) {
phasers.add(phaser);
}
}
// Give result and deregister phasers
consumer.accept(result);
for (Phaser phaser : phasers) {
phaser.arriveAndDeregister();
}
}
}
public static <E, T extends Acquirable<E>> void acquireForEach(@NotNull Collection<T> collection,
@NotNull Consumer<E> consumer) {
final Thread currentThread = Thread.currentThread();
Map<BatchThread, List<E>> threadCacheMap = retrieveThreadMap(collection, currentThread, consumer);
// Acquire all the threads one by one
{
for (Map.Entry<BatchThread, List<E>> entry : threadCacheMap.entrySet()) {
final BatchThread batchThread = entry.getKey();
final List<E> elements = entry.getValue();
AcquisitionData data = new AcquisitionData();
acquire(currentThread, batchThread, data);
// Execute the consumer for all waiting elements
for (E element : elements) {
synchronized (element) {
consumer.accept(element);
}
}
final Phaser phaser = data.getPhaser();
if (phaser != null) {
phaser.arriveAndDeregister();
}
}
}
}
/**
* Notifies all the locks and wait for them to return using a {@link Phaser}.
* <p>
* Currently called during instance/chunk/entity ticks
* and in {@link BatchThread.BatchRunnable#run()} after every thread-tick.
*
* @param queue the queue to empty containing the locks to notify
* @see #acquire(Thread, BatchThread, AcquisitionData)
*/
public static void processQueue(@NotNull BatchQueue queue) {
Queue<AcquisitionData> acquisitionQueue = queue.getQueue();
if (acquisitionQueue.isEmpty())
return;
Phaser phaser = new Phaser(1);
synchronized (queue) {
AcquisitionData lock;
while ((lock = acquisitionQueue.poll()) != null) {
lock.phaser = phaser;
phaser.register();
}
queue.setWaitingThread(null);
queue.notifyAll();
}
phaser.arriveAndAwaitAdvance();
}
public static void processThreadTick(@NotNull BatchQueue queue) {
processQueue(queue);
ScheduledAcquisition scheduledAcquisition = SCHEDULED_ACQUISITION.get();
final List<Acquirable<Object>> acquirableElements = scheduledAcquisition.acquirableElements;
if (!acquirableElements.isEmpty()) {
final Map<Object, List<Consumer<Object>>> callbacks = scheduledAcquisition.callbacks;
acquireForEach(acquirableElements, element -> {
List<Consumer<Object>> consumers = callbacks.get(element);
if (consumers == null || consumers.isEmpty())
return;
consumers.forEach(objectConsumer -> objectConsumer.accept(element));
});
// Clear collections..
acquirableElements.clear();
callbacks.clear();
}
}
/**
* Checks if the {@link Acquirable} update tick is in the same thread as {@link Thread#currentThread()}.
* If yes return immediately, otherwise a lock will be created and added to {@link BatchQueue#getQueue()}
* to be executed later during {@link #processQueue(BatchQueue)}.
*
* @param data the object containing data about the acquisition
* @return true if the acquisition didn't require any synchronization
* @see #processQueue(BatchQueue)
*/
protected static boolean acquire(@NotNull Thread currentThread, @Nullable BatchThread elementThread, @NotNull AcquisitionData data) {
if (elementThread == null) {
// Element didn't get assigned a thread yet (meaning that the element is not part of any thread)
// Returns false in order to force synchronization (useful if this element is acquired multiple time)
return false;
}
if (currentThread == elementThread) {
// Element can be acquired without any wait/block because threads are the same
return true;
}
if (!elementThread.getMainRunnable().isInTick()) {
// Element tick has ended and can therefore be directly accessed (with synchronization)
return false;
}
final List<Thread> acquiredThread = ACQUIRED_THREADS.get();
if (acquiredThread.contains(elementThread)) {
// This thread is already acquiring the element thread
return true;
}
// Element needs to be synchronized, forward a request
{
// Prevent most of contentions, the rest in handled in the acquisition scheduled service
if (currentThread instanceof BatchThread) {
BatchThread batchThread = (BatchThread) currentThread;
Acquisition.processQueue(batchThread.getQueue());
}
try {
final boolean monitoring = MinecraftServer.hasWaitMonitoring();
long time = 0;
if (monitoring) {
time = System.nanoTime();
}
final BatchQueue periodQueue = elementThread.getQueue();
synchronized (periodQueue) {
acquiredThread.add(elementThread);
data.acquiredThreads = acquiredThread; // Shared to remove the element when the acquisition is done
periodQueue.setWaitingThread(elementThread);
periodQueue.getQueue().add(data);
periodQueue.wait();
}
if (monitoring) {
time = System.nanoTime() - time;
WAIT_COUNTER_NANO.addAndGet(time);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
}
protected synchronized static <T> void scheduledAcquireRequest(@NotNull Acquirable<T> acquirable, Consumer<T> consumer) {
ScheduledAcquisition scheduledAcquisition = SCHEDULED_ACQUISITION.get();
scheduledAcquisition.acquirableElements.add((Acquirable<Object>) acquirable);
scheduledAcquisition.callbacks
.computeIfAbsent(acquirable.unwrap(), objectAcquirable -> new ArrayList<>())
.add((Consumer<Object>) consumer);
}
private static <E, T extends Acquirable<E>> Map<BatchThread, List<E>> retrieveThreadMap(@NotNull Collection<T> collection,
@NotNull Thread currentThread,
@NotNull Consumer<E> consumer) {
Map<BatchThread, List<E>> threadCacheMap = new HashMap<>();
for (T element : collection) {
final E value = element.unwrap();
final BatchInfo batchInfo = element.getHandler().getBatchInfo();
final BatchThread elementThread = batchInfo != null ? batchInfo.getBatchThread() : null;
if (currentThread == elementThread) {
// The element is managed in the current thread, consumer can be immediately called
consumer.accept(value);
} else {
// The element is manager in a different thread, cache it
List<E> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, batchThread -> new ArrayList<>());
threadCacheList.add(value);
}
}
return threadCacheMap;
}
public static long getCurrentWaitMonitoring() {
return WAIT_COUNTER_NANO.get();
}
public static void resetWaitMonitoring() {
WAIT_COUNTER_NANO.set(0);
}
public static final class AcquisitionData {
private volatile Phaser phaser;
private volatile List<Thread> acquiredThreads;
@Nullable
public Phaser getPhaser() {
return phaser;
}
@Nullable
public List<Thread> getAcquiredThreads() {
return acquiredThreads;
}
}
private static class ScheduledAcquisition {
private final List<Acquirable<Object>> acquirableElements = new ArrayList<>();
private final Map<Object, List<Consumer<Object>>> callbacks = new HashMap<>();
}
}

View File

@ -0,0 +1,23 @@
package net.minestom.server.lock;
import org.jetbrains.annotations.NotNull;
/**
* Represents an element that have a {@link Acquirable} linked to it.
* <p>
* Useful if you want to provide an access point to an object without risking to compromise
* the thread-safety of your code.
*/
public interface LockedElement {
/**
* Gets the {@link Acquirable} of this locked element.
* <p>
* Should be a constant.
*
* @return the acquirable element linked to this object
*/
@NotNull
<T> Acquirable<T> getAcquiredElement();
}

View File

@ -0,0 +1,34 @@
package net.minestom.server.thread;
import com.google.common.collect.Queues;
import net.minestom.server.lock.Acquisition;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Queue;
/**
* Represents the data of a {@link BatchThread} involved in acquisition.
* <p>
* Used as a lock until an acquirable element is available.
*/
public class BatchQueue {
private final Queue<Acquisition.AcquisitionData> acquisitionDataQueue = Queues.newConcurrentLinkedQueue();
private volatile Thread waitingThread;
@NotNull
public Queue<Acquisition.AcquisitionData> getQueue() {
return acquisitionDataQueue;
}
@Nullable
public Thread getWaitingThread() {
return waitingThread;
}
public void setWaitingThread(@Nullable Thread waitingThread) {
this.waitingThread = waitingThread;
}
}

View File

@ -0,0 +1,121 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.lock.Acquisition;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
public class BatchThread extends Thread {
private final BatchRunnable runnable;
private final BatchQueue queue;
private int cost;
public BatchThread(@NotNull BatchRunnable runnable, int number) {
super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number);
this.runnable = runnable;
this.queue = new BatchQueue();
this.runnable.setLinkedThread(this);
}
public int getCost() {
return cost;
}
public void setCost(int cost) {
this.cost = cost;
}
@NotNull
public BatchRunnable getMainRunnable() {
return runnable;
}
@NotNull
public BatchQueue getQueue() {
return queue;
}
public void addRunnable(@NotNull Runnable runnable, int cost) {
this.runnable.queue.add(runnable);
this.cost += cost;
}
public void shutdown() {
this.runnable.stop = true;
}
public static class BatchRunnable implements Runnable {
private volatile boolean stop;
private BatchThread batchThread;
private volatile boolean inTick;
private volatile CountDownLatch countDownLatch;
private final Queue<Runnable> queue = new ArrayDeque<>();
private final Object tickLock = new Object();
@Override
public void run() {
Check.notNull(batchThread, "The linked BatchThread cannot be null!");
while (!stop) {
// The latch is necessary to control the tick rates
if (countDownLatch == null)
continue;
synchronized (tickLock) {
this.inTick = true;
// Execute all pending runnable
Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
}
// Execute waiting acquisition
{
Acquisition.processThreadTick(batchThread.getQueue());
}
this.countDownLatch.countDown();
this.countDownLatch = null;
this.inTick = false;
// Wait for the next notify (game tick)
try {
this.tickLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public synchronized void startTick(@NotNull CountDownLatch countDownLatch) {
synchronized (tickLock) {
this.countDownLatch = countDownLatch;
this.tickLock.notifyAll();
}
}
public boolean isInTick() {
return inTick;
}
private void setLinkedThread(BatchThread batchThread) {
this.batchThread = batchThread;
}
}
}

View File

@ -1,44 +1,49 @@
package net.minestom.server.thread;
import io.netty.util.NettyRuntime;
import net.minestom.server.MinecraftServer;
import net.minestom.server.entity.*;
import net.minestom.server.UpdateManager;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceContainer;
import net.minestom.server.instance.SharedInstance;
import net.minestom.server.utils.callback.validator.EntityValidator;
import net.minestom.server.utils.chunk.ChunkUtils;
import net.minestom.server.utils.thread.MinestomThread;
import net.minestom.server.thread.batch.BatchHandler;
import net.minestom.server.thread.batch.BatchSetupHandler;
import net.minestom.server.utils.time.Cooldown;
import net.minestom.server.utils.time.TimeUnit;
import net.minestom.server.utils.time.UpdateOption;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
/**
* Used to link chunks into multiple groups.
* Then executed into a thread pool.
* <p>
* You can change the current thread provider by calling {@link net.minestom.server.UpdateManager#setThreadProvider(ThreadProvider)}.
* You can change the current thread provider by calling {@link UpdateManager#setThreadProvider(ThreadProvider)}.
*/
public abstract class ThreadProvider {
/**
* The thread pool of this thread provider.
*/
protected ExecutorService pool;
/**
* The amount of threads in the thread pool
*/
private int threadCount;
private final Set<BatchThread> threads;
{
// Default thread count in the pool (cores * 2)
setThreadCount(1);
private final List<BatchSetupHandler> batchHandlers = new ArrayList<>();
private UpdateOption batchesRefreshCooldown;
private long lastBatchRefreshTime;
public ThreadProvider(int threadCount) {
this.threads = new HashSet<>(threadCount);
this.batchesRefreshCooldown = new UpdateOption(500, TimeUnit.MILLISECOND);
for (int i = 0; i < threadCount; i++) {
final BatchThread.BatchRunnable batchRunnable = new BatchThread.BatchRunnable();
final BatchThread batchThread = new BatchThread(batchRunnable, i);
this.threads.add(batchThread);
batchThread.start();
}
}
/**
@ -61,192 +66,89 @@ public abstract class ThreadProvider {
* Be aware that this is possible for an instance to load chunks before being registered.
*
* @param instance the instance of the chunk
* @param chunk the loaded chunk
* @param chunk the chunk
*/
public abstract void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk);
public abstract void onChunkLoad(@NotNull Instance instance, Chunk chunk);
/**
* Called when a chunk is unloaded.
*
* @param instance the instance of the chunk
* @param chunk the unloaded chunk
* @param chunk the chunk
*/
public abstract void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk);
public abstract void onChunkUnload(@NotNull Instance instance, Chunk chunk);
/**
* Performs a server tick for all chunks based on their linked thread.
*
* @param time the update time in milliseconds
* @return the futures to execute to complete the tick
*/
public abstract void update(long time);
public void createBatch(@NotNull Consumer<BatchHandler> consumer, long time) {
BatchSetupHandler batchSetupHandler = new BatchSetupHandler();
consumer.accept(batchSetupHandler);
this.batchHandlers.add(batchSetupHandler);
batchSetupHandler.pushTask(threads, time);
}
/**
* Prepares the update.
* <p>
* {@link #update(long)} is called based on its cooldown to limit the overhead.
* The cooldown can be modified using {@link #setBatchesRefreshCooldown(UpdateOption)}.
*
* @param time the tick time in milliseconds
*/
public void prepareUpdate(long time) {
// Verify if the batches should be updated
if (batchesRefreshCooldown == null ||
!Cooldown.hasCooldown(time, lastBatchRefreshTime, batchesRefreshCooldown)) {
this.lastBatchRefreshTime = time;
this.batchHandlers.clear();
update(time);
} else {
// Push the tasks
for (BatchSetupHandler batchHandler : batchHandlers) {
batchHandler.pushTask(threads, time);
}
}
}
@NotNull
public abstract List<Future<?>> update(long time);
/**
* Gets the current size of the thread pool.
*
* @return the thread pool's size
*/
public int getThreadCount() {
return threadCount;
}
/**
* Changes the amount of threads in the thread pool.
*
* @param threadCount the new amount of threads
*/
public synchronized void setThreadCount(int threadCount) {
this.threadCount = threadCount;
refreshPool();
}
private void refreshPool() {
if (pool != null) {
this.pool.shutdown();
public CountDownLatch notifyThreads() {
CountDownLatch countDownLatch = new CountDownLatch(threads.size());
for (BatchThread thread : threads) {
final BatchThread.BatchRunnable runnable = thread.getMainRunnable();
runnable.startTick(countDownLatch);
}
this.pool = new MinestomThread(threadCount, MinecraftServer.THREAD_NAME_TICK);
return countDownLatch;
}
// INSTANCE UPDATE
/**
* Processes a whole tick for a chunk.
*
* @param instance the instance of the chunk
* @param chunk the chunk to update
* @param time the time of the update in milliseconds
*/
protected void processChunkTick(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
if (!ChunkUtils.isLoaded(chunk))
return;
updateChunk(instance, chunk, time);
updateEntities(instance, chunk, time);
}
/**
* Executes an instance tick.
*
* @param instance the instance
* @param time the current time in ms
*/
protected void updateInstance(@NotNull Instance instance, long time) {
// The instance
instance.tick(time);
}
/**
* Executes a chunk tick (blocks update).
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
chunk.tick(time, instance);
}
// ENTITY UPDATE
/**
* Executes an entity tick (all entities type creatures/objects/players) in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updateEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, null);
}
/**
* Executes an entity tick for object entities in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updateObjectEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof ObjectEntity);
}
/**
* Executes an entity tick for living entities in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updateLivingEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof LivingEntity);
}
/**
* Executes an entity tick for creatures entities in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updateCreatures(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof EntityCreature);
}
/**
* Executes an entity tick for players in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updatePlayers(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof Player);
}
/**
* Executes an entity tick in an instance's chunk if condition is verified.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @param condition the condition which confirm if the update happens or not
*/
protected void conditionalEntityUpdate(@NotNull Instance instance, @NotNull Chunk chunk, long time,
@Nullable EntityValidator condition) {
if (!instance.getEntities().isEmpty()) {
final Set<Entity> entities = instance.getChunkEntities(chunk);
if (!entities.isEmpty()) {
for (Entity entity : entities) {
if (condition != null && !condition.isValid(entity))
continue;
entity.tick(time);
}
}
}
updateSharedInstances(instance, sharedInstance -> conditionalEntityUpdate(sharedInstance, chunk, time, condition));
}
/**
* If {@code instance} is an {@link InstanceContainer}, run a callback for all of its
* {@link SharedInstance}.
*
* @param instance the instance
* @param callback the callback to run for all the {@link SharedInstance}
*/
private void updateSharedInstances(@NotNull Instance instance, @NotNull Consumer<SharedInstance> callback) {
if (instance instanceof InstanceContainer) {
final InstanceContainer instanceContainer = (InstanceContainer) instance;
if (!instanceContainer.hasSharedInstances())
return;
for (SharedInstance sharedInstance : instanceContainer.getSharedInstances()) {
callback.accept(sharedInstance);
}
public void cleanup() {
for (BatchThread thread : threads) {
thread.setCost(0);
}
}
}
public void shutdown() {
this.threads.forEach(BatchThread::shutdown);
}
@NotNull
public Set<BatchThread> getThreads() {
return threads;
}
@Nullable
public UpdateOption getBatchesRefreshCooldown() {
return batchesRefreshCooldown;
}
public void setBatchesRefreshCooldown(@Nullable UpdateOption batchesRefreshCooldown) {
this.batchesRefreshCooldown = batchesRefreshCooldown;
}
}

View File

@ -0,0 +1,100 @@
package net.minestom.server.thread.batch;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceContainer;
import net.minestom.server.instance.SharedInstance;
import net.minestom.server.utils.callback.validator.EntityValidator;
import net.minestom.server.utils.chunk.ChunkUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.function.Consumer;
public interface BatchHandler {
// INSTANCE UPDATE
/**
* Executes an instance tick.
*
* @param instance the instance
* @param time the current time in ms
*/
void updateInstance(@NotNull Instance instance, long time);
/**
* Executes a chunk tick (blocks update).
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time);
/**
* Processes a whole tick for a chunk.
*
* @param instance the instance of the chunk
* @param chunkIndex the index of the chunk {@link ChunkUtils#getChunkIndex(int, int)}
* @param time the time of the update in milliseconds
*/
default void updateChunk(@NotNull Instance instance, long chunkIndex, long time) {
final int chunkX = ChunkUtils.getChunkCoordX(chunkIndex);
final int chunkZ = ChunkUtils.getChunkCoordZ(chunkIndex);
final Chunk chunk = instance.getChunk(chunkX, chunkZ);
updateChunk(instance, chunk, time);
}
// ENTITY UPDATE
/**
* Executes an entity tick (all entities type creatures/objects/players) in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
default void updateEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, null);
}
/**
* Executes an entity tick in an instance's chunk if condition is verified.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @param condition the condition which confirm if the update happens or not
*/
void conditionalEntityUpdate(@NotNull Instance instance,
@NotNull Chunk chunk, long time,
@Nullable EntityValidator condition);
default boolean shouldTick(@NotNull Entity entity, @Nullable EntityValidator condition) {
return condition == null || condition.isValid(entity);
}
/**
* If {@code instance} is an {@link InstanceContainer}, run a callback for all of its
* {@link SharedInstance}.
*
* @param instance the instance
* @param callback the callback to run for all the {@link SharedInstance}
*/
default void updateSharedInstances(@NotNull Instance instance, @NotNull Consumer<SharedInstance> callback) {
if (instance instanceof InstanceContainer) {
final InstanceContainer instanceContainer = (InstanceContainer) instance;
if (!instanceContainer.hasSharedInstances())
return;
for (SharedInstance sharedInstance : instanceContainer.getSharedInstances()) {
callback.accept(sharedInstance);
}
}
}
}

View File

@ -0,0 +1,33 @@
package net.minestom.server.thread.batch;
import net.minestom.server.thread.BatchThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Set;
/**
* Object shared between all elements of a batch.
* <p>
* Used to retrieve the element tick thread.
*/
public class BatchInfo {
private volatile BatchThread batchThread;
@Nullable
public BatchThread getBatchThread() {
return batchThread;
}
/**
* Specifies in which thread this element will be updated.
* Currently defined before every tick for all game elements in {@link BatchSetupHandler#pushTask(Set, long)}.
*
* @param batchThread the thread where this element will be updated
*/
public void refreshThread(@NotNull BatchThread batchThread) {
this.batchThread = batchThread;
}
}

View File

@ -0,0 +1,95 @@
package net.minestom.server.thread.batch;
import net.minestom.server.Tickable;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.utils.callback.validator.EntityValidator;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.Set;
public class BatchSetupHandler implements BatchHandler {
private static final int INSTANCE_COST = 5;
private static final int CHUNK_COST = 5;
private static final int ENTITY_COST = 5;
private final BatchInfo batchInfo = new BatchInfo();
private final ArrayList<Acquirable<?>> elements = new ArrayList<>();
private int estimatedCost;
@Override
public void updateInstance(@NotNull Instance instance, long time) {
addAcquirable(instance.getAcquiredElement(), INSTANCE_COST);
}
@Override
public void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
addAcquirable(chunk.getAcquiredElement(), CHUNK_COST);
}
@Override
public void conditionalEntityUpdate(@NotNull Instance instance, @NotNull Chunk chunk, long time,
@Nullable EntityValidator condition) {
final Set<Entity> entities = instance.getChunkEntities(chunk);
for (Entity entity : entities) {
if (shouldTick(entity, condition)) {
addAcquirable(entity.getAcquiredElement(), ENTITY_COST);
}
}
}
public void pushTask(@NotNull Set<BatchThread> threads, long time) {
BatchThread fitThread = null;
int minCost = Integer.MAX_VALUE;
// Find the thread with the lowest number of tasks
for (BatchThread thread : threads) {
final boolean switchThread = fitThread == null || thread.getCost() < minCost;
if (switchThread) {
fitThread = thread;
minCost = thread.getCost();
}
}
Check.notNull(fitThread, "The task thread returned null, something went terribly wrong.");
// The thread has been decided
this.batchInfo.refreshThread(fitThread);
// Create the runnable and send it to the thread for execution in the next tick
final Runnable runnable = createRunnable(time);
fitThread.addRunnable(runnable, estimatedCost);
}
@NotNull
private Runnable createRunnable(long time) {
return () -> {
for (Acquirable<?> element : elements) {
final Object unwrapElement = element.unwrap();
if (unwrapElement instanceof Tickable) {
((Tickable) unwrapElement).tick(time);
}
}
};
}
private void addAcquirable(Acquirable<?> acquirable, int estimatedCost) {
// Set the BatchInfo field
Acquirable.Handler handler = acquirable.getHandler();
handler.refreshBatchInfo(batchInfo);
this.elements.add(acquirable);
this.estimatedCost += estimatedCost;
}
}