implement pooled buffers

This commit is contained in:
Eoghanmc22 2020-11-20 12:16:45 -05:00
parent 035844787e
commit ee2e141673
7 changed files with 78 additions and 52 deletions

View File

@ -9,6 +9,10 @@ public class GroupedPacketHandler extends MessageToByteEncoder<FramedPacket> {
@Override
protected void encode(ChannelHandlerContext ctx, FramedPacket msg, ByteBuf out) {
out.writeBytes(msg.body.retainedSlice());
final ByteBuf packet = msg.body;
out.writeBytes(packet.duplicate());
if (msg.releaseBuf) {
packet.release();
}
}
}

View File

@ -10,9 +10,15 @@ import org.jetbrains.annotations.NotNull;
public class FramedPacket {
public final ByteBuf body;
public boolean releaseBuf = false;
public FramedPacket(@NotNull ByteBuf body) {
this.body = body;
}
public FramedPacket(@NotNull ByteBuf body, boolean releaseBuf) {
this.body = body;
this.releaseBuf = releaseBuf;
}
}

View File

@ -120,14 +120,7 @@ public class NettyPlayerConnection extends PlayerConnection {
if (identifier == null) {
// This packet explicitly said to do not retrieve the cache
if (MinecraftServer.processingNettyErrors())
channel.write(serverPacket).addListener(future -> {
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
});
else
channel.write(serverPacket, channel.voidPromise());
write(serverPacket);
} else {
// Try to retrieve the cached buffer
TemporaryCache<ByteBuf> temporaryCache = cacheablePacket.getCache();
@ -135,44 +128,42 @@ public class NettyPlayerConnection extends PlayerConnection {
if (buffer == null) {
// Buffer not found, create and cache it
final long time = System.currentTimeMillis();
buffer = PacketUtils.createFramedPacket(serverPacket);
buffer = PacketUtils.createFramedPacket(serverPacket, false);
temporaryCache.cacheObject(identifier, buffer, time);
}
FramedPacket framedPacket = new FramedPacket(buffer);
if (MinecraftServer.processingNettyErrors())
channel.write(framedPacket).addListener(future -> {
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
});
else
channel.write(framedPacket, channel.voidPromise());
write(framedPacket);
}
} else {
if (MinecraftServer.processingNettyErrors())
channel.write(serverPacket).addListener(future -> {
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
});
else
channel.write(serverPacket, channel.voidPromise());
}
} else {
if (MinecraftServer.processingNettyErrors())
channel.writeAndFlush(serverPacket).addListener(future -> {
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
});
else
channel.writeAndFlush(serverPacket, channel.voidPromise());
}
} else
write(serverPacket);
} else
writeAndFlush(serverPacket);
}
}
public void write(Object message) {
if (MinecraftServer.processingNettyErrors())
channel.write(message).addListener(future -> {
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
});
else
channel.write(message, channel.voidPromise());
}
public void writeAndFlush(Object message) {
if (MinecraftServer.processingNettyErrors())
channel.writeAndFlush(message).addListener(future -> {
if (!future.isSuccess()) {
future.cause().printStackTrace();
}
});
else
channel.writeAndFlush(message, channel.voidPromise());
}
@NotNull
@Override
public SocketAddress getRemoteAddress() {

View File

@ -24,8 +24,8 @@ public final class PacketUtils {
private static final PacketListenerManager PACKET_LISTENER_MANAGER = MinecraftServer.getPacketListenerManager();
private static Deflater deflater = new Deflater(3);
private static byte[] buffer = new byte[8192];
private final static Deflater deflater = new Deflater(3);
private final static byte[] buffer = new byte[8192];
private PacketUtils() {
@ -45,16 +45,20 @@ public final class PacketUtils {
final boolean success = PACKET_LISTENER_MANAGER.processServerPacket(packet, players);
if (success) {
final ByteBuf finalBuffer = createFramedPacket(packet);
final FramedPacket framedPacket = new FramedPacket(finalBuffer);
final ByteBuf finalBuffer = createFramedPacket(packet, true);
final FramedPacket framedPacket = new FramedPacket(finalBuffer, true);
final int refIncrease = players.size() - 1;
if (refIncrease > 0)
finalBuffer.retain(refIncrease);
for (Player player : players) {
final PlayerConnection playerConnection = player.getPlayerConnection();
if (playerConnection instanceof NettyPlayerConnection) {
final NettyPlayerConnection nettyPlayerConnection = (NettyPlayerConnection) playerConnection;
nettyPlayerConnection.getChannel().write(framedPacket);
nettyPlayerConnection.write(framedPacket);
} else {
playerConnection.sendPacket(packet);
finalBuffer.release();
}
}
}
@ -84,7 +88,7 @@ public final class PacketUtils {
// Add 5 for the packet id and for the packet size
final int size = packetBuffer.writerIndex() + 5 + 5;
ByteBuf buffer = Unpooled.buffer(size);
ByteBuf buffer = BufUtils.getBuffer(true, size);
writePacket(buffer, packetBuffer, packet.getId());
@ -189,24 +193,27 @@ public final class PacketUtils {
* @param serverPacket the server packet to write
* @return the framed packet from the server one
*/
public static ByteBuf createFramedPacket(@NotNull ServerPacket serverPacket) {
public static ByteBuf createFramedPacket(@NotNull ServerPacket serverPacket, boolean directBuffer) {
ByteBuf packetBuf = writePacket(serverPacket);
// TODO use pooled buffers instead of unpooled ones
if (MinecraftServer.getCompressionThreshold() > 0) {
ByteBuf compressedBuf = Unpooled.buffer();
ByteBuf framedBuf = Unpooled.buffer();
ByteBuf compressedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer();
ByteBuf framedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer();
synchronized (deflater) {
compressBuffer(deflater, buffer, packetBuf, compressedBuf);
}
packetBuf.release();
frameBuffer(compressedBuf, framedBuf);
compressedBuf.release();
return framedBuf;
} else {
ByteBuf framedBuf = Unpooled.buffer();
ByteBuf framedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer();
frameBuffer(packetBuf, framedBuf);
packetBuf.release();
return framedBuf;
}

View File

@ -4,6 +4,7 @@ import demo.blocks.BurningTorchBlock;
import demo.blocks.StoneBlock;
import demo.blocks.UpdatableBlockDemo;
import demo.commands.*;
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;
import net.minestom.server.MinecraftServer;
import net.minestom.server.command.CommandManager;
import net.minestom.server.extras.optifine.OptifineSupport;
@ -27,8 +28,11 @@ public class Main {
MinecraftServer.setShouldProcessNettyErrors(false);
final NettyServer nettyServer = MinecraftServer.getNettyServer();
nettyServer.setWriteLimit(500_000);
nettyServer.setWriteLimit(500_000);
final GlobalChannelTrafficShapingHandler trafficHandler = nettyServer.getGlobalTrafficHandler();
trafficHandler.setReadChannelLimit(500_000);
trafficHandler.setReadChannelLimit(500_000);
//trafficHandler.setReadLimit(500_000);
//trafficHandler.setReadLimit(500_000);
BlockManager blockManager = MinecraftServer.getBlockManager();
blockManager.registerCustomBlock(new StoneBlock());

View File

@ -30,6 +30,7 @@ import net.minestom.server.utils.time.TimeUnit;
import net.minestom.server.world.DimensionType;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
public class PlayerInit {
@ -162,7 +163,9 @@ public class PlayerInit {
player.addEventCallback(PlayerLoginEvent.class, event -> {
event.setSpawningInstance(instanceContainer);
//int x = ThreadLocalRandom.current().nextInt()%10000;
int x = Math.abs(ThreadLocalRandom.current().nextInt()) % 1500 - 750;
int z = Math.abs(ThreadLocalRandom.current().nextInt()) % 1500 - 750;
player.setRespawnPoint(new Position(0, 64f, 0));
/*player.getInventory().addInventoryCondition((p, slot, clickType, inventoryConditionResult) -> {

View File

@ -0,0 +1,11 @@
package demo;
import net.minestom.server.Bootstrap;
public class Start {
public static void main(String[] args) {
Bootstrap.bootstrap("demo.Main", new String[0]);
}
}