mirror of
https://github.com/YatopiaMC/Yatopia.git
synced 2024-12-01 15:13:24 +01:00
308 lines
16 KiB
Diff
308 lines
16 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: ishland <ishlandmc@yeah.net>
|
|
Date: Sun, 15 Nov 2020 10:42:27 +0800
|
|
Subject: [PATCH] Multi-threaded RegionFile IO
|
|
|
|
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
|
|
index 9fe91f9512ee8c2589fc8da76bda5f6d70c9fac4..8b81119cfdef81665a302d96cfada5bb43bc1077 100644
|
|
--- a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
|
|
@@ -35,7 +35,7 @@ import java.util.function.Function;
|
|
* @see #scheduleSave(WorldServer, int, int, NBTTagCompound, NBTTagCompound, int)
|
|
* @see #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)
|
|
*/
|
|
-public final class PaperFileIOThread extends QueueExecutorThread {
|
|
+public final class PaperFileIOThread { // Yatopia
|
|
|
|
public static final Logger LOGGER = MinecraftServer.LOGGER;
|
|
public static final NBTTagCompound FAILURE_VALUE = new NBTTagCompound();
|
|
@@ -44,23 +44,84 @@ public final class PaperFileIOThread extends QueueExecutorThread {
|
|
|
|
public static final PaperFileIOThread INSTANCE = new PaperFileIOThread();
|
|
|
|
+ /* Yatopia
|
|
static {
|
|
INSTANCE.start();
|
|
}
|
|
+ */
|
|
}
|
|
|
|
private final AtomicLong writeCounter = new AtomicLong();
|
|
+    // Yatopia start - multi-threaded RegionFile IO
+    /** Fair named lock keyed by region file coordinate; chunk data tasks acquire it before they run. */
+    private final com.ibm.asyncutil.locks.AsyncNamedLock<RegionFileCoord> regionFileLock = com.ibm.asyncutil.locks.AsyncNamedLock.createFair();
+    /** Prioritized queue filled by {@code queueTask} and drained by the receiver thread. */
+    private final PrioritizedTaskQueue<PrioritizedTaskQueue.PrioritizedTask> queue;
+    /** Thread that polls the queue and dispatches the polled tasks asynchronously. */
+    private final PaperFileIOThread.FileIOExecutorThread receiver;
+    /** Threads created by the worker pool's thread factory; consulted by {@code isOnWorkerThread()}. */
+    private final java.util.Set<Thread> executorThreads = com.google.common.collect.Sets.newConcurrentHashSet();
+    /**
+     * Fixed-size worker pool that runs chunk data tasks. The size comes from
+     * {@code YatopiaConfig.regionFileIOThreadPoolSize}; {@code -1} selects
+     * min(available processors, {@code paper.maxChunkThreads} system property, default 8).
+     */
+    private final java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(
+            org.yatopiamc.yatopia.server.YatopiaConfig.regionFileIOThreadPoolSize == -1
+                    // newFixedThreadPool rejects sizes below 1, which a non-positive paper.maxChunkThreads
+                    // would otherwise produce, so always fall back to at least one worker.
+                    ? Math.max(1, Math.min(Runtime.getRuntime().availableProcessors(), Integer.getInteger("paper.maxChunkThreads", 8)))
+                    : org.yatopiamc.yatopia.server.YatopiaConfig.regionFileIOThreadPoolSize,
+            new com.google.common.util.concurrent.ThreadFactoryBuilder()
+                    .setNameFormat("Paper RegionFile IO Worker #%d")
+                    .setPriority(Thread.NORM_PRIORITY - 1)
+                    .setDaemon(true)
+                    .setThreadFactory(r -> {
+                        // Register every pool thread so isOnWorkerThread() can recognise it.
+                        Thread thr = new Thread(r);
+                        executorThreads.add(thr);
+                        return thr;
+                    })
+                    .setUncaughtExceptionHandler((t, e) -> {
+                        LOGGER.fatal("Uncaught exception thrown from " + t.getName() + ", report this!", e);
+                        executorThreads.remove(t);
+                    })
+                    .build()
+    );
|
|
|
|
private PaperFileIOThread() {
|
|
- super(new PrioritizedTaskQueue<>(), (int)(1.0e6)); // 1.0ms spinwait time
|
|
- this.setName("Paper RegionFile IO Thread");
|
|
- this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
|
|
- this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> {
|
|
- LOGGER.fatal("Uncaught exception thrown from IO thread, report this!", thr);
|
|
+ queue = new PrioritizedTaskQueue<>();
|
|
+ receiver = new PaperFileIOThread.FileIOExecutorThread(queue, (int) (1.0e6)); // 1.0ms spinwait time
|
|
+ receiver.setName("Paper RegionFile IO Task Receiver");
|
|
+ receiver.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
|
|
+ receiver.setUncaughtExceptionHandler((final Thread thread, final Throwable thr) -> {
|
|
+ LOGGER.fatal("Uncaught exception thrown from " + thread.getName() + ", report this!", thr);
|
|
});
|
|
+ receiver.start();
|
|
+
|
|
+ }
|
|
+
|
|
+    /**
+     * Flushes the receiver thread, then blocks until every dispatched task that was still
+     * tracked by the receiver at that moment has completed.
+     */
+    public void flush() {
+        receiver.flush();
+        // Work on a snapshot: the receiver thread adds to and prunes its own set concurrently.
+        final java.util.Set<CompletableFuture<?>> runningTasks = new java.util.HashSet<>(receiver.runningTasks);
+        LOGGER.debug("Flushing Chunk IO: Waiting for {} futures", runningTasks.size());
+        for (final CompletableFuture<?> future : runningTasks) {
+            try {
+                future.join();
+            } catch (java.util.concurrent.CompletionException | java.util.concurrent.CancellationException ignored) {
+                // Best effort: a failed or cancelled task must not stop the flush from waiting on
+                // the remaining ones. Only the two exception types join() can throw are ignored,
+                // so that Errors are no longer swallowed here.
+            }
+        }
+    }
|
|
+
|
|
+    /**
+     * Adds {@code newTask} to the prioritized queue and then calls {@code notifyTasks()} on the
+     * receiver thread.
+     *
+     * @param newTask the task to enqueue
+     */
+    private void queueTask(PrioritizedTaskQueue.PrioritizedTask newTask) {
+        this.queue.add(newTask);
+        this.receiver.notifyTasks();
+    }
|
|
+
|
|
+    /**
+     * Closes the receiver thread, flushes the dispatched tasks and shuts the worker pool down.
+     *
+     * @param wait when {@code true}, block until the worker pool has terminated
+     */
+    public void close(final boolean wait) {
+        receiver.close(wait, true);
+        this.flush();
+        executor.shutdown();
+        if (!wait) {
+            return;
+        }
+
+        // Keep waiting even if interrupted, but remember the interrupt and restore it afterwards
+        // so the caller can still observe it.
+        boolean interrupted = false;
+        try {
+            while (!executor.isTerminated()) {
+                try {
+                    executor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
|
|
+
|
|
+    /**
+     * Reports whether the calling thread is one of the threads registered by the worker pool's
+     * thread factory.
+     *
+     * @return {@code true} if the current thread is contained in {@code executorThreads}
+     */
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+    public boolean isOnWorkerThread() {
+        final Thread caller = Thread.currentThread();
+        return this.executorThreads.contains(caller);
     }
|
|
|
|
- /* run() is implemented by superclass */
|
|
+ // Yatopia end
|
|
|
|
/*
|
|
*
|
|
@@ -394,6 +455,85 @@ public final class PaperFileIOThread extends QueueExecutorThread {
|
|
this.queueTask(new GeneralTask(priority, runnable));
|
|
}
|
|
|
|
+ // Yatopia start
|
|
+    /**
+     * Immutable (x, z) coordinate pair with value equality; used as the key of the named
+     * region file lock.
+     */
+    public static final class RegionFileCoord {
+
+        public final int x;
+        public final int z;
+
+        public RegionFileCoord(int x, int z) {
+            this.x = x;
+            this.z = z;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            // The class is final, so instanceof is equivalent to an exact class comparison.
+            if (!(o instanceof RegionFileCoord)) {
+                return false;
+            }
+            final RegionFileCoord that = (RegionFileCoord) o;
+            return this.x == that.x && this.z == that.z;
+        }
+
+        @Override
+        public int hashCode() {
+            // Same value as java.util.Objects.hash(x, z), but without allocating a varargs
+            // array and boxing both ints on every lock lookup.
+            return 31 * (31 + x) + z;
+        }
+    }
|
|
+
|
|
+    /**
+     * Receiver thread: drains the prioritized queue and dispatches every polled task
+     * asynchronously, tracking the resulting futures so that they can be awaited later.
+     */
+    final class FileIOExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask & Runnable> extends QueueExecutorThread<T> {
+        /** Futures of dispatched tasks; completed entries are pruned in {@link #preMainLoop()}. */
+        private final java.util.Set<CompletableFuture<?>> runningTasks = com.google.common.collect.Sets.newConcurrentHashSet();
+
+        public FileIOExecutorThread(PrioritizedTaskQueue<T> queue, long spinWaitTime) {
+            super(queue, spinWaitTime);
+        }
+
+        /** Forgets every tracked future that has already completed. */
+        @Override
+        protected void preMainLoop() {
+            runningTasks.removeIf(future -> future.isDone());
+        }
+
+        /**
+         * Polls the queue until it is empty and dispatches each task.
+         *
+         * @param flushTasks whether to call {@code handleFlushThreads(false)} once the queue is drained
+         * @return {@code true} if at least one task was polled
+         */
+        @Override
+        protected boolean pollTasks(boolean flushTasks) {
+            boolean polledAny = false;
+
+            for (Runnable next = this.queue.poll(); next != null; next = this.queue.poll()) {
+                polledAny = true;
+                final Runnable polled = next;
+                final CompletableFuture<?> pending;
+
+                if (polled instanceof ChunkDataTask) {
+                    final ChunkDataTask chunkTask = (ChunkDataTask) polled;
+                    // Chunk coordinates are shifted right by 5 to form the lock key.
+                    final RegionFileCoord region = new RegionFileCoord(chunkTask.x >> 5, chunkTask.z >> 5);
+                    pending = regionFileLock.acquireLock(region)
+                            .thenAcceptAsync(token -> {
+                                try {
+                                    chunkTask.run();
+                                } finally {
+                                    // Always hand the lock back, even when the task throws.
+                                    token.releaseLock();
+                                }
+                            }, executor)
+                            .exceptionally(failure -> {
+                                LOGGER.fatal("Exception thrown from prioritized runnable task in thread '" + Thread.currentThread().getName() + "': " + IOUtil.genericToString(chunkTask), failure);
+                                return null;
+                            })
+                            .toCompletableFuture();
+                } else {
+                    // NOTE(review): no executor is passed here, so this task runs on CompletableFuture's
+                    // default async executor rather than on the RegionFile IO worker pool - confirm
+                    // that this is intended.
+                    pending = CompletableFuture.runAsync(polled).exceptionally(failure -> {
+                        LOGGER.fatal("Exception thrown from prioritized runnable task in thread '" + Thread.currentThread().getName() + "': " + IOUtil.genericToString(polled), failure);
+                        return null;
+                    });
+                }
+
+                runningTasks.add(pending);
+            }
+
+            if (flushTasks) {
+                this.handleFlushThreads(false);
+            }
+
+            return polledAny;
+        }
+    }
|
|
+ // Yatopia end
|
|
static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {
|
|
|
|
private final Runnable run;
|
|
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
|
index ee906b594b306906c170180a29a8b61997d05168..7e348fcb813707fee830082b826932e0bbba1c49 100644
|
|
--- a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
|
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
|
@@ -35,6 +35,7 @@ public class QueueExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask
|
|
final long spinWaitTime = this.spinWaitTime;
|
|
main_loop:
|
|
for (;;) {
|
|
+ preMainLoop(); // Yatopia
|
|
this.pollTasks(true);
|
|
|
|
// spinwait
|
|
@@ -81,11 +82,17 @@ public class QueueExecutorThread<T extends PrioritizedTaskQueue.PrioritizedTask
|
|
// LockSupport.park() can fail for any reason
|
|
do {
|
|
Thread.interrupted();
|
|
- LockSupport.park("Waiting on tasks");
|
|
+ LockSupport.parkNanos("Waiting on tasks", 1_000_000_000); // Yatopia - prevent dead lock
|
|
} while (this.parked.get());
|
|
}
|
|
}
|
|
|
|
+    // Yatopia start
+    /**
+     * Hook called at the start of every main-loop iteration, before tasks are polled.
+     * The default implementation does nothing; it exists so that PaperFileIOThread's
+     * receiver thread can override it.
+     */
+    protected void preMainLoop() {
+    }
+    // Yatopia end
|
|
+
|
|
protected boolean handleClose() {
|
|
if (this.closed) {
|
|
this.pollTasks(true); // this ensures we've emptied the queue
|
|
diff --git a/src/main/java/net/minecraft/server/MinecraftServer.java b/src/main/java/net/minecraft/server/MinecraftServer.java
|
|
index 6fd059412e07e1b3b2597b693df5a4f439ebe382..d42d5c7153a655da65d1847541b9f15137b53dbb 100644
|
|
--- a/src/main/java/net/minecraft/server/MinecraftServer.java
|
|
+++ b/src/main/java/net/minecraft/server/MinecraftServer.java
|
|
@@ -962,7 +962,7 @@ public abstract class MinecraftServer extends IAsyncTaskHandlerReentrant<TickTas
|
|
// Spigot end
|
|
// Paper start - move final shutdown items here
|
|
LOGGER.info("Flushing Chunk IO");
|
|
- com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.close(true, true); // Paper
|
|
+ com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.close(true); // Paper // Yatopia
|
|
LOGGER.info("Closing Thread Pool");
|
|
SystemUtils.shutdownServerThreadPool(); // Paper
|
|
LOGGER.info("Closing Server");
|
|
diff --git a/src/main/java/net/minecraft/server/level/PlayerChunkMap.java b/src/main/java/net/minecraft/server/level/PlayerChunkMap.java
|
|
index 9b3aa025069f45bf94949cbf626ccfa69a646845..aa4b548dbf3366e5bb0df3763be9bdc1c4e9d34d 100644
|
|
--- a/src/main/java/net/minecraft/server/level/PlayerChunkMap.java
|
|
+++ b/src/main/java/net/minecraft/server/level/PlayerChunkMap.java
|
|
@@ -1725,7 +1725,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
@Nullable
|
|
@Override
|
|
public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws IOException {
|
|
- if (Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
|
|
+ if (!com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
|
|
NBTTagCompound ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
|
|
.loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
|
|
false, true, true).join().chunkData;
|
|
@@ -1740,7 +1740,7 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d {
|
|
|
|
@Override
|
|
public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws IOException {
|
|
- if (Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
|
|
+ if (!com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
|
|
com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
|
|
this.world, chunkcoordintpair.x, chunkcoordintpair.z, null, nbttagcompound,
|
|
com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY); // Tuinity - writes are async, no need for priority
|
|
diff --git a/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java b/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java
|
|
index 29cd71efe86eea2227f373c15c39dc530e9e8199..55d497f4305de50e58c7e0788724576f81128138 100644
|
|
--- a/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java
|
|
+++ b/src/main/java/net/minecraft/world/entity/ai/village/poi/VillagePlace.java
|
|
@@ -504,7 +504,7 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
@javax.annotation.Nullable
|
|
@Override
|
|
public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws java.io.IOException {
|
|
- if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
|
|
+ if (this.world != null && !com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
|
|
NBTTagCompound ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
|
|
.loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(),
|
|
true, false, true).join().poiData;
|
|
@@ -519,7 +519,7 @@ public class VillagePlace extends RegionFileSection<VillagePlaceSection> {
|
|
|
|
@Override
|
|
public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws java.io.IOException {
|
|
- if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
|
|
+ if (this.world != null && !com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
|
|
com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
|
|
this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null,
|
|
com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY); // Tuinity - writes are async, no need for priority
|
|
diff --git a/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java b/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
|
|
index 545bf94bf47f0c6e9da1a4c162e081cbb2cd8390..e8e8e1fed06f0c17631a134e3673a25549d7c86c 100644
|
|
--- a/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
|
|
+++ b/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
|
|
@@ -16,6 +16,7 @@ import org.bukkit.Bukkit;
|
|
import org.bukkit.command.Command;
|
|
import org.bukkit.configuration.InvalidConfigurationException;
|
|
import org.bukkit.configuration.file.YamlConfiguration;
|
|
+import com.google.common.base.Preconditions;
|
|
|
|
public class YatopiaConfig {
|
|
|
|
@@ -282,4 +283,10 @@ public class YatopiaConfig {
|
|
allowThreadedFeatures = getBoolean("settings.c2me.allow-threaded-features", allowThreadedFeatures);
|
|
c2meThreads = getInt("settings.c2me.parallelism", c2meThreads);
|
|
}
|
|
+
|
|
+    /** Size of the RegionFile IO worker pool; {@code -1} lets the server derive the size itself. */
+    public static int regionFileIOThreadPoolSize = -1;
+    /**
+     * Reads {@code settings.threads.regionfile} and stores it once it has been validated.
+     *
+     * @throws IllegalArgumentException if the configured value is neither -1 nor positive
+     */
+    private static void multiThreadedRegionFile() {
+        final int configured = getInt("settings.threads.regionfile", -1);
+        // Validate before assigning so that an invalid value is never stored in the public field.
+        Preconditions.checkArgument(configured == -1 || configured > 0,
+                "Invalid settings.threads.regionfile in yatopia.yml: %s (expected -1 or a positive integer)", configured);
+        regionFileIOThreadPoolSize = configured;
+    }
|
|
}
|