Async flush (#492)

This commit is contained in:
TheMode 2021-10-16 19:55:24 +02:00 committed by GitHub
parent 13a8a22546
commit 06189ee783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 22 deletions

View File

@ -9,6 +9,7 @@ import net.minestom.server.network.ConnectionManager;
import net.minestom.server.thread.MinestomThread;
import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;
import java.util.List;
@ -184,8 +185,10 @@ public final class UpdateManager {
// Flush all waiting packets
PacketUtils.flush();
connectionManager.getOnlinePlayers().parallelStream().forEach(player ->
player.getPlayerConnection().flush());
AsyncUtils.runAsync(() -> MinecraftServer.getConnectionManager()
.getOnlinePlayers()
.parallelStream()
.forEach(player -> player.getPlayerConnection().flush()));
// the time that the tick took in nanoseconds
final long tickTime = System.nanoTime() - currentTime;

View File

@ -75,6 +75,7 @@ public class PlayerSocketConnection extends PlayerConnection {
private PlayerSkin bungeeSkin;
private final Object bufferLock = new Object();
private final Object flushLock = new Object();
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(PooledBuffers.get());
private volatile BinaryBuffer cacheBuffer;
@ -246,20 +247,14 @@ public class PlayerSocketConnection extends PlayerConnection {
final int capacity = localBuffer.capacity();
final int size = buffer.remaining();
if (size <= capacity) {
if (!localBuffer.canWrite(size)) {
flush();
localBuffer = tickBuffer.getPlain();
}
if (!localBuffer.canWrite(size)) localBuffer = updateLocalBuffer();
localBuffer.write(buffer);
} else {
final int bufferCount = size / capacity + 1;
for (int i = 0; i < bufferCount; i++) {
buffer.position(i * capacity);
buffer.limit(Math.min(size, buffer.position() + capacity));
if (!localBuffer.canWrite(buffer.remaining())) {
flush();
localBuffer = tickBuffer.getPlain();
}
if (!localBuffer.canWrite(buffer.remaining())) localBuffer = updateLocalBuffer();
localBuffer.write(buffer);
}
}
@ -281,19 +276,10 @@ public class PlayerSocketConnection extends PlayerConnection {
public void flush() {
try {
synchronized (bufferLock) {
final BinaryBuffer localBuffer = tickBuffer.getPlain();
final boolean emptyWaitingList = waitingBuffers.isEmpty();
if (localBuffer.readableBytes() == 0 && emptyWaitingList) return;
updateLocalBuffer();
}
synchronized (flushLock) {
try {
// Try to write the current buffer
if (emptyWaitingList && localBuffer.writeChannel(channel)) {
// Can reuse buffer
localBuffer.clear();
return;
}
this.tickBuffer.setPlain(PooledBuffers.get());
this.waitingBuffers.add(localBuffer);
if (emptyWaitingList) return;
// Write as much as possible from the waiting list
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
@ -316,6 +302,15 @@ public class PlayerSocketConnection extends PlayerConnection {
}
}
private BinaryBuffer updateLocalBuffer() {
synchronized (flushLock) {
BinaryBuffer newBuffer = PooledBuffers.get();
this.waitingBuffers.add(tickBuffer.getPlain());
this.tickBuffer.setPlain(newBuffer);
return newBuffer;
}
}
@Override
public @NotNull SocketAddress getRemoteAddress() {
return remoteAddress;