diff --git a/patches/server/Asynchronous-chunk-IO-and-loading.patch b/patches/server/Asynchronous-chunk-IO-and-loading.patch index ff282ad244..10bcf4e2a8 100644 --- a/patches/server/Asynchronous-chunk-IO-and-loading.patch +++ b/patches/server/Asynchronous-chunk-IO-and-loading.patch @@ -1022,6 +1022,27 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + } + + /** ++ * Polls the highest priority task currently available. {@code null} if none. ++ */ ++ public T poll(final int lowestPriority) { ++ T task; ++ final int max = Math.min(LOWEST_PRIORITY, lowestPriority); ++ for (int i = 0; i <= max; ++i) { ++ final ConcurrentLinkedQueue queue = this.queues[i]; ++ ++ while ((task = queue.poll()) != null) { ++ final int prevPriority = task.tryComplete(i); ++ if (prevPriority != COMPLETING_PRIORITY && prevPriority <= i) { ++ // if the prev priority was greater-than or equal to our current priority ++ return task; ++ } ++ } ++ } ++ ++ return null; ++ } ++ ++ /** + * Returns whether this queue may have tasks queued. + *

+ * This operation is not atomic, but is MT-Safe. @@ -1222,6 +1243,19 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + protected volatile ConcurrentLinkedQueue flushQueue = new ConcurrentLinkedQueue<>(); + protected volatile long flushCycles; + ++ protected int lowestPriorityToPoll = PrioritizedTaskQueue.LOWEST_PRIORITY; ++ ++ public int getLowestPriorityToPoll() { ++ return this.lowestPriorityToPoll; ++ } ++ ++ public void setLowestPriorityToPoll(final int lowestPriorityToPoll) { ++ if (this.isAlive()) { ++ throw new IllegalStateException("Cannot set after starting"); ++ } ++ this.lowestPriorityToPoll = lowestPriorityToPoll; ++ } ++ + public QueueExecutorThread(final PrioritizedTaskQueue queue) { + this(queue, (int)(1.e6)); // 1.0ms + } @@ -1300,7 +1334,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + Runnable task; + boolean ret = false; + -+ while ((task = this.queue.poll()) != null) { ++ while ((task = this.queue.poll(this.lowestPriorityToPoll)) != null) { + ret = true; + try { + task.run(); @@ -1795,9 +1829,7 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + private final PrioritizedTaskQueue chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config + + protected static QueueExecutorThread[] globalWorkers; -+ protected static QueueExecutorThread globalUrgentWorker; + protected static PrioritizedTaskQueue globalQueue; -+ protected static PrioritizedTaskQueue globalUrgentQueue; + + protected static final ConcurrentLinkedQueue CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>(); + @@ -1891,12 +1923,12 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + if (threads <= 0 || globalWorkers != null) { + return; + } ++ ++threads; // add one for urgent executor + + globalWorkers = new QueueExecutorThread[threads]; + globalQueue = new PrioritizedTaskQueue<>(); -+ globalUrgentQueue = new PrioritizedTaskQueue<>(); + -+ for (int i = 0; i < threads; ++i) { ++ for (int i = 0; i < (threads - 1); ++i) { + globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms + globalWorkers[i].setName("Paper Async Chunk Task Thread #" + i); + globalWorkers[i].setPriority(Thread.NORM_PRIORITY - 1); @@ -1907,14 +1939,14 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + globalWorkers[i].start(); + } + -+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms -+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread"); -+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1); -+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { ++ globalWorkers[threads - 1] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms ++ globalWorkers[threads - 1].setName("Paper Async Chunk Urgent Task Thread"); ++ globalWorkers[threads - 1].setPriority(Thread.NORM_PRIORITY+1); ++ globalWorkers[threads - 1].setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { + PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable); + }); -+ -+ globalUrgentWorker.start(); ++ globalWorkers[threads - 1].setLowestPriorityToPoll(PrioritizedTaskQueue.HIGHEST_PRIORITY); ++ globalWorkers[threads - 1].start(); + } + + /** @@ -2165,7 +2197,6 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + worker.flush(); + } + } -+ if (globalUrgentWorker != null) globalUrgentWorker.flush(); + + // flush again since tasks we execute async saves + drainChunkWaitQueue(); @@ -2215,15 +2246,10 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + if (task.isScheduled() && raised && this.workers != null) { + // only notify if we're in queue to be executed + if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) { -+ // was in another queue but became urgent later, add to urgent queue and the previous -+ // queue will just have to ignore this task if it has already been started. -+ // Ultimately, we now have 2 potential queues that can pull it out whoever gets it first -+ // but the urgent queue has dedicated thread(s) so it's likely to win.... -+ globalUrgentQueue.add(task); ++ // notify urgent worker as well + this.internalScheduleNotifyUrgent(); -+ } else { -+ this.internalScheduleNotify(); + } ++ this.internalScheduleNotify(); + } + } + @@ -2235,12 +2261,11 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + + // It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread + // wakes up and goes to sleep before we actually schedule (or it's just about to sleep) ++ this.queue.add(task); ++ this.internalScheduleNotify(); + if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) { -+ globalUrgentQueue.add(task); ++ // notify urgent too + this.internalScheduleNotifyUrgent(); -+ } else { -+ this.queue.add(task); -+ this.internalScheduleNotify(); + } + + } @@ -2249,7 +2274,8 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + if (this.workers == null) { + return; + } -+ for (final QueueExecutorThread worker : this.workers) { ++ for (int i = 0, len = this.workers.length - 1; i < len; ++i) { ++ final QueueExecutorThread worker = this.workers[i]; + if (worker.notifyTasks()) { + // break here since we only want to wake up one worker for scheduling one task + break; @@ -2259,10 +2285,10 @@ index 0000000000000000000000000000000000000000..00000000000000000000000000000000 + + + protected void internalScheduleNotifyUrgent() { -+ if (globalUrgentWorker == null) { ++ if (this.workers == null) { + return; + } -+ globalUrgentWorker.notifyTasks(); ++ this.workers[this.workers.length - 1].notifyTasks(); + } + +}