Improve ThreadLocal performance when calling from a minestom thread

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-09-25 19:37:14 +02:00
parent be9b11f238
commit 9fe3f28497
12 changed files with 126 additions and 59 deletions

View File

@ -31,10 +31,10 @@ import net.minestom.server.scoreboard.TeamManager;
import net.minestom.server.storage.StorageLocation;
import net.minestom.server.storage.StorageManager;
import net.minestom.server.terminal.MinestomTerminal;
import net.minestom.server.thread.MinestomThreadPool;
import net.minestom.server.timer.SchedulerManager;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.thread.MinestomThread;
import net.minestom.server.utils.validate.Check;
import net.minestom.server.world.Difficulty;
import net.minestom.server.world.DimensionTypeManager;
@ -726,7 +726,7 @@ public final class MinecraftServer {
LOGGER.info("Shutting down all thread pools.");
benchmarkManager.disable();
MinestomTerminal.stop();
MinestomThread.shutdownAll();
MinestomThreadPool.shutdownAll();
LOGGER.info("Minestom server stopped successfully.");
}

View File

@ -6,6 +6,7 @@ import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager;
import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.utils.PacketUtils;
import org.jetbrains.annotations.NotNull;
@ -154,7 +155,7 @@ public final class UpdateManager {
this.stopRequested = true;
}
private final class TickSchedulerThread extends Thread {
private final class TickSchedulerThread extends MinestomThread {
private final ThreadDispatcher threadDispatcher = UpdateManager.this.threadDispatcher;
TickSchedulerThread() {

View File

@ -1,8 +1,8 @@
package net.minestom.server.instance;
import net.minestom.server.MinecraftServer;
import net.minestom.server.thread.MinestomThreadPool;
import net.minestom.server.utils.async.AsyncUtils;
import net.minestom.server.utils.thread.MinestomThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -60,7 +60,7 @@ public interface IChunkLoader {
*/
default @NotNull CompletableFuture<Void> saveChunks(@NotNull Collection<Chunk> chunks) {
if (supportsParallelSaving()) {
ExecutorService parallelSavingThreadPool = new MinestomThread(MinecraftServer.THREAD_COUNT_PARALLEL_CHUNK_SAVING, MinecraftServer.THREAD_NAME_PARALLEL_CHUNK_SAVING, true);
ExecutorService parallelSavingThreadPool = new MinestomThreadPool(MinecraftServer.THREAD_COUNT_PARALLEL_CHUNK_SAVING, MinecraftServer.THREAD_NAME_PARALLEL_CHUNK_SAVING, true);
chunks.forEach(c -> parallelSavingThreadPool.execute(() -> saveChunk(c)));
try {
parallelSavingThreadPool.shutdown();

View File

@ -3,7 +3,7 @@ package net.minestom.server.instance.batch;
import net.minestom.server.MinecraftServer;
import net.minestom.server.instance.Instance;
import net.minestom.server.instance.block.BlockSetter;
import net.minestom.server.utils.thread.MinestomThread;
import net.minestom.server.thread.MinestomThreadPool;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -29,7 +29,7 @@ import java.util.concurrent.ExecutorService;
*/
public interface Batch<C> extends BlockSetter {
ExecutorService BLOCK_BATCH_POOL = new MinestomThread(
ExecutorService BLOCK_BATCH_POOL = new MinestomThreadPool(
MinecraftServer.THREAD_COUNT_BLOCK_BATCH,
MinecraftServer.THREAD_NAME_BLOCK_BATCH);

View File

@ -1,7 +1,7 @@
package net.minestom.server.map;
import net.minestom.server.MinecraftServer;
import net.minestom.server.utils.thread.MinestomThread;
import net.minestom.server.thread.MinestomThreadPool;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -187,7 +187,7 @@ public enum MapColors {
private static void fillRGBArray() {
rgbArray = new PreciseMapColor[0xFFFFFF + 1];
MinestomThread threads = new MinestomThread(Runtime.getRuntime().availableProcessors(), "RGBMapping", true);
MinestomThreadPool threads = new MinestomThreadPool(Runtime.getRuntime().availableProcessors(), "RGBMapping", true);
for (int rgb = 0; rgb <= 0xFFFFFF; rgb++) {
int finalRgb = rgb;
threads.execute(() -> rgbArray[finalRgb] = mapColor(finalRgb));

View File

@ -0,0 +1,46 @@
package net.minestom.server.thread;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ApiStatus.Internal
@ApiStatus.NonExtendable
public class MinestomThread extends Thread {
public static final AtomicInteger LOCAL_COUNT = new AtomicInteger();
private Object[] locals = new Object[0];
public MinestomThread(@Nullable Runnable target, String name) {
super(target, name);
}
public MinestomThread(Runnable target) {
super(target);
}
public MinestomThread(@NotNull String name) {
super(name);
}
@ApiStatus.Internal
@ApiStatus.Experimental
public <T> T localCache(int index, Supplier<T> supplier) {
Object[] array = locals;
T value;
final int requiredLength = index + 1;
if (array.length < requiredLength) {
Object[] temp = new Object[requiredLength];
System.arraycopy(temp, 0, temp, 0, array.length);
array = temp;
this.locals = array;
}
if ((value = (T) array[index]) == null) {
value = supplier.get();
array[index] = value;
}
return value;
}
}

View File

@ -1,4 +1,6 @@
package net.minestom.server.utils.thread;
package net.minestom.server.thread;
import org.jetbrains.annotations.ApiStatus;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@ -6,9 +8,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MinestomThread extends ThreadPoolExecutor {
@ApiStatus.Internal
public class MinestomThreadPool extends ThreadPoolExecutor {
private static final Set<MinestomThread> executors = new CopyOnWriteArraySet<>();
private static final Set<MinestomThreadPool> executors = new CopyOnWriteArraySet<>();
/**
* Creates a non-local thread pool executor
@ -16,7 +19,7 @@ public class MinestomThread extends ThreadPoolExecutor {
* @param nThreads the number of threads
* @param name the name of the thread pool
*/
public MinestomThread(int nThreads, String name) {
public MinestomThreadPool(int nThreads, String name) {
this(nThreads, name, false);
}
@ -25,15 +28,15 @@ public class MinestomThread extends ThreadPoolExecutor {
* @param name the name of the thread pool
* @param local set to true if this executor is only used inside a method and should *not* be kept in the internal list of executors
*/
public MinestomThread(int nThreads, String name, boolean local) {
public MinestomThreadPool(int nThreads, String name, boolean local) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> {
Thread thread = new Thread(r);
Thread thread = new MinestomThread(r);
thread.setDaemon(true);
thread.setName(thread.getName().replace("Thread", name));
return thread;
});
if (!local) {
MinestomThread.executors.add(this);
MinestomThreadPool.executors.add(this);
}
}
@ -41,6 +44,6 @@ public class MinestomThread extends ThreadPoolExecutor {
* Shutdown all the thread pools
*/
public static void shutdownAll() {
executors.forEach(MinestomThread::shutdownNow);
executors.forEach(MinestomThreadPool::shutdownNow);
}
}

View File

@ -14,12 +14,12 @@ import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
* Thread responsible for ticking {@link net.minestom.server.instance.Chunk chunks} and {@link net.minestom.server.entity.Entity entities}.
* Thread responsible for ticking {@link Chunk chunks} and {@link Entity entities}.
* <p>
* Created in {@link ThreadDispatcher}, and awaken every tick with a task to execute.
*/
@ApiStatus.Internal
public final class TickThread extends Thread {
public final class TickThread extends MinestomThread {
private final ReentrantLock lock = new ReentrantLock();
private final Phaser phaser;
private volatile boolean stop;

View File

@ -6,7 +6,7 @@ import it.unimi.dsi.fastutil.objects.ObjectCollection;
import net.minestom.server.MinecraftServer;
import net.minestom.server.extensions.Extension;
import net.minestom.server.extensions.IExtensionObserver;
import net.minestom.server.utils.thread.MinestomThread;
import net.minestom.server.thread.MinestomThreadPool;
import org.jetbrains.annotations.NotNull;
import java.util.Collection;
@ -65,7 +65,7 @@ public final class SchedulerManager implements IExtensionObserver {
this.counter = new AtomicInteger();
this.shutdownCounter = new AtomicInteger();
this.batchesPool = new MinestomThread(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER);
this.batchesPool = new MinestomThreadPool(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER);
this.timerExecutionService = Executors.newSingleThreadScheduledExecutor();
this.tasks = new Int2ObjectOpenHashMap<>();
this.shutdownTasks = new Int2ObjectOpenHashMap<>();

View File

@ -18,6 +18,7 @@ import net.minestom.server.network.socket.Server;
import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.binary.BinaryWriter;
import net.minestom.server.utils.binary.PooledBuffers;
import net.minestom.server.utils.cache.LocalCache;
import net.minestom.server.utils.callback.validator.PlayerValidator;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
@ -26,7 +27,6 @@ import org.jetbrains.annotations.Nullable;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.zip.Deflater;
@ -43,8 +43,8 @@ public final class PacketUtils {
private static final ThreadLocal<Deflater> LOCAL_DEFLATER = ThreadLocal.withInitial(Deflater::new);
/// Local buffers
private static final LocalCache PACKET_BUFFER = LocalCache.get("packet-buffer", Server.MAX_PACKET_SIZE);
private static final LocalCache LOCAL_BUFFER = LocalCache.get("local-buffer", Server.MAX_PACKET_SIZE);
private static final LocalCache<ByteBuffer> PACKET_BUFFER = LocalCache.ofBuffer(Server.MAX_PACKET_SIZE);
private static final LocalCache<ByteBuffer> LOCAL_BUFFER = LocalCache.ofBuffer(Server.MAX_PACKET_SIZE);
// Viewable packets
private static final Object VIEWABLE_PACKET_LOCK = new Object();
@ -56,7 +56,7 @@ public final class PacketUtils {
@ApiStatus.Internal
@ApiStatus.Experimental
public static ByteBuffer localBuffer() {
return LOCAL_BUFFER.get();
return LOCAL_BUFFER.get().clear();
}
/**
@ -215,7 +215,7 @@ public final class PacketUtils {
@ApiStatus.Internal
public static ByteBuffer createFramedPacket(@NotNull ServerPacket packet, boolean compression) {
ByteBuffer buffer = PACKET_BUFFER.get();
ByteBuffer buffer = PACKET_BUFFER.get().clear();
writeFramedPacket(buffer, packet, compression);
return buffer;
}
@ -233,31 +233,6 @@ public final class PacketUtils {
return new FramedPacket(packet.getId(), buffer, packet);
}
@ApiStatus.Internal
public static final class LocalCache {
private static final Map<String, LocalCache> CACHES = new ConcurrentHashMap<>();
private final String name;
private final ThreadLocal<ByteBuffer> cache;
private LocalCache(String name, int size) {
this.name = name;
this.cache = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(size));
}
public static LocalCache get(String name, int size) {
return CACHES.computeIfAbsent(name, s -> new LocalCache(s, size));
}
public String name() {
return name;
}
public ByteBuffer get() {
return cache.get().clear();
}
}
private static final class ViewableStorage {
private final WeakReference<Viewable> viewable;
private final Map<PlayerConnection, List<IntIntPair>> entityIdMap = new HashMap<>();

View File

@ -0,0 +1,41 @@
package net.minestom.server.utils.cache;
import net.minestom.server.thread.MinestomThread;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
/**
* Faster alternative to {@link ThreadLocal} when called from a {@link MinestomThread}.
* Idea took from Netty's FastThreadLocal.
* <p>
* Must not be abused, as the underlying array is not downsized.
* Mostly for internal use.
*
* @param <T> the type to cache
*/
@ApiStatus.Internal
public final class LocalCache<T> {
private final int tickIndex = MinestomThread.LOCAL_COUNT.getAndIncrement();
private final Supplier<T> supplier;
private final ThreadLocal<T> fallback;
private LocalCache(@NotNull Supplier<T> supplier) {
this.supplier = supplier;
this.fallback = ThreadLocal.withInitial(supplier);
}
public static LocalCache<ByteBuffer> ofBuffer(int size) {
return new LocalCache<>(() -> ByteBuffer.allocateDirect(size));
}
public T get() {
Thread current = Thread.currentThread();
if (current instanceof MinestomThread) {
return ((MinestomThread) current).localCache(tickIndex, supplier);
}
return fallback.get();
}
}

View File

@ -1,5 +1,6 @@
package net.minestom.server.utils.thread;
import net.minestom.server.thread.MinestomThreadPool;
import org.jetbrains.annotations.NotNull;
import java.util.LinkedList;
@ -13,7 +14,7 @@ import java.util.concurrent.TimeUnit;
*/
public class ThreadBindingExecutor extends AbstractExecutorService {
private MinestomThread[] threadExecutors;
private MinestomThreadPool[] threadExecutors;
/**
* Creates a non-local thread-binding executor
@ -31,15 +32,15 @@ public class ThreadBindingExecutor extends AbstractExecutorService {
* @param local set to true if this executor is only used inside a method and should *not* be kept in the internal list of executors
*/
public ThreadBindingExecutor(int nThreads, String name, boolean local) {
threadExecutors = new MinestomThread[nThreads];
threadExecutors = new MinestomThreadPool[nThreads];
for (int i = 0; i < nThreads; i++) {
threadExecutors[i] = new MinestomThread(1, name, local);
threadExecutors[i] = new MinestomThreadPool(1, name, local);
}
}
@Override
public void shutdown() {
for (MinestomThread t : threadExecutors) {
for (MinestomThreadPool t : threadExecutors) {
t.shutdown();
}
}
@ -48,7 +49,7 @@ public class ThreadBindingExecutor extends AbstractExecutorService {
@Override
public List<Runnable> shutdownNow() {
List<Runnable> allTasks = new LinkedList<>();
for (MinestomThread t : threadExecutors) {
for (MinestomThreadPool t : threadExecutors) {
allTasks.addAll(t.shutdownNow());
}
return allTasks;
@ -56,7 +57,7 @@ public class ThreadBindingExecutor extends AbstractExecutorService {
@Override
public boolean isShutdown() {
for (MinestomThread t : threadExecutors) {
for (MinestomThreadPool t : threadExecutors) {
if(!t.isShutdown())
return false;
}
@ -65,7 +66,7 @@ public class ThreadBindingExecutor extends AbstractExecutorService {
@Override
public boolean isTerminated() {
for (MinestomThread t : threadExecutors) {
for (MinestomThreadPool t : threadExecutors) {
if(!t.isShutdown())
return false;
}
@ -75,7 +76,7 @@ public class ThreadBindingExecutor extends AbstractExecutorService {
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
boolean terminated = true;
for (MinestomThread t : threadExecutors) {
for (MinestomThreadPool t : threadExecutors) {
terminated &= t.awaitTermination(timeout, unit);
}
return terminated;