diff --git a/src/main/java/com/viaversion/aas/codec/CompressionCodec.java b/src/main/java/com/viaversion/aas/codec/CompressionCodec.java index edf1289..201c7a5 100644 --- a/src/main/java/com/viaversion/aas/codec/CompressionCodec.java +++ b/src/main/java/com/viaversion/aas/codec/CompressionCodec.java @@ -1,20 +1,36 @@ package com.viaversion.aas.codec; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.velocitypowered.natives.compression.JavaVelocityCompressor; import com.velocitypowered.natives.compression.VelocityCompressor; import com.velocitypowered.natives.util.MoreByteBufUtils; import com.velocitypowered.natives.util.Natives; +import com.viaversion.aas.config.VIAaaSConfig; +import com.viaversion.aas.handler.MinecraftHandler; import com.viaversion.viaversion.api.type.Type; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.MessageToMessageCodec; +import kotlin.Pair; +import java.net.SocketAddress; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.DataFormatException; public class CompressionCodec extends MessageToMessageCodec { // stolen from Krypton (GPL) and modified // https://github.com/astei/krypton/blob/master/src/main/java/me/steinborn/krypton/mod/shared/network/compression/MinecraftCompressEncoder.java private static final int UNCOMPRESSED_CAP = 8 * 1024 * 1024; // 8MiB + // Workaround for Lilypad backend servers + private static final LoadingCache> nativeFails = CacheBuilder + .newBuilder() + .expireAfterWrite(2, TimeUnit.HOURS) + .build(CacheLoader.from(() -> new Pair<>(new AtomicInteger(), new AtomicInteger()))); private int threshold; private VelocityCompressor compressor; @@ -28,7 +44,24 @@ public class CompressionCodec extends MessageToMessageCodec { @Override public void handlerAdded(ChannelHandlerContext ctx) { - compressor = Natives.compress.get().create(6); + var useNative = true; + + var attempts = getAttempts(ctx); + if (attempts != null && Math.random() <= 0.95) { + // We'll use Java when the native compression fail rate is high + var probabilityNo = attempts.getSecond().get() + 1; + var divisor = attempts.getFirst().get() + 2; + useNative = !(Math.random() <= (double) probabilityNo / divisor); + } + + var level = VIAaaSConfig.INSTANCE.getCompressionLevel(); + if (useNative) { + compressor = Natives.compress.get().create(level); + } else { + compressor = JavaVelocityCompressor.FACTORY.create(level); + } + + recordHandlerAdded(ctx); } @Override @@ -66,6 +99,32 @@ public class CompressionCodec extends MessageToMessageCodec { return MoreByteBufUtils.preferredBuffer(ctx.alloc(), compressor, initialBufferSize); } + private Pair getAttempts(ChannelHandlerContext ctx) { + var handler = ctx.pipeline().get(MinecraftHandler.class); + if (handler == null || handler.getFrontEnd() || !ctx.channel().isActive()) return null; + var addr = handler.getEndRemoteAddress(); + + return nativeFails.getUnchecked(addr); + } + + private void recordHandlerAdded(ChannelHandlerContext ctx) { + if (compressor instanceof JavaVelocityCompressor) return; // Only record errors happened with native + + var attempts = getAttempts(ctx); + if (attempts != null) { + attempts.getFirst().incrementAndGet(); + } + } + + private void recordDecompressFailed(ChannelHandlerContext ctx) { + if (compressor instanceof JavaVelocityCompressor) return; // Only record errors happened with native + + var attempts = getAttempts(ctx); + if (attempts != null) { + attempts.getSecond().incrementAndGet(); + } + } + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List out) throws Exception { if (!input.isReadable() || !ctx.channel().isActive()) return; @@ -88,6 +147,12 @@ public class CompressionCodec extends MessageToMessageCodec { compressor.inflate(compatibleIn, decompressed, claimedUncompressedSize); input.clear(); out.add(decompressed.retain()); + } catch (DataFormatException ex) { + if (ex.getMessage().startsWith("Received a deflate stream that was too large, wanted ")) { + return; // workaround for lilypad + } + recordDecompressFailed(ctx); + throw ex; } finally { decompressed.release(); compatibleIn.release(); diff --git a/src/main/kotlin/com/viaversion/aas/config/VIAaaSConfig.kt b/src/main/kotlin/com/viaversion/aas/config/VIAaaSConfig.kt index 2434d87..68b7b7a 100644 --- a/src/main/kotlin/com/viaversion/aas/config/VIAaaSConfig.kt +++ b/src/main/kotlin/com/viaversion/aas/config/VIAaaSConfig.kt @@ -95,4 +95,5 @@ object VIAaaSConfig : Config(File("config/viaaas.yml")) { get() = this.getString("backend-proxy", "").let { if (it.isNullOrEmpty()) null else URI.create(it) } val protocolDetectorCache: Int get() = this.getInt("protocol-detector-cache", 30) + val compressionLevel: Int get() = this.getInt("compression-level", 6) } diff --git a/src/main/kotlin/com/viaversion/aas/handler/Util.kt b/src/main/kotlin/com/viaversion/aas/handler/Util.kt index 5bb0b66..ba52cad 100644 --- a/src/main/kotlin/com/viaversion/aas/handler/Util.kt +++ b/src/main/kotlin/com/viaversion/aas/handler/Util.kt @@ -37,10 +37,14 @@ fun addProxyHandler(pipe: ChannelPipeline, proxyUri: URI?, socket: InetSocketAdd } fun decodeBrand(data: ByteArray, is17: Boolean): String { - return if (is17) { - String(data, Charsets.UTF_8) - } else { - Type.STRING.read(Unpooled.wrappedBuffer(data)) + return when { + data.isEmpty() -> "" + is17 -> { + String(data, Charsets.UTF_8) + } + else -> { + Type.STRING.read(Unpooled.wrappedBuffer(data)) + } } } diff --git a/src/main/resources/viaaas.yml b/src/main/resources/viaaas.yml index 7aeaede..a61bdb6 100644 --- a/src/main/resources/viaaas.yml +++ b/src/main/resources/viaaas.yml @@ -16,6 +16,8 @@ backend-proxy: '' # Migrated to backend-proxy backend-socks5-proxy-address: '' backend-socks5-proxy-port: 9050 +# Zlib Compression level +compression-level: 6 # ###### # Crypto