Replace ThreadLocal scheduleProcessPackets with queue, fixes #763

This commit is contained in:
Irmo van den Berge 2020-03-01 13:07:14 +01:00
parent 4baa4aa724
commit ab5fb40f8f
2 changed files with 91 additions and 9 deletions

View File

@ -126,9 +126,9 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector {
private PacketEvent finalEvent;
/**
* A flag set by the main thread to indiciate that a packet should not be processed.
* A queue of packets that were sent with filtered=false
*/
private final ThreadLocal<Boolean> scheduleProcessPackets = ThreadLocal.withInitial(() -> true);
private final PacketFilterQueue unfilteredProcessedPackets = new PacketFilterQueue();
// Other handlers
private ByteToMessageDecoder vanillaDecoder;
@ -328,7 +328,7 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector {
Object original = accessor.get(instance);
// See if we've been instructed not to process packets
if (!scheduleProcessPackets.get()) {
if (unfilteredProcessedPackets.contains(original)) {
NetworkMarker marker = getMarker(original);
if (marker != null) {
@ -416,7 +416,7 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector {
try {
// Skip every kind of non-filtered packet
if (!scheduleProcessPackets.get()) {
if (unfilteredProcessedPackets.remove(packet)) {
return;
}
@ -663,12 +663,11 @@ public class ChannelInjector extends ByteToMessageDecoder implements Injector {
public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
saveMarker(packet, marker);
try {
scheduleProcessPackets.set(filtered);
invokeSendPacket(packet);
} finally {
scheduleProcessPackets.set(true);
if (!filtered) {
unfilteredProcessedPackets.add(packet);
}
invokeSendPacket(packet);
}
/**

View File

@ -0,0 +1,83 @@
/**
* ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
* Copyright (C) 2015 dmulloy2
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
* 02111-1307 USA
*/
package com.comphenix.protocol.injector.netty;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* Stores packets that need to be sent without being handled by the listeners (filtered=false).
* When other packets sent after sending the packet are removed, the packet is removed as well
* to prevent a memory leak, assuming a consistent send order is in place.
*
* @author bergerkiller
*/
public class PacketFilterQueue {
private Queue<Object> queue = new ArrayDeque<>();
/**
* Adds a packet to this queue, indicating further on that it should not be filtered.
*
* @param packet
*/
public synchronized void add(Object packet) {
queue.add(packet);
}
/**
* Checks whether a packet is contained inside this queue, indicating
* it should not be filtered.
*
* @param packet
* @return True if contained and packet should not be filtered (filtered=false)
*/
public synchronized boolean contains(Object packet) {
return queue.contains(packet);
}
/**
* Checks whether a packet is contained inside this queue and removes it if so.
* Other packets marked in this queue that were sent before this packet are
* removed from the queue also, avoiding memory leaks because of dropped packets.
*
* @param packet
* @return True if contained and packet should not be filtered (filtered=false)
*/
public synchronized boolean remove(Object packet) {
if (queue.isEmpty()) {
// Nothing in the queue
return false;
} else if (queue.peek() == packet) {
// First in the queue (expected)
queue.poll();
return true;
} else if (!queue.contains(packet)) {
// There are unfiltered packets, but this one is not
return false;
} else {
// We have skipped over some packets (unexpected)
// Poll packets until we find it
while (queue.poll() != packet) {
if (queue.isEmpty()) {
// This should never happen! But to avoid infinite loop.
return false;
}
}
return true;
}
}
}