mirror of
https://github.com/Minestom/Minestom.git
synced 2024-09-29 15:07:36 +02:00
Simplify packet reading
Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
parent
0c5b37ed18
commit
ee95015ed3
@ -37,6 +37,7 @@ import java.util.*;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.zip.DataFormatException;
|
import java.util.zip.DataFormatException;
|
||||||
|
import java.util.zip.Inflater;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents a socket connection.
|
* Represents a socket connection.
|
||||||
@ -88,7 +89,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) {
|
public void processPackets(Worker.Context workerContext, PacketProcessor packetProcessor) {
|
||||||
final var readBuffer = workerContext.readBuffer;
|
final BinaryBuffer readBuffer = workerContext.readBuffer;
|
||||||
// Decrypt data
|
// Decrypt data
|
||||||
if (encrypted) {
|
if (encrypted) {
|
||||||
final Cipher cipher = decryptCipher;
|
final Cipher cipher = decryptCipher;
|
||||||
@ -104,7 +105,6 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
readBuffer.clear();
|
readBuffer.clear();
|
||||||
readBuffer.writeBytes(output);
|
readBuffer.writeBytes(output);
|
||||||
}
|
}
|
||||||
final int limit = readBuffer.writerOffset();
|
|
||||||
// Read all packets
|
// Read all packets
|
||||||
while (readBuffer.readableBytes() > 0) {
|
while (readBuffer.readableBytes() > 0) {
|
||||||
final var beginMark = readBuffer.mark();
|
final var beginMark = readBuffer.mark();
|
||||||
@ -112,42 +112,35 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
// Ensure that the buffer contains the full packet (or wait for next socket read)
|
// Ensure that the buffer contains the full packet (or wait for next socket read)
|
||||||
final int packetLength = readBuffer.readVarInt();
|
final int packetLength = readBuffer.readVarInt();
|
||||||
final int readerStart = readBuffer.readerOffset();
|
final int readerStart = readBuffer.readerOffset();
|
||||||
final int packetEnd = readerStart + packetLength;
|
if (!readBuffer.canRead(packetLength)) {
|
||||||
if (packetEnd > readBuffer.writerOffset()) {
|
|
||||||
// Integrity fail
|
// Integrity fail
|
||||||
throw new BufferUnderflowException();
|
throw new BufferUnderflowException();
|
||||||
}
|
}
|
||||||
// Read packet https://wiki.vg/Protocol#Packet_format
|
// Read packet https://wiki.vg/Protocol#Packet_format
|
||||||
BinaryBuffer content;
|
BinaryBuffer content = readBuffer;
|
||||||
int payloadLength;
|
int decompressedSize = packetLength;
|
||||||
if (!compressed) {
|
if (compressed) {
|
||||||
// Compression disabled, payload is following
|
|
||||||
content = readBuffer;
|
|
||||||
payloadLength = packetLength;
|
|
||||||
} else {
|
|
||||||
final int dataLength = readBuffer.readVarInt();
|
final int dataLength = readBuffer.readVarInt();
|
||||||
|
final int payloadLength = packetLength - (readBuffer.readerOffset() - readerStart);
|
||||||
if (dataLength == 0) {
|
if (dataLength == 0) {
|
||||||
// Data is too small to be compressed, payload is following
|
// Data is too small to be compressed, payload is following
|
||||||
content = readBuffer;
|
decompressedSize = payloadLength;
|
||||||
payloadLength = packetLength - (content.readerOffset() - readerStart);
|
|
||||||
} else {
|
} else {
|
||||||
// Decompress to content buffer
|
// Decompress to content buffer
|
||||||
content = workerContext.contentBuffer;
|
content = workerContext.contentBuffer.clear();
|
||||||
payloadLength = dataLength;
|
decompressedSize = dataLength;
|
||||||
final var contentStartMark = content.mark();
|
|
||||||
try {
|
try {
|
||||||
final var inflater = workerContext.inflater;
|
Inflater inflater = workerContext.inflater;
|
||||||
inflater.setInput(readBuffer.asByteBuffer(readBuffer.readerOffset(), packetEnd));
|
inflater.setInput(readBuffer.asByteBuffer(readBuffer.readerOffset(), payloadLength));
|
||||||
inflater.inflate(content.asByteBuffer(0, content.capacity()));
|
inflater.inflate(content.asByteBuffer(0, dataLength));
|
||||||
inflater.reset();
|
inflater.reset();
|
||||||
} catch (DataFormatException e) {
|
} catch (DataFormatException e) {
|
||||||
MinecraftServer.getExceptionManager().handleException(e);
|
MinecraftServer.getExceptionManager().handleException(e);
|
||||||
}
|
}
|
||||||
content.reset(contentStartMark);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Process packet
|
// Process packet
|
||||||
ByteBuffer payload = content.asByteBuffer(content.readerOffset(), payloadLength);
|
ByteBuffer payload = content.asByteBuffer(content.readerOffset(), decompressedSize);
|
||||||
final int packetId = Utils.readVarInt(payload);
|
final int packetId = Utils.readVarInt(payload);
|
||||||
try {
|
try {
|
||||||
packetProcessor.process(this, packetId, payload);
|
packetProcessor.process(this, packetId, payload);
|
||||||
@ -162,7 +155,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Position buffer to read the next packet
|
// Position buffer to read the next packet
|
||||||
readBuffer.reset(packetEnd, limit);
|
readBuffer.readerOffset(readerStart + packetLength);
|
||||||
} catch (BufferUnderflowException e) {
|
} catch (BufferUnderflowException e) {
|
||||||
readBuffer.reset(beginMark);
|
readBuffer.reset(beginMark);
|
||||||
this.cacheBuffer = BinaryBuffer.copy(readBuffer);
|
this.cacheBuffer = BinaryBuffer.copy(readBuffer);
|
||||||
|
@ -8,6 +8,7 @@ import net.minestom.server.utils.binary.BinaryBuffer;
|
|||||||
import org.jetbrains.annotations.ApiStatus;
|
import org.jetbrains.annotations.ApiStatus;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.Socket;
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
@ -42,7 +43,7 @@ public final class Worker extends Thread {
|
|||||||
if (!key.isReadable()) return;
|
if (!key.isReadable()) return;
|
||||||
PlayerSocketConnection connection = connectionMap.get(channel);
|
PlayerSocketConnection connection = connectionMap.get(channel);
|
||||||
try {
|
try {
|
||||||
var readBuffer = context.readBuffer;
|
BinaryBuffer readBuffer = context.readBuffer.clear();
|
||||||
// Consume last incomplete packet
|
// Consume last incomplete packet
|
||||||
connection.consumeCache(readBuffer);
|
connection.consumeCache(readBuffer);
|
||||||
// Read & process
|
// Read & process
|
||||||
@ -51,8 +52,6 @@ public final class Worker extends Thread {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// TODO print exception? (should ignore disconnection)
|
// TODO print exception? (should ignore disconnection)
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
} finally {
|
|
||||||
context.clearBuffers();
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -80,7 +79,7 @@ public final class Worker extends Thread {
|
|||||||
this.connectionMap.put(channel, new PlayerSocketConnection(this, channel, channel.getRemoteAddress()));
|
this.connectionMap.put(channel, new PlayerSocketConnection(this, channel, channel.getRemoteAddress()));
|
||||||
channel.configureBlocking(false);
|
channel.configureBlocking(false);
|
||||||
channel.register(selector, SelectionKey.OP_READ);
|
channel.register(selector, SelectionKey.OP_READ);
|
||||||
var socket = channel.socket();
|
Socket socket = channel.socket();
|
||||||
socket.setSendBufferSize(Server.SOCKET_SEND_BUFFER_SIZE);
|
socket.setSendBufferSize(Server.SOCKET_SEND_BUFFER_SIZE);
|
||||||
socket.setReceiveBufferSize(Server.SOCKET_RECEIVE_BUFFER_SIZE);
|
socket.setReceiveBufferSize(Server.SOCKET_RECEIVE_BUFFER_SIZE);
|
||||||
socket.setTcpNoDelay(Server.NO_DELAY);
|
socket.setTcpNoDelay(Server.NO_DELAY);
|
||||||
@ -95,10 +94,5 @@ public final class Worker extends Thread {
|
|||||||
public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
|
public final BinaryBuffer readBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
|
||||||
public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
|
public final BinaryBuffer contentBuffer = BinaryBuffer.ofSize(Server.MAX_PACKET_SIZE);
|
||||||
public final Inflater inflater = new Inflater();
|
public final Inflater inflater = new Inflater();
|
||||||
|
|
||||||
void clearBuffers() {
|
|
||||||
this.readBuffer.clear();
|
|
||||||
this.contentBuffer.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,10 @@ public final class BinaryBuffer {
|
|||||||
reset(marker.readerOffset(), marker.writerOffset());
|
reset(marker.readerOffset(), marker.writerOffset());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean canRead(int size) {
|
||||||
|
return readerOffset + size < capacity;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean canWrite(int size) {
|
public boolean canWrite(int size) {
|
||||||
return writerOffset + size < capacity;
|
return writerOffset + size < capacity;
|
||||||
}
|
}
|
||||||
@ -90,6 +94,10 @@ public final class BinaryBuffer {
|
|||||||
return readerOffset;
|
return readerOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void readerOffset(int offset) {
|
||||||
|
this.readerOffset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
public int writerOffset() {
|
public int writerOffset() {
|
||||||
return writerOffset;
|
return writerOffset;
|
||||||
}
|
}
|
||||||
@ -114,9 +122,10 @@ public final class BinaryBuffer {
|
|||||||
return readBytes(readableBytes());
|
return readBytes(readableBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
public BinaryBuffer clear() {
|
||||||
this.readerOffset = 0;
|
this.readerOffset = 0;
|
||||||
this.writerOffset = 0;
|
this.writerOffset = 0;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer asByteBuffer(int reader, int writer) {
|
public ByteBuffer asByteBuffer(int reader, int writer) {
|
||||||
|
Loading…
Reference in New Issue
Block a user