Flush in worker threads

Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
TheMode 2021-11-16 21:53:33 +01:00
parent 62b9e80d46
commit a4522e44ed
3 changed files with 25 additions and 7 deletions

View File

@ -6,10 +6,10 @@ import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager; import net.minestom.server.instance.InstanceManager;
import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.socket.Worker;
import net.minestom.server.thread.MinestomThread; import net.minestom.server.thread.MinestomThread;
import net.minestom.server.thread.ThreadDispatcher; import net.minestom.server.thread.ThreadDispatcher;
import net.minestom.server.utils.PacketUtils; import net.minestom.server.utils.PacketUtils;
import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.List; import java.util.List;
@ -166,6 +166,7 @@ public final class UpdateManager {
@Override @Override
public void run() { public void run() {
final ConnectionManager connectionManager = MinecraftServer.getConnectionManager(); final ConnectionManager connectionManager = MinecraftServer.getConnectionManager();
final List<Worker> workers = MinecraftServer.getServer().workers();
while (!stopRequested) { while (!stopRequested) {
try { try {
long currentTime = System.nanoTime(); long currentTime = System.nanoTime();
@ -185,10 +186,7 @@ public final class UpdateManager {
// Flush all waiting packets // Flush all waiting packets
PacketUtils.flush(); PacketUtils.flush();
AsyncUtils.runAsync(() -> MinecraftServer.getConnectionManager() workers.forEach(Worker::flush);
.getOnlinePlayers()
.parallelStream()
.forEach(player -> player.getPlayerConnection().flush()));
// the time that the tick took in nanoseconds // the time that the tick took in nanoseconds
final long tickTime = System.nanoTime() - currentTime; final long tickTime = System.nanoTime() - currentTime;

View File

@ -26,7 +26,7 @@ public final class Server {
private volatile boolean stop; private volatile boolean stop;
private final Selector selector = Selector.open(); private final Selector selector = Selector.open();
private final List<Worker> workers = new ArrayList<>(WORKER_COUNT); private final List<Worker> workers;
private int index; private int index;
private ServerSocketChannel serverSocket; private ServerSocketChannel serverSocket;
@ -35,11 +35,13 @@ public final class Server {
public Server(PacketProcessor packetProcessor) throws IOException { public Server(PacketProcessor packetProcessor) throws IOException {
// Create all workers // Create all workers
List<Worker> workers = new ArrayList<>(WORKER_COUNT);
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
Worker worker = new Worker(this, packetProcessor); Worker worker = new Worker(this, packetProcessor);
this.workers.add(worker); workers.add(worker);
worker.start(); worker.start();
} }
this.workers = List.copyOf(workers);
} }
@ApiStatus.Internal @ApiStatus.Internal
@ -90,6 +92,11 @@ public final class Server {
this.workers.forEach(worker -> worker.selector.wakeup()); this.workers.forEach(worker -> worker.selector.wakeup());
} }
@ApiStatus.Internal
public List<Worker> workers() {
return workers;
}
public String getAddress() { public String getAddress() {
return address; return address;
} }

View File

@ -16,6 +16,7 @@ import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ -29,6 +30,8 @@ public final class Worker extends MinestomThread {
private final Server server; private final Server server;
private final PacketProcessor packetProcessor; private final PacketProcessor packetProcessor;
private final AtomicBoolean flush = new AtomicBoolean();
public Worker(Server server, PacketProcessor packetProcessor) throws IOException { public Worker(Server server, PacketProcessor packetProcessor) throws IOException {
super("Ms-worker-" + COUNTER.getAndIncrement()); super("Ms-worker-" + COUNTER.getAndIncrement());
this.server = server; this.server = server;
@ -39,6 +42,11 @@ public final class Worker extends MinestomThread {
public void run() { public void run() {
while (server.isOpen()) { while (server.isOpen()) {
try { try {
// Flush all connections if needed
if (flush.compareAndSet(true, false)) {
connectionMap.values().forEach(PlayerSocketConnection::flush);
}
// Wait for an event
this.selector.select(key -> { this.selector.select(key -> {
final SocketChannel channel = (SocketChannel) key.channel(); final SocketChannel channel = (SocketChannel) key.channel();
if (!channel.isOpen()) return; if (!channel.isOpen()) return;
@ -92,6 +100,11 @@ public final class Worker extends MinestomThread {
this.selector.wakeup(); this.selector.wakeup();
} }
public void flush() {
this.flush.set(true);
this.selector.wakeup();
}
/** /**
* Contains objects that we can be shared across all the connection of a {@link Worker worker}. * Contains objects that we can be shared across all the connection of a {@link Worker worker}.
*/ */