Simplify ThreadDispatcher even further

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-09-15 17:33:01 +02:00
parent 9ab5e746ca
commit 0e8c07a811

View File

@ -12,7 +12,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/** /**
* Used to link chunks into multiple groups. * Used to link chunks into multiple groups.
@ -22,10 +21,14 @@ public final class ThreadDispatcher {
private final ThreadProvider provider; private final ThreadProvider provider;
private final List<TickThread> threads; private final List<TickThread> threads;
private final Map<TickThread, Set<ChunkEntry>> threadChunkMap = new HashMap<>(); // Contains all the chunks being ticked in a thread
private final Map<TickThread, Collection<ChunkEntry>> threadChunkMap = new HashMap<>();
// Chunk -> ChunkEntry mapping
private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>(); private final Map<Chunk, ChunkEntry> chunkEntryMap = new HashMap<>();
// Queue to update chunks linked thread
private final ArrayDeque<Chunk> chunkUpdateQueue = new ArrayDeque<>(); private final ArrayDeque<Chunk> chunkUpdateQueue = new ArrayDeque<>();
// Requests consumed at the end of each tick
private final Queue<Chunk> chunkLoadRequests = new ConcurrentLinkedQueue<>(); private final Queue<Chunk> chunkLoadRequests = new ConcurrentLinkedQueue<>();
private final Queue<Chunk> chunkUnloadRequests = new ConcurrentLinkedQueue<>(); private final Queue<Chunk> chunkUnloadRequests = new ConcurrentLinkedQueue<>();
private final Queue<Entity> entityUpdateRequests = new ConcurrentLinkedQueue<>(); private final Queue<Entity> entityUpdateRequests = new ConcurrentLinkedQueue<>();
@ -88,7 +91,7 @@ public final class ThreadDispatcher {
*/ */
public void updateAndAwait(long time) { public void updateAndAwait(long time) {
for (var entry : threadChunkMap.entrySet()) { for (var entry : threadChunkMap.entrySet()) {
final Set<ChunkEntry> chunkEntries = entry.getValue(); final Collection<ChunkEntry> chunkEntries = entry.getValue();
if (chunkEntries == null || chunkEntries.isEmpty()) { if (chunkEntries == null || chunkEntries.isEmpty()) {
// Nothing to tick // Nothing to tick
continue; continue;
@ -152,15 +155,9 @@ public final class ThreadDispatcher {
int counter = 0; int counter = 0;
while (true) { while (true) {
final Chunk chunk = chunkUpdateQueue.pollFirst(); final Chunk chunk = chunkUpdateQueue.pollFirst();
if (!ChunkUtils.isLoaded(chunk)) { if (chunk == null) break;
removeChunk(chunk); updateChunk(chunk);
continue; this.chunkUpdateQueue.addLast(chunk);
}
// Update chunk threads
switchChunk(chunk);
// Add back to the deque
chunkUpdateQueue.addLast(chunk);
if (++counter > size || System.currentTimeMillis() >= endTime) if (++counter > size || System.currentTimeMillis() >= endTime)
break; break;
} }
@ -199,47 +196,25 @@ public final class ThreadDispatcher {
this.entityRemovalRequests.add(entity); this.entityRemovalRequests.add(entity);
} }
private void switchChunk(@NotNull Chunk chunk) { private TickThread retrieveThread(Chunk chunk) {
final int threadId = Math.abs(provider.findThread(chunk)) % threads.size();
return threads.get(threadId);
}
private void updateChunk(@NotNull Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk); ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry == null) return; if (chunkEntry == null) return;
Set<ChunkEntry> chunks = threadChunkMap.get(chunkEntry.thread); Collection<ChunkEntry> chunks = threadChunkMap.get(chunkEntry.thread);
if (chunks == null || chunks.isEmpty()) return; if (chunks == null || chunks.isEmpty()) return;
chunks.remove(chunkEntry); chunkEntry.thread = retrieveThread(chunk);
setChunkThread(chunk, tickThread -> {
chunkEntry.thread = tickThread;
return chunkEntry;
});
}
private @NotNull ChunkEntry setChunkThread(@NotNull Chunk chunk,
@NotNull Function<TickThread, ChunkEntry> chunkEntrySupplier) {
final int threadId = Math.abs(provider.findThread(chunk)) % threads.size();
TickThread thread = threads.get(threadId);
Set<ChunkEntry> chunks = threadChunkMap.computeIfAbsent(thread, tickThread -> new HashSet<>());
ChunkEntry chunkEntry = chunkEntrySupplier.apply(thread);
chunks.add(chunkEntry);
return chunkEntry;
}
private void removeChunk(Chunk chunk) {
ChunkEntry chunkEntry = chunkEntryMap.get(chunk);
if (chunkEntry != null) {
TickThread thread = chunkEntry.thread;
Set<ChunkEntry> chunks = threadChunkMap.get(thread);
if (chunks != null) {
chunks.remove(chunkEntry);
}
chunkEntryMap.remove(chunk);
}
this.chunkUpdateQueue.remove(chunk);
} }
private void processLoadedChunks() { private void processLoadedChunks() {
Chunk chunk; Chunk chunk;
while ((chunk = chunkLoadRequests.poll()) != null) { while ((chunk = chunkLoadRequests.poll()) != null) {
Chunk finalChunk = chunk; final TickThread thread = retrieveThread(chunk);
ChunkEntry chunkEntry = setChunkThread(chunk, (thread) -> new ChunkEntry(thread, finalChunk)); final ChunkEntry chunkEntry = new ChunkEntry(thread, chunk);
this.threadChunkMap.computeIfAbsent(thread, t -> new ArrayList<>()).add(chunkEntry);
this.chunkEntryMap.put(chunk, chunkEntry); this.chunkEntryMap.put(chunk, chunkEntry);
this.chunkUpdateQueue.add(chunk); this.chunkUpdateQueue.add(chunk);
} }
@ -248,7 +223,13 @@ public final class ThreadDispatcher {
private void processUnloadedChunks() { private void processUnloadedChunks() {
Chunk chunk; Chunk chunk;
while ((chunk = chunkUnloadRequests.poll()) != null) { while ((chunk = chunkUnloadRequests.poll()) != null) {
removeChunk(chunk); final ChunkEntry chunkEntry = chunkEntryMap.remove(chunk);
if (chunkEntry != null) {
final TickThread thread = chunkEntry.thread;
Collection<ChunkEntry> chunks = threadChunkMap.get(thread);
if (chunks != null) chunks.remove(chunkEntry);
}
this.chunkUpdateQueue.remove(chunk);
} }
} }