Minestom/src/main/java/net/minestom/server/utils/PacketUtils.java

335 lines
15 KiB
Java
Raw Normal View History

2020-04-24 03:25:58 +02:00
package net.minestom.server.utils;
2019-08-22 14:52:32 +02:00
2021-08-24 14:59:17 +02:00
import it.unimi.dsi.fastutil.ints.IntIntPair;
import net.kyori.adventure.audience.Audience;
import net.kyori.adventure.audience.ForwardingAudience;
import net.minestom.server.MinecraftServer;
2021-08-24 14:59:17 +02:00
import net.minestom.server.Viewable;
2021-06-11 17:19:11 +02:00
import net.minestom.server.adventure.MinestomAdventure;
import net.minestom.server.adventure.audience.PacketGroupingAudience;
2021-08-24 14:59:17 +02:00
import net.minestom.server.entity.Entity;
import net.minestom.server.entity.Player;
import net.minestom.server.listener.manager.PacketListenerManager;
2021-08-04 04:00:42 +02:00
import net.minestom.server.network.packet.FramedPacket;
2021-03-12 16:33:19 +01:00
import net.minestom.server.network.packet.server.ComponentHoldingServerPacket;
2020-04-24 03:25:58 +02:00
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.player.PlayerConnection;
import net.minestom.server.network.player.PlayerSocketConnection;
2021-08-04 12:41:15 +02:00
import net.minestom.server.network.socket.Server;
2021-08-24 14:59:17 +02:00
import net.minestom.server.utils.binary.BinaryBuffer;
import net.minestom.server.utils.binary.BinaryWriter;
import net.minestom.server.utils.callback.validator.PlayerValidator;
2021-08-11 14:47:20 +02:00
import org.jetbrains.annotations.ApiStatus;
2020-11-05 22:20:51 +01:00
import org.jetbrains.annotations.NotNull;
2021-08-24 14:59:17 +02:00
import org.jetbrains.annotations.Nullable;
2019-08-22 14:52:32 +02:00
2021-08-04 12:41:15 +02:00
import java.nio.BufferOverflowException;
2021-06-20 20:59:53 +02:00
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
2021-08-24 14:59:17 +02:00
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
2021-08-24 14:59:17 +02:00
import java.util.function.Consumer;
2021-06-20 20:59:53 +02:00
import java.util.zip.Deflater;
2020-08-15 04:05:15 +02:00
/**
 * Utility class for packets, including writing a {@link ServerPacket} into a {@link ByteBuffer}
 * for network processing.
 */
