Added a timeout listener.

This commit is contained in:
Kristian S. Stangeland 2012-11-04 01:10:05 +01:00
parent 6c8bda24fd
commit 2f2eb148fa
6 changed files with 124 additions and 13 deletions

View File

@ -99,4 +99,22 @@ public interface AsynchronousManager {
* @param packet - packet to signal.
*/
public abstract void signalPacketTransmission(PacketEvent packet);
/**
* Register a synchronous listener that handles packets when they time out.
* @param listener - synchronous listener that will handle timed out packets.
*/
public abstract void registerTimeoutHandler(PacketListener listener);
/**
* Unregisters a given timeout listener.
* @param listener - the timeout listener to unregister.
*/
public abstract void unregisterTimeoutHandler(PacketListener listener);
/**
* Get a immutable list of every registered timeout handler.
* @return List of every registered timeout handler.
*/
public abstract Set<PacketListener> getTimeoutHandlers();
}

View File

@ -20,6 +20,7 @@ package com.comphenix.protocol.async;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.bukkit.plugin.Plugin;
@ -34,7 +35,10 @@ import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
import com.comphenix.protocol.injector.PacketFilterManager;
import com.comphenix.protocol.injector.PrioritizedListener;
import com.comphenix.protocol.injector.SortedPacketListenerList;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
* Represents a filter manager for asynchronous packets.
@ -43,9 +47,14 @@ import com.google.common.base.Objects;
*/
public class AsyncFilterManager implements AsynchronousManager {
private SortedPacketListenerList serverTimeoutListeners;
private SortedPacketListenerList clientTimeoutListeners;
private Set<PacketListener> timeoutListeners;
private PacketProcessingQueue serverProcessingQueue;
private PacketSendingQueue serverQueue;
private PacketProcessingQueue clientProcessingQueue;
private PacketSendingQueue clientQueue;
@ -68,11 +77,30 @@ public class AsyncFilterManager implements AsynchronousManager {
public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) {
// Initialize timeout listeners
serverTimeoutListeners = new SortedPacketListenerList();
clientTimeoutListeners = new SortedPacketListenerList();
timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap<PacketListener, Boolean>());
// Server packets are synchronized already
this.serverQueue = new PacketSendingQueue(false);
this.serverQueue = new PacketSendingQueue(false) {
@Override
protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) {
serverTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event);
}
}
};
// Client packets must be synchronized
this.clientQueue = new PacketSendingQueue(true);
this.clientQueue = new PacketSendingQueue(true) {
@Override
protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) {
clientTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event);
}
}
};
this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
@ -89,6 +117,27 @@ public class AsyncFilterManager implements AsynchronousManager {
return registerAsyncHandler(listener, true);
}
@Override
public void registerTimeoutHandler(PacketListener listener) {
if (listener == null)
throw new IllegalArgumentException("listener cannot be NULL.");
if (!timeoutListeners.add(listener))
return;
ListeningWhitelist sending = listener.getSendingWhitelist();
ListeningWhitelist receiving = listener.getReceivingWhitelist();
if (!ListeningWhitelist.isEmpty(sending))
serverTimeoutListeners.addListener(listener, sending);
if (!ListeningWhitelist.isEmpty(receiving))
serverTimeoutListeners.addListener(listener, receiving);
}
@Override
public Set<PacketListener> getTimeoutHandlers() {
return ImmutableSet.copyOf(timeoutListeners);
}
/**
* Registers an asynchronous packet handler.
* <p>
@ -131,6 +180,21 @@ public class AsyncFilterManager implements AsynchronousManager {
return whitelist != null && whitelist.getWhitelist().size() > 0;
}
@Override
public void unregisterTimeoutHandler(PacketListener listener) {
if (listener == null)
throw new IllegalArgumentException("listener cannot be NULL.");
ListeningWhitelist sending = listener.getSendingWhitelist();
ListeningWhitelist receiving = listener.getReceivingWhitelist();
// Do it in the opposite order
if (serverTimeoutListeners.removeListener(listener, sending).size() > 0 ||
clientTimeoutListeners.removeListener(listener, receiving).size() > 0) {
timeoutListeners.remove(listener);
}
}
@Override
public void unregisterAsyncHandler(AsyncListenerHandler handler) {
if (handler == null)
@ -276,6 +340,10 @@ public class AsyncFilterManager implements AsynchronousManager {
cleaningUp = true;
serverProcessingQueue.cleanupAll();
serverQueue.cleanupAll();
timeoutListeners.clear();
serverTimeoutListeners = null;
clientTimeoutListeners = null;
}
@Override
@ -333,7 +401,6 @@ public class AsyncFilterManager implements AsynchronousManager {
* Send any due packets, or clean up packets that have expired.
*/
public void sendProcessedPackets(int tickCounter, boolean onMainThread) {
// The server queue is unlikely to need checking that often
if (tickCounter % 10 == 0) {
serverQueue.trySendPackets(onMainThread);

View File

@ -418,7 +418,7 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
// We're in 1.2.5
alwaysSync = true;
} else {
System.err.println("Cannot determine asynchronous state of packets!");
System.err.println("[ProtocolLib] Cannot determine asynchronous state of packets!");
alwaysSync = true;
}
}

