Limit viewable packet allocations

This commit is contained in:
themode 2021-10-15 11:09:05 +02:00
parent 0bd519c894
commit 099397b968
2 changed files with 40 additions and 45 deletions

View File

@ -3,6 +3,7 @@ package net.minestom.server.utils;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongIterator;
import it.unimi.dsi.fastutil.longs.LongList; import it.unimi.dsi.fastutil.longs.LongList;
import net.kyori.adventure.audience.Audience; import net.kyori.adventure.audience.Audience;
import net.kyori.adventure.audience.ForwardingAudience; import net.kyori.adventure.audience.ForwardingAudience;
@ -28,13 +29,11 @@ import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.zip.Deflater; import java.util.zip.Deflater;
/** /**
@ -162,9 +161,9 @@ public final class PacketUtils {
} }
ViewableStorage viewableStorage; ViewableStorage viewableStorage;
synchronized (VIEWABLE_STORAGE_MAP) { synchronized (VIEWABLE_STORAGE_MAP) {
viewableStorage = VIEWABLE_STORAGE_MAP.computeIfAbsent(viewable, ViewableStorage::new); viewableStorage = VIEWABLE_STORAGE_MAP.computeIfAbsent(viewable, v -> new ViewableStorage());
} }
viewableStorage.append(serverPacket, entity instanceof Player ? (Player) entity : null); viewableStorage.append(viewable, serverPacket, entity instanceof Player ? (Player) entity : null);
} }
@ApiStatus.Experimental @ApiStatus.Experimental
@ -175,8 +174,8 @@ public final class PacketUtils {
@ApiStatus.Internal @ApiStatus.Internal
public static void flush() { public static void flush() {
synchronized (VIEWABLE_STORAGE_MAP) { synchronized (VIEWABLE_STORAGE_MAP) {
VIEWABLE_STORAGE_MAP.values().parallelStream() VIEWABLE_STORAGE_MAP.entrySet().parallelStream().forEach(entry ->
.forEach(viewableStorage -> viewableStorage.process(null)); entry.getValue().process(entry.getKey()));
} }
} }
@ -240,26 +239,29 @@ public final class PacketUtils {
} }
private static final class ViewableStorage { private static final class ViewableStorage {
private final WeakReference<Viewable> viewable;
// Player id -> list of offsets to ignore (32:32 bits) // Player id -> list of offsets to ignore (32:32 bits)
private final Int2ObjectMap<LongList> entityIdMap = new Int2ObjectOpenHashMap<>(); private final Int2ObjectMap<LongList> entityIdMap = new Int2ObjectOpenHashMap<>();
private final BinaryBuffer buffer = PooledBuffers.get(); private final BinaryBuffer buffer = PooledBuffers.get();
private ViewableStorage(Viewable viewable) { {
this.viewable = new WeakReference<>(viewable);
PooledBuffers.registerBuffer(this, buffer); PooledBuffers.registerBuffer(this, buffer);
} }
private synchronized void append(ServerPacket serverPacket, Player player) { private synchronized void append(Viewable viewable, ServerPacket serverPacket, Player player) {
final ByteBuffer framedPacket = createFramedPacket(serverPacket).flip(); final ByteBuffer framedPacket = createFramedPacket(serverPacket).flip();
final int packetSize = framedPacket.limit(); final int packetSize = framedPacket.limit();
if (packetSize >= buffer.capacity()) { if (packetSize >= buffer.capacity()) {
process(new SingleEntry(framedPacket, player)); process(viewable);
for (Player viewer : viewable.getViewers()) {
if (!Objects.equals(player, viewer)) {
writeTo(viewer.getPlayerConnection(), framedPacket.position(0));
}
}
return; return;
} }
if (!buffer.canWrite(packetSize)) process(null); if (!buffer.canWrite(packetSize)) process(viewable);
final int start = buffer.writerOffset(); final int start = buffer.writerOffset();
buffer.write(framedPacket); this.buffer.write(framedPacket);
final int end = buffer.writerOffset(); final int end = buffer.writerOffset();
if (player != null) { if (player != null) {
final long offsets = (long) start << 32 | end & 0xFFFFFFFFL; final long offsets = (long) start << 32 | end & 0xFFFFFFFFL;
@ -268,29 +270,23 @@ public final class PacketUtils {
} }
} }
private synchronized void process(@Nullable SingleEntry singleEntry) { private synchronized void process(Viewable viewable) {
final Viewable viewable; if (buffer.writerOffset() == 0) {
if (buffer.writerOffset() == 0 || (viewable = this.viewable.get()) == null) {
clear(); clear();
return; return;
} }
for (Player player : viewable.getViewers()) { for (Player player : viewable.getViewers()) {
PlayerConnection connection = player.getPlayerConnection(); final PlayerConnection connection = player.getPlayerConnection();
Consumer<ByteBuffer> writer = connection instanceof PlayerSocketConnection
? ((PlayerSocketConnection) connection)::write :
byteBuffer -> {
// TODO for non-socket connection
};
int lastWrite = 0; int lastWrite = 0;
final LongList pairs = entityIdMap.get(player.getEntityId()); final LongList pairs;
if (pairs != null) { if (!entityIdMap.isEmpty() && (pairs = entityIdMap.get(player.getEntityId())) != null) {
for (long offsets : pairs) { for (LongIterator it = pairs.longIterator(); it.hasNext(); ) {
final long offsets = it.nextLong();
final int start = (int) (offsets >> 32); final int start = (int) (offsets >> 32);
final int end = (int) offsets; final int end = (int) offsets;
if (start != lastWrite) { if (start != lastWrite) {
ByteBuffer slice = buffer.asByteBuffer(lastWrite, start - lastWrite); ByteBuffer slice = buffer.view(lastWrite, start);
writer.accept(slice); writeTo(connection, slice);
} }
lastWrite = end; lastWrite = end;
} }
@ -298,31 +294,24 @@ public final class PacketUtils {
// Write remaining // Write remaining
final int remaining = buffer.writerOffset() - lastWrite; final int remaining = buffer.writerOffset() - lastWrite;
if (remaining > 0) { if (remaining > 0) {
ByteBuffer remainSlice = buffer.asByteBuffer(lastWrite, remaining); ByteBuffer remainSlice = buffer.view(lastWrite, buffer.writerOffset());
writer.accept(remainSlice); writeTo(connection, remainSlice);
}
// Handle single entry
if (singleEntry != null && !Objects.equals(singleEntry.exception, player)) {
writer.accept(singleEntry.buffer.position(0));
} }
} }
clear(); clear();
} }
private static void writeTo(PlayerConnection connection, ByteBuffer buffer) {
if (connection instanceof PlayerSocketConnection) {
((PlayerSocketConnection) connection).write(buffer);
return;
}
// TODO for non-socket connection
}
private void clear() { private void clear() {
this.entityIdMap.clear(); this.entityIdMap.clear();
this.buffer.clear(); this.buffer.clear();
} }
private static final class SingleEntry {
private final ByteBuffer buffer;
private final Player exception;
public SingleEntry(ByteBuffer buffer, Player exception) {
this.buffer = buffer;
this.exception = exception;
}
}
} }
} }

View File

@ -125,6 +125,7 @@ public final class BinaryBuffer {
public BinaryBuffer clear() { public BinaryBuffer clear() {
this.readerOffset = 0; this.readerOffset = 0;
this.writerOffset = 0; this.writerOffset = 0;
this.nioBuffer.limit(capacity);
return this; return this;
} }
@ -132,6 +133,11 @@ public final class BinaryBuffer {
return nioBuffer.position(reader).slice().limit(writer); return nioBuffer.position(reader).slice().limit(writer);
} }
@ApiStatus.Internal
public ByteBuffer view(int position, int limit) {
return nioBuffer.limit(limit).position(position);
}
public boolean writeChannel(WritableByteChannel channel) throws IOException { public boolean writeChannel(WritableByteChannel channel) throws IOException {
var writeBuffer = asByteBuffer(readerOffset, writerOffset - readerOffset); var writeBuffer = asByteBuffer(readerOffset, writerOffset - readerOffset);
final int count = channel.write(writeBuffer); final int count = channel.write(writeBuffer);