Change packet handling from ByteBufs to byte arrays to work around netty bug. Connection now appears to be stable - just need to add an optimized encryption algorithm back.

This commit is contained in:
md_5 2013-03-14 17:24:32 +11:00
parent b0820208e6
commit 30b381853c
23 changed files with 232 additions and 123 deletions

View File

@ -1,7 +1,5 @@
package net.md_5.bungee;
import io.netty.buffer.ByteBuf;
/**
* Class to rewrite integers within packets.
*/
@ -115,20 +113,20 @@ public class EntityMap
};
}
public static void rewrite(ByteBuf packet, int oldId, int newId)
public static void rewrite(byte[] packet, int oldId, int newId)
{
int packetId = packet.getUnsignedByte( 0 );
int packetId = packet[0] & 0xFF;
if ( packetId == 0x1D )
{ // bulk entity
for ( int pos = 2; pos < packet.writerIndex(); pos += 4 )
for ( int pos = 2; pos < packet.length; pos += 4 )
{
int readId = packet.getInt( pos );
int readId = readInt( packet, pos );
if ( readId == oldId )
{
packet.setInt( pos, newId );
setInt( packet, pos, newId );
} else if ( readId == newId )
{
packet.setInt( pos, oldId );
setInt( packet, pos, oldId );
}
}
} else
@ -138,27 +136,29 @@ public class EntityMap
{
for ( int pos : idArray )
{
int readId = packet.getInt( pos );
int readId = readInt( packet, pos );
if ( readId == oldId )
{
packet.setInt( pos, newId );
setInt( packet, pos, newId );
} else if ( readId == newId )
{
packet.setInt( pos, oldId );
setInt( packet, pos, oldId );
}
}
}
}
if ( packetId == 0x17 )
}
private static void setInt(byte[] buf, int pos, int i)
{
int type = packet.getByte( 5 );
if ( type >= 60 && type <= 62 )
buf[pos] = (byte) ( i >> 24 );
buf[pos + 1] = (byte) ( i >> 16 );
buf[pos + 2] = (byte) ( i >> 8 );
buf[pos + 3] = (byte) i;
}
private static int readInt(byte[] buf, int pos)
{
if ( packet.getInt( 20 ) == oldId )
{
packet.setInt( 20, newId );
}
}
}
return ( ( ( buf[pos] & 0xFF ) << 24 ) | ( ( buf[pos + 1] & 0xFF ) << 16 ) | ( ( buf[pos + 2] & 0xFF ) << 8 ) | buf[pos + 3] & 0xFF );
}
}

View File

