From 3881102925314ed370bb6aa1470018ae1911c91b Mon Sep 17 00:00:00 2001 From: TheMode Date: Sun, 8 Aug 2021 21:28:38 +0200 Subject: [PATCH] Make `Worker` a thread --- .../server/network/socket/Server.java | 4 +- .../server/network/socket/Worker.java | 87 +++++++------------ 2 files changed, 36 insertions(+), 55 deletions(-) diff --git a/src/main/java/net/minestom/server/network/socket/Server.java b/src/main/java/net/minestom/server/network/socket/Server.java index 8fda5710f..ccadcc56e 100644 --- a/src/main/java/net/minestom/server/network/socket/Server.java +++ b/src/main/java/net/minestom/server/network/socket/Server.java @@ -35,7 +35,9 @@ public final class Server { public Server(PacketProcessor packetProcessor) throws IOException { // Create all workers for (int i = 0; i < WORKER_COUNT; i++) { - this.workers.add(new Worker(this, packetProcessor)); + Worker worker = new Worker(this, packetProcessor); + this.workers.add(worker); + worker.start(); } } diff --git a/src/main/java/net/minestom/server/network/socket/Worker.java b/src/main/java/net/minestom/server/network/socket/Worker.java index 4cad204c2..80487897a 100644 --- a/src/main/java/net/minestom/server/network/socket/Worker.java +++ b/src/main/java/net/minestom/server/network/socket/Worker.java @@ -14,43 +14,50 @@ import java.nio.channels.SocketChannel; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import java.util.zip.Inflater; @ApiStatus.Internal -public final class Worker { +public final class Worker extends Thread { + private static final AtomicInteger COUNTER = new AtomicInteger(); + final Selector selector = Selector.open(); + private final Context context = new Context(); private final Map connectionMap = new ConcurrentHashMap<>(); + private final Server server; private final PacketProcessor packetProcessor; public Worker(Server server, PacketProcessor packetProcessor) throws IOException { + super(null, null, "Ms-worker-" + COUNTER.getAndIncrement()); + this.server = server; this.packetProcessor = packetProcessor; - Thread.start(server, this::threadTick); } - private void threadTick(Context workerContext) { - try { - this.selector.select(key -> { - final SocketChannel channel = (SocketChannel) key.channel(); - if (!channel.isOpen()) return; - if (!key.isReadable()) return; - var connection = connectionMap.get(channel); - try { - var readBuffer = workerContext.readBuffer; - // Consume last incomplete packet - connection.consumeCache(readBuffer); - // Read & process - readBuffer.readChannel(channel); - connection.processPackets(workerContext, packetProcessor); - } catch (IOException e) { - // TODO print exception? (should ignore disconnection) - connection.disconnect(); - } finally { - workerContext.clearBuffers(); - } - }); - } catch (IOException e) { - MinecraftServer.getExceptionManager().handleException(e); + @Override + public void run() { + while (server.isOpen()) { + try { + this.selector.select(key -> { + final SocketChannel channel = (SocketChannel) key.channel(); + if (!channel.isOpen()) return; + if (!key.isReadable()) return; + var connection = connectionMap.get(channel); + try { + var readBuffer = context.readBuffer; + // Consume last incomplete packet + connection.consumeCache(readBuffer); + // Read & process + readBuffer.readChannel(channel); + connection.processPackets(context, packetProcessor); + } catch (IOException e) { + // TODO print exception? (should ignore disconnection) + connection.disconnect(); + } finally { + context.clearBuffers(); + } + }); + } catch (IOException e) { + MinecraftServer.getExceptionManager().handleException(e); + } } } @@ -80,39 +87,11 @@ public final class Worker { } } - /** - * Thread responsible for reading players socket and forwarding packets into - * players' packet queue. - */ - static final class Thread extends java.lang.Thread { - private static final AtomicInteger COUNTER = new AtomicInteger(); - - private Thread(Runnable runnable) { - super(null, runnable, "Ms-worker-" + COUNTER.getAndIncrement()); - } - - static void start(Server server, Consumer runnable) { - new Thread(() -> { - Context context = new Context(); - while (server.isOpen()) { - try { - runnable.accept(context); - } catch (Exception e) { - e.printStackTrace(); - } - } - }).start(); - } - } - /** * Contains objects that we can be shared across all the connection of a {@link Worker worker}. */ public static final class Context { public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE); - /** - * Stores a single packet payload to be read. - */ public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE); public final Inflater inflater = new Inflater();