Merge pull request #242 from Minestom/acquirable

Acquirable API
This commit is contained in:
TheMode 2021-04-26 16:35:24 +02:00 committed by GitHub
commit c4894cd684
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1005 additions and 334 deletions

View File

@ -74,6 +74,7 @@ public final class MinecraftServer {
// Threads
public static final String THREAD_NAME_BENCHMARK = "Ms-Benchmark";
public static final String THREAD_NAME_TICK_SCHEDULER = "Ms-TickScheduler";
public static final String THREAD_NAME_TICK = "Ms-Tick";
public static final String THREAD_NAME_BLOCK_BATCH = "Ms-BlockBatchPool";

View File

@ -1,13 +1,14 @@
package net.minestom.server;
import com.google.common.collect.Queues;
import net.minestom.server.acquirable.Acquirable;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager;
import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.player.NettyPlayerConnection;
import net.minestom.server.thread.PerInstanceThreadProvider;
import net.minestom.server.thread.SingleThreadProvider;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;
@ -21,28 +22,22 @@ import java.util.function.LongConsumer;
/**
* Manager responsible for the server ticks.
* <p>
* The {@link ThreadProvider} manages the multi-thread aspect for {@link Instance} ticks,
* it can be modified with {@link #setThreadProvider(ThreadProvider)}.
* The {@link ThreadProvider} manages the multi-thread aspect of chunk ticks.
*/
public final class UpdateManager {
private final ScheduledExecutorService updateExecutionService = Executors.newSingleThreadScheduledExecutor(r ->
new Thread(r, "tick-scheduler"));
new Thread(r, MinecraftServer.THREAD_NAME_TICK_SCHEDULER));
private volatile boolean stopRequested;
private ThreadProvider threadProvider;
// TODO make configurable
private ThreadProvider threadProvider = new SingleThreadProvider();
private final Queue<LongConsumer> tickStartCallbacks = Queues.newConcurrentLinkedQueue();
private final Queue<LongConsumer> tickEndCallbacks = Queues.newConcurrentLinkedQueue();
private final List<Consumer<TickMonitor>> tickMonitors = new CopyOnWriteArrayList<>();
{
// DEFAULT THREAD PROVIDER
//threadProvider = new PerGroupChunkProvider();
threadProvider = new PerInstanceThreadProvider();
}
/**
* Should only be created in MinecraftServer.
*/
@ -85,9 +80,12 @@ public final class UpdateManager {
// Monitoring
if (!tickMonitors.isEmpty()) {
final double acquisitionTimeMs = Acquirable.getAcquiringTime() / 1e6D;
final double tickTimeMs = tickTime / 1e6D;
final TickMonitor tickMonitor = new TickMonitor(tickTimeMs);
final TickMonitor tickMonitor = new TickMonitor(tickTimeMs, acquisitionTimeMs);
this.tickMonitors.forEach(consumer -> consumer.accept(tickMonitor));
Acquirable.resetAcquiringTime();
}
// Flush all waiting packets
@ -108,21 +106,23 @@ public final class UpdateManager {
* @param tickStart the time of the tick in milliseconds
*/
private void serverTick(long tickStart) {
List<Future<?>> futures;
// Tick all instances
MinecraftServer.getInstanceManager().getInstances().forEach(instance ->
instance.tick(tickStart));
// Server tick (instance/chunk/entity)
// Synchronize with the update manager instance, like the signal for chunk load/unload
synchronized (this) {
futures = threadProvider.update(tickStart);
// Tick all chunks (and entities inside)
final CountDownLatch countDownLatch = threadProvider.update(tickStart);
// Wait tick end
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (final Future<?> future : futures) {
try {
future.get();
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}
// Clear removed entities & update threads
long tickTime = System.currentTimeMillis() - tickStart;
this.threadProvider.refreshThreads(tickTime);
}
/**
@ -145,20 +145,10 @@ public final class UpdateManager {
*
* @return the current thread provider
*/
public ThreadProvider getThreadProvider() {
public @NotNull ThreadProvider getThreadProvider() {
return threadProvider;
}
/**
* Changes the server {@link ThreadProvider}.
*
* @param threadProvider the new thread provider
* @throws NullPointerException if <code>threadProvider</code> is null
*/
public synchronized void setThreadProvider(ThreadProvider threadProvider) {
this.threadProvider = threadProvider;
}
/**
* Signals the {@link ThreadProvider} that an instance has been created.
* <p>
@ -166,9 +156,7 @@ public final class UpdateManager {
*
* @param instance the instance
*/
public synchronized void signalInstanceCreate(Instance instance) {
if (this.threadProvider == null)
return;
public void signalInstanceCreate(Instance instance) {
this.threadProvider.onInstanceCreate(instance);
}
@ -179,9 +167,7 @@ public final class UpdateManager {
*
* @param instance the instance
*/
public synchronized void signalInstanceDelete(Instance instance) {
if (this.threadProvider == null)
return;
public void signalInstanceDelete(Instance instance) {
this.threadProvider.onInstanceDelete(instance);
}
@ -190,13 +176,10 @@ public final class UpdateManager {
* <p>
* WARNING: should be automatically done by the {@link Instance} implementation.
*
* @param instance the instance of the chunk
* @param chunk the loaded chunk
* @param chunk the loaded chunk
*/
public synchronized void signalChunkLoad(Instance instance, @NotNull Chunk chunk) {
if (this.threadProvider == null)
return;
this.threadProvider.onChunkLoad(instance, chunk);
public void signalChunkLoad(@NotNull Chunk chunk) {
this.threadProvider.onChunkLoad(chunk);
}
/**
@ -204,13 +187,10 @@ public final class UpdateManager {
* <p>
* WARNING: should be automatically done by the {@link Instance} implementation.
*
* @param instance the instance of the chunk
* @param chunk the unloaded chunk
* @param chunk the unloaded chunk
*/
public synchronized void signalChunkUnload(Instance instance, @NotNull Chunk chunk) {
if (this.threadProvider == null)
return;
this.threadProvider.onChunkUnload(instance, chunk);
public void signalChunkUnload(@NotNull Chunk chunk) {
this.threadProvider.onChunkUnload(chunk);
}
/**
@ -265,6 +245,7 @@ public final class UpdateManager {
* Stops the server loop.
*/
public void stop() {
stopRequested = true;
this.stopRequested = true;
this.threadProvider.shutdown();
}
}

View File

@ -0,0 +1,173 @@
package net.minestom.server.acquirable;
import com.google.common.annotations.Beta;
import net.minestom.server.entity.Entity;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.thread.TickThread;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;
@Beta
public interface Acquirable<T> {
/**
* Gets all the {@link Entity entities} being ticked in the current thread.
* <p>
* Useful when you want to ensure that no acquisition is ever done.
*
* @return the entities ticked in the current thread
*/
static @NotNull Stream<@NotNull Entity> currentEntities() {
return AcquirableImpl.CURRENT_ENTITIES.get();
}
/**
* Changes the stream returned by {@link #currentEntities()}.
* <p>
* Mostly for internal use, external calls are unrecommended as they could lead
* to unexpected behavior.
*
* @param entities the new entity stream
*/
@ApiStatus.Internal
static void refreshEntities(@NotNull Stream<@NotNull Entity> entities) {
AcquirableImpl.CURRENT_ENTITIES.set(entities);
}
/**
* Gets the time spent acquiring since last tick.
*
* @return the acquiring time
*/
static long getAcquiringTime() {
return AcquirableImpl.WAIT_COUNTER_NANO.get();
}
/**
* Resets {@link #getAcquiringTime()}.
* <p>
* Mostly for internal use.
*/
@ApiStatus.Internal
static void resetAcquiringTime() {
AcquirableImpl.WAIT_COUNTER_NANO.set(0);
}
/**
* Creates a new {@link Acquirable} object.
* <p>
* Mostly for internal use, as a {@link TickThread} has to be used
* and properly synchronized.
*
* @param value the acquirable element
* @param <T> the acquirable element type
* @return a new acquirable object
*/
@ApiStatus.Internal
static <T> @NotNull Acquirable<T> of(@NotNull T value) {
return new AcquirableImpl<>(value);
}
/**
* Returns a new {@link Acquired} object which will be locked to the current thread.
* <p>
* Useful when your code cannot be done inside a callback and need to be sync.
* Do not forget to call {@link Acquired#unlock()} once you are done with it.
*
* @return an acquired object
* @see #sync(Consumer) for auto-closeable capability
*/
default @NotNull Acquired<T> lock() {
var optional = local();
return optional.map(Acquired::local).orElseGet(() -> Acquired.locked(this));
}
/**
* Retrieves the acquirable value if and only if the element
* is already present/ticked in the current thread.
* <p>
* Useful when you want only want to acquire an element when you are guaranteed
* to do not create a huge performance impact.
*
* @return an optional containing the acquired element if safe,
* {@link Optional#empty()} otherwise
*/
default @NotNull Optional<T> local() {
final Thread currentThread = Thread.currentThread();
final TickThread tickThread = getHandler().getTickThread();
if (Objects.equals(currentThread, tickThread)) {
return Optional.of(unwrap());
}
return Optional.empty();
}
/**
* Locks the acquirable element, execute {@code consumer} synchronously and unlock the thread.
* <p>
* Free if the element is already present in the current thread, blocking otherwise.
*
* @param consumer the callback to execute once the element has been safely acquired
* @see #async(Consumer)
*/
default void sync(@NotNull Consumer<T> consumer) {
var acquired = lock();
consumer.accept(acquired.get());
acquired.unlock();
}
/**
* Async version of {@link #sync(Consumer)}.
*
* @param consumer the callback to execute once the element has been safely acquired
* @see #sync(Consumer)
*/
default void async(@NotNull Consumer<T> consumer) {
// TODO per-thread list
AsyncUtils.runAsync(() -> sync(consumer));
}
/**
* Unwrap the contained object unsafely.
* <p>
* Should only be considered when thread-safety is not necessary (e.g. comparing positions)
*
* @return the unwrapped value
*/
@NotNull T unwrap();
/**
* Gets the {@link Handler} of this acquirable element,
* containing the currently linked thread.
* <p>
* Mostly for internal use.
*
* @return this element handler
*/
@ApiStatus.Internal
@NotNull Handler getHandler();
class Handler {
private volatile ThreadProvider.ChunkEntry chunkEntry;
public ThreadProvider.ChunkEntry getChunkEntry() {
return chunkEntry;
}
@ApiStatus.Internal
public void refreshChunkEntry(@NotNull ThreadProvider.ChunkEntry chunkEntry) {
this.chunkEntry = chunkEntry;
}
public TickThread getTickThread() {
return chunkEntry != null ? chunkEntry.getThread() : null;
}
}
}

View File

@ -0,0 +1,148 @@
package net.minestom.server.acquirable;
import com.google.common.annotations.Beta;
import net.minestom.server.thread.TickThread;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Stream;
@Beta
public class AcquirableCollection<E> implements Collection<Acquirable<E>> {
private final Collection<Acquirable<E>> acquirableCollection;
public AcquirableCollection(Collection<Acquirable<E>> acquirableCollection) {
this.acquirableCollection = acquirableCollection;
}
public void acquireSync(@NotNull Consumer<E> consumer) {
final Thread currentThread = Thread.currentThread();
var threadEntitiesMap = retrieveOptionalThreadMap(acquirableCollection, currentThread, consumer);
// Acquire all the threads one by one
{
for (var entry : threadEntitiesMap.entrySet()) {
final TickThread tickThread = entry.getKey();
final List<E> values = entry.getValue();
var lock = AcquirableImpl.enter(currentThread, tickThread);
for (E value : values) {
consumer.accept(value);
}
AcquirableImpl.leave(lock);
}
}
}
public void acquireAsync(@NotNull Consumer<E> consumer) {
AsyncUtils.runAsync(() -> acquireSync(consumer));
}
public @NotNull Stream<E> unwrap() {
return acquirableCollection.stream().map(Acquirable::unwrap);
}
@Override
public int size() {
return acquirableCollection.size();
}
@Override
public boolean isEmpty() {
return acquirableCollection.isEmpty();
}
@Override
public boolean contains(Object o) {
return acquirableCollection.contains(o);
}
@NotNull
@Override
public Iterator<Acquirable<E>> iterator() {
return acquirableCollection.iterator();
}
@NotNull
@Override
public Object[] toArray() {
return acquirableCollection.toArray();
}
@NotNull
@Override
public <T> T[] toArray(@NotNull T[] a) {
return acquirableCollection.toArray(a);
}
@Override
public boolean add(Acquirable<E> eAcquirable) {
return acquirableCollection.add(eAcquirable);
}
@Override
public boolean remove(Object o) {
return acquirableCollection.remove(o);
}
@Override
public boolean containsAll(@NotNull Collection<?> c) {
return acquirableCollection.containsAll(c);
}
@Override
public boolean addAll(@NotNull Collection<? extends Acquirable<E>> c) {
return acquirableCollection.addAll(c);
}
@Override
public boolean removeAll(@NotNull Collection<?> c) {
return acquirableCollection.removeAll(c);
}
@Override
public boolean retainAll(@NotNull Collection<?> c) {
return acquirableCollection.retainAll(c);
}
@Override
public void clear() {
this.acquirableCollection.clear();
}
/**
* Creates
*
* @param collection the acquirable collection
* @param currentThread the current thread
* @param consumer the consumer to execute when an element is already in the current thread
* @return a new Thread to acquirable elements map
*/
protected static <T> Map<TickThread, List<T>> retrieveOptionalThreadMap(@NotNull Collection<Acquirable<T>> collection,
@NotNull Thread currentThread,
@NotNull Consumer<T> consumer) {
// Separate a collection of acquirable elements into a map of thread->elements
// Useful to reduce the number of acquisition
Map<TickThread, List<T>> threadCacheMap = new HashMap<>();
for (var element : collection) {
final T value = element.unwrap();
final TickThread elementThread = element.getHandler().getTickThread();
if (currentThread == elementThread) {
// The element is managed in the current thread, consumer can be immediately called
consumer.accept(value);
} else {
// The element is manager in a different thread, cache it
List<T> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, tickThread -> new ArrayList<>());
threadCacheList.add(value);
}
}
return threadCacheMap;
}
}

View File

@ -0,0 +1,77 @@
package net.minestom.server.acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.thread.TickThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
class AcquirableImpl<T> implements Acquirable<T> {
protected static final ThreadLocal<Stream<Entity>> CURRENT_ENTITIES = ThreadLocal.withInitial(Stream::empty);
protected static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong();
/**
* Global lock used for synchronization.
*/
private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private final T value;
private final Acquirable.Handler handler;
public AcquirableImpl(@NotNull T value) {
this.value = value;
this.handler = new Acquirable.Handler();
}
@Override
public @NotNull T unwrap() {
return value;
}
@Override
public @NotNull Acquirable.Handler getHandler() {
return handler;
}
protected static @Nullable ReentrantLock enter(@Nullable Thread currentThread, @Nullable TickThread elementThread) {
// Monitoring
long time = System.nanoTime();
ReentrantLock currentLock;
{
final TickThread current = currentThread instanceof TickThread ?
(TickThread) currentThread : null;
currentLock = current != null && current.getLock().isHeldByCurrentThread() ?
current.getLock() : null;
}
if (currentLock != null)
currentLock.unlock();
GLOBAL_LOCK.lock();
if (currentLock != null)
currentLock.lock();
final var lock = elementThread != null ? elementThread.getLock() : null;
final boolean acquired = lock == null || lock.isHeldByCurrentThread();
if (!acquired) {
lock.lock();
}
// Monitoring
AcquirableImpl.WAIT_COUNTER_NANO.addAndGet(System.nanoTime() - time);
return !acquired ? lock : null;
}
protected static void leave(@Nullable ReentrantLock lock) {
if (lock != null) {
lock.unlock();
}
GLOBAL_LOCK.unlock();
}
}

View File

@ -0,0 +1,52 @@
package net.minestom.server.acquirable;
import net.minestom.server.thread.TickThread;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.locks.ReentrantLock;
public class Acquired<T> {
private final T value;
private final boolean locked;
private final ReentrantLock lock;
private boolean unlocked;
protected static <T> Acquired<T> local(@NotNull T value) {
return new Acquired<>(value, false, null, null);
}
protected static <T> Acquired<T> locked(@NotNull Acquirable<T> acquirable) {
final Thread currentThread = Thread.currentThread();
final TickThread tickThread = acquirable.getHandler().getTickThread();
return new Acquired<>(acquirable.unwrap(), true, currentThread, tickThread);
}
private Acquired(@NotNull T value,
boolean locked, Thread currentThread, TickThread tickThread) {
this.value = value;
this.locked = locked;
this.lock = locked ? AcquirableImpl.enter(currentThread, tickThread) : null;
}
public @NotNull T get() {
checkLock();
return value;
}
public void unlock() {
checkLock();
this.unlocked = true;
if (!locked)
return;
AcquirableImpl.leave(lock);
}
private void checkLock() {
Check.stateCondition(unlocked, "The acquired element has already been unlocked!");
}
}

View File

@ -1,5 +1,6 @@
package net.minestom.server.entity;
import com.google.common.annotations.Beta;
import com.google.common.collect.Queues;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.event.HoverEvent;
@ -8,6 +9,7 @@ import net.kyori.adventure.text.event.HoverEventSource;
import net.minestom.server.MinecraftServer;
import net.minestom.server.Tickable;
import net.minestom.server.Viewable;
import net.minestom.server.acquirable.Acquirable;
import net.minestom.server.chat.JsonMessage;
import net.minestom.server.collision.BoundingBox;
import net.minestom.server.collision.CollisionUtils;
@ -42,6 +44,7 @@ import net.minestom.server.utils.time.Cooldown;
import net.minestom.server.utils.time.TimeUnit;
import net.minestom.server.utils.time.UpdateOption;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -123,6 +126,8 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
private long ticks;
private final EntityTickEvent tickEvent = new EntityTickEvent(this);
private final Acquirable<Entity> acquirable = Acquirable.of(this);
/**
* Lock used to support #switchEntityType
*/
@ -466,7 +471,7 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
// Fix current chunk being null if the entity has been spawned before
if (currentChunk == null) {
currentChunk = instance.getChunkAt(position);
refreshCurrentChunk(instance.getChunkAt(position));
}
// Check if the entity chunk is loaded
@ -833,6 +838,12 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
return currentChunk;
}
@ApiStatus.Internal
protected void refreshCurrentChunk(Chunk currentChunk) {
this.currentChunk = currentChunk;
MinecraftServer.getUpdateManager().getThreadProvider().updateEntity(this);
}
/**
* Gets the entity current instance.
*
@ -866,7 +877,7 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
this.isActive = true;
this.instance = instance;
this.currentChunk = instance.getChunkAt(position.getX(), position.getZ());
refreshCurrentChunk(instance.getChunkAt(position.getX(), position.getZ()));
instance.UNSAFE_addEntity(this);
spawn();
EntitySpawnEvent entitySpawnEvent = new EntitySpawnEvent(this, instance);
@ -1340,7 +1351,7 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
player.refreshVisibleEntities(newChunk);
}
this.currentChunk = newChunk;
refreshCurrentChunk(newChunk);
}
}
@ -1456,6 +1467,10 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
* WARNING: this does not trigger {@link EntityDeathEvent}.
*/
public void remove() {
if (isRemoved())
return;
MinecraftServer.getUpdateManager().getThreadProvider().removeEntity(this);
this.removed = true;
this.shouldRemove = true;
Entity.ENTITY_BY_ID.remove(id);
@ -1559,6 +1574,16 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
return Objects.requireNonNullElse(this.customSynchronizationCooldown, SYNCHRONIZATION_COOLDOWN);
}
@Beta
public <T extends Entity> @NotNull Acquirable<T> getAcquirable() {
return (Acquirable<T>) acquirable;
}
@Beta
public <T extends Entity> @NotNull Acquirable<T> getAcquirable(@NotNull Class<T> clazz) {
return (Acquirable<T>) acquirable;
}
public enum Pose {
STANDING,
FALL_FLYING,

View File

@ -561,6 +561,9 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
@Override
public void remove() {
if (isRemoved())
return;
callEvent(PlayerDisconnectEvent.class, new PlayerDisconnectEvent(this));
super.remove();

View File

@ -54,7 +54,7 @@ import java.util.function.Consumer;
* WARNING: when making your own implementation registering the instance manually is required
* with {@link InstanceManager#registerInstance(Instance)}, and
* you need to be sure to signal the {@link UpdateManager} of the changes using
* {@link UpdateManager#signalChunkLoad(Instance, Chunk)} and {@link UpdateManager#signalChunkUnload(Instance, Chunk)}.
* {@link UpdateManager#signalChunkLoad(Chunk)} and {@link UpdateManager#signalChunkUnload(Chunk)}.
*/
public abstract class Instance implements BlockModifier, Tickable, EventHandler, DataContainer, PacketGroupingAudience {
@ -253,7 +253,7 @@ public abstract class Instance implements BlockModifier, Tickable, EventHandler,
* Used when a {@link Chunk} is not currently loaded in memory and need to be retrieved from somewhere else.
* Could be read from disk, or generated from scratch.
* <p>
* Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Instance, Chunk)} and to cache
* Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Chunk)} and to cache
* that this chunk has been loaded.
* <p>
* WARNING: it has to retrieve a chunk, this is not optional and should execute the callback in all case.
@ -267,7 +267,7 @@ public abstract class Instance implements BlockModifier, Tickable, EventHandler,
/**
* Called to generated a new {@link Chunk} from scratch.
* <p>
* Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Instance, Chunk)} and to cache
* Be sure to signal the chunk using {@link UpdateManager#signalChunkLoad(Chunk)} and to cache
* that this chunk has been loaded.
* <p>
* This is where you can put your chunk generation code.

View File

@ -506,7 +506,7 @@ public class InstanceContainer extends Instance {
protected void retrieveChunk(int chunkX, int chunkZ, @Nullable ChunkCallback callback) {
final boolean loaded = chunkLoader.loadChunk(this, chunkX, chunkZ, chunk -> {
cacheChunk(chunk);
UPDATE_MANAGER.signalChunkLoad(this, chunk);
UPDATE_MANAGER.signalChunkLoad(chunk);
// Execute callback and event in the instance thread
scheduleNextTick(instance -> {
callChunkLoadEvent(chunkX, chunkZ);
@ -544,7 +544,7 @@ public class InstanceContainer extends Instance {
OptionalCallback.execute(callback, chunk);
}
UPDATE_MANAGER.signalChunkLoad(this, chunk);
UPDATE_MANAGER.signalChunkLoad(chunk);
callChunkLoadEvent(chunkX, chunkZ);
}
@ -641,7 +641,7 @@ public class InstanceContainer extends Instance {
final Chunk copiedChunk = chunk.copy(copiedInstance, chunkX, chunkZ);
copiedInstance.cacheChunk(copiedChunk);
UPDATE_MANAGER.signalChunkLoad(copiedInstance, copiedChunk);
UPDATE_MANAGER.signalChunkLoad(copiedChunk);
}
return copiedInstance;
@ -682,7 +682,7 @@ public class InstanceContainer extends Instance {
* Adds a {@link Chunk} to the internal instance map.
* <p>
* WARNING: the chunk will not automatically be sent to players and
* {@link net.minestom.server.UpdateManager#signalChunkLoad(Instance, Chunk)} must be called manually.
* {@link net.minestom.server.UpdateManager#signalChunkLoad(Chunk)} must be called manually.
*
* @param chunk the chunk to cache
*/
@ -823,7 +823,7 @@ public class InstanceContainer extends Instance {
chunk.unload();
UPDATE_MANAGER.signalChunkUnload(this, chunk);
UPDATE_MANAGER.signalChunkUnload(chunk);
}
this.scheduledChunksToRemove.clear();
}

View File

@ -41,6 +41,7 @@ public final class BenchmarkManager {
THREADS.add(THREAD_NAME_BLOCK_BATCH);
THREADS.add(THREAD_NAME_SCHEDULER);
THREADS.add(THREAD_NAME_TICK_SCHEDULER);
THREADS.add(THREAD_NAME_TICK);
}

View File

@ -3,12 +3,18 @@ package net.minestom.server.monitoring;
public class TickMonitor {
private final double tickTime;
private final double acquisitionTime;
public TickMonitor(double tickTime) {
public TickMonitor(double tickTime, double acquisitionTime) {
this.tickTime = tickTime;
this.acquisitionTime = acquisitionTime;
}
public double getTickTime() {
return tickTime;
}
}
public double getAcquisitionTime() {
return acquisitionTime;
}
}

View File

@ -0,0 +1,28 @@
package net.minestom.server.thread;
import net.minestom.server.instance.Chunk;
import org.jetbrains.annotations.NotNull;
/**
* Each {@link Chunk} gets assigned to a random thread.
*/
public class PerChunkThreadProvider extends ThreadProvider {
public PerChunkThreadProvider(int threadCount) {
super(threadCount);
}
public PerChunkThreadProvider() {
super();
}
@Override
public long findThread(@NotNull Chunk chunk) {
return chunk.hashCode();
}
@Override
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
}

View File

@ -4,61 +4,26 @@ import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* Separates work between instance (1 instance = 1 thread execution).
* Each {@link Instance} gets assigned to a random thread.
*/
public class PerInstanceThreadProvider extends ThreadProvider {
private final Map<Instance, Set<Chunk>> instanceChunkMap = new ConcurrentHashMap<>();
public PerInstanceThreadProvider(int threadCount) {
super(threadCount);
}
@Override
public void onInstanceCreate(@NotNull Instance instance) {
this.instanceChunkMap.putIfAbsent(instance, ConcurrentHashMap.newKeySet());
public PerInstanceThreadProvider() {
super();
}
@Override
public void onInstanceDelete(@NotNull Instance instance) {
this.instanceChunkMap.remove(instance);
public long findThread(@NotNull Chunk chunk) {
return chunk.getInstance().hashCode();
}
@Override
public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) {
// Add the loaded chunk to the instance chunks list
Set<Chunk> chunks = getChunks(instance);
chunks.add(chunk);
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
@Override
public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) {
Set<Chunk> chunks = getChunks(instance);
chunks.remove(chunk);
}
@NotNull
@Override
public List<Future<?>> update(long time) {
List<Future<?>> futures = new ArrayList<>();
instanceChunkMap.forEach((instance, chunks) -> futures.add(pool.submit(() -> {
// Tick instance
updateInstance(instance, time);
// Tick chunks
for (Chunk chunk : chunks) {
processChunkTick(instance, chunk, time);
}
})));
return futures;
}
private Set<Chunk> getChunks(Instance instance) {
return instanceChunkMap.computeIfAbsent(instance, inst -> ConcurrentHashMap.newKeySet());
}
}

View File

@ -1,56 +1,24 @@
package net.minestom.server.thread;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import org.jetbrains.annotations.NotNull;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
/**
* Simple thread provider implementation using a single thread to update all the instances and chunks.
* Uses a single thread for all chunks.
*/
public class SingleThreadProvider extends ThreadProvider {
{
setThreadCount(1);
}
private final Set<Instance> instances = new CopyOnWriteArraySet<>();
@Override
public void onInstanceCreate(@NotNull Instance instance) {
this.instances.add(instance);
public SingleThreadProvider() {
super(1);
}
@Override
public void onInstanceDelete(@NotNull Instance instance) {
this.instances.remove(instance);
public long findThread(@NotNull Chunk chunk) {
return 0;
}
@Override
public void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk) {
}
@Override
public void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk) {
}
@NotNull
@Override
public List<Future<?>> update(long time) {
return Collections.singletonList(pool.submit(() -> {
for (Instance instance : instances) {
updateInstance(instance, time);
for (Chunk chunk : instance.getChunks()) {
processChunkTick(instance, chunk, time);
}
}
}));
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.NEVER;
}
}

View File

@ -1,251 +1,386 @@
package net.minestom.server.thread;
import io.netty.util.NettyRuntime;
import net.minestom.server.MinecraftServer;
import net.minestom.server.entity.*;
import net.minestom.server.acquirable.Acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceContainer;
import net.minestom.server.instance.SharedInstance;
import net.minestom.server.utils.callback.validator.EntityValidator;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.chunk.ChunkUtils;
import net.minestom.server.utils.thread.MinestomThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Stream;
/**
* Used to link chunks into multiple groups.
* Then executed into a thread pool.
* <p>
* You can change the current thread provider by calling {@link net.minestom.server.UpdateManager#setThreadProvider(ThreadProvider)}.
*/
public abstract class ThreadProvider {
/**
* The thread pool of this thread provider.
*/
protected ExecutorService pool;
/**
* The amount of threads in the thread pool
*/
private int threadCount;
private final List<TickThread> threads;
{
// Default thread count in the pool (cores * 2)
setThreadCount(1);
private final Map<TickThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>();
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
// Iterated over to refresh the thread used to update entities & chunks
private final ArrayDeque<Chunk> chunks = new ArrayDeque<>();
private final Set<Entity> updatableEntities = ConcurrentHashMap.newKeySet();
private final Set<Entity> removedEntities = ConcurrentHashMap.newKeySet();
public ThreadProvider(int threadCount) {
this.threads = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
final TickThread.BatchRunnable batchRunnable = new TickThread.BatchRunnable();
final TickThread tickThread = new TickThread(batchRunnable, i);
this.threads.add(tickThread);
tickThread.start();
}
}
/**
* Called when an {@link Instance} is registered.
*
* @param instance the newly create {@link Instance}
*/
public abstract void onInstanceCreate(@NotNull Instance instance);
public ThreadProvider() {
this(Runtime.getRuntime().availableProcessors());
}
/**
* Called when an {@link Instance} is unregistered.
*
* @param instance the deleted {@link Instance}
*/
public abstract void onInstanceDelete(@NotNull Instance instance);
public synchronized void onInstanceCreate(@NotNull Instance instance) {
instance.getChunks().forEach(this::addChunk);
}
/**
* Called when a chunk is loaded.
* <p>
* Be aware that this is possible for an instance to load chunks before being registered.
*
* @param instance the instance of the chunk
* @param chunk the loaded chunk
*/
public abstract void onChunkLoad(@NotNull Instance instance, @NotNull Chunk chunk);
public synchronized void onInstanceDelete(@NotNull Instance instance) {
instance.getChunks().forEach(this::removeChunk);
}
/**
* Called when a chunk is unloaded.
*
* @param instance the instance of the chunk
* @param chunk the unloaded chunk
*/
public abstract void onChunkUnload(@NotNull Instance instance, @NotNull Chunk chunk);
public synchronized void onChunkLoad(Chunk chunk) {
addChunk(chunk);
}
public synchronized void onChunkUnload(Chunk chunk) {
removeChunk(chunk);
}
/**
* Performs a server tick for all chunks based on their linked thread.
*
* @param time the update time in milliseconds
* @return the futures to execute to complete the tick
* @param chunk the chunk
*/
@NotNull
public abstract List<Future<?>> update(long time);
public abstract long findThread(@NotNull Chunk chunk);
/**
* Gets the current size of the thread pool.
* Defines how often chunks thread should be updated.
*
* @return the thread pool's size
* @return the refresh type
*/
public int getThreadCount() {
return threadCount;
public @NotNull RefreshType getChunkRefreshType() {
return RefreshType.CONSTANT;
}
/**
* Changes the amount of threads in the thread pool.
* Represents the maximum percentage of tick time that can be spent refreshing chunks thread.
* <p>
* Percentage based on {@link MinecraftServer#TICK_MS}.
*
* @param threadCount the new amount of threads
* @return the refresh percentage
*/
public synchronized void setThreadCount(int threadCount) {
this.threadCount = threadCount;
refreshPool();
public float getRefreshPercentage() {
return 0.3f;
}
private void refreshPool() {
if (pool != null) {
this.pool.shutdown();
/**
* Minimum time used to refresh chunks & entities thread.
*
* @return the minimum refresh time in milliseconds
*/
public int getMinimumRefreshTime() {
return 3;
}
/**
* Maximum time used to refresh chunks & entities thread.
*
* @return the maximum refresh time in milliseconds
*/
public int getMaximumRefreshTime() {
return (int) (MinecraftServer.TICK_MS * 0.3);
}
/**
* Prepares the update by creating the {@link TickThread} tasks.
*
* @param time the tick time in milliseconds
*/
public synchronized @NotNull CountDownLatch update(long time) {
CountDownLatch countDownLatch = new CountDownLatch(threads.size());
for (TickThread thread : threads) {
// Execute tick
thread.runnable.startTick(countDownLatch, () -> {
final var chunkEntries = threadChunkMap.get(thread);
if (chunkEntries == null || chunkEntries.isEmpty()) {
// Nothing to tick
Acquirable.refreshEntities(Stream.empty());
return;
}
final var entities = chunkEntries.stream()
.flatMap(chunkEntry -> chunkEntry.entities.stream());
Acquirable.refreshEntities(entities);
final ReentrantLock lock = thread.getLock();
lock.lock();
chunkEntries.forEach(chunkEntry -> {
Chunk chunk = chunkEntry.chunk;
if (!ChunkUtils.isLoaded(chunk))
return;
chunk.tick(time);
chunkEntry.entities.forEach(entity -> {
final boolean hasQueue = lock.hasQueuedThreads();
if (hasQueue) {
lock.unlock();
// #acquire callbacks should be called here
lock.lock();
}
entity.tick(time);
});
});
Acquirable.refreshEntities(Stream.empty());
lock.unlock();
});
}
this.pool = new MinestomThread(threadCount, MinecraftServer.THREAD_NAME_TICK);
return countDownLatch;
}
// INSTANCE UPDATE
/**
* Processes a whole tick for a chunk.
* Called at the end of each tick to clear removed entities,
* refresh the chunk linked to an entity, and chunk threads based on {@link #findThread(Chunk)}.
*
* @param instance the instance of the chunk
* @param chunk the chunk to update
* @param time the time of the update in milliseconds
* @param tickTime the duration of the tick in ms,
* used to ensure that the refresh does not take more time than the tick itself
*/
protected void processChunkTick(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
if (!ChunkUtils.isLoaded(chunk))
public synchronized void refreshThreads(long tickTime) {
// Clear removed entities
processRemovedEntities();
// Update entities chunks
processUpdatedEntities();
if (getChunkRefreshType() == RefreshType.NEVER)
return;
updateChunk(instance, chunk, time);
updateEntities(instance, chunk, time);
final int timeOffset = MathUtils.clamp((int) ((double) tickTime * getRefreshPercentage()),
getMinimumRefreshTime(), getMaximumRefreshTime());
final long endTime = System.currentTimeMillis() + timeOffset;
final int size = chunks.size();
int counter = 0;
while (true) {
Chunk chunk = chunks.pollFirst();
if (!ChunkUtils.isLoaded(chunk)) {
removeChunk(chunk);
return;
}
// Update chunk threads
switchChunk(chunk);
// Add back to the deque
chunks.addLast(chunk);
if (++counter > size)
break;
if (System.currentTimeMillis() >= endTime)
break;
}
}
/**
* Executes an instance tick.
* Add an entity into the waiting list to get assigned in a thread.
* <p>
* Called when entering a new chunk.
*
* @param instance the instance
* @param time the current time in ms
* @param entity the entity to add
*/
protected void updateInstance(@NotNull Instance instance, long time) {
// The instance
instance.tick(time);
public void updateEntity(@NotNull Entity entity) {
this.updatableEntities.add(entity);
}
/**
* Executes a chunk tick (blocks update).
* Add an entity into the waiting list to get removed from its thread.
* <p>
* Called in {@link Entity#remove()}.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @param entity the entity to remove
*/
protected void updateChunk(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
chunk.tick(time);
}
// ENTITY UPDATE
/**
* Executes an entity tick (all entities type creatures/objects/players) in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updateEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, null);
public void removeEntity(@NotNull Entity entity) {
this.removedEntities.add(entity);
}
/**
* Executes an entity tick for object entities in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* Shutdowns all the {@link TickThread tick threads}.
* <p>
* Action is irreversible.
*/
protected void updateObjectEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof ObjectEntity);
public void shutdown() {
this.threads.forEach(TickThread::shutdown);
}
/**
* Executes an entity tick for living entities in an instance's chunk.
* Gets all the {@link TickThread tick threads}.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @return the tick threads
*/
protected void updateLivingEntities(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof LivingEntity);
public @NotNull List<@NotNull TickThread> getThreads() {
return threads;
}
protected void addChunk(@NotNull Chunk chunk) {
ChunkEntry chunkEntry = setChunkThread(chunk, (thread) -> new ChunkEntry(thread, chunk));
this.chunkEntryMap.put(chunk, chunkEntry);
this.chunks.add(chunk);
}
protected void switchChunk(@NotNull Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry == null)
return;
var chunks = threadChunkMap.get(chunkEntry.thread);
if (chunks == null || chunks.isEmpty())
return;
chunks.remove(chunkEntry);
setChunkThread(chunk, tickThread -> {
chunkEntry.thread = tickThread;
return chunkEntry;
});
}
protected @NotNull ChunkEntry setChunkThread(@NotNull Chunk chunk,
@NotNull Function<TickThread, ChunkEntry> chunkEntrySupplier) {
final int threadId = getThreadId(chunk);
TickThread thread = threads.get(threadId);
var chunks = threadChunkMap.computeIfAbsent(thread, tickThread -> ConcurrentHashMap.newKeySet());
ChunkEntry chunkEntry = chunkEntrySupplier.apply(thread);
chunks.add(chunkEntry);
return chunkEntry;
}
protected void removeChunk(Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) {
TickThread thread = chunkEntry.thread;
var chunks = threadChunkMap.get(thread);
if (chunks != null) {
chunks.remove(chunkEntry);
}
chunkEntryMap.remove(chunk);
}
this.chunks.remove(chunk);
}
/**
* Executes an entity tick for creatures entities in an instance's chunk.
* Finds the thread id associated to a {@link Chunk}.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @param chunk the chunk to find the thread id from
* @return the chunk thread id
*/
protected void updateCreatures(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof EntityCreature);
protected int getThreadId(@NotNull Chunk chunk) {
return (int) (Math.abs(findThread(chunk)) % threads.size());
}
/**
* Executes an entity tick for players in an instance's chunk.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
*/
protected void updatePlayers(@NotNull Instance instance, @NotNull Chunk chunk, long time) {
conditionalEntityUpdate(instance, chunk, time, entity -> entity instanceof Player);
private void processRemovedEntities() {
if (removedEntities.isEmpty())
return;
for (Entity entity : removedEntities) {
var acquirableEntity = entity.getAcquirable();
ChunkEntry chunkEntry = acquirableEntity.getHandler().getChunkEntry();
// Remove from list
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
}
}
this.removedEntities.clear();
}
/**
* Executes an entity tick in an instance's chunk if condition is verified.
*
* @param instance the chunk's instance
* @param chunk the chunk
* @param time the current time in ms
* @param condition the condition which confirm if the update happens or not
*/
protected void conditionalEntityUpdate(@NotNull Instance instance, @NotNull Chunk chunk, long time,
@Nullable EntityValidator condition) {
if (!instance.getEntities().isEmpty()) {
final Set<Entity> entities = instance.getChunkEntities(chunk);
private void processUpdatedEntities() {
if (updatableEntities.isEmpty())
return;
for (Entity entity : updatableEntities) {
var acquirableEntity = entity.getAcquirable();
ChunkEntry handlerChunkEntry = acquirableEntity.getHandler().getChunkEntry();
if (!entities.isEmpty()) {
for (Entity entity : entities) {
if (condition != null && !condition.isValid(entity))
continue;
entity.tick(time);
Chunk entityChunk = entity.getChunk();
// Entity is possibly not in the correct thread
// Remove from previous list
{
if (handlerChunkEntry != null) {
handlerChunkEntry.entities.remove(entity);
}
}
// Add to new list
{
ChunkEntry chunkEntry = chunkEntryMap.get(entityChunk);
if (chunkEntry != null) {
chunkEntry.entities.add(entity);
acquirableEntity.getHandler().refreshChunkEntry(chunkEntry);
}
}
}
updateSharedInstances(instance, sharedInstance -> conditionalEntityUpdate(sharedInstance, chunk, time, condition));
this.updatableEntities.clear();
}
/**
* If {@code instance} is an {@link InstanceContainer}, run a callback for all of its
* {@link SharedInstance}.
*
* @param instance the instance
* @param callback the callback to run for all the {@link SharedInstance}
* Defines how often chunks thread should be refreshed.
*/
private void updateSharedInstances(@NotNull Instance instance, @NotNull Consumer<SharedInstance> callback) {
if (instance instanceof InstanceContainer) {
final InstanceContainer instanceContainer = (InstanceContainer) instance;
public enum RefreshType {
/**
* Chunk thread is constant after being defined.
*/
NEVER,
/**
* Chunk thread should be recomputed as often as possible.
*/
CONSTANT,
/**
* Chunk thread should be recomputed, but not continuously.
*/
RARELY
}
if (!instanceContainer.hasSharedInstances())
return;
public static class ChunkEntry {
private volatile TickThread thread;
private final Chunk chunk;
private final List<Entity> entities = new ArrayList<>();
for (SharedInstance sharedInstance : instanceContainer.getSharedInstances()) {
callback.accept(sharedInstance);
}
private ChunkEntry(TickThread thread, Chunk chunk) {
this.thread = thread;
this.chunk = chunk;
}
public @NotNull TickThread getThread() {
return thread;
}
public @NotNull Chunk getChunk() {
return chunk;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChunkEntry that = (ChunkEntry) o;
return chunk.equals(that.chunk);
}
@Override
public int hashCode() {
return Objects.hash(chunk);
}
}

View File

@ -0,0 +1,103 @@
package net.minestom.server.thread;
import net.minestom.server.MinecraftServer;
import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
* Thread responsible for ticking {@link net.minestom.server.instance.Chunk chunks} and {@link net.minestom.server.entity.Entity entities}.
* <p>
* Created in {@link ThreadProvider}, and awaken every tick with a task to execute.
*/
public class TickThread extends Thread {
protected final BatchRunnable runnable;
private final ReentrantLock lock = new ReentrantLock();
public TickThread(@NotNull BatchRunnable runnable, int number) {
super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number);
this.runnable = runnable;
this.runnable.setLinkedThread(this);
}
/**
* Gets the lock used to ensure the safety of entity acquisition.
*
* @return the thread lock
*/
public @NotNull ReentrantLock getLock() {
return lock;
}
/**
* Shutdowns the thread. Cannot be undone.
*/
public void shutdown() {
this.runnable.stop = true;
LockSupport.unpark(this);
}
protected static class BatchRunnable implements Runnable {
private volatile boolean stop;
private TickThread tickThread;
private volatile boolean inTick;
private final AtomicReference<CountDownLatch> countDownLatch = new AtomicReference<>();
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
@Override
public void run() {
Check.notNull(tickThread, "The linked BatchThread cannot be null!");
while (!stop) {
LockSupport.park(tickThread);
if (stop)
break;
CountDownLatch localCountDownLatch = this.countDownLatch.get();
// The latch is necessary to control the tick rates
if (localCountDownLatch == null) {
continue;
}
this.inTick = true;
// Execute all pending runnable
Runnable runnable;
while ((runnable = queue.poll()) != null) {
runnable.run();
}
localCountDownLatch.countDown();
this.countDownLatch.compareAndSet(localCountDownLatch, null);
// Wait for the next notify (game tick)
this.inTick = false;
}
}
public void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) {
this.countDownLatch.set(countDownLatch);
this.queue.add(runnable);
LockSupport.unpark(tickThread);
}
public boolean isInTick() {
return inTick;
}
private void setLinkedThread(TickThread tickThread) {
this.tickThread = tickThread;
}
}
}

View File

@ -1,10 +1,8 @@
package demo;
import com.google.common.util.concurrent.AtomicDouble;
import demo.generator.ChunkGeneratorDemo;
import demo.generator.NoiseTestGenerator;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.format.TextColor;
import net.minestom.server.MinecraftServer;
import net.minestom.server.adventure.audience.Audiences;
import net.minestom.server.chat.ColoredText;
@ -32,8 +30,8 @@ import net.minestom.server.item.ItemTag;
import net.minestom.server.item.Material;
import net.minestom.server.item.metadata.CompassMeta;
import net.minestom.server.monitoring.BenchmarkManager;
import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.ping.ResponseDataConsumer;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.Position;
import net.minestom.server.utils.Vector;
@ -42,14 +40,13 @@ import net.minestom.server.world.DimensionType;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
public class PlayerInit {
private static final InstanceContainer instanceContainer;
private static final Inventory inventory;
static {
@ -57,9 +54,12 @@ public class PlayerInit {
//StorageLocation storageLocation = MinecraftServer.getStorageManager().getLocation("instance_data", new StorageOptions().setCompression(true));
ChunkGeneratorDemo chunkGeneratorDemo = new ChunkGeneratorDemo();
NoiseTestGenerator noiseTestGenerator = new NoiseTestGenerator();
instanceContainer = instanceManager.createInstanceContainer(DimensionType.OVERWORLD);
instanceContainer.enableAutoChunkLoad(true);
instanceContainer.setChunkGenerator(chunkGeneratorDemo);
for (int i = 0; i < 4; i++) {
InstanceContainer instanceContainer = instanceManager.createInstanceContainer(DimensionType.OVERWORLD);
instanceContainer.enableAutoChunkLoad(true);
instanceContainer.setChunkGenerator(chunkGeneratorDemo);
}
inventory = new Inventory(InventoryType.CHEST_1_ROW, Component.text("Test inventory"));
/*inventory.addInventoryCondition((p, slot, clickType, inventoryConditionResult) -> {
@ -91,14 +91,14 @@ public class PlayerInit {
}
private static AtomicDouble LAST_TICK_TIME = new AtomicDouble();
private static AtomicReference<TickMonitor> LAST_TICK = new AtomicReference<>();
public static void init() {
ConnectionManager connectionManager = MinecraftServer.getConnectionManager();
BenchmarkManager benchmarkManager = MinecraftServer.getBenchmarkManager();
MinecraftServer.getUpdateManager().addTickMonitor(tickMonitor ->
LAST_TICK_TIME.set(tickMonitor.getTickTime()));
LAST_TICK.set(tickMonitor));
MinecraftServer.getSchedulerManager().buildTask(() -> {
@ -110,9 +110,13 @@ public class PlayerInit {
long ramUsage = benchmarkManager.getUsedMemory();
ramUsage /= 1e6; // bytes to MB
TickMonitor tickMonitor = LAST_TICK.get();
final Component header = Component.text("RAM USAGE: " + ramUsage + " MB")
.append(Component.newline())
.append(Component.text("TICK TIME: " + MathUtils.round(LAST_TICK_TIME.get(), 2) + "ms"));
.append(Component.text("TICK TIME: " + MathUtils.round(tickMonitor.getTickTime(), 2) + "ms"))
.append(Component.newline())
.append(Component.text("ACQ TIME: " + MathUtils.round(tickMonitor.getAcquisitionTime(), 2) + "ms"));
final Component footer = benchmarkManager.getCpuMonitoringMessage();
Audiences.players().sendPlayerListHeaderAndFooter(header, footer);
@ -183,7 +187,7 @@ public class PlayerInit {
final CustomBlock customBlock = player.getInstance().getCustomBlock(event.getBlockPosition());
final Block block = Block.fromStateId(blockStateId);
player.sendMessage("You clicked at the block " + block + " " + customBlock);
player.sendMessage("CHUNK COUNT " + instanceContainer.getChunks().size());
player.sendMessage("CHUNK COUNT " + player.getInstance().getChunks().size());
});
globalEventHandler.addEventCallback(PickupItemEvent.class, event -> {
@ -214,11 +218,12 @@ public class PlayerInit {
globalEventHandler.addEventCallback(PlayerLoginEvent.class, event -> {
final Player player = event.getPlayer();
player.sendMessage("test");
event.setSpawningInstance(instanceContainer);
int x = Math.abs(ThreadLocalRandom.current().nextInt()) % 1000 - 250;
int z = Math.abs(ThreadLocalRandom.current().nextInt()) % 1000 - 250;
var instances = MinecraftServer.getInstanceManager().getInstances();
Instance instance = instances.stream().skip(new Random().nextInt(instances.size())).findFirst().orElse(null);
event.setSpawningInstance(instance);
int x = Math.abs(ThreadLocalRandom.current().nextInt()) % 500 - 250;
int z = Math.abs(ThreadLocalRandom.current().nextInt()) % 500 - 250;
player.setRespawnPoint(new Position(0, 42f, 0));
player.getInventory().addInventoryCondition((p, slot, clickType, inventoryConditionResult) -> {
@ -288,7 +293,7 @@ public class PlayerInit {
// Unload the chunk (save memory) if it has no remaining viewer
if (chunk.getViewers().isEmpty()) {
//player.getInstance().unloadChunk(chunk);
player.getInstance().unloadChunk(chunk);
}
});
}