Yatopia/patches/removed/server/0072-Multi-threaded-RegionFile-IO.patch
2021-05-14 14:53:43 -04:00

308 lines
16 KiB
Diff

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: ishland <ishlandmc@yeah.net>
Date: Sun, 15 Nov 2020 10:42:27 +0800
Subject: [PATCH] Multi-threaded RegionFile IO
diff --git a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
index 9fe91f9512ee8c2589fc8da76bda5f6d70c9fac4..8b81119cfdef81665a302d96cfada5bb43bc1077 100644
--- a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
+++ b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
@@ -35,7 +35,7 @@ import java.util.function.Function;
* @see #scheduleSave(WorldServer, int, int, NBTTagCompound, NBTTagCompound, int)
* @see #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)
*/
-public final class PaperFileIOThread extends QueueExecutorThread {
+public final class PaperFileIOThread { // Yatopia
public static final Logger LOGGER = MinecraftServer.LOGGER;
public static final NBTTagCompound FAILURE_VALUE = new NBTTagCompound();
@@ -44,23 +44,84 @@ public final class PaperFileIOThread extends QueueExecutorThread {
public static final PaperFileIOThread INSTANCE = new PaperFileIOThread();
+ /* Yatopia
static {
INSTANCE.start();
}
+ */
}
private final AtomicLong writeCounter = new AtomicLong();
+ // Yatopia start - multi-threaded RegionFile IO
+ private final com.ibm.asyncutil.locks.AsyncNamedLock<RegionFileCoord> regionFileLock = com.ibm.asyncutil.locks.AsyncNamedLock.createFair();
+ private final PrioritizedTaskQueue<PrioritizedTaskQueue.PrioritizedTask> queue;
+ private final PaperFileIOThread.FileIOExecutorThread receiver;
+ private final java.util.Set<Thread> executorThreads = com.google.common.collect.Sets.newConcurrentHashSet();
+ private final java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(
+ org.yatopiamc.yatopia.server.YatopiaConfig.regionFileIOThreadPoolSize == -1 ? Math.min(Runtime.getRuntime().availableProcessors(), Integer.getInteger("paper.maxChunkThreads", 8)) : org.yatopiamc.yatopia.server.YatopiaConfig.regionFileIOThreadPoolSize,
+ new com.google.common.util.concurrent.ThreadFactoryBuilder()
+ .setNameFormat("Paper RegionFile IO Worker #%d")
+ .setPriority(Thread.NORM_PRIORITY - 1)
+ .setDaemon(true)
+ .setThreadFactory(r -> {
+ Thread thr = new Thread(r);
+ executorThreads.add(thr);
+ return thr;
+ })
+ .setUncaughtExceptionHandler((t, e) -> {
+ LOGGER.fatal("Uncaught exception thrown from " + t.getName() + ", report this!", e);
+ executorThreads.remove(t);
+ })
+ .build()
+ );
private PaperFileIOThread() {
- super(new PrioritizedTaskQueue<>(), (int)(1.0e6)); // 1.0ms spinwait time
- this.setName("Paper RegionFile IO Thread");
- this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
- this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> {
- LOGGER.fatal("Uncaught exception thrown from IO thread, report this!", thr);
+ queue = new PrioritizedTaskQueue<>();
+ receiver = new PaperFileIOThread.FileIOExecutorThread(queue, (int) (1.0e6)); // 1.0ms spinwait time
+ receiver.setName("Paper RegionFile IO Task Receiver");
+ receiver.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
+ receiver.setUncaughtExceptionHandler((final Thread thread, final Throwable thr) -> {
+ LOGGER.fatal("Uncaught exception thrown from " + thread.getName() + ", report this!", thr);
});
+ receiver.start();
+
+ }
+
+ public void flush() {
+ receiver.flush();
+ final java.util.Set<CompletableFuture<?>> runningTasks = new java.util.HashSet<>(receiver.runningTasks);
+ LOGGER.debug("Flushing Chunk IO: Waiting for {} futures", runningTasks.size());
+ for(CompletableFuture<?> future: runningTasks) {
+ try {
+ future.join();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
+ private void queueTask(PrioritizedTaskQueue.PrioritizedTask newTask) {
+ queue.add(newTask);
+ receiver.notifyTasks();
+ }
+
+ public void close(final boolean wait) {
+ receiver.close(wait, true);
+ this.flush();
+ executor.shutdown();
+ while (wait && !executor.isTerminated()) {
+ try {
+ executor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ public boolean isOnWorkerThread() {
+ return executorThreads.contains(Thread.currentThread());
}
- /* run() is implemented by superclass */
+ // Yatopia end
/*
*
@@ -394,6 +455,85 @@ public final class PaperFileIOThread extends QueueExecutorThread {
this.queueTask(new GeneralTask(priority, runnable));
}
+ // Yatopia start
+ public static final class RegionFileCoord {
+
+ public final int x;
+ public final int z;
+
+ public RegionFileCoord(int x, int z) {
+ this.x = x;
+ this.z = z;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RegionFileCoord that = (RegionFileCoord) o;
+ return x == that.x && z == that.z;
+ }
+
+ @Override
+ public int hashCode() {
+ return java.util.Objects.hash(x, z);
+ }
+ }
+
+ final class FileIOExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask & Runnable> extends QueueExecutorThread<T> {
+ private final java.util.Set<CompletableFuture<?>> runningTasks = com.google.common.collect.Sets.newConcurrentHashSet();
+
+ public FileIOExecutorThread(PrioritizedTaskQueue<T> queue, long spinWaitTime) {
+ super(queue, spinWaitTime);
+ }
+
+ @Override
+ protected void preMainLoop() {
+ runningTasks.removeIf(CompletableFuture::isDone);
+ }
+
+ @Override
+ protected boolean pollTasks(boolean flushTasks) {
+ Runnable task;
+ boolean ret = false;
+
+ while ((task = this.queue.poll()) != null) {
+ ret = true;
+ if (task instanceof ChunkDataTask) {
+ ChunkDataTask chunkDataTask = (ChunkDataTask) task;
+ runningTasks.add(regionFileLock.acquireLock(new RegionFileCoord(chunkDataTask.x >> 5, chunkDataTask.z >> 5))
+ .thenApplyAsync(lockToken -> {
+ try {
+ chunkDataTask.run();
+ } finally {
+ lockToken.releaseLock();
+ }
+ return null;
+ }, executor)
+ .exceptionally(throwable -> {
+ LOGGER.fatal("Exception thrown from prioritized runnable task in thread '" + Thread.currentThread().getName() + "': " + IOUtil.genericToString(chunkDataTask), throwable);
+ return null;
+ }).toCompletableFuture());
+ } else {
+ Runnable finalTask = task;
+ runningTasks.add(CompletableFuture.supplyAsync(() -> {
+ finalTask.run();
+ return null;
+ }).exceptionally(throwable -> {
+ LOGGER.fatal("Exception thrown from prioritized runnable task in thread '" + Thread.currentThread().getName() + "': " + IOUtil.genericToString(finalTask), throwable);
+ return null;
+ }));
+ }
+ }
+
+ if (flushTasks) {
+ this.handleFlushThreads(false);
+ }
+
+ return ret;
+ }
+ }
+ // Yatopia end
static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
private final Runnable run;
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
index ee906b594b306906c170180a29a8b61997d05168..7e348fcb813707fee830082b826932e0bbba1c49 100644
--- a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
@@ -35,6 +35,7 @@ public class QueueExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask
final long spinWaitTime = this.spinWaitTime;
main_loop:
for (;;) {
+ preMainLoop(); // Yatopia
this.pollTasks(true);
// spinwait
@@ -81,11 +82,17 @@ public class QueueExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask
// LockSupport.park() can fail for any reason
do {
Thread.interrupted();
- LockSupport.park("Waiting on tasks");
+ LockSupport.parkNanos("Waiting on tasks", 1_000_000_000); // Yatopia - prevent dead lock
} while (this.parked.get());
}
}
+ // Yatopia start
+ protected void preMainLoop() {
+ // Used for PaperFileIOThread
+ }
+ // Yatopia end
+
protected boolean handleClose() {
if (this.closed) {
this.pollTasks(true); // this ensures we've emptied the queue
diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
index 6fd059412e07e1b3b2597b693df5a4f439ebe382..d42d5c7153a655da65d1847541b9f15137b53dbb 100644
--- a/src/main/java/net/minecraft/server/MinecraftServer.java
+++ b/src/main/java/net/minecraft/server/MinecraftServer.java
@@ -962,7 +962,7 @@ public abstract class MinecraftServer extends IAsyncTaskHandlerReentrant<TickTas
// Spigot end
// Paper start - move final shutdown items here
LOGGER.info("Flushing Chunk IO");
- com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.close(true, true); // Paper
+ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.close(true); // Paper // Yatopia
LOGGER.info("Closing Thread Pool");
SystemUtils.shutdownServerThreadPool(); // Paper
LOGGER.info("Closing Server");
diff --git a/src/main/java/net/minecraft/server/level/PlayerChunkMap.java b/src/main/java/net/minecraft/server/level/PlayerChunkMap.java
index 9b3aa025069f45bf94949cbf626ccfa69a646845..aa4b548dbf3366e5bb0df3763be9bdc1c4e9d34d 100644
--- a/src/main/java/net/minecraft/server/level/PlayerChunkMap.java
+++ b/src/main/java/net/minecraft/server/level/PlayerChunkMap.java
@@ -1725,7 +1725,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
@Nullable
@Override
public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws IOException {
- if (Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
+ if (!com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
NBTTagCompound ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
.loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
false, true, true).join().chunkData;
@@ -1740,7 +1740,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
@Override
public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException {
- if (Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
+ if (!com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
this.world, chunkcoordintpair.x, chunkcoordintpair.z, null, nbttagcompound,
com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY); // Tuinity - writes are async, no need for priority
diff --git a/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java b/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java
index 29cd71efe86eea2227f373c15c39dc530e9e8199..55d497f4305de50e58c7e0788724576f81128138 100644
--- a/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java
+++ b/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java
@@ -504,7 +504,7 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
@javax.annotation.Nullable
@Override
public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws java.io.IOException {
- if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
+ if (this.world != null && !com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
NBTTagCompound ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
.loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
true, false, true).join().poiData;
@@ -519,7 +519,7 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
@Override
public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws java.io.IOException {
- if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
+ if (this.world != null && !com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null,
com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY); // Tuinity - writes are async, no need for priority
diff --git a/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java b/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
index 545bf94bf47f0c6e9da1a4c162e081cbb2cd8390..e8e8e1fed06f0c17631a134e3673a25549d7c86c 100644
--- a/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
+++ b/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
@@ -16,6 +16,7 @@ import org.bukkit.Bukkit;
import org.bukkit.command.Command;
import org.bukkit.configuration.InvalidConfigurationException;
import org.bukkit.configuration.file.YamlConfiguration;
+import com.google.common.base.Preconditions;
public class YatopiaConfig {
@@ -282,4 +283,10 @@ public class YatopiaConfig {
allowThreadedFeatures = getBoolean("settings.c2me.allow-threaded-features", allowThreadedFeatures);
c2meThreads = getInt("settings.c2me.parallelism", c2meThreads);
}
+
+ public static int regionFileIOThreadPoolSize = -1;
+ private static void multiThreadedRegionFile() {
+ regionFileIOThreadPoolSize = getInt("settings.threads.regionfile", -1);
+ Preconditions.checkArgument(regionFileIOThreadPoolSize == -1 || regionFileIOThreadPoolSize > 0, "Invalid settings.threads.regionfile in yatopia.yml");
+ }
}