@ -51,9 +51,9 @@ public class DownstreamBridge extends PacketHandler
}
@Override
public void handle(Wrapper buf) throws Exception
public void handle(byte[] buf) throws Exception
{
EntityMap.rewrite( buf.buf, con.serverEntityId, con.clientEntityId );
EntityMap.rewrite( buf, con.serverEntityId, con.clientEntityId );
con.ch.write( buf );
}

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.connection;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import net.md_5.bungee.EntityMap;
@ -10,7 +9,6 @@ import net.md_5.bungee.api.ProxyServer;
import net.md_5.bungee.api.event.ChatEvent;
import net.md_5.bungee.api.event.PlayerDisconnectEvent;
import net.md_5.bungee.api.event.PluginMessageEvent;
import net.md_5.bungee.netty.Wrapper;
import net.md_5.bungee.packet.Packet0KeepAlive;
import net.md_5.bungee.packet.Packet3Chat;
import net.md_5.bungee.packet.PacketFAPluginMessage;
@ -45,9 +43,9 @@ public class UpstreamBridge extends PacketHandler
}
@Override
public void handle(Wrapper buf) throws Exception
public void handle(byte[] buf) throws Exception
{
EntityMap.rewrite( buf.buf, con.clientEntityId, con.serverEntityId );
EntityMap.rewrite( buf, con.clientEntityId, con.serverEntityId );
if ( con.getServer() != null )
{
con.getServer().getCh().write( buf );

View File

@ -0,0 +1,17 @@
package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
@ChannelHandler.Sharable
public class ByteArrayEncoder extends MessageToByteEncoder<byte[]>
{
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception
{
out.writeBytes( msg );
}
}

View File

@ -0,0 +1,18 @@
package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import net.md_5.bungee.packet.DefinedPacket;
@ChannelHandler.Sharable
public class DefinedPacketEncoder extends MessageToByteEncoder<DefinedPacket>
{
@Override
protected void encode(ChannelHandlerContext ctx, DefinedPacket msg, ByteBuf out) throws Exception
{
out.writeBytes( msg.getPacket() );
}
}

View File

@ -1,17 +0,0 @@
package net.md_5.bungee.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class HackEncoder extends MessageToByteEncoder<Wrapper>
{
@Override
protected void encode(ChannelHandlerContext ctx, Wrapper msg, ByteBuf out) throws Exception
{
out.capacity( msg.buf.readableBytes() );
out.writeBytes( msg.buf );
msg.buf.release();
}
}

View File

@ -1,7 +1,6 @@
package net.md_5.bungee.netty;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
@ -16,7 +15,7 @@ import net.md_5.bungee.packet.PacketHandler;
* channels to maintain simple states, and only call the required, adapted
* methods when the channel is connected.
*/
public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<Wrapper>
public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<byte[]>
{
private PacketHandler handler;
@ -48,11 +47,11 @@ public class HandlerBoss extends ChannelInboundMessageHandlerAdapter<Wrapper>
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Wrapper msg) throws Exception
public void messageReceived(ChannelHandlerContext ctx, byte[] msg) throws Exception
{
if ( handler != null && ctx.channel().isActive() )
{
DefinedPacket packet = DefinedPacket.packet( msg.buf );
DefinedPacket packet = DefinedPacket.packet( msg);
boolean sendPacket = true;
if ( packet != null )
{

View File

@ -25,11 +25,13 @@ public class PacketDecoder extends ReplayingDecoder<Void>
private int protocol;
@Override
protected Wrapper decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
protected byte[] decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
{
int startIndex = in.readerIndex();
PacketReader.readPacket( in, protocol );
ByteBuf ret = in.copy( startIndex, in.readerIndex() - startIndex );
return new Wrapper( ret );
byte[] buf = new byte[ in.readerIndex() - startIndex ];
in.readerIndex( startIndex );
in.readBytes( buf, 0, buf.length );
return buf;
}
}

View File

@ -1,7 +1,6 @@
package net.md_5.bungee.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -42,6 +41,8 @@ public class PipelineUtils
}
};
public static final Base BASE = new Base();
private static final DefinedPacketEncoder packetEncoder = new DefinedPacketEncoder();
private static final ByteArrayEncoder arrayEncoder = new ByteArrayEncoder();
public final static class Base extends ChannelInitializer<Channel>
{
@ -58,7 +59,8 @@ public class PipelineUtils
}
ch.pipeline().addLast( "timer", new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
ch.pipeline().addLast( "decoder", new PacketDecoder( PacketDefinitions.VANILLA_PROTOCOL ) );
ch.pipeline().addLast( "encoder", new HackEncoder() );
ch.pipeline().addLast( "packet-encoder", packetEncoder );
ch.pipeline().addLast( "array-encoder", arrayEncoder );
ch.pipeline().addLast( "handler", new HandlerBoss() );
}
};

View File

@ -1,8 +1,11 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ReferenceCounted;
import io.netty.buffer.Unpooled;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import lombok.Delegate;
@ -13,40 +16,58 @@ import net.md_5.bungee.Util;
* subclasses can read and write to the backing byte array which can be
* retrieved via the {@link #getPacket()} method.
*/
public abstract class DefinedPacket implements ByteBuf
public abstract class DefinedPacket implements DataOutput
{
@Delegate(types =
private static interface Overriden
{
ByteBuf.class, ReferenceCounted.class
})
private ByteBuf buf;
public DefinedPacket(int id, ByteBuf buf)
void readUTF();
void writeUTF(String s);
}
private ByteArrayInputStream byteStream;
private DataInputStream in;
@Delegate(excludes = Overriden.class)
private ByteArrayDataOutput out;
private byte[] buf;
public DefinedPacket(int id, byte[] buf)
{
this.buf = buf;
byteStream = new ByteArrayInputStream( buf );
in = new DataInputStream( byteStream );
if ( readUnsignedByte() != id )
{
throw new IllegalArgumentException( "Wasn't expecting packet id " + Util.hex( id ) );
}
this.buf = buf;
}
public DefinedPacket(int id)
{
buf = Unpooled.buffer();
out = ByteStreams.newDataOutput();
writeByte( id );
}
public void writeString(String s)
/**
* Gets the bytes that make up this packet.
*
* @return the bytes which make up this packet, either the original byte
* array or the newly written one.
*/
public byte[] getPacket()
{
writeShort( s.length() );
for ( char c : s.toCharArray() )
{
writeChar( c );
}
return buf == null ? buf = out.toByteArray() : buf;
}
public String readString()
@Override
public void writeUTF(String s)
{
writeShort( s.length() );
writeChars( s );
}
public String readUTF()
{
short len = readShort();
char[] chars = new char[ len ];
@ -60,17 +81,99 @@ public abstract class DefinedPacket implements ByteBuf
public void writeArray(byte[] b)
{
writeShort( b.length );
writeBytes( b );
write( b );
}
public byte[] readArray()
{
short len = readShort();
byte[] ret = new byte[ len ];
readBytes( ret );
readFully( ret );
return ret;
}
public final int available()
{
return byteStream.available();
}
public final void readFully(byte b[])
{
try
{
in.readFully( b );
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
public final boolean readBoolean()
{
try
{
return in.readBoolean();
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
public final byte readByte()
{
try
{
return in.readByte();
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
public final int readUnsignedByte()
{
try
{
return in.readUnsignedByte();
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
public final short readShort()
{
try
{
return in.readShort();
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
public final char readChar()
{
try
{
return in.readChar();
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
public final int readInt()
{
try
{
return in.readInt();
} catch ( IOException e )
{
throw new IllegalStateException( e );
}
}
@Override
public abstract boolean equals(Object obj);
@ -86,9 +189,9 @@ public abstract class DefinedPacket implements ByteBuf
@SuppressWarnings("unchecked")
private static Constructor<? extends DefinedPacket>[] consructors = new Constructor[ 256 ];
public static DefinedPacket packet(ByteBuf buf)
public static DefinedPacket packet(byte[] buf)
{
short id = buf.getUnsignedByte( 0 );
int id = buf[0] & 0xFF;
Class<? extends DefinedPacket> clazz = classes[id];
DefinedPacket ret = null;
if ( clazz != null )
@ -98,15 +201,13 @@ public abstract class DefinedPacket implements ByteBuf
Constructor<? extends DefinedPacket> constructor = consructors[id];
if ( constructor == null )
{
constructor = clazz.getDeclaredConstructor( ByteBuf.class );
constructor = clazz.getDeclaredConstructor( byte[].class );
consructors[id] = constructor;
}
if ( constructor != null )
{
buf.markReaderIndex();
ret = constructor.newInstance( buf );
buf.resetReaderIndex();
}
} catch ( IllegalAccessException | InstantiationException | InvocationTargetException | NoSuchMethodException ex )
{

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -11,7 +10,7 @@ public class Packet0KeepAlive extends DefinedPacket
public int id;
Packet0KeepAlive(ByteBuf buf)
Packet0KeepAlive(byte[] buf)
{
super( 0x00, buf );
id = readInt();

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -21,7 +20,7 @@ public class Packet1Login extends DefinedPacket
{
super( 0x01 );
writeInt( entityId );
writeString( levelType );
writeUTF( levelType );
writeByte( gameMode );
writeByte( dimension );
writeByte( difficulty );
@ -36,16 +35,16 @@ public class Packet1Login extends DefinedPacket
this.maxPlayers = maxPlayers;
}
Packet1Login(ByteBuf buf)
Packet1Login(byte[] buf)
{
super( 0x01, buf );
this.entityId = readInt();
this.levelType = readString();
this.levelType = readUTF();
this.gameMode = readByte();
if ( readableBytes() == 4 )
if ( available() == 4 )
{
this.dimension = readByte();
} else if ( readableBytes() == 7 )
} else if ( available() == 7 )
{
this.dimension = readInt();
} else

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -18,8 +17,8 @@ public class Packet2Handshake extends DefinedPacket
{
super( 0x02 );
writeByte( protocolVersion );
writeString( username );
writeString( host );
writeUTF( username );
writeUTF( host );
writeInt( port );
this.procolVersion = protocolVersion;
this.username = username;
@ -27,12 +26,12 @@ public class Packet2Handshake extends DefinedPacket
this.port = port;
}
Packet2Handshake(ByteBuf buf)
Packet2Handshake(byte[] buf)
{
super( 0x02, buf );
this.procolVersion = readByte();
this.username = readString();
this.host = readString();
this.username = readUTF();
this.host = readUTF();
this.port = readInt();
}

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -14,14 +13,14 @@ public class Packet3Chat extends DefinedPacket
public Packet3Chat(String message)
{
super( 0x03 );
writeString( message );
writeUTF( message );
this.message = message;
}
Packet3Chat(ByteBuf buf)
Packet3Chat(byte[] buf)
{
super( 0x03, buf );
this.message = readString();
this.message = readUTF();
}
@Override

View File

@ -24,7 +24,7 @@ public class Packet9Respawn extends DefinedPacket
writeByte( difficulty );
writeByte( gameMode );
writeShort( worldHeight );
writeString( levelType );
writeUTF( levelType );
this.dimension = dimension;
this.difficulty = difficulty;
this.gameMode = gameMode;
@ -32,14 +32,14 @@ public class Packet9Respawn extends DefinedPacket
this.levelType = levelType;
}
Packet9Respawn(ByteBuf buf)
Packet9Respawn(byte[] buf)
{
super( 0x09, buf );
this.dimension = readInt();
this.difficulty = readByte();
this.gameMode = readByte();
this.worldHeight = readShort();
this.levelType = readString();
this.levelType = readUTF();
}
@Override

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -13,10 +12,10 @@ public class PacketC9PlayerListItem extends DefinedPacket
public boolean online;
public int ping;
PacketC9PlayerListItem(ByteBuf buf)
PacketC9PlayerListItem(byte[] buf)
{
super( 0xC9, buf );
username = readString();
username = readUTF();
online = readBoolean();
ping = readShort();
}
@ -24,7 +23,7 @@ public class PacketC9PlayerListItem extends DefinedPacket
public PacketC9PlayerListItem(String username, boolean online, int ping)
{
super( 0xC9 );
writeString( username );
writeUTF( username );
writeBoolean( online );
writeShort( ping );
}

View File

@ -26,7 +26,7 @@ public class PacketCDClientStatus extends DefinedPacket
writeByte( payload );
}
PacketCDClientStatus(ByteBuf buf)
PacketCDClientStatus(byte[] buf)
{
super( 0xCD, buf );
}

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -15,16 +14,16 @@ public class PacketFAPluginMessage extends DefinedPacket
public PacketFAPluginMessage(String tag, byte[] data)
{
super( 0xFA );
writeString( tag );
writeUTF( tag );
writeArray( data );
this.tag = tag;
this.data = data;
}
PacketFAPluginMessage(ByteBuf buf)
PacketFAPluginMessage(byte[] buf)
{
super( 0xFA, buf );
this.tag = readString();
this.tag = readUTF();
this.data = readArray();
}

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -28,7 +27,7 @@ public class PacketFCEncryptionResponse extends DefinedPacket
this.verifyToken = verifyToken;
}
PacketFCEncryptionResponse(ByteBuf buf)
PacketFCEncryptionResponse(byte[] buf)
{
super( 0xFC, buf );
this.sharedSecret = readArray();

View File

@ -16,7 +16,7 @@ public class PacketFDEncryptionRequest extends DefinedPacket
public PacketFDEncryptionRequest(String serverId, byte[] publicKey, byte[] verifyToken)
{
super( 0xFD );
writeString( serverId );
writeUTF( serverId );
writeArray( publicKey );
writeArray( verifyToken );
this.serverId = serverId;
@ -24,10 +24,10 @@ public class PacketFDEncryptionRequest extends DefinedPacket
this.verifyToken = verifyToken;
}
PacketFDEncryptionRequest(ByteBuf buf)
PacketFDEncryptionRequest(byte[] buf)
{
super( 0xFD, buf );
serverId = readString();
serverId = readUTF();
publicKey = readArray();
verifyToken = readArray();
}

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -11,7 +10,7 @@ public class PacketFEPing extends DefinedPacket
public byte version;
PacketFEPing(ByteBuf buffer)
PacketFEPing(byte[] buffer)
{
super( 0xFE, buffer );
version = readByte();

View File

@ -1,6 +1,5 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@ -14,13 +13,13 @@ public class PacketFFKick extends DefinedPacket
public PacketFFKick(String message)
{
super( 0xFF );
writeString( message );
writeUTF( message );
}
PacketFFKick(ByteBuf buf)
PacketFFKick(byte[] buf)
{
super( 0xFF, buf );
this.message = readString();
this.message = readUTF();
}
@Override

View File

@ -1,8 +1,6 @@
package net.md_5.bungee.packet;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import net.md_5.bungee.netty.Wrapper;
public abstract class PacketHandler
{
@ -22,7 +20,7 @@ public abstract class PacketHandler
{
}
public void handle(Wrapper buf) throws Exception
public void handle(byte[] buf) throws Exception
{
}