Prevent compression from blocking the thread too much

This commit is contained in:
themode 2021-01-08 16:20:04 +01:00
parent 57166b2d23
commit 90006ac48d
2 changed files with 47 additions and 20 deletions

View File

@ -43,7 +43,7 @@ public class PacketCompressor extends ByteToMessageCodec<ByteBuf> {
@Override @Override
protected void encode(ChannelHandlerContext ctx, ByteBuf from, ByteBuf to) { protected void encode(ChannelHandlerContext ctx, ByteBuf from, ByteBuf to) {
PacketUtils.compressBuffer(deflater, buffer, from, to); PacketUtils.compressBuffer(deflater, buffer, from, to, false);
} }
@Override @Override
@ -68,13 +68,11 @@ public class PacketCompressor extends ByteToMessageCodec<ByteBuf> {
buf.readBytes(input); buf.readBytes(input);
inflater.setInput(input); inflater.setInput(input);
byte[] output = new byte[i]; byte[] output = new byte[i];
inflater.inflate(output); inflater.inflate(output);
out.add(Unpooled.wrappedBuffer(output));
inflater.reset(); inflater.reset();
out.add(Unpooled.wrappedBuffer(output));
} }
} }
} }

View File

@ -16,6 +16,9 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.Deflater; import java.util.zip.Deflater;
/** /**
@ -26,8 +29,14 @@ public final class PacketUtils {
private static final PacketListenerManager PACKET_LISTENER_MANAGER = MinecraftServer.getPacketListenerManager(); private static final PacketListenerManager PACKET_LISTENER_MANAGER = MinecraftServer.getPacketListenerManager();
private final static Deflater deflater = new Deflater(3); private static final int DEFLATER_COUNT = 10;
private final static byte[] buffer = new byte[8192]; private static final List<Deflater> DEFLATERS = new CopyOnWriteArrayList<>();
static {
for (int i = 0; i < DEFLATER_COUNT; i++) {
DEFLATERS.add(new Deflater(3));
}
}
private PacketUtils() { private PacketUtils() {
@ -191,28 +200,47 @@ public final class PacketUtils {
* {@code packetBuffer} needs to be the packet content without any header (if you want to use it to write a Minecraft packet). * {@code packetBuffer} needs to be the packet content without any header (if you want to use it to write a Minecraft packet).
* *
* @param deflater the deflater for zlib compression * @param deflater the deflater for zlib compression
* @param buffer a cached buffer which will be used to store temporary the deflater output * @param buffer a cached buffer which will be used to store temporary the deflater output,
* null if you prefer the buffer to be allocated dynamically when required
* @param packetBuffer the buffer containing all the packet fields * @param packetBuffer the buffer containing all the packet fields
* @param compressionTarget the buffer which will receive the compressed version of {@code packetBuffer} * @param compressionTarget the buffer which will receive the compressed version of {@code packetBuffer}
*/ */
public static void compressBuffer(@NotNull Deflater deflater, @NotNull byte[] buffer, @NotNull ByteBuf packetBuffer, @NotNull ByteBuf compressionTarget) { public static void compressBuffer(@NotNull Deflater deflater, @Nullable byte[] buffer,
@NotNull ByteBuf packetBuffer, @NotNull ByteBuf compressionTarget,
boolean synchronization) {
final int packetLength = packetBuffer.readableBytes(); final int packetLength = packetBuffer.readableBytes();
if (packetLength < MinecraftServer.getCompressionThreshold()) { if (packetLength < MinecraftServer.getCompressionThreshold()) {
Utils.writeVarIntBuf(compressionTarget, 0); Utils.writeVarIntBuf(compressionTarget, 0);
compressionTarget.writeBytes(packetBuffer); compressionTarget.writeBytes(packetBuffer);
} else { } else {
Utils.writeVarIntBuf(compressionTarget, packetLength); Utils.writeVarIntBuf(compressionTarget, packetLength);
deflater.setInput(packetBuffer.nioBuffer()); final Runnable compressor = () -> {
deflater.finish(); // Allocate buffer if not already
byte[] output = buffer != null ? buffer : new byte[8192];
while (!deflater.finished()) { deflater.setInput(packetBuffer.nioBuffer());
final int length = deflater.deflate(buffer); deflater.finish();
compressionTarget.writeBytes(buffer, 0, length);
while (!deflater.finished()) {
final int length = deflater.deflate(output);
compressionTarget.writeBytes(output, 0, length);
}
deflater.reset();
;
};
// Synchronize only if asked
if (synchronization) {
synchronized (deflater) {
compressor.run();
}
} else {
compressor.run();
} }
deflater.reset();
} }
} }
@ -231,12 +259,13 @@ public final class PacketUtils {
ByteBuf packetBuf = writePacket(serverPacket); ByteBuf packetBuf = writePacket(serverPacket);
if (MinecraftServer.getCompressionThreshold() > 0) { if (MinecraftServer.getCompressionThreshold() > 0) {
ByteBuf compressedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer(); ByteBuf compressedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer();
ByteBuf framedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer(); ByteBuf framedBuf = directBuffer ? BufUtils.getBuffer(true) : Unpooled.buffer();
synchronized (deflater) {
compressBuffer(deflater, buffer, packetBuf, compressedBuf); // Get a random deflater
} final Deflater deflater = DEFLATERS.get(ThreadLocalRandom.current().nextInt(DEFLATER_COUNT));
compressBuffer(deflater, null, packetBuf, compressedBuf, true);
packetBuf.release(); packetBuf.release();
frameBuffer(compressedBuf, framedBuf); frameBuffer(compressedBuf, framedBuf);