mirror of
https://github.com/Minestom/Minestom.git
synced 2025-01-01 14:07:43 +01:00
Replace thread locals to pool (#1079)
This commit is contained in:
parent
1b12905500
commit
8d7175af73
@ -19,9 +19,9 @@ import net.minestom.server.snapshot.SnapshotImpl;
|
||||
import net.minestom.server.snapshot.SnapshotUpdater;
|
||||
import net.minestom.server.utils.ArrayUtils;
|
||||
import net.minestom.server.utils.MathUtils;
|
||||
import net.minestom.server.utils.ObjectPool;
|
||||
import net.minestom.server.utils.Utils;
|
||||
import net.minestom.server.utils.binary.BinaryWriter;
|
||||
import net.minestom.server.utils.binary.PooledBuffers;
|
||||
import net.minestom.server.utils.chunk.ChunkUtils;
|
||||
import net.minestom.server.world.biomes.Biome;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@ -204,10 +204,13 @@ public class DynamicChunk extends Chunk {
|
||||
"WORLD_SURFACE", NBT.LongArray(Utils.encodeBlocks(worldSurface, bitsForHeight))));
|
||||
}
|
||||
// Data
|
||||
final BinaryWriter writer = new BinaryWriter(PooledBuffers.tempBuffer());
|
||||
final byte[] data = ObjectPool.PACKET_POOL.use(buffer -> {
|
||||
final BinaryWriter writer = new BinaryWriter(buffer);
|
||||
for (Section section : sections) writer.write(section);
|
||||
return writer.toByteArray();
|
||||
});
|
||||
return new ChunkDataPacket(chunkX, chunkZ,
|
||||
new ChunkData(heightmapsNBT, writer.toByteArray(), entries),
|
||||
new ChunkData(heightmapsNBT, data, entries),
|
||||
createLightData());
|
||||
}
|
||||
|
||||
|
@ -38,9 +38,9 @@ public final class CachedPacket implements SendablePacket {
|
||||
return cache != null ? cache.packet() : packetSupplier.get();
|
||||
}
|
||||
|
||||
public @NotNull ByteBuffer body() {
|
||||
public @Nullable ByteBuffer body() {
|
||||
FramedPacket cache = updatedCache();
|
||||
return cache != null ? cache.body() : PacketUtils.createFramedPacket(packetSupplier.get());
|
||||
return cache != null ? cache.body() : null;
|
||||
}
|
||||
|
||||
private @Nullable FramedPacket updatedCache() {
|
||||
|
@ -14,9 +14,9 @@ import net.minestom.server.network.PacketProcessor;
|
||||
import net.minestom.server.network.packet.server.*;
|
||||
import net.minestom.server.network.packet.server.login.SetCompressionPacket;
|
||||
import net.minestom.server.network.socket.Worker;
|
||||
import net.minestom.server.utils.ObjectPool;
|
||||
import net.minestom.server.utils.PacketUtils;
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import net.minestom.server.utils.binary.PooledBuffers;
|
||||
import net.minestom.server.utils.validate.Check;
|
||||
import org.jctools.queues.MessagePassingQueue;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
@ -46,6 +46,7 @@ import java.util.zip.DataFormatException;
|
||||
@ApiStatus.Internal
|
||||
public class PlayerSocketConnection extends PlayerConnection {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class);
|
||||
private static final ObjectPool<BinaryBuffer> POOL = ObjectPool.BUFFER_POOL;
|
||||
|
||||
private final Worker worker;
|
||||
private final MessagePassingQueue<Runnable> workerQueue;
|
||||
@ -73,7 +74,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
||||
private PlayerSkin bungeeSkin;
|
||||
|
||||
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
|
||||
private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(PooledBuffers.get());
|
||||
private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(POOL.get());
|
||||
private BinaryBuffer cacheBuffer;
|
||||
|
||||
private final ListenerHandle<PlayerPacketOutEvent> outgoing = EventDispatcher.getHandle(PlayerPacketOutEvent.class);
|
||||
@ -84,8 +85,8 @@ public class PlayerSocketConnection extends PlayerConnection {
|
||||
this.workerQueue = worker.queue();
|
||||
this.channel = channel;
|
||||
this.remoteAddress = remoteAddress;
|
||||
PooledBuffers.registerBuffer(this, tickBuffer);
|
||||
PooledBuffers.registerBuffers(this, waitingBuffers);
|
||||
POOL.register(this, tickBuffer);
|
||||
POOL.register(this, waitingBuffers);
|
||||
}
|
||||
|
||||
public void processPackets(BinaryBuffer readBuffer, PacketProcessor packetProcessor) {
|
||||
@ -351,10 +352,11 @@ public class PlayerSocketConnection extends PlayerConnection {
|
||||
writeServerPacketSync(serverPacket, compressed);
|
||||
} else if (packet instanceof FramedPacket framedPacket) {
|
||||
var buffer = framedPacket.body();
|
||||
writeBufferSync0(buffer, 0, buffer.limit());
|
||||
writeBufferSync(buffer, 0, buffer.limit());
|
||||
} else if (packet instanceof CachedPacket cachedPacket) {
|
||||
var buffer = cachedPacket.body();
|
||||
writeBufferSync0(buffer, buffer.position(), buffer.remaining());
|
||||
if (buffer != null) writeBufferSync(buffer, buffer.position(), buffer.remaining());
|
||||
else writeServerPacketSync(cachedPacket.packet(), compressed);
|
||||
} else if (packet instanceof LazyPacket lazyPacket) {
|
||||
writeServerPacketSync(lazyPacket.packet(), compressed);
|
||||
} else {
|
||||
@ -370,32 +372,31 @@ public class PlayerSocketConnection extends PlayerConnection {
|
||||
GlobalTranslator.render(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale)));
|
||||
}
|
||||
}
|
||||
var buffer = PacketUtils.createFramedPacket(serverPacket, compressed);
|
||||
writeBufferSync0(buffer, 0, buffer.limit());
|
||||
try (var hold = ObjectPool.PACKET_POOL.hold()) {
|
||||
var buffer = PacketUtils.createFramedPacket(hold.get(), serverPacket, compressed);
|
||||
writeBufferSync(buffer, 0, buffer.limit());
|
||||
}
|
||||
}
|
||||
|
||||
private void writeBufferSync(@NotNull ByteBuffer buffer, int index, int length) {
|
||||
// TODO read buffer for outgoing event
|
||||
// Encrypt data
|
||||
final EncryptionContext encryptionContext = this.encryptionContext;
|
||||
if (encryptionContext != null) { // Encryption support
|
||||
try (var hold = ObjectPool.PACKET_POOL.hold()) {
|
||||
ByteBuffer output = hold.get();
|
||||
try {
|
||||
length = encryptionContext.encrypt().update(buffer.slice(index, length), output);
|
||||
writeBufferSync0(output, 0, length);
|
||||
} catch (ShortBufferException e) {
|
||||
MinecraftServer.getExceptionManager().handleException(e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
writeBufferSync0(buffer, index, length);
|
||||
}
|
||||
|
||||
private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length) {
|
||||
// Encrypt data
|
||||
{
|
||||
final EncryptionContext encryptionContext = this.encryptionContext;
|
||||
if (encryptionContext != null) { // Encryption support
|
||||
ByteBuffer output = PooledBuffers.tempBuffer();
|
||||
try {
|
||||
encryptionContext.encrypt().update(buffer.slice(index, length), output);
|
||||
buffer = output.flip();
|
||||
index = 0;
|
||||
} catch (ShortBufferException e) {
|
||||
MinecraftServer.getExceptionManager().handleException(e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Write data
|
||||
BinaryBuffer localBuffer = tickBuffer.getPlain();
|
||||
final int capacity = localBuffer.capacity();
|
||||
if (length <= capacity) {
|
||||
@ -425,13 +426,13 @@ public class PlayerSocketConnection extends PlayerConnection {
|
||||
BinaryBuffer waitingBuffer = iterator.next();
|
||||
if (!waitingBuffer.writeChannel(channel)) break;
|
||||
iterator.remove();
|
||||
PooledBuffers.add(waitingBuffer);
|
||||
POOL.add(waitingBuffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private BinaryBuffer updateLocalBuffer() {
|
||||
BinaryBuffer newBuffer = PooledBuffers.get();
|
||||
BinaryBuffer newBuffer = POOL.get();
|
||||
this.waitingBuffers.add(tickBuffer.getPlain());
|
||||
this.tickBuffer.setPlain(newBuffer);
|
||||
return newBuffer;
|
||||
|
@ -3,8 +3,8 @@ package net.minestom.server.network.socket;
|
||||
import net.minestom.server.MinecraftServer;
|
||||
import net.minestom.server.network.player.PlayerSocketConnection;
|
||||
import net.minestom.server.thread.MinestomThread;
|
||||
import net.minestom.server.utils.ObjectPool;
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import net.minestom.server.utils.binary.PooledBuffers;
|
||||
import org.jctools.queues.MessagePassingQueue;
|
||||
import org.jctools.queues.MpscUnboundedXaddArrayQueue;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
@ -69,12 +69,14 @@ public final class Worker extends MinestomThread {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
BinaryBuffer readBuffer = BinaryBuffer.wrap(PooledBuffers.packetBuffer());
|
||||
try (var holder = ObjectPool.PACKET_POOL.hold()) {
|
||||
BinaryBuffer readBuffer = BinaryBuffer.wrap(holder.get());
|
||||
// Consume last incomplete packet
|
||||
connection.consumeCache(readBuffer);
|
||||
// Read & process
|
||||
readBuffer.readChannel(channel);
|
||||
connection.processPackets(readBuffer, server.packetProcessor());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// TODO print exception? (should ignore disconnection)
|
||||
connection.disconnect();
|
||||
|
@ -3,35 +3,10 @@ package net.minestom.server.thread;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ApiStatus.Internal
|
||||
@ApiStatus.NonExtendable
|
||||
public class MinestomThread extends Thread {
|
||||
public static final AtomicInteger LOCAL_COUNT = new AtomicInteger();
|
||||
private Object[] locals = new Object[0];
|
||||
|
||||
public MinestomThread(@NotNull String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
@ApiStatus.Internal
|
||||
@ApiStatus.Experimental
|
||||
public <T> T localCache(int index, Supplier<T> supplier) {
|
||||
Object[] array = locals;
|
||||
T value;
|
||||
final int requiredLength = index + 1;
|
||||
if (array.length < requiredLength) {
|
||||
Object[] temp = new Object[requiredLength];
|
||||
System.arraycopy(array, 0, temp, 0, array.length);
|
||||
array = temp;
|
||||
this.locals = array;
|
||||
}
|
||||
if ((value = (T) array[index]) == null) {
|
||||
value = supplier.get();
|
||||
array[index] = value;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
135
src/main/java/net/minestom/server/utils/ObjectPool.java
Normal file
135
src/main/java/net/minestom/server/utils/ObjectPool.java
Normal file
@ -0,0 +1,135 @@
|
||||
package net.minestom.server.utils;
|
||||
|
||||
import net.minestom.server.network.socket.Server;
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import org.jctools.queues.MessagePassingQueue;
|
||||
import org.jctools.queues.MpmcUnboundedXaddArrayQueue;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.UnaryOperator;
|
||||
|
||||
@ApiStatus.Internal
|
||||
@ApiStatus.Experimental
|
||||
public final class ObjectPool<T> {
|
||||
private static final int QUEUE_SIZE = 32_768;
|
||||
private static final int BUFFER_SIZE = Integer.getInteger("minestom.pooled-buffer-size", 262_143);
|
||||
|
||||
public static final ObjectPool<BinaryBuffer> BUFFER_POOL = new ObjectPool<>(() -> BinaryBuffer.ofSize(BUFFER_SIZE), BinaryBuffer::clear);
|
||||
public static final ObjectPool<ByteBuffer> PACKET_POOL = new ObjectPool<>(() -> ByteBuffer.allocateDirect(Server.MAX_PACKET_SIZE), ByteBuffer::clear);
|
||||
|
||||
private final Cleaner cleaner = Cleaner.create();
|
||||
private final MessagePassingQueue<SoftReference<T>> pool = new MpmcUnboundedXaddArrayQueue<>(QUEUE_SIZE);
|
||||
private final Supplier<T> supplier;
|
||||
private final UnaryOperator<T> sanitizer;
|
||||
|
||||
ObjectPool(Supplier<T> supplier, UnaryOperator<T> sanitizer) {
|
||||
this.supplier = supplier;
|
||||
this.sanitizer = sanitizer;
|
||||
}
|
||||
|
||||
public @NotNull T get() {
|
||||
T result;
|
||||
SoftReference<T> ref;
|
||||
while ((ref = pool.poll()) != null) {
|
||||
if ((result = ref.get()) != null) return result;
|
||||
}
|
||||
return supplier.get();
|
||||
}
|
||||
|
||||
public @NotNull T getAndRegister(@NotNull Object ref) {
|
||||
T result = get();
|
||||
register(ref, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
public void add(@NotNull T object) {
|
||||
object = sanitizer.apply(object);
|
||||
this.pool.offer(new SoftReference<>(object));
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
this.pool.clear();
|
||||
}
|
||||
|
||||
public int count() {
|
||||
return pool.size();
|
||||
}
|
||||
|
||||
public void register(@NotNull Object ref, @NotNull AtomicReference<T> objectRef) {
|
||||
this.cleaner.register(ref, new BufferRefCleaner<>(this, objectRef));
|
||||
}
|
||||
|
||||
public void register(@NotNull Object ref, @NotNull T object) {
|
||||
this.cleaner.register(ref, new BufferCleaner<>(this, object));
|
||||
}
|
||||
|
||||
public void register(@NotNull Object ref, @NotNull Collection<T> objects) {
|
||||
this.cleaner.register(ref, new BuffersCleaner<>(this, objects));
|
||||
}
|
||||
|
||||
public @NotNull Holder hold() {
|
||||
return new Holder(get());
|
||||
}
|
||||
|
||||
public <R> R use(@NotNull Function<@NotNull T, R> function) {
|
||||
T object = get();
|
||||
try {
|
||||
return function.apply(object);
|
||||
} finally {
|
||||
add(object);
|
||||
}
|
||||
}
|
||||
|
||||
private record BufferRefCleaner<T>(ObjectPool<T> pool, AtomicReference<T> objectRef) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
this.pool.add(objectRef.get());
|
||||
}
|
||||
}
|
||||
|
||||
private record BufferCleaner<T>(ObjectPool<T> pool, T object) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
this.pool.add(object);
|
||||
}
|
||||
}
|
||||
|
||||
private record BuffersCleaner<T>(ObjectPool<T> pool, Collection<T> objects) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
for (T buffer : objects) {
|
||||
this.pool.add(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public final class Holder implements AutoCloseable {
|
||||
private final T object;
|
||||
private boolean closed;
|
||||
|
||||
Holder(T object) {
|
||||
this.object = object;
|
||||
}
|
||||
|
||||
public @NotNull T get() {
|
||||
if (closed) throw new IllegalStateException("Holder is closed");
|
||||
return object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
add(object);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -21,9 +21,7 @@ import net.minestom.server.network.player.PlayerConnection;
|
||||
import net.minestom.server.network.player.PlayerSocketConnection;
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import net.minestom.server.utils.binary.BinaryWriter;
|
||||
import net.minestom.server.utils.binary.PooledBuffers;
|
||||
import net.minestom.server.utils.binary.Writeable;
|
||||
import net.minestom.server.utils.cache.LocalCache;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
@ -47,7 +45,7 @@ import java.util.zip.Inflater;
|
||||
* Be sure to check the implementation code.
|
||||
*/
|
||||
public final class PacketUtils {
|
||||
private static final LocalCache<Deflater> LOCAL_DEFLATER = LocalCache.of(Deflater::new);
|
||||
private static final ThreadLocal<Deflater> LOCAL_DEFLATER = ThreadLocal.withInitial(Deflater::new);
|
||||
|
||||
public static final boolean GROUPED_PACKET = PropertyUtils.getBoolean("minestom.grouped-packet", true);
|
||||
public static final boolean CACHED_PACKET = PropertyUtils.getBoolean("minestom.cached-packet", true);
|
||||
@ -157,6 +155,7 @@ public final class PacketUtils {
|
||||
public static @Nullable BinaryBuffer readPackets(@NotNull BinaryBuffer readBuffer, boolean compressed,
|
||||
BiConsumer<Integer, ByteBuffer> payloadConsumer) throws DataFormatException {
|
||||
BinaryBuffer remaining = null;
|
||||
ByteBuffer pool = ObjectPool.PACKET_POOL.get();
|
||||
while (readBuffer.readableBytes() > 0) {
|
||||
final var beginMark = readBuffer.mark();
|
||||
try {
|
||||
@ -178,7 +177,7 @@ public final class PacketUtils {
|
||||
decompressedSize = payloadLength;
|
||||
} else {
|
||||
// Decompress to content buffer
|
||||
content = BinaryBuffer.wrap(PooledBuffers.tempBuffer());
|
||||
content = BinaryBuffer.wrap(pool);
|
||||
decompressedSize = dataLength;
|
||||
Inflater inflater = new Inflater(); // TODO: Pool?
|
||||
inflater.setInput(readBuffer.asByteBuffer(readBuffer.readerOffset(), payloadLength));
|
||||
@ -202,6 +201,7 @@ public final class PacketUtils {
|
||||
break;
|
||||
}
|
||||
}
|
||||
ObjectPool.PACKET_POOL.add(pool);
|
||||
return remaining;
|
||||
}
|
||||
|
||||
@ -237,49 +237,49 @@ public final class PacketUtils {
|
||||
final boolean compressed = packetSize >= compressionThreshold;
|
||||
if (compressed) {
|
||||
// Packet large enough, compress it
|
||||
final ByteBuffer input = PooledBuffers.tempBuffer().put(0, buffer, contentStart, packetSize);
|
||||
try (var hold = ObjectPool.PACKET_POOL.hold()) {
|
||||
final ByteBuffer input = hold.get().put(0, buffer, contentStart, packetSize);
|
||||
Deflater deflater = LOCAL_DEFLATER.get();
|
||||
deflater.setInput(input.limit(packetSize));
|
||||
deflater.finish();
|
||||
deflater.deflate(buffer.position(contentStart));
|
||||
deflater.reset();
|
||||
}
|
||||
}
|
||||
// Packet header (Packet + Data Length)
|
||||
Utils.writeVarIntHeader(buffer, compressedIndex, buffer.position() - uncompressedIndex);
|
||||
Utils.writeVarIntHeader(buffer, uncompressedIndex, compressed ? packetSize : 0);
|
||||
}
|
||||
|
||||
@ApiStatus.Internal
|
||||
public static ByteBuffer createFramedPacket(@NotNull ServerPacket packet, boolean compression) {
|
||||
ByteBuffer buffer = PooledBuffers.packetBuffer();
|
||||
public static ByteBuffer createFramedPacket(@NotNull ByteBuffer buffer, @NotNull ServerPacket packet, boolean compression) {
|
||||
writeFramedPacket(buffer, packet, compression);
|
||||
return buffer.flip();
|
||||
}
|
||||
|
||||
@ApiStatus.Internal
|
||||
public static ByteBuffer createFramedPacket(@NotNull ServerPacket packet) {
|
||||
return createFramedPacket(packet, MinecraftServer.getCompressionThreshold() > 0);
|
||||
public static ByteBuffer createFramedPacket(@NotNull ByteBuffer buffer, @NotNull ServerPacket packet) {
|
||||
return createFramedPacket(buffer, packet, MinecraftServer.getCompressionThreshold() > 0);
|
||||
}
|
||||
|
||||
@ApiStatus.Internal
|
||||
public static FramedPacket allocateTrimmedPacket(@NotNull ServerPacket packet) {
|
||||
final ByteBuffer temp = PacketUtils.createFramedPacket(packet);
|
||||
try (var hold = ObjectPool.PACKET_POOL.hold()) {
|
||||
final ByteBuffer temp = PacketUtils.createFramedPacket(hold.get(), packet);
|
||||
final int size = temp.remaining();
|
||||
final ByteBuffer buffer = ByteBuffer.allocateDirect(size).put(0, temp, 0, size);
|
||||
return new FramedPacket(packet, buffer);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ViewableStorage {
|
||||
// Player id -> list of offsets to ignore (32:32 bits)
|
||||
private final Int2ObjectMap<LongArrayList> entityIdMap = new Int2ObjectOpenHashMap<>();
|
||||
private final BinaryBuffer buffer = PooledBuffers.get();
|
||||
|
||||
{
|
||||
PooledBuffers.registerBuffer(this, buffer);
|
||||
}
|
||||
private final BinaryBuffer buffer = ObjectPool.BUFFER_POOL.getAndRegister(this);
|
||||
|
||||
private synchronized void append(Viewable viewable, ServerPacket serverPacket, Player player) {
|
||||
final ByteBuffer framedPacket = createFramedPacket(serverPacket);
|
||||
try (var hold = ObjectPool.PACKET_POOL.hold()) {
|
||||
final ByteBuffer framedPacket = createFramedPacket(hold.get(), serverPacket);
|
||||
final int packetSize = framedPacket.limit();
|
||||
if (packetSize >= buffer.capacity()) {
|
||||
process(viewable);
|
||||
@ -300,6 +300,7 @@ public final class PacketUtils {
|
||||
list.add(offsets);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void process(Viewable viewable) {
|
||||
if (buffer.writerOffset() == 0) return;
|
||||
|
@ -1,99 +0,0 @@
|
||||
package net.minestom.server.utils.binary;
|
||||
|
||||
import net.minestom.server.network.socket.Server;
|
||||
import net.minestom.server.utils.cache.LocalCache;
|
||||
import org.jctools.queues.MessagePassingQueue;
|
||||
import org.jctools.queues.MpmcUnboundedXaddArrayQueue;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ApiStatus.Internal
|
||||
@ApiStatus.Experimental
|
||||
public final class PooledBuffers {
|
||||
private final static MessagePassingQueue<SoftReference<BinaryBuffer>> POOLED_BUFFERS = new MpmcUnboundedXaddArrayQueue<>(1024);
|
||||
private final static int BUFFER_SIZE = Integer.getInteger("minestom.pooled-buffer-size", 262_143);
|
||||
private final static Cleaner CLEANER = Cleaner.create();
|
||||
|
||||
private static final LocalCache<ByteBuffer> PACKET_BUFFER = LocalCache.ofBuffer(Server.MAX_PACKET_SIZE);
|
||||
private static final LocalCache<ByteBuffer> LOCAL_BUFFER = LocalCache.ofBuffer(Server.MAX_PACKET_SIZE);
|
||||
|
||||
/**
|
||||
* Thread local buffer containing raw packet stream.
|
||||
*/
|
||||
public static ByteBuffer packetBuffer() {
|
||||
return PACKET_BUFFER.get().clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread local buffer targeted at very small scope operations (encryption, compression, ...).
|
||||
*/
|
||||
public static ByteBuffer tempBuffer() {
|
||||
return LOCAL_BUFFER.get().clear();
|
||||
}
|
||||
|
||||
public static BinaryBuffer get() {
|
||||
BinaryBuffer buffer;
|
||||
SoftReference<BinaryBuffer> ref;
|
||||
while ((ref = POOLED_BUFFERS.relaxedPoll()) != null) {
|
||||
if ((buffer = ref.get()) != null) return buffer;
|
||||
}
|
||||
return BinaryBuffer.ofSize(BUFFER_SIZE);
|
||||
}
|
||||
|
||||
public static void add(BinaryBuffer buffer) {
|
||||
POOLED_BUFFERS.relaxedOffer(new SoftReference<>(buffer.clear()));
|
||||
}
|
||||
|
||||
public static void clear() {
|
||||
POOLED_BUFFERS.clear();
|
||||
}
|
||||
|
||||
public static int count() {
|
||||
return POOLED_BUFFERS.size();
|
||||
}
|
||||
|
||||
public static int bufferSize() {
|
||||
return BUFFER_SIZE;
|
||||
}
|
||||
|
||||
public static void registerBuffer(Object ref, AtomicReference<BinaryBuffer> buffer) {
|
||||
CLEANER.register(ref, new BufferRefCleaner(buffer));
|
||||
}
|
||||
|
||||
public static void registerBuffer(Object ref, BinaryBuffer buffer) {
|
||||
CLEANER.register(ref, new BufferCleaner(buffer));
|
||||
}
|
||||
|
||||
public static void registerBuffers(Object ref, Collection<BinaryBuffer> buffers) {
|
||||
CLEANER.register(ref, new BuffersCleaner(buffers));
|
||||
}
|
||||
|
||||
private record BufferRefCleaner(AtomicReference<BinaryBuffer> bufferRef) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
add(bufferRef.get());
|
||||
}
|
||||
}
|
||||
|
||||
private record BufferCleaner(BinaryBuffer buffer) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
add(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
private record BuffersCleaner(Collection<BinaryBuffer> buffers) implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
if (buffers.isEmpty()) return;
|
||||
for (BinaryBuffer buffer : buffers) {
|
||||
add(buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
package net.minestom.server.utils.cache;
|
||||
|
||||
import net.minestom.server.thread.MinestomThread;
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Faster alternative to {@link ThreadLocal} when called from a {@link MinestomThread}.
|
||||
* Idea took from Netty's FastThreadLocal.
|
||||
* <p>
|
||||
* Must not be abused, as the underlying array is not downsized.
|
||||
* Mostly for internal use.
|
||||
*
|
||||
* @param <T> the type to cache
|
||||
*/
|
||||
@ApiStatus.Internal
|
||||
public final class LocalCache<T> {
|
||||
private final int tickIndex = MinestomThread.LOCAL_COUNT.getAndIncrement();
|
||||
private final Supplier<T> supplier;
|
||||
private final ThreadLocal<T> fallback;
|
||||
|
||||
private LocalCache(@NotNull Supplier<T> supplier) {
|
||||
this.supplier = supplier;
|
||||
this.fallback = ThreadLocal.withInitial(supplier);
|
||||
}
|
||||
|
||||
public static <T> LocalCache<T> of(@NotNull Supplier<T> supplier) {
|
||||
return new LocalCache<>(supplier);
|
||||
}
|
||||
|
||||
public static LocalCache<ByteBuffer> ofBuffer(int size) {
|
||||
return of(() -> ByteBuffer.allocateDirect(size));
|
||||
}
|
||||
|
||||
public T get() {
|
||||
Thread current = Thread.currentThread();
|
||||
if (current instanceof MinestomThread minestomThread) {
|
||||
return minestomThread.localCache(tickIndex, supplier);
|
||||
}
|
||||
return fallback.get();
|
||||
}
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package net.minestom.server.network;
|
||||
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import net.minestom.server.utils.binary.PooledBuffers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
public class PooledBuffersTest {
|
||||
|
||||
@Test
|
||||
public void pool() {
|
||||
Set<BinaryBuffer> pooledBuffers = new HashSet<>();
|
||||
PooledBuffers.clear();
|
||||
|
||||
assertEquals(0, PooledBuffers.count());
|
||||
var buffer = PooledBuffers.get();
|
||||
assertEquals(PooledBuffers.bufferSize(), buffer.capacity());
|
||||
pooledBuffers.add(buffer);
|
||||
|
||||
buffer = PooledBuffers.get();
|
||||
assertTrue(pooledBuffers.add(buffer));
|
||||
|
||||
PooledBuffers.add(buffer);
|
||||
assertEquals(1, PooledBuffers.count());
|
||||
buffer = PooledBuffers.get();
|
||||
assertEquals(0, PooledBuffers.count());
|
||||
assertFalse(pooledBuffers.add(buffer));
|
||||
}
|
||||
}
|
@ -2,11 +2,11 @@ package net.minestom.server.network;
|
||||
|
||||
import it.unimi.dsi.fastutil.Pair;
|
||||
import net.minestom.server.network.packet.client.play.ClientPluginMessagePacket;
|
||||
import net.minestom.server.utils.ObjectPool;
|
||||
import net.minestom.server.utils.PacketUtils;
|
||||
import net.minestom.server.utils.Utils;
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import net.minestom.server.utils.binary.BinaryReader;
|
||||
import net.minestom.server.utils.binary.PooledBuffers;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
@ -24,7 +24,7 @@ public class SocketReadTest {
|
||||
public void complete(boolean compressed) throws DataFormatException {
|
||||
var packet = new ClientPluginMessagePacket("channel", new byte[2000]);
|
||||
|
||||
var buffer = PooledBuffers.packetBuffer();
|
||||
var buffer = ObjectPool.PACKET_POOL.get();
|
||||
PacketUtils.writeFramedPacket(buffer, 0x0A, packet, compressed ? 256 : 0);
|
||||
|
||||
var wrapper = BinaryBuffer.wrap(buffer);
|
||||
@ -48,7 +48,7 @@ public class SocketReadTest {
|
||||
public void completeTwo(boolean compressed) throws DataFormatException {
|
||||
var packet = new ClientPluginMessagePacket("channel", new byte[2000]);
|
||||
|
||||
var buffer = PooledBuffers.packetBuffer();
|
||||
var buffer = ObjectPool.PACKET_POOL.get();
|
||||
PacketUtils.writeFramedPacket(buffer, 0x0A, packet, compressed ? 256 : 0);
|
||||
PacketUtils.writeFramedPacket(buffer, 0x0A, packet, compressed ? 256 : 0);
|
||||
|
||||
@ -76,7 +76,7 @@ public class SocketReadTest {
|
||||
|
||||
var packet = new ClientPluginMessagePacket("channel", new byte[2000]);
|
||||
|
||||
var buffer = PooledBuffers.packetBuffer();
|
||||
var buffer = ObjectPool.PACKET_POOL.get();
|
||||
PacketUtils.writeFramedPacket(buffer, 0x0A, packet, compressed ? 256 : 0);
|
||||
Utils.writeVarInt(buffer, 200); // incomplete 200 bytes packet
|
||||
|
||||
@ -104,7 +104,7 @@ public class SocketReadTest {
|
||||
|
||||
var packet = new ClientPluginMessagePacket("channel", new byte[2000]);
|
||||
|
||||
var buffer = PooledBuffers.packetBuffer();
|
||||
var buffer = ObjectPool.PACKET_POOL.get();
|
||||
PacketUtils.writeFramedPacket(buffer, 0x0A, packet, compressed ? 256 : 0);
|
||||
buffer.put((byte) -85); // incomplete var-int length
|
||||
|
||||
|
@ -1,40 +0,0 @@
|
||||
package net.minestom.server.thread;
|
||||
|
||||
import net.minestom.server.thread.MinestomThread;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class LocalThreadCacheTest {
|
||||
|
||||
@Test
|
||||
public void testLocalThreadCache() throws InterruptedException {
|
||||
AtomicBoolean result = new AtomicBoolean(false);
|
||||
var thread = new MinestomThread("name") {
|
||||
@Override
|
||||
public void run() {
|
||||
final int dummy = -1;
|
||||
|
||||
assertEquals(7, localCache(1, () -> 7));
|
||||
assertEquals(7, localCache(1, () -> dummy));
|
||||
|
||||
assertEquals(5, localCache(0, () -> 5));
|
||||
assertEquals(7, localCache(1, () -> dummy));
|
||||
|
||||
assertEquals(5, localCache(0, () -> dummy));
|
||||
|
||||
assertEquals(5, localCache(2, () -> 5));
|
||||
assertEquals(7, localCache(1, () -> dummy));
|
||||
assertEquals(5, localCache(0, () -> dummy));
|
||||
|
||||
result.set(true);
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
thread.join(1500);
|
||||
assertTrue(result.get(), "Thread didn't start");
|
||||
}
|
||||
}
|
47
src/test/java/net/minestom/server/utils/ObjectPoolTest.java
Normal file
47
src/test/java/net/minestom/server/utils/ObjectPoolTest.java
Normal file
@ -0,0 +1,47 @@
|
||||
package net.minestom.server.utils;
|
||||
|
||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
public class ObjectPoolTest {
|
||||
|
||||
@Test
|
||||
public void pool() {
|
||||
var pool = ObjectPool.BUFFER_POOL;
|
||||
Set<BinaryBuffer> pooledBuffers = new HashSet<>();
|
||||
pool.clear();
|
||||
|
||||
assertEquals(0, pool.count());
|
||||
var buffer = pool.get();
|
||||
pooledBuffers.add(buffer);
|
||||
|
||||
buffer = pool.get();
|
||||
assertTrue(pooledBuffers.add(buffer));
|
||||
|
||||
pool.add(buffer);
|
||||
assertEquals(1, pool.count());
|
||||
buffer = pool.get();
|
||||
assertEquals(0, pool.count());
|
||||
assertFalse(pooledBuffers.add(buffer));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void autoClose() {
|
||||
var pool = ObjectPool.BUFFER_POOL;
|
||||
assertEquals(0, pool.count());
|
||||
try (var ignored = pool.hold()) {
|
||||
assertEquals(0, pool.count());
|
||||
}
|
||||
assertEquals(1, pool.count());
|
||||
|
||||
try (var ignored = pool.hold()) {
|
||||
assertEquals(0, pool.count());
|
||||
}
|
||||
assertEquals(1, pool.count());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user