From ab5fb40f8fa324036f74245c8a0baca421f091de Mon Sep 17 00:00:00 2001 From: Irmo van den Berge Date: Sun, 1 Mar 2020 13:07:14 +0100 Subject: [PATCH] Replace ThreadLocal scheduleProcessPackets with queue, fixes #763 --- .../injector/netty/ChannelInjector.java | 17 ++-- .../injector/netty/PacketFilterQueue.java | 83 +++++++++++++++++++ 2 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/comphenix/protocol/injector/netty/PacketFilterQueue.java diff --git a/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java b/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java index 77c9033c..cb5e6235 100644 --- a/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java +++ b/src/main/java/com/comphenix/protocol/injector/netty/ChannelInjector.java @@ -126,9 +126,9 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector { private PacketEvent finalEvent; /** - * A flag set by the main thread to indiciate that a packet should not be processed. + * A queue of packets that were sent with filtered=false */ - private final ThreadLocal scheduleProcessPackets = ThreadLocal.withInitial(() -> true); + private final PacketFilterQueue unfilteredProcessedPackets = new PacketFilterQueue(); // Other handlers private ByteToMessageDecoder vanillaDecoder; @@ -328,7 +328,7 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector { Object original = accessor.get(instance); // See if we've been instructed not to process packets - if (!scheduleProcessPackets.get()) { + if (unfilteredProcessedPackets.contains(original)) { NetworkMarker marker = getMarker(original); if (marker != null) { @@ -416,7 +416,7 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector { try { // Skip every kind of non-filtered packet - if (!scheduleProcessPackets.get()) { + if (unfilteredProcessedPackets.remove(packet)) { return; } @@ -663,12 +663,11 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector { public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) { saveMarker(packet, marker); - try { - scheduleProcessPackets.set(filtered); - invokeSendPacket(packet); - } finally { - scheduleProcessPackets.set(true); + if (!filtered) { + unfilteredProcessedPackets.add(packet); } + + invokeSendPacket(packet); } /** diff --git a/src/main/java/com/comphenix/protocol/injector/netty/PacketFilterQueue.java b/src/main/java/com/comphenix/protocol/injector/netty/PacketFilterQueue.java new file mode 100644 index 00000000..6f56f9b6 --- /dev/null +++ b/src/main/java/com/comphenix/protocol/injector/netty/PacketFilterQueue.java @@ -0,0 +1,83 @@ +/** + * ProtocolLib - Bukkit server library that allows access to the Minecraft protocol. + * Copyright (C) 2015 dmulloy2 + * + * This program is free software; you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation; either version 2 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with this program; + * if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA + * 02111-1307 USA + */ +package com.comphenix.protocol.injector.netty; + +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * Stores packets that need to be sent without being handled by the listeners (filtered=false). + * When other packets sent after sending the packet are removed, the packet is removed as well + * to prevent a memory leak, assuming a consistent send order is in place. + * + * @author bergerkiller + */ +public class PacketFilterQueue { + private Queue queue = new ArrayDeque<>(); + + /** + * Adds a packet to this queue, indicating further on that it should not be filtered. + * + * @param packet + */ + public synchronized void add(Object packet) { + queue.add(packet); + } + + /** + * Checks whether a packet is contained inside this queue, indicating + * it should not be filtered. + * + * @param packet + * @return True if contained and packet should not be filtered (filtered=false) + */ + public synchronized boolean contains(Object packet) { + return queue.contains(packet); + } + + /** + * Checks whether a packet is contained inside this queue and removes it if so. + * Other packets marked in this queue that were sent before this packet are + * removed from the queue also, avoiding memory leaks because of dropped packets. + * + * @param packet + * @return True if contained and packet should not be filtered (filtered=false) + */ + public synchronized boolean remove(Object packet) { + if (queue.isEmpty()) { + // Nothing in the queue + return false; + } else if (queue.peek() == packet) { + // First in the queue (expected) + queue.poll(); + return true; + } else if (!queue.contains(packet)) { + // There are unfiltered packets, but this one is not + return false; + } else { + // We have skipped over some packets (unexpected) + // Poll packets until we find it + while (queue.poll() != packet) { + if (queue.isEmpty()) { + // This should never happen! But to avoid infinite loop. + return false; + } + } + return true; + } + } +}