Honor the sending index when the packet has finished processing.

This commit is contained in:
Kristian S. Stangeland 2012-10-10 05:42:45 +02:00
parent 7ed0bc82dd
commit cf68d229b0
7 changed files with 174 additions and 93 deletions

View File

@ -180,14 +180,18 @@ public class AsyncFilterManager implements AsynchronousManager {
* @param syncPacket - synchronous packet event.
* @param asyncMarker - the asynchronous marker to use.
*/
public void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) {
public synchronized void enqueueSyncPacket(PacketEvent syncPacket, AsyncMarker asyncMarker) {
PacketEvent newEvent = PacketEvent.fromSynchronous(syncPacket, asyncMarker);
if (asyncMarker.isQueued() || asyncMarker.isTransmitted())
throw new IllegalArgumentException("Cannot queue a packet that has already been queued.");
// Start the process
getSendingQueue(syncPacket).enqueue(newEvent);
// We know this is occuring on the main thread, so pass TRUE
getProcessingQueue(syncPacket).enqueue(newEvent, true);
asyncMarker.setQueuedSendingIndex(asyncMarker.getNewSendingIndex());
}
@Override
@ -267,12 +271,16 @@ public class AsyncFilterManager implements AsynchronousManager {
* @param onMainThread - whether or not this method was run by the main thread.
*/
private void signalPacketTransmission(PacketEvent packet, boolean onMainThread) {
if (packet.getAsyncMarker() == null)
AsyncMarker marker = packet.getAsyncMarker();
if (marker == null)
throw new IllegalArgumentException(
"A sync packet cannot be transmitted by the asynchronous manager.");
if (!marker.isQueued())
throw new IllegalArgumentException(
"A packet must have been queued before it can be transmitted.");
// Only send if the packet is ready
if (packet.getAsyncMarker().decrementProcessingDelay() == 0) {
if (marker.decrementProcessingDelay() == 0) {
getSendingQueue(packet).signalPacketUpdate(packet, onMainThread);
}
}

View File

@ -59,18 +59,21 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
private long originalSendingIndex;
private long newSendingIndex;
// Used to determine if a packet must be reordered in the sending queue
private Long queuedSendingIndex;
// Whether or not the packet has been processed by the listeners
private volatile boolean processed;
// Whether or not to delay processing
private AtomicInteger processingDelay = new AtomicInteger();
// Whether or not the packet has been sent
private volatile boolean transmitted;
// Whether or not the asynchronous processing itself should be cancelled
private volatile boolean asyncCancelled;
// Whether or not to delay processing
private AtomicInteger processingDelay = new AtomicInteger();
// Used to synchronize processing on the shared PacketEvent
private Object processingLock = new Object();
@ -189,7 +192,8 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
* Increment the number of times this packet must be signalled as done before its transmitted.
* <p>
* This is useful if an asynchronous listener is waiting for further information before the
* packet can be sent to the user. A packet listener <b>MUST</b> eventually call signalPacketUpdate,
* packet can be sent to the user. A packet listener <b>MUST</b> eventually call
* {@link AsyncFilterManager#signalPacketTransmission(PacketEvent)},
* even if the packet is cancelled, after this method is called.
* <p>
* It is recommended that processing outside a packet listener is wrapped in a synchronized block
@ -219,6 +223,30 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
return processingDelay.get();
}
/**
* Whether or not this packet is or has been queued for processing.
* @return TRUE if it has, FALSE otherwise.
*/
public boolean isQueued() {
return queuedSendingIndex != null;
}
/**
* Retrieve the sending index when the packet was queued.
* @return Queued sending index.
*/
public long getQueuedSendingIndex() {
return queuedSendingIndex != null ? queuedSendingIndex : 0;
}
/**
* Set the sending index when the packet was queued.
* @param queuedSendingIndex - sending index.
*/
void setQueuedSendingIndex(Long queuedSendingIndex) {
this.queuedSendingIndex = queuedSendingIndex;
}
/**
* Processing lock used to synchronize access to the parent PacketEvent and PacketContainer.
* <p>
@ -269,6 +297,10 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
/**
* Set whether or not the asynchronous handling should be cancelled.
* <p>
* This is only relevant during the synchronous processing. Asynchronous
* listeners should use the normal cancel-field to cancel a PacketEvent.
*
* @param asyncCancelled - TRUE to cancel it, FALSE otherwise.
*/
public void setAsyncCancelled(boolean asyncCancelled) {

View File

@ -12,13 +12,17 @@ import com.google.common.collect.ComparisonChain;
class PacketEventHolder implements Comparable<PacketEventHolder> {
private PacketEvent event;
private long sendingIndex = 0;
/**
* A wrapper that ensures the packet event is ordered by sending index.
* @param event - packet event to wrap.
*/
public PacketEventHolder(PacketEvent event) {
this.event = Preconditions.checkNotNull(event, "Event must be non-null");
if (event.getAsyncMarker() != null)
this.sendingIndex = event.getAsyncMarker().getNewSendingIndex();
}
/**
@ -31,10 +35,8 @@ class PacketEventHolder implements Comparable<PacketEventHolder> {
@Override
public int compareTo(PacketEventHolder other) {
AsyncMarker marker = other != null ? other.getEvent().getAsyncMarker() : null;
return ComparisonChain.start().
compare(event.getAsyncMarker(), marker).
compare(sendingIndex, other.sendingIndex).
result();
}
}

View File

@ -55,8 +55,22 @@ class PacketSendingQueue {
* @param onMainThread - whether or not this is occuring on the main thread.
*/
public synchronized void signalPacketUpdate(PacketEvent packetUpdated, boolean onMainThread) {
AsyncMarker marker = packetUpdated.getAsyncMarker();
// Should we reorder the event?
if (marker.getQueuedSendingIndex() != marker.getNewSendingIndex()) {
PacketEvent copy = PacketEvent.fromSynchronous(packetUpdated, marker);
// "Cancel" the original event
packetUpdated.setCancelled(true);
// Enqueue the copy with the new sending index
enqueue(copy);
}
// Mark this packet as finished
packetUpdated.getAsyncMarker().setProcessed(true);
marker.setProcessed(true);
trySendPackets(onMainThread);
}

View File

@ -0,0 +1,96 @@
package com.comphenix.protocol.injector;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.comphenix.protocol.injector.PacketConstructor.Unwrapper;
import com.comphenix.protocol.reflect.instances.DefaultInstances;
/**
* Represents an object capable of converting wrapped Bukkit objects into NMS objects.
* <p>
* Typical conversions include:
* <ul>
* <li>org.bukkit.entity.Player -> net.minecraft.server.EntityPlayer</li>
* <li>org.bukkit.World -> net.minecraft.server.WorldServer</li>
* </ul>
*
* @author Kristian
*/
public class BukkitUnwrapper implements Unwrapper {
private static Map<Class<?>, Method> cache = new ConcurrentHashMap<Class<?>, Method>();
@SuppressWarnings("unchecked")
@Override
public Object unwrapItem(Object wrappedObject) {
// Special case
if (wrappedObject instanceof Collection) {
return handleCollection((Collection<Object>) wrappedObject);
}
Class<?> currentClass = wrappedObject.getClass();
Method cachedMethod = initializeCache(currentClass);
try {
// Retrieve the handle
if (cachedMethod != null)
return cachedMethod.invoke(wrappedObject);
else
return null;
} catch (IllegalArgumentException e) {
// Impossible
return null;
} catch (IllegalAccessException e) {
return null;
} catch (InvocationTargetException e) {
// This is REALLY bad
throw new RuntimeException("Minecraft error.", e);
}
}
private Object handleCollection(Collection<Object> wrappedObject) {
@SuppressWarnings("unchecked")
Collection<Object> copy = DefaultInstances.DEFAULT.getDefault(wrappedObject.getClass());
if (copy != null) {
// Unwrap every element
for (Object element : wrappedObject) {
copy.add(unwrapItem(element));
}
return copy;
} else {
// Impossible
return null;
}
}
private Method initializeCache(Class<?> type) {
// See if we're already determined this
if (cache.containsKey(type)) {
// We will never remove from the cache, so this ought to be thread safe
return cache.get(type);
}
try {
Method find = type.getMethod("getHandle");
// It's thread safe, as getMethod should return the same handle
cache.put(type, find);
return find;
} catch (SecurityException e) {
return null;
} catch (NoSuchMethodException e) {
return null;
}
}
}

View File

@ -15,7 +15,6 @@ import org.bukkit.craftbukkit.CraftWorld;
import org.bukkit.entity.Entity;
import org.bukkit.entity.Player;
import com.comphenix.protocol.injector.PacketConstructor.BukkitUnwrapper;
import com.comphenix.protocol.reflect.FieldAccessException;
import com.comphenix.protocol.reflect.FieldUtils;
import com.comphenix.protocol.reflect.FuzzyReflection;

View File

@ -2,17 +2,12 @@ package com.comphenix.protocol.injector;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.minecraft.server.Packet;
import com.comphenix.protocol.events.PacketContainer;
import com.comphenix.protocol.reflect.FieldAccessException;
import com.comphenix.protocol.reflect.instances.DefaultInstances;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -176,82 +171,17 @@ public class PacketConstructor {
return false;
}
public static class BukkitUnwrapper implements Unwrapper {
private static Map<Class<?>, Method> cache = new ConcurrentHashMap<Class<?>, Method>();
@SuppressWarnings("unchecked")
@Override
public Object unwrapItem(Object wrappedObject) {
// Special case
if (wrappedObject instanceof Collection) {
return handleCollection((Collection<Object>) wrappedObject);
}
Class<?> currentClass = wrappedObject.getClass();
Method cachedMethod = initializeCache(currentClass);
try {
// Retrieve the handle
if (cachedMethod != null)
return cachedMethod.invoke(wrappedObject);
else
return null;
} catch (IllegalArgumentException e) {
// Impossible
return null;
} catch (IllegalAccessException e) {
return null;
} catch (InvocationTargetException e) {
// This is REALLY bad
throw new RuntimeException("Minecraft error.", e);
}
}
private Object handleCollection(Collection<Object> wrappedObject) {
@SuppressWarnings("unchecked")
Collection<Object> copy = DefaultInstances.DEFAULT.getDefault(wrappedObject.getClass());
if (copy != null) {
// Unwrap every element
for (Object element : wrappedObject) {
copy.add(unwrapItem(element));
}
return copy;
} else {
// Impossible
return null;
}
}
private Method initializeCache(Class<?> type) {
// See if we're already determined this
if (cache.containsKey(type)) {
// We will never remove from the cache, so this ought to be thread safe
return cache.get(type);
}
try {
Method find = type.getMethod("getHandle");
// It's thread safe, as getMethod should return the same handle
cache.put(type, find);
return find;
} catch (SecurityException e) {
return null;
} catch (NoSuchMethodException e) {
return null;
}
}
}
/**
* Represents a unwrapper for a constructor parameter.
*
* @author Kristian
*/
public static interface Unwrapper {
/**
* Convert the given wrapped object to the equivalent net.minecraft.server object.
* @param wrappedObject - wrapped object.
* @return The net.minecraft.server object.
*/
public Object unwrapItem(Object wrappedObject);
}
}