World generation/loading in virtual threads (#2400)

This commit is contained in:
TheMode 2024-09-17 00:52:12 +02:00 committed by Matt Worzala
parent 62da78bb45
commit c092e7dd0b
5 changed files with 153 additions and 155 deletions

View File

@ -2,15 +2,11 @@ package net.minestom.server.instance;
import net.minestom.server.MinecraftServer;
import net.minestom.server.instance.anvil.AnvilLoader;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Phaser;
/**
* Interface implemented to change the way chunks are loaded/saved.
@ -37,22 +33,19 @@ public interface IChunkLoader {
* @param instance the {@link Instance} where the {@link Chunk} belong
* @param chunkX the chunk X
* @param chunkZ the chunk Z
* @return a {@link CompletableFuture} containing the chunk, or null if not present
* @return the chunk, or null if not present
*/
@NotNull CompletableFuture<@Nullable Chunk> loadChunk(@NotNull Instance instance, int chunkX, int chunkZ);
@Nullable Chunk loadChunk(@NotNull Instance instance, int chunkX, int chunkZ);
default @NotNull CompletableFuture<Void> saveInstance(@NotNull Instance instance) {
return AsyncUtils.VOID_FUTURE;
default void saveInstance(@NotNull Instance instance) {
}
/**
* Saves a {@link Chunk} with an optional callback for when it is done.
*
* @param chunk the {@link Chunk} to save
* @return a {@link CompletableFuture} executed when the {@link Chunk} is done saving,
* should be called even if the saving failed (you can throw an exception).
*/
@NotNull CompletableFuture<Void> saveChunk(@NotNull Chunk chunk);
void saveChunk(@NotNull Chunk chunk);
/**
* Saves multiple chunks with an optional callback for when it is done.
@ -60,37 +53,31 @@ public interface IChunkLoader {
* Implementations need to check {@link #supportsParallelSaving()} to support the feature if possible.
*
* @param chunks the chunks to save
* @return a {@link CompletableFuture} executed when the {@link Chunk} is done saving,
* should be called even if the saving failed (you can throw an exception).
*/
default @NotNull CompletableFuture<Void> saveChunks(@NotNull Collection<Chunk> chunks) {
default void saveChunks(@NotNull Collection<Chunk> chunks) {
if (supportsParallelSaving()) {
ExecutorService parallelSavingThreadPool = ForkJoinPool.commonPool();
chunks.forEach(c -> parallelSavingThreadPool.execute(() -> saveChunk(c)));
try {
parallelSavingThreadPool.shutdown();
parallelSavingThreadPool.awaitTermination(1L, java.util.concurrent.TimeUnit.DAYS);
} catch (InterruptedException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
return AsyncUtils.VOID_FUTURE;
} else {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
AtomicInteger counter = new AtomicInteger();
Phaser phaser = new Phaser(1);
for (Chunk chunk : chunks) {
saveChunk(chunk).whenComplete((unused, throwable) -> {
final boolean isLast = counter.incrementAndGet() == chunks.size();
if (isLast) {
completableFuture.complete(null);
phaser.register();
Thread.startVirtualThread(() -> {
try {
saveChunk(chunk);
phaser.arriveAndDeregister();
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
});
}
return completableFuture;
phaser.arriveAndAwaitAdvance();
} else {
for (Chunk chunk : chunks) {
saveChunk(chunk);
}
}
}
/**
* Does this {@link IChunkLoader} allow for multi-threaded saving of {@link Chunk}?
* Supports for instance/chunk saving in virtual threads.
*
* @return true if the chunk loader supports parallel saving
*/
@ -99,7 +86,7 @@ public interface IChunkLoader {
}
/**
* Does this {@link IChunkLoader} allow for multi-threaded loading of {@link Chunk}?
* Supports for instance/chunk loading in virtual threads.
*
* @return true if the chunk loader supports parallel loading
*/
@ -114,5 +101,6 @@ public interface IChunkLoader {
*
* @param chunk the chunk to unload
*/
default void unloadChunk(Chunk chunk) {}
default void unloadChunk(Chunk chunk) {
}
}

View File

@ -44,9 +44,9 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static net.minestom.server.utils.chunk.ChunkUtils.isLoaded;
@ -282,17 +282,37 @@ public class InstanceContainer extends Instance {
@Override
public @NotNull CompletableFuture<Void> saveInstance() {
return chunkLoader.saveInstance(this);
final IChunkLoader chunkLoader = this.chunkLoader;
return optionalAsync(chunkLoader.supportsParallelSaving(), () -> chunkLoader.saveInstance(this));
}
@Override
public @NotNull CompletableFuture<Void> saveChunkToStorage(@NotNull Chunk chunk) {
return chunkLoader.saveChunk(chunk);
final IChunkLoader chunkLoader = this.chunkLoader;
return optionalAsync(chunkLoader.supportsParallelSaving(), () -> chunkLoader.saveChunk(chunk));
}
@Override
public @NotNull CompletableFuture<Void> saveChunksToStorage() {
return chunkLoader.saveChunks(getChunks());
final IChunkLoader chunkLoader = this.chunkLoader;
return optionalAsync(chunkLoader.supportsParallelSaving(), () -> chunkLoader.saveChunks(getChunks()));
}
private CompletableFuture<Void> optionalAsync(boolean async, Runnable runnable) {
if (!async) {
runnable.run();
return CompletableFuture.completedFuture(null);
}
final CompletableFuture<Void> future = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
try {
runnable.run();
future.complete(null);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
});
return future;
}
protected @NotNull CompletableFuture<@NotNull Chunk> retrieveChunk(int chunkX, int chunkZ) {
@ -301,108 +321,107 @@ public class InstanceContainer extends Instance {
final CompletableFuture<Chunk> prev = loadingChunks.putIfAbsent(index, completableFuture);
if (prev != null) return prev;
final IChunkLoader loader = chunkLoader;
final Runnable retriever = () -> loader.loadChunk(this, chunkX, chunkZ)
.thenCompose(chunk -> {
if (chunk != null) {
// Chunk has been loaded from storage
return CompletableFuture.completedFuture(chunk);
} else {
// Loader couldn't load the chunk, generate it
return createChunk(chunkX, chunkZ).whenComplete((c, a) -> c.onGenerate());
}
})
// cache the retrieved chunk
.thenAccept(chunk -> {
// TODO run in the instance thread?
cacheChunk(chunk);
chunk.onLoad();
final Consumer<Chunk> generate = chunk -> {
if (chunk == null) {
// Loader couldn't load the chunk, generate it
chunk = createChunk(chunkX, chunkZ);
chunk.onGenerate();
}
EventDispatcher.call(new InstanceChunkLoadEvent(this, chunk));
final CompletableFuture<Chunk> future = this.loadingChunks.remove(index);
assert future == completableFuture : "Invalid future: " + future;
completableFuture.complete(chunk);
})
.exceptionally(throwable -> {
MinecraftServer.getExceptionManager().handleException(throwable);
return null;
});
// TODO run in the instance thread?
cacheChunk(chunk);
chunk.onLoad();
EventDispatcher.call(new InstanceChunkLoadEvent(this, chunk));
final CompletableFuture<Chunk> future = this.loadingChunks.remove(index);
assert future == completableFuture : "Invalid future: " + future;
completableFuture.complete(chunk);
};
if (loader.supportsParallelLoading()) {
CompletableFuture.runAsync(retriever);
Thread.startVirtualThread(() -> {
try {
final Chunk chunk = loader.loadChunk(this, chunkX, chunkZ);
generate.accept(chunk);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
});
} else {
retriever.run();
final Chunk chunk = loader.loadChunk(this, chunkX, chunkZ);
Thread.startVirtualThread(() -> {
try {
generate.accept(chunk);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
}
});
}
return completableFuture;
}
Map<Long, List<GeneratorImpl.SectionModifierImpl>> generationForks = new ConcurrentHashMap<>();
protected @NotNull CompletableFuture<@NotNull Chunk> createChunk(int chunkX, int chunkZ) {
protected @NotNull Chunk createChunk(int chunkX, int chunkZ) {
final Chunk chunk = chunkSupplier.createChunk(this, chunkX, chunkZ);
Check.notNull(chunk, "Chunks supplied by a ChunkSupplier cannot be null.");
Generator generator = generator();
if (generator != null && chunk.shouldGenerate()) {
CompletableFuture<Chunk> resultFuture = new CompletableFuture<>();
// TODO: virtual thread once Loom is available
ForkJoinPool.commonPool().submit(() -> {
GeneratorImpl.GenSection[] genSections = new GeneratorImpl.GenSection[chunk.getSections().size()];
Arrays.setAll(genSections, i -> {
Section section = chunk.getSections().get(i);
return new GeneratorImpl.GenSection(section.blockPalette(), section.biomePalette());
});
var chunkUnit = GeneratorImpl.chunk(MinecraftServer.getBiomeRegistry(), genSections,
chunk.getChunkX(), chunk.minSection, chunk.getChunkZ());
try {
// Generate block/biome palette
generator.generate(chunkUnit);
// Apply nbt/handler
if (chunkUnit.modifier() instanceof GeneratorImpl.AreaModifierImpl chunkModifier) {
for (var section : chunkModifier.sections()) {
if (section.modifier() instanceof GeneratorImpl.SectionModifierImpl sectionModifier) {
applyGenerationData(chunk, sectionModifier);
}
}
}
// Register forks or apply locally
for (var fork : chunkUnit.forks()) {
var sections = ((GeneratorImpl.AreaModifierImpl) fork.modifier()).sections();
for (var section : sections) {
if (section.modifier() instanceof GeneratorImpl.SectionModifierImpl sectionModifier) {
if (sectionModifier.genSection().blocks().count() == 0)
continue;
final Point start = section.absoluteStart();
final Chunk forkChunk = start.chunkX() == chunkX && start.chunkZ() == chunkZ ? chunk : getChunkAt(start);
if (forkChunk != null) {
applyFork(forkChunk, sectionModifier);
// Update players
forkChunk.invalidate();
forkChunk.sendChunk();
} else {
final long index = CoordConversion.chunkIndex(start);
this.generationForks.compute(index, (i, sectionModifiers) -> {
if (sectionModifiers == null) sectionModifiers = new ArrayList<>();
sectionModifiers.add(sectionModifier);
return sectionModifiers;
});
}
}
}
}
// Apply awaiting forks
processFork(chunk);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
} finally {
// End generation
refreshLastBlockChangeTime();
resultFuture.complete(chunk);
}
});
return resultFuture;
} else {
if (generator == null || !chunk.shouldGenerate()) {
// No chunk generator, execute the callback with the empty chunk
processFork(chunk);
return CompletableFuture.completedFuture(chunk);
return chunk;
}
GeneratorImpl.GenSection[] genSections = new GeneratorImpl.GenSection[chunk.getSections().size()];
Arrays.setAll(genSections, i -> {
Section section = chunk.getSections().get(i);
return new GeneratorImpl.GenSection(section.blockPalette(), section.biomePalette());
});
var chunkUnit = GeneratorImpl.chunk(MinecraftServer.getBiomeRegistry(), genSections,
chunk.getChunkX(), chunk.minSection, chunk.getChunkZ());
try {
// Generate block/biome palette
generator.generate(chunkUnit);
// Apply nbt/handler
if (chunkUnit.modifier() instanceof GeneratorImpl.AreaModifierImpl chunkModifier) {
for (var section : chunkModifier.sections()) {
if (section.modifier() instanceof GeneratorImpl.SectionModifierImpl sectionModifier) {
applyGenerationData(chunk, sectionModifier);
}
}
}
// Register forks or apply locally
for (var fork : chunkUnit.forks()) {
var sections = ((GeneratorImpl.AreaModifierImpl) fork.modifier()).sections();
for (var section : sections) {
if (section.modifier() instanceof GeneratorImpl.SectionModifierImpl sectionModifier) {
if (sectionModifier.genSection().blocks().count() == 0)
continue;
final Point start = section.absoluteStart();
final Chunk forkChunk = start.chunkX() == chunkX && start.chunkZ() == chunkZ ? chunk : getChunkAt(start);
if (forkChunk != null) {
applyFork(forkChunk, sectionModifier);
// Update players
forkChunk.invalidate();
forkChunk.sendChunk();
} else {
final long index = CoordConversion.chunkIndex(start);
this.generationForks.compute(index, (i, sectionModifiers) -> {
if (sectionModifiers == null) sectionModifiers = new ArrayList<>();
sectionModifiers.add(sectionModifier);
return sectionModifiers;
});
}
}
}
}
// Apply awaiting forks
processFork(chunk);
} catch (Throwable e) {
MinecraftServer.getExceptionManager().handleException(e);
} finally {
// End generation
refreshLastBlockChangeTime();
}
return chunk;
}
private void processFork(Chunk chunk) {

View File

@ -1,23 +1,21 @@
package net.minestom.server.instance;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.CompletableFuture;
final class NoopChunkLoaderImpl implements IChunkLoader {
static final NoopChunkLoaderImpl INSTANCE = new NoopChunkLoaderImpl();
private NoopChunkLoaderImpl(){}
@Override
public @NotNull CompletableFuture<@Nullable Chunk> loadChunk(@NotNull Instance instance, int chunkX, int chunkZ) {
return CompletableFuture.completedFuture(null);
private NoopChunkLoaderImpl() {
}
@Override
public @NotNull CompletableFuture<Void> saveChunk(@NotNull Chunk chunk) {
return AsyncUtils.VOID_FUTURE;
public @Nullable Chunk loadChunk(@NotNull Instance instance, int chunkX, int chunkZ) {
return null;
}
@Override
public void saveChunk(@NotNull Chunk chunk) {
// Empty
}
}

View File

@ -14,7 +14,6 @@ import net.minestom.server.instance.palette.Palettes;
import net.minestom.server.registry.DynamicRegistry;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.NamespaceID;
import net.minestom.server.utils.async.AsyncUtils;
import net.minestom.server.utils.validate.Check;
import net.minestom.server.world.biome.Biome;
import org.jetbrains.annotations.NotNull;
@ -30,7 +29,6 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@ -82,26 +80,24 @@ public class AnvilLoader implements IChunkLoader {
}
@Override
public @NotNull CompletableFuture<@Nullable Chunk> loadChunk(@NotNull Instance instance, int chunkX, int chunkZ) {
public @Nullable Chunk loadChunk(@NotNull Instance instance, int chunkX, int chunkZ) {
if (!Files.exists(path)) {
// No world folder
return CompletableFuture.completedFuture(null);
return null;
}
try {
return loadMCA(instance, chunkX, chunkZ);
} catch (Exception e) {
MinecraftServer.getExceptionManager().handleException(e);
return CompletableFuture.completedFuture(null);
return null;
}
}
private @NotNull CompletableFuture<@Nullable Chunk> loadMCA(Instance instance, int chunkX, int chunkZ) throws IOException {
private @Nullable Chunk loadMCA(Instance instance, int chunkX, int chunkZ) throws IOException {
final RegionFile mcaFile = getMCAFile(chunkX, chunkZ);
if (mcaFile == null)
return CompletableFuture.completedFuture(null);
if (mcaFile == null) return null;
final CompoundBinaryTag chunkData = mcaFile.readChunkData(chunkX, chunkZ);
if (chunkData == null)
return CompletableFuture.completedFuture(null);
if (chunkData == null) return null;
// Load the chunk data (assuming it is fully generated)
final Chunk chunk = instance.getChunkSupplier().createChunk(instance, chunkX, chunkZ);
@ -113,7 +109,6 @@ public class AnvilLoader implements IChunkLoader {
// TODO: Parallelize block, block entities and biome loading
// Blocks + Biomes
loadSections(chunk, chunkData);
// Block entities
loadBlockEntities(chunk, chunkData);
@ -133,7 +128,7 @@ public class AnvilLoader implements IChunkLoader {
} finally {
perRegionLoadedChunksLock.unlock();
}
return CompletableFuture.completedFuture(chunk);
return chunk;
}
private @Nullable RegionFile getMCAFile(int chunkX, int chunkZ) {
@ -309,22 +304,21 @@ public class AnvilLoader implements IChunkLoader {
}
@Override
public @NotNull CompletableFuture<Void> saveInstance(@NotNull Instance instance) {
public void saveInstance(@NotNull Instance instance) {
final CompoundBinaryTag nbt = instance.tagHandler().asCompound();
if (nbt.size() == 0) {
// Instance has no data
return AsyncUtils.VOID_FUTURE;
return;
}
try (OutputStream os = Files.newOutputStream(levelPath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
BinaryTagIO.writer().writeNamed(Map.entry("", nbt), os, BinaryTagIO.Compression.GZIP);
} catch (IOException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
return AsyncUtils.VOID_FUTURE;
}
@Override
public @NotNull CompletableFuture<Void> saveChunk(@NotNull Chunk chunk) {
public void saveChunk(@NotNull Chunk chunk) {
final int chunkX = chunk.getChunkX();
final int chunkZ = chunk.getChunkZ();
@ -349,7 +343,7 @@ public class AnvilLoader implements IChunkLoader {
} catch (IOException e) {
LOGGER.error("Failed to create region file for " + chunkX + ", " + chunkZ, e);
MinecraftServer.getExceptionManager().handleException(e);
return AsyncUtils.VOID_FUTURE;
return;
}
}
} finally {
@ -373,7 +367,6 @@ public class AnvilLoader implements IChunkLoader {
LOGGER.error("Failed to save chunk " + chunkX + ", " + chunkZ, e);
MinecraftServer.getExceptionManager().handleException(e);
}
return AsyncUtils.VOID_FUTURE;
}
private void saveSectionData(@NotNull Chunk chunk, @NotNull CompoundBinaryTag.Builder chunkData) {

View File

@ -139,7 +139,7 @@ public class LightParityIntegrationTest {
// Read from anvil
for (int x = 1; x < REGION_SIZE - 1; x++) {
for (int z = 1; z < REGION_SIZE - 1; z++) {
var chunk = anvilLoader.loadChunk(instance, x, z).join();
var chunk = anvilLoader.loadChunk(instance, x, z);
if (chunk == null) continue;
for (int sectionY = chunk.getMinSection(); sectionY < chunk.getMaxSection(); sectionY++) {