Merge chunk task urgent executor thread into the worker queue

By keeping them separate, urgent tasks could not be executed
by the worker queue.
This commit is contained in:
Spottedleaf 2022-01-07 15:08:34 -08:00
parent 5ccd3050ed
commit eb48ada00b
2 changed files with 63 additions and 37 deletions

View File

@ -912,10 +912,10 @@ index 0000000000000000000000000000000000000000..a630a84b60b4517e3bc330d4983b914b
+} +}
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
new file mode 100644 new file mode 100644
index 0000000000000000000000000000000000000000..97f2e433c483f1ebd7500ae142269e144ef5fda4 index 0000000000000000000000000000000000000000..24fe40c14cc50f8357a9c7a7493140fdea016a3d
--- /dev/null --- /dev/null
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java +++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
@@ -0,0 +1,277 @@ @@ -0,0 +1,298 @@
+package com.destroystokyo.paper.io; +package com.destroystokyo.paper.io;
+ +
+import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedQueue;
@ -1022,6 +1022,27 @@ index 0000000000000000000000000000000000000000..97f2e433c483f1ebd7500ae142269e14
+ } + }
+ +
+ /** + /**
+ * Polls the highest priority task currently available. {@code null} if none.
+ */
+ public T poll(final int lowestPriority) {
+ T task;
+ final int max = Math.min(LOWEST_PRIORITY, lowestPriority);
+ for (int i = 0; i <= max; ++i) {
+ final ConcurrentLinkedQueue<T> queue = this.queues[i];
+
+ while ((task = queue.poll()) != null) {
+ final int prevPriority = task.tryComplete(i);
+ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) {
+ // if the prev priority was greater-than or equal to our current priority
+ return task;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns whether this queue may have tasks queued. + * Returns whether this queue may have tasks queued.
+ * <p> + * <p>
+ * This operation is not atomic, but is MT-Safe. + * This operation is not atomic, but is MT-Safe.
@ -1195,10 +1216,10 @@ index 0000000000000000000000000000000000000000..97f2e433c483f1ebd7500ae142269e14
+} +}
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
new file mode 100644 new file mode 100644
index 0000000000000000000000000000000000000000..ee906b594b306906c170180a29a8b61997d05168 index 0000000000000000000000000000000000000000..64b772dc1ed857ccd6999591f89dd89aface0649
--- /dev/null --- /dev/null
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java +++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
@@ -0,0 +1,241 @@ @@ -0,0 +1,254 @@
+package com.destroystokyo.paper.io; +package com.destroystokyo.paper.io;
+ +
+import net.minecraft.server.MinecraftServer; +import net.minecraft.server.MinecraftServer;
@ -1222,6 +1243,19 @@ index 0000000000000000000000000000000000000000..ee906b594b306906c170180a29a8b619
+ protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>(); + protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
+ protected volatile long flushCycles; + protected volatile long flushCycles;
+ +
+ protected int lowestPriorityToPoll = PrioritizedTaskQueue.LOWEST_PRIORITY;
+
+ public int getLowestPriorityToPoll() {
+ return this.lowestPriorityToPoll;
+ }
+
+ public void setLowestPriorityToPoll(final int lowestPriorityToPoll) {
+ if (this.isAlive()) {
+ throw new IllegalStateException("Cannot set after starting");
+ }
+ this.lowestPriorityToPoll = lowestPriorityToPoll;
+ }
+
+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) { + public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
+ this(queue, (int)(1.e6)); // 1.0ms + this(queue, (int)(1.e6)); // 1.0ms
+ } + }
@ -1300,7 +1334,7 @@ index 0000000000000000000000000000000000000000..ee906b594b306906c170180a29a8b619
+ Runnable task; + Runnable task;
+ boolean ret = false; + boolean ret = false;
+ +
+ while ((task = this.queue.poll()) != null) { + while ((task = this.queue.poll(this.lowestPriorityToPoll)) != null) {
+ ret = true; + ret = true;
+ try { + try {
+ task.run(); + task.run();
@ -1749,10 +1783,10 @@ index 0000000000000000000000000000000000000000..058fb5a41565e6ce2acbd1f4d071a1b8
+} +}
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
new file mode 100644 new file mode 100644
index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0dfe4c222 index 0000000000000000000000000000000000000000..db6f8ac2ae068d5bc2ec2d587ab625c4956f6947
--- /dev/null --- /dev/null
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java +++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
@@ -0,0 +1,513 @@ @@ -0,0 +1,505 @@
+package com.destroystokyo.paper.io.chunk; +package com.destroystokyo.paper.io.chunk;
+ +
+import com.destroystokyo.paper.io.PaperFileIOThread; +import com.destroystokyo.paper.io.PaperFileIOThread;
@ -1795,9 +1829,7 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config + private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config
+ +
+ protected static QueueExecutorThread<ChunkTask>[] globalWorkers; + protected static QueueExecutorThread<ChunkTask>[] globalWorkers;
+ protected static QueueExecutorThread<ChunkTask> globalUrgentWorker;
+ protected static PrioritizedTaskQueue<ChunkTask> globalQueue; + protected static PrioritizedTaskQueue<ChunkTask> globalQueue;
+ protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue;
+ +
+ protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>(); + protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>();
+ +
@ -1891,12 +1923,12 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ if (threads <= 0 || globalWorkers != null) { + if (threads <= 0 || globalWorkers != null) {
+ return; + return;
+ } + }
+ ++threads; // add one for urgent executor
+ +
+ globalWorkers = new QueueExecutorThread[threads]; + globalWorkers = new QueueExecutorThread[threads];
+ globalQueue = new PrioritizedTaskQueue<>(); + globalQueue = new PrioritizedTaskQueue<>();
+ globalUrgentQueue = new PrioritizedTaskQueue<>();
+ +
+ for (int i = 0; i < threads; ++i) { + for (int i = 0; i < (threads - 1); ++i) {
+ globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms + globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
+ globalWorkers[i].setName("Paper Async Chunk Task Thread #" + i); + globalWorkers[i].setName("Paper Async Chunk Task Thread #" + i);
+ globalWorkers[i].setPriority(Thread.NORM_PRIORITY - 1); + globalWorkers[i].setPriority(Thread.NORM_PRIORITY - 1);
@ -1907,14 +1939,14 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ globalWorkers[i].start(); + globalWorkers[i].start();
+ } + }
+ +
+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms + globalWorkers[threads - 1] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms
+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread"); + globalWorkers[threads - 1].setName("Paper Async Chunk Urgent Task Thread");
+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1); + globalWorkers[threads - 1].setPriority(Thread.NORM_PRIORITY+1);
+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { + globalWorkers[threads - 1].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> {
+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable); + PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable);
+ }); + });
+ + globalWorkers[threads - 1].setLowestPriorityToPoll(PrioritizedTaskQueue.HIGHEST_PRIORITY);
+ globalUrgentWorker.start(); + globalWorkers[threads - 1].start();
+ } + }
+ +
+ /** + /**
@ -2165,7 +2197,6 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ worker.flush(); + worker.flush();
+ } + }
+ } + }
+ if (globalUrgentWorker != null) globalUrgentWorker.flush();
+ +
+ // flush again since tasks we execute async saves + // flush again since tasks we execute async saves
+ drainChunkWaitQueue(); + drainChunkWaitQueue();
@ -2215,15 +2246,10 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ if (task.isScheduled() && raised && this.workers != null) { + if (task.isScheduled() && raised && this.workers != null) {
+ // only notify if we're in queue to be executed + // only notify if we're in queue to be executed
+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) { + if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ // was in another queue but became urgent later, add to urgent queue and the previous + // notify urgent worker as well
+ // queue will just have to ignore this task if it has already been started.
+ // Ultimately, we now have 2 potential queues that can pull it out whoever gets it first
+ // but the urgent queue has dedicated thread(s) so it's likely to win....
+ globalUrgentQueue.add(task);
+ this.internalScheduleNotifyUrgent(); + this.internalScheduleNotifyUrgent();
+ } else {
+ this.internalScheduleNotify();
+ } + }
+ this.internalScheduleNotify();
+ } + }
+ } + }
+ +
@ -2235,12 +2261,11 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ +
+ // It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread + // It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread
+ // wakes up and goes to sleep before we actually schedule (or it's just about to sleep) + // wakes up and goes to sleep before we actually schedule (or it's just about to sleep)
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ globalUrgentQueue.add(task);
+ this.internalScheduleNotifyUrgent();
+ } else {
+ this.queue.add(task); + this.queue.add(task);
+ this.internalScheduleNotify(); + this.internalScheduleNotify();
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) {
+ // notify urgent too
+ this.internalScheduleNotifyUrgent();
+ } + }
+ +
+ } + }
@ -2249,7 +2274,8 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ if (this.workers == null) { + if (this.workers == null) {
+ return; + return;
+ } + }
+ for (final QueueExecutorThread<ChunkTask> worker : this.workers) { + for (int i = 0, len = this.workers.length - 1; i < len; ++i) {
+ final QueueExecutorThread<ChunkTask> worker = this.workers[i];
+ if (worker.notifyTasks()) { + if (worker.notifyTasks()) {
+ // break here since we only want to wake up one worker for scheduling one task + // break here since we only want to wake up one worker for scheduling one task
+ break; + break;
@ -2259,10 +2285,10 @@ index 0000000000000000000000000000000000000000..18ae2e2b339d357fbe0f6f2b18bc14c0
+ +
+ +
+ protected void internalScheduleNotifyUrgent() { + protected void internalScheduleNotifyUrgent() {
+ if (globalUrgentWorker == null) { + if (this.workers == null) {
+ return; + return;
+ } + }
+ globalUrgentWorker.notifyTasks(); + this.workers[this.workers.length - 1].notifyTasks();
+ } + }
+ +
+} +}
@ -2318,7 +2344,7 @@ index 45de5e508540b4ba622985d530f1aadaa7eb4535..5b8b9dabc6673b6f0a335a42d2ec71a5
ChunkHolder.FullChunkStatus playerchunk_state1 = ChunkHolder.getFullChunkStatus(this.ticketLevel); ChunkHolder.FullChunkStatus playerchunk_state1 = ChunkHolder.getFullChunkStatus(this.ticketLevel);
// CraftBukkit start // CraftBukkit start
diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java
index a8c47535ec8c0cf992c40ec74a7a3a1f78da4865..87f055f8338d4ce2f9ff76bdc6c0b7ffc266ce78 100644 index bf80f2e299108d3de70354bf45fcd0efeff42ec7..8ccdaddcef1e5e83660e58075b039b124f36fce3 100644
--- a/src/main/java/net/minecraft/server/level/ChunkMap.java --- a/src/main/java/net/minecraft/server/level/ChunkMap.java
+++ b/src/main/java/net/minecraft/server/level/ChunkMap.java +++ b/src/main/java/net/minecraft/server/level/ChunkMap.java
@@ -466,6 +466,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider @@ -466,6 +466,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider

View File

@ -23,10 +23,10 @@ Chunks in front of the player have higher priority, to help with
fast traveling players keep up with their movement. fast traveling players keep up with their movement.
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
index 18ae2e2b339d357fbe0f6f2b18bc14c0dfe4c222..3b7ba9c755c82a6f086d5542d32b3567c0f98b99 100644 index db6f8ac2ae068d5bc2ec2d587ab625c4956f6947..057968ccce588072662666fef145669d63490791 100644
--- a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java --- a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java +++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java
@@ -108,7 +108,7 @@ public final class ChunkTaskManager { @@ -106,7 +106,7 @@ public final class ChunkTaskManager {
} }
static void dumpChunkInfo(Set<ChunkHolder> seenChunks, ChunkHolder chunkHolder, int x, int z) { static void dumpChunkInfo(Set<ChunkHolder> seenChunks, ChunkHolder chunkHolder, int x, int z) {
@ -35,7 +35,7 @@ index 18ae2e2b339d357fbe0f6f2b18bc14c0dfe4c222..3b7ba9c755c82a6f086d5542d32b3567
} }
static void dumpChunkInfo(Set<ChunkHolder> seenChunks, ChunkHolder chunkHolder, int x, int z, int indent, int maxDepth) { static void dumpChunkInfo(Set<ChunkHolder> seenChunks, ChunkHolder chunkHolder, int x, int z, int indent, int maxDepth) {
@@ -129,6 +129,31 @@ public final class ChunkTaskManager { @@ -127,6 +127,31 @@ public final class ChunkTaskManager {
PaperFileIOThread.LOGGER.log(Level.ERROR, indentStr + "Chunk Status - " + ((chunk == null) ? "null chunk" : chunk.getStatus().toString())); PaperFileIOThread.LOGGER.log(Level.ERROR, indentStr + "Chunk Status - " + ((chunk == null) ? "null chunk" : chunk.getStatus().toString()));
PaperFileIOThread.LOGGER.log(Level.ERROR, indentStr + "Chunk Ticket Status - " + ChunkHolder.getStatus(chunkHolder.getTicketLevel())); PaperFileIOThread.LOGGER.log(Level.ERROR, indentStr + "Chunk Ticket Status - " + ChunkHolder.getStatus(chunkHolder.getTicketLevel()));
PaperFileIOThread.LOGGER.log(Level.ERROR, indentStr + "Chunk Holder Status - " + ((holderStatus == null) ? "null" : holderStatus.toString())); PaperFileIOThread.LOGGER.log(Level.ERROR, indentStr + "Chunk Holder Status - " + ((holderStatus == null) ? "null" : holderStatus.toString()));
@ -360,7 +360,7 @@ index 9e96b0465717bfa761289c255fd8d2f1df1be3d8..87271552aa85626f22f7f8569c8fb48f
return this.isEntityTickingReady; return this.isEntityTickingReady;
} }
diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java diff --git a/src/main/java/net/minecraft/server/level/ChunkMap.java b/src/main/java/net/minecraft/server/level/ChunkMap.java
index 326a3b312d3446b813e325867f852e0cf6786945..3e6cc16ff6f6c7993b15bd807257c0554afb6c77 100644 index cac9c9ede6024653f4ae83cbbdd9939f75ccad48..01ce18ba387f1f73e894268c08b19cced6b5ea26 100644
--- a/src/main/java/net/minecraft/server/level/ChunkMap.java --- a/src/main/java/net/minecraft/server/level/ChunkMap.java
+++ b/src/main/java/net/minecraft/server/level/ChunkMap.java +++ b/src/main/java/net/minecraft/server/level/ChunkMap.java
@@ -128,6 +128,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider @@ -128,6 +128,7 @@ public class ChunkMap extends ChunkStorage implements ChunkHolder.PlayerProvider