Renamed the async marker. Added handling of close.

This commit is contained in:
Kristian S. Stangeland 2012-09-29 19:33:22 +02:00
parent 285952b14d
commit 81321383d5
6 changed files with 57 additions and 44 deletions

View File

@ -62,7 +62,7 @@ public class AsyncFilterManager {
* @param syncPacket - synchronous packet event.
* @param asyncMarker - the asynchronous marker to use.
*/
public void enqueueSyncPacket(PacketEvent syncPacket, AsyncPacket asyncMarker) {
public void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) {
PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker);
// Start the process
@ -76,14 +76,14 @@ public class AsyncFilterManager {
* @param timeoutDelta - how long (in ms) until the packet expire.
* @return An async marker.
*/
public AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta) {
public AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta) {
return createAsyncMarker(sendingDelta, timeoutDelta,
currentSendingIndex.incrementAndGet(), System.currentTimeMillis());
}
// Helper method
private AsyncPacket createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) {
return new AsyncPacket(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
private AsyncMarker createAsyncMarker(long sendingDelta, long timeoutDelta, long sendingIndex, long currentTime) {
return new AsyncMarker(packetStream, sendingIndex, sendingDelta, System.currentTimeMillis(), timeoutDelta);
}
public PacketStream getPacketStream() {

View File

@ -11,11 +11,11 @@ import com.comphenix.protocol.injector.PrioritizedListener;
import com.google.common.primitives.Longs;
/**
* Represents a packet that is being processed by asynchronous listeners.
* Contains information about the packet that is being processed by asynchronous listeners.
*
* @author Kristian
*/
public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
/**
* Generated by Eclipse.
@ -47,12 +47,15 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
// Whether or not the packet has been processed by the listeners
private volatile boolean processed;
// Whethre or not the packet has been sent
private volatile boolean transmitted;
/**
* Create a container for asyncronous packets.
* @param initialTime - the current time in milliseconds since 01.01.1970 00:00.
*/
AsyncPacket(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) {
AsyncMarker(PacketStream packetStream, long sendingIndex, long sendingDelta, long initialTime, long timeoutDelta) {
if (packetStream == null)
throw new IllegalArgumentException("packetStream cannot be NULL");
@ -143,6 +146,14 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
this.processed = processed;
}
/**
* Retrieve whether or not this packet has already been sent.
* @return TRUE if it has been sent before, FALSE otherwise.
*/
public boolean isTransmitted() {
return transmitted;
}
/**
* Retrieve iterator for the next listener in line.
* @return Next async packet listener iterator.
@ -171,6 +182,8 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
} else {
packetStream.recieveClientPacket(event.getPlayer(), event.getPacket(), false);
}
transmitted = true;
} catch (InvocationTargetException e) {
throw new IOException("Cannot send packet", e);
} catch (IllegalAccessException e) {
@ -179,7 +192,7 @@ public class AsyncPacket implements Serializable, Comparable<AsyncPacket> {
}
@Override
public int compareTo(AsyncPacket o) {
public int compareTo(AsyncMarker o) {
if (o == null)
return 1;
else

View File

@ -95,7 +95,7 @@ class ListenerToken {
mainLoop:
while (!cancelled) {
PacketEvent packet = queuedPackets.take();
AsyncPacket marker = packet.getAsyncMarker();
AsyncMarker marker = packet.getAsyncMarker();
// Handle cancel requests
if (packet == null || marker == null || !packet.isAsynchronous()) {

View File

@ -77,7 +77,7 @@ class PacketProcessingQueue extends AbstractConcurrentListenerMultimap<ListenerT
// Any packet queued?
if (packet != null) {
Collection<PrioritizedListener<ListenerToken>> list = getListener(packet.getPacketID());
AsyncPacket marker = packet.getAsyncMarker();
AsyncMarker marker = packet.getAsyncMarker();
if (list != null) {
Iterator<PrioritizedListener<ListenerToken>> iterator = list.iterator();

View File

@ -27,7 +27,18 @@ class PacketSendingQueue {
public synchronized void signalPacketUpdate(PacketEvent packetUpdated) {
// Mark this packet as finished
packetUpdated.getAsyncMarker().setProcessed(true);
signalPacketUpdates();
// Transmit as many packets as we can
while (true) {
PacketEvent current = sendingQueue.peek();
if (current != null && current.getAsyncMarker().isProcessed()) {
sendPacket(current);
sendingQueue.poll();
} else {
break;
}
}
}
/**
@ -38,12 +49,7 @@ class PacketSendingQueue {
PacketEvent current = sendingQueue.poll();
if (current != null) {
// Just print the error
try {
current.getAsyncMarker().sendPacket(current);
} catch (IOException e) {
e.printStackTrace();
}
sendPacket(current);
} else {
break;
}
@ -51,29 +57,23 @@ class PacketSendingQueue {
}
/**
* Invoked when potentially every packet is finished.
* Transmit a packet, if it hasn't already.
* @param event - the packet to transmit.
*/
private void signalPacketUpdates() {
// Transmit as many packets as we can
while (true) {
PacketEvent current = sendingQueue.peek();
if (current != null && current.getAsyncMarker().isProcessed()) {
// Just print the error
try {
current.getAsyncMarker().sendPacket(current);
} catch (IOException e) {
e.printStackTrace();
}
sendingQueue.poll();
} else {
break;
}
}
private void sendPacket(PacketEvent event) {
// And we're done
AsyncMarker marker = event.getAsyncMarker();
try {
// Don't send a packet twice
if (marker != null && !marker.isTransmitted()) {
marker.sendPacket(event);
}
} catch (IOException e) {
// Just print the error
e.printStackTrace();
}
}
/**

View File

@ -25,7 +25,7 @@ import java.util.EventObject;
import org.bukkit.entity.Player;
import org.bukkit.event.Cancellable;
import com.comphenix.protocol.async.AsyncPacket;
import com.comphenix.protocol.async.AsyncMarker;
public class PacketEvent extends EventObject implements Cancellable {
/**
@ -38,7 +38,7 @@ public class PacketEvent extends EventObject implements Cancellable {
private boolean serverPacket;
private boolean cancel;
private AsyncPacket asyncMarker;
private AsyncMarker asyncMarker;
private boolean asynchronous;
/**
@ -56,7 +56,7 @@ public class PacketEvent extends EventObject implements Cancellable {
this.serverPacket = serverPacket;
}
private PacketEvent(PacketEvent origial, AsyncPacket asyncMarker) {
private PacketEvent(PacketEvent origial, AsyncMarker asyncMarker) {
super(origial.source);
this.packet = origial.packet;
this.player = origial.player;
@ -93,7 +93,7 @@ public class PacketEvent extends EventObject implements Cancellable {
* @param marker - the asynchronous marker.
* @return The new packet event.
*/
public static PacketEvent fromSynchronous(PacketEvent event, AsyncPacket marker) {
public static PacketEvent fromSynchronous(PacketEvent event, AsyncMarker marker) {
return new PacketEvent(event, marker);
}
@ -160,7 +160,7 @@ public class PacketEvent extends EventObject implements Cancellable {
* asynchronous event, the marker is used to correctly pass the packet around to the different threads.
* @return The current asynchronous marker.
*/
public AsyncPacket getAsyncMarker() {
public AsyncMarker getAsyncMarker() {
return asyncMarker;
}
@ -171,7 +171,7 @@ public class PacketEvent extends EventObject implements Cancellable {
* to be processed asynchronously with the given settings.
* @param asyncMarker - the new asynchronous marker.
*/
public void setAsyncMarker(AsyncPacket asyncMarker) {
public void setAsyncMarker(AsyncMarker asyncMarker) {
this.asyncMarker = asyncMarker;
}