Renamed BatchThread.java

This commit is contained in:
TheMode 2021-04-23 09:58:52 +02:00
parent defdbea29b
commit 0a837d2714
6 changed files with 71 additions and 60 deletions

View File

@ -1,7 +1,7 @@
package net.minestom.server.entity.acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.TickThread;
import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.utils.consumer.EntityConsumer;
import org.jetbrains.annotations.ApiStatus;
@ -61,7 +61,7 @@ public class AcquirableEntity {
*/
public void acquire(@NotNull EntityConsumer consumer) {
final Thread currentThread = Thread.currentThread();
final BatchThread elementThread = getHandler().getBatchThread();
final TickThread elementThread = getHandler().getBatchThread();
Acquisition.acquire(currentThread, elementThread, () -> consumer.accept(unwrap()));
}
@ -75,7 +75,7 @@ public class AcquirableEntity {
*/
public boolean tryAcquire(@NotNull EntityConsumer consumer) {
final Thread currentThread = Thread.currentThread();
final BatchThread elementThread = getHandler().getBatchThread();
final TickThread elementThread = getHandler().getBatchThread();
if (Objects.equals(currentThread, elementThread)) {
consumer.accept(unwrap());
return true;
@ -91,7 +91,7 @@ public class AcquirableEntity {
*/
public @Nullable Entity tryAcquire() {
final Thread currentThread = Thread.currentThread();
final BatchThread elementThread = getHandler().getBatchThread();
final TickThread elementThread = getHandler().getBatchThread();
if (Objects.equals(currentThread, elementThread)) {
return unwrap();
}
@ -143,7 +143,7 @@ public class AcquirableEntity {
this.chunkEntry = chunkEntry;
}
public BatchThread getBatchThread() {
public TickThread getBatchThread() {
return chunkEntry != null ? chunkEntry.getThread() : null;
}
}

View File

@ -1,7 +1,7 @@
package net.minestom.server.entity.acquirable;
import net.minestom.server.entity.Entity;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.TickThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -32,15 +32,15 @@ public final class Acquisition {
public static void acquireForEach(@NotNull Collection<AcquirableEntity> collection,
@NotNull Consumer<Entity> consumer) {
final Thread currentThread = Thread.currentThread();
Map<BatchThread, List<Entity>> threadCacheMap = retrieveOptionalThreadMap(collection, currentThread, consumer);
Map<TickThread, List<Entity>> threadCacheMap = retrieveOptionalThreadMap(collection, currentThread, consumer);
// Acquire all the threads one by one
{
for (Map.Entry<BatchThread, List<Entity>> entry : threadCacheMap.entrySet()) {
final BatchThread batchThread = entry.getKey();
for (Map.Entry<TickThread, List<Entity>> entry : threadCacheMap.entrySet()) {
final TickThread tickThread = entry.getKey();
final List<Entity> entities = entry.getValue();
acquire(currentThread, batchThread, () -> {
acquire(currentThread, tickThread, () -> {
for (Entity entity : entities) {
consumer.accept(entity);
}
@ -76,7 +76,7 @@ public final class Acquisition {
/**
* Ensures that {@code callback} is safely executed inside the batch thread.
*/
protected static void acquire(@NotNull Thread currentThread, @Nullable BatchThread elementThread,
protected static void acquire(@NotNull Thread currentThread, @Nullable TickThread elementThread,
@NotNull Runnable callback) {
if (Objects.equals(currentThread, elementThread)) {
callback.run();
@ -87,14 +87,14 @@ public final class Acquisition {
}
}
protected static ReentrantLock acquireEnter(Thread currentThread, BatchThread elementThread) {
protected static ReentrantLock acquireEnter(Thread currentThread, TickThread elementThread) {
// Monitoring
long time = System.nanoTime();
ReentrantLock currentLock;
{
final BatchThread current = currentThread instanceof BatchThread ?
(BatchThread) currentThread : null;
final TickThread current = currentThread instanceof TickThread ?
(TickThread) currentThread : null;
currentLock = current != null && current.getLock().isHeldByCurrentThread() ?
current.getLock() : null;
}
@ -118,7 +118,7 @@ public final class Acquisition {
return !acquired ? lock : null;
}
protected static ReentrantLock acquireEnter(BatchThread elementThread) {
protected static ReentrantLock acquireEnter(TickThread elementThread) {
return acquireEnter(Thread.currentThread(), elementThread);
}
@ -145,23 +145,23 @@ public final class Acquisition {
* @param consumer the consumer to execute when an element is already in the current thread
* @return a new Thread to acquirable elements map
*/
protected static Map<BatchThread, List<Entity>> retrieveOptionalThreadMap(@NotNull Collection<AcquirableEntity> collection,
@NotNull Thread currentThread,
@NotNull Consumer<? super Entity> consumer) {
protected static Map<TickThread, List<Entity>> retrieveOptionalThreadMap(@NotNull Collection<AcquirableEntity> collection,
@NotNull Thread currentThread,
@NotNull Consumer<? super Entity> consumer) {
// Separate a collection of acquirable elements into a map of thread->elements
// Useful to reduce the number of acquisition
Map<BatchThread, List<Entity>> threadCacheMap = new HashMap<>();
Map<TickThread, List<Entity>> threadCacheMap = new HashMap<>();
for (AcquirableEntity element : collection) {
final Entity value = element.unwrap();
final BatchThread elementThread = element.getHandler().getBatchThread();
final TickThread elementThread = element.getHandler().getBatchThread();
if (currentThread == elementThread) {
// The element is managed in the current thread, consumer can be immediately called
consumer.accept(value);
} else {
// The element is manager in a different thread, cache it
List<Entity> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, batchThread -> new ArrayList<>());
List<Entity> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, tickThread -> new ArrayList<>());
threadCacheList.add(value);
}
}
@ -169,15 +169,15 @@ public final class Acquisition {
return threadCacheMap;
}
protected static Map<BatchThread, List<Entity>> retrieveThreadMap(@NotNull Collection<AcquirableEntity> collection) {
protected static Map<TickThread, List<Entity>> retrieveThreadMap(@NotNull Collection<AcquirableEntity> collection) {
// Separate a collection of acquirable elements into a map of thread->elements
// Useful to reduce the number of acquisition
Map<BatchThread, List<Entity>> threadCacheMap = new HashMap<>();
Map<TickThread, List<Entity>> threadCacheMap = new HashMap<>();
for (AcquirableEntity acquirableEntity : collection) {
final Entity entity = acquirableEntity.unwrap();
final BatchThread elementThread = acquirableEntity.getHandler().getBatchThread();
final TickThread elementThread = acquirableEntity.getHandler().getBatchThread();
List<Entity> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, batchThread -> new ArrayList<>());
List<Entity> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, tickThread -> new ArrayList<>());
threadCacheList.add(entity);
}

View File

@ -24,9 +24,9 @@ import java.util.stream.Collectors;
*/
public abstract class ThreadProvider {
private final List<BatchThread> threads;
private final List<TickThread> threads;
private final Map<BatchThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>();
private final Map<TickThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>();
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
// Iterated over to refresh the thread used to update entities & chunks
private final ArrayDeque<Chunk> chunks = new ArrayDeque<>();
@ -40,11 +40,11 @@ public abstract class ThreadProvider {
this.threads = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
final BatchThread.BatchRunnable batchRunnable = new BatchThread.BatchRunnable();
final BatchThread batchThread = new BatchThread(batchRunnable, i);
this.threads.add(batchThread);
final TickThread.BatchRunnable batchRunnable = new TickThread.BatchRunnable();
final TickThread tickThread = new TickThread(batchRunnable, i);
this.threads.add(tickThread);
batchThread.start();
tickThread.start();
}
}
@ -126,17 +126,17 @@ public abstract class ThreadProvider {
return;
chunks.remove(chunkEntry);
setChunkThread(chunk, batchThread -> {
chunkEntry.thread = batchThread;
setChunkThread(chunk, tickThread -> {
chunkEntry.thread = tickThread;
return chunkEntry;
});
}
protected @NotNull ChunkEntry setChunkThread(@NotNull Chunk chunk,
@NotNull Function<@NotNull BatchThread, @NotNull ChunkEntry> chunkEntrySupplier) {
@NotNull Function<@NotNull TickThread, @NotNull ChunkEntry> chunkEntrySupplier) {
final int threadId = getThreadId(chunk);
BatchThread thread = threads.get(threadId);
var chunks = threadChunkMap.computeIfAbsent(thread, batchThread -> ConcurrentHashMap.newKeySet());
TickThread thread = threads.get(threadId);
var chunks = threadChunkMap.computeIfAbsent(thread, tickThread -> ConcurrentHashMap.newKeySet());
ChunkEntry chunkEntry = chunkEntrySupplier.apply(thread);
chunks.add(chunkEntry);
@ -146,7 +146,7 @@ public abstract class ThreadProvider {
protected void removeChunk(Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) {
BatchThread thread = chunkEntry.thread;
TickThread thread = chunkEntry.thread;
var chunks = threadChunkMap.get(thread);
if (chunks != null) {
chunks.remove(chunkEntry);
@ -167,7 +167,7 @@ public abstract class ThreadProvider {
*/
public synchronized @NotNull CountDownLatch update(long time) {
CountDownLatch countDownLatch = new CountDownLatch(threads.size());
for (BatchThread thread : threads) {
for (TickThread thread : threads) {
final var chunkEntries = threadChunkMap.get(thread);
if (chunkEntries == null || chunkEntries.isEmpty()) {
// The thread never had any task
@ -176,7 +176,7 @@ public abstract class ThreadProvider {
}
// Execute tick
thread.getMainRunnable().startTick(countDownLatch, () -> {
thread.runnable.startTick(countDownLatch, () -> {
final var entitiesList = chunkEntries.stream().map(chunkEntry -> chunkEntry.entities).collect(Collectors.toList());
final var entities = entitiesList.stream()
.flatMap(Collection::stream)
@ -270,10 +270,10 @@ public abstract class ThreadProvider {
}
public void shutdown() {
this.threads.forEach(BatchThread::shutdown);
this.threads.forEach(TickThread::shutdown);
}
public @NotNull List<@NotNull BatchThread> getThreads() {
public @NotNull List<@NotNull TickThread> getThreads() {
return threads;
}
@ -326,16 +326,16 @@ public abstract class ThreadProvider {
}
public static class ChunkEntry {
private volatile BatchThread thread;
private volatile TickThread thread;
private final Chunk chunk;
private final List<Entity> entities = new ArrayList<>();
private ChunkEntry(BatchThread thread, Chunk chunk) {
private ChunkEntry(TickThread thread, Chunk chunk) {
this.thread = thread;
this.chunk = chunk;
}
public @NotNull BatchThread getThread() {
public @NotNull TickThread getThread() {
return thread;
}

View File

@ -11,35 +11,44 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
public class BatchThread extends Thread {
/**
* Thread responsible for ticking {@link net.minestom.server.instance.Chunk chunks} and {@link net.minestom.server.entity.Entity entities}.
* <p>
* Created in {@link ThreadProvider}, and awaken every tick with a task to execute.
*/
public class TickThread extends Thread {
private final BatchRunnable runnable;
protected final BatchRunnable runnable;
private final ReentrantLock lock = new ReentrantLock();
public BatchThread(@NotNull BatchRunnable runnable, int number) {
public TickThread(@NotNull BatchRunnable runnable, int number) {
super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number);
this.runnable = runnable;
this.runnable.setLinkedThread(this);
}
public @NotNull BatchRunnable getMainRunnable() {
return runnable;
}
/**
* Gets the lock used to ensure the safety of entity acquisition.
*
* @return the thread lock
*/
public @NotNull ReentrantLock getLock() {
return lock;
}
/**
* Shutdowns the thread. Cannot be undone.
*/
public void shutdown() {
this.runnable.stop = true;
LockSupport.unpark(this);
}
public static class BatchRunnable implements Runnable {
protected static class BatchRunnable implements Runnable {
private volatile boolean stop;
private BatchThread batchThread;
private TickThread tickThread;
private volatile boolean inTick;
private final AtomicReference<CountDownLatch> countDownLatch = new AtomicReference<>();
@ -48,9 +57,9 @@ public class BatchThread extends Thread {
@Override
public void run() {
Check.notNull(batchThread, "The linked BatchThread cannot be null!");
Check.notNull(tickThread, "The linked BatchThread cannot be null!");
while (!stop) {
LockSupport.park(batchThread);
LockSupport.park(tickThread);
if (stop)
break;
CountDownLatch localCountDownLatch = this.countDownLatch.get();
@ -79,15 +88,15 @@ public class BatchThread extends Thread {
public synchronized void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) {
this.countDownLatch.set(countDownLatch);
this.queue.add(runnable);
LockSupport.unpark(batchThread);
LockSupport.unpark(tickThread);
}
public boolean isInTick() {
return inTick;
}
private void setLinkedThread(BatchThread batchThread) {
this.batchThread = batchThread;
private void setLinkedThread(TickThread tickThread) {
this.tickThread = tickThread;
}
}

View File

@ -1,8 +1,9 @@
package net.minestom.server.utils.consumer;
import net.minestom.server.entity.Entity;
import org.jetbrains.annotations.NotNull;
@FunctionalInterface
public interface EntityConsumer {
void accept(Entity entity);
void accept(@NotNull Entity entity);
}

View File

@ -1,7 +1,8 @@
package net.minestom.server.utils.consumer;
import net.minestom.server.entity.Player;
import org.jetbrains.annotations.NotNull;
public interface PlayerConsumer extends EntityConsumer {
void accept(Player player);
void accept(@NotNull Player player);
}