View File

@ -27,13 +27,14 @@ import org.bukkit.entity.Player;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.PlayerLoggedOutException;
import com.comphenix.protocol.injector.SortedPacketListenerList;
import com.comphenix.protocol.reflect.FieldAccessException;
/**
* Represents packets ready to be transmitted to a client.
* @author Kristian
*/
class PacketSendingQueue {
abstract class PacketSendingQueue {
public static final int INITIAL_CAPACITY = 64;
@ -77,7 +78,7 @@ class PacketSendingQueue {
AsyncMarker marker = packetUpdated.getAsyncMarker();
// Should we reorder the event?
if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex()) {
if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex() && !marker.hasExpired()) {
PacketEvent copy = PacketEvent.fromSynchronous(packetUpdated, marker);
// "Cancel" the original event
@ -127,6 +128,7 @@ class PacketSendingQueue {
if (holder != null) {
PacketEvent current = holder.getEvent();
AsyncMarker marker = current.getAsyncMarker();
boolean hasExpired = marker.hasExpired();
// Abort if we're not on the main thread
if (synchronizeMain) {
@ -144,8 +146,16 @@ class PacketSendingQueue {
}
}
if (marker.isProcessed() || marker.hasExpired()) {
if (marker.isProcessed() && !current.isCancelled()) {
if (marker.isProcessed() || hasExpired) {
if (hasExpired) {
// Notify timeout listeners
onPacketTimeout(current);
// Recompute
marker = current.getAsyncMarker();
hasExpired = marker.hasExpired();
}
if (marker.isProcessed() && !current.isCancelled() && !hasExpired) {
// Silently skip players that have logged out
if (isOnline(current.getPlayer())) {
sendPacket(current);
@ -162,6 +172,12 @@ class PacketSendingQueue {
}
}
/**
* Invoked when a packet has timed out.
* @param event - the timed out packet.
*/
protected abstract void onPacketTimeout(PacketEvent event);
private boolean isOnline(Player player) {
return player != null && player.isOnline();
}

View File

@ -108,8 +108,8 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
private PlayerInjectionHandler playerInjection;
// The two listener containers
private SortedPacketListenerList recievedListeners = new SortedPacketListenerList();
private SortedPacketListenerList sendingListeners = new SortedPacketListenerList();
private SortedPacketListenerList recievedListeners;
private SortedPacketListenerList sendingListeners;
// Whether or not this class has been closed
private volatile boolean hasClosed;
@ -150,6 +150,10 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
// Just boilerplate
final DelayedSingleTask finalUnhookTask = unhookTask;
// Listener containers
this.recievedListeners = new SortedPacketListenerList();
this.sendingListeners = new SortedPacketListenerList();
// References
this.unhookTask = unhookTask;
this.server = server;
@ -366,12 +370,16 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
@Override
public void invokePacketRecieving(PacketEvent event) {
handlePacket(recievedListeners, event, false);
if (!hasClosed) {
handlePacket(recievedListeners, event, false);
}
}
@Override
public void invokePacketSending(PacketEvent event) {
handlePacket(sendingListeners, event, true);
if (!hasClosed) {
handlePacket(sendingListeners, event, true);
}
}
/**
@ -812,6 +820,8 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
// Remove listeners
packetListeners.clear();
recievedListeners = null;
sendingListeners = null;
// Clean up async handlers. We have to do this last.
asyncFilterManager.cleanupAll();

View File

@ -29,7 +29,7 @@ import com.comphenix.protocol.events.PacketListener;
*
* @author Kristian
*/
class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<PacketListener> {
public final class SortedPacketListenerList extends AbstractConcurrentListenerMultimap<PacketListener> {
/**
* Invokes the given packet event for every registered listener.