Many major improvements to Async Chunk Loading

Fixes some bugs with urgent priority, improves priority all
around to optimize blocking chunk requests as much as possible.

fixes casing on the -Dpaper.maxchunkthreads to now be -Dpaper.maxChunkThreads

adds -Dpaper.genThreadPriority=3 -Dpaper.loadThreadPriority=4

lowering thread priorities will help ensure main has more
priority over chunk threads
This commit is contained in:
Aikar 2018-11-02 22:48:33 -04:00
parent d2ff9c662c
commit 908300e2c0

View File

@ -43,7 +43,7 @@ reading or writing to the chunk will be safe, so plugins still
should not be touching chunks asynchronously! should not be touching chunks asynchronously!
diff --git a/src/main/java/com/destroystokyo/paper/PaperConfig.java b/src/main/java/com/destroystokyo/paper/PaperConfig.java diff --git a/src/main/java/com/destroystokyo/paper/PaperConfig.java b/src/main/java/com/destroystokyo/paper/PaperConfig.java
index b703e08486..77d35ac99d 100644 index b703e08486..73b0c23944 100644
--- a/src/main/java/com/destroystokyo/paper/PaperConfig.java --- a/src/main/java/com/destroystokyo/paper/PaperConfig.java
+++ b/src/main/java/com/destroystokyo/paper/PaperConfig.java +++ b/src/main/java/com/destroystokyo/paper/PaperConfig.java
@@ -0,0 +0,0 @@ public class PaperConfig { @@ -0,0 +0,0 @@ public class PaperConfig {
@ -70,7 +70,7 @@ index b703e08486..77d35ac99d 100644
+ asyncChunkGenThreadPerWorld = getBoolean("settings.async-chunks.thread-per-world-generation", true); + asyncChunkGenThreadPerWorld = getBoolean("settings.async-chunks.thread-per-world-generation", true);
+ asyncChunkLoadThreads = getInt("settings.async-chunks.load-threads", -1); + asyncChunkLoadThreads = getInt("settings.async-chunks.load-threads", -1);
+ if (asyncChunkLoadThreads <= 0) { + if (asyncChunkLoadThreads <= 0) {
+ asyncChunkLoadThreads = (int) Math.min(Integer.getInteger("paper.maxchunkthreads", 8), Runtime.getRuntime().availableProcessors() * 1.5); + asyncChunkLoadThreads = (int) Math.min(Integer.getInteger("paper.maxChunkThreads", 8), Runtime.getRuntime().availableProcessors() * 1.5);
+ } + }
+ +
+ // Let Shared Host set some limits + // Let Shared Host set some limits
@ -106,15 +106,12 @@ index b703e08486..77d35ac99d 100644
} }
diff --git a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java diff --git a/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java
new file mode 100644 new file mode 100644
index 0000000000..e589aa356c index 0000000000..8f18c28695
--- /dev/null --- /dev/null
+++ b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java +++ b/src/main/java/com/destroystokyo/paper/util/PriorityQueuedExecutor.java
@@ -0,0 +0,0 @@ @@ -0,0 +0,0 @@
+package com.destroystokyo.paper.util; +package com.destroystokyo.paper.util;
+ +
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import net.minecraft.server.NamedIncrementingThreadFactory;
+
+import javax.annotation.Nonnull; +import javax.annotation.Nonnull;
+import java.util.ArrayList; +import java.util.ArrayList;
+import java.util.List; +import java.util.List;
@ -122,7 +119,6 @@ index 0000000000..e589aa356c
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicInteger;
@ -132,33 +128,47 @@ index 0000000000..e589aa356c
+ * Implements an Executor Service that allows specifying Task Priority + * Implements an Executor Service that allows specifying Task Priority
+ * and bumping of task priority. + * and bumping of task priority.
+ * + *
+ * @author aikar + * This is a non blocking executor with 3 priority levels.
+ *
+ * URGENT: Rarely used, something that is critical to take action now.
+ * HIGH: Something with more importance than the base tasks
+ *
+ * @author Daniel Ennis &lt;aikar@aikar.co&gt;
+ */ + */
+@SuppressWarnings({"WeakerAccess", "UnusedReturnValue", "unused"}) +@SuppressWarnings({"WeakerAccess", "UnusedReturnValue", "unused"})
+public class PriorityQueuedExecutor extends AbstractExecutorService { +public class PriorityQueuedExecutor extends AbstractExecutorService {
+
+ private final ConcurrentLinkedQueue<Runnable> urgent = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Runnable> urgent = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<Runnable> high = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Runnable> high = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<Runnable> normal = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<Runnable> normal = new ConcurrentLinkedQueue<>();
+ private final List<Thread> threads = new ArrayList<>();
+ private final RejectionHandler handler; + private final RejectionHandler handler;
+
+ private volatile boolean shuttingDown = false; + private volatile boolean shuttingDown = false;
+ private volatile boolean shuttingDownNow = false; + private volatile boolean shuttingDownNow = false;
+ private final List<Thread> threads = new ArrayList<>();
+ +
+ public PriorityQueuedExecutor(String name) { + public PriorityQueuedExecutor(String name) {
+ this(name, Runtime.getRuntime().availableProcessors(), null); + this(name, Math.max(1, Runtime.getRuntime().availableProcessors() - 1));
+ } + }
+ +
+ public PriorityQueuedExecutor(String name, int threads) { + public PriorityQueuedExecutor(String name, int threads) {
+ this(name, threads, null); + this(name, threads, Thread.NORM_PRIORITY, null);
+ }
+
+ public PriorityQueuedExecutor(String name, int threads, int threadPriority) {
+ this(name, threads, threadPriority, null);
+ } + }
+ +
+ public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler) { + public PriorityQueuedExecutor(String name, int threads, RejectionHandler handler) {
+ ThreadFactory factory = new ThreadFactoryBuilder() + this(name, threads, Thread.NORM_PRIORITY, handler);
+ .setThreadFactory(new NamedIncrementingThreadFactory(name)) + }
+ .setDaemon(true) +
+ .build(); + public PriorityQueuedExecutor(String name, int threads, int threadPriority, RejectionHandler handler) {
+ for (int i = 0; i < threads; i++) { + for (int i = 0; i < threads; i++) {
+ final Thread thread = factory.newThread(this::processQueues); + ExecutorThread thread = new ExecutorThread(this::processQueues);
+ thread.setDaemon(true);
+ thread.setName(threads == 1 ? name : name + "-" + (i + 1));
+ thread.setPriority(threadPriority);
+ thread.start(); + thread.start();
+ this.threads.add(thread); + this.threads.add(thread);
+ } + }
@ -168,6 +178,17 @@ index 0000000000..e589aa356c
+ this.handler = handler; + this.handler = handler;
+ } + }
+ +
+ /**
+ * If the Current thread belongs to a PriorityQueuedExecutor, return that Executro
+ * @return The executor that controls this thread
+ */
+ public static PriorityQueuedExecutor getExecutor() {
+ if (!(Thread.currentThread() instanceof ExecutorThread)) {
+ return null;
+ }
+ return ((ExecutorThread) Thread.currentThread()).getExecutor();
+ }
+
+ public void shutdown() { + public void shutdown() {
+ shuttingDown = true; + shuttingDown = true;
+ synchronized (this) { + synchronized (this) {
@ -235,28 +256,20 @@ index 0000000000..e589aa356c
+ } + }
+ +
+ public PendingTask<Void> submitTask(Runnable run) { + public PendingTask<Void> submitTask(Runnable run) {
+ return submitTask(createPendingTask(run)); + return createPendingTask(run).submit();
+ } + }
+ +
+ public PendingTask<Void> submitTask(Runnable run, Priority priority) { + public PendingTask<Void> submitTask(Runnable run, Priority priority) {
+ return submitTask(createPendingTask(run, priority)); + return createPendingTask(run, priority).submit();
+ } + }
+ +
+ public <T> PendingTask<T> submitTask(Supplier<T> run) { + public <T> PendingTask<T> submitTask(Supplier<T> run) {
+ return submitTask(createPendingTask(run)); + return createPendingTask(run).submit();
+ } + }
+ +
+ public <T> PendingTask<T> submitTask(Supplier<T> run, Priority priority) { + public <T> PendingTask<T> submitTask(Supplier<T> run, Priority priority) {
+ return submitTask(createPendingTask(run, priority)); + PendingTask<T> task = createPendingTask(run, priority);
+ } + return task.submit();
+
+ public <T> PendingTask<T> submitTask(PendingTask<T> task) {
+ if (shuttingDown) {
+ handler.onRejection(task, this);
+ return task;
+ }
+ task.submit(this);
+ return task;
+ } + }
+ +
+ @Override + @Override
@ -264,7 +277,19 @@ index 0000000000..e589aa356c
+ submitTask(command); + submitTask(command);
+ } + }
+ +
+ private Runnable getTask() { + public boolean isCurrentThread() {
+ final Thread thread = Thread.currentThread();
+ if (!(thread instanceof ExecutorThread)) {
+ return false;
+ }
+ return ((ExecutorThread) thread).getExecutor() == this;
+ }
+
+ public Runnable getUrgentTask() {
+ return urgent.poll();
+ }
+
+ public Runnable getTask() {
+ Runnable run = urgent.poll(); + Runnable run = urgent.poll();
+ if (run != null) { + if (run != null) {
+ return run; + return run;
@ -304,10 +329,30 @@ index 0000000000..e589aa356c
+ } + }
+ } + }
+ +
+ public boolean processUrgentTasks() {
+ Runnable run;
+ boolean hadTask = false;
+ while ((run = getUrgentTask()) != null) {
+ run.run();
+ hadTask = true;
+ }
+ return hadTask;
+ }
+
+ public enum Priority { + public enum Priority {
+ NORMAL, HIGH, URGENT + NORMAL, HIGH, URGENT
+ } + }
+ +
+ public class ExecutorThread extends Thread {
+ public ExecutorThread(Runnable runnable) {
+ super(runnable);
+ }
+
+ public PriorityQueuedExecutor getExecutor() {
+ return PriorityQueuedExecutor.this;
+ }
+ }
+
+ public class PendingTask <T> implements Runnable { + public class PendingTask <T> implements Runnable {
+ +
+ private final AtomicBoolean hasRan = new AtomicBoolean(); + private final AtomicBoolean hasRan = new AtomicBoolean();
@ -350,31 +395,35 @@ index 0000000000..e589aa356c
+ public void bumpPriority(Priority newPriority) { + public void bumpPriority(Priority newPriority) {
+ for (;;) { + for (;;) {
+ int current = this.priority.get(); + int current = this.priority.get();
+ if (current >= newPriority.ordinal()) { + int ordinal = newPriority.ordinal();
+ return; + if (current >= ordinal || priority.compareAndSet(current, ordinal)) {
+ }
+ if (priority.compareAndSet(current, newPriority.ordinal())) {
+ break; + break;
+ } + }
+ } + }
+ +
+ if (this.executor == null) { +
+ if (this.submitted.get() == -1 || this.hasRan.get()) {
+ return; + return;
+ } + }
+ // If we have already been submitted, resubmit with new priority +
+ submit(this.executor); + // Only resubmit if it hasnt ran yet and has been submitted
+ submit();
+ } + }
+ +
+ public CompletableFuture<T> onDone() { + public CompletableFuture<T> onDone() {
+ return future; + return future;
+ } + }
+ +
+ public void submit(PriorityQueuedExecutor executor) { + public PendingTask<T> submit() {
+ if (shuttingDown) {
+ handler.onRejection(this, PriorityQueuedExecutor.this);
+ return this;
+ }
+ for (;;) { + for (;;) {
+ final int submitted = this.submitted.get(); + final int submitted = this.submitted.get();
+ final int priority = this.priority.get(); + final int priority = this.priority.get();
+ if (submitted == priority) { + if (submitted == priority) {
+ return; + return this;
+ } + }
+ if (this.submitted.compareAndSet(submitted, priority)) { + if (this.submitted.compareAndSet(submitted, priority)) {
+ if (priority == Priority.URGENT.ordinal()) { + if (priority == Priority.URGENT.ordinal()) {
@ -389,11 +438,11 @@ index 0000000000..e589aa356c
+ } + }
+ } + }
+ +
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (PriorityQueuedExecutor.this) {
+ synchronized (executor) {
+ // Wake up a thread to take this work + // Wake up a thread to take this work
+ executor.notify(); + PriorityQueuedExecutor.this.notify();
+ } + }
+ return this;
+ } + }
+ } + }
+ public interface RejectionHandler { + public interface RejectionHandler {
@ -933,7 +982,7 @@ index 49fba0979e..9ad646f8d4 100644
fx = fx % 360.0F; fx = fx % 360.0F;
if (fx >= 180.0F) { if (fx >= 180.0F) {
diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
index 763130b036..67722440fd 100644 index 763130b036..69b3218756 100644
--- a/src/main/java/net/minecraft/server/MinecraftServer.java --- a/src/main/java/net/minecraft/server/MinecraftServer.java
+++ b/src/main/java/net/minecraft/server/MinecraftServer.java +++ b/src/main/java/net/minecraft/server/MinecraftServer.java
@@ -0,0 +0,0 @@ public abstract class MinecraftServer implements IAsyncTaskHandler, IMojangStati @@ -0,0 +0,0 @@ public abstract class MinecraftServer implements IAsyncTaskHandler, IMojangStati
@ -978,7 +1027,7 @@ index 763130b036..67722440fd 100644
+ while (waitForChunks && !completablefuture.isDone() && isRunning()) { // Paper + while (waitForChunks && !completablefuture.isDone() && isRunning()) { // Paper
try { try {
- completablefuture.get(1L, TimeUnit.SECONDS); - completablefuture.get(1L, TimeUnit.SECONDS);
+ PaperAsyncChunkProvider.processChunkLoads(worldserver); // Paper + PaperAsyncChunkProvider.processMainThreadQueue(this); // Paper
+ completablefuture.get(50L, TimeUnit.MILLISECONDS); // Paper + completablefuture.get(50L, TimeUnit.MILLISECONDS); // Paper
} catch (InterruptedException interruptedexception) { } catch (InterruptedException interruptedexception) {
throw new RuntimeException(interruptedexception); throw new RuntimeException(interruptedexception);
@ -1009,7 +1058,7 @@ index 763130b036..67722440fd 100644
while ((futuretask = (FutureTask) this.f.poll()) != null) { while ((futuretask = (FutureTask) this.f.poll()) != null) {
SystemUtils.a(futuretask, MinecraftServer.LOGGER); SystemUtils.a(futuretask, MinecraftServer.LOGGER);
} }
+ PaperAsyncChunkProvider.processChunkLoads(this); // Paper + PaperAsyncChunkProvider.processMainThreadQueue(this); // Paper
MinecraftTimings.minecraftSchedulerTimer.stopTiming(); // Paper MinecraftTimings.minecraftSchedulerTimer.stopTiming(); // Paper
this.methodProfiler.c("commandFunctions"); this.methodProfiler.c("commandFunctions");
@ -1017,21 +1066,13 @@ index 763130b036..67722440fd 100644
// CraftBukkit - dropTickTime // CraftBukkit - dropTickTime
for (Iterator iterator = this.getWorlds().iterator(); iterator.hasNext();) { for (Iterator iterator = this.getWorlds().iterator(); iterator.hasNext();) {
WorldServer worldserver = (WorldServer) iterator.next(); WorldServer worldserver = (WorldServer) iterator.next();
+ PaperAsyncChunkProvider.processChunkLoads(worldserver); // Paper + PaperAsyncChunkProvider.processMainThreadQueue(worldserver); // Paper
TileEntityHopper.skipHopperEvents = worldserver.paperConfig.disableHopperMoveEvents || org.bukkit.event.inventory.InventoryMoveItemEvent.getHandlerList().getRegisteredListeners().length == 0; // Paper TileEntityHopper.skipHopperEvents = worldserver.paperConfig.disableHopperMoveEvents || org.bukkit.event.inventory.InventoryMoveItemEvent.getHandlerList().getRegisteredListeners().length == 0; // Paper
i = SystemUtils.c(); i = SystemUtils.c();
if (true || worldserver.worldProvider.getDimensionManager() == DimensionManager.OVERWORLD || this.getAllowNether()) { // CraftBukkit if (true || worldserver.worldProvider.getDimensionManager() == DimensionManager.OVERWORLD || this.getAllowNether()) { // CraftBukkit
@@ -0,0 +0,0 @@ public abstract class MinecraftServer implements IAsyncTaskHandler, IMojangStati
this.methodProfiler.e();
this.methodProfiler.e();
worldserver.explosionDensityCache.clear(); // Paper - Optimize explosions
+ PaperAsyncChunkProvider.processChunkLoads(worldserver); // Paper
}
}
diff --git a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java diff --git a/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java
new file mode 100644 new file mode 100644
index 0000000000..c334462f20 index 0000000000..e9a38f9d90
--- /dev/null --- /dev/null
+++ b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java +++ b/src/main/java/net/minecraft/server/PaperAsyncChunkProvider.java
@@ -0,0 +0,0 @@ @@ -0,0 +0,0 @@
@ -1076,7 +1117,7 @@ index 0000000000..c334462f20
+import java.util.Iterator; +import java.util.Iterator;
+import java.util.List; +import java.util.List;
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer; +import java.util.function.Consumer;
@ -1084,11 +1125,11 @@ index 0000000000..c334462f20
+@SuppressWarnings("unused") +@SuppressWarnings("unused")
+public class PaperAsyncChunkProvider extends ChunkProviderServer { +public class PaperAsyncChunkProvider extends ChunkProviderServer {
+ +
+ private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0); + private static final int GEN_THREAD_PRIORITY = Integer.getInteger("paper.genThreadPriority", 3);
+ private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0); + private static final int LOAD_THREAD_PRIORITY = Integer.getInteger("paper.loadThreadPriority", 4);
+ private static final ConcurrentLinkedQueue<Runnable> MAIN_THREAD_QUEUE = new ConcurrentLinkedQueue<>(); + private static final PriorityQueuedExecutor EXECUTOR = new PriorityQueuedExecutor("PaperChunkLoader", PaperConfig.asyncChunks ? PaperConfig.asyncChunkLoadThreads : 0, LOAD_THREAD_PRIORITY);
+ private static final ThreadLocal<Boolean> IS_CHUNK_THREAD = ThreadLocal.withInitial(() -> false); + private static final PriorityQueuedExecutor SINGLE_GEN_EXECUTOR = new PriorityQueuedExecutor("PaperChunkGenerator", PaperConfig.asyncChunks && PaperConfig.asyncChunkGeneration && !PaperConfig.asyncChunkGenThreadPerWorld ? 1 : 0, GEN_THREAD_PRIORITY);
+ private static final ThreadLocal<Boolean> IS_CHUNK_GEN_THREAD = ThreadLocal.withInitial(() -> false); + private static final ConcurrentLinkedDeque<Runnable> MAIN_THREAD_QUEUE = new ConcurrentLinkedDeque<>();
+ +
+ private final PriorityQueuedExecutor generationExecutor; + private final PriorityQueuedExecutor generationExecutor;
+ //private static final PriorityQueuedExecutor generationExecutor = new PriorityQueuedExecutor("PaperChunkGen", 1); + //private static final PriorityQueuedExecutor generationExecutor = new PriorityQueuedExecutor("PaperChunkGen", 1);
@ -1109,26 +1150,7 @@ index 0000000000..c334462f20
+ this.chunkLoader = chunkLoader; + this.chunkLoader = chunkLoader;
+ String worldName = this.world.getWorld().getName(); + String worldName = this.world.getWorld().getName();
+ this.shouldGenSync = generator instanceof CustomChunkGenerator && !(((CustomChunkGenerator) generator).asyncSupported) || !PaperConfig.asyncChunkGeneration; + this.shouldGenSync = generator instanceof CustomChunkGenerator && !(((CustomChunkGenerator) generator).asyncSupported) || !PaperConfig.asyncChunkGeneration;
+ this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1) : SINGLE_GEN_EXECUTOR; + this.generationExecutor = PaperConfig.asyncChunkGenThreadPerWorld ? new PriorityQueuedExecutor("PaperChunkGen-" + worldName, shouldGenSync ? 0 : 1, GEN_THREAD_PRIORITY) : SINGLE_GEN_EXECUTOR;
+ }
+
+ static void processChunkLoads(MinecraftServer server) {
+ for (WorldServer world : server.getWorlds()) {
+ processChunkLoads(world);
+ }
+ }
+
+ static void processChunkLoads(World world) {
+ IChunkProvider chunkProvider = world.getChunkProvider();
+ if (chunkProvider instanceof PaperAsyncChunkProvider) {
+ ((PaperAsyncChunkProvider) chunkProvider).processChunkLoads();
+ }
+ }
+
+ static void stop(MinecraftServer server) {
+ for (WorldServer world : server.getWorlds()) {
+ world.getPlayerChunkMap().shutdown();
+ }
+ } + }
+ +
+ private static Priority calculatePriority(boolean isBlockingMain, boolean priority) { + private static Priority calculatePriority(boolean isBlockingMain, boolean priority) {
@ -1143,19 +1165,44 @@ index 0000000000..c334462f20
+ return Priority.NORMAL; + return Priority.NORMAL;
+ } + }
+ +
+ private boolean processChunkLoads() { + static void stop(MinecraftServer server) {
+ for (WorldServer world : server.getWorlds()) {
+ world.getPlayerChunkMap().shutdown();
+ }
+ }
+
+ static void processMainThreadQueue(MinecraftServer server) {
+ for (WorldServer world : server.getWorlds()) {
+ processMainThreadQueue(world);
+ }
+ }
+
+ static void processMainThreadQueue(World world) {
+ IChunkProvider chunkProvider = world.getChunkProvider();
+ if (chunkProvider instanceof PaperAsyncChunkProvider) {
+ ((PaperAsyncChunkProvider) chunkProvider).processMainThreadQueue();
+ }
+ }
+
+ private void processMainThreadQueue() {
+ processMainThreadQueue((PendingChunk) null);
+ }
+ private boolean processMainThreadQueue(PendingChunk pending) {
+ Runnable run; + Runnable run;
+ boolean hadLoad = false; + boolean hadLoad = false;
+ while ((run = MAIN_THREAD_QUEUE.poll()) != null) { + while ((run = MAIN_THREAD_QUEUE.poll()) != null) {
+ run.run(); + run.run();
+ hadLoad = true; + hadLoad = true;
+ if (pending != null && pending.hasPosted) {
+ break;
+ }
+ } + }
+ return hadLoad; + return hadLoad;
+ } + }
+ +
+ @Override + @Override
+ public void bumpPriority(ChunkCoordIntPair coords) { + public void bumpPriority(ChunkCoordIntPair coords) {
+ PendingChunk pending = pendingChunks.get(coords.asLong()); + final PendingChunk pending = pendingChunks.get(coords.asLong());
+ if (pending != null) { + if (pending != null) {
+ pending.bumpPriority(Priority.HIGH); + pending.bumpPriority(Priority.HIGH);
+ } + }
@ -1170,8 +1217,8 @@ index 0000000000..c334462f20
+ @Nullable + @Nullable
+ @Override + @Override
+ public Chunk getChunkAt(int x, int z, boolean load, boolean gen, boolean priority, Consumer<Chunk> consumer) { + public Chunk getChunkAt(int x, int z, boolean load, boolean gen, boolean priority, Consumer<Chunk> consumer) {
+ long key = ChunkCoordIntPair.asLong(x, z); + final long key = ChunkCoordIntPair.asLong(x, z);
+ Chunk chunk = this.chunks.get(key); + final Chunk chunk = this.chunks.get(key);
+ if (chunk != null || !load) { // return null if we aren't loading + if (chunk != null || !load) { // return null if we aren't loading
+ if (consumer != null) { + if (consumer != null) {
+ consumer.accept(chunk); + consumer.accept(chunk);
@ -1185,50 +1232,55 @@ index 0000000000..c334462f20
+ return requestChunk(x, z, gen, priority, consumer).getChunk(); + return requestChunk(x, z, gen, priority, consumer).getChunk();
+ } + }
+ +
+ PendingChunkRequest requestChunk(int x, int z, boolean gen, boolean priority, Consumer<Chunk> consumer) { + final PendingChunkRequest requestChunk(int x, int z, boolean gen, boolean priority, Consumer<Chunk> consumer) {
+ try (co.aikar.timings.Timing timing = world.timings.syncChunkLoadTimer.startTiming()) {
+ final long key = ChunkCoordIntPair.asLong(x, z); + final long key = ChunkCoordIntPair.asLong(x, z);
+ final boolean isChunkThread = isChunkThread();
+ final boolean isBlockingMain = consumer == null && server.isMainThread();
+ final boolean loadOnThisThread = isChunkThread || isBlockingMain;
+ final Priority taskPriority = calculatePriority(isBlockingMain, priority);
+ +
+ // Obtain a PendingChunk + // Obtain a PendingChunk
+ final PendingChunk pending; + final PendingChunk pending;
+ final boolean isBlockingMain = consumer == null && server.isMainThread();
+ synchronized (pendingChunks) { + synchronized (pendingChunks) {
+ PendingChunk pendingChunk = pendingChunks.get(key); + PendingChunk pendingChunk = pendingChunks.get(key);
+ if (pendingChunk == null) { + if (pendingChunk == null) {
+ pending = new PendingChunk(x, z, key, gen, calculatePriority(isBlockingMain, priority)); + pending = new PendingChunk(x, z, key, gen, taskPriority);
+ pendingChunks.put(key, pending); + pendingChunks.put(key, pending);
+ } else if (pendingChunk.hasFinished && gen && !pendingChunk.canGenerate && pendingChunk.chunk == null) { + } else if (pendingChunk.hasFinished && gen && !pendingChunk.canGenerate && pendingChunk.chunk == null) {
+ // need to overwrite the old + // need to overwrite the old
+ pending = new PendingChunk(x, z, key, true, calculatePriority(isBlockingMain, priority)); + pending = new PendingChunk(x, z, key, true, taskPriority);
+ pendingChunks.put(key, pending); + pendingChunks.put(key, pending);
+ } else { + } else {
+ pending = pendingChunk; + pending = pendingChunk;
+ if (pending.taskPriority != taskPriority) {
+ pending.bumpPriority(taskPriority);
+ }
+ }
+ }
+ +
+ Priority newPriority = calculatePriority(isBlockingMain, priority);
+ if (pending.taskPriority != newPriority) {
+ pending.bumpPriority(newPriority);
+ }
+ }
+ }
+ // Listen for when result is ready + // Listen for when result is ready
+ final CompletableFuture<Chunk> future = new CompletableFuture<>(); + final CompletableFuture<Chunk> future = new CompletableFuture<>();
+ PendingChunkRequest request = pending.addListener(future, gen); + final PendingChunkRequest request = pending.addListener(future, gen, !loadOnThisThread);
+ if (IS_CHUNK_THREAD.get()) { +
+ // Chunk Generation can trigger Chunk Loading, those loads may need to convert, and could be slow
+ // Give an opportunity for urgent tasks to jump in at these times
+ if (isChunkThread) {
+ processUrgentTasks();
+ }
+
+ if (loadOnThisThread) {
+ // do loads on main if blocking, or on current if we are a load/gen thread
+ // gen threads do trigger chunk loads
+ pending.loadTask.run(); + pending.loadTask.run();
+ } + }
+ +
+ if (isBlockingMain && pending.hasFinished) {
+ processChunkLoads();
+ request.initialReturnChunk = pending.postChunk();
+ return request;
+ }
+
+ if (isBlockingMain) { + if (isBlockingMain) {
+ try (co.aikar.timings.Timing timing = world.timings.syncChunkLoadTimer.startTiming()) {
+ while (!future.isDone()) { + while (!future.isDone()) {
+ // We aren't done, obtain lock on queue + // We aren't done, obtain lock on queue
+ synchronized (MAIN_THREAD_QUEUE) { + synchronized (MAIN_THREAD_QUEUE) {
+ // We may of received our request now, check it + // We may of received our request now, check it
+ if (processChunkLoads()) { + if (processMainThreadQueue(pending)) {
+ // If we processed SOMETHING, don't wait + // If we processed SOMETHING, don't wait
+ continue; + continue;
+ } + }
@ -1239,11 +1291,10 @@ index 0000000000..c334462f20
+ } + }
+ } + }
+ // Queue has been notified or timed out, process it + // Queue has been notified or timed out, process it
+ processChunkLoads(); + processMainThreadQueue(pending);
+ } + }
+ // We should be done AND posted into chunk map now, return it + // We should be done AND posted into chunk map now, return it
+ request.initialReturnChunk = future.join(); + request.initialReturnChunk = pending.postChunk();
+ }
+ } else if (consumer == null) { + } else if (consumer == null) {
+ // This is on another thread + // This is on another thread
+ request.initialReturnChunk = future.join(); + request.initialReturnChunk = future.join();
@ -1253,15 +1304,23 @@ index 0000000000..c334462f20
+ +
+ return request; + return request;
+ } + }
+ }
+
+ private void processUrgentTasks() {
+ final PriorityQueuedExecutor executor = PriorityQueuedExecutor.getExecutor();
+ if (executor != null) {
+ executor.processUrgentTasks();
+ }
+ }
+ +
+ @Override + @Override
+ public CompletableFuture<Void> loadAllChunks(Iterable<ChunkCoordIntPair> iterable, Consumer<Chunk> consumer) { + public CompletableFuture<Void> loadAllChunks(Iterable<ChunkCoordIntPair> iterable, Consumer<Chunk> consumer) {
+ Iterator<ChunkCoordIntPair> iterator = iterable.iterator(); + final Iterator<ChunkCoordIntPair> iterator = iterable.iterator();
+ +
+ List<CompletableFuture<Chunk>> all = new ArrayList<>(); + final List<CompletableFuture<Chunk>> all = new ArrayList<>();
+ while (iterator.hasNext()) { + while (iterator.hasNext()) {
+ ChunkCoordIntPair chunkcoordintpair = iterator.next(); + final ChunkCoordIntPair chunkcoordintpair = iterator.next();
+ CompletableFuture<Chunk> future = new CompletableFuture<>(); + final CompletableFuture<Chunk> future = new CompletableFuture<>();
+ all.add(future); + all.add(future);
+ this.getChunkAt(chunkcoordintpair.x, chunkcoordintpair.z, true, true, chunk -> { + this.getChunkAt(chunkcoordintpair.x, chunkcoordintpair.z, true, true, chunk -> {
+ future.complete(chunk); + future.complete(chunk);
@ -1275,8 +1334,7 @@ index 0000000000..c334462f20
+ +
+ boolean chunkGoingToExists(int x, int z) { + boolean chunkGoingToExists(int x, int z) {
+ synchronized (pendingChunks) { + synchronized (pendingChunks) {
+ long key = ChunkCoordIntPair.asLong(x, z); + PendingChunk pendingChunk = pendingChunks.get(ChunkCoordIntPair.asLong(x, z));
+ PendingChunk pendingChunk = pendingChunks.get(key);
+ return pendingChunk != null && pendingChunk.canGenerate; + return pendingChunk != null && pendingChunk.canGenerate;
+ } + }
+ } + }
@ -1350,10 +1408,22 @@ index 0000000000..c334462f20
+ } + }
+ } + }
+ +
+ private boolean isLoadThread() {
+ return EXECUTOR.isCurrentThread();
+ }
+
+ private boolean isGenThread() {
+ return generationExecutor.isCurrentThread();
+ }
+ private boolean isChunkThread() {
+ return isLoadThread() || isGenThread();
+ }
+
+ private class PendingChunk implements Runnable { + private class PendingChunk implements Runnable {
+ private final int x; + private final int x;
+ private final int z; + private final int z;
+ private final long key; + private final long key;
+ private final long started = System.currentTimeMillis();
+ private final CompletableFuture<Chunk> loadOnly = new CompletableFuture<>(); + private final CompletableFuture<Chunk> loadOnly = new CompletableFuture<>();
+ private final CompletableFuture<Chunk> generate = new CompletableFuture<>(); + private final CompletableFuture<Chunk> generate = new CompletableFuture<>();
+ private final AtomicInteger requests = new AtomicInteger(0); + private final AtomicInteger requests = new AtomicInteger(0);
@ -1402,11 +1472,6 @@ index 0000000000..c334462f20
+ } + }
+ } + }
+ +
+ private Chunk generateChunkExecutor() {
+ IS_CHUNK_THREAD.set(true);
+ IS_CHUNK_GEN_THREAD.set(true);
+ return generateChunk();
+ }
+ private Chunk generateChunk() { + private Chunk generateChunk() {
+ synchronized (this) { + synchronized (this) {
+ if (requests.get() <= 0) { + if (requests.get() <= 0) {
@ -1494,10 +1559,19 @@ index 0000000000..c334462f20
+ this.hasFinished = true; + this.hasFinished = true;
+ } + }
+ +
+ if (server.isMainThread()) {
+ postChunk();
+ return;
+ }
+
+ // Don't post here, even if on main, it must enter the queue so we can exit any open batch + // Don't post here, even if on main, it must enter the queue so we can exit any open batch
+ // schedulers, as post stage may trigger a new generation and cause errors + // schedulers, as post stage may trigger a new generation and cause errors
+ synchronized (MAIN_THREAD_QUEUE) { + synchronized (MAIN_THREAD_QUEUE) {
+ MAIN_THREAD_QUEUE.add(this::postChunk); + if (this.taskPriority == Priority.URGENT) {
+ MAIN_THREAD_QUEUE.addFirst(this::postChunk);
+ } else {
+ MAIN_THREAD_QUEUE.addLast(this::postChunk);
+ }
+ MAIN_THREAD_QUEUE.notify(); + MAIN_THREAD_QUEUE.notify();
+ } + }
+ } + }
@ -1544,7 +1618,7 @@ index 0000000000..c334462f20
+ } + }
+ } + }
+ +
+ synchronized PendingChunkRequest addListener(CompletableFuture<Chunk> future, boolean gen) { + synchronized PendingChunkRequest addListener(CompletableFuture<Chunk> future, boolean gen, boolean autoSubmit) {
+ if (hasFinished) { + if (hasFinished) {
+ future.complete(chunk); + future.complete(chunk);
+ return new PendingChunkRequest(this); + return new PendingChunkRequest(this);
@ -1564,24 +1638,18 @@ index 0000000000..c334462f20
+ if (loadTask == null) { + if (loadTask == null) {
+ // Take care of a race condition in that a request could be cancelled after the synchronize + // Take care of a race condition in that a request could be cancelled after the synchronize
+ // on pendingChunks, but before a listener is added, which would erase these pending tasks. + // on pendingChunks, but before a listener is added, which would erase these pending tasks.
+ if (shouldGenSync) {
+ genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority); + genTask = generationExecutor.createPendingTask(this::generateChunk, taskPriority);
+ } else {
+ genTask = generationExecutor.createPendingTask(this::generateChunkExecutor, taskPriority);
+ }
+ loadTask = EXECUTOR.createPendingTask(this, taskPriority); + loadTask = EXECUTOR.createPendingTask(this, taskPriority);
+ if (!IS_CHUNK_THREAD.get()) { + if (autoSubmit) {
+ // We will execute it outside of the synchronized context immediately after + // We will execute it outside of the synchronized context immediately after
+ EXECUTOR.submitTask(loadTask); + loadTask.submit();
+ } + }
+ } + }
+ return new PendingChunkRequest(this, gen); + return new PendingChunkRequest(this, gen);
+ } + }
+ +
+
+ @Override + @Override
+ public void run() { + public void run() {
+ IS_CHUNK_THREAD.set(true);
+ try { + try {
+ if (!loadFinished(loadChunk(x, z))) { + if (!loadFinished(loadChunk(x, z))) {
+ return; + return;
@ -1597,18 +1665,23 @@ index 0000000000..c334462f20
+ if (shouldGenSync) { + if (shouldGenSync) {
+ synchronized (this) { + synchronized (this) {
+ setStatus(PendingStatus.GENERATION_PENDING); + setStatus(PendingStatus.GENERATION_PENDING);
+ MAIN_THREAD_QUEUE.add(() -> generateFinished(this.generateChunk())); + if (this.taskPriority == Priority.URGENT) {
+ MAIN_THREAD_QUEUE.addFirst(() -> generateFinished(this.generateChunk()));
+ } else {
+ MAIN_THREAD_QUEUE.addLast(() -> generateFinished(this.generateChunk()));
+ }
+
+ } + }
+ synchronized (MAIN_THREAD_QUEUE) { + synchronized (MAIN_THREAD_QUEUE) {
+ MAIN_THREAD_QUEUE.notify(); + MAIN_THREAD_QUEUE.notify();
+ } + }
+ } else { + } else {
+ if (IS_CHUNK_GEN_THREAD.get()) { + if (isGenThread()) {
+ // ideally we should never run into 1 chunk generating another chunk... + // ideally we should never run into 1 chunk generating another chunk...
+ // but if we do, let's apply same solution + // but if we do, let's apply same solution
+ genTask.run(); + genTask.run();
+ } else { + } else {
+ generationExecutor.submitTask(genTask); + genTask.submit();
+ } + }
+ } + }
+ } + }
@ -1618,6 +1691,10 @@ index 0000000000..c334462f20
+ } + }
+ +
+ void bumpPriority(Priority newPriority) { + void bumpPriority(Priority newPriority) {
+ if (taskPriority.ordinal() >= newPriority.ordinal()) {
+ return;
+ }
+
+ this.taskPriority = newPriority; + this.taskPriority = newPriority;
+ PriorityQueuedExecutor.PendingTask<Void> loadTask = this.loadTask; + PriorityQueuedExecutor.PendingTask<Void> loadTask = this.loadTask;
+ PriorityQueuedExecutor.PendingTask<Chunk> genTask = this.genTask; + PriorityQueuedExecutor.PendingTask<Chunk> genTask = this.genTask;