Use a separate asynchronous sending queue for every online player.

This ensures that packets intended for player A doesn't have to wait
for the packets of player B to be finished processing.
This commit is contained in:
Kristian S. Stangeland 2012-11-20 03:23:43 +01:00
parent f8af92eb5b
commit 7b9d971238
6 changed files with 376 additions and 116 deletions

View File

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
@ -52,12 +53,12 @@ public class AsyncFilterManager implements AsynchronousManager {
private Set<PacketListener> timeoutListeners;
private PacketProcessingQueue serverProcessingQueue;
private PacketSendingQueue serverQueue;
private PacketProcessingQueue clientProcessingQueue;
private PacketSendingQueue clientQueue;
// Sending queues
private PlayerSendingHandler playerSendingHandler;
// Report exceptions
private ErrorReporter reporter;
// The likely main thread
@ -72,9 +73,6 @@ public class AsyncFilterManager implements AsynchronousManager {
// Current packet index
private AtomicInteger currentSendingIndex = new AtomicInteger();
// Whether or not we're currently cleaning up
private volatile boolean cleaningUp;
public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler, ProtocolManager manager) {
// Initialize timeout listeners
@ -82,28 +80,9 @@ public class AsyncFilterManager implements AsynchronousManager {
clientTimeoutListeners = new SortedPacketListenerList();
timeoutListeners = Sets.newSetFromMap(new ConcurrentHashMap<PacketListener, Boolean>());
// Server packets are synchronized already
this.serverQueue = new PacketSendingQueue(false) {
@Override
protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) {
serverTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event);
}
}
};
// Client packets must be synchronized
this.clientQueue = new PacketSendingQueue(true) {
@Override
protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) {
clientTimeoutListeners.invokePacketSending(AsyncFilterManager.this.reporter, event);
}
}
};
this.serverProcessingQueue = new PacketProcessingQueue(serverQueue);
this.clientProcessingQueue = new PacketProcessingQueue(clientQueue);
this.playerSendingHandler = new PlayerSendingHandler(reporter, serverTimeoutListeners, clientTimeoutListeners);
this.serverProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
this.clientProcessingQueue = new PacketProcessingQueue(playerSendingHandler);
this.scheduler = scheduler;
this.manager = manager;
@ -219,15 +198,12 @@ public class AsyncFilterManager implements AsynchronousManager {
List<Integer> removed = serverProcessingQueue.removeListener(handler, listener.getSendingWhitelist());
// We're already taking care of this, so don't do anything
if (!cleaningUp)
serverQueue.signalPacketUpdate(removed, synchronusOK);
playerSendingHandler.sendServerPackets(removed, synchronusOK);
}
if (hasValidWhitelist(listener.getReceivingWhitelist())) {
List<Integer> removed = clientProcessingQueue.removeListener(handler, listener.getReceivingWhitelist());
if (!cleaningUp)
clientQueue.signalPacketUpdate(removed, synchronusOK);
playerSendingHandler.sendClientPackets(removed, synchronusOK);
}
}
@ -337,11 +313,10 @@ public class AsyncFilterManager implements AsynchronousManager {
@Override
public void cleanupAll() {
cleaningUp = true;
serverProcessingQueue.cleanupAll();
serverQueue.cleanupAll();
playerSendingHandler.cleanupAll();
timeoutListeners.clear();
serverTimeoutListeners = null;
clientTimeoutListeners = null;
}
@ -367,7 +342,11 @@ public class AsyncFilterManager implements AsynchronousManager {
// Only send if the packet is ready
if (marker.decrementProcessingDelay() == 0) {
getSendingQueue(packet).signalPacketUpdate(packet, onMainThread);
PacketSendingQueue queue = getSendingQueue(packet, false);
// No need to create a new queue if the player has logged out
if (queue != null)
queue.signalPacketUpdate(packet, onMainThread);
}
}
@ -376,8 +355,27 @@ public class AsyncFilterManager implements AsynchronousManager {
* @param packet - the packet.
* @return The server or client sending queue the packet belongs to.
*/
private PacketSendingQueue getSendingQueue(PacketEvent packet) {
return packet.isServerPacket() ? serverQueue : clientQueue;
public PacketSendingQueue getSendingQueue(PacketEvent packet) {
return playerSendingHandler.getSendingQueue(packet);
}
/**
* Retrieve the sending queue this packet belongs to.
* @param packet - the packet.
* @param createNew - if TRUE, create a new queue if it hasn't already been created.
* @return The server or client sending queue the packet belongs to.
*/
public PacketSendingQueue getSendingQueue(PacketEvent packet, boolean createNew) {
return playerSendingHandler.getSendingQueue(packet, createNew);
}
/**
* Retrieve the processing queue this packet belongs to.
* @param packet - the packet.
* @return The server or client sending processing the packet belongs to.
*/
public PacketProcessingQueue getProcessingQueue(PacketEvent packet) {
return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue;
}
/**
@ -388,24 +386,23 @@ public class AsyncFilterManager implements AsynchronousManager {
getProcessingQueue(packet).signalProcessingDone();
}
/**
* Retrieve the processing queue this packet belongs to.
* @param packet - the packet.
* @return The server or client sending processing the packet belongs to.
*/
private PacketProcessingQueue getProcessingQueue(PacketEvent packet) {
return packet.isServerPacket() ? serverProcessingQueue : clientProcessingQueue;
}
/**
* Send any due packets, or clean up packets that have expired.
*/
public void sendProcessedPackets(int tickCounter, boolean onMainThread) {
// The server queue is unlikely to need checking that often
if (tickCounter % 10 == 0) {
serverQueue.trySendPackets(onMainThread);
playerSendingHandler.trySendServerPackets(onMainThread);
}
clientQueue.trySendPackets(onMainThread);
playerSendingHandler.trySendClientPackets(onMainThread);
}
/**
* Clean up after a given player has logged out.
* @param player - the player that has just logged out.
*/
public void removePlayer(Player player) {
playerSendingHandler.removePlayer(player);
}
}

View File

@ -58,13 +58,13 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
private Queue<PacketEventHolder> processingQueue;
// Packets for sending
private PacketSendingQueue sendingQueue;
private PlayerSendingHandler sendingHandler;
public PacketProcessingQueue(PacketSendingQueue sendingQueue) {
this(sendingQueue, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
public PacketProcessingQueue(PlayerSendingHandler sendingHandler) {
this(sendingHandler, INITIAL_CAPACITY, DEFAULT_QUEUE_LIMIT, DEFAULT_MAXIMUM_CONCURRENCY);
}
public PacketProcessingQueue(PacketSendingQueue sendingQueue, int initialSize, int maximumSize, int maximumConcurrency) {
public PacketProcessingQueue(PlayerSendingHandler sendingHandler, int initialSize, int maximumSize, int maximumConcurrency) {
super();
this.processingQueue = Synchronization.queue(MinMaxPriorityQueue.
@ -74,7 +74,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
this.maximumConcurrency = maximumConcurrency;
this.concurrentProcessing = new Semaphore(maximumConcurrency);
this.sendingQueue = sendingQueue;
this.sendingHandler = sendingHandler;
}
/**
@ -131,8 +131,13 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
}
// The packet has no further listeners. Just send it.
if (marker.decrementProcessingDelay() == 0)
if (marker.decrementProcessingDelay() == 0) {
PacketSendingQueue sendingQueue = sendingHandler.getSendingQueue(packet, false);
// In case the player has logged out
if (sendingQueue != null)
sendingQueue.signalPacketUpdate(packet, onMainThread);
}
signalProcessingDone();
} else {
@ -168,5 +173,8 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<AsyncList
// Remove the rest, just in case
clearListeners();
// Remove every packet in the queue
processingQueue.clear();
}
}

View File

@ -35,7 +35,7 @@ import com.comphenix.protocol.reflect.FieldAccessException;
*/
abstract class PacketSendingQueue {
public static final int INITIAL_CAPACITY = 64;
public static final int INITIAL_CAPACITY = 10;
private PriorityBlockingQueue<PacketEventHolder> sendingQueue;

View File

@ -0,0 +1,217 @@
package com.comphenix.protocol.async;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.bukkit.entity.Player;
import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.SortedPacketListenerList;
/**
* Contains every sending queue for every player.
*
* @author Kristian
*/
class PlayerSendingHandler {
private ErrorReporter reporter;
private ConcurrentHashMap<String, QueueContainer> playerSendingQueues;
// Timeout listeners
private SortedPacketListenerList serverTimeoutListeners;
private SortedPacketListenerList clientTimeoutListeners;
// Whether or not we're currently cleaning up
private volatile boolean cleaningUp;
/**
* Sending queues for a given player.
*
* @author Kristian
*/
private class QueueContainer {
private PacketSendingQueue serverQueue;
private PacketSendingQueue clientQueue;
public QueueContainer() {
// Server packets are synchronized already
serverQueue = new PacketSendingQueue(false) {
@Override
protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) {
serverTimeoutListeners.invokePacketSending(reporter, event);
}
}
};
// Client packets must be synchronized
clientQueue = new PacketSendingQueue(true) {
@Override
protected void onPacketTimeout(PacketEvent event) {
if (!cleaningUp) {
clientTimeoutListeners.invokePacketSending(reporter, event);
}
}
};
}
public PacketSendingQueue getServerQueue() {
return serverQueue;
}
public PacketSendingQueue getClientQueue() {
return clientQueue;
}
}
public PlayerSendingHandler(ErrorReporter reporter,
SortedPacketListenerList serverTimeoutListeners, SortedPacketListenerList clientTimeoutListeners) {
this.reporter = reporter;
this.serverTimeoutListeners = serverTimeoutListeners;
this.clientTimeoutListeners = clientTimeoutListeners;
// Initialize storage of queues
playerSendingQueues = new ConcurrentHashMap<String, QueueContainer>();
}
/**
* Retrieve the sending queue this packet belongs to.
* @param packet - the packet.
* @return The server or client sending queue the packet belongs to.
*/
public PacketSendingQueue getSendingQueue(PacketEvent packet) {
return getSendingQueue(packet, true);
}
/**
* Retrieve the sending queue this packet belongs to.
* @param packet - the packet.
* @param createNew - if TRUE, create a new queue if it hasn't already been created.
* @return The server or client sending queue the packet belongs to.
*/
public PacketSendingQueue getSendingQueue(PacketEvent packet, boolean createNew) {
String name = packet.getPlayer().getName();
QueueContainer queues = playerSendingQueues.get(name);
// Safe concurrent initialization
if (queues == null && createNew) {
QueueContainer previous = playerSendingQueues.putIfAbsent(name, new QueueContainer());
if (previous != null)
queues = previous;
}
// Check for NULL again
if (queues != null)
return packet.isServerPacket() ? queues.getServerQueue() : queues.getClientQueue();
else
return null;
}
/**
* Send all pending packets.
*/
public void sendAllPackets() {
if (!cleaningUp) {
for (QueueContainer queues : playerSendingQueues.values()) {
queues.getClientQueue().cleanupAll();
queues.getServerQueue().cleanupAll();
}
}
}
/**
* Immediately send every server packet with the given list of IDs.
* @param ids - ID of every packet to send immediately.
* @param synchronusOK - whether or not we're running on the main thread.
*/
public void sendServerPackets(List<Integer> ids, boolean synchronusOK) {
if (!cleaningUp) {
for (QueueContainer queue : playerSendingQueues.values()) {
queue.getServerQueue().signalPacketUpdate(ids, synchronusOK);
}
}
}
/**
* Immediately send every client packet with the given list of IDs.
* @param ids - ID of every packet to send immediately.
* @param synchronusOK - whether or not we're running on the main thread.
*/
public void sendClientPackets(List<Integer> ids, boolean synchronusOK) {
if (!cleaningUp) {
for (QueueContainer queue : playerSendingQueues.values()) {
queue.getClientQueue().signalPacketUpdate(ids, synchronusOK);
}
}
}
/**
* Send any outstanding server packets.
* @param onMainThread - whether or not this is occuring on the main thread.
*/
public void trySendServerPackets(boolean onMainThread) {
for (QueueContainer queue : playerSendingQueues.values()) {
queue.getServerQueue().trySendPackets(onMainThread);
}
}
/**
* Send any outstanding server packets.
* @param onMainThread - whether or not this is occuring on the main thread.
*/
public void trySendClientPackets(boolean onMainThread) {
for (QueueContainer queue : playerSendingQueues.values()) {
queue.getClientQueue().trySendPackets(onMainThread);
}
}
/**
* Retrieve every server packet queue for every player.
* @return Every sever packet queue.
*/
public List<PacketSendingQueue> getServerQueues() {
List<PacketSendingQueue> result = new ArrayList<PacketSendingQueue>();
for (QueueContainer queue : playerSendingQueues.values())
result.add(queue.getServerQueue());
return result;
}
/**
* Retrieve every client packet queue for every player.
* @return Every client packet queue.
*/
public List<PacketSendingQueue> getClientQueues() {
List<PacketSendingQueue> result = new ArrayList<PacketSendingQueue>();
for (QueueContainer queue : playerSendingQueues.values())
result.add(queue.getClientQueue());
return result;
}
/**
* Send all pending packets and clean up queues.
*/
public void cleanupAll() {
cleaningUp = true;
sendAllPackets();
playerSendingQueues.clear();
}
/**
* Invoked when a player has just logged out.
* @param player - the player that just logged out.
*/
public void removePlayer(Player player) {
String name = player.getName();
// Every packet will be dropped - there's nothing we can do
playerSendingQueues.remove(name);
}
}

View File

@ -20,6 +20,7 @@ package com.comphenix.protocol.events;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.ref.WeakReference;
import java.util.EventObject;
import org.bukkit.entity.Player;
@ -33,7 +34,9 @@ public class PacketEvent extends EventObject implements Cancellable {
*/
private static final long serialVersionUID = -5360289379097430620L;
private transient Player player;
private transient WeakReference<Player> playerReference;
private transient Player offlinePlayer;
private PacketContainer packet;
private boolean serverPacket;
private boolean cancel;
@ -52,14 +55,14 @@ public class PacketEvent extends EventObject implements Cancellable {
private PacketEvent(Object source, PacketContainer packet, Player player, boolean serverPacket) {
super(source);
this.packet = packet;
this.player = player;
this.playerReference = new WeakReference<Player>(player);
this.serverPacket = serverPacket;
}
private PacketEvent(PacketEvent origial, AsyncMarker asyncMarker) {
super(origial.source);
this.packet = origial.packet;
this.player = origial.player;
this.playerReference = origial.playerReference;
this.cancel = origial.cancel;
this.serverPacket = origial.serverPacket;
this.asyncMarker = asyncMarker;
@ -143,7 +146,7 @@ public class PacketEvent extends EventObject implements Cancellable {
* @return The player associated with this event.
*/
public Player getPlayer() {
return player;
return playerReference.get();
}
/**
@ -197,18 +200,20 @@ public class PacketEvent extends EventObject implements Cancellable {
output.defaultWriteObject();
// Write the name of the player (or NULL if it's not set)
output.writeObject(player != null ? new SerializedOfflinePlayer(player) : null);
output.writeObject(playerReference.get() != null ? new SerializedOfflinePlayer(playerReference.get()) : null);
}
private void readObject(ObjectInputStream input) throws ClassNotFoundException, IOException {
// Default deserialization
input.defaultReadObject();
final SerializedOfflinePlayer offlinePlayer = (SerializedOfflinePlayer) input.readObject();
final SerializedOfflinePlayer serialized = (SerializedOfflinePlayer) input.readObject();
if (offlinePlayer != null) {
// Better than nothing
player = offlinePlayer.getPlayer();
if (serialized != null) {
// Store it, to prevent weak reference from cleaning up the reference
offlinePlayer = serialized.getPlayer();
playerReference = new WeakReference<Player>(offlinePlayer);
}
}
}

View File

@ -598,8 +598,32 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
try {
manager.registerEvents(new Listener() {
@EventHandler(priority = EventPriority.LOWEST, ignoreCancelled = true)
@EventHandler(priority = EventPriority.LOWEST)
public void onPrePlayerJoin(PlayerJoinEvent event) {
PacketFilterManager.this.onPrePlayerJoin(event);
}
@EventHandler(priority = EventPriority.MONITOR)
public void onPlayerJoin(PlayerJoinEvent event) {
PacketFilterManager.this.onPlayerJoin(event);
}
@EventHandler(priority = EventPriority.MONITOR)
public void onPlayerQuit(PlayerQuitEvent event) {
PacketFilterManager.this.onPlayerQuit(event);
}
@EventHandler(priority = EventPriority.MONITOR)
public void onPluginDisabled(PluginDisableEvent event) {
PacketFilterManager.this.onPluginDisabled(event, plugin);
}
}, plugin);
} catch (NoSuchMethodError e) {
// Oh wow! We're running on 1.0.0 or older.
registerOld(manager, plugin);
}
}
private void onPrePlayerJoin(PlayerJoinEvent event) {
try {
// Let's clean up the other injection first.
playerInjection.uninjectPlayer(event.getPlayer().getAddress());
@ -608,8 +632,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
}
}
@EventHandler(priority = EventPriority.MONITOR, ignoreCancelled = true)
public void onPlayerJoin(PlayerJoinEvent event) {
private void onPlayerJoin(PlayerJoinEvent event) {
try {
// This call will be ignored if no listeners are registered
playerInjection.injectPlayer(event.getPlayer());
@ -618,21 +641,22 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
}
}
@EventHandler(priority = EventPriority.MONITOR, ignoreCancelled = true)
public void onPlayerQuit(PlayerQuitEvent event) {
private void onPlayerQuit(PlayerQuitEvent event) {
try {
playerInjection.handleDisconnect(event.getPlayer());
playerInjection.uninjectPlayer(event.getPlayer());
Player player = event.getPlayer();
asyncFilterManager.removePlayer(player);
playerInjection.handleDisconnect(player);
playerInjection.uninjectPlayer(player);
} catch (Exception e) {
reporter.reportDetailed(PacketFilterManager.this, "Unable to uninject logged off player.", e, event);
}
}
@EventHandler(priority = EventPriority.MONITOR, ignoreCancelled = true)
public void onPluginDisabled(PluginDisableEvent event) {
private void onPluginDisabled(PluginDisableEvent event, Plugin protocolLibrary) {
try {
// Clean up in case the plugin forgets
if (event.getPlugin() != plugin) {
if (event.getPlugin() != protocolLibrary) {
removePacketListeners(event.getPlugin());
}
} catch (Exception e) {
@ -640,14 +664,6 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
}
}
}, plugin);
} catch (NoSuchMethodError e) {
// Oh wow! We're running on 1.0.0 or older.
registerOld(manager, plugin);
}
}
/**
* Retrieve the number of listeners that expect packets during playing.
* @return Number of listeners.
@ -689,7 +705,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
// Yes, this is crazy.
@SuppressWarnings({ "unchecked", "rawtypes" })
private void registerOld(PluginManager manager, Plugin plugin) {
private void registerOld(PluginManager manager, final Plugin plugin) {
try {
ClassLoader loader = manager.getClass().getClassLoader();
@ -699,6 +715,7 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
Class eventPriority = loader.loadClass("org.bukkit.event.Event$Priority");
// Get the priority
Object priorityLowest = Enum.valueOf(eventPriority, "Lowest");
Object priorityMonitor = Enum.valueOf(eventPriority, "Monitor");
// Get event types
@ -714,26 +731,40 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
Method registerEvent = FuzzyReflection.fromObject(manager).getMethodByParameters("registerEvent",
eventTypes, Listener.class, eventPriority, Plugin.class);
Enhancer playerLow = new Enhancer();
Enhancer playerEx = new Enhancer();
Enhancer serverEx = new Enhancer();
playerEx.setSuperclass(playerListener);
playerEx.setClassLoader(classLoader);
playerEx.setCallback(new MethodInterceptor() {
playerLow.setSuperclass(playerListener);
playerLow.setClassLoader(classLoader);
playerLow.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
// Must have a parameter
if (args.length == 1) {
Object event = args[0];
if (event instanceof PlayerJoinEvent) {
onPrePlayerJoin((PlayerJoinEvent) event);
}
}
return null;
}
});
playerEx.setSuperclass(playerListener);
playerEx.setClassLoader(classLoader);
playerEx.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
if (args.length == 1) {
Object event = args[0];
// Check for the correct event
if (event instanceof PlayerJoinEvent) {
Player player = ((PlayerJoinEvent) event).getPlayer();
playerInjection.injectPlayer(player);
onPlayerJoin((PlayerJoinEvent) event);
} else if (event instanceof PlayerQuitEvent) {
Player player = ((PlayerQuitEvent) event).getPlayer();
playerInjection.handleDisconnect(player);
playerInjection.uninjectPlayer(player);
onPlayerQuit((PlayerQuitEvent) event);
}
}
return null;
@ -751,16 +782,18 @@ public final class PacketFilterManager implements ProtocolManager, ListenerInvok
Object event = args[0];
if (event instanceof PluginDisableEvent)
removePacketListeners(((PluginDisableEvent) event).getPlugin());
onPluginDisabled((PluginDisableEvent) event, plugin);
}
return null;
}
});
// Create our listener
Object playerProxyLow = playerLow.create();
Object playerProxy = playerEx.create();
Object serverProxy = serverEx.create();
registerEvent.invoke(manager, playerJoinType, playerProxyLow, priorityLowest, plugin);
registerEvent.invoke(manager, playerJoinType, playerProxy, priorityMonitor, plugin);
registerEvent.invoke(manager, playerQuitType, playerProxy, priorityMonitor, plugin);
registerEvent.invoke(manager, pluginDisabledType, serverProxy, priorityMonitor, plugin);