mirror of
https://github.com/PaperMC/Paper.git
synced 2024-11-03 01:10:37 +01:00
Remove slow and complex pipelining and make our own packet writing system. Speed glorious speed.
This commit is contained in:
parent
b5047135c3
commit
184ccfa363
@ -1,4 +1,4 @@
|
|||||||
From ac55b7e35bf3f589fda99e820f8f0a8d9df866d1 Mon Sep 17 00:00:00 2001
|
From ed60ecd862c59f6f4117899e46cc30a635db3a95 Mon Sep 17 00:00:00 2001
|
||||||
From: md_5 <md_5@live.com.au>
|
From: md_5 <md_5@live.com.au>
|
||||||
Date: Sun, 23 Jun 2013 16:32:51 +1000
|
Date: Sun, 23 Jun 2013 16:32:51 +1000
|
||||||
Subject: [PATCH] Netty
|
Subject: [PATCH] Netty
|
||||||
@ -569,7 +569,7 @@ index 0000000..2eb1dcb
|
|||||||
+}
|
+}
|
||||||
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
||||||
new file mode 100644
|
new file mode 100644
|
||||||
index 0000000..c4e1153
|
index 0000000..2db3ebb
|
||||||
--- /dev/null
|
--- /dev/null
|
||||||
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
||||||
@@ -0,0 +1,316 @@
|
@@ -0,0 +1,316 @@
|
||||||
@ -579,10 +579,8 @@ index 0000000..c4e1153
|
|||||||
+import io.netty.channel.Channel;
|
+import io.netty.channel.Channel;
|
||||||
+import io.netty.channel.ChannelHandlerContext;
|
+import io.netty.channel.ChannelHandlerContext;
|
||||||
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
+import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
+import io.netty.channel.ChannelPromise;
|
|
||||||
+import io.netty.channel.MessageList;
|
+import io.netty.channel.MessageList;
|
||||||
+import io.netty.channel.socket.SocketChannel;
|
+import io.netty.channel.socket.SocketChannel;
|
||||||
+import io.netty.channel.socket.nio.NioSocketChannel;
|
|
||||||
+import java.net.Socket;
|
+import java.net.Socket;
|
||||||
+import java.net.SocketAddress;
|
+import java.net.SocketAddress;
|
||||||
+import java.security.PrivateKey;
|
+import java.security.PrivateKey;
|
||||||
@ -647,6 +645,7 @@ index 0000000..c4e1153
|
|||||||
+ private Object[] dcArgs;
|
+ private Object[] dcArgs;
|
||||||
+ private Socket socketAdaptor;
|
+ private Socket socketAdaptor;
|
||||||
+ private long writtenBytes;
|
+ private long writtenBytes;
|
||||||
|
+ private PacketWriter writer;
|
||||||
+
|
+
|
||||||
+ @Override
|
+ @Override
|
||||||
+ public void channelActive(ChannelHandlerContext ctx) throws Exception
|
+ public void channelActive(ChannelHandlerContext ctx) throws Exception
|
||||||
@ -658,6 +657,7 @@ index 0000000..c4e1153
|
|||||||
+ socketAdaptor = NettySocketAdaptor.adapt( (SocketChannel) channel );
|
+ socketAdaptor = NettySocketAdaptor.adapt( (SocketChannel) channel );
|
||||||
+ // Followed by their first handler
|
+ // Followed by their first handler
|
||||||
+ connection = new PendingConnection( server, this );
|
+ connection = new PendingConnection( server, this );
|
||||||
|
+ writer = new PacketWriter();
|
||||||
+ // Finally register the connection
|
+ // Finally register the connection
|
||||||
+ connected = true;
|
+ connected = true;
|
||||||
+ serverConnection.register( (PendingConnection) connection );
|
+ serverConnection.register( (PendingConnection) connection );
|
||||||
@ -666,6 +666,7 @@ index 0000000..c4e1153
|
|||||||
+ @Override
|
+ @Override
|
||||||
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception
|
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception
|
||||||
+ {
|
+ {
|
||||||
|
+ writer.release();
|
||||||
+ a( "disconnect.endOfStream", new Object[ 0 ] );
|
+ a( "disconnect.endOfStream", new Object[ 0 ] );
|
||||||
+ }
|
+ }
|
||||||
+
|
+
|
||||||
@ -771,13 +772,12 @@ index 0000000..c4e1153
|
|||||||
+ {
|
+ {
|
||||||
+ highPriorityQueue.add( packet );
|
+ highPriorityQueue.add( packet );
|
||||||
+
|
+
|
||||||
+ ChannelPromise promise = channel.newPromise();
|
|
||||||
+ if ( packet instanceof Packet255KickDisconnect )
|
+ if ( packet instanceof Packet255KickDisconnect )
|
||||||
+ {
|
+ {
|
||||||
+ channel.pipeline().get( PacketEncoder.class ).lastFlush = 0;
|
+ writer.lastFlush = 0;
|
||||||
+ }
|
+ }
|
||||||
+
|
+
|
||||||
+ channel.write( packet, promise );
|
+ writer.write( channel, this, packet );
|
||||||
+ if ( packet instanceof Packet252KeyResponse )
|
+ if ( packet instanceof Packet252KeyResponse )
|
||||||
+ {
|
+ {
|
||||||
+ Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret );
|
+ Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret );
|
||||||
@ -891,10 +891,10 @@ index 0000000..c4e1153
|
|||||||
+}
|
+}
|
||||||
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
||||||
new file mode 100644
|
new file mode 100644
|
||||||
index 0000000..e00bc6a
|
index 0000000..1dfb36b
|
||||||
--- /dev/null
|
--- /dev/null
|
||||||
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
||||||
@@ -0,0 +1,105 @@
|
@@ -0,0 +1,104 @@
|
||||||
+package org.spigotmc.netty;
|
+package org.spigotmc.netty;
|
||||||
+
|
+
|
||||||
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
@ -963,7 +963,6 @@ index 0000000..e00bc6a
|
|||||||
+ ch.pipeline()
|
+ ch.pipeline()
|
||||||
+ .addLast( "timer", new ReadTimeoutHandler( 30 ) )
|
+ .addLast( "timer", new ReadTimeoutHandler( 30 ) )
|
||||||
+ .addLast( "decoder", new PacketDecoder() )
|
+ .addLast( "decoder", new PacketDecoder() )
|
||||||
+ .addLast( "encoder", new PacketEncoder( networkManager ) )
|
|
||||||
+ .addLast( "manager", networkManager );
|
+ .addLast( "manager", networkManager );
|
||||||
+ }
|
+ }
|
||||||
+ } ).childOption( ChannelOption.TCP_NODELAY, false ).group( group ).localAddress( host, port ).bind().syncUninterruptibly();
|
+ } ).childOption( ChannelOption.TCP_NODELAY, false ).group( group ).localAddress( host, port ).bind().syncUninterruptibly();
|
||||||
@ -1374,100 +1373,6 @@ index 0000000..3adc8d6
|
|||||||
+ }
|
+ }
|
||||||
+ }
|
+ }
|
||||||
+}
|
+}
|
||||||
diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java
|
|
||||||
new file mode 100644
|
|
||||||
index 0000000..383fd46
|
|
||||||
--- /dev/null
|
|
||||||
+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java
|
|
||||||
@@ -0,0 +1,88 @@
|
|
||||||
+package org.spigotmc.netty;
|
|
||||||
+
|
|
||||||
+import io.netty.buffer.ByteBuf;
|
|
||||||
+import io.netty.buffer.ByteBufOutputStream;
|
|
||||||
+import io.netty.channel.ChannelHandlerContext;
|
|
||||||
+import io.netty.channel.ChannelOutboundHandlerAdapter;
|
|
||||||
+import io.netty.channel.ChannelPromise;
|
|
||||||
+import io.netty.channel.MessageList;
|
|
||||||
+import java.io.DataOutputStream;
|
|
||||||
+import net.minecraft.server.Packet;
|
|
||||||
+import net.minecraft.server.PendingConnection;
|
|
||||||
+
|
|
||||||
+/**
|
|
||||||
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
|
|
||||||
+ * header.
|
|
||||||
+ */
|
|
||||||
+public class PacketEncoder extends ChannelOutboundHandlerAdapter
|
|
||||||
+{
|
|
||||||
+
|
|
||||||
+ private static final int FLUSH_TIME = 1;
|
|
||||||
+ /*========================================================================*/
|
|
||||||
+ public long lastFlush;
|
|
||||||
+ private final NettyNetworkManager networkManager;
|
|
||||||
+ private final MessageList<Object> pending = MessageList.newInstance();
|
|
||||||
+
|
|
||||||
+ public PacketEncoder(NettyNetworkManager networkManager)
|
|
||||||
+ {
|
|
||||||
+ this.networkManager = networkManager;
|
|
||||||
+ }
|
|
||||||
+
|
|
||||||
+ @Override
|
|
||||||
+ public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception
|
|
||||||
+ {
|
|
||||||
+ // Append messages to queue and then recycle - we don't need to bother about recycling if the worst happens, since MessageList is GC safe
|
|
||||||
+ pending.add( msgs );
|
|
||||||
+ msgs.recycle();
|
|
||||||
+
|
|
||||||
+ // If we are not in the pending connect phase, and we have not reached our timer
|
|
||||||
+ if ( !( networkManager.connection instanceof PendingConnection ) && System.currentTimeMillis() - lastFlush < FLUSH_TIME )
|
|
||||||
+ {
|
|
||||||
+ return;
|
|
||||||
+ }
|
|
||||||
+ // Update our last write time
|
|
||||||
+ lastFlush = System.currentTimeMillis();
|
|
||||||
+
|
|
||||||
+ // Since we are writing in batches it can be useful to guess the size of our output to limit memcpy
|
|
||||||
+ int estimatedSize = 0;
|
|
||||||
+ for ( Object msg : pending )
|
|
||||||
+ {
|
|
||||||
+ if ( msg instanceof Packet )
|
|
||||||
+ {
|
|
||||||
+ estimatedSize += ( (Packet) msg ).a();
|
|
||||||
+ } else
|
|
||||||
+ {
|
|
||||||
+ throw new IllegalStateException( "Cannot send message of class " + msg.getClass() );
|
|
||||||
+ }
|
|
||||||
+ }
|
|
||||||
+ // Allocate an output buffer of estimated size
|
|
||||||
+ ByteBuf outBuf = ctx.alloc().buffer( estimatedSize );
|
|
||||||
+ // And a stream to which we can write this buffer to
|
|
||||||
+ DataOutputStream dataOut = new DataOutputStream( new ByteBufOutputStream( outBuf ) );
|
|
||||||
+
|
|
||||||
+ try
|
|
||||||
+ {
|
|
||||||
+ // Iterate through all packets, this is safe as we know we will only ever get packets in the pipeline
|
|
||||||
+ for ( Packet packet : (MessageList<Packet>) (MessageList) pending )
|
|
||||||
+ {
|
|
||||||
+ // Write packet ID
|
|
||||||
+ outBuf.writeByte( packet.n() );
|
|
||||||
+ // Write packet data
|
|
||||||
+ packet.a( dataOut );
|
|
||||||
+ }
|
|
||||||
+ // Add to the courtesy API providing number of written bytes
|
|
||||||
+ networkManager.addWrittenBytes( outBuf.readableBytes() );
|
|
||||||
+ // Write down our single ByteBuf
|
|
||||||
+ ctx.write( outBuf, promise );
|
|
||||||
+ } finally
|
|
||||||
+ {
|
|
||||||
+ // Reset packet queue
|
|
||||||
+ pending.clear();
|
|
||||||
+ // Since we are now in the event loop, the bytes have been written, we can free them if this was not the case
|
|
||||||
+ if ( outBuf.refCnt() != 0 )
|
|
||||||
+ {
|
|
||||||
+ outBuf.release();
|
|
||||||
+ }
|
|
||||||
+ }
|
|
||||||
+ }
|
|
||||||
+}
|
|
||||||
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
|
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
|
||||||
new file mode 100644
|
new file mode 100644
|
||||||
index 0000000..965ba12
|
index 0000000..965ba12
|
||||||
@ -1586,6 +1491,100 @@ index 0000000..965ba12
|
|||||||
+ return packet;
|
+ return packet;
|
||||||
+ }
|
+ }
|
||||||
+}
|
+}
|
||||||
|
diff --git a/src/main/java/org/spigotmc/netty/PacketWriter.java b/src/main/java/org/spigotmc/netty/PacketWriter.java
|
||||||
|
new file mode 100644
|
||||||
|
index 0000000..f6fb958
|
||||||
|
--- /dev/null
|
||||||
|
+++ b/src/main/java/org/spigotmc/netty/PacketWriter.java
|
||||||
|
@@ -0,0 +1,88 @@
|
||||||
|
+package org.spigotmc.netty;
|
||||||
|
+
|
||||||
|
+import io.netty.buffer.ByteBuf;
|
||||||
|
+import io.netty.buffer.ByteBufOutputStream;
|
||||||
|
+import io.netty.channel.Channel;
|
||||||
|
+import io.netty.channel.ChannelHandlerContext;
|
||||||
|
+import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
|
+import io.netty.channel.ChannelPromise;
|
||||||
|
+import io.netty.channel.MessageList;
|
||||||
|
+import io.netty.handler.codec.EncoderException;
|
||||||
|
+import java.io.DataOutputStream;
|
||||||
|
+import java.io.IOException;
|
||||||
|
+import net.minecraft.server.Packet;
|
||||||
|
+import net.minecraft.server.PendingConnection;
|
||||||
|
+
|
||||||
|
+/**
|
||||||
|
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
|
||||||
|
+ * header.
|
||||||
|
+ */
|
||||||
|
+public class PacketWriter
|
||||||
|
+{
|
||||||
|
+
|
||||||
|
+ private static final int FLUSH_TIME = 1;
|
||||||
|
+ /*========================================================================*/
|
||||||
|
+ long lastFlush;
|
||||||
|
+ private final MessageList<Packet> pending = MessageList.newInstance();
|
||||||
|
+
|
||||||
|
+ void release()
|
||||||
|
+ {
|
||||||
|
+ pending.recycle();
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ void write(Channel channel, NettyNetworkManager networkManager, Packet msg)
|
||||||
|
+ {
|
||||||
|
+ // Append messages to queue
|
||||||
|
+ pending.add( msg );
|
||||||
|
+
|
||||||
|
+ // If we are not in the pending connect phase, and we have not reached our timer
|
||||||
|
+ if ( !( networkManager.connection instanceof PendingConnection ) && System.currentTimeMillis() - lastFlush < FLUSH_TIME )
|
||||||
|
+ {
|
||||||
|
+ return;
|
||||||
|
+ }
|
||||||
|
+ // Update our last write time
|
||||||
|
+ lastFlush = System.currentTimeMillis();
|
||||||
|
+
|
||||||
|
+ // Since we are writing in batches it can be useful to guess the size of our output to limit memcpy
|
||||||
|
+ int estimatedSize = 0;
|
||||||
|
+ for ( Packet packet : pending )
|
||||||
|
+ {
|
||||||
|
+ estimatedSize += packet.a();
|
||||||
|
+ }
|
||||||
|
+ // Allocate an output buffer of estimated size
|
||||||
|
+ ByteBuf outBuf = channel.alloc().buffer( estimatedSize );
|
||||||
|
+ // And a stream to which we can write this buffer to
|
||||||
|
+ DataOutputStream dataOut = new DataOutputStream( new ByteBufOutputStream( outBuf ) );
|
||||||
|
+
|
||||||
|
+ try
|
||||||
|
+ {
|
||||||
|
+ // Iterate through all packets, this is safe as we know we will only ever get packets in the pipeline
|
||||||
|
+ for ( Packet packet : pending )
|
||||||
|
+ {
|
||||||
|
+ // Write packet ID
|
||||||
|
+ outBuf.writeByte( packet.n() );
|
||||||
|
+ // Write packet data
|
||||||
|
+ try
|
||||||
|
+ {
|
||||||
|
+ packet.a( dataOut );
|
||||||
|
+ } catch ( IOException ex )
|
||||||
|
+ {
|
||||||
|
+ throw new EncoderException( ex );
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ // Add to the courtesy API providing number of written bytes
|
||||||
|
+ networkManager.addWrittenBytes( outBuf.readableBytes() );
|
||||||
|
+ // Write down our single ByteBuf
|
||||||
|
+ channel.write( outBuf );
|
||||||
|
+ } finally
|
||||||
|
+ {
|
||||||
|
+ // Reset packet queue
|
||||||
|
+ pending.clear();
|
||||||
|
+ // Since we are now in the event loop, the bytes have been written, we can free them if this was not the case
|
||||||
|
+ if ( outBuf.refCnt() != 0 )
|
||||||
|
+ {
|
||||||
|
+ outBuf.release();
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+ }
|
||||||
|
+}
|
||||||
diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java
|
diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java
|
||||||
new file mode 100644
|
new file mode 100644
|
||||||
index 0000000..d3a9cab
|
index 0000000..d3a9cab
|
||||||
|
Loading…
Reference in New Issue
Block a user