More packet writing utils

This commit is contained in:
themode 2024-09-16 18:46:44 +02:00 committed by Matt Worzala
parent 47480f330b
commit 959caeeb75
2 changed files with 56 additions and 24 deletions

View File

@ -8,6 +8,9 @@ import net.minestom.server.network.packet.server.ServerPacket;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import java.util.Queue;
import java.util.function.BiPredicate;
/**
* Tools to write packets into a {@link NetworkBuffer} for network processing.
* <p>
@ -43,6 +46,17 @@ public final class PacketWriting {
@NotNull T packet,
int compressionThreshold) throws IndexOutOfBoundsException {
final PacketRegistry.PacketInfo<T> packetInfo = registry.packetInfo(packet);
writeFramedPacket(
buffer,
packetInfo, packet,
compressionThreshold
);
}
public static <T> void writeFramedPacket(@NotNull NetworkBuffer buffer,
@NotNull PacketRegistry.PacketInfo<T> packetInfo,
@NotNull T packet,
int compressionThreshold) throws IndexOutOfBoundsException {
final int id = packetInfo.id();
final NetworkBuffer.Type<T> serializer = packetInfo.serializer();
writeFramedPacket(
@ -160,4 +174,41 @@ public final class PacketWriting {
return tmpBuffer.copy(0, tmpBuffer.writeIndex());
}
}
public static <T> void writeQueue(NetworkBuffer buffer, Queue<T> queue, int minWrite,
BiPredicate<NetworkBuffer, T> writer) {
// The goal of this method is to write at the very least `minWrite` packets if the queue permits it.
// The buffer is resized if it cannot hold this minimum.
final int size = queue.size();
minWrite = Math.min(minWrite, size);
T packet;
int written = 0;
while ((packet = queue.peek()) != null) {
final long index = buffer.writeIndex();
boolean success;
try {
success = writer.test(buffer, packet);
} catch (IndexOutOfBoundsException e) {
success = false;
}
assert !success || buffer.writeIndex() > 0;
// Poll the packet only if fully written
if (success) {
// Packet fully written
queue.poll();
written++;
} else {
buffer.writeIndex(index);
if (written < minWrite) {
// Try again with a bigger buffer
final long newSize = Math.min(buffer.capacity() * 2, ServerFlag.MAX_PACKET_SIZE);
buffer.resize(newSize);
} else {
// At least one packet has been written
// Not worth resizing to fit more, we'll try again next flush
break;
}
}
}
}
}

View File

@ -408,31 +408,12 @@ public class PlayerSocketConnection extends PlayerConnection {
if (!channel.isConnected()) throw new EOFException("Channel is closed");
NetworkBuffer buffer = PacketVanilla.PACKET_POOL.get();
// Write to buffer
SendablePacket packet;
int written = 0;
while ((packet = packetQueue.peek()) != null) {
PacketWriting.writeQueue(buffer, packetQueue, 1, (b, packet) -> {
final boolean compressed = sentPacketCounter.get() > compressionStart;
final boolean success = writeSendable(buffer, packet, compressed);
assert !success || buffer.writeIndex() > 0;
// Poll the packet only if fully written
if (success) {
// Packet fully written
packetQueue.poll();
sentPacketCounter.getAndIncrement();
written++;
} else {
if (written == 0) {
assert buffer.writeIndex() == 0;
// Try again with a bigger buffer
final long newSize = Math.min(buffer.capacity() * 2, ServerFlag.MAX_PACKET_SIZE);
buffer.resize(newSize);
} else {
// At least one packet has been written
// Not worth resizing to fit more, we'll try again next flush
break;
}
}
}
final boolean success = writeSendable(b, packet, compressed);
if (success) sentPacketCounter.getAndIncrement();
return success;
});
// Write to channel
final boolean success = buffer.writeChannel(channel);
// Keep the buffer if not fully written