mirror of
https://github.com/Minestom/Minestom.git
synced 2025-01-04 23:47:59 +01:00
Add dedicated PooledBuffers class
Signed-off-by: TheMode <themode@outlook.fr>
This commit is contained in:
parent
f307e81f04
commit
e1a3c295a2
@ -16,6 +16,7 @@ import net.minestom.server.network.socket.Worker;
|
|||||||
import net.minestom.server.utils.PacketUtils;
|
import net.minestom.server.utils.PacketUtils;
|
||||||
import net.minestom.server.utils.Utils;
|
import net.minestom.server.utils.Utils;
|
||||||
import net.minestom.server.utils.binary.BinaryBuffer;
|
import net.minestom.server.utils.binary.BinaryBuffer;
|
||||||
|
import net.minestom.server.utils.binary.PooledBuffers;
|
||||||
import net.minestom.server.utils.validate.Check;
|
import net.minestom.server.utils.validate.Check;
|
||||||
import org.jetbrains.annotations.ApiStatus;
|
import org.jetbrains.annotations.ApiStatus;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
@ -27,7 +28,6 @@ import javax.crypto.Cipher;
|
|||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
import javax.crypto.ShortBufferException;
|
import javax.crypto.ShortBufferException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.ref.SoftReference;
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.BufferUnderflowException;
|
import java.nio.BufferUnderflowException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@ -35,7 +35,6 @@ import java.nio.channels.ClosedChannelException;
|
|||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
||||||
import java.util.zip.DataFormatException;
|
import java.util.zip.DataFormatException;
|
||||||
import java.util.zip.Inflater;
|
import java.util.zip.Inflater;
|
||||||
|
|
||||||
@ -47,8 +46,6 @@ import java.util.zip.Inflater;
|
|||||||
@ApiStatus.Internal
|
@ApiStatus.Internal
|
||||||
public class PlayerSocketConnection extends PlayerConnection {
|
public class PlayerSocketConnection extends PlayerConnection {
|
||||||
private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class);
|
private final static Logger LOGGER = LoggerFactory.getLogger(PlayerSocketConnection.class);
|
||||||
private final static Queue<SoftReference<BinaryBuffer>> POOLED_BUFFERS = new ConcurrentLinkedQueue<>();
|
|
||||||
private final static int BUFFER_SIZE = 262_143;
|
|
||||||
|
|
||||||
private final Worker worker;
|
private final Worker worker;
|
||||||
private final SocketChannel channel;
|
private final SocketChannel channel;
|
||||||
@ -78,7 +75,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
|
|
||||||
private final Object bufferLock = new Object();
|
private final Object bufferLock = new Object();
|
||||||
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
|
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
|
||||||
private BinaryBuffer tickBuffer = getPooledBuffer();
|
private BinaryBuffer tickBuffer = PooledBuffers.get();
|
||||||
private volatile BinaryBuffer cacheBuffer;
|
private volatile BinaryBuffer cacheBuffer;
|
||||||
|
|
||||||
public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) {
|
public PlayerSocketConnection(@NotNull Worker worker, @NotNull SocketChannel channel, SocketAddress remoteAddress) {
|
||||||
@ -226,16 +223,17 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
@ApiStatus.Internal
|
@ApiStatus.Internal
|
||||||
public void write(@NotNull ByteBuffer buffer) {
|
public void write(@NotNull ByteBuffer buffer) {
|
||||||
synchronized (bufferLock) {
|
synchronized (bufferLock) {
|
||||||
|
final int capacity = tickBuffer.capacity();
|
||||||
final int size = buffer.remaining();
|
final int size = buffer.remaining();
|
||||||
if (size <= BUFFER_SIZE) {
|
if (size <= capacity) {
|
||||||
if (!tickBuffer.canWrite(size)) flush();
|
if (!tickBuffer.canWrite(size)) flush();
|
||||||
if (!isOnline()) return;
|
if (!isOnline()) return;
|
||||||
this.tickBuffer.write(buffer);
|
this.tickBuffer.write(buffer);
|
||||||
} else {
|
} else {
|
||||||
final int bufferCount = size / BUFFER_SIZE + 1;
|
final int bufferCount = size / capacity + 1;
|
||||||
for (int i = 0; i < bufferCount; i++) {
|
for (int i = 0; i < bufferCount; i++) {
|
||||||
buffer.position(i * BUFFER_SIZE);
|
buffer.position(i * capacity);
|
||||||
buffer.limit(Math.min(size, buffer.position() + BUFFER_SIZE));
|
buffer.limit(Math.min(size, buffer.position() + capacity));
|
||||||
if (!tickBuffer.canWrite(buffer.remaining())) flush();
|
if (!tickBuffer.canWrite(buffer.remaining())) flush();
|
||||||
if (!isOnline()) return;
|
if (!isOnline()) return;
|
||||||
this.tickBuffer.write(buffer);
|
this.tickBuffer.write(buffer);
|
||||||
@ -268,13 +266,13 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
if (localBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
|
if (localBuffer.readableBytes() == 0 && waitingBuffers.isEmpty()) return;
|
||||||
// Update tick buffer
|
// Update tick buffer
|
||||||
try {
|
try {
|
||||||
this.tickBuffer = getPooledBuffer();
|
this.tickBuffer = PooledBuffers.get();
|
||||||
if (encrypted) {
|
if (encrypted) {
|
||||||
final Cipher cipher = encryptCipher;
|
final Cipher cipher = encryptCipher;
|
||||||
// Encrypt data first
|
// Encrypt data first
|
||||||
ByteBuffer cipherInput = localBuffer.asByteBuffer(0, localBuffer.writerOffset());
|
ByteBuffer cipherInput = localBuffer.asByteBuffer(0, localBuffer.writerOffset());
|
||||||
BinaryBuffer pooled = getPooledBuffer();
|
BinaryBuffer pooled = PooledBuffers.get();
|
||||||
ByteBuffer cipherOutput = pooled.asByteBuffer(0, BUFFER_SIZE);
|
ByteBuffer cipherOutput = pooled.asByteBuffer(0, pooled.capacity());
|
||||||
try {
|
try {
|
||||||
cipher.update(cipherInput, cipherOutput);
|
cipher.update(cipherInput, cipherOutput);
|
||||||
} catch (ShortBufferException e) {
|
} catch (ShortBufferException e) {
|
||||||
@ -282,7 +280,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
}
|
}
|
||||||
localBuffer.clear();
|
localBuffer.clear();
|
||||||
localBuffer.write(cipherOutput.flip());
|
localBuffer.write(cipherOutput.flip());
|
||||||
POOLED_BUFFERS.add(new SoftReference<>(pooled));
|
PooledBuffers.add(pooled);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.waitingBuffers.add(localBuffer);
|
this.waitingBuffers.add(localBuffer);
|
||||||
@ -292,7 +290,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
try {
|
try {
|
||||||
if (!waitingBuffer.writeChannel(channel)) break;
|
if (!waitingBuffer.writeChannel(channel)) break;
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
POOLED_BUFFERS.add(new SoftReference<>(waitingBuffer));
|
PooledBuffers.add(waitingBuffer);
|
||||||
} catch (ClosedChannelException e) {
|
} catch (ClosedChannelException e) {
|
||||||
shouldDisconnect = true;
|
shouldDisconnect = true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -331,7 +329,7 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
synchronized (bufferLock) {
|
synchronized (bufferLock) {
|
||||||
if (!waitingBuffers.isEmpty()) {
|
if (!waitingBuffers.isEmpty()) {
|
||||||
for (BinaryBuffer waitingBuffer : waitingBuffers) {
|
for (BinaryBuffer waitingBuffer : waitingBuffers) {
|
||||||
POOLED_BUFFERS.add(new SoftReference<>(waitingBuffer));
|
PooledBuffers.add(waitingBuffer);
|
||||||
}
|
}
|
||||||
this.waitingBuffers.clear();
|
this.waitingBuffers.clear();
|
||||||
}
|
}
|
||||||
@ -471,15 +469,4 @@ public class PlayerSocketConnection extends PlayerConnection {
|
|||||||
public void setNonce(byte[] nonce) {
|
public void setNonce(byte[] nonce) {
|
||||||
this.nonce = nonce;
|
this.nonce = nonce;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static BinaryBuffer getPooledBuffer() {
|
|
||||||
BinaryBuffer buffer = null;
|
|
||||||
SoftReference<BinaryBuffer> ref;
|
|
||||||
while ((ref = POOLED_BUFFERS.poll()) != null) {
|
|
||||||
buffer = ref.get();
|
|
||||||
if (buffer != null) break;
|
|
||||||
}
|
|
||||||
if (buffer != null) buffer.clear();
|
|
||||||
return Objects.requireNonNullElseGet(buffer, () -> BinaryBuffer.ofSize(BUFFER_SIZE));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -39,9 +39,12 @@ import java.util.zip.Deflater;
|
|||||||
public final class PacketUtils {
|
public final class PacketUtils {
|
||||||
private static final PacketListenerManager PACKET_LISTENER_MANAGER = MinecraftServer.getPacketListenerManager();
|
private static final PacketListenerManager PACKET_LISTENER_MANAGER = MinecraftServer.getPacketListenerManager();
|
||||||
private static final ThreadLocal<Deflater> LOCAL_DEFLATER = ThreadLocal.withInitial(Deflater::new);
|
private static final ThreadLocal<Deflater> LOCAL_DEFLATER = ThreadLocal.withInitial(Deflater::new);
|
||||||
|
|
||||||
|
/// Local buffers
|
||||||
private static final LocalCache PACKET_BUFFER = LocalCache.get("packet-buffer", Server.MAX_PACKET_SIZE);
|
private static final LocalCache PACKET_BUFFER = LocalCache.get("packet-buffer", Server.MAX_PACKET_SIZE);
|
||||||
private static final LocalCache LOCAL_BUFFER = LocalCache.get("local-buffer", Server.MAX_PACKET_SIZE);
|
private static final LocalCache LOCAL_BUFFER = LocalCache.get("local-buffer", Server.MAX_PACKET_SIZE);
|
||||||
|
|
||||||
|
// Viewable packets
|
||||||
private static final Object VIEWABLE_PACKET_LOCK = new Object();
|
private static final Object VIEWABLE_PACKET_LOCK = new Object();
|
||||||
private static final Map<Viewable, ViewableStorage> VIEWABLE_STORAGE_MAP = new WeakHashMap<>();
|
private static final Map<Viewable, ViewableStorage> VIEWABLE_STORAGE_MAP = new WeakHashMap<>();
|
||||||
|
|
||||||
|
@ -0,0 +1,34 @@
|
|||||||
|
package net.minestom.server.utils.binary;
|
||||||
|
|
||||||
|
import org.jetbrains.annotations.ApiStatus;
|
||||||
|
|
||||||
|
import java.lang.ref.SoftReference;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
@ApiStatus.Internal
|
||||||
|
@ApiStatus.Experimental
|
||||||
|
public final class PooledBuffers {
|
||||||
|
private final static Queue<SoftReference<BinaryBuffer>> POOLED_BUFFERS = new ConcurrentLinkedQueue<>();
|
||||||
|
private final static int BUFFER_SIZE = 262_143;
|
||||||
|
|
||||||
|
public static BinaryBuffer get() {
|
||||||
|
BinaryBuffer buffer = null;
|
||||||
|
SoftReference<BinaryBuffer> ref;
|
||||||
|
while ((ref = POOLED_BUFFERS.poll()) != null) {
|
||||||
|
buffer = ref.get();
|
||||||
|
if (buffer != null) break;
|
||||||
|
}
|
||||||
|
return Objects.requireNonNullElseGet(buffer, () -> BinaryBuffer.ofSize(BUFFER_SIZE));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void add(BinaryBuffer buffer) {
|
||||||
|
buffer.clear();
|
||||||
|
POOLED_BUFFERS.add(new SoftReference<>(buffer));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int bufferSize() {
|
||||||
|
return BUFFER_SIZE;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user