Heavily reduce the number of direct buffers required for each worker

This commit is contained in:
themode 2022-01-26 06:37:46 +01:00
parent d0b874ed7c
commit 0b4dd3b8aa
4 changed files with 15 additions and 21 deletions

View File

@ -85,8 +85,7 @@ public class PlayerSocketConnection extends PlayerConnection {
PooledBuffers.registerBuffers(this, waitingBuffers);
}
public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) {
final BinaryBuffer readBuffer = workerContext.readBuffer;
public void processPackets(BinaryBuffer readBuffer, PacketProcessor packetProcessor) {
// Decrypt data
if (encrypted) {
final Cipher cipher = decryptCipher;
@ -100,7 +99,7 @@ public class PlayerSocketConnection extends PlayerConnection {
}
// Read all packets
try {
var result = PacketUtils.readPackets(readBuffer, compressed, workerContext);
var result = PacketUtils.readPackets(readBuffer, compressed);
this.cacheBuffer = result.remaining();
for (var packet : result.packets()) {
var id = packet.id();

View File

@ -6,6 +6,7 @@ import net.minestom.server.entity.Player;
import net.minestom.server.network.player.PlayerSocketConnection;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.binary.PooledBuffers;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedXaddArrayQueue;
import org.jetbrains.annotations.ApiStatus;
@ -18,14 +19,12 @@ import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Inflater;
@ApiStatus.Internal
public final class Worker extends MinestomThread {
private static final AtomicInteger COUNTER = new AtomicInteger();
final Selector selector;
private final Context context = new Context();
private final Map<SocketChannel, PlayerSocketConnection> connectionMap = new ConcurrentHashMap<>();
private final Server server;
private final MpscUnboundedXaddArrayQueue<Runnable> queue = new MpscUnboundedXaddArrayQueue<>(1024);
@ -62,12 +61,12 @@ public final class Worker extends MinestomThread {
if (!key.isReadable()) return;
PlayerSocketConnection connection = connectionMap.get(channel);
try {
BinaryBuffer readBuffer = context.readBuffer.clear();
BinaryBuffer readBuffer = BinaryBuffer.wrap(PooledBuffers.packetBuffer());
// Consume last incomplete packet
connection.consumeCache(readBuffer);
// Read & process
readBuffer.readChannel(channel);
connection.processPackets(context, server.packetProcessor());
connection.processPackets(readBuffer, server.packetProcessor());
} catch (IOException e) {
// TODO print exception? (should ignore disconnection)
connection.disconnect();
@ -112,13 +111,4 @@ public final class Worker extends MinestomThread {
public MessagePassingQueue<Runnable> queue() {
return queue;
}
/**
* Contains objects that we can be shared across all the connection of a {@link Worker worker}.
*/
public static final class Context {
public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
public final Inflater inflater = new Inflater();
}
}

View File

@ -18,7 +18,6 @@ import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.player.PlayerConnection;
import net.minestom.server.network.player.PlayerSocketConnection;
import net.minestom.server.network.socket.Worker;
import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.binary.BinaryWriter;
import net.minestom.server.utils.binary.PooledBuffers;
@ -161,8 +160,7 @@ public final class PacketUtils {
}
@ApiStatus.Internal
public static ReadResult readPackets(@NotNull BinaryBuffer readBuffer, boolean compressed,
@NotNull Worker.Context context) throws DataFormatException {
public static ReadResult readPackets(@NotNull BinaryBuffer readBuffer, boolean compressed) throws DataFormatException {
List<PacketPayload> packets = new ArrayList<>();
BinaryBuffer remaining = null;
while (readBuffer.readableBytes() > 0) {
@ -186,9 +184,9 @@ public final class PacketUtils {
decompressedSize = payloadLength;
} else {
// Decompress to content buffer
content = context.contentBuffer.clear();
content = BinaryBuffer.wrap(PooledBuffers.tempBuffer());
decompressedSize = dataLength;
Inflater inflater = context.inflater;
Inflater inflater = new Inflater(); // TODO: Pool?
inflater.setInput(readBuffer.asByteBuffer(readBuffer.readerOffset(), payloadLength));
inflater.inflate(content.asByteBuffer(0, dataLength));
inflater.reset();

View File

@ -33,6 +33,13 @@ public final class BinaryBuffer {
return new BinaryBuffer(ByteBuffer.allocateDirect(size));
}
@ApiStatus.Internal
public static BinaryBuffer wrap(ByteBuffer buffer) {
assert buffer.isDirect();
return new BinaryBuffer(buffer);
}
public static BinaryBuffer copy(BinaryBuffer buffer) {
final int size = buffer.readableBytes();
final var temp = ByteBuffer.allocateDirect(size)