diff --git a/src/main/java/net/minestom/server/MinecraftServer.java b/src/main/java/net/minestom/server/MinecraftServer.java index 1f62869a6..c8c69b092 100644 --- a/src/main/java/net/minestom/server/MinecraftServer.java +++ b/src/main/java/net/minestom/server/MinecraftServer.java @@ -74,6 +74,7 @@ public final class MinecraftServer { // Threads public static final String THREAD_NAME_BENCHMARK = "Ms-Benchmark"; + public static final String THREAD_NAME_TICK_SCHEDULER = "Ms-TickScheduler"; public static final String THREAD_NAME_TICK = "Ms-Tick"; public static final String THREAD_NAME_BLOCK_BATCH = "Ms-BlockBatchPool"; diff --git a/src/main/java/net/minestom/server/UpdateManager.java b/src/main/java/net/minestom/server/UpdateManager.java index 39c50cc2c..37fc0ae7e 100644 --- a/src/main/java/net/minestom/server/UpdateManager.java +++ b/src/main/java/net/minestom/server/UpdateManager.java @@ -1,13 +1,14 @@ package net.minestom.server; import com.google.common.collect.Queues; +import net.minestom.server.acquirable.Acquirable; import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; import net.minestom.server.instance.InstanceManager; import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.player.NettyPlayerConnection; -import net.minestom.server.thread.PerInstanceThreadProvider; +import net.minestom.server.thread.SingleThreadProvider; import net.minestom.server.thread.ThreadProvider; import net.minestom.server.utils.async.AsyncUtils; import org.jetbrains.annotations.NotNull; @@ -21,28 +22,22 @@ import java.util.function.LongConsumer; /** * Manager responsible for the server ticks. *

- * The {@link ThreadProvider} manages the multi-thread aspect for {@link Instance} ticks, - * it can be modified with {@link #setThreadProvider(ThreadProvider)}. + * The {@link ThreadProvider} manages the multi-thread aspect of chunk ticks. */ public final class UpdateManager { private final ScheduledExecutorService updateExecutionService = Executors.newSingleThreadScheduledExecutor(r -> - new Thread(r, "tick-scheduler")); + new Thread(r, MinecraftServer.THREAD_NAME_TICK_SCHEDULER)); private volatile boolean stopRequested; - private ThreadProvider threadProvider; + // TODO make configurable + private ThreadProvider threadProvider = new SingleThreadProvider(); private final Queue tickStartCallbacks = Queues.newConcurrentLinkedQueue(); private final Queue tickEndCallbacks = Queues.newConcurrentLinkedQueue(); private final List> tickMonitors = new CopyOnWriteArrayList<>(); - { - // DEFAULT THREAD PROVIDER - //threadProvider = new PerGroupChunkProvider(); - threadProvider = new PerInstanceThreadProvider(); - } - /** * Should only be created in MinecraftServer. */ @@ -85,9 +80,12 @@ public final class UpdateManager { // Monitoring if (!tickMonitors.isEmpty()) { + final double acquisitionTimeMs = Acquirable.getAcquiringTime() / 1e6D; final double tickTimeMs = tickTime / 1e6D; - final TickMonitor tickMonitor = new TickMonitor(tickTimeMs); + final TickMonitor tickMonitor = new TickMonitor(tickTimeMs, acquisitionTimeMs); this.tickMonitors.forEach(consumer -> consumer.accept(tickMonitor)); + + Acquirable.resetAcquiringTime(); } // Flush all waiting packets @@ -108,21 +106,23 @@ public final class UpdateManager { * @param tickStart the time of the tick in milliseconds */ private void serverTick(long tickStart) { - List> futures; + // Tick all instances + MinecraftServer.getInstanceManager().getInstances().forEach(instance -> + instance.tick(tickStart)); - // Server tick (instance/chunk/entity) - // Synchronize with the update manager instance, like the signal for chunk load/unload - synchronized (this) { - futures = threadProvider.update(tickStart); + // Tick all chunks (and entities inside) + final CountDownLatch countDownLatch = threadProvider.update(tickStart); + + // Wait tick end + try { + countDownLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); } - for (final Future future : futures) { - try { - future.get(); - } catch (Throwable e) { - MinecraftServer.getExceptionManager().handleException(e); - } - } + // Clear removed entities & update threads + long tickTime = System.currentTimeMillis() - tickStart; + this.threadProvider.refreshThreads(tickTime); } /** @@ -145,20 +145,10 @@ public final class UpdateManager { * * @return the current thread provider */ - public ThreadProvider getThreadProvider() { + public @NotNull ThreadProvider getThreadProvider() { return threadProvider; } - /** - * Changes the server {@link ThreadProvider}. - * - * @param threadProvider the new thread provider - * @throws NullPointerException if threadProvider is null - */ - public synchronized void setThreadProvider(ThreadProvider threadProvider) { - this.threadProvider = threadProvider; - } - /** * Signals the {@link ThreadProvider} that an instance has been created. *

@@ -166,9 +156,7 @@ public final class UpdateManager { * * @param instance the instance */ - public synchronized void signalInstanceCreate(Instance instance) { - if (this.threadProvider == null) - return; + public void signalInstanceCreate(Instance instance) { this.threadProvider.onInstanceCreate(instance); } @@ -179,9 +167,7 @@ public final class UpdateManager { * * @param instance the instance */ - public synchronized void signalInstanceDelete(Instance instance) { - if (this.threadProvider == null) - return; + public void signalInstanceDelete(Instance instance) { this.threadProvider.onInstanceDelete(instance); } @@ -190,13 +176,10 @@ public final class UpdateManager { *

* WARNING: should be automatically done by the {@link Instance} implementation. * - * @param instance the instance of the chunk - * @param chunk the loaded chunk + * @param chunk the loaded chunk */ - public synchronized void signalChunkLoad(Instance instance, @NotNull Chunk chunk) { - if (this.threadProvider == null) - return; - this.threadProvider.onChunkLoad(instance, chunk); + public void signalChunkLoad(@NotNull Chunk chunk) { + this.threadProvider.onChunkLoad(chunk); } /** @@ -204,13 +187,10 @@ public final class UpdateManager { *

* WARNING: should be automatically done by the {@link Instance} implementation. * - * @param instance the instance of the chunk - * @param chunk the unloaded chunk + * @param chunk the unloaded chunk */ - public synchronized void signalChunkUnload(Instance instance, @NotNull Chunk chunk) { - if (this.threadProvider == null) - return; - this.threadProvider.onChunkUnload(instance, chunk); + public void signalChunkUnload(@NotNull Chunk chunk) { + this.threadProvider.onChunkUnload(chunk); } /** @@ -265,6 +245,7 @@ public final class UpdateManager { * Stops the server loop. */ public void stop() { - stopRequested = true; + this.stopRequested = true; + this.threadProvider.shutdown(); } } diff --git a/src/main/java/net/minestom/server/acquirable/Acquirable.java b/src/main/java/net/minestom/server/acquirable/Acquirable.java new file mode 100644 index 000000000..036e75fc8 --- /dev/null +++ b/src/main/java/net/minestom/server/acquirable/Acquirable.java @@ -0,0 +1,173 @@ +package net.minestom.server.acquirable; + +import com.google.common.annotations.Beta; +import net.minestom.server.entity.Entity; +import net.minestom.server.thread.ThreadProvider; +import net.minestom.server.thread.TickThread; +import net.minestom.server.utils.async.AsyncUtils; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; + +import java.util.Objects; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Stream; + +@Beta +public interface Acquirable { + + /** + * Gets all the {@link Entity entities} being ticked in the current thread. + *

+ * Useful when you want to ensure that no acquisition is ever done. + * + * @return the entities ticked in the current thread + */ + static @NotNull Stream<@NotNull Entity> currentEntities() { + return AcquirableImpl.CURRENT_ENTITIES.get(); + } + + /** + * Changes the stream returned by {@link #currentEntities()}. + *

+ * Mostly for internal use, external calls are unrecommended as they could lead + * to unexpected behavior. + * + * @param entities the new entity stream + */ + @ApiStatus.Internal + static void refreshEntities(@NotNull Stream<@NotNull Entity> entities) { + AcquirableImpl.CURRENT_ENTITIES.set(entities); + } + + /** + * Gets the time spent acquiring since last tick. + * + * @return the acquiring time + */ + static long getAcquiringTime() { + return AcquirableImpl.WAIT_COUNTER_NANO.get(); + } + + /** + * Resets {@link #getAcquiringTime()}. + *

+ * Mostly for internal use. + */ + @ApiStatus.Internal + static void resetAcquiringTime() { + AcquirableImpl.WAIT_COUNTER_NANO.set(0); + } + + /** + * Creates a new {@link Acquirable} object. + *

+ * Mostly for internal use, as a {@link TickThread} has to be used + * and properly synchronized. + * + * @param value the acquirable element + * @param the acquirable element type + * @return a new acquirable object + */ + @ApiStatus.Internal + static @NotNull Acquirable of(@NotNull T value) { + return new AcquirableImpl<>(value); + } + + /** + * Returns a new {@link Acquired} object which will be locked to the current thread. + *

+ * Useful when your code cannot be done inside a callback and need to be sync. + * Do not forget to call {@link Acquired#unlock()} once you are done with it. + * + * @return an acquired object + * @see #sync(Consumer) for auto-closeable capability + */ + default @NotNull Acquired lock() { + var optional = local(); + return optional.map(Acquired::local).orElseGet(() -> Acquired.locked(this)); + } + + /** + * Retrieves the acquirable value if and only if the element + * is already present/ticked in the current thread. + *

+ * Useful when you want only want to acquire an element when you are guaranteed + * to do not create a huge performance impact. + * + * @return an optional containing the acquired element if safe, + * {@link Optional#empty()} otherwise + */ + default @NotNull Optional local() { + final Thread currentThread = Thread.currentThread(); + final TickThread tickThread = getHandler().getTickThread(); + if (Objects.equals(currentThread, tickThread)) { + return Optional.of(unwrap()); + } + return Optional.empty(); + } + + /** + * Locks the acquirable element, execute {@code consumer} synchronously and unlock the thread. + *

+ * Free if the element is already present in the current thread, blocking otherwise. + * + * @param consumer the callback to execute once the element has been safely acquired + * @see #async(Consumer) + */ + default void sync(@NotNull Consumer consumer) { + var acquired = lock(); + consumer.accept(acquired.get()); + acquired.unlock(); + } + + /** + * Async version of {@link #sync(Consumer)}. + * + * @param consumer the callback to execute once the element has been safely acquired + * @see #sync(Consumer) + */ + default void async(@NotNull Consumer consumer) { + // TODO per-thread list + AsyncUtils.runAsync(() -> sync(consumer)); + } + + /** + * Unwrap the contained object unsafely. + *

+ * Should only be considered when thread-safety is not necessary (e.g. comparing positions) + * + * @return the unwrapped value + */ + @NotNull T unwrap(); + + /** + * Gets the {@link Handler} of this acquirable element, + * containing the currently linked thread. + *

+ * Mostly for internal use. + * + * @return this element handler + */ + @ApiStatus.Internal + @NotNull Handler getHandler(); + + class Handler { + + private volatile ThreadProvider.ChunkEntry chunkEntry; + + public ThreadProvider.ChunkEntry getChunkEntry() { + return chunkEntry; + } + + @ApiStatus.Internal + public void refreshChunkEntry(@NotNull ThreadProvider.ChunkEntry chunkEntry) { + this.chunkEntry = chunkEntry; + } + + public TickThread getTickThread() { + return chunkEntry != null ? chunkEntry.getThread() : null; + } + } + +} diff --git a/src/main/java/net/minestom/server/acquirable/AcquirableCollection.java b/src/main/java/net/minestom/server/acquirable/AcquirableCollection.java new file mode 100644 index 000000000..225a725d4 --- /dev/null +++ b/src/main/java/net/minestom/server/acquirable/AcquirableCollection.java @@ -0,0 +1,148 @@ +package net.minestom.server.acquirable; + +import com.google.common.annotations.Beta; +import net.minestom.server.thread.TickThread; +import net.minestom.server.utils.async.AsyncUtils; +import org.jetbrains.annotations.NotNull; + +import java.util.*; +import java.util.function.Consumer; +import java.util.stream.Stream; + +@Beta +public class AcquirableCollection implements Collection> { + + private final Collection> acquirableCollection; + + public AcquirableCollection(Collection> acquirableCollection) { + this.acquirableCollection = acquirableCollection; + } + + public void acquireSync(@NotNull Consumer consumer) { + final Thread currentThread = Thread.currentThread(); + var threadEntitiesMap = retrieveOptionalThreadMap(acquirableCollection, currentThread, consumer); + + // Acquire all the threads one by one + { + for (var entry : threadEntitiesMap.entrySet()) { + final TickThread tickThread = entry.getKey(); + final List values = entry.getValue(); + + var lock = AcquirableImpl.enter(currentThread, tickThread); + for (E value : values) { + consumer.accept(value); + } + AcquirableImpl.leave(lock); + } + } + } + + public void acquireAsync(@NotNull Consumer consumer) { + AsyncUtils.runAsync(() -> acquireSync(consumer)); + } + + public @NotNull Stream unwrap() { + return acquirableCollection.stream().map(Acquirable::unwrap); + } + + @Override + public int size() { + return acquirableCollection.size(); + } + + @Override + public boolean isEmpty() { + return acquirableCollection.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return acquirableCollection.contains(o); + } + + @NotNull + @Override + public Iterator> iterator() { + return acquirableCollection.iterator(); + } + + @NotNull + @Override + public Object[] toArray() { + return acquirableCollection.toArray(); + } + + @NotNull + @Override + public T[] toArray(@NotNull T[] a) { + return acquirableCollection.toArray(a); + } + + @Override + public boolean add(Acquirable eAcquirable) { + return acquirableCollection.add(eAcquirable); + } + + @Override + public boolean remove(Object o) { + return acquirableCollection.remove(o); + } + + @Override + public boolean containsAll(@NotNull Collection c) { + return acquirableCollection.containsAll(c); + } + + @Override + public boolean addAll(@NotNull Collection> c) { + return acquirableCollection.addAll(c); + } + + @Override + public boolean removeAll(@NotNull Collection c) { + return acquirableCollection.removeAll(c); + } + + @Override + public boolean retainAll(@NotNull Collection c) { + return acquirableCollection.retainAll(c); + } + + @Override + public void clear() { + this.acquirableCollection.clear(); + } + + /** + * Creates + * + * @param collection the acquirable collection + * @param currentThread the current thread + * @param consumer the consumer to execute when an element is already in the current thread + * @return a new Thread to acquirable elements map + */ + protected static Map> retrieveOptionalThreadMap(@NotNull Collection> collection, + @NotNull Thread currentThread, + @NotNull Consumer consumer) { + // Separate a collection of acquirable elements into a map of thread->elements + // Useful to reduce the number of acquisition + + Map> threadCacheMap = new HashMap<>(); + for (var element : collection) { + final T value = element.unwrap(); + + final TickThread elementThread = element.getHandler().getTickThread(); + if (currentThread == elementThread) { + // The element is managed in the current thread, consumer can be immediately called + consumer.accept(value); + } else { + // The element is manager in a different thread, cache it + List threadCacheList = threadCacheMap.computeIfAbsent(elementThread, tickThread -> new ArrayList<>()); + threadCacheList.add(value); + } + } + + return threadCacheMap; + } + +} diff --git a/src/main/java/net/minestom/server/acquirable/AcquirableImpl.java b/src/main/java/net/minestom/server/acquirable/AcquirableImpl.java new file mode 100644 index 000000000..809c15253 --- /dev/null +++ b/src/main/java/net/minestom/server/acquirable/AcquirableImpl.java @@ -0,0 +1,77 @@ +package net.minestom.server.acquirable; + +import net.minestom.server.entity.Entity; +import net.minestom.server.thread.TickThread; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; + +class AcquirableImpl implements Acquirable { + + protected static final ThreadLocal> CURRENT_ENTITIES = ThreadLocal.withInitial(Stream::empty); + protected static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong(); + + /** + * Global lock used for synchronization. + */ + private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock(); + + private final T value; + private final Acquirable.Handler handler; + + public AcquirableImpl(@NotNull T value) { + this.value = value; + this.handler = new Acquirable.Handler(); + } + + @Override + public @NotNull T unwrap() { + return value; + } + + @Override + public @NotNull Acquirable.Handler getHandler() { + return handler; + } + + protected static @Nullable ReentrantLock enter(@Nullable Thread currentThread, @Nullable TickThread elementThread) { + // Monitoring + long time = System.nanoTime(); + + ReentrantLock currentLock; + { + final TickThread current = currentThread instanceof TickThread ? + (TickThread) currentThread : null; + currentLock = current != null && current.getLock().isHeldByCurrentThread() ? + current.getLock() : null; + } + if (currentLock != null) + currentLock.unlock(); + + GLOBAL_LOCK.lock(); + + if (currentLock != null) + currentLock.lock(); + + final var lock = elementThread != null ? elementThread.getLock() : null; + final boolean acquired = lock == null || lock.isHeldByCurrentThread(); + if (!acquired) { + lock.lock(); + } + + // Monitoring + AcquirableImpl.WAIT_COUNTER_NANO.addAndGet(System.nanoTime() - time); + + return !acquired ? lock : null; + } + + protected static void leave(@Nullable ReentrantLock lock) { + if (lock != null) { + lock.unlock(); + } + GLOBAL_LOCK.unlock(); + } +} diff --git a/src/main/java/net/minestom/server/acquirable/Acquired.java b/src/main/java/net/minestom/server/acquirable/Acquired.java new file mode 100644 index 000000000..7783648aa --- /dev/null +++ b/src/main/java/net/minestom/server/acquirable/Acquired.java @@ -0,0 +1,52 @@ +package net.minestom.server.acquirable; + +import net.minestom.server.thread.TickThread; +import net.minestom.server.utils.validate.Check; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.locks.ReentrantLock; + +public class Acquired { + + private final T value; + + private final boolean locked; + private final ReentrantLock lock; + + private boolean unlocked; + + protected static Acquired local(@NotNull T value) { + return new Acquired<>(value, false, null, null); + } + + protected static Acquired locked(@NotNull Acquirable acquirable) { + final Thread currentThread = Thread.currentThread(); + final TickThread tickThread = acquirable.getHandler().getTickThread(); + return new Acquired<>(acquirable.unwrap(), true, currentThread, tickThread); + } + + private Acquired(@NotNull T value, + boolean locked, Thread currentThread, TickThread tickThread) { + this.value = value; + this.locked = locked; + this.lock = locked ? AcquirableImpl.enter(currentThread, tickThread) : null; + } + + public @NotNull T get() { + checkLock(); + return value; + } + + public void unlock() { + checkLock(); + this.unlocked = true; + if (!locked) + return; + AcquirableImpl.leave(lock); + } + + private void checkLock() { + Check.stateCondition(unlocked, "The acquired element has already been unlocked!"); + } + +} diff --git a/src/main/java/net/minestom/server/entity/Entity.java b/src/main/java/net/minestom/server/entity/Entity.java index 5a22544e8..6e81e30e0 100644 --- a/src/main/java/net/minestom/server/entity/Entity.java +++ b/src/main/java/net/minestom/server/entity/Entity.java @@ -1,5 +1,6 @@ package net.minestom.server.entity; +import com.google.common.annotations.Beta; import com.google.common.collect.Queues; import net.kyori.adventure.text.Component; import net.kyori.adventure.text.event.HoverEvent; @@ -8,6 +9,7 @@ import net.kyori.adventure.text.event.HoverEventSource; import net.minestom.server.MinecraftServer; import net.minestom.server.Tickable; import net.minestom.server.Viewable; +import net.minestom.server.acquirable.Acquirable; import net.minestom.server.chat.JsonMessage; import net.minestom.server.collision.BoundingBox; import net.minestom.server.collision.CollisionUtils; @@ -42,6 +44,7 @@ import net.minestom.server.utils.time.Cooldown; import net.minestom.server.utils.time.TimeUnit; import net.minestom.server.utils.time.UpdateOption; import net.minestom.server.utils.validate.Check; +import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -123,6 +126,8 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, private long ticks; private final EntityTickEvent tickEvent = new EntityTickEvent(this); + private final Acquirable acquirable = Acquirable.of(this); + /** * Lock used to support #switchEntityType */ @@ -466,7 +471,7 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, // Fix current chunk being null if the entity has been spawned before if (currentChunk == null) { - currentChunk = instance.getChunkAt(position); + refreshCurrentChunk(instance.getChunkAt(position)); } // Check if the entity chunk is loaded @@ -833,6 +838,12 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, return currentChunk; } + @ApiStatus.Internal + protected void refreshCurrentChunk(Chunk currentChunk) { + this.currentChunk = currentChunk; + MinecraftServer.getUpdateManager().getThreadProvider().updateEntity(this); + } + /** * Gets the entity current instance. * @@ -866,7 +877,7 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, this.isActive = true; this.instance = instance; - this.currentChunk = instance.getChunkAt(position.getX(), position.getZ()); + refreshCurrentChunk(instance.getChunkAt(position.getX(), position.getZ())); instance.UNSAFE_addEntity(this); spawn(); EntitySpawnEvent entitySpawnEvent = new EntitySpawnEvent(this, instance); @@ -1340,7 +1351,7 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, player.refreshVisibleEntities(newChunk); } - this.currentChunk = newChunk; + refreshCurrentChunk(newChunk); } } @@ -1456,6 +1467,10 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, * WARNING: this does not trigger {@link EntityDeathEvent}. */ public void remove() { + if (isRemoved()) + return; + + MinecraftServer.getUpdateManager().getThreadProvider().removeEntity(this); this.removed = true; this.shouldRemove = true; Entity.ENTITY_BY_ID.remove(id); @@ -1559,6 +1574,16 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer, return Objects.requireNonNullElse(this.customSynchronizationCooldown, SYNCHRONIZATION_COOLDOWN); } + @Beta + public @NotNull Acquirable getAcquirable() { + return (Acquirable) acquirable; + } + + @Beta + public @NotNull Acquirable getAcquirable(@NotNull Class clazz) { + return (Acquirable) acquirable; + } + public enum Pose { STANDING, FALL_FLYING, diff --git a/src/main/java/net/minestom/server/entity/Player.java b/src/main/java/net/minestom/server/entity/Player.java index f2b0c45a9..4a7767658 100644 --- a/src/main/java/net/minestom/server/entity/Player.java +++ b/src/main/java/net/minestom/server/entity/Player.java @@ -561,6 +561,9 @@ public class Player extends LivingEntity implements CommandSender, Localizable, @Override public void remove() { + if (isRemoved()) + return; + callEvent(PlayerDisconnectEvent.class, new PlayerDisconnectEvent(this)); super.remove(); diff --git a/src/main/java/net/minestom/server/instance/Instance.java b/src/main/java/net/minestom/server/instance/Instance.java index befe5b053..5392143fa 100644 --- a/src/main/java/net/minestom/server/instance/Instance.java +++ b/src/main/java/net/minestom/server/instance/Instance.java @@ -54,7 +54,7 @@ import java.util.function.Consumer; * WARNING: when making your own implementation registering the instance manually is required * with {@link InstanceManager#registerInstance(Instance)}, and * you need to be sure to signal the {@link UpdateManager} of the changes using - * {@link UpdateManager#signalChunkLoad(Instance, Chunk)} and {@link UpdateManager#signalChunkUnload(Instance, Chunk)}. + * {@link UpdateManager#signalChunkLoad(Chunk)} and {@link UpdateManager#signalChunkUnload(Chunk)}. */ public abstract class Instance implements BlockModifier, Tickable, EventHandler, DataContainer, PacketGroupingAudience { @@ -253,7 +253,7 @@ public abstract class Instance implements BlockModifier, Tickable, EventHandler, * Used when a {@link Chunk} is not currently loaded in memory and need to be retrieved from somewhere else. * Could be read from disk, or generated from scratch. *

- * Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Instance, Chunk)} and to cache + * Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Chunk)} and to cache * that this chunk has been loaded. *

* WARNING: it has to retrieve a chunk, this is not optional and should execute the callback in all case. @@ -267,7 +267,7 @@ public abstract class Instance implements BlockModifier, Tickable, EventHandler, /** * Called to generated a new {@link Chunk} from scratch. *

- * Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Instance, Chunk)} and to cache + * Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Chunk)} and to cache * that this chunk has been loaded. *

* This is where you can put your chunk generation code. diff --git a/src/main/java/net/minestom/server/instance/InstanceContainer.java b/src/main/java/net/minestom/server/instance/InstanceContainer.java index 6f81fa438..becdd6961 100644 --- a/src/main/java/net/minestom/server/instance/InstanceContainer.java +++ b/src/main/java/net/minestom/server/instance/InstanceContainer.java @@ -506,7 +506,7 @@ public class InstanceContainer extends Instance { protected void retrieveChunk(int chunkX, int chunkZ, @Nullable ChunkCallback callback) { final boolean loaded = chunkLoader.loadChunk(this, chunkX, chunkZ, chunk -> { cacheChunk(chunk); - UPDATE_MANAGER.signalChunkLoad(this, chunk); + UPDATE_MANAGER.signalChunkLoad(chunk); // Execute callback and event in the instance thread scheduleNextTick(instance -> { callChunkLoadEvent(chunkX, chunkZ); @@ -544,7 +544,7 @@ public class InstanceContainer extends Instance { OptionalCallback.execute(callback, chunk); } - UPDATE_MANAGER.signalChunkLoad(this, chunk); + UPDATE_MANAGER.signalChunkLoad(chunk); callChunkLoadEvent(chunkX, chunkZ); } @@ -641,7 +641,7 @@ public class InstanceContainer extends Instance { final Chunk copiedChunk = chunk.copy(copiedInstance, chunkX, chunkZ); copiedInstance.cacheChunk(copiedChunk); - UPDATE_MANAGER.signalChunkLoad(copiedInstance, copiedChunk); + UPDATE_MANAGER.signalChunkLoad(copiedChunk); } return copiedInstance; @@ -682,7 +682,7 @@ public class InstanceContainer extends Instance { * Adds a {@link Chunk} to the internal instance map. *

* WARNING: the chunk will not automatically be sent to players and - * {@link net.minestom.server.UpdateManager#signalChunkLoad(Instance, Chunk)} must be called manually. + * {@link net.minestom.server.UpdateManager#signalChunkLoad(Chunk)} must be called manually. * * @param chunk the chunk to cache */ @@ -823,7 +823,7 @@ public class InstanceContainer extends Instance { chunk.unload(); - UPDATE_MANAGER.signalChunkUnload(this, chunk); + UPDATE_MANAGER.signalChunkUnload(chunk); } this.scheduledChunksToRemove.clear(); } diff --git a/src/main/java/net/minestom/server/monitoring/BenchmarkManager.java b/src/main/java/net/minestom/server/monitoring/BenchmarkManager.java index f29bf4605..747d425a0 100644 --- a/src/main/java/net/minestom/server/monitoring/BenchmarkManager.java +++ b/src/main/java/net/minestom/server/monitoring/BenchmarkManager.java @@ -41,6 +41,7 @@ public final class BenchmarkManager { THREADS.add(THREAD_NAME_BLOCK_BATCH); THREADS.add(THREAD_NAME_SCHEDULER); + THREADS.add(THREAD_NAME_TICK_SCHEDULER); THREADS.add(THREAD_NAME_TICK); } diff --git a/src/main/java/net/minestom/server/monitoring/TickMonitor.java b/src/main/java/net/minestom/server/monitoring/TickMonitor.java index a92e5d40d..566de4bfe 100644 --- a/src/main/java/net/minestom/server/monitoring/TickMonitor.java +++ b/src/main/java/net/minestom/server/monitoring/TickMonitor.java @@ -3,12 +3,18 @@ package net.minestom.server.monitoring; public class TickMonitor { private final double tickTime; + private final double acquisitionTime; - public TickMonitor(double tickTime) { + public TickMonitor(double tickTime, double acquisitionTime) { this.tickTime = tickTime; + this.acquisitionTime = acquisitionTime; } public double getTickTime() { return tickTime; } -} \ No newline at end of file + + public double getAcquisitionTime() { + return acquisitionTime; + } +} diff --git a/src/main/java/net/minestom/server/thread/PerChunkThreadProvider.java b/src/main/java/net/minestom/server/thread/PerChunkThreadProvider.java new file mode 100644 index 000000000..ccd4365b8 --- /dev/null +++ b/src/main/java/net/minestom/server/thread/PerChunkThreadProvider.java @@ -0,0 +1,28 @@ +package net.minestom.server.thread; + +import net.minestom.server.instance.Chunk; +import org.jetbrains.annotations.NotNull; + +/** + * Each {@link Chunk} gets assigned to a random thread. + */ +public class PerChunkThreadProvider extends ThreadProvider { + + public PerChunkThreadProvider(int threadCount) { + super(threadCount); + } + + public PerChunkThreadProvider() { + super(); + } + + @Override + public long findThread(@NotNull Chunk chunk) { + return chunk.hashCode(); + } + + @Override + public @NotNull RefreshType getChunkRefreshType() { + return RefreshType.NEVER; + } +} diff --git a/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java b/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java index d1b3a9065..baba20527 100644 --- a/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/PerInstanceThreadProvider.java @@ -4,61 +4,26 @@ import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; import org.jetbrains.annotations.NotNull; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; - /** - * Separates work between instance (1 instance = 1 thread execution). + * Each {@link Instance} gets assigned to a random thread. */ public class PerInstanceThreadProvider extends ThreadProvider { - private final Map> instanceChunkMap = new ConcurrentHashMap<>(); + public PerInstanceThreadProvider(int threadCount) { + super(threadCount); + } - @Override - public void onInstanceCreate(@NotNull Instance instance) { - this.instanceChunkMap.putIfAbsent(instance, ConcurrentHashMap.newKeySet()); + public PerInstanceThreadProvider() { + super(); } @Override - public void onInstanceDelete(@NotNull Instance instance) { - this.instanceChunkMap.remove(instance); + public long findThread(@NotNull Chunk chunk) { + return chunk.getInstance().hashCode(); } @Override - public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) { - // Add the loaded chunk to the instance chunks list - Set chunks = getChunks(instance); - chunks.add(chunk); + public @NotNull RefreshType getChunkRefreshType() { + return RefreshType.NEVER; } - - @Override - public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) { - Set chunks = getChunks(instance); - chunks.remove(chunk); - } - - @NotNull - @Override - public List> update(long time) { - List> futures = new ArrayList<>(); - - instanceChunkMap.forEach((instance, chunks) -> futures.add(pool.submit(() -> { - // Tick instance - updateInstance(instance, time); - // Tick chunks - for (Chunk chunk : chunks) { - processChunkTick(instance, chunk, time); - } - }))); - return futures; - } - - private Set getChunks(Instance instance) { - return instanceChunkMap.computeIfAbsent(instance, inst -> ConcurrentHashMap.newKeySet()); - } - } diff --git a/src/main/java/net/minestom/server/thread/SingleThreadProvider.java b/src/main/java/net/minestom/server/thread/SingleThreadProvider.java index d1c02082c..34fa99b98 100644 --- a/src/main/java/net/minestom/server/thread/SingleThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/SingleThreadProvider.java @@ -1,56 +1,24 @@ package net.minestom.server.thread; import net.minestom.server.instance.Chunk; -import net.minestom.server.instance.Instance; import org.jetbrains.annotations.NotNull; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Future; - /** - * Simple thread provider implementation using a single thread to update all the instances and chunks. + * Uses a single thread for all chunks. */ public class SingleThreadProvider extends ThreadProvider { - { - setThreadCount(1); - } - - private final Set instances = new CopyOnWriteArraySet<>(); - - @Override - public void onInstanceCreate(@NotNull Instance instance) { - this.instances.add(instance); + public SingleThreadProvider() { + super(1); } @Override - public void onInstanceDelete(@NotNull Instance instance) { - this.instances.remove(instance); + public long findThread(@NotNull Chunk chunk) { + return 0; } @Override - public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) { - - } - - @Override - public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) { - - } - - @NotNull - @Override - public List> update(long time) { - return Collections.singletonList(pool.submit(() -> { - for (Instance instance : instances) { - updateInstance(instance, time); - for (Chunk chunk : instance.getChunks()) { - processChunkTick(instance, chunk, time); - } - } - })); + public @NotNull RefreshType getChunkRefreshType() { + return RefreshType.NEVER; } } diff --git a/src/main/java/net/minestom/server/thread/ThreadProvider.java b/src/main/java/net/minestom/server/thread/ThreadProvider.java index 1e1874638..74915b729 100644 --- a/src/main/java/net/minestom/server/thread/ThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/ThreadProvider.java @@ -1,251 +1,386 @@ package net.minestom.server.thread; -import io.netty.util.NettyRuntime; import net.minestom.server.MinecraftServer; -import net.minestom.server.entity.*; +import net.minestom.server.acquirable.Acquirable; +import net.minestom.server.entity.Entity; import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; -import net.minestom.server.instance.InstanceContainer; -import net.minestom.server.instance.SharedInstance; -import net.minestom.server.utils.callback.validator.EntityValidator; +import net.minestom.server.utils.MathUtils; import net.minestom.server.utils.chunk.ChunkUtils; -import net.minestom.server.utils.thread.MinestomThread; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.function.Consumer; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Stream; /** * Used to link chunks into multiple groups. * Then executed into a thread pool. - *

- * You can change the current thread provider by calling {@link net.minestom.server.UpdateManager#setThreadProvider(ThreadProvider)}. */ public abstract class ThreadProvider { - /** - * The thread pool of this thread provider. - */ - protected ExecutorService pool; - /** - * The amount of threads in the thread pool - */ - private int threadCount; + private final List threads; - { - // Default thread count in the pool (cores * 2) - setThreadCount(1); + private final Map> threadChunkMap = new HashMap<>(); + private final Map chunkEntryMap = new HashMap<>(); + // Iterated over to refresh the thread used to update entities & chunks + private final ArrayDeque chunks = new ArrayDeque<>(); + private final Set updatableEntities = ConcurrentHashMap.newKeySet(); + private final Set removedEntities = ConcurrentHashMap.newKeySet(); + + public ThreadProvider(int threadCount) { + this.threads = new ArrayList<>(threadCount); + + for (int i = 0; i < threadCount; i++) { + final TickThread.BatchRunnable batchRunnable = new TickThread.BatchRunnable(); + final TickThread tickThread = new TickThread(batchRunnable, i); + this.threads.add(tickThread); + + tickThread.start(); + } } - /** - * Called when an {@link Instance} is registered. - * - * @param instance the newly create {@link Instance} - */ - public abstract void onInstanceCreate(@NotNull Instance instance); + public ThreadProvider() { + this(Runtime.getRuntime().availableProcessors()); + } - /** - * Called when an {@link Instance} is unregistered. - * - * @param instance the deleted {@link Instance} - */ - public abstract void onInstanceDelete(@NotNull Instance instance); + public synchronized void onInstanceCreate(@NotNull Instance instance) { + instance.getChunks().forEach(this::addChunk); + } - /** - * Called when a chunk is loaded. - *

- * Be aware that this is possible for an instance to load chunks before being registered. - * - * @param instance the instance of the chunk - * @param chunk the loaded chunk - */ - public abstract void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk); + public synchronized void onInstanceDelete(@NotNull Instance instance) { + instance.getChunks().forEach(this::removeChunk); + } - /** - * Called when a chunk is unloaded. - * - * @param instance the instance of the chunk - * @param chunk the unloaded chunk - */ - public abstract void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk); + public synchronized void onChunkLoad(Chunk chunk) { + addChunk(chunk); + } + + public synchronized void onChunkUnload(Chunk chunk) { + removeChunk(chunk); + } /** * Performs a server tick for all chunks based on their linked thread. * - * @param time the update time in milliseconds - * @return the futures to execute to complete the tick + * @param chunk the chunk */ - @NotNull - public abstract List> update(long time); + public abstract long findThread(@NotNull Chunk chunk); /** - * Gets the current size of the thread pool. + * Defines how often chunks thread should be updated. * - * @return the thread pool's size + * @return the refresh type */ - public int getThreadCount() { - return threadCount; + public @NotNull RefreshType getChunkRefreshType() { + return RefreshType.CONSTANT; } /** - * Changes the amount of threads in the thread pool. + * Represents the maximum percentage of tick time that can be spent refreshing chunks thread. + *

+ * Percentage based on {@link MinecraftServer#TICK_MS}. * - * @param threadCount the new amount of threads + * @return the refresh percentage */ - public synchronized void setThreadCount(int threadCount) { - this.threadCount = threadCount; - refreshPool(); + public float getRefreshPercentage() { + return 0.3f; } - private void refreshPool() { - if (pool != null) { - this.pool.shutdown(); + /** + * Minimum time used to refresh chunks & entities thread. + * + * @return the minimum refresh time in milliseconds + */ + public int getMinimumRefreshTime() { + return 3; + } + + /** + * Maximum time used to refresh chunks & entities thread. + * + * @return the maximum refresh time in milliseconds + */ + public int getMaximumRefreshTime() { + return (int) (MinecraftServer.TICK_MS * 0.3); + } + + /** + * Prepares the update by creating the {@link TickThread} tasks. + * + * @param time the tick time in milliseconds + */ + public synchronized @NotNull CountDownLatch update(long time) { + CountDownLatch countDownLatch = new CountDownLatch(threads.size()); + for (TickThread thread : threads) { + // Execute tick + thread.runnable.startTick(countDownLatch, () -> { + final var chunkEntries = threadChunkMap.get(thread); + if (chunkEntries == null || chunkEntries.isEmpty()) { + // Nothing to tick + Acquirable.refreshEntities(Stream.empty()); + return; + } + + final var entities = chunkEntries.stream() + .flatMap(chunkEntry -> chunkEntry.entities.stream()); + Acquirable.refreshEntities(entities); + + final ReentrantLock lock = thread.getLock(); + lock.lock(); + chunkEntries.forEach(chunkEntry -> { + Chunk chunk = chunkEntry.chunk; + if (!ChunkUtils.isLoaded(chunk)) + return; + chunk.tick(time); + chunkEntry.entities.forEach(entity -> { + final boolean hasQueue = lock.hasQueuedThreads(); + if (hasQueue) { + lock.unlock(); + // #acquire callbacks should be called here + lock.lock(); + } + entity.tick(time); + }); + }); + Acquirable.refreshEntities(Stream.empty()); + lock.unlock(); + }); } - this.pool = new MinestomThread(threadCount, MinecraftServer.THREAD_NAME_TICK); + return countDownLatch; } - // INSTANCE UPDATE - /** - * Processes a whole tick for a chunk. + * Called at the end of each tick to clear removed entities, + * refresh the chunk linked to an entity, and chunk threads based on {@link #findThread(Chunk)}. * - * @param instance the instance of the chunk - * @param chunk the chunk to update - * @param time the time of the update in milliseconds + * @param tickTime the duration of the tick in ms, + * used to ensure that the refresh does not take more time than the tick itself */ - protected void processChunkTick(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - if (!ChunkUtils.isLoaded(chunk)) + public synchronized void refreshThreads(long tickTime) { + // Clear removed entities + processRemovedEntities(); + // Update entities chunks + processUpdatedEntities(); + + if (getChunkRefreshType() == RefreshType.NEVER) return; - updateChunk(instance, chunk, time); - updateEntities(instance, chunk, time); + final int timeOffset = MathUtils.clamp((int) ((double) tickTime * getRefreshPercentage()), + getMinimumRefreshTime(), getMaximumRefreshTime()); + final long endTime = System.currentTimeMillis() + timeOffset; + final int size = chunks.size(); + int counter = 0; + while (true) { + Chunk chunk = chunks.pollFirst(); + if (!ChunkUtils.isLoaded(chunk)) { + removeChunk(chunk); + return; + } + + // Update chunk threads + switchChunk(chunk); + + // Add back to the deque + chunks.addLast(chunk); + + if (++counter > size) + break; + + if (System.currentTimeMillis() >= endTime) + break; + } } /** - * Executes an instance tick. + * Add an entity into the waiting list to get assigned in a thread. + *

+ * Called when entering a new chunk. * - * @param instance the instance - * @param time the current time in ms + * @param entity the entity to add */ - protected void updateInstance(@NotNull Instance instance, long time) { - // The instance - instance.tick(time); + public void updateEntity(@NotNull Entity entity) { + this.updatableEntities.add(entity); } /** - * Executes a chunk tick (blocks update). + * Add an entity into the waiting list to get removed from its thread. + *

+ * Called in {@link Entity#remove()}. * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms + * @param entity the entity to remove */ - protected void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - chunk.tick(time); - } - - // ENTITY UPDATE - - /** - * Executes an entity tick (all entities type creatures/objects/players) in an instance's chunk. - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms - */ - protected void updateEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - conditionalEntityUpdate(instance, chunk, time, null); + public void removeEntity(@NotNull Entity entity) { + this.removedEntities.add(entity); } /** - * Executes an entity tick for object entities in an instance's chunk. - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms + * Shutdowns all the {@link TickThread tick threads}. + *

+ * Action is irreversible. */ - protected void updateObjectEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof ObjectEntity); + public void shutdown() { + this.threads.forEach(TickThread::shutdown); } /** - * Executes an entity tick for living entities in an instance's chunk. + * Gets all the {@link TickThread tick threads}. * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms + * @return the tick threads */ - protected void updateLivingEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof LivingEntity); + public @NotNull List<@NotNull TickThread> getThreads() { + return threads; + } + + protected void addChunk(@NotNull Chunk chunk) { + ChunkEntry chunkEntry = setChunkThread(chunk, (thread) -> new ChunkEntry(thread, chunk)); + this.chunkEntryMap.put(chunk, chunkEntry); + this.chunks.add(chunk); + } + + protected void switchChunk(@NotNull Chunk chunk) { + ChunkEntry chunkEntry = chunkEntryMap.get(chunk); + if (chunkEntry == null) + return; + var chunks = threadChunkMap.get(chunkEntry.thread); + if (chunks == null || chunks.isEmpty()) + return; + chunks.remove(chunkEntry); + + setChunkThread(chunk, tickThread -> { + chunkEntry.thread = tickThread; + return chunkEntry; + }); + } + + protected @NotNull ChunkEntry setChunkThread(@NotNull Chunk chunk, + @NotNull Function chunkEntrySupplier) { + final int threadId = getThreadId(chunk); + TickThread thread = threads.get(threadId); + var chunks = threadChunkMap.computeIfAbsent(thread, tickThread -> ConcurrentHashMap.newKeySet()); + + ChunkEntry chunkEntry = chunkEntrySupplier.apply(thread); + chunks.add(chunkEntry); + return chunkEntry; + } + + protected void removeChunk(Chunk chunk) { + ChunkEntry chunkEntry = chunkEntryMap.get(chunk); + if (chunkEntry != null) { + TickThread thread = chunkEntry.thread; + var chunks = threadChunkMap.get(thread); + if (chunks != null) { + chunks.remove(chunkEntry); + } + chunkEntryMap.remove(chunk); + } + this.chunks.remove(chunk); } /** - * Executes an entity tick for creatures entities in an instance's chunk. + * Finds the thread id associated to a {@link Chunk}. * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms + * @param chunk the chunk to find the thread id from + * @return the chunk thread id */ - protected void updateCreatures(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof EntityCreature); + protected int getThreadId(@NotNull Chunk chunk) { + return (int) (Math.abs(findThread(chunk)) % threads.size()); } - /** - * Executes an entity tick for players in an instance's chunk. - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms - */ - protected void updatePlayers(@NotNull Instance instance, @NotNull Chunk chunk, long time) { - conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof Player); + private void processRemovedEntities() { + if (removedEntities.isEmpty()) + return; + for (Entity entity : removedEntities) { + var acquirableEntity = entity.getAcquirable(); + ChunkEntry chunkEntry = acquirableEntity.getHandler().getChunkEntry(); + // Remove from list + if (chunkEntry != null) { + chunkEntry.entities.remove(entity); + } + } + this.removedEntities.clear(); } - /** - * Executes an entity tick in an instance's chunk if condition is verified. - * - * @param instance the chunk's instance - * @param chunk the chunk - * @param time the current time in ms - * @param condition the condition which confirm if the update happens or not - */ - protected void conditionalEntityUpdate(@NotNull Instance instance, @NotNull Chunk chunk, long time, - @Nullable EntityValidator condition) { - if (!instance.getEntities().isEmpty()) { - final Set entities = instance.getChunkEntities(chunk); + private void processUpdatedEntities() { + if (updatableEntities.isEmpty()) + return; + for (Entity entity : updatableEntities) { + var acquirableEntity = entity.getAcquirable(); + ChunkEntry handlerChunkEntry = acquirableEntity.getHandler().getChunkEntry(); - if (!entities.isEmpty()) { - for (Entity entity : entities) { - if (condition != null && !condition.isValid(entity)) - continue; - entity.tick(time); + Chunk entityChunk = entity.getChunk(); + + // Entity is possibly not in the correct thread + + // Remove from previous list + { + if (handlerChunkEntry != null) { + handlerChunkEntry.entities.remove(entity); + } + } + + // Add to new list + { + ChunkEntry chunkEntry = chunkEntryMap.get(entityChunk); + if (chunkEntry != null) { + chunkEntry.entities.add(entity); + acquirableEntity.getHandler().refreshChunkEntry(chunkEntry); } } } - - updateSharedInstances(instance, sharedInstance -> conditionalEntityUpdate(sharedInstance, chunk, time, condition)); + this.updatableEntities.clear(); } /** - * If {@code instance} is an {@link InstanceContainer}, run a callback for all of its - * {@link SharedInstance}. - * - * @param instance the instance - * @param callback the callback to run for all the {@link SharedInstance} + * Defines how often chunks thread should be refreshed. */ - private void updateSharedInstances(@NotNull Instance instance, @NotNull Consumer callback) { - if (instance instanceof InstanceContainer) { - final InstanceContainer instanceContainer = (InstanceContainer) instance; + public enum RefreshType { + /** + * Chunk thread is constant after being defined. + */ + NEVER, + /** + * Chunk thread should be recomputed as often as possible. + */ + CONSTANT, + /** + * Chunk thread should be recomputed, but not continuously. + */ + RARELY + } - if (!instanceContainer.hasSharedInstances()) - return; + public static class ChunkEntry { + private volatile TickThread thread; + private final Chunk chunk; + private final List entities = new ArrayList<>(); - for (SharedInstance sharedInstance : instanceContainer.getSharedInstances()) { - callback.accept(sharedInstance); - } + private ChunkEntry(TickThread thread, Chunk chunk) { + this.thread = thread; + this.chunk = chunk; + } + + public @NotNull TickThread getThread() { + return thread; + } + + public @NotNull Chunk getChunk() { + return chunk; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ChunkEntry that = (ChunkEntry) o; + return chunk.equals(that.chunk); + } + + @Override + public int hashCode() { + return Objects.hash(chunk); } } diff --git a/src/main/java/net/minestom/server/thread/TickThread.java b/src/main/java/net/minestom/server/thread/TickThread.java new file mode 100644 index 000000000..d988de2d9 --- /dev/null +++ b/src/main/java/net/minestom/server/thread/TickThread.java @@ -0,0 +1,103 @@ +package net.minestom.server.thread; + +import net.minestom.server.MinecraftServer; +import net.minestom.server.utils.validate.Check; +import org.jetbrains.annotations.NotNull; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Thread responsible for ticking {@link net.minestom.server.instance.Chunk chunks} and {@link net.minestom.server.entity.Entity entities}. + *

+ * Created in {@link ThreadProvider}, and awaken every tick with a task to execute. + */ +public class TickThread extends Thread { + + protected final BatchRunnable runnable; + private final ReentrantLock lock = new ReentrantLock(); + + public TickThread(@NotNull BatchRunnable runnable, int number) { + super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number); + this.runnable = runnable; + + this.runnable.setLinkedThread(this); + } + + /** + * Gets the lock used to ensure the safety of entity acquisition. + * + * @return the thread lock + */ + public @NotNull ReentrantLock getLock() { + return lock; + } + + /** + * Shutdowns the thread. Cannot be undone. + */ + public void shutdown() { + this.runnable.stop = true; + LockSupport.unpark(this); + } + + protected static class BatchRunnable implements Runnable { + + private volatile boolean stop; + private TickThread tickThread; + + private volatile boolean inTick; + private final AtomicReference countDownLatch = new AtomicReference<>(); + + private final Queue queue = new ConcurrentLinkedQueue<>(); + + @Override + public void run() { + Check.notNull(tickThread, "The linked BatchThread cannot be null!"); + while (!stop) { + LockSupport.park(tickThread); + if (stop) + break; + CountDownLatch localCountDownLatch = this.countDownLatch.get(); + + // The latch is necessary to control the tick rates + if (localCountDownLatch == null) { + continue; + } + + this.inTick = true; + + // Execute all pending runnable + Runnable runnable; + while ((runnable = queue.poll()) != null) { + runnable.run(); + } + + localCountDownLatch.countDown(); + this.countDownLatch.compareAndSet(localCountDownLatch, null); + + // Wait for the next notify (game tick) + this.inTick = false; + } + } + + public void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) { + this.countDownLatch.set(countDownLatch); + this.queue.add(runnable); + LockSupport.unpark(tickThread); + } + + public boolean isInTick() { + return inTick; + } + + private void setLinkedThread(TickThread tickThread) { + this.tickThread = tickThread; + } + } + +} diff --git a/src/test/java/demo/PlayerInit.java b/src/test/java/demo/PlayerInit.java index 043319dcc..34984b591 100644 --- a/src/test/java/demo/PlayerInit.java +++ b/src/test/java/demo/PlayerInit.java @@ -1,10 +1,8 @@ package demo; -import com.google.common.util.concurrent.AtomicDouble; import demo.generator.ChunkGeneratorDemo; import demo.generator.NoiseTestGenerator; import net.kyori.adventure.text.Component; -import net.kyori.adventure.text.format.TextColor; import net.minestom.server.MinecraftServer; import net.minestom.server.adventure.audience.Audiences; import net.minestom.server.chat.ColoredText; @@ -32,8 +30,8 @@ import net.minestom.server.item.ItemTag; import net.minestom.server.item.Material; import net.minestom.server.item.metadata.CompassMeta; import net.minestom.server.monitoring.BenchmarkManager; +import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.network.ConnectionManager; -import net.minestom.server.ping.ResponseDataConsumer; import net.minestom.server.utils.MathUtils; import net.minestom.server.utils.Position; import net.minestom.server.utils.Vector; @@ -42,14 +40,13 @@ import net.minestom.server.world.DimensionType; import java.util.Collection; import java.util.Collections; +import java.util.Random; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicReference; public class PlayerInit { - private static final InstanceContainer instanceContainer; - private static final Inventory inventory; static { @@ -57,9 +54,12 @@ public class PlayerInit { //StorageLocation storageLocation = MinecraftServer.getStorageManager().getLocation("instance_data", new StorageOptions().setCompression(true)); ChunkGeneratorDemo chunkGeneratorDemo = new ChunkGeneratorDemo(); NoiseTestGenerator noiseTestGenerator = new NoiseTestGenerator(); - instanceContainer = instanceManager.createInstanceContainer(DimensionType.OVERWORLD); - instanceContainer.enableAutoChunkLoad(true); - instanceContainer.setChunkGenerator(chunkGeneratorDemo); + + for (int i = 0; i < 4; i++) { + InstanceContainer instanceContainer = instanceManager.createInstanceContainer(DimensionType.OVERWORLD); + instanceContainer.enableAutoChunkLoad(true); + instanceContainer.setChunkGenerator(chunkGeneratorDemo); + } inventory = new Inventory(InventoryType.CHEST_1_ROW, Component.text("Test inventory")); /*inventory.addInventoryCondition((p, slot, clickType, inventoryConditionResult) -> { @@ -91,14 +91,14 @@ public class PlayerInit { } - private static AtomicDouble LAST_TICK_TIME = new AtomicDouble(); + private static AtomicReference LAST_TICK = new AtomicReference<>(); public static void init() { ConnectionManager connectionManager = MinecraftServer.getConnectionManager(); BenchmarkManager benchmarkManager = MinecraftServer.getBenchmarkManager(); MinecraftServer.getUpdateManager().addTickMonitor(tickMonitor -> - LAST_TICK_TIME.set(tickMonitor.getTickTime())); + LAST_TICK.set(tickMonitor)); MinecraftServer.getSchedulerManager().buildTask(() -> { @@ -110,9 +110,13 @@ public class PlayerInit { long ramUsage = benchmarkManager.getUsedMemory(); ramUsage /= 1e6; // bytes to MB + TickMonitor tickMonitor = LAST_TICK.get(); + final Component header = Component.text("RAM USAGE: " + ramUsage + " MB") .append(Component.newline()) - .append(Component.text("TICK TIME: " + MathUtils.round(LAST_TICK_TIME.get(), 2) + "ms")); + .append(Component.text("TICK TIME: " + MathUtils.round(tickMonitor.getTickTime(), 2) + "ms")) + .append(Component.newline()) + .append(Component.text("ACQ TIME: " + MathUtils.round(tickMonitor.getAcquisitionTime(), 2) + "ms")); final Component footer = benchmarkManager.getCpuMonitoringMessage(); Audiences.players().sendPlayerListHeaderAndFooter(header, footer); @@ -183,7 +187,7 @@ public class PlayerInit { final CustomBlock customBlock = player.getInstance().getCustomBlock(event.getBlockPosition()); final Block block = Block.fromStateId(blockStateId); player.sendMessage("You clicked at the block " + block + " " + customBlock); - player.sendMessage("CHUNK COUNT " + instanceContainer.getChunks().size()); + player.sendMessage("CHUNK COUNT " + player.getInstance().getChunks().size()); }); globalEventHandler.addEventCallback(PickupItemEvent.class, event -> { @@ -214,11 +218,12 @@ public class PlayerInit { globalEventHandler.addEventCallback(PlayerLoginEvent.class, event -> { final Player player = event.getPlayer(); - player.sendMessage("test"); - event.setSpawningInstance(instanceContainer); - int x = Math.abs(ThreadLocalRandom.current().nextInt()) % 1000 - 250; - int z = Math.abs(ThreadLocalRandom.current().nextInt()) % 1000 - 250; + var instances = MinecraftServer.getInstanceManager().getInstances(); + Instance instance = instances.stream().skip(new Random().nextInt(instances.size())).findFirst().orElse(null); + event.setSpawningInstance(instance); + int x = Math.abs(ThreadLocalRandom.current().nextInt()) % 500 - 250; + int z = Math.abs(ThreadLocalRandom.current().nextInt()) % 500 - 250; player.setRespawnPoint(new Position(0, 42f, 0)); player.getInventory().addInventoryCondition((p, slot, clickType, inventoryConditionResult) -> { @@ -288,7 +293,7 @@ public class PlayerInit { // Unload the chunk (save memory) if it has no remaining viewer if (chunk.getViewers().isEmpty()) { - //player.getInstance().unloadChunk(chunk); + player.getInstance().unloadChunk(chunk); } }); }