Use the normal packet listener instead of an async listener.

This commit is contained in:
Kristian S. Stangeland 2012-09-29 19:13:12 +02:00
parent 6f02e79802
commit 285952b14d
9 changed files with 230 additions and 124 deletions

View File

@ -7,6 +7,7 @@ import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.PacketStream;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
/**
* Represents a filter manager for asynchronous packets.
@ -36,7 +37,7 @@ public class AsyncFilterManager {
this.mainThread = Thread.currentThread();
}
public ListenerToken registerAsyncHandler(Plugin plugin, AsyncListener listener) {
public ListenerToken registerAsyncHandler(Plugin plugin, PacketListener listener) {
ListenerToken token = new ListenerToken(plugin, mainThread, this, listener);
processingQueue.addListener(token, listener.getSendingWhitelist());
@ -56,15 +57,33 @@ public class AsyncFilterManager {
processingQueue.removeListener(listenerToken, listenerToken.getAsyncListener().getSendingWhitelist());
}
public void enqueueSyncPacket(PacketEvent syncPacket, int sendingDelta, long timeoutDelta) {
AsyncPacket asyncPacket = new AsyncPacket(packetStream, syncPacket,
currentSendingIndex.getAndIncrement() + sendingDelta,
System.currentTimeMillis(),
timeoutDelta);
/**
* Enqueue a packet for asynchronous processing.
* @param syncPacket - synchronous packet event.
* @param asyncMarker - the asynchronous marker to use.
*/
public void enqueueSyncPacket(PacketEvent syncPacket, AsyncPacket asyncMarker) {
PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker);
// Start the process
sendingQueue.enqueue(asyncPacket);
processingQueue.enqueuePacket(asyncPacket);
sendingQueue.enqueue(newEvent);
processingQueue.enqueuePacket(newEvent);
}
/**
* Construct an async marker with the given sending priority delta and timeout delta.
* @param sendingDelta - how many packets we're willing to wait.
* @param timeoutDelta - how long (in ms) until the packet expire.
* @return An async marker.
*/
public AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta) {
return createAsyncMarker(sendingDelta, timeoutDelta,
currentSendingIndex.incrementAndGet(), System.currentTimeMillis());
}
// Helper method
private AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) {
return new AsyncPacket(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
}
public PacketStream getPacketStream() {
@ -83,9 +102,11 @@ public class AsyncFilterManager {
return sendingQueue;
}
/**
* Remove listeners, close threads and transmit every delayed packet.
*/
public void cleanupAll() {
// Remove all listeners
// We don't necessarily remove packets, as this might be a part of a server reload
processingQueue.cleanupAll();
sendingQueue.cleanupAll();
}
}

View File

@ -1,17 +0,0 @@
package com.comphenix.protocol.async;
import org.bukkit.plugin.Plugin;
import com.comphenix.protocol.events.ListeningWhitelist;
public interface AsyncListener {
public void onAsyncPacket(AsyncPacket packet);
public ListeningWhitelist getSendingWhitelist();
/**
* Retrieve the plugin that created this async packet listener.
* @return The plugin, or NULL if not available.
*/
public Plugin getPlugin();
}

View File

@ -1,5 +1,6 @@
package com.comphenix.protocol.async;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
@ -15,12 +16,7 @@ import com.google.common.primitives.Longs;
* @author Kristian
*/
public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
/**
* Signal an end to the packet processing.
*/
static final AsyncPacket INTERUPT_PACKET = new AsyncPacket();
/**
* Generated by Eclipse.
*/
@ -31,11 +27,6 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
*/
public static final int DEFAULT_TIMEOUT_DELTA = 60000;
/**
* The original synchronized packet.
*/
private PacketEvent packetEvent;
/**
* The packet stream responsible for transmitting the packet when it's done processing.
*/
@ -56,33 +47,16 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
// Whether or not the packet has been processed by the listeners
private volatile boolean processed;
private AsyncPacket() {
// Used by the poision pill pattern
}
/**
* Determine whether or not this is a signal for the async listener to interrupt processing.
* @return Interrupt packet processing.
*/
boolean isInteruptPacket() {
// This is only possble if we're dealing with the poision pill packet
return packetEvent == null || packetStream == null;
}
/**
* Create a container for asyncronous packets.
* @param packetEvent - the synchronous packet event.
* @param initialTime - the current time in milliseconds since 01.01.1970 00:00.
*/
public AsyncPacket(PacketStream packetStream, PacketEvent packetEvent, long sendingIndex, long initialTime, long timeoutDelta) {
if (packetEvent == null)
throw new IllegalArgumentException("packetEvent cannot be NULL");
AsyncPacket(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) {
if (packetStream == null)
throw new IllegalArgumentException("packetStream cannot be NULL");
this.packetStream = packetStream;
this.packetEvent = packetEvent;
// Timeout
this.initialTime = initialTime;
@ -137,22 +111,6 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
this.newSendingIndex = newSendingIndex;
}
/**
* Retrieve the original synchronous packet event.
* @return The original packet event.
*/
public PacketEvent getPacketEvent() {
return packetEvent;
}
/**
* Retrieve the packet ID of the underlying packet.
* @return Packet ID.
*/
public int getPacketID() {
return packetEvent.getPacketID();
}
/**
* Retrieve the packet stream responsible for transmitting this packet.
* @return The packet stream.
@ -173,7 +131,7 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
* Retrieve whether or not this packet has been processed by the async listeners.
* @return TRUE if it has been processed, FALSE otherwise.
*/
boolean isProcessed() {
public boolean isProcessed() {
return processed;
}
@ -189,21 +147,9 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
* Retrieve iterator for the next listener in line.
* @return Next async packet listener iterator.
*/
Iterator<PrioritizedListener<ListenerToken>> getListenerTraversal() {
public Iterator<PrioritizedListener<ListenerToken>> getListenerTraversal() {
return listenerTraversal;
}
/**
* We're done processing. Send the packet.
*/
void sendPacket() {
try {
// We only support server packets at this stage
packetStream.sendServerPacket(packetEvent.getPlayer(), packetEvent.getPacket(), false);
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
/**
* Set the iterator for the next listener.
@ -212,7 +158,26 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
void setListenerTraversal(Iterator<PrioritizedListener<ListenerToken>> listenerTraversal) {
this.listenerTraversal = listenerTraversal;
}
/**
* Transmit a given packet to the current packet stream.
* @param event - the packet to send.
* @throws IOException If the packet couldn't be sent.
*/
public void sendPacket(PacketEvent event) throws IOException {
try {
if (event.isServerPacket()) {
packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), false);
} else {
packetStream.recieveClientPacket(event.getPlayer(), event.getPacket(), false);
}
} catch (InvocationTargetException e) {
throw new IOException("Cannot send packet", e);
} catch (IllegalAccessException e) {
throw new IOException("Cannot send packet", e);
}
}
@Override
public int compareTo(AsyncPacket o) {
if (o == null)

View File

@ -5,8 +5,16 @@ import java.util.logging.Level;
import org.bukkit.plugin.Plugin;
public class ListenerToken {
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener;
class ListenerToken {
/**
* Signal an end to the packet processing.
*/
private static final PacketEvent INTERUPT_PACKET = new PacketEvent(new Object());
// Default queue capacity
private static int DEFAULT_CAPACITY = 1024;
@ -14,7 +22,7 @@ public class ListenerToken {
private volatile boolean cancelled;
// The packet listener
private AsyncListener listener;
private PacketListener listener;
// The original plugin
private Plugin plugin;
@ -23,12 +31,12 @@ public class ListenerToken {
private AsyncFilterManager filterManager;
// List of queued packets
private ArrayBlockingQueue<AsyncPacket> queuedPackets = new ArrayBlockingQueue<AsyncPacket>(DEFAULT_CAPACITY);
private ArrayBlockingQueue<PacketEvent> queuedPackets = new ArrayBlockingQueue<PacketEvent>(DEFAULT_CAPACITY);
// Minecraft main thread
private Thread mainThread;
public ListenerToken(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, AsyncListener listener) {
public ListenerToken(Plugin plugin, Thread mainThread, AsyncFilterManager filterManager, PacketListener listener) {
if (filterManager == null)
throw new IllegalArgumentException("filterManager cannot be NULL");
if (listener == null)
@ -44,7 +52,7 @@ public class ListenerToken {
return cancelled;
}
public AsyncListener getAsyncListener() {
public PacketListener getAsyncListener() {
return listener;
}
@ -57,7 +65,7 @@ public class ListenerToken {
// Poison Pill Shutdown
queuedPackets.clear();
queuedPackets.add(AsyncPacket.INTERUPT_PACKET);
queuedPackets.add(INTERUPT_PACKET);
}
/**
@ -65,7 +73,7 @@ public class ListenerToken {
* @param packet - a packet for processing.
* @throws IllegalStateException If the underlying packet queue is full.
*/
public void enqueuePacket(AsyncPacket packet) {
public void enqueuePacket(PacketEvent packet) {
if (packet == null)
throw new IllegalArgumentException("packet is NULL");
@ -86,16 +94,21 @@ public class ListenerToken {
try {
mainLoop:
while (!cancelled) {
AsyncPacket packet = queuedPackets.take();
PacketEvent packet = queuedPackets.take();
AsyncPacket marker = packet.getAsyncMarker();
// Handle cancel requests
if (packet == null || packet.isInteruptPacket()) {
if (packet == null || marker == null || !packet.isAsynchronous()) {
break;
}
// Here's the core of the asynchronous processing
try {
listener.onAsyncPacket(packet);
if (packet.isServerPacket())
listener.onPacketSending(packet);
else
listener.onPacketReceiving(packet);
} catch (Throwable e) {
// Minecraft doesn't want your Exception.
filterManager.getLogger().log(Level.SEVERE,
@ -103,8 +116,8 @@ public class ListenerToken {
}
// Now, get the next non-cancelled listener
for (; packet.getListenerTraversal().hasNext(); ) {
ListenerToken token = packet.getListenerTraversal().next().getListener();
for (; marker.getListenerTraversal().hasNext(); ) {
ListenerToken token = marker.getListenerTraversal().next().getListener();
if (!token.isCancelled()) {
token.enqueuePacket(packet);

View File

@ -6,6 +6,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import com.comphenix.protocol.concurrency.AbstractConcurrentListenerMultimap;
import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.injector.PrioritizedListener;
/**
@ -32,7 +33,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
private Semaphore concurrentProcessing;
// Queued packets for being processed
private ArrayBlockingQueue<AsyncPacket> processingQueue;
private ArrayBlockingQueue<PacketEvent> processingQueue;
// Packets for sending
private PacketSendingQueue sendingQueue;
@ -43,7 +44,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
public PacketProcessingQueue(PacketSendingQueue sendingQueue, int queueLimit, int maximumConcurrency) {
super();
this.processingQueue = new ArrayBlockingQueue<AsyncPacket>(queueLimit);
this.processingQueue = new ArrayBlockingQueue<PacketEvent>(queueLimit);
this.maximumConcurrency = maximumConcurrency;
this.concurrentProcessing = new Semaphore(maximumConcurrency);
this.sendingQueue = sendingQueue;
@ -54,7 +55,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
* @param packet - packet to process.
* @return TRUE if we sucessfully queued the packet, FALSE if the queue ran out if space.
*/
public boolean enqueuePacket(AsyncPacket packet) {
public boolean enqueuePacket(PacketEvent packet) {
try {
processingQueue.add(packet);
@ -71,17 +72,18 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
*/
public void signalBeginProcessing() {
while (concurrentProcessing.tryAcquire()) {
AsyncPacket packet = processingQueue.poll();
PacketEvent packet = processingQueue.poll();
// Any packet queued?
if (packet != null) {
Collection<PrioritizedListener<ListenerToken>> list = getListener(packet.getPacketID());
AsyncPacket marker = packet.getAsyncMarker();
if (list != null) {
Iterator<PrioritizedListener<ListenerToken>> iterator = list.iterator();
if (iterator.hasNext()) {
packet.setListenerTraversal(iterator);
marker.setListenerTraversal(iterator);
iterator.next().getListener().enqueuePacket(packet);
continue;
}
@ -113,7 +115,15 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
return maximumConcurrency;
}
public void removeListeners() {
for (PrioritizedListener<ListenerToken> token : )
public void cleanupAll() {
// Cancel all the threads and every listener
for (PrioritizedListener<ListenerToken> token : values()) {
if (token != null) {
token.getListener().cancel();
}
}
// Remove the rest, just in case
clearListeners();
}
}

View File

@ -1,41 +1,85 @@
package com.comphenix.protocol.async;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import com.comphenix.protocol.events.PacketEvent;
/**
* Represents packets ready to be transmitted to a client.
* @author Kristian
*/
class PacketSendingQueue {
private PriorityBlockingQueue<AsyncPacket> sendingQueue;
private PriorityBlockingQueue<PacketEvent> sendingQueue;
/**
* Enqueue a packet for sending.
* @param packet
*/
public void enqueue(AsyncPacket packet) {
public void enqueue(PacketEvent packet) {
sendingQueue.add(packet);
}
/**
* Invoked when one of the packets have finished processing.
*/
public synchronized void signalPacketUpdate(AsyncPacket packetUpdated) {
public synchronized void signalPacketUpdate(PacketEvent packetUpdated) {
// Mark this packet as finished
packetUpdated.setProcessed(true);
// Transmit as many packets as we can
packetUpdated.getAsyncMarker().setProcessed(true);
signalPacketUpdates();
}
/**
* Send every packet, regardless of the processing state.
*/
public synchronized void forceSend() {
while (true) {
AsyncPacket current = sendingQueue.peek();
PacketEvent current = sendingQueue.poll();
if (current != null && current.isProcessed()) {
current.sendPacket();
sendingQueue.poll();
if (current != null) {
// Just print the error
try {
current.getAsyncMarker().sendPacket(current);
} catch (IOException e) {
e.printStackTrace();
}
} else {
break;
}
}
}
/**
* Invoked when potentially every packet is finished.
*/
private void signalPacketUpdates() {
// Transmit as many packets as we can
while (true) {
PacketEvent current = sendingQueue.peek();
if (current != null && current.getAsyncMarker().isProcessed()) {
// Just print the error
try {
current.getAsyncMarker().sendPacket(current);
} catch (IOException e) {
e.printStackTrace();
}
sendingQueue.poll();
} else {
break;
}
}
// And we're done
}
/**
* Automatically transmits every delayed packet.
*/
public void cleanupAll() {
forceSend();
}
}

View File

@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentMap;
import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.injector.PrioritizedListener;
import com.google.common.collect.Iterables;
/**
* A thread-safe implementation of a listener multimap.
@ -103,4 +104,19 @@ public abstract class AbstractConcurrentListenerMultimap<TListener> {
public Collection<PrioritizedListener<TListener>> getListener(int packetID) {
return listeners.get(packetID);
}
/**
* Retrieve every listener.
* @return Every listener.
*/
protected Iterable<PrioritizedListener<TListener>> values() {
return Iterables.concat(listeners.values());
}
/**
* Remove all packet listeners.
*/
protected void clearListeners() {
listeners.clear();
}
}

View File

@ -25,6 +25,8 @@ import java.util.EventObject;
import org.bukkit.entity.Player;
import org.bukkit.event.Cancellable;
import com.comphenix.protocol.async.AsyncPacket;
public class PacketEvent extends EventObject implements Cancellable {
/**
* Automatically generated by Eclipse.
@ -35,7 +37,10 @@ public class PacketEvent extends EventObject implements Cancellable {
private PacketContainer packet;
private boolean serverPacket;
private boolean cancel;
private AsyncPacket asyncMarker;
private boolean asynchronous;
/**
* Use the static constructors to create instances of this event.
* @param source - the event source.
@ -50,6 +55,15 @@ public class PacketEvent extends EventObject implements Cancellable {
this.player = player;
this.serverPacket = serverPacket;
}
private PacketEvent(PacketEvent origial, AsyncPacket asyncMarker) {
super(origial.source);
this.packet = origial.packet;
this.player = origial.player;
this.serverPacket = origial.serverPacket;
this.asyncMarker = asyncMarker;
this.asynchronous = true;
}
/**
* Creates an event representing a client packet transmission.
@ -73,6 +87,16 @@ public class PacketEvent extends EventObject implements Cancellable {
return new PacketEvent(source, packet, recipient, true);
}
/**
* Create an asynchronous packet event from a synchronous event and a async marker.
* @param event - the original synchronous event.
* @param marker - the asynchronous marker.
* @return The new packet event.
*/
public static PacketEvent fromSynchronous(PacketEvent event, AsyncPacket marker) {
return new PacketEvent(event, marker);
}
/**
* Retrieves the packet that will be sent to the player.
* @return Packet to send to the player.
@ -129,6 +153,36 @@ public class PacketEvent extends EventObject implements Cancellable {
return serverPacket;
}
/**
* Retrieve the asynchronous marker.
* <p>
* If the packet is synchronous, this marker will be used to schedule an asynchronous event. In this
* asynchronous event, the marker is used to correctly pass the packet around to the different threads.
* @return The current asynchronous marker.
*/
public AsyncPacket getAsyncMarker() {
return asyncMarker;
}
/**
* Set the asynchronous marker.
* <p>
* If the marker is non-null at the end of an synchronous event processing, the packet will be scheduled
* to be processed asynchronously with the given settings.
* @param asyncMarker - the new asynchronous marker.
*/
public void setAsyncMarker(AsyncPacket asyncMarker) {
this.asyncMarker = asyncMarker;
}
/**
* Determine if the packet event has been executed asynchronously or not.
* @return TRUE if this packet event is asynchronous, FALSE otherwise.
*/
public boolean isAsynchronous() {
return asynchronous;
}
private void writeObject(ObjectOutputStream output) throws IOException {
// Default serialization
output.defaultWriteObject();

View File

@ -668,10 +668,7 @@ public final class PacketFilterManager implements ProtocolManager {
for (PlayerInjector injection : playerInjection.values()) {
injection.cleanupAll();
}
// Clean up async handlers
asyncFilterManager.cleanupAll();
// Remove packet handlers
if (packetInjector != null)
packetInjector.cleanupAll();
@ -681,6 +678,9 @@ public final class PacketFilterManager implements ProtocolManager {
playerInjection.clear();
connectionLookup.clear();
hasClosed = true;
// Clean up async handlers. We have to do this last.
asyncFilterManager.cleanupAll();
}
@Override