Format netty patch

This commit is contained in:
md_5 2013-06-01 13:57:16 +10:00
parent 829a3e9181
commit fdd25ff498

View File

@ -1,4 +1,4 @@
From cf9f87164540e86ffc374ea90f0c1e7e9674a29d Mon Sep 17 00:00:00 2001 From 5c9533473e7a221d24d6aa9a36e43804aae72298 Mon Sep 17 00:00:00 2001
From: md_5 <md_5@live.com.au> From: md_5 <md_5@live.com.au>
Date: Tue, 23 Apr 2013 11:47:32 +1000 Date: Tue, 23 Apr 2013 11:47:32 +1000
Subject: [PATCH] Netty Subject: [PATCH] Netty
@ -368,10 +368,10 @@ index 0000000..c8ea80a
+} +}
diff --git a/src/main/java/org/spigotmc/netty/CipherBase.java b/src/main/java/org/spigotmc/netty/CipherBase.java diff --git a/src/main/java/org/spigotmc/netty/CipherBase.java b/src/main/java/org/spigotmc/netty/CipherBase.java
new file mode 100644 new file mode 100644
index 0000000..1f915df index 0000000..e9068e7
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/CipherBase.java +++ b/src/main/java/org/spigotmc/netty/CipherBase.java
@@ -0,0 +1,45 @@ @@ -0,0 +1,52 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf;
@ -383,35 +383,42 @@ index 0000000..1f915df
+ * {@link #cipher(io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)} method to + * {@link #cipher(io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)} method to
+ * aid in the efficient passing of ByteBuffers through a cipher. + * aid in the efficient passing of ByteBuffers through a cipher.
+ */ + */
+class CipherBase { +class CipherBase
+{
+ +
+ private final Cipher cipher; + private final Cipher cipher;
+ private ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal();
+ private ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal(); + private ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal();
+ +
+ private static class EmptyByteThreadLocal extends ThreadLocal<byte[]> { + private static class EmptyByteThreadLocal extends ThreadLocal<byte[]>
+ {
+ +
+ @Override + @Override
+ protected byte[] initialValue() { + protected byte[] initialValue()
+ {
+ return new byte[ 0 ]; + return new byte[ 0 ];
+ } + }
+ } + }
+ +
+ protected CipherBase(Cipher cipher) { + protected CipherBase(Cipher cipher)
+ {
+ this.cipher = cipher; + this.cipher = cipher;
+ } + }
+ +
+ protected void cipher(ByteBuf in, ByteBuf out) throws ShortBufferException { + protected void cipher(ByteBuf in, ByteBuf out) throws ShortBufferException
+ {
+ byte[] heapIn = heapInLocal.get(); + byte[] heapIn = heapInLocal.get();
+ int readableBytes = in.readableBytes(); + int readableBytes = in.readableBytes();
+ if (heapIn.length < readableBytes) { + if ( heapIn.length < readableBytes )
+ {
+ heapIn = new byte[ readableBytes ]; + heapIn = new byte[ readableBytes ];
+ } + }
+ in.readBytes( heapIn, 0, readableBytes ); + in.readBytes( heapIn, 0, readableBytes );
+ +
+ byte[] heapOut = heapOutLocal.get(); + byte[] heapOut = heapOutLocal.get();
+ int outputSize = cipher.getOutputSize( readableBytes ); + int outputSize = cipher.getOutputSize( readableBytes );
+ if (heapOut.length < outputSize) { + if ( heapOut.length < outputSize )
+ {
+ heapOut = new byte[ outputSize ]; + heapOut = new byte[ outputSize ];
+ } + }
+ out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); + out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) );
@ -419,10 +426,10 @@ index 0000000..1f915df
+} +}
diff --git a/src/main/java/org/spigotmc/netty/CipherDecoder.java b/src/main/java/org/spigotmc/netty/CipherDecoder.java diff --git a/src/main/java/org/spigotmc/netty/CipherDecoder.java b/src/main/java/org/spigotmc/netty/CipherDecoder.java
new file mode 100644 new file mode 100644
index 0000000..30b1d5f index 0000000..98dc3a0
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/CipherDecoder.java +++ b/src/main/java/org/spigotmc/netty/CipherDecoder.java
@@ -0,0 +1,20 @@ @@ -0,0 +1,23 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf;
@ -430,25 +437,28 @@ index 0000000..30b1d5f
+import io.netty.handler.codec.ByteToByteDecoder; +import io.netty.handler.codec.ByteToByteDecoder;
+import javax.crypto.Cipher; +import javax.crypto.Cipher;
+ +
+class CipherDecoder extends ByteToByteDecoder { +class CipherDecoder extends ByteToByteDecoder
+{
+ +
+ private final CipherBase cipher; + private final CipherBase cipher;
+ +
+ public CipherDecoder(Cipher cipher) { + public CipherDecoder(Cipher cipher)
+ {
+ this.cipher = new CipherBase( cipher ); + this.cipher = new CipherBase( cipher );
+ } + }
+ +
+ @Override + @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception
+ {
+ cipher.cipher( in, out ); + cipher.cipher( in, out );
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/CipherEncoder.java b/src/main/java/org/spigotmc/netty/CipherEncoder.java diff --git a/src/main/java/org/spigotmc/netty/CipherEncoder.java b/src/main/java/org/spigotmc/netty/CipherEncoder.java
new file mode 100644 new file mode 100644
index 0000000..aa8192e index 0000000..4ff943b
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/CipherEncoder.java +++ b/src/main/java/org/spigotmc/netty/CipherEncoder.java
@@ -0,0 +1,20 @@ @@ -0,0 +1,23 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf;
@ -456,25 +466,28 @@ index 0000000..aa8192e
+import io.netty.handler.codec.ByteToByteEncoder; +import io.netty.handler.codec.ByteToByteEncoder;
+import javax.crypto.Cipher; +import javax.crypto.Cipher;
+ +
+class CipherEncoder extends ByteToByteEncoder { +class CipherEncoder extends ByteToByteEncoder
+{
+ +
+ private final CipherBase cipher; + private final CipherBase cipher;
+ +
+ public CipherEncoder(Cipher cipher) { + public CipherEncoder(Cipher cipher)
+ {
+ this.cipher = new CipherBase( cipher ); + this.cipher = new CipherBase( cipher );
+ } + }
+ +
+ @Override + @Override
+ protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception
+ {
+ cipher.cipher( in, out ); + cipher.cipher( in, out );
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
new file mode 100644 new file mode 100644
index 0000000..a9224d9 index 0000000..93193fc
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
@@ -0,0 +1,254 @@ @@ -0,0 +1,296 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -509,7 +522,8 @@ index 0000000..a9224d9
+ * {@link INetworkManager} and handles all events and inbound messages provided + * {@link INetworkManager} and handles all events and inbound messages provided
+ * by the upstream Netty process. + * by the upstream Netty process.
+ */ + */
+public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter<Packet> implements INetworkManager { +public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter<Packet> implements INetworkManager
+{
+ +
+ private static final ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( "Async Packet Handler - %1$d" ).build() ); + private static final ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( "Async Packet Handler - %1$d" ).build() );
+ private static final MinecraftServer server = MinecraftServer.getServer(); + private static final MinecraftServer server = MinecraftServer.getServer();
@ -517,19 +531,23 @@ index 0000000..a9224d9
+ private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae(); + private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae();
+ /*========================================================================*/ + /*========================================================================*/
+ private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>(); + private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>();
+ private final List<Packet> highPriorityQueue = new AbstractList<Packet>() { + private final List<Packet> highPriorityQueue = new AbstractList<Packet>()
+ {
+ @Override + @Override
+ public void add(int index, Packet element) { + public void add(int index, Packet element)
+ {
+ // NOP + // NOP
+ } + }
+ +
+ @Override + @Override
+ public Packet get(int index) { + public Packet get(int index)
+ {
+ throw new UnsupportedOperationException(); + throw new UnsupportedOperationException();
+ } + }
+ +
+ @Override + @Override
+ public int size() { + public int size()
+ {
+ return 0; + return 0;
+ } + }
+ }; + };
@ -544,12 +562,14 @@ index 0000000..a9224d9
+ private long writtenBytes; + private long writtenBytes;
+ +
+ @Override + @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception
+ {
+ // Channel and address groundwork first + // Channel and address groundwork first
+ channel = ctx.channel(); + channel = ctx.channel();
+ address = channel.remoteAddress(); + address = channel.remoteAddress();
+ // Check the throttle + // Check the throttle
+ if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) { + if ( serverConnection.throttle( ( (InetSocketAddress) channel.remoteAddress() ).getAddress() ) )
+ {
+ channel.close(); + channel.close();
+ } + }
+ // Then the socket adaptor + // Then the socket adaptor
@ -562,45 +582,59 @@ index 0000000..a9224d9
+ } + }
+ +
+ @Override + @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception
+ {
+ a( "disconnect.endOfStream", new Object[ 0 ] ); + a( "disconnect.endOfStream", new Object[ 0 ] );
+ } + }
+ +
+ @Override + @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
+ {
+ // TODO: Remove this once we are more stable + // TODO: Remove this once we are more stable
+ // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log =======================");
+ // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause);
+ // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log =======================");
+ // Disconnect with generic reason + exception + // Disconnect with generic reason + exception
+ a("disconnect.genericReason", new Object[]{"Internal exception: " + cause}); + a( "disconnect.genericReason", new Object[]
+ {
+ "Internal exception: " + cause
+ } );
+ } + }
+ +
+ @Override + @Override
+ public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception { + public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception
+ if (connected) { + {
+ if (msg instanceof Packet252KeyResponse) { + if ( connected )
+ {
+ if ( msg instanceof Packet252KeyResponse )
+ {
+ secret = ( (Packet252KeyResponse) msg ).a( key ); + secret = ( (Packet252KeyResponse) msg ).a( key );
+ Cipher decrypt = NettyServerConnection.getCipher( Cipher.DECRYPT_MODE, secret ); + Cipher decrypt = NettyServerConnection.getCipher( Cipher.DECRYPT_MODE, secret );
+ channel.pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + channel.pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) );
+ } + }
+ +
+ if (msg.a_()) { + if ( msg.a_() )
+ threadPool.submit(new Runnable() { + {
+ public void run() { + threadPool.submit( new Runnable()
+ {
+ public void run()
+ {
+ Packet packet = PacketListener.callReceived( NettyNetworkManager.this, connection, msg ); + Packet packet = PacketListener.callReceived( NettyNetworkManager.this, connection, msg );
+ if (packet != null) { + if ( packet != null )
+ {
+ packet.handle( connection ); + packet.handle( connection );
+ } + }
+ } + }
+ } ); + } );
+ } else { + } else
+ {
+ syncPackets.add( msg ); + syncPackets.add( msg );
+ } + }
+ } + }
+ } + }
+ +
+ public Socket getSocket() { + public Socket getSocket()
+ {
+ return socketAdaptor; + return socketAdaptor;
+ } + }
+ +
@ -609,7 +643,8 @@ index 0000000..a9224d9
+ * + *
+ * @param nh the new {@link NetHandler} instance + * @param nh the new {@link NetHandler} instance
+ */ + */
+ public void a(Connection nh) { + public void a(Connection nh)
+ {
+ connection = nh; + connection = nh;
+ } + }
+ +
@ -619,21 +654,26 @@ index 0000000..a9224d9
+ * + *
+ * @param packet the packet to queue + * @param packet the packet to queue
+ */ + */
+ public void queue(Packet packet) { + public void queue(Packet packet)
+ {
+ // Only send if channel is still connected + // Only send if channel is still connected
+ if (connected) { + if ( connected )
+ {
+ // Process packet via handler + // Process packet via handler
+ packet = PacketListener.callQueued( this, connection, packet ); + packet = PacketListener.callQueued( this, connection, packet );
+ // If handler indicates packet send + // If handler indicates packet send
+ if (packet != null) { + if ( packet != null )
+ {
+ highPriorityQueue.add( packet ); + highPriorityQueue.add( packet );
+ +
+ if (packet instanceof Packet255KickDisconnect) { + if ( packet instanceof Packet255KickDisconnect )
+ {
+ channel.pipeline().get( OutboundManager.class ).flushNow = true; + channel.pipeline().get( OutboundManager.class ).flushNow = true;
+ } + }
+ +
+ channel.write( packet, channel.voidPromise() ); + channel.write( packet, channel.voidPromise() );
+ if (packet instanceof Packet252KeyResponse) { + if ( packet instanceof Packet252KeyResponse )
+ {
+ Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret ); + Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret );
+ channel.pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + channel.pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) );
+ } + }
@ -645,28 +685,34 @@ index 0000000..a9224d9
+ * wakeThreads. In Vanilla this method will interrupt the network read and + * wakeThreads. In Vanilla this method will interrupt the network read and
+ * write threads, thus waking them. + * write threads, thus waking them.
+ */ + */
+ public void a() { + public void a()
+ {
+ } + }
+ +
+ /** + /**
+ * processPackets. Remove up to 1000 packets from the queue and process + * processPackets. Remove up to 1000 packets from the queue and process
+ * them. This method should only be called from the main server thread. + * them. This method should only be called from the main server thread.
+ */ + */
+ public void b() { + public void b()
+ for (int i = 1000; !syncPackets.isEmpty() && i >= 0; i--) { + {
+ if (connection instanceof PendingConnection ? ((PendingConnection) connection).b : ((PlayerConnection) connection).disconnected) { + for ( int i = 1000; !syncPackets.isEmpty() && i >= 0; i-- )
+ {
+ if ( connection instanceof PendingConnection ? ( (PendingConnection) connection ).b : ( (PlayerConnection) connection ).disconnected )
+ {
+ syncPackets.clear(); + syncPackets.clear();
+ break; + break;
+ } + }
+ +
+ Packet packet = PacketListener.callReceived( this, connection, syncPackets.poll() ); + Packet packet = PacketListener.callReceived( this, connection, syncPackets.poll() );
+ if (packet != null) { + if ( packet != null )
+ {
+ packet.handle( connection ); + packet.handle( connection );
+ } + }
+ } + }
+ +
+ // Disconnect via the handler - this performs all plugin related cleanup + logging + // Disconnect via the handler - this performs all plugin related cleanup + logging
+ if (!connected && (dcReason != null || dcArgs != null)) { + if ( !connected && ( dcReason != null || dcArgs != null ) )
+ {
+ connection.a( dcReason, dcArgs ); + connection.a( dcReason, dcArgs );
+ } + }
+ } + }
@ -677,19 +723,23 @@ index 0000000..a9224d9
+ * + *
+ * @return the remote address of this connection + * @return the remote address of this connection
+ */ + */
+ public SocketAddress getSocketAddress() { + public SocketAddress getSocketAddress()
+ {
+ return address; + return address;
+ } + }
+ +
+ public void setSocketAddress(SocketAddress address) { + public void setSocketAddress(SocketAddress address)
+ {
+ this.address = address; + this.address = address;
+ } + }
+ +
+ /** + /**
+ * close. Close and release all resources associated with this connection. + * close. Close and release all resources associated with this connection.
+ */ + */
+ public void d() { + public void d()
+ if (connected) { + {
+ if ( connected )
+ {
+ connected = false; + connected = false;
+ channel.close(); + channel.close();
+ } + }
@ -701,7 +751,8 @@ index 0000000..a9224d9
+ * + *
+ * @return the size of the packet send queue + * @return the size of the packet send queue
+ */ + */
+ public int e() { + public int e()
+ {
+ return 0; + return 0;
+ } + }
+ +
@ -713,28 +764,32 @@ index 0000000..a9224d9
+ * @param arguments additional disconnect arguments, for example, the + * @param arguments additional disconnect arguments, for example, the
+ * exception which triggered the disconnect. + * exception which triggered the disconnect.
+ */ + */
+ public void a(String reason, Object... arguments) { + public void a(String reason, Object... arguments)
+ if (connected) { + {
+ if ( connected )
+ {
+ dcReason = reason; + dcReason = reason;
+ dcArgs = arguments; + dcArgs = arguments;
+ d(); + d();
+ } + }
+ } + }
+ +
+ public long getWrittenBytes() { + public long getWrittenBytes()
+ {
+ return writtenBytes; + return writtenBytes;
+ } + }
+ +
+ public void addWrittenBytes(int written) { + public void addWrittenBytes(int written)
+ {
+ writtenBytes += written; + writtenBytes += written;
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
new file mode 100644 new file mode 100644
index 0000000..7a621aa index 0000000..3822a3f
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
@@ -0,0 +1,81 @@ @@ -0,0 +1,91 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -761,19 +816,25 @@ index 0000000..7a621aa
+ * server and this NIO implementation. It handles starting, stopping and + * server and this NIO implementation. It handles starting, stopping and
+ * processing the Netty backend. + * processing the Netty backend.
+ */ + */
+public class NettyServerConnection extends ServerConnection { +public class NettyServerConnection extends ServerConnection
+{
+ +
+ private final ChannelFuture socket; + private final ChannelFuture socket;
+ +
+ public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port)
+ {
+ super( ms ); + super( ms );
+ int threads = Integer.getInteger( "org.spigotmc.netty.threads", 3 ); + int threads = Integer.getInteger( "org.spigotmc.netty.threads", 3 );
+ socket = new ServerBootstrap().channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { + socket = new ServerBootstrap().channel( NioServerSocketChannel.class ).childHandler( new ChannelInitializer()
+ {
+ @Override + @Override
+ public void initChannel(Channel ch) throws Exception { + public void initChannel(Channel ch) throws Exception
+ try { + {
+ try
+ {
+ ch.config().setOption( ChannelOption.IP_TOS, 0x18 ); + ch.config().setOption( ChannelOption.IP_TOS, 0x18 );
+ } catch (ChannelException ex) { + } catch ( ChannelException ex )
+ {
+ // IP_TOS is not supported (Windows XP / Windows Server 2003) + // IP_TOS is not supported (Windows XP / Windows Server 2003)
+ } + }
+ +
@ -795,7 +856,8 @@ index 0000000..7a621aa
+ * action. + * action.
+ */ + */
+ @Override + @Override
+ public void a() { + public void a()
+ {
+ socket.channel().close().syncUninterruptibly(); + socket.channel().close().syncUninterruptibly();
+ } + }
+ +
@ -806,22 +868,25 @@ index 0000000..7a621aa
+ * @param key to use as the initial vector + * @param key to use as the initial vector
+ * @return the initialized cipher + * @return the initialized cipher
+ */ + */
+ public static Cipher getCipher(int opMode, Key key) { + public static Cipher getCipher(int opMode, Key key)
+ try { + {
+ try
+ {
+ Cipher cip = Cipher.getInstance( "AES/CFB8/NoPadding" ); + Cipher cip = Cipher.getInstance( "AES/CFB8/NoPadding" );
+ cip.init( opMode, key, new IvParameterSpec( key.getEncoded() ) ); + cip.init( opMode, key, new IvParameterSpec( key.getEncoded() ) );
+ return cip; + return cip;
+ } catch (GeneralSecurityException ex) { + } catch ( GeneralSecurityException ex )
+ {
+ throw new RuntimeException( ex ); + throw new RuntimeException( ex );
+ } + }
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
new file mode 100644 new file mode 100644
index 0000000..a3b86b8 index 0000000..5da8a59
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java
@@ -0,0 +1,248 @@ @@ -0,0 +1,294 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import io.netty.channel.Channel; +import io.netty.channel.Channel;
@ -842,231 +907,277 @@ index 0000000..a3b86b8
+ * a {@link Channel} are implemented here. Those which cannot will throw an + * a {@link Channel} are implemented here. Those which cannot will throw an
+ * {@link UnsupportedOperationException}. + * {@link UnsupportedOperationException}.
+ */ + */
+public class NettySocketAdaptor extends Socket { +public class NettySocketAdaptor extends Socket
+{
+ +
+ private final io.netty.channel.socket.SocketChannel ch; + private final io.netty.channel.socket.SocketChannel ch;
+ +
+ private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) { + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch)
+ {
+ this.ch = ch; + this.ch = ch;
+ } + }
+ +
+ public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) { + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch)
+ {
+ return new NettySocketAdaptor( ch ); + return new NettySocketAdaptor( ch );
+ } + }
+ +
+ @Override + @Override
+ public void bind(SocketAddress bindpoint) throws IOException { + public void bind(SocketAddress bindpoint) throws IOException
+ {
+ ch.bind( bindpoint ).syncUninterruptibly(); + ch.bind( bindpoint ).syncUninterruptibly();
+ } + }
+ +
+ @Override + @Override
+ public synchronized void close() throws IOException { + public synchronized void close() throws IOException
+ {
+ ch.close().syncUninterruptibly(); + ch.close().syncUninterruptibly();
+ } + }
+ +
+ @Override + @Override
+ public void connect(SocketAddress endpoint) throws IOException { + public void connect(SocketAddress endpoint) throws IOException
+ {
+ ch.connect( endpoint ).syncUninterruptibly(); + ch.connect( endpoint ).syncUninterruptibly();
+ } + }
+ +
+ @Override + @Override
+ public void connect(SocketAddress endpoint, int timeout) throws IOException { + public void connect(SocketAddress endpoint, int timeout) throws IOException
+ {
+ ch.config().setConnectTimeoutMillis( timeout ); + ch.config().setConnectTimeoutMillis( timeout );
+ ch.connect( endpoint ).syncUninterruptibly(); + ch.connect( endpoint ).syncUninterruptibly();
+ } + }
+ +
+ @Override + @Override
+ public boolean equals(Object obj) { + public boolean equals(Object obj)
+ {
+ return obj instanceof NettySocketAdaptor && ch.equals( ( (NettySocketAdaptor) obj ).ch ); + return obj instanceof NettySocketAdaptor && ch.equals( ( (NettySocketAdaptor) obj ).ch );
+ } + }
+ +
+ @Override + @Override
+ public SocketChannel getChannel() { + public SocketChannel getChannel()
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public InetAddress getInetAddress() { + public InetAddress getInetAddress()
+ {
+ return ch.remoteAddress().getAddress(); + return ch.remoteAddress().getAddress();
+ } + }
+ +
+ @Override + @Override
+ public InputStream getInputStream() throws IOException { + public InputStream getInputStream() throws IOException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public boolean getKeepAlive() throws SocketException { + public boolean getKeepAlive() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.SO_KEEPALIVE ); + return ch.config().getOption( ChannelOption.SO_KEEPALIVE );
+ } + }
+ +
+ @Override + @Override
+ public InetAddress getLocalAddress() { + public InetAddress getLocalAddress()
+ {
+ return ch.localAddress().getAddress(); + return ch.localAddress().getAddress();
+ } + }
+ +
+ @Override + @Override
+ public int getLocalPort() { + public int getLocalPort()
+ {
+ return ch.localAddress().getPort(); + return ch.localAddress().getPort();
+ } + }
+ +
+ @Override + @Override
+ public SocketAddress getLocalSocketAddress() { + public SocketAddress getLocalSocketAddress()
+ {
+ return ch.localAddress(); + return ch.localAddress();
+ } + }
+ +
+ @Override + @Override
+ public boolean getOOBInline() throws SocketException { + public boolean getOOBInline() throws SocketException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public OutputStream getOutputStream() throws IOException { + public OutputStream getOutputStream() throws IOException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public int getPort() { + public int getPort()
+ {
+ return ch.remoteAddress().getPort(); + return ch.remoteAddress().getPort();
+ } + }
+ +
+ @Override + @Override
+ public synchronized int getReceiveBufferSize() throws SocketException { + public synchronized int getReceiveBufferSize() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.SO_RCVBUF ); + return ch.config().getOption( ChannelOption.SO_RCVBUF );
+ } + }
+ +
+ @Override + @Override
+ public SocketAddress getRemoteSocketAddress() { + public SocketAddress getRemoteSocketAddress()
+ {
+ return ch.remoteAddress(); + return ch.remoteAddress();
+ } + }
+ +
+ @Override + @Override
+ public boolean getReuseAddress() throws SocketException { + public boolean getReuseAddress() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.SO_REUSEADDR ); + return ch.config().getOption( ChannelOption.SO_REUSEADDR );
+ } + }
+ +
+ @Override + @Override
+ public synchronized int getSendBufferSize() throws SocketException { + public synchronized int getSendBufferSize() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.SO_SNDBUF ); + return ch.config().getOption( ChannelOption.SO_SNDBUF );
+ } + }
+ +
+ @Override + @Override
+ public int getSoLinger() throws SocketException { + public int getSoLinger() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.SO_LINGER ); + return ch.config().getOption( ChannelOption.SO_LINGER );
+ } + }
+ +
+ @Override + @Override
+ public synchronized int getSoTimeout() throws SocketException { + public synchronized int getSoTimeout() throws SocketException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public boolean getTcpNoDelay() throws SocketException { + public boolean getTcpNoDelay() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.TCP_NODELAY ); + return ch.config().getOption( ChannelOption.TCP_NODELAY );
+ } + }
+ +
+ @Override + @Override
+ public int getTrafficClass() throws SocketException { + public int getTrafficClass() throws SocketException
+ {
+ return ch.config().getOption( ChannelOption.IP_TOS ); + return ch.config().getOption( ChannelOption.IP_TOS );
+ } + }
+ +
+ @Override + @Override
+ public int hashCode() { + public int hashCode()
+ {
+ return ch.hashCode(); + return ch.hashCode();
+ } + }
+ +
+ @Override + @Override
+ public boolean isBound() { + public boolean isBound()
+ {
+ return ch.localAddress() != null; + return ch.localAddress() != null;
+ } + }
+ +
+ @Override + @Override
+ public boolean isClosed() { + public boolean isClosed()
+ {
+ return !ch.isOpen(); + return !ch.isOpen();
+ } + }
+ +
+ @Override + @Override
+ public boolean isConnected() { + public boolean isConnected()
+ {
+ return ch.isActive(); + return ch.isActive();
+ } + }
+ +
+ @Override + @Override
+ public boolean isInputShutdown() { + public boolean isInputShutdown()
+ {
+ return ch.isInputShutdown(); + return ch.isInputShutdown();
+ } + }
+ +
+ @Override + @Override
+ public boolean isOutputShutdown() { + public boolean isOutputShutdown()
+ {
+ return ch.isOutputShutdown(); + return ch.isOutputShutdown();
+ } + }
+ +
+ @Override + @Override
+ public void sendUrgentData(int data) throws IOException { + public void sendUrgentData(int data) throws IOException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public void setKeepAlive(boolean on) throws SocketException { + public void setKeepAlive(boolean on) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.SO_KEEPALIVE, on ); + ch.config().setOption( ChannelOption.SO_KEEPALIVE, on );
+ } + }
+ +
+ @Override + @Override
+ public void setOOBInline(boolean on) throws SocketException { + public void setOOBInline(boolean on) throws SocketException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth)
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public synchronized void setReceiveBufferSize(int size) throws SocketException { + public synchronized void setReceiveBufferSize(int size) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.SO_RCVBUF, size ); + ch.config().setOption( ChannelOption.SO_RCVBUF, size );
+ } + }
+ +
+ @Override + @Override
+ public void setReuseAddress(boolean on) throws SocketException { + public void setReuseAddress(boolean on) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.SO_REUSEADDR, on ); + ch.config().setOption( ChannelOption.SO_REUSEADDR, on );
+ } + }
+ +
+ @Override + @Override
+ public synchronized void setSendBufferSize(int size) throws SocketException { + public synchronized void setSendBufferSize(int size) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.SO_SNDBUF, size ); + ch.config().setOption( ChannelOption.SO_SNDBUF, size );
+ } + }
+ +
+ @Override + @Override
+ public void setSoLinger(boolean on, int linger) throws SocketException { + public void setSoLinger(boolean on, int linger) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.SO_LINGER, linger ); + ch.config().setOption( ChannelOption.SO_LINGER, linger );
+ } + }
+ +
+ @Override + @Override
+ public synchronized void setSoTimeout(int timeout) throws SocketException { + public synchronized void setSoTimeout(int timeout) throws SocketException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public void setTcpNoDelay(boolean on) throws SocketException { + public void setTcpNoDelay(boolean on) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.TCP_NODELAY, on ); + ch.config().setOption( ChannelOption.TCP_NODELAY, on );
+ } + }
+ +
+ @Override + @Override
+ public void setTrafficClass(int tc) throws SocketException { + public void setTrafficClass(int tc) throws SocketException
+ {
+ ch.config().setOption( ChannelOption.IP_TOS, tc ); + ch.config().setOption( ChannelOption.IP_TOS, tc );
+ } + }
+ +
+ @Override + @Override
+ public void shutdownInput() throws IOException { + public void shutdownInput() throws IOException
+ {
+ throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." );
+ } + }
+ +
+ @Override + @Override
+ public void shutdownOutput() throws IOException { + public void shutdownOutput() throws IOException
+ {
+ ch.shutdownOutput().syncUninterruptibly(); + ch.shutdownOutput().syncUninterruptibly();
+ } + }
+ +
+ @Override + @Override
+ public String toString() { + public String toString()
+ {
+ return ch.toString(); + return ch.toString();
+ } + }
+} +}
@ -1243,10 +1354,10 @@ index 0000000..c21be9f
+} +}
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
new file mode 100644 new file mode 100644
index 0000000..8e3b932 index 0000000..965ba12
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketListener.java +++ b/src/main/java/org/spigotmc/netty/PacketListener.java
@@ -0,0 +1,100 @@ @@ -0,0 +1,112 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+import com.google.common.base.Preconditions; +import com.google.common.base.Preconditions;
@ -1266,7 +1377,8 @@ index 0000000..8e3b932
+ * override the methods you wish to use, and call + * override the methods you wish to use, and call
+ * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}. + * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}.
+ */ + */
+public class PacketListener { +public class PacketListener
+{
+ +
+ /** + /**
+ * A mapping of all registered listeners and their owning plugins. + * A mapping of all registered listeners and their owning plugins.
@ -1284,7 +1396,8 @@ index 0000000..8e3b932
+ * @param listener the listener to register + * @param listener the listener to register
+ * @param plugin the plugin owning this listener + * @param plugin the plugin owning this listener
+ */ + */
+ public static synchronized void register(PacketListener listener, Plugin plugin) { + public static synchronized void register(PacketListener listener, Plugin plugin)
+ {
+ Preconditions.checkNotNull( listener, "listener" ); + Preconditions.checkNotNull( listener, "listener" );
+ Preconditions.checkNotNull( plugin, "plugin" ); + Preconditions.checkNotNull( plugin, "plugin" );
+ Preconditions.checkState( !listeners.containsKey( listener ), "listener already registered" ); + Preconditions.checkState( !listeners.containsKey( listener ), "listener already registered" );
@ -1296,22 +1409,30 @@ index 0000000..8e3b932
+ baked[size] = listener; + baked[size] = listener;
+ } + }
+ +
+ static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) { + static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet)
+ for (PacketListener listener : baked) { + {
+ try { + for ( PacketListener listener : baked )
+ {
+ try
+ {
+ packet = listener.packetReceived( networkManager, connection, packet ); + packet = listener.packetReceived( networkManager, connection, packet );
+ } catch (Throwable t) { + } catch ( Throwable t )
+ {
+ Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing receive hook for packet", t ); + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing receive hook for packet", t );
+ } + }
+ } + }
+ return packet; + return packet;
+ } + }
+ +
+ static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) { + static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet)
+ for (PacketListener listener : baked) { + {
+ try { + for ( PacketListener listener : baked )
+ {
+ try
+ {
+ packet = listener.packetQueued( networkManager, connection, packet ); + packet = listener.packetQueued( networkManager, connection, packet );
+ } catch (Throwable t) { + } catch ( Throwable t )
+ {
+ Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing queued hook for packet", t ); + Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing queued hook for packet", t );
+ } + }
+ } + }
@ -1329,7 +1450,8 @@ index 0000000..8e3b932
+ * @param packet the received packet + * @param packet the received packet
+ * @return the packet to be handled, or null to cancel + * @return the packet to be handled, or null to cancel
+ */ + */
+ public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) { + public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet)
+ {
+ return packet; + return packet;
+ } + }
+ +
@ -1343,22 +1465,24 @@ index 0000000..8e3b932
+ * @param packet the queue packet + * @param packet the queue packet
+ * @return the packet to be sent, or null if the packet will not be sent. + * @return the packet to be sent, or null if the packet will not be sent.
+ */ + */
+ public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) { + public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet)
+ {
+ return packet; + return packet;
+ } + }
+} +}
diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java
new file mode 100644 new file mode 100644
index 0000000..5dc3754 index 0000000..d3a9cab
--- /dev/null --- /dev/null
+++ b/src/main/java/org/spigotmc/netty/ReadState.java +++ b/src/main/java/org/spigotmc/netty/ReadState.java
@@ -0,0 +1,16 @@ @@ -0,0 +1,17 @@
+package org.spigotmc.netty; +package org.spigotmc.netty;
+ +
+/** +/**
+ * Stores the state of the packet currently being read. + * Stores the state of the packet currently being read.
+ */ + */
+public enum ReadState { +public enum ReadState
+{
+ +
+ /** + /**
+ * Indicates the byte representing the ID has been read. + * Indicates the byte representing the ID has been read.
@ -1384,5 +1508,5 @@ index 67b4fa9..d26e644 100644
allow-end: true allow-end: true
warn-on-overload: true warn-on-overload: true
-- --
1.8.2.1 1.8.1.2