Update entities thread

This commit is contained in:
TheMode 2021-04-15 01:44:08 +02:00
parent fec36d4706
commit 11b1bbea2e
8 changed files with 123 additions and 86 deletions

View File

@ -113,17 +113,12 @@ public final class UpdateManager {
* @param tickStart the time of the tick in milliseconds * @param tickStart the time of the tick in milliseconds
*/ */
private void serverTick(long tickStart) { private void serverTick(long tickStart) {
// Tick all instances // Tick all instances
MinecraftServer.getInstanceManager().getInstances().forEach(instance -> MinecraftServer.getInstanceManager().getInstances().forEach(instance ->
instance.tick(tickStart)); instance.tick(tickStart));
// Server tick (instance/chunk/entity) // Server tick (instance/chunk/entity)
// Synchronize with the update manager instance, like the signal for chunk load/unload final CountDownLatch countDownLatch = threadProvider.update(tickStart);
final CountDownLatch countDownLatch;
synchronized (this) {
countDownLatch = threadProvider.update(tickStart);
}
// Wait tick end // Wait tick end
try { try {
@ -131,6 +126,9 @@ public final class UpdateManager {
} catch (InterruptedException e) { } catch (InterruptedException e) {
MinecraftServer.getExceptionManager().handleException(e); MinecraftServer.getExceptionManager().handleException(e);
} }
// Clear removed entities & update threads
this.threadProvider.refreshThreads();
} }
/** /**
@ -153,20 +151,10 @@ public final class UpdateManager {
* *
* @return the current thread provider * @return the current thread provider
*/ */
public ThreadProvider getThreadProvider() { public @NotNull ThreadProvider getThreadProvider() {
return threadProvider; return threadProvider;
} }
/**
* Changes the server {@link ThreadProvider}.
*
* @param threadProvider the new thread provider
* @throws NullPointerException if <code>threadProvider</code> is null
*/
public synchronized void setThreadProvider(ThreadProvider threadProvider) {
this.threadProvider = threadProvider;
}
/** /**
* Signals the {@link ThreadProvider} that an instance has been created. * Signals the {@link ThreadProvider} that an instance has been created.
* <p> * <p>
@ -174,9 +162,7 @@ public final class UpdateManager {
* *
* @param instance the instance * @param instance the instance
*/ */
public synchronized void signalInstanceCreate(Instance instance) { public void signalInstanceCreate(Instance instance) {
if (this.threadProvider == null)
return;
this.threadProvider.onInstanceCreate(instance); this.threadProvider.onInstanceCreate(instance);
} }
@ -187,9 +173,7 @@ public final class UpdateManager {
* *
* @param instance the instance * @param instance the instance
*/ */
public synchronized void signalInstanceDelete(Instance instance) { public void signalInstanceDelete(Instance instance) {
if (this.threadProvider == null)
return;
this.threadProvider.onInstanceDelete(instance); this.threadProvider.onInstanceDelete(instance);
} }
@ -200,9 +184,7 @@ public final class UpdateManager {
* *
* @param chunk the loaded chunk * @param chunk the loaded chunk
*/ */
public synchronized void signalChunkLoad(@NotNull Chunk chunk) { public void signalChunkLoad(@NotNull Chunk chunk) {
if (this.threadProvider == null)
return;
this.threadProvider.onChunkLoad(chunk); this.threadProvider.onChunkLoad(chunk);
} }
@ -213,9 +195,7 @@ public final class UpdateManager {
* *
* @param chunk the unloaded chunk * @param chunk the unloaded chunk
*/ */
public synchronized void signalChunkUnload(@NotNull Chunk chunk) { public void signalChunkUnload(@NotNull Chunk chunk) {
if (this.threadProvider == null)
return;
this.threadProvider.onChunkUnload(chunk); this.threadProvider.onChunkUnload(chunk);
} }

View File

@ -22,6 +22,9 @@ import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance; import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager; import net.minestom.server.instance.InstanceManager;
import net.minestom.server.instance.block.CustomBlock; import net.minestom.server.instance.block.CustomBlock;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.lock.AcquirableImpl;
import net.minestom.server.lock.LockedElement;
import net.minestom.server.network.packet.server.play.*; import net.minestom.server.network.packet.server.play.*;
import net.minestom.server.network.player.PlayerConnection; import net.minestom.server.network.player.PlayerConnection;
import net.minestom.server.permission.Permission; import net.minestom.server.permission.Permission;
@ -58,7 +61,7 @@ import java.util.function.UnaryOperator;
* <p> * <p>
* To create your own entity you probably want to extends {@link LivingEntity} or {@link EntityCreature} instead. * To create your own entity you probably want to extends {@link LivingEntity} or {@link EntityCreature} instead.
*/ */
public class Entity implements Viewable, Tickable, EventHandler, DataContainer, PermissionHandler, HoverEventSource<ShowEntity> { public class Entity implements Viewable, Tickable, LockedElement, EventHandler, DataContainer, PermissionHandler, HoverEventSource<ShowEntity> {
private static final Map<Integer, Entity> ENTITY_BY_ID = new ConcurrentHashMap<>(); private static final Map<Integer, Entity> ENTITY_BY_ID = new ConcurrentHashMap<>();
private static final Map<UUID, Entity> ENTITY_BY_UUID = new ConcurrentHashMap<>(); private static final Map<UUID, Entity> ENTITY_BY_UUID = new ConcurrentHashMap<>();
@ -123,6 +126,8 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
private long ticks; private long ticks;
private final EntityTickEvent tickEvent = new EntityTickEvent(this); private final EntityTickEvent tickEvent = new EntityTickEvent(this);
private final Acquirable<Entity> acquirable = new AcquirableImpl<>(this);
/** /**
* Lock used to support #switchEntityType * Lock used to support #switchEntityType
*/ */
@ -483,6 +488,9 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
} }
} }
// Acquisition
getAcquiredElement().getHandler().acquisitionTick();
final boolean isNettyClient = PlayerUtils.isNettyClient(this); final boolean isNettyClient = PlayerUtils.isNettyClient(this);
// Synchronization with updated fields in #getPosition() // Synchronization with updated fields in #getPosition()
@ -1456,6 +1464,10 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
* WARNING: this does not trigger {@link EntityDeathEvent}. * WARNING: this does not trigger {@link EntityDeathEvent}.
*/ */
public void remove() { public void remove() {
if (isRemoved())
return;
MinecraftServer.getUpdateManager().getThreadProvider().removeEntity(this);
this.removed = true; this.removed = true;
this.shouldRemove = true; this.shouldRemove = true;
Entity.ENTITY_BY_ID.remove(id); Entity.ENTITY_BY_ID.remove(id);
@ -1559,6 +1571,11 @@ public class Entity implements Viewable, Tickable, EventHandler, DataContainer,
return Objects.requireNonNullElse(this.customSynchronizationCooldown, SYNCHRONIZATION_COOLDOWN); return Objects.requireNonNullElse(this.customSynchronizationCooldown, SYNCHRONIZATION_COOLDOWN);
} }
@Override
public @NotNull <T> Acquirable<T> getAcquiredElement() {
return (Acquirable<T>) acquirable;
}
public enum Pose { public enum Pose {
STANDING, STANDING,
FALL_FLYING, FALL_FLYING,

View File

@ -561,6 +561,9 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
@Override @Override
public void remove() { public void remove() {
if(isRemoved())
return;
callEvent(PlayerDisconnectEvent.class, new PlayerDisconnectEvent(this)); callEvent(PlayerDisconnectEvent.class, new PlayerDisconnectEvent(this));
super.remove(); super.remove();

View File

@ -1,9 +1,9 @@
package net.minestom.server.lock; package net.minestom.server.lock;
import net.minestom.server.instance.Chunk;
import net.minestom.server.thread.BatchThread; import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.batch.BatchInfo; import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.List; import java.util.List;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
@ -32,8 +32,7 @@ public interface Acquirable<T> {
Acquisition.AcquisitionData data = new Acquisition.AcquisitionData(); Acquisition.AcquisitionData data = new Acquisition.AcquisitionData();
final Handler handler = getHandler(); final Handler handler = getHandler();
final BatchInfo batchInfo = handler.getBatchInfo(); final BatchThread elementThread = handler.getBatchThread();
final BatchThread elementThread = batchInfo != null ? batchInfo.getBatchThread() : null;
final boolean sameThread = Acquisition.acquire(currentThread, elementThread, data); final boolean sameThread = Acquisition.acquire(currentThread, elementThread, data);
@ -80,25 +79,29 @@ public interface Acquirable<T> {
class Handler { class Handler {
private volatile BatchInfo batchInfo; private volatile BatchThread batchThread;
private volatile Chunk batchChunk;
@Nullable public BatchThread getBatchThread() {
public BatchInfo getBatchInfo() { return batchThread;
return batchInfo;
} }
public void refreshBatchInfo(@NotNull BatchInfo batchInfo) { public Chunk getBatchChunk() {
this.batchInfo = batchInfo; return batchChunk;
}
@ApiStatus.Internal
public void refreshBatchInfo(BatchThread batchThread, Chunk batchChunk) {
this.batchThread = batchThread;
this.batchChunk = batchChunk;
} }
/** /**
* Executed during this element tick to empty the current thread acquisition queue. * Executed during this element tick to empty the current thread acquisition queue.
*/ */
public void acquisitionTick() { public void acquisitionTick() {
final BatchThread batchThread = batchInfo.getBatchThread();
if (batchThread == null) if (batchThread == null)
return; return;
Acquisition.processQueue(batchThread.getQueue()); Acquisition.processQueue(batchThread.getQueue());
} }
} }

View File

@ -3,7 +3,6 @@ package net.minestom.server.lock;
import net.minestom.server.MinecraftServer; import net.minestom.server.MinecraftServer;
import net.minestom.server.thread.BatchQueue; import net.minestom.server.thread.BatchQueue;
import net.minestom.server.thread.BatchThread; import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.batch.BatchInfo;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -250,8 +249,7 @@ public final class Acquisition {
for (T element : collection) { for (T element : collection) {
final E value = element.unwrap(); final E value = element.unwrap();
final BatchInfo batchInfo = element.getHandler().getBatchInfo(); final BatchThread elementThread = element.getHandler().getBatchThread();
final BatchThread elementThread = batchInfo != null ? batchInfo.getBatchThread() : null;
if (currentThread == elementThread) { if (currentThread == elementThread) {
// The element is managed in the current thread, consumer can be immediately called // The element is managed in the current thread, consumer can be immediately called
consumer.accept(value); consumer.accept(value);

View File

@ -17,7 +17,6 @@ public interface LockedElement {
* *
* @return the acquirable element linked to this object * @return the acquirable element linked to this object
*/ */
@NotNull <T> @NotNull Acquirable<T> getAcquiredElement();
<T> Acquirable<T> getAcquiredElement();
} }

View File

@ -87,10 +87,10 @@ public class BatchThread extends Thread {
} }
} }
public synchronized void startTick(@NotNull CountDownLatch countDownLatch, Runnable runnable) { public synchronized void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) {
this.countDownLatch = countDownLatch;
this.queue.add(runnable);
synchronized (tickLock) { synchronized (tickLock) {
this.queue.add(runnable);
this.countDownLatch = countDownLatch;
this.tickLock.notifyAll(); this.tickLock.notifyAll();
} }
} }

View File

@ -1,28 +1,27 @@
package net.minestom.server.thread; package net.minestom.server.thread;
import net.minestom.server.UpdateManager; import net.minestom.server.MinecraftServer;
import net.minestom.server.entity.Entity; import net.minestom.server.entity.Entity;
import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance; import net.minestom.server.instance.Instance;
import net.minestom.server.lock.Acquirable;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
/** /**
* Used to link chunks into multiple groups. * Used to link chunks into multiple groups.
* Then executed into a thread pool. * Then executed into a thread pool.
* <p>
* You can change the current thread provider by calling {@link UpdateManager#setThreadProvider(ThreadProvider)}.
*/ */
public abstract class ThreadProvider { public abstract class ThreadProvider {
private final List<BatchThread> threads; private final List<BatchThread> threads;
private final Map<Integer, List<ChunkEntry>> threadChunkMap = new HashMap<>(); private final Map<BatchThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>();
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>(); private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
private final Set<Entity> removedEntities = ConcurrentHashMap.newKeySet();
private final ArrayDeque<Chunk> batchHandlers = new ArrayDeque<>();
public ThreadProvider(int threadCount) { public ThreadProvider(int threadCount) {
this.threads = new ArrayList<>(threadCount); this.threads = new ArrayList<>(threadCount);
@ -36,19 +35,19 @@ public abstract class ThreadProvider {
} }
} }
public void onInstanceCreate(@NotNull Instance instance) { public synchronized void onInstanceCreate(@NotNull Instance instance) {
instance.getChunks().forEach(this::addChunk); instance.getChunks().forEach(this::addChunk);
} }
public void onInstanceDelete(@NotNull Instance instance) { public synchronized void onInstanceDelete(@NotNull Instance instance) {
instance.getChunks().forEach(this::removeChunk); instance.getChunks().forEach(this::removeChunk);
} }
public void onChunkLoad(Chunk chunk) { public synchronized void onChunkLoad(Chunk chunk) {
addChunk(chunk); addChunk(chunk);
} }
public void onChunkUnload(Chunk chunk) { public synchronized void onChunkUnload(Chunk chunk) {
removeChunk(chunk); removeChunk(chunk);
} }
@ -60,8 +59,9 @@ public abstract class ThreadProvider {
public abstract int findThread(@NotNull Chunk chunk); public abstract int findThread(@NotNull Chunk chunk);
protected void addChunk(Chunk chunk) { protected void addChunk(Chunk chunk) {
int thread = findThread(chunk); int threadId = findThread(chunk);
var chunks = threadChunkMap.computeIfAbsent(thread, ArrayList::new); BatchThread thread = threads.get(threadId);
var chunks = threadChunkMap.computeIfAbsent(thread, batchThread -> ConcurrentHashMap.newKeySet());
ChunkEntry chunkEntry = new ChunkEntry(thread, chunk); ChunkEntry chunkEntry = new ChunkEntry(thread, chunk);
chunks.add(chunkEntry); chunks.add(chunkEntry);
@ -72,8 +72,8 @@ public abstract class ThreadProvider {
protected void removeChunk(Chunk chunk) { protected void removeChunk(Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk); ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) { if (chunkEntry != null) {
int threadId = chunkEntry.threadId; BatchThread thread = chunkEntry.thread;
var chunks = threadChunkMap.get(threadId); var chunks = threadChunkMap.get(thread);
if (chunks != null) { if (chunks != null) {
chunks.remove(chunkEntry); chunks.remove(chunkEntry);
} }
@ -86,34 +86,20 @@ public abstract class ThreadProvider {
* *
* @param time the tick time in milliseconds * @param time the tick time in milliseconds
*/ */
public @NotNull CountDownLatch update(long time) { public synchronized @NotNull CountDownLatch update(long time) {
CountDownLatch countDownLatch = new CountDownLatch(threads.size()); CountDownLatch countDownLatch = new CountDownLatch(threads.size());
for (BatchThread thread : threads) { for (BatchThread thread : threads) {
final int id = threads.indexOf(thread); final var chunkEntries = threadChunkMap.get(thread);
if (id == -1) {
countDownLatch.countDown();
continue;
}
final var chunkEntries = threadChunkMap.get(id);
if (chunkEntries == null) { if (chunkEntries == null) {
countDownLatch.countDown(); countDownLatch.countDown();
continue; continue;
} }
// Cache chunk entities
Map<Chunk, List<Entity>> chunkListMap = new HashMap<>(chunkEntries.size());
for (ChunkEntry chunkEntry : chunkEntries) {
var chunk = chunkEntry.chunk;
var entities = new ArrayList<>(chunk.getInstance().getChunkEntities(chunk));
chunkListMap.put(chunk, entities);
}
// Execute tick // Execute tick
thread.getMainRunnable().startTick(countDownLatch, () -> { thread.getMainRunnable().startTick(countDownLatch, () -> {
chunkListMap.forEach((chunk, entities) -> { chunkEntries.forEach(chunkEntry -> {
chunk.tick(time); chunkEntry.chunk.tick(time);
entities.forEach(entity -> { chunkEntry.entities.forEach(entity -> {
entity.tick(time); entity.tick(time);
}); });
}); });
@ -122,6 +108,56 @@ public abstract class ThreadProvider {
return countDownLatch; return countDownLatch;
} }
public synchronized void refreshThreads() {
// Clear removed entities
for (Entity entity : removedEntities) {
Acquirable<Entity> acquirable = entity.getAcquiredElement();
Chunk batchChunk = acquirable.getHandler().getBatchChunk();
// Remove from list
{
ChunkEntry chunkEntry = chunkEntryMap.get(batchChunk);
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
}
}
}
this.removedEntities.clear();
for (Instance instance : MinecraftServer.getInstanceManager().getInstances()) {
var entities = instance.getEntities();
for (Entity entity : entities) {
Acquirable<Entity> acquirable = entity.getAcquiredElement();
Chunk batchChunk = acquirable.getHandler().getBatchChunk();
Chunk entityChunk = entity.getChunk();
if (!Objects.equals(batchChunk, entityChunk)) {
// Entity is possibly not in the correct thread
// Remove from previous list
{
ChunkEntry chunkEntry = chunkEntryMap.get(batchChunk);
if (chunkEntry != null) {
chunkEntry.entities.remove(entity);
}
}
// Add to new list
{
ChunkEntry chunkEntry = chunkEntryMap.get(entityChunk);
if (chunkEntry != null) {
chunkEntry.entities.add(entity);
acquirable.getHandler().refreshBatchInfo(chunkEntry.thread, chunkEntry.chunk);
}
}
}
}
}
}
public void removeEntity(Entity entity) {
this.removedEntities.add(entity);
}
public void shutdown() { public void shutdown() {
this.threads.forEach(BatchThread::shutdown); this.threads.forEach(BatchThread::shutdown);
@ -133,11 +169,12 @@ public abstract class ThreadProvider {
} }
private static class ChunkEntry { private static class ChunkEntry {
private final int threadId; private final BatchThread thread;
private final Chunk chunk; private final Chunk chunk;
private final List<Entity> entities = new ArrayList<>();
private ChunkEntry(int threadId, Chunk chunk) { private ChunkEntry(BatchThread thread, Chunk chunk) {
this.threadId = threadId; this.thread = thread;
this.chunk = chunk; this.chunk = chunk;
} }