Even nicer throttling + encoding.

This commit is contained in:
md_5 2013-06-29 16:27:52 +10:00
parent 2332de0fda
commit b5047135c3

View File

@ -1,4 +1,4 @@
From a05a6c78a40c45b5b882ccd9877b7fea231c83eb Mon Sep 17 00:00:00 2001 From ac55b7e35bf3f589fda99e820f8f0a8d9df866d1 Mon Sep 17 00:00:00 2001
From: md_5 <md_5@live.com.au> From: md_5 <md_5@live.com.au>
Date: Sun, 23 Jun 2013 16:32:51 +1000 Date: Sun, 23 Jun 2013 16:32:51 +1000
Subject: [PATCH] Netty Subject: [PATCH] Netty
@ -569,7 +569,7 @@ index 0000000..2eb1dcb
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
new file mode 100644 new file mode 100644
index 0000000..313e3ea index 0000000..c4e1153
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
@@ -0,0 +1,316 @@ @@ -0,0 +1,316 @@
@ -774,7 +774,7 @@ index 0000000..313e3ea
+ ChannelPromise promise = channel.newPromise(); + ChannelPromise promise = channel.newPromise();
+ if ( packet instanceof Packet255KickDisconnect ) + if ( packet instanceof Packet255KickDisconnect )
+ { + {
+ channel.pipeline().get( OutboundManager.class ).lastFlush = 0; + channel.pipeline().get( PacketEncoder.class ).lastFlush = 0;
+ } + }
+ +
+ channel.write( packet, promise ); + channel.write( packet, promise );
@ -891,10 +891,10 @@ index 0000000..313e3ea
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
new file mode 100644 new file mode 100644
index 0000000..9eecd59 index 0000000..e00bc6a
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
@@ -0,0 +1,106 @@ @@ -0,0 +1,105 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -961,7 +961,6 @@ index 0000000..9eecd59
+ +
+ NettyNetworkManager networkManager = new NettyNetworkManager(); + NettyNetworkManager networkManager = new NettyNetworkManager();
+ ch.pipeline() + ch.pipeline()
+ .addLast( "flusher", new OutboundManager( networkManager ) )
+ .addLast( "timer", new ReadTimeoutHandler( 30 ) ) + .addLast( "timer", new ReadTimeoutHandler( 30 ) )
+ .addLast( "decoder", new PacketDecoder() ) + .addLast( "decoder", new PacketDecoder() )
+ .addLast( "encoder", new PacketEncoder( networkManager ) ) + .addLast( "encoder", new PacketEncoder( networkManager ) )
@ -1301,46 +1300,6 @@ index 0000000..5da8a59
+ return ch.toString(); + return ch.toString();
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java
new file mode 100644
index 0000000..728f260
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/OutboundManager.java
@@ -0,0 +1,34 @@
+package org.spigotmc.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.MessageList;
+import net.minecraft.server.PendingConnection;
+
+class OutboundManager extends ChannelOutboundHandlerAdapter
+{
+
+ private static final int FLUSH_TIME = 1;
+ /*========================================================================*/
+ public long lastFlush;
+ private final NettyNetworkManager manager;
+ private final MessageList<Object> pending = MessageList.newInstance();
+
+ OutboundManager(NettyNetworkManager manager)
+ {
+ this.manager = manager;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception
+ {
+ pending.add( msgs );
+ if ( manager.connection instanceof PendingConnection || System.currentTimeMillis() - lastFlush > FLUSH_TIME )
+ {
+ lastFlush = System.currentTimeMillis();
+ ctx.write( pending.copy() );
+ pending.clear();
+ }
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java
new file mode 100644 new file mode 100644
index 0000000..3adc8d6 index 0000000..3adc8d6
@ -1417,10 +1376,10 @@ index 0000000..3adc8d6
+} +}
diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java
new file mode 100644 new file mode 100644
index 0000000..5b35ab0 index 0000000..383fd46
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java +++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java
@@ -0,0 +1,71 @@ @@ -0,0 +1,88 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf;
@ -1431,6 +1390,7 @@ index 0000000..5b35ab0
+import io.netty.channel.MessageList; +import io.netty.channel.MessageList;
+import java.io.DataOutputStream; +import java.io.DataOutputStream;
+import net.minecraft.server.Packet; +import net.minecraft.server.Packet;
+import net.minecraft.server.PendingConnection;
+ +
+/** +/**
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id + * Netty encoder which takes a packet and encodes it, and adds a byte packet id
@ -1439,7 +1399,11 @@ index 0000000..5b35ab0
+public class PacketEncoder extends ChannelOutboundHandlerAdapter +public class PacketEncoder extends ChannelOutboundHandlerAdapter
+{ +{
+ +
+ private static final int FLUSH_TIME = 1;
+ /*========================================================================*/
+ public long lastFlush;
+ private final NettyNetworkManager networkManager; + private final NettyNetworkManager networkManager;
+ private final MessageList<Object> pending = MessageList.newInstance();
+ +
+ public PacketEncoder(NettyNetworkManager networkManager) + public PacketEncoder(NettyNetworkManager networkManager)
+ { + {
@ -1449,9 +1413,21 @@ index 0000000..5b35ab0
+ @Override + @Override
+ public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception + public void write(ChannelHandlerContext ctx, MessageList<Object> msgs, ChannelPromise promise) throws Exception
+ { + {
+ // Append messages to queue and then recycle - we don't need to bother about recycling if the worst happens, since MessageList is GC safe
+ pending.add( msgs );
+ msgs.recycle();
+
+ // If we are not in the pending connect phase, and we have not reached our timer
+ if ( !( networkManager.connection instanceof PendingConnection ) && System.currentTimeMillis() - lastFlush < FLUSH_TIME )
+ {
+ return;
+ }
+ // Update our last write time
+ lastFlush = System.currentTimeMillis();
+
+ // Since we are writing in batches it can be useful to guess the size of our output to limit memcpy + // Since we are writing in batches it can be useful to guess the size of our output to limit memcpy
+ int estimatedSize = 0; + int estimatedSize = 0;
+ for ( Object msg : msgs ) + for ( Object msg : pending )
+ { + {
+ if ( msg instanceof Packet ) + if ( msg instanceof Packet )
+ { + {
@ -1469,7 +1445,7 @@ index 0000000..5b35ab0
+ try + try
+ { + {
+ // Iterate through all packets, this is safe as we know we will only ever get packets in the pipeline + // Iterate through all packets, this is safe as we know we will only ever get packets in the pipeline
+ for ( Packet packet : (MessageList<Packet>) (MessageList) msgs ) + for ( Packet packet : (MessageList<Packet>) (MessageList) pending )
+ { + {
+ // Write packet ID + // Write packet ID
+ outBuf.writeByte( packet.n() ); + outBuf.writeByte( packet.n() );
@ -1482,8 +1458,8 @@ index 0000000..5b35ab0
+ ctx.write( outBuf, promise ); + ctx.write( outBuf, promise );
+ } finally + } finally
+ { + {
+ // Very important that we do this for fast memory reclamation + // Reset packet queue
+ msgs.recycle(); + pending.clear();
+ // Since we are now in the event loop, the bytes have been written, we can free them if this was not the case + // Since we are now in the event loop, the bytes have been written, we can free them if this was not the case
+ if ( outBuf.refCnt() != 0 ) + if ( outBuf.refCnt() != 0 )
+ { + {