Make Worker a thread

This commit is contained in:
TheMode 2021-08-08 21:28:38 +02:00
parent 99a3ad88a1
commit 3881102925
2 changed files with 36 additions and 55 deletions

View File

@ -35,7 +35,9 @@ public final class Server {
public Server(PacketProcessor packetProcessor) throws IOException { public Server(PacketProcessor packetProcessor) throws IOException {
// Create all workers // Create all workers
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
this.workers.add(new Worker(this, packetProcessor)); Worker worker = new Worker(this, packetProcessor);
this.workers.add(worker);
worker.start();
} }
} }

View File

@ -14,43 +14,50 @@ import java.nio.channels.SocketChannel;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ApiStatus.Internal @ApiStatus.Internal
public final class Worker { public final class Worker extends Thread {
private static final AtomicInteger COUNTER = new AtomicInteger();
final Selector selector = Selector.open(); final Selector selector = Selector.open();
private final Context context = new Context();
private final Map<SocketChannel, PlayerSocketConnection> connectionMap = new ConcurrentHashMap<>(); private final Map<SocketChannel, PlayerSocketConnection> connectionMap = new ConcurrentHashMap<>();
private final Server server;
private final PacketProcessor packetProcessor; private final PacketProcessor packetProcessor;
public Worker(Server server, PacketProcessor packetProcessor) throws IOException { public Worker(Server server, PacketProcessor packetProcessor) throws IOException {
super(null, null, "Ms-worker-" + COUNTER.getAndIncrement());
this.server = server;
this.packetProcessor = packetProcessor; this.packetProcessor = packetProcessor;
Thread.start(server, this::threadTick);
} }
private void threadTick(Context workerContext) { @Override
try { public void run() {
this.selector.select(key -> { while (server.isOpen()) {
final SocketChannel channel = (SocketChannel) key.channel(); try {
if (!channel.isOpen()) return; this.selector.select(key -> {
if (!key.isReadable()) return; final SocketChannel channel = (SocketChannel) key.channel();
var connection = connectionMap.get(channel); if (!channel.isOpen()) return;
try { if (!key.isReadable()) return;
var readBuffer = workerContext.readBuffer; var connection = connectionMap.get(channel);
// Consume last incomplete packet try {
connection.consumeCache(readBuffer); var readBuffer = context.readBuffer;
// Read & process // Consume last incomplete packet
readBuffer.readChannel(channel); connection.consumeCache(readBuffer);
connection.processPackets(workerContext, packetProcessor); // Read & process
} catch (IOException e) { readBuffer.readChannel(channel);
// TODO print exception? (should ignore disconnection) connection.processPackets(context, packetProcessor);
connection.disconnect(); } catch (IOException e) {
} finally { // TODO print exception? (should ignore disconnection)
workerContext.clearBuffers(); connection.disconnect();
} } finally {
}); context.clearBuffers();
} catch (IOException e) { }
MinecraftServer.getExceptionManager().handleException(e); });
} catch (IOException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
} }
} }
@ -80,39 +87,11 @@ public final class Worker {
} }
} }
/**
* Thread responsible for reading players socket and forwarding packets into
* players' packet queue.
*/
static final class Thread extends java.lang.Thread {
private static final AtomicInteger COUNTER = new AtomicInteger();
private Thread(Runnable runnable) {
super(null, runnable, "Ms-worker-" + COUNTER.getAndIncrement());
}
static void start(Server server, Consumer<Context> runnable) {
new Thread(() -> {
Context context = new Context();
while (server.isOpen()) {
try {
runnable.accept(context);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
/** /**
* Contains objects that we can be shared across all the connection of a {@link Worker worker}. * Contains objects that we can be shared across all the connection of a {@link Worker worker}.
*/ */
public static final class Context { public static final class Context {
public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE); public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.SOCKET_BUFFER_SIZE);
/**
* Stores a single packet payload to be read.
*/
public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE); public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
public final Inflater inflater = new Inflater(); public final Inflater inflater = new Inflater();