feat: implement 1.20.2 client-rate-limited chunk batching

(cherry picked from commit e24cb62583)
This commit is contained in:
mworzala 2024-01-06 02:43:43 -05:00 committed by Matt Worzala
parent 0bb237f737
commit 6f30edb411
9 changed files with 155 additions and 61 deletions

View File

@ -25,7 +25,6 @@ import java.time.Duration;
public class Main {
public static void main(String[] args) {
System.setProperty("minestom.use-new-chunk-sending", "true");
System.setProperty("minestom.experiment.pose-updates", "true");
MinecraftServer.setCompressionThreshold(0);

View File

@ -1,6 +1,7 @@
package net.minestom.server.entity;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
import net.kyori.adventure.audience.MessageType;
import net.kyori.adventure.bossbar.BossBar;
import net.kyori.adventure.identity.Identified;
@ -57,7 +58,10 @@ import net.minestom.server.network.PlayerProvider;
import net.minestom.server.network.packet.client.ClientPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.ServerPacket;
import net.minestom.server.network.packet.server.common.*;
import net.minestom.server.network.packet.server.common.DisconnectPacket;
import net.minestom.server.network.packet.server.common.KeepAlivePacket;
import net.minestom.server.network.packet.server.common.PluginMessagePacket;
import net.minestom.server.network.packet.server.common.ResourcePackPushPacket;
import net.minestom.server.network.packet.server.login.LoginDisconnectPacket;
import net.minestom.server.network.packet.server.play.*;
import net.minestom.server.network.packet.server.play.data.DeathLocation;
@ -75,7 +79,6 @@ import net.minestom.server.snapshot.SnapshotImpl;
import net.minestom.server.snapshot.SnapshotUpdater;
import net.minestom.server.statistic.PlayerStatistic;
import net.minestom.server.timer.Scheduler;
import net.minestom.server.timer.TaskSchedule;
import net.minestom.server.utils.MathUtils;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.async.AsyncUtils;
@ -94,6 +97,8 @@ import org.jctools.queues.MpscUnboundedXaddArrayQueue;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@ -101,8 +106,8 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
/**
@ -112,8 +117,13 @@ import java.util.function.UnaryOperator;
* You can easily create your own implementation of this and use it with {@link ConnectionManager#setPlayerProvider(PlayerProvider)}.
*/
public class Player extends LivingEntity implements CommandSender, Localizable, HoverEventSource<ShowEntity>, Identified, NamedAndIdentified {
private static final Logger logger = LoggerFactory.getLogger(Player.class);
private static final Component REMOVE_MESSAGE = Component.text("You have been removed from the server without reason.", NamedTextColor.RED);
private static final float MIN_CHUNKS_PER_TICK = 0.01f;
private static final float MAX_CHUNKS_PER_TICK = 64.0f;
public static final boolean EXPERIMENT_PERFORM_POSE_UPDATES = Boolean.getBoolean("minestom.experiment.pose-updates");
private long lastKeepAlive;
@ -131,21 +141,30 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
private DimensionType dimensionType;
private GameMode gameMode;
private DeathLocation deathLocation;
/**
* Keeps track of what chunks are sent to the client, this defines the center of the loaded area
* in the range of {@link MinecraftServer#getChunkViewDistance()}
*/
private Vec chunksLoadedByClient = Vec.ZERO;
private final ReentrantLock chunkQueueLock = new ReentrantLock();
private final LongPriorityQueue chunkQueue = new LongArrayPriorityQueue(this::compareChunkDistance);
private float targetChunksPerTick = 9f; // Always send 9 chunks immediately
private float pendingChunkCount = 0f; // Number of chunks to send on the current tick (ie 0.5 means we cannot send a chunk yet, 1.5 would send a single chunk with a 0.5 remainder)
private int maxChunkBatchLead = 1; // Maximum number of batches to send before waiting for a reply
private int chunkBatchLead = 0; // Number of batches sent without a reply
final IntegerBiConsumer chunkAdder = (chunkX, chunkZ) -> {
// Load new chunks
this.instance.loadOptionalChunk(chunkX, chunkZ).thenAccept(chunk -> {
if (chunk == null) return;
chunkQueueLock.lock();
try {
if (chunk != null) {
chunk.sendChunk(this);
EventDispatcher.call(new PlayerChunkLoadEvent(this, chunkX, chunkZ));
}
chunkQueue.enqueue(ChunkUtils.getChunkIndex(chunkX, chunkZ));
} catch (Exception e) {
MinecraftServer.getExceptionManager().handleException(e);
} finally {
chunkQueueLock.unlock();
}
});
};
@ -158,7 +177,8 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
private final AtomicInteger teleportId = new AtomicInteger();
private int receivedTeleportId;
private record PacketInState(ConnectionState state, ClientPacket packet) {}
private record PacketInState(ConnectionState state, ClientPacket packet) {
}
private final MessagePassingQueue<PacketInState> packets = new MpscUnboundedXaddArrayQueue<>(32);
private final boolean levelFlat;
@ -387,6 +407,9 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
// It is possible to be removed during packet processing, if thats the case exit immediately.
if (isRemoved()) return;
// Send any available queued chunks
sendPendingChunks();
super.update(time); // Super update (item pickup/fire management)
// Experience orb pickup
@ -508,16 +531,7 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
Pos respawnPosition = respawnEvent.getRespawnPosition();
// The client unloads chunks when respawning, so resend all chunks next to spawn
ChunkUtils.forChunksInRange(respawnPosition, Math.min(MinecraftServer.getChunkViewDistance(), settings.getViewDistance()), (chunkX, chunkZ) ->
this.instance.loadOptionalChunk(chunkX, chunkZ).thenAccept(chunk -> {
try {
if (chunk != null) {
chunk.sendChunk(this);
}
} catch (Exception e) {
MinecraftServer.getExceptionManager().handleException(e);
}
}));
ChunkUtils.forChunksInRange(respawnPosition, settings.getEffectiveViewDistance(), chunkAdder);
chunksLoadedByClient = new Vec(respawnPosition.chunkX(), respawnPosition.chunkZ());
// Client also needs all entities resent to them, since those are unloaded as well
this.instance.getEntityTracker().nearbyEntitiesByChunkRange(respawnPosition, Math.min(MinecraftServer.getChunkViewDistance(), settings.getViewDistance()),
@ -721,26 +735,8 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
chunkUpdateLimitChecker.addToHistory(getChunk());
sendPacket(new UpdateViewPositionPacket(chunkX, chunkZ));
if (ServerFlag.USE_NEW_CHUNK_SENDING) {
// FIXME: Improve this queueing. It is pretty scuffed
var chunkQueue = new LongArrayList();
ChunkUtils.forChunksInRange(spawnPosition, MinecraftServer.getChunkViewDistance(),
(x, z) -> chunkQueue.add(ChunkUtils.getChunkIndex(x, z)));
var iter = chunkQueue.iterator();
Supplier<TaskSchedule> taskRunnable = () -> {
for (int i = 0; i < ServerFlag.NEW_CHUNK_COUNT_PER_INTERVAL; i++) {
if (!iter.hasNext()) return TaskSchedule.stop();
var next = iter.nextLong();
chunkAdder.accept(ChunkUtils.getChunkCoordX(next), ChunkUtils.getChunkCoordZ(next));
}
return TaskSchedule.tick(ServerFlag.NEW_CHUNK_SEND_INTERVAL);
};
scheduler().submitTask(taskRunnable);
} else {
ChunkUtils.forChunksInRange(spawnPosition, MinecraftServer.getChunkViewDistance(), chunkAdder);
}
// Load the nearby chunks and queue them to be sent to them
ChunkUtils.forChunksInRange(spawnPosition, MinecraftServer.getChunkViewDistance(), chunkAdder);
}
synchronizePosition(true); // So the player doesn't get stuck
@ -768,6 +764,63 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
EventDispatcher.call(new PlayerSpawnEvent(this, instance, firstSpawn));
}
@ApiStatus.Internal
public void onChunkBatchReceived(float newTargetChunksPerTick) {
// logger.debug("chunk batch received player={} chunks/tick={} lead={}", username, newTargetChunksPerTick, chunkBatchLead);
chunkBatchLead -= 1;
targetChunksPerTick = Float.isNaN(newTargetChunksPerTick) ? MIN_CHUNKS_PER_TICK
: MathUtils.clamp(newTargetChunksPerTick, MIN_CHUNKS_PER_TICK, MAX_CHUNKS_PER_TICK);
// Beyond the first batch we can preemptively send up to 10 (matching mojang server)
if (maxChunkBatchLead == 1) maxChunkBatchLead = 10;
}
/**
* Queues the given chunk to be sent to the player.
* @param chunk The chunk to send
*/
public void sendChunk(@NotNull Chunk chunk) {
if (!chunk.isLoaded()) return;
chunkQueueLock.lock();
try {
chunkQueue.enqueue(ChunkUtils.getChunkIndex(chunk.getChunkX(), chunk.getChunkZ()));
} finally {
chunkQueueLock.unlock();
}
}
private void sendPendingChunks() {
// If we have nothing to send or have sent the max # of batches without reply, do nothing
if (chunkQueue.isEmpty() || chunkBatchLead >= maxChunkBatchLead) return;
// Increment the pending chunk count by the target chunks per tick
pendingChunkCount = Math.min(pendingChunkCount + targetChunksPerTick, MAX_CHUNKS_PER_TICK);
if (pendingChunkCount < 1) return; // Cant send anything
chunkQueueLock.lock();
try {
int batchSize = 0;
sendPacket(new ChunkBatchStartPacket());
while (!chunkQueue.isEmpty() && pendingChunkCount >= 1f) {
long chunkIndex = chunkQueue.dequeueLong();
int chunkX = ChunkUtils.getChunkCoordX(chunkIndex), chunkZ = ChunkUtils.getChunkCoordZ(chunkIndex);
var chunk = instance.getChunk(chunkX, chunkZ);
if (chunk == null || !chunk.isLoaded()) continue;
sendPacket(chunk.getFullDataPacket());
EventDispatcher.call(new PlayerChunkLoadEvent(this, chunkX, chunkZ));
pendingChunkCount -= 1f;
batchSize += 1;
}
sendPacket(new ChunkBatchFinishedPacket(batchSize));
chunkBatchLead += 1;
// logger.debug("chunk batch sent player={} chunks={} lead={}", username, batchSize, chunkBatchLead);
} finally {
chunkQueueLock.unlock();
}
}
@Override
protected void updatePose() {
Pose oldPose = getPose();
@ -806,6 +859,7 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
/**
* Returns true if the player can fit at the current position with the given {@link net.minestom.server.entity.Entity.Pose}, false otherwise.
*
* @param pose The pose to check
*/
private boolean canFitWithBoundingBox(@NotNull Pose pose) {
@ -2319,6 +2373,10 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
return viewDistance;
}
public int getEffectiveViewDistance() {
return Math.min(getViewDistance(), MinecraftServer.getChunkViewDistance());
}
/**
* Gets the messages this player wants to receive.
*
@ -2392,4 +2450,14 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
}
private int compareChunkDistance(long chunkIndexA, long chunkIndexB) {
int chunkAX = ChunkUtils.getChunkCoordX(chunkIndexA);
int chunkAZ = ChunkUtils.getChunkCoordZ(chunkIndexA);
int chunkBX = ChunkUtils.getChunkCoordX(chunkIndexB);
int chunkBZ = ChunkUtils.getChunkCoordZ(chunkIndexB);
int chunkDistanceA = Math.abs(chunkAX - chunksLoadedByClient.blockX()) + Math.abs(chunkAZ - chunksLoadedByClient.blockZ());
int chunkDistanceB = Math.abs(chunkBX - chunksLoadedByClient.blockX()) + Math.abs(chunkBZ - chunksLoadedByClient.blockZ());
return Integer.compare(chunkDistanceA, chunkDistanceB);
}
}

View File

@ -8,6 +8,7 @@ import net.minestom.server.entity.Player;
import net.minestom.server.entity.pathfinding.PFColumnarSpace;
import net.minestom.server.instance.block.Block;
import net.minestom.server.instance.block.BlockHandler;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.play.ChunkDataPacket;
import net.minestom.server.snapshot.Snapshotable;
import net.minestom.server.tag.TagHandler;
@ -15,6 +16,7 @@ import net.minestom.server.tag.Taggable;
import net.minestom.server.utils.chunk.ChunkSupplier;
import net.minestom.server.utils.chunk.ChunkUtils;
import net.minestom.server.world.biomes.Biome;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -131,9 +133,16 @@ public abstract class Chunk implements Block.Getter, Block.Setter, Biome.Getter,
*
* @param player the player
*/
public abstract void sendChunk(@NotNull Player player);
public void sendChunk(@NotNull Player player) {
player.sendChunk(this);
}
public abstract void sendChunk();
public void sendChunk() {
getViewers().forEach(this::sendChunk);
}
@ApiStatus.Internal
public abstract @NotNull SendablePacket getFullDataPacket();
/**
* Creates a copy of this chunk, including blocks state id, custom block id, biomes, update data.

View File

@ -12,6 +12,7 @@ import net.minestom.server.instance.block.Block;
import net.minestom.server.instance.block.BlockHandler;
import net.minestom.server.network.NetworkBuffer;
import net.minestom.server.network.packet.server.CachedPacket;
import net.minestom.server.network.packet.server.SendablePacket;
import net.minestom.server.network.packet.server.play.ChunkDataPacket;
import net.minestom.server.network.packet.server.play.UpdateLightPacket;
import net.minestom.server.network.packet.server.play.data.ChunkData;
@ -190,15 +191,8 @@ public class DynamicChunk extends Chunk {
}
@Override
public void sendChunk(@NotNull Player player) {
if (!isLoaded()) return;
player.sendPacket(chunkCache);
}
@Override
public void sendChunk() {
if (!isLoaded()) return;
sendPacketToViewers(chunkCache);
public @NotNull SendablePacket getFullDataPacket() {
return chunkCache;
}
@Override

View File

@ -0,0 +1,12 @@
package net.minestom.server.listener;
import net.minestom.server.entity.Player;
import net.minestom.server.network.packet.client.play.ClientChunkBatchReceivedPacket;
import org.jetbrains.annotations.NotNull;
public final class ChunkBatchListener {
public static void batchReceivedListener(@NotNull ClientChunkBatchReceivedPacket packet, @NotNull Player player) {
player.onChunkBatchReceived(packet.targetChunksPerTick());
}
}

View File

@ -94,6 +94,7 @@ public final class PacketListenerManager {
setPlayListener(ClientSpectatePacket.class, SpectateListener::listener);
setPlayListener(ClientEditBookPacket.class, BookListener::listener);
setPlayListener(ClientChatSessionUpdatePacket.class, (packet, player) -> {/* empty */});
setPlayListener(ClientChunkBatchReceivedPacket.class, ChunkBatchListener::batchReceivedListener);
}
/**

View File

@ -96,7 +96,7 @@ public sealed class ClientPacketsHandler permits ClientPacketsHandler.Status, Cl
register(nextId(), ClientCommandChatPacket::new);
register(nextId(), ClientChatMessagePacket::new);
register(nextId(), ClientChatSessionUpdatePacket::new);
nextId(); // chunk batch received
register(nextId(), ClientChunkBatchReceivedPacket::new);
register(nextId(), ClientStatusPacket::new);
register(nextId(), ClientSettingsPacket::new);
register(nextId(), ClientTabCompletePacket::new);

View File

@ -0,0 +1,17 @@
package net.minestom.server.network.packet.client.play;
import net.minestom.server.network.NetworkBuffer;
import net.minestom.server.network.packet.client.ClientPacket;
import org.jetbrains.annotations.NotNull;
public record ClientChunkBatchReceivedPacket(float targetChunksPerTick) implements ClientPacket {
public ClientChunkBatchReceivedPacket(@NotNull NetworkBuffer reader) {
this(reader.read(NetworkBuffer.FLOAT));
}
@Override
public void write(@NotNull NetworkBuffer writer) {
writer.write(NetworkBuffer.FLOAT, targetChunksPerTick);
}
}

View File

@ -175,16 +175,10 @@ public final class ChunkUtils {
* which comes from kotlin port by <a href="https://github.com/Esophose">Esophose</a>, which comes from <a href="https://stackoverflow.com/questions/398299/looping-in-a-spiral">a stackoverflow answer</a>.
*/
public static void forChunksInRange(int chunkX, int chunkZ, int range, IntegerBiConsumer consumer) {
if (!ServerFlag.USE_NEW_CHUNK_SENDING) {
for (int x = -range; x <= range; ++x) {
for (int z = -range; z <= range; ++z) {
consumer.accept(chunkX + x, chunkZ + z);
}
}
return;
}
// Send in spiral around the center chunk
// Note: its not really required to start at the center anymore since the chunk queue is sorted by distance,
// however we still should send a circle so this method is still fine, and good for any other case a
// spiral might be needed.
consumer.accept(chunkX, chunkZ);
for (int id = 1; id < (range * 2 + 1) * (range * 2 + 1); id++) {
var index = id - 1;