Attempt to fix memory leaks with the ChannelInjector

Addresses https://github.com/aadnk/ProtocolLib/issues/70
This commit is contained in:
Dan Mulloy 2014-11-15 14:56:57 -05:00
parent 592874fd5e
commit ca2bc3ecc5
1 changed files with 119 additions and 112 deletions

View File

@ -61,16 +61,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
public static final ReportType REPORT_CANNOT_INTERCEPT_CLIENT_PACKET = new ReportType("Unable to intercept a read client packet.");
public static final ReportType REPORT_CANNOT_EXECUTE_IN_CHANNEL_THREAD = new ReportType("Cannot execute code in channel thread.");
public static final ReportType REPORT_CANNOT_FIND_GET_VERSION = new ReportType("Cannot find getVersion() in NetworkMananger");
/**
* Indicates that a packet has bypassed packet listeners.
*/
private static final PacketEvent BYPASSED_PACKET = new PacketEvent(ChannelInjector.class);
// The login packet
private static Class<?> PACKET_LOGIN_CLIENT = null;
private static FieldAccessor LOGIN_GAME_PROFILE = null;
// Saved accessors
private static MethodAccessor DECODE_BUFFER;
private static MethodAccessor ENCODE_BUFFER;
@ -78,17 +78,17 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
// For retrieving the protocol
private static FieldAccessor PROTOCOL_ACCESSOR;
// The factory that created this injector
private InjectionFactory factory;
// The player, or temporary player
private Player player;
private Player updated;
// The player connection
private Object playerConnection;
// The current network manager and channel
private final Object networkManager;
private final Channel originalChannel;
@ -96,32 +96,33 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
// Known network markers
private ConcurrentMap<Object, NetworkMarker> packetMarker = new MapMaker().weakKeys().makeMap();
/**
* Indicate that this packet has been processed by event listeners.
* <p>
* This must never be set outside the channel pipeline's thread.
*/
private PacketEvent currentEvent;
/**
* A packet event that should be processed by the write method.
*/
private PacketEvent finalEvent;
/**
* A flag set by the main thread to indiciate that a packet should not be processed.
*/
private final ThreadLocal<Boolean> scheduleProcessPackets = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return true;
};
};
// Other handlers
private ByteToMessageDecoder vanillaDecoder;
private MessageToByteEncoder<Object> vanillaEncoder;
// Our extra handlers
private MessageToByteEncoder<Object> protocolEncoder;
private ChannelInboundHandler finishHandler;
@ -129,14 +130,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
// The channel listener
private ChannelListener channelListener;
// Processing network markers
private NetworkProcessor processor;
// Closed
private boolean injected;
private boolean closed;
/**
* Construct a new channel injector.
* @param player - the current player, or temporary player.
@ -152,14 +153,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
this.channelListener = Preconditions.checkNotNull(channelListener, "channelListener cannot be NULL");
this.factory = Preconditions.checkNotNull(factory, "factory cannot be NULL");
this.processor = new NetworkProcessor(ProtocolLibrary.getErrorReporter());
// Get the channel field
this.channelField = new VolatileField(
FuzzyReflection.fromObject(networkManager, true).
getFieldByType("channel", Channel.class),
getFieldByType("channel", Channel.class),
networkManager, true);
}
/**
* Get the version of the current protocol.
* @return The version.
@ -168,7 +169,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
public int getProtocolVersion() {
return MinecraftProtocolVersion.getCurrentVersion();
}
@Override
@SuppressWarnings("unchecked")
public boolean inject() {
@ -179,8 +180,8 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
return false;
if (!originalChannel.isActive())
return false;
// Main thread? We should synchronize with the channel thread, otherwise we might see a
// Main thread? We should synchronize with the channel thread, otherwise we might see a
// pipeline with only some of the handlers removed
if (Bukkit.isPrimaryThread()) {
// Just like in the close() method, we'll avoid blocking the main thread
@ -192,43 +193,43 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
});
return false; // We don't know
}
// Don't inject the same channel twice
if (findChannelHandler(originalChannel, ChannelInjector.class) != null) {
return false;
}
// Get the vanilla decoder, so we don't have to replicate the work
vanillaDecoder = (ByteToMessageDecoder) originalChannel.pipeline().get("decoder");
vanillaEncoder = (MessageToByteEncoder<Object>) originalChannel.pipeline().get("encoder");
if (vanillaDecoder == null)
throw new IllegalArgumentException("Unable to find vanilla decoder in " + originalChannel.pipeline() );
if (vanillaEncoder == null)
throw new IllegalArgumentException("Unable to find vanilla encoder in " + originalChannel.pipeline() );
patchEncoder(vanillaEncoder);
if (DECODE_BUFFER == null)
DECODE_BUFFER = Accessors.getMethodAccessor(vanillaDecoder.getClass(),
DECODE_BUFFER = Accessors.getMethodAccessor(vanillaDecoder.getClass(),
"decode", ChannelHandlerContext.class, ByteBuf.class, List.class);
if (ENCODE_BUFFER == null)
ENCODE_BUFFER = Accessors.getMethodAccessor(vanillaEncoder.getClass(),
"encode", ChannelHandlerContext.class, Object.class, ByteBuf.class);
// Intercept sent packets
protocolEncoder = new MessageToByteEncoder<Object>() {
@Override
protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
ChannelInjector.this.encode(ctx, packet, output);
}
@Override
public void write(ChannelHandlerContext ctx, Object packet, ChannelPromise promise) throws Exception {
super.write(ctx, packet, promise);
ChannelInjector.this.finalWrite(ctx, packet, promise);
}
};
// Intercept recieved packets
finishHandler = new ChannelInboundHandlerAdapter() {
@Override
@ -238,27 +239,27 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
ChannelInjector.this.finishRead(ctx, msg);
}
};
// Insert our handlers - note that we effectively replace the vanilla encoder/decoder
// Insert our handlers - note that we effectively replace the vanilla encoder/decoder
originalChannel.pipeline().addBefore("decoder", "protocol_lib_decoder", this);
originalChannel.pipeline().addBefore("protocol_lib_decoder", "protocol_lib_finish", finishHandler);
originalChannel.pipeline().addAfter("encoder", "protocol_lib_encoder", protocolEncoder);
// Intercept all write methods
channelField.setValue(new ChannelProxy(originalChannel, MinecraftReflection.getPacketClass()) {
@Override
protected <T> Callable<T> onMessageScheduled(final Callable<T> callable, FieldAccessor packetAccessor) {
final PacketEvent event = handleScheduled(callable, packetAccessor);
// Handle cancelled events
if (event != null && event.isCancelled())
return null;
return new Callable<T>() {
@Override
public T call() throws Exception {
T result = null;
// This field must only be updated in the pipeline thread
currentEvent = event;
result = callable.call();
@ -267,11 +268,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
};
}
@Override
protected Runnable onMessageScheduled(final Runnable runnable, FieldAccessor packetAccessor) {
final PacketEvent event = handleScheduled(runnable, packetAccessor);
// Handle cancelled events
if (event != null && event.isCancelled())
return null;
@ -285,15 +286,15 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
};
}
protected PacketEvent handleScheduled(Object instance, FieldAccessor accessor) {
// Let the filters handle this packet
Object original = accessor.get(instance);
// See if we've been instructed not to process packets
if (!scheduleProcessPackets.get()) {
NetworkMarker marker = getMarker(original);
if (marker != null) {
PacketEvent result = new PacketEvent(ChannelInjector.class);
result.setNetworkMarker(marker);
@ -306,7 +307,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
if (event != null && !event.isCancelled()) {
Object changed = event.getPacket().getHandle();
// Change packet to be scheduled
if (original != changed)
accessor.set(instance, changed);
@ -314,7 +315,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
return event != null ? event : BYPASSED_PACKET;
}
});
injected = true;
return true;
}
@ -328,7 +329,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
private PacketEvent processSending(Object message) {
return channelListener.onPacketSending(ChannelInjector.this, message, getMarker(message));
}
/**
* This method patches the encoder so that it skips already created packets.
* @param encoder - the encoder to patch.
@ -339,17 +340,17 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
ENCODER_TYPE_MATCHER.set(encoder, TypeParameterMatcher.get(MinecraftReflection.getPacketClass()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (channelListener.isDebug())
cause.printStackTrace();
super.exceptionCaught(ctx, cause);
}
/**
* Encode a packet to a byte buffer, taking over for the standard Minecraft encoder.
* @param ctx - the current context.
* @param ctx - the current context.
* @param packet - the packet to encode to a byte array.
* @param output - the output byte array.
* @throws Exception If anything went wrong.
@ -357,26 +358,26 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
protected void encode(ChannelHandlerContext ctx, Object packet, ByteBuf output) throws Exception {
NetworkMarker marker = null;
PacketEvent event = currentEvent;
try {
// Skip every kind of non-filtered packet
if (!scheduleProcessPackets.get()) {
return;
}
// This packet has not been seen by the main thread
if (event == null) {
Class<?> clazz = packet.getClass();
// Schedule the transmission on the main thread instead
if (channelListener.hasMainThreadListener(clazz)) {
// Delay the packet
scheduleMainThread(packet);
packet = null;
} else {
event = processSending(packet);
// Handle the output
if (event != null) {
packet = !event.isCancelled() ? event.getPacket().getHandle() : null;
@ -385,9 +386,9 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
if (event != null) {
// Retrieve marker without accidentally constructing it
marker = NetworkMarker.getNetworkMarker(event);
marker = NetworkMarker.getNetworkMarker(event);
}
// Process output handler
if (packet != null && event != null && NetworkMarker.hasOutputHandlers(marker)) {
ByteBuf packetBuffer = ctx.alloc().buffer();
@ -395,17 +396,17 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
// Let each handler prepare the actual output
byte[] data = processor.processOutput(event, marker, getBytes(packetBuffer));
// Write the result
output.writeBytes(data);
packet = null;
// Sent listeners?
finalEvent = event;
return;
}
} catch (Exception e) {
channelListener.getReporter().reportDetailed(this,
channelListener.getReporter().reportDetailed(this,
Report.newBuilder(REPORT_CANNOT_INTERCEPT_SERVER_PACKET).callerParam(packet).error(e).build());
} finally {
// Attempt to handle the packet nevertheless
@ -424,12 +425,12 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
*/
protected void finalWrite(ChannelHandlerContext ctx, Object packet, ChannelPromise promise) {
PacketEvent event = finalEvent;
if (event != null) {
// Necessary to prevent infinite loops
finalEvent = null;
currentEvent = null;
processor.invokePostEvent(event, NetworkMarker.getNetworkMarker(event));
}
}
@ -443,30 +444,30 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
});
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuffer, List<Object> packets) throws Exception {
byteBuffer.markReaderIndex();
DECODE_BUFFER.invoke(vanillaDecoder, ctx, byteBuffer, packets);
try {
try {
// Reset queue
finishQueue.clear();
for (ListIterator<Object> it = packets.listIterator(); it.hasNext(); ) {
Object input = it.next();
Class<?> packetClass = input.getClass();
NetworkMarker marker = null;
// Special case!
handleLogin(packetClass, input);
if (channelListener.includeBuffer(packetClass)) {
byteBuffer.resetReaderIndex();
marker = new NettyNetworkMarker(ConnectionSide.CLIENT_SIDE, getBytes(byteBuffer));
}
PacketEvent output = channelListener.onPacketReceiving(this, input, marker);
// Handle packet changes
if (output != null) {
if (output.isCancelled()) {
@ -475,16 +476,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
} else if (output.getPacket().getHandle() != input) {
it.set(output.getPacket().getHandle());
}
finishQueue.addLast(output);
}
}
} catch (Exception e) {
channelListener.getReporter().reportDetailed(this,
channelListener.getReporter().reportDetailed(this,
Report.newBuilder(REPORT_CANNOT_INTERCEPT_CLIENT_PACKET).callerParam(byteBuffer).error(e).build());
}
}
/**
* Invoked after our decoder.
* @param ctx - current context.
@ -493,16 +494,16 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
protected void finishRead(ChannelHandlerContext ctx, Object msg) {
// Assume same order
PacketEvent event = finishQueue.pollFirst();
if (event != null) {
NetworkMarker marker = NetworkMarker.getNetworkMarker(event);
if (marker != null) {
processor.invokePostEvent(event, marker);
}
}
}
/**
* Invoked when we may need to handle the login packet.
* @param packetClass - the packet class.
@ -511,7 +512,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
protected void handleLogin(Class<?> packetClass, Object packet) {
Class<?> loginClass = PACKET_LOGIN_CLIENT;
FieldAccessor loginClient = LOGIN_GAME_PROFILE;
// Initialize packet class and login
if (loginClass == null) {
loginClass = PacketType.Login.Client.START.getPacketClass();
@ -521,26 +522,26 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
loginClient = Accessors.getFieldAccessor(PACKET_LOGIN_CLIENT, GameProfile.class, true);
LOGIN_GAME_PROFILE = loginClient;
}
// See if we are dealing with the login packet
if (loginClass.equals(packetClass)) {
GameProfile profile = (GameProfile) loginClient.get(packet);
// Save the channel injector
factory.cacheInjector(profile.getName(), this);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// See NetworkManager.channelActive(ChannelHandlerContext) for why
if (channelField != null) {
channelField.refreshValue();
}
}
/**
* Retrieve every byte in the given byte buffer.
* @param buffer - the buffer.
@ -548,11 +549,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
*/
private byte[] getBytes(ByteBuf buffer){
byte[] data = new byte[buffer.readableBytes()];
buffer.readBytes(data);
return data;
}
/**
* Disconnect the current player.
* @param message - the disconnect message, if possible.
@ -575,7 +576,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
@Override
public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
saveMarker(packet, marker);
try {
scheduleProcessPackets.set(filtered);
invokeSendPacket(packet);
@ -583,7 +584,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
scheduleProcessPackets.set(true);
}
}
/**
* Invoke the sendPacket method in Minecraft.
* @param packet - the packet to send.
@ -600,11 +601,11 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
throw new RuntimeException("Unable to send server packet " + packet, e);
}
}
@Override
public void recieveClientPacket(final Object packet) {
// TODO: Ensure the packet listeners are executed in the channel thread.
// Execute this in the channel thread
Runnable action = new Runnable() {
@Override
@ -617,7 +618,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
}
};
// Execute in the worker thread
if (originalChannel.eventLoop().inEventLoop()) {
action.run();
@ -625,7 +626,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
originalChannel.eventLoop().execute(action);
}
}
@Override
public Protocol getCurrentProtocol() {
if (PROTOCOL_ACCESSOR == null) {
@ -634,7 +635,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
return Protocol.fromVanilla((Enum<?>) PROTOCOL_ACCESSOR.get(networkManager));
}
/**
* Retrieve the player connection of the current player.
* @return The player connection.
@ -645,45 +646,47 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
return playerConnection;
}
@Override
public NetworkMarker getMarker(Object packet) {
return packetMarker.get(packet);
}
@Override
public void saveMarker(Object packet, NetworkMarker marker) {
if (marker != null) {
packetMarker.put(packet, marker);
}
}
@Override
public Player getPlayer() {
return player;
}
/**
* Set the player instance.
* @param player - current instance.
*/
@Override
public void setPlayer(Player player) {
this.player = player;
}
/**
* Set the updated player instance.
* @param updated - updated instance.
*/
@Override
public void setUpdatedPlayer(Player updated) {
this.updated = updated;
}
@Override
public boolean isInjected() {
return injected;
}
/**
* Determine if this channel has been closed and cleaned up.
* @return TRUE if it has, FALSE otherwise.
@ -692,29 +695,29 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
public boolean isClosed() {
return closed;
}
@Override
public void close() {
if (!closed) {
closed = true;
if (injected) {
channelField.revertValue();
// Calling remove() in the main thread will block the main thread, which may lead
// Calling remove() in the main thread will block the main thread, which may lead
// to a deadlock:
// http://pastebin.com/L3SBVKzp
//
//
// ProtocolLib executes this close() method through a PlayerQuitEvent in the main thread,
// which has implicitly aquired a lock on SimplePluginManager (see SimplePluginManager.callEvent(Event)).
// Unfortunately, the remove() method will schedule the removal on one of the Netty worker threads if
// which has implicitly aquired a lock on SimplePluginManager (see SimplePluginManager.callEvent(Event)).
// Unfortunately, the remove() method will schedule the removal on one of the Netty worker threads if
// it's called from a different thread, blocking until the removal has been confirmed.
//
// This is bad enough (Rule #1: Don't block the main thread), but the real trouble starts if the same
// worker thread happens to be handling a server ping connection when this removal task is scheduled.
// In that case, it may attempt to invoke an asynchronous ServerPingEvent (see PacketStatusListener)
// using SimplePluginManager.callEvent(). But, since this has already been locked by the main thread,
// we end up with a deadlock. The main thread is waiting for the worker thread to process the task, and
//
// This is bad enough (Rule #1: Don't block the main thread), but the real trouble starts if the same
// worker thread happens to be handling a server ping connection when this removal task is scheduled.
// In that case, it may attempt to invoke an asynchronous ServerPingEvent (see PacketStatusListener)
// using SimplePluginManager.callEvent(). But, since this has already been locked by the main thread,
// we end up with a deadlock. The main thread is waiting for the worker thread to process the task, and
// the worker thread is waiting for the main thread to finish executing PlayerQuitEvent.
//
// TLDR: Concurrenty is hard.
@ -730,10 +733,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
}
});
// Clear cache
factory.invalidate(player);
}
// dmulloy2 - attempt to fix memory leakage
this.player = null;
this.updated = null;
}
}
@ -750,13 +757,13 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
try {
command.run();
} catch (Exception e) {
ProtocolLibrary.getErrorReporter().reportDetailed(ChannelInjector.this,
ProtocolLibrary.getErrorReporter().reportDetailed(ChannelInjector.this,
Report.newBuilder(REPORT_CANNOT_EXECUTE_IN_CHANNEL_THREAD).error(e).build());
}
}
});
}
/**
* Find the first channel handler that is assignable to a given type.
* @param channel - the channel.
@ -771,14 +778,14 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
}
return null;
}
/**
* Represents a socket injector that foreards to the current channel injector.
* @author Kristian
*/
static class ChannelSocketInjector implements SocketInjector {
private final ChannelInjector injector;
public ChannelSocketInjector(ChannelInjector injector) {
this.injector = Preconditions.checkNotNull(injector, "injector cannot be NULL");
}
@ -822,7 +829,7 @@ class ChannelInjector extends ByteToMessageDecoder implements Injector {
public void setUpdatedPlayer(Player updatedPlayer) {
injector.player = updatedPlayer;
}
public ChannelInjector getChannelInjector() {
return injector;
}