public final class PacketUtils {
// Listener manager fetched once from MinecraftServer when this class is initialized;
// sendGroupedPacket consults it before a packet is framed and written to a group of players.
private static final PacketListenerManager PACKET_LISTENER_MANAGER = MinecraftServer.getPacketListenerManager();
2021-06-20 20:59:53 +02:00
// One Deflater per thread, created lazily; writeFramedPacket resets it after each use so it can be reused.
private static final ThreadLocal<Deflater> COMPRESSOR = ThreadLocal.withInitial(Deflater::new);
2021-08-13 01:13:58 +02:00
// Per-thread buffer of Server.SOCKET_BUFFER_SIZE bytes that the createFramedPacket overloads
// without an explicit buffer write into.
private static final LocalCache PACKET_BUFFER = LocalCache.get("packet-buffer", Server.SOCKET_BUFFER_SIZE);
// Per-thread buffer of the same size, used by writeFramedPacket to hold the uncompressed
// content while the deflated bytes are written back into the destination buffer.
private static final LocalCache COMPRESSION_CACHE = LocalCache.get("compression-buffer", Server.SOCKET_BUFFER_SIZE);
// Static utility holder: the private constructor prevents instantiation.
private PacketUtils() {
}
2019-08-22 14:52:32 +02:00
/**
 * Sends a packet to an audience. The audience type is resolved in the following order,
 * and only the first matching case is applied:
 * <ol>
 *     <li>A {@link Player}: the packet is sent through the player's connection.</li>
 *     <li>A {@link PacketGroupingAudience}: the packet is passed to
 *     {@link #sendGroupedPacket(Collection, ServerPacket)} together with the players
 *     contained in the grouping audience.</li>
 *     <li>A {@link ForwardingAudience.Single}: this method is called again with the
 *     single audience wrapped by the forwarding audience.</li>
 *     <li>A {@link ForwardingAudience}: this method is called again for every member
 *     of the forwarding audience.</li>
 *     <li>Anything else: nothing happens.</li>
 * </ol>
 *
 * @param audience the audience
 * @param packet   the packet
 */
@SuppressWarnings("OverrideOnly") // we need to access the audiences inside ForwardingAudience
public static void sendPacket(@NotNull Audience audience, @NotNull ServerPacket packet) {
    if (audience instanceof Player) {
        final Player player = (Player) audience;
        player.getPlayerConnection().sendPacket(packet);
        return;
    }
    if (audience instanceof PacketGroupingAudience) {
        final PacketGroupingAudience group = (PacketGroupingAudience) audience;
        PacketUtils.sendGroupedPacket(group.getPlayers(), packet);
        return;
    }
    if (audience instanceof ForwardingAudience.Single) {
        final ForwardingAudience.Single wrapper = (ForwardingAudience.Single) audience;
        PacketUtils.sendPacket(wrapper.audience(), packet);
        return;
    }
    if (audience instanceof ForwardingAudience) {
        final ForwardingAudience forwarding = (ForwardingAudience) audience;
        forwarding.audiences().forEach(member -> PacketUtils.sendPacket(member, packet));
    }
}
2020-11-13 09:17:53 +01:00
/**
* Sends a {@link ServerPacket} to multiple players.
2020-11-13 09:17:53 +01:00
* <p>
* Can drastically improve performance since the packet will not have to be processed as much.
2020-11-13 09:17:53 +01:00
*
* @param players the players to send the packet to
* @param packet the packet to send to the players
* @param playerValidator optional callback to check if a specify player of {@code players} should receive the packet
2020-11-13 09:17:53 +01:00
*/
public static void sendGroupedPacket(@NotNull Collection<Player> players, @NotNull ServerPacket packet,
2021-08-04 16:49:01 +02:00
@NotNull PlayerValidator playerValidator) {
2020-11-20 14:14:55 +01:00
if (players.isEmpty())
return;
2021-03-12 16:33:19 +01:00
// work out if the packet needs to be sent individually due to server-side translating
boolean needsTranslating = false;
2021-06-11 17:19:11 +02:00
if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && packet instanceof ComponentHoldingServerPacket) {
needsTranslating = ComponentUtils.areAnyTranslatable(((ComponentHoldingServerPacket) packet).components());
2021-03-12 16:33:19 +01:00
}
if (MinecraftServer.hasGroupedPacket() && !needsTranslating) {
// Send grouped packet...
2021-08-04 16:49:01 +02:00
if (!PACKET_LISTENER_MANAGER.processServerPacket(packet, players))
return;
final ByteBuffer finalBuffer = createFramedPacket(packet);
final FramedPacket framedPacket = new FramedPacket(packet.getId(), finalBuffer, packet);
2021-08-04 16:49:01 +02:00
// Send packet to all players
for (Player player : players) {
if (!player.isOnline() || !playerValidator.isValid(player))
continue;
final PlayerConnection connection = player.getPlayerConnection();
if (connection instanceof PlayerSocketConnection) {
((PlayerSocketConnection) connection).write(framedPacket);
2021-08-04 16:49:01 +02:00
} else {
connection.sendPacket(packet);
}
}
} else {
// Write the same packet for each individual players
for (Player player : players) {
2021-08-04 16:49:01 +02:00
if (!player.isOnline() || !playerValidator.isValid(player))
continue;
player.getPlayerConnection().sendPacket(packet, false);
}
}
}
/**
* Same as {@link #sendGroupedPacket(Collection, ServerPacket, PlayerValidator)}
* but with the player validator sets to null.
*
* @see #sendGroupedPacket(Collection, ServerPacket, PlayerValidator)
*/
public static void sendGroupedPacket(@NotNull Collection<Player> players, @NotNull ServerPacket packet) {
2021-08-04 16:49:01 +02:00
sendGroupedPacket(players, packet, player -> true);
}
/**
 * Sends a packet to every online player known to the connection manager,
 * through {@link #sendGroupedPacket(Collection, ServerPacket)}.
 *
 * @param packet the packet to broadcast
 */
