From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: ishland
Date: Sun, 15 Nov 2020 10:42:27 +0800
Subject: [PATCH] Multi-threaded RegionFile IO

Replaces the single RegionFile IO thread with a receiver thread that hands
queued tasks to a fixed-size worker pool. Chunk data tasks for the same
region (chunk coordinates >> 5) are serialised through a fair async named
lock; all tasks, including general ones, run on the worker pool.

diff --git a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
index 9fe91f9512ee8c2589fc8da76bda5f6d70c9fac4..8b81119cfdef81665a302d96cfada5bb43bc1077 100644
--- a/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
+++ b/src/main/java/com/destroystokyo/paper/io/PaperFileIOThread.java
@@ -35,7 +35,7 @@ import java.util.function.Function;
  * @see #scheduleSave(WorldServer, int, int, NBTTagCompound, NBTTagCompound, int)
  * @see #loadChunkDataAsync(WorldServer, int, int, int, Consumer, boolean, boolean, boolean)
  */
-public final class PaperFileIOThread extends QueueExecutorThread {
+public final class PaperFileIOThread { // Yatopia

     public static final Logger LOGGER = MinecraftServer.LOGGER;
     public static final NBTTagCompound FAILURE_VALUE = new NBTTagCompound();
@@ -44,23 +44,109 @@ public final class PaperFileIOThread extends QueueExecutorThread {

         public static final PaperFileIOThread INSTANCE = new PaperFileIOThread();

+        /* Yatopia
         static {
             INSTANCE.start();
         }
+         */
     }

     private final AtomicLong writeCounter = new AtomicLong();
+    // Yatopia start - multi-threaded RegionFile IO
+    // per-region lock, keyed by RegionFileCoord in FileIOExecutorThread#pollTasks
+    private final com.ibm.asyncutil.locks.AsyncNamedLock<RegionFileCoord> regionFileLock = com.ibm.asyncutil.locks.AsyncNamedLock.createFair();
+    private final PrioritizedTaskQueue queue;
+    private final PaperFileIOThread.FileIOExecutorThread receiver;
+    // threads created by the worker pool's thread factory; consulted by isOnWorkerThread()
+    private final java.util.Set<Thread> executorThreads = com.google.common.collect.Sets.newConcurrentHashSet();
+    private final java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(
+            org.yatopiamc.yatopia.server.YatopiaConfig.regionFileIOThreadPoolSize == -1 ? Math.max(1, Math.min(Runtime.getRuntime().availableProcessors(), Integer.getInteger("paper.maxChunkThreads", 8))) : org.yatopiamc.yatopia.server.YatopiaConfig.regionFileIOThreadPoolSize,
+            new com.google.common.util.concurrent.ThreadFactoryBuilder()
+                    .setNameFormat("Paper RegionFile IO Worker #%d")
+                    .setPriority(Thread.NORM_PRIORITY - 1)
+                    .setDaemon(true)
+                    .setThreadFactory(r -> {
+                        Thread thr = new Thread(r);
+                        executorThreads.add(thr);
+                        return thr;
+                    })
+                    .setUncaughtExceptionHandler((t, e) -> {
+                        LOGGER.fatal("Uncaught exception thrown from " + t.getName() + ", report this!", e);
+                        executorThreads.remove(t);
+                    })
+                    .build()
+    );

     private PaperFileIOThread() {
-        super(new PrioritizedTaskQueue<>(), (int)(1.0e6)); // 1.0ms spinwait time
-        this.setName("Paper RegionFile IO Thread");
-        this.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
-        this.setUncaughtExceptionHandler((final Thread unused, final Throwable thr) -> {
-            LOGGER.fatal("Uncaught exception thrown from IO thread, report this!", thr);
+        queue = new PrioritizedTaskQueue<>();
+        receiver = new PaperFileIOThread.FileIOExecutorThread(queue, (int) (1.0e6)); // 1.0ms spinwait time
+        receiver.setName("Paper RegionFile IO Task Receiver");
+        receiver.setPriority(Thread.NORM_PRIORITY - 1); // we keep priority close to normal because threads can wait on us
+        receiver.setUncaughtExceptionHandler((final Thread thread, final Throwable thr) -> {
+            LOGGER.fatal("Uncaught exception thrown from " + thread.getName() + ", report this!", thr);
         });
+        receiver.start();
+
+    }
+
+    /**
+     * Flushes the receiver, then waits for every dispatched task that was still
+     * tracked when the snapshot below was taken. Tasks dispatched afterwards are
+     * not waited for.
+     */
+    public void flush() {
+        receiver.flush();
+        final java.util.Set<CompletableFuture<?>> runningTasks = new java.util.HashSet<>(receiver.runningTasks);
+        LOGGER.debug("Flushing Chunk IO: Waiting for {} futures", runningTasks.size());
+        for(CompletableFuture<?> future: runningTasks) {
+            try {
+                future.join();
+            } catch (Throwable ignored) {
+                // best effort: task failures are logged where the task is dispatched
+            }
+        }
+    }
+
+    /** Enqueues {@code newTask} and notifies the receiver thread. */
+    private void queueTask(PrioritizedTaskQueue.PrioritizedTask newTask) {
+        queue.add(newTask);
+        receiver.notifyTasks();
+    }
+
+    /**
+     * Closes the receiver, flushes outstanding work and shuts the worker pool down.
+     *
+     * @param wait when {@code true}, blocks until the worker pool has terminated;
+     *             an interrupt does not abort the wait, and the thread's interrupt
+     *             status is restored before returning
+     */
+    public void close(final boolean wait) {
+        receiver.close(wait, true);
+        this.flush();
+        executor.shutdown();
+        boolean interrupted = false;
+        while (wait && !executor.isTerminated()) {
+            try {
+                executor.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                interrupted = true; // keep waiting; the flag is restored after the loop
+            }
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @return {@code true} if the calling thread was created by the worker pool's
+     *         thread factory
+     */
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+    public boolean isOnWorkerThread() {
+        return executorThreads.contains(Thread.currentThread());
     }

-    /* run() is implemented by superclass */
+    // Yatopia end

     /*
      *
@@ -394,6 +480,103 @@ public final class PaperFileIOThread extends QueueExecutorThread {
         this.queueTask(new GeneralTask(priority, runnable));
     }

+    // Yatopia start
+    /**
+     * Key for {@code regionFileLock}: {@code pollTasks} builds it from a chunk task's
+     * coordinates shifted right by 5.
+     */
+    public static final class RegionFileCoord {
+
+        public final int x;
+        public final int z;
+
+        public RegionFileCoord(int x, int z) {
+            this.x = x;
+            this.z = z;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RegionFileCoord that = (RegionFileCoord) o;
+            return x == that.x && z == that.z;
+        }
+
+        @Override
+        public int hashCode() {
+            return java.util.Objects.hash(x, z);
+        }
+    }
+
+    /**
+     * Receiver thread: drains the prioritized queue and hands every task to the worker
+     * pool, tracking the resulting futures in {@code runningTasks}.
+     */
+    final class FileIOExecutorThread extends QueueExecutorThread {
+        private final java.util.Set<CompletableFuture<?>> runningTasks = com.google.common.collect.Sets.newConcurrentHashSet();
+
+        public FileIOExecutorThread(PrioritizedTaskQueue queue, long spinWaitTime) {
+            super(queue, spinWaitTime);
+        }
+
+        @Override
+        protected void preMainLoop() {
+            // forget futures that have already completed
+            runningTasks.removeIf(CompletableFuture::isDone);
+        }
+
+        /**
+         * Polls the queue until it is empty and dispatches each task to the worker pool.
+         * Chunk data tasks first acquire their region's lock and release it once they
+         * have run; every other task is submitted directly.
+         *
+         * @return {@code true} if at least one task was polled
+         */
+        @Override
+        protected boolean pollTasks(boolean flushTasks) {
+            Runnable task;
+            boolean ret = false;
+
+            while ((task = this.queue.poll()) != null) {
+                ret = true;
+                if (task instanceof ChunkDataTask) {
+                    ChunkDataTask chunkDataTask = (ChunkDataTask) task;
+                    runningTasks.add(regionFileLock.acquireLock(new RegionFileCoord(chunkDataTask.x >> 5, chunkDataTask.z >> 5))
+                            .thenApplyAsync(lockToken -> {
+                                try {
+                                    chunkDataTask.run();
+                                } finally {
+                                    lockToken.releaseLock();
+                                }
+                                return null;
+                            }, executor)
+                            .exceptionally(throwable -> {
+                                LOGGER.fatal("Exception thrown from prioritized runnable task in thread '" + Thread.currentThread().getName() + "': " + IOUtil.genericToString(chunkDataTask), throwable);
+                                return null;
+                            }).toCompletableFuture());
+                } else {
+                    // Run on the IO worker pool (not ForkJoinPool.commonPool()) so that
+                    // isOnWorkerThread() is true inside the task.
+                    Runnable finalTask = task;
+                    runningTasks.add(CompletableFuture.supplyAsync(() -> {
+                        finalTask.run();
+                        return null;
+                    }, executor).exceptionally(throwable -> {
+                        LOGGER.fatal("Exception thrown from prioritized runnable task in thread '" + Thread.currentThread().getName() + "': " + IOUtil.genericToString(finalTask), throwable);
+                        return null;
+                    }));
+                }
+            }
+
+            if (flushTasks) {
+                this.handleFlushThreads(false);
+            }
+
+            return ret;
+        }
+    }
+    // Yatopia end
     static final class GeneralTask extends PrioritizedTaskQueue.PrioritizedTask implements Runnable {

         private final Runnable run;
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
index ee906b594b306906c170180a29a8b61997d05168..7e348fcb813707fee830082b826932e0bbba1c49 100644
--- a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
@@ -35,6 +35,7 @@ public class QueueExecutorThread {
     @javax.annotation.Nullable
     @Override
     public NBTTagCompound read(ChunkCoordIntPair chunkcoordintpair) throws java.io.IOException {
-        if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
+        if (this.world != null && !com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
             NBTTagCompound ret = com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE
                 .loadChunkDataAsyncFuture(this.world, chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.IOUtil.getPriorityForCurrentThread(), true, false, true).join().poiData;

@@ -519,7 +519,7 @@ public class VillagePlace extends RegionFileSection {

     @Override
     public void write(ChunkCoordIntPair chunkcoordintpair, NBTTagCompound nbttagcompound) throws java.io.IOException {
-        if (this.world != null && Thread.currentThread() != com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE) {
+        if (this.world != null && !com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.isOnWorkerThread()) { // Yatopia
             com.destroystokyo.paper.io.PaperFileIOThread.Holder.INSTANCE.scheduleSave(
                 this.world, chunkcoordintpair.x, chunkcoordintpair.z, nbttagcompound, null,
                 com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY); // Tuinity - writes are async, no need for priority
diff --git a/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java b/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
index 545bf94bf47f0c6e9da1a4c162e081cbb2cd8390..e8e8e1fed06f0c17631a134e3673a25549d7c86c 100644
--- a/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
+++ b/src/main/java/org/yatopiamc/yatopia/server/YatopiaConfig.java
@@ -16,6 +16,7 @@ import org.bukkit.Bukkit;
 import org.bukkit.command.Command;
 import org.bukkit.configuration.InvalidConfigurationException;
 import org.bukkit.configuration.file.YamlConfiguration;
+import com.google.common.base.Preconditions;

 public class YatopiaConfig {

@@ -282,4 +283,10 @@ public class YatopiaConfig {
         allowThreadedFeatures = getBoolean("settings.c2me.allow-threaded-features", allowThreadedFeatures);
         c2meThreads = getInt("settings.c2me.parallelism", c2meThreads);
     }
+
+    public static int regionFileIOThreadPoolSize = -1;
+    private static void multiThreadedRegionFile() {
+        regionFileIOThreadPoolSize = getInt("settings.threads.regionfile", -1);
+        Preconditions.checkArgument(regionFileIOThreadPoolSize == -1 || regionFileIOThreadPoolSize > 0, "Invalid settings.threads.regionfile in yatopia.yml");
+    }
 }