public static void broadcastPacket(@NotNull ServerPacket packet) {
    final Collection<Player> everyone = MinecraftServer.getConnectionManager().getOnlinePlayers();
    sendGroupedPacket(everyone, packet);
}
public static void writeFramedPacket(@NotNull ByteBuffer buffer,
@NotNull ServerPacket packet,
boolean compression) {
if (!compression) {
// Length + payload
final int lengthIndex = Utils.writeEmptyVarIntHeader(buffer);
Utils.writeVarInt(buffer, packet.getId());
packet.write(new BinaryWriter(buffer));
final int finalSize = buffer.position() - (lengthIndex + 3);
Utils.writeVarIntHeader(buffer, lengthIndex, finalSize);
return;
}
// Compressed format
final int compressedIndex = Utils.writeEmptyVarIntHeader(buffer);
final int uncompressedIndex = Utils.writeEmptyVarIntHeader(buffer);
final int contentStart = buffer.position();
Utils.writeVarInt(buffer, packet.getId());
packet.write(new BinaryWriter(buffer));
final int packetSize = buffer.position() - contentStart;
if (packetSize >= MinecraftServer.getCompressionThreshold()) {
// Packet large enough, compress
final int limitCache = buffer.limit();
buffer.position(contentStart).limit(contentStart + packetSize);
2021-08-13 01:13:58 +02:00
var uncompressedCopy = COMPRESSION_CACHE.get().put(buffer).flip();
buffer.position(contentStart).limit(limitCache);
var deflater = COMPRESSOR.get();
deflater.setInput(uncompressedCopy);
deflater.finish();
deflater.deflate(buffer);
deflater.reset();
Utils.writeVarIntHeader(buffer, compressedIndex, (buffer.position() - contentStart) + 3);
Utils.writeVarIntHeader(buffer, uncompressedIndex, packetSize);
} else {
Utils.writeVarIntHeader(buffer, compressedIndex, packetSize + 3);
Utils.writeVarIntHeader(buffer, uncompressedIndex, 0);
2021-03-26 13:08:05 +01:00
}
}
2021-08-04 12:41:15 +02:00
2021-08-05 03:09:45 +02:00
public static ByteBuffer createFramedPacket(@NotNull ByteBuffer initial, @NotNull ServerPacket packet,
boolean compression) {
2021-08-04 16:49:01 +02:00
var buffer = initial;
2021-08-04 12:41:15 +02:00
try {
2021-08-04 16:49:01 +02:00
writeFramedPacket(buffer, packet, compression);
2021-08-04 12:41:15 +02:00
} catch (BufferOverflowException e) {
// In the unlikely case where the packet is bigger than the default buffer size,
// increase to the highest authorized buffer size using heap (for cheap allocation)
buffer = ByteBuffer.allocate(Server.MAX_PACKET_SIZE);
2021-08-04 16:49:01 +02:00
writeFramedPacket(buffer, packet, compression);
2021-08-04 12:41:15 +02:00
}
return buffer;
}
2021-08-04 16:49:01 +02:00
2021-08-05 03:09:45 +02:00
/**
 * Frames {@code packet} into {@code initial}, using the compressed layout when the
 * server compression threshold is greater than zero.
 *
 * @param initial the buffer to try first
 * @param packet  the packet to frame
 * @return the buffer holding the frame
 * @see #createFramedPacket(ByteBuffer, ServerPacket, boolean)
 */
public static ByteBuffer createFramedPacket(@NotNull ByteBuffer initial, @NotNull ServerPacket packet) {
    final boolean compressionEnabled = MinecraftServer.getCompressionThreshold() > 0;
    return createFramedPacket(initial, packet, compressionEnabled);
}
2021-08-04 16:49:01 +02:00
public static ByteBuffer createFramedPacket(@NotNull ServerPacket packet) {
2021-08-13 01:13:58 +02:00
return createFramedPacket(PACKET_BUFFER.get(), packet);
2021-08-04 16:49:01 +02:00
}
2021-08-05 00:08:53 +02:00
2021-08-05 03:09:45 +02:00
public static ByteBuffer createFramedPacket(@NotNull ServerPacket packet, boolean compression) {
2021-08-13 01:13:58 +02:00
return createFramedPacket(PACKET_BUFFER.get(), packet, compression);
2021-08-05 03:09:45 +02:00
}
@ApiStatus.Internal
public static FramedPacket allocateTrimmedPacket(@NotNull ServerPacket packet) {
2021-08-05 00:08:53 +02:00
final var temp = PacketUtils.createFramedPacket(packet);
2021-08-24 21:21:51 +02:00
final var buffer = ByteBuffer.allocateDirect(temp.position()).put(temp.flip()).asReadOnlyBuffer();
return new FramedPacket(packet.getId(), buffer, packet);
2021-08-05 00:08:53 +02:00
}
@ApiStatus.Internal
2021-08-13 01:13:58 +02:00
public static final class LocalCache {
private static final Map<String, LocalCache> CACHES = new ConcurrentHashMap<>();
private final String name;
private final ThreadLocal<ByteBuffer> cache;
2021-08-13 01:13:58 +02:00
private LocalCache(String name, int size) {
this.name = name;
this.cache = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(size));
}
2021-08-13 01:13:58 +02:00
public static LocalCache get(String name, int size) {
return CACHES.computeIfAbsent(name, s -> new LocalCache(name, size));
}
public String name() {
return name;
}
2021-08-13 01:13:58 +02:00
public ByteBuffer get() {
return cache.get().clear();
}
}
2021-08-24 14:59:17 +02:00
// Packets queued by prepareGroupedPacket, grouped per viewable.
// flush() replaces this reference with a fresh map before processing the previous one.
private static volatile Map<Viewable, ViewableStorage> VIEWABLE_STORAGE_MAP = new ConcurrentHashMap<>();
private static class ViewableStorage {
private final Viewable viewable;
private final Map<Integer, Entry> entries = new ConcurrentHashMap<>();
private ViewableStorage(Viewable viewable) {
this.viewable = viewable;
}
private synchronized void append(PlayerConnection playerConnection, ServerPacket serverPacket) {
ViewableStorage.Entry entry = entries.computeIfAbsent(serverPacket.getId(), integer -> new ViewableStorage.Entry());
final boolean hasConnection = playerConnection != null;
var entityIdMap = entry.entityIdMap;
if (hasConnection && entityIdMap.containsKey(playerConnection)) return;
BinaryBuffer buffer = entry.buffer;
final int start = buffer.writerOffset();
final ByteBuffer framedPacket = createFramedPacket(serverPacket);
buffer.write(framedPacket.flip());
final int end = buffer.writerOffset();
if (hasConnection) {
entityIdMap.put(playerConnection, IntIntPair.of(start, end));
}
}
private void process() {
this.entries.forEach((integer, entry) -> {
final var entityIdMap = entry.entityIdMap;
BinaryBuffer buffer = entry.buffer;
final int readable = buffer.readableBytes();
final Set<Player> viewers = viewable.getViewers();
if (viewers.isEmpty()) return;
for (Player player : viewers) {
PlayerConnection connection = player.getPlayerConnection();
Consumer<ByteBuffer> writer = connection instanceof PlayerSocketConnection
? ((PlayerSocketConnection) connection)::write :
byteBuffer -> {
// TODO for non-socket connection
};
final var pair = entityIdMap.get(connection);
if (pair != null) {
final int start = pair.leftInt();
final int end = pair.rightInt();
if (start == 0) {
writer.accept(buffer.asByteBuffer(end, readable - end));
} else if (end == readable) {
writer.accept(buffer.asByteBuffer(0, start));
} else {
writer.accept(buffer.asByteBuffer(0, start));
writer.accept(buffer.asByteBuffer(end, readable - end));
}
} else {
2021-08-24 21:21:51 +02:00
ByteBuffer result = buffer.asByteBuffer(0, buffer.writerOffset());
2021-08-24 14:59:17 +02:00
result.position(result.limit());
writer.accept(result);
}
}
});
}
private static class Entry {
Map<PlayerConnection, IntIntPair> entityIdMap = new ConcurrentHashMap<>();
BinaryBuffer buffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE);
}
}
/**
 * Queues {@code serverPacket} for the viewers of {@code viewable}; queued packets are
 * written out by {@link #flush()}.
 * <p>
 * When {@code entity} is non-null and not auto-viewable, nothing is queued and the packet
 * is sent to the entity's viewers right away instead.
 *
 * @param viewable     the viewable whose viewers should receive the packet
 * @param serverPacket the packet to queue
 * @param entity       the entity the packet originates from, or null
 */
public static void prepareGroupedPacket(@NotNull Viewable viewable, @NotNull ServerPacket serverPacket,
                                        @Nullable Entity entity) {
    if (entity != null && !entity.isAutoViewable()) {
        // Operation cannot be optimized
        entity.sendPacketToViewers(serverPacket);
        return;
    }
    // Only a player source has a connection to associate with the packet
    PlayerConnection sourceConnection = null;
    if (entity instanceof Player) {
        sourceConnection = ((Player) entity).getPlayerConnection();
    }
    final ViewableStorage storage = VIEWABLE_STORAGE_MAP.computeIfAbsent(viewable, ViewableStorage::new);
    storage.append(sourceConnection, serverPacket);
}
/**
 * Queues {@code serverPacket} for the viewers of {@code viewable} without any source entity.
 *
 * @param viewable     the viewable whose viewers should receive the packet
 * @param serverPacket the packet to queue
 * @see #prepareGroupedPacket(Viewable, ServerPacket, Entity)
 */
public static void prepareGroupedPacket(@NotNull Viewable viewable, @NotNull ServerPacket serverPacket) {
    final Entity noSource = null;
    prepareGroupedPacket(viewable, serverPacket, noSource);
}
public static void flush() {
2021-08-24 21:21:51 +02:00
final var map = VIEWABLE_STORAGE_MAP;
2021-08-24 14:59:17 +02:00
VIEWABLE_STORAGE_MAP = new ConcurrentHashMap<>();
2021-08-24 21:21:51 +02:00
for (ViewableStorage viewableStorage : map.values()) {
if (viewableStorage.entries.isEmpty()) continue;
2021-08-24 14:59:17 +02:00
viewableStorage.process();
2021-08-24 21:21:51 +02:00
}
2021-08-24 14:59:17 +02:00
}
2019-08-22 14:52:32 +02:00
}