feat: improve send/disconnect failover behavior

This commit is contained in:
Maurice Eisenblätter 2024-06-25 01:25:27 +02:00
parent 47bcd53fd0
commit d432e76e81
No known key found for this signature in database
GPG Key ID: 2E553EFBAE92FB3A
11 changed files with 298 additions and 902 deletions

View File

@ -1,494 +0,0 @@
/*
* ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
* Copyright (C) 2012 Kristian S. Stangeland
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
* 02111-1307 USA
*/
package com.comphenix.protocol.concurrency;
import com.google.common.base.Objects;
import com.google.common.collect.Range;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
/**
* Represents a generic store of intervals and associated values. No two intervals
* can overlap in this representation.
* <p>
* Note that this implementation is not thread safe.
*
* @author Kristian
*
* @param <TKey> - type of the key. Must implement Comparable.
* @param <TValue> - type of the value to associate.
*/
public abstract class AbstractIntervalTree<TKey extends Comparable<TKey>, TValue> {
protected enum State {
OPEN,
CLOSE,
BOTH
}
/**
* Represents a range and a value in this interval tree.
*/
public class Entry implements Map.Entry<Range<TKey>, TValue> {
private final EndPoint left;
private final EndPoint right;
Entry(EndPoint left, EndPoint right) {
if (left == null)
throw new IllegalAccessError("left cannot be NUll");
if (right == null)
throw new IllegalAccessError("right cannot be NUll");
if (left.key.compareTo(right.key) > 0)
throw new IllegalArgumentException(
"Left key (" + left.key + ") cannot be greater than the right key (" + right.key + ")");
this.left = left;
this.right = right;
}
@Override
public Range<TKey> getKey() {
return Range.closed(left.key, right.key);
}
@Override
public TValue getValue() {
return left.value;
}
@Override
public TValue setValue(TValue value) {
TValue old = left.value;
// Set both end points
left.value = value;
right.value = value;
return old;
}
@SuppressWarnings("rawtypes")
@Override
public boolean equals(Object obj) {
// Quick equality check
if (obj == this) {
return true;
} else if (obj instanceof AbstractIntervalTree.Entry) {
return Objects.equal(left.key, ((AbstractIntervalTree.Entry) obj).left.key) &&
Objects.equal(right.key, ((AbstractIntervalTree.Entry) obj).right.key) &&
Objects.equal(left.value, ((AbstractIntervalTree.Entry) obj).left.value);
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hashCode(left.key, right.key, left.value);
}
@Override
public String toString() {
return String.format("Value %s at [%s, %s]", left.value, left.key, right.key);
}
}
/**
* Represents a single end point (open, close or both) of a range.
*/
protected class EndPoint {
// Whether or not the end-point is opening a range, closing a range or both.
public State state;
// The value this range contains
public TValue value;
// The key of this end point
public TKey key;
public EndPoint(State state, TKey key, TValue value) {
this.state = state;
this.key = key;
this.value = value;
}
}
// To quickly look up ranges we'll index them by endpoints
protected NavigableMap<TKey, EndPoint> bounds = new TreeMap<>();
/**
* Removes every interval that intersects with the given range.
* @param lowerBound - lowest value to remove.
* @param upperBound - highest value to remove.
* @return Intervals that were removed
*/
public Set<Entry> remove(TKey lowerBound, TKey upperBound) {
return remove(lowerBound, upperBound, false);
}
/**
* Removes every interval that intersects with the given range.
* @param lowerBound - lowest value to remove.
* @param upperBound - highest value to remove.
* @param preserveDifference - whether or not to preserve the intervals that are partially outside.
* @return Intervals that were removed
*/
public Set<Entry> remove(TKey lowerBound, TKey upperBound, boolean preserveDifference) {
checkBounds(lowerBound, upperBound);
NavigableMap<TKey, EndPoint> range = bounds.subMap(lowerBound, true, upperBound, true);
EndPoint first = getNextEndPoint(lowerBound, true);
EndPoint last = getPreviousEndPoint(upperBound, true);
// Used while resizing intervals
EndPoint previous = null;
EndPoint next = null;
Set<Entry> resized = new HashSet<>();
Set<Entry> removed = new HashSet<>();
// Remove the previous element too. A close end-point must be preceded by an OPEN end-point.
if (first != null && first.state == State.CLOSE) {
previous = getPreviousEndPoint(first.key, false);
// Add the interval back
if (previous != null) {
removed.add(getEntry(previous, first));
}
}
// Get the closing element too.
if (last != null && last.state == State.OPEN) {
next = getNextEndPoint(last.key, false);
if (next != null) {
removed.add(getEntry(last, next));
}
}
// Now remove both ranges
removeEntrySafely(previous, first);
removeEntrySafely(last, next);
// Add new resized intervals
if (preserveDifference) {
if (previous != null) {
resized.add(putUnsafe(previous.key, decrementKey(lowerBound), previous.value));
}
if (next != null) {
resized.add(putUnsafe(incrementKey(upperBound), next.key, next.value));
}
}
// Get the removed entries too
getEntries(removed, range);
invokeEntryRemoved(removed);
if (preserveDifference) {
invokeEntryAdded(resized);
}
// Remove the range as well
range.clear();
return removed;
}
/**
* Retrieve the entry from a given set of end points.
* @param left - leftmost end point.
* @param right - rightmost end point.
* @return The associated entry.
*/
protected Entry getEntry(EndPoint left, EndPoint right) {
if (left == null)
throw new IllegalArgumentException("left endpoint cannot be NULL.");
if (right == null)
throw new IllegalArgumentException("right endpoint cannot be NULL.");
// Make sure the order is correct
if (right.key.compareTo(left.key) < 0) {
return getEntry(right, left);
} else {
return new Entry(left, right);
}
}
private void removeEntrySafely(EndPoint left, EndPoint right) {
if (left != null && right != null) {
bounds.remove(left.key);
bounds.remove(right.key);
}
}
// Adds a given end point
protected EndPoint addEndPoint(TKey key, TValue value, State state) {
EndPoint endPoint = bounds.get(key);
if (endPoint != null) {
endPoint.state = State.BOTH;
} else {
endPoint = new EndPoint(state, key, value);
bounds.put(key, endPoint);
}
return endPoint;
}
/**
* Associates a given interval of keys with a certain value. Any previous
* association will be overwritten in the given interval.
* <p>
* Overlapping intervals are not permitted. A key can only be associated with a single value.
*
* @param lowerBound - the minimum key (inclusive).
* @param upperBound - the maximum key (inclusive).
* @param value - the value, or NULL to reset this range.
*/
public void put(TKey lowerBound, TKey upperBound, TValue value) {
// While we don't permit overlapping intervals, we'll still allow overwriting existing intervals.
remove(lowerBound, upperBound, true);
invokeEntryAdded(putUnsafe(lowerBound, upperBound, value));
}
/**
* Associates a given interval without performing any interval checks.
* @param lowerBound - the minimum key (inclusive).
* @param upperBound - the maximum key (inclusive).
* @param value - the value, or NULL to reset the range.
*/
private Entry putUnsafe(TKey lowerBound, TKey upperBound, TValue value) {
// OK. Add the end points now
if (value != null) {
EndPoint left = addEndPoint(lowerBound, value, State.OPEN);
EndPoint right = addEndPoint(upperBound, value, State.CLOSE);
return new Entry(left, right);
} else {
return null;
}
}
/**
* Used to verify the validity of the given interval.
* @param lowerBound - lower bound (inclusive).
* @param upperBound - upper bound (inclusive).
*/
private void checkBounds(TKey lowerBound, TKey upperBound) {
if (lowerBound == null)
throw new IllegalAccessError("lowerbound cannot be NULL.");
if (upperBound == null)
throw new IllegalAccessError("upperBound cannot be NULL.");
if (upperBound.compareTo(lowerBound) < 0)
throw new IllegalArgumentException("upperBound cannot be less than lowerBound.");
}
/**
* Determines if the given key is within an interval.
* @param key - key to check.
* @return TRUE if the given key is within an interval in this tree, FALSE otherwise.
*/
public boolean containsKey(TKey key) {
return getEndPoint(key) != null;
}
/**
* Enumerates over every range in this interval tree.
* @return Number of ranges.
*/
public Set<Entry> entrySet() {
// Don't mind the Java noise
Set<Entry> result = new HashSet<>();
getEntries(result, bounds);
return result;
}
/**
* Remove every interval.
*/
public void clear() {
if (!bounds.isEmpty()) {
remove(bounds.firstKey(), bounds.lastKey());
}
}
/**
* Converts a map of end points into a set of entries.
* @param destination - set of entries.
* @param map - a map of end points.
*/
private void getEntries(Set<Entry> destination, NavigableMap<TKey, EndPoint> map) {
Map.Entry<TKey, EndPoint> last = null;
for (Map.Entry<TKey, EndPoint> entry : map.entrySet()) {
switch (entry.getValue().state) {
case BOTH:
EndPoint point = entry.getValue();
destination.add(new Entry(point, point));
break;
case CLOSE:
if (last != null) {
destination.add(new Entry(last.getValue(), entry.getValue()));
}
break;
case OPEN:
// We don't know the full range yet
last = entry;
break;
default:
throw new IllegalStateException("Illegal open/close state detected.");
}
}
}
/**
* Inserts every range from the given tree into the current tree.
* @param other - the other tree to read from.
*/
public void putAll(AbstractIntervalTree<TKey, TValue> other) {
// Naively copy every range.
for (Entry entry : other.entrySet()) {
put(entry.left.key, entry.right.key, entry.getValue());
}
}
/**
* Retrieves the value of the range that matches the given key, or NULL if nothing was found.
* @param key - the level to read for.
* @return The correct amount of experience, or NULL if nothing was recorded.
*/
public TValue get(TKey key) {
EndPoint point = getEndPoint(key);
if (point != null)
return point.value;
else
return null;
}
/**
* Get the left-most end-point associated with this key.
* @param key - key to search for.
* @return The end point found, or NULL.
*/
protected EndPoint getEndPoint(TKey key) {
EndPoint ends = bounds.get(key);
if (ends != null) {
// Always return the end point to the left
if (ends.state == State.CLOSE) {
Map.Entry<TKey, EndPoint> left = bounds.floorEntry(decrementKey(key));
return left != null ? left.getValue() : null;
} else {
return ends;
}
} else {
// We need to determine if the point intersects with a range
Map.Entry<TKey, EndPoint> left = bounds.floorEntry(key);
// We only need to check to the left
if (left != null && left.getValue().state == State.OPEN) {
return left.getValue();
} else {
return null;
}
}
}
/**
* Get the previous end point of a given key.
* @param point - the point to search with.
* @param inclusive - whether or not to include the current point in the search.
* @return The previous end point of a given given key, or NULL if not found.
*/
protected EndPoint getPreviousEndPoint(TKey point, boolean inclusive) {
if (point != null) {
Map.Entry<TKey, EndPoint> previous = bounds.floorEntry(inclusive ? point : decrementKey(point));
if (previous != null)
return previous.getValue();
}
return null;
}
/**
* Get the next end point of a given key.
* @param point - the point to search with.
* @param inclusive - whether or not to include the current point in the search.
* @return The next end point of a given given key, or NULL if not found.
*/
protected EndPoint getNextEndPoint(TKey point, boolean inclusive) {
if (point != null) {
Map.Entry<TKey, EndPoint> next = bounds.ceilingEntry(inclusive ? point : incrementKey(point));
if (next != null)
return next.getValue();
}
return null;
}
private void invokeEntryAdded(Entry added) {
if (added != null) {
onEntryAdded(added);
}
}
private void invokeEntryAdded(Set<Entry> added) {
for (Entry entry : added) {
onEntryAdded(entry);
}
}
private void invokeEntryRemoved(Set<Entry> removed) {
for (Entry entry : removed) {
onEntryRemoved(entry);
}
}
// Listeners for added or removed entries
/**
* Invoked when an entry is added.
* @param added - the entry that was added.
*/
protected void onEntryAdded(Entry added) { }
/**
* Invoked when an entry is removed.
* @param removed - the removed entry.
*/
protected void onEntryRemoved(Entry removed) { }
// Helpers for decrementing or incrementing key values
/**
* Decrement the given key by one unit.
* @param key - the key that should be decremented.
* @return The new decremented key.
*/
protected abstract TKey decrementKey(TKey key);
/**
* Increment the given key by one unit.
* @param key - the key that should be incremented.
* @return The new incremented key.
*/
protected abstract TKey incrementKey(TKey key);
}

View File

@ -1,266 +0,0 @@
/*
* ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
* Copyright (C) 2012 Kristian S. Stangeland
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
* 02111-1307 USA
*/
package com.comphenix.protocol.concurrency;
import com.comphenix.protocol.utility.SafeCacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* A map that supports blocking on read operations. Null keys are not supported.
* <p>
* Values are stored as weak references, and will be automatically removed once they've all been dereferenced.
* <p>
* @author Kristian
*
* @param <TKey> - type of the key.
* @param <TValue> - type of the value.
*/
public class BlockingHashMap<TKey, TValue> {
// Map of values
private final ConcurrentMap<TKey, TValue> backingMap;
// Map of locked objects
private final ConcurrentMap<TKey, Object> locks;
/**
* Retrieve a cache loader that will always throw an exception.
* @param <TKey> Type of the key
* @param <TValue> Type of the value
* @return An invalid cache loader.
*/
public static <TKey, TValue> CacheLoader<TKey, TValue> newInvalidCacheLoader() {
return new CacheLoader<TKey, TValue>() {
@Override
public TValue load(TKey key) throws Exception {
throw new IllegalStateException("Illegal use. Access the map directly instead.");
}
};
}
/**
* Initialize a new map.
*/
public BlockingHashMap() {
backingMap = SafeCacheBuilder.<TKey, TValue>newBuilder().
weakValues().
removalListener(
new RemovalListener<TKey, TValue>() {
@Override
public void onRemoval(RemovalNotification<TKey, TValue> entry) {
// Clean up locks too
if (entry.getCause() != RemovalCause.REPLACED) {
locks.remove(entry.getKey());
}
}
}).
build(BlockingHashMap.<TKey, TValue> newInvalidCacheLoader());
// Normal concurrent hash map
locks = new ConcurrentHashMap<>();
}
/**
* Initialize a new map.
* @param <TKey> Type of the key
* @param <TValue> Type of the value
* @return The created map.
*/
public static <TKey, TValue> BlockingHashMap<TKey, TValue> create() {
return new BlockingHashMap<>();
}
/**
* Waits until a value has been associated with the given key, and then retrieves that value.
* @param key - the key whose associated value is to be returned
* @return The value to which the specified key is mapped.
* @throws InterruptedException If the current thread got interrupted while waiting.
*/
public TValue get(TKey key) throws InterruptedException {
if (key == null)
throw new IllegalArgumentException("key cannot be NULL.");
TValue value = backingMap.get(key);
// Only lock if no value is available
if (value == null) {
final Object lock = getLock(key);
synchronized (lock) {
while (value == null) {
lock.wait();
value = backingMap.get(key);
}
}
}
return value;
}
/**
* Waits until a value has been associated with the given key, and then retrieves that value.
* @param key - the key whose associated value is to be returned
* @param timeout - the amount of time to wait until an association has been made.
* @param unit - unit of timeout.
* @return The value to which the specified key is mapped, or NULL if the timeout elapsed.
* @throws InterruptedException If the current thread got interrupted while waiting.
*/
public TValue get(TKey key, long timeout, TimeUnit unit) throws InterruptedException {
return get(key, timeout, unit, false);
}
/**
* Waits until a value has been associated with the given key, and then retrieves that value.
* <p>
* If timeout is zero, this method will return immediately if it can't find an socket injector.
*
* @param key - the key whose associated value is to be returned
* @param timeout - the amount of time to wait until an association has been made.
* @param unit - unit of timeout.
* @param ignoreInterrupted - TRUE if we should ignore the thread being interrupted, FALSE otherwise.
* @return The value to which the specified key is mapped, or NULL if the timeout elapsed.
* @throws InterruptedException If the current thread got interrupted while waiting.
*/
public TValue get(TKey key, long timeout, TimeUnit unit, boolean ignoreInterrupted) throws InterruptedException {
if (key == null)
throw new IllegalArgumentException("key cannot be NULL.");
if (unit == null)
throw new IllegalArgumentException("Unit cannot be NULL.");
if (timeout < 0)
throw new IllegalArgumentException("Timeout cannot be less than zero.");
TValue value = backingMap.get(key);
// Only lock if no value is available
if (value == null && timeout > 0) {
final Object lock = getLock(key);
final long stopTimeNS = System.nanoTime() + unit.toNanos(timeout);
// Don't exceed the timeout
synchronized (lock) {
while (value == null) {
try {
long remainingTime = stopTimeNS - System.nanoTime();
if (remainingTime > 0) {
TimeUnit.NANOSECONDS.timedWait(lock, remainingTime);
value = backingMap.get(key);
} else {
// Timeout elapsed
break;
}
} catch (InterruptedException e) {
// This is fairly dangerous - but we might HAVE to block the thread
if (!ignoreInterrupted)
throw e;
}
}
}
}
return value;
}
/**
* Associate a given key with the given value.
* <p>
* Wakes up any blocking getters on this specific key.
*
* @param key - the key to associate.
* @param value - the value.
* @return The previously associated value.
*/
public TValue put(TKey key, TValue value) {
if (value == null)
throw new IllegalArgumentException("This map doesn't support NULL values.");
final TValue previous = backingMap.put(key, value);
final Object lock = getLock(key);
// Inform our readers about this change
synchronized (lock) {
lock.notifyAll();
return previous;
}
}
/**
* If and only if a key is not present in the map will it be associated with the given value.
* @param key - the key to associate.
* @param value - the value to associate.
* @return The previous value this key has been associated with.
*/
public TValue putIfAbsent(TKey key, TValue value) {
if (value == null)
throw new IllegalArgumentException("This map doesn't support NULL values.");
final TValue previous = backingMap.putIfAbsent(key, value);
// No need to unlock readers if we haven't changed anything
if (previous == null) {
final Object lock = getLock(key);
synchronized (lock) {
lock.notifyAll();
}
}
return previous;
}
public int size() {
return backingMap.size();
}
public Collection<TValue> values() {
return backingMap.values();
}
public Set<TKey> keys() {
return backingMap.keySet();
}
/**
* Atomically retrieve the lock associated with a given key.
* @param key - the current key.
* @return An asssociated lock.
*/
private Object getLock(TKey key) {
Object lock = locks.get(key);
if (lock == null) {
Object created = new Object();
// Do this atomically
lock = locks.putIfAbsent(key, created);
// If we succeeded, use the latch we created - otherwise, use the already inserted latch
if (lock == null) {
lock = created;
}
}
return lock;
}
}

View File

@ -173,7 +173,7 @@ public class PacketFilterManager implements ListenerInvoker, InternalManager {
}
// process outbound
this.networkManagerInjector.getInjector(receiver).sendServerPacket(packet.getHandle(), marker, filters);
this.networkManagerInjector.getInjector(receiver).sendClientboundPacket(packet.getHandle(), marker, filters);
}
}
@ -227,7 +227,7 @@ public class PacketFilterManager implements ListenerInvoker, InternalManager {
}
// post to the player inject, reset our cancel state change
this.networkManagerInjector.getInjector(sender).receiveClientPacket(nmsPacket);
this.networkManagerInjector.getInjector(sender).readServerboundPacket(nmsPacket);
}
}

View File

@ -15,6 +15,8 @@ import com.comphenix.protocol.events.NetworkMarker;
*/
public interface Injector {
SocketAddress getAddress();
/**
* Retrieve the current protocol version of the player.
*
@ -41,12 +43,14 @@ public interface Injector {
* @param marker - the network marker.
* @param filtered - whether or not the packet is filtered.
*/
void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered);
void sendClientboundPacket(Object packet, NetworkMarker marker, boolean filtered);
void receiveClientPacket(Object packet);
void readServerboundPacket(Object packet);
void sendWirePacket(WirePacket packet);
void disconnect(String message);
/**
* Retrieve the current protocol state. Note that since 1.20.2 the client and server direction can be in different
* protocol states.
@ -56,8 +60,6 @@ public interface Injector {
*/
Protocol getCurrentProtocol(PacketType.Sender sender);
SocketAddress getAddress();
/**
* Retrieve the current player or temporary player associated with the injector.
*
@ -72,8 +74,6 @@ public interface Injector {
*/
void setPlayer(Player player);
void disconnect(String message);
boolean isConnected();
/**

View File

@ -42,17 +42,21 @@ final class EmptyInjector implements Injector {
}
@Override
public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
public void sendClientboundPacket(Object packet, NetworkMarker marker, boolean filtered) {
}
@Override
public void receiveClientPacket(Object packet) {
public void readServerboundPacket(Object packet) {
}
@Override
public void sendWirePacket(WirePacket packet) {
}
@Override
public void disconnect(String message) {
}
@Override
public Protocol getCurrentProtocol(PacketType.Sender sender) {
return Protocol.HANDSHAKING;
@ -68,10 +72,6 @@ final class EmptyInjector implements Injector {
this.player = player;
}
@Override
public void disconnect(String message) {
}
@Override
public boolean isConnected() {
return false;

View File

@ -11,8 +11,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.bukkit.Bukkit;
import org.bukkit.entity.Player;
@ -34,15 +32,10 @@ import com.comphenix.protocol.injector.packet.PacketRegistry;
import com.comphenix.protocol.reflect.FuzzyReflection;
import com.comphenix.protocol.reflect.accessors.Accessors;
import com.comphenix.protocol.reflect.accessors.FieldAccessor;
import com.comphenix.protocol.reflect.accessors.MethodAccessor;
import com.comphenix.protocol.reflect.fuzzy.FuzzyFieldContract;
import com.comphenix.protocol.utility.ByteBuddyGenerated;
import com.comphenix.protocol.utility.MinecraftFields;
import com.comphenix.protocol.utility.MinecraftMethods;
import com.comphenix.protocol.utility.MinecraftProtocolVersion;
import com.comphenix.protocol.utility.MinecraftReflection;
import com.comphenix.protocol.utility.MinecraftVersion;
import com.comphenix.protocol.wrappers.WrappedChatComponent;
import com.comphenix.protocol.wrappers.WrappedGameProfile;
import io.netty.channel.Channel;
@ -85,6 +78,7 @@ public class NettyChannelInjector implements Injector {
// protocol lib stuff we need
private final ErrorReporter errorReporter;
private final NetworkProcessor networkProcessor;
private final PacketListenerInvoker listenerInvoker;
// references
private final Object networkManager;
@ -108,7 +102,6 @@ public class NettyChannelInjector implements Injector {
private Player player;
// lazy initialized fields, if we don't need them we don't bother about them
private volatile Object playerConnection;
private volatile InboundProtocolReader inboundProtocolReader;
public NettyChannelInjector(
@ -125,6 +118,7 @@ public class NettyChannelInjector implements Injector {
// protocol lib stuff
this.errorReporter = errorReporter;
this.networkProcessor = new NetworkProcessor(errorReporter);
this.listenerInvoker = new PacketListenerInvoker(networkManager);
// references
this.networkManager = networkManager;
@ -260,7 +254,7 @@ public class NettyChannelInjector implements Injector {
}
@Override
public void sendServerPacket(Object packet, NetworkMarker marker, boolean filtered) {
public void sendClientboundPacket(Object packet, NetworkMarker marker, boolean filtered) {
// ignore call if the injector is closed or not injected
if (this.closed.get() || !this.injected) {
return;
@ -277,14 +271,7 @@ public class NettyChannelInjector implements Injector {
}
try {
Object playerConnection = this.getPlayerConnection();
// try to use the player connection if possible
if (playerConnection != null) {
MinecraftMethods.getPlayerConnectionSendMethod().invoke(playerConnection, packet);
} else {
MinecraftMethods.getNetworkManagerSendMethod().invoke(this.networkManager, packet);
}
this.listenerInvoker.send(packet);
} catch (Exception exception) {
this.errorReporter.reportWarning(this, Report.newBuilder(REPORT_CANNOT_SEND_PACKET)
.messageParam(packet, this.playerName)
@ -294,7 +281,7 @@ public class NettyChannelInjector implements Injector {
}
@Override
public void receiveClientPacket(Object packet) {
public void readServerboundPacket(Object packet) {
// ignore call if the injector is closed or not injected
if (this.closed.get() || !this.injected) {
return;
@ -303,7 +290,7 @@ public class NettyChannelInjector implements Injector {
this.ensureInEventLoop(() -> {
try {
// try to invoke the method, this should normally not fail
MinecraftMethods.getNetworkManagerReadPacketMethod().invoke(this.networkManager, null, packet);
this.listenerInvoker.read(packet);
} catch (Exception exception) {
this.errorReporter.reportWarning(this, Report.newBuilder(REPORT_CANNOT_READ_PACKET)
.messageParam(packet, this.playerName)
@ -332,6 +319,23 @@ public class NettyChannelInjector implements Injector {
});
}
@Override
public void disconnect(String message) {
// ignore call if the injector is closed or not injected
if (this.closed.get() || !this.injected) {
return;
}
try {
this.listenerInvoker.disconnect(message);
} catch (Exception exception) {
this.errorReporter.reportWarning(this, Report.newBuilder(REPORT_CANNOT_DISCONNECT)
.messageParam(this.playerName, message)
.error(exception)
.build());
}
}
@Override
public Protocol getCurrentProtocol(PacketType.Sender sender) {
return ChannelProtocolUtil.PROTOCOL_RESOLVER.apply(this.channel, sender);
@ -359,35 +363,6 @@ public class NettyChannelInjector implements Injector {
this.playerName = player.getName();
}
@Override
public void disconnect(String message) {
try {
Object playerConnection = this.getPlayerConnection();
// try to use the player connection if possible
if (playerConnection != null) {
// this method prefers the main thread so no event loop here
MethodAccessor accessor = MinecraftMethods.getPlayerConnectionDisconnectMethod();
// check if the parameter is a chat component
if (MinecraftReflection.isIChatBaseComponent(accessor.getMethod().getParameters()[0].getClass())) {
Object component = WrappedChatComponent.fromText(message).getHandle();
accessor.invoke(playerConnection, component);
} else {
accessor.invoke(playerConnection, message);
}
} else {
Object component = WrappedChatComponent.fromText(message).getHandle();
MinecraftMethods.getNetworkManagerDisconnectMethod().invoke(this.networkManager, component);
}
} catch (Exception exception) {
this.errorReporter.reportWarning(this, Report.newBuilder(REPORT_CANNOT_DISCONNECT)
.messageParam(this.playerName, message)
.error(exception)
.build());
}
}
@Override
public boolean isConnected() {
return this.channel.isActive();
@ -538,7 +513,7 @@ public class NettyChannelInjector implements Injector {
// ensure that we are on the main thread if we need to
if (this.channelListener.hasMainThreadListener(packetType) && !Bukkit.isPrimaryThread()) {
// not on the main thread but we are required to be - re-schedule the packet on the main thread
ProtocolLibrary.getScheduler().runTask(() -> this.sendServerPacket(packet, null, true));
ProtocolLibrary.getScheduler().runTask(() -> this.sendClientboundPacket(packet, null, true));
return null;
}
@ -618,30 +593,6 @@ public class NettyChannelInjector implements Injector {
});
}
/**
* Returns the PlayerConnection or null if the player instance is null or a
* temporary player
*
* @return the PlayerConnection or null
*/
@Nullable
private Object getPlayerConnection() {
// resolve the player connection if needed
if (this.playerConnection == null) {
Player target = this.getPlayer();
// if player is null or temporary return null
if (target == null || target instanceof ByteBuddyGenerated) {
return null;
}
// this can in some cases still return null because the connection isn't set
// during the configuration phase but the player instance got created
this.playerConnection = MinecraftFields.getPlayerConnection(target);
}
return this.playerConnection;
}
private void ensureInEventLoop(Runnable runnable) {
this.ensureInEventLoop(this.channel.eventLoop(), runnable);
}

View File

@ -0,0 +1,237 @@
package com.comphenix.protocol.injector.netty.channel;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import com.comphenix.protocol.ProtocolLogger;
import com.comphenix.protocol.reflect.FuzzyReflection;
import com.comphenix.protocol.reflect.accessors.Accessors;
import com.comphenix.protocol.reflect.accessors.MethodAccessor;
import com.comphenix.protocol.reflect.fuzzy.FuzzyMethodContract;
import com.comphenix.protocol.utility.MinecraftReflection;
import com.comphenix.protocol.wrappers.WrappedChatComponent;
import io.netty.channel.ChannelHandlerContext;
/**
* This class facilitates the invocation of methods on the current packet listener.
* It attempts to execute the <code>send</code>, <code>read</code>, and <code>disconnect</code>
* methods and, upon failure (either due to the absence of the method or the packet
* listener being of an incorrect type), it delegates the call to the network manager.
*
* <p>Supported packet listener types include CONFIGURATION and PLAY. If the packet
* listener does not match these types, or if the required method is missing, the
* operation falls back to similar methods available in the network manager.
*
* <p>It is important to note that this class does not handle exceptions internally.
* Instead, it propagates them to the caller. During the initialization phase, the
* class will throw an exception if the necessary methods in the network manager are
* not available, ensuring that these dependencies are addressed early in the runtime.
*/
public class PacketListenerInvoker {
private static final Class<?> PACKET_LISTENER_CLASS = MinecraftReflection.getMinecraftClass("network.PacketListener", "PacketListener");
private static final Class<?> GAME_PACKET_LISTENER_CLASS = MinecraftReflection.getPlayerConnectionClass();
private static final Class<?> COMMON_PACKET_LISTENER_CLASS = MinecraftReflection.getNullableNMS("server.network.ServerCommonPacketListenerImpl");
private static final Class<?> PREFERRED_PACKET_LISTENER_CLASS = COMMON_PACKET_LISTENER_CLASS != null ? COMMON_PACKET_LISTENER_CLASS : GAME_PACKET_LISTENER_CLASS;
private static final MethodAccessor PACKET_LISTENER_SEND = getPacketListenerSend();
private static final MethodAccessor PACKET_LISTENER_DISCONNECT = getPacketListenerDisconnect();
private static final boolean DOES_PACKET_LISTENER_DISCONNECT_USE_COMPONENT = doesPacketListenerDisconnectUseComponent();
private static final MethodAccessor NETWORK_MANAGER_SEND = getNetworkManagerSend();
private static final MethodAccessor NETWORK_MANAGER_READ = getNetworkManagerRead();
private static final MethodAccessor NETWORK_MANAGER_DISCONNECT = getNetworkManagerDisconnect();
private static final MethodAccessor NETWORK_MANAGER_PACKET_LISTENER = getNetworkManagerPacketListener();
public static void ensureStaticInitializedWithoutError() {
}
private static MethodAccessor getPacketListenerSend() {
FuzzyReflection packetListener = FuzzyReflection.fromClass(PREFERRED_PACKET_LISTENER_CLASS);
List<Method> send = packetListener.getMethodList(FuzzyMethodContract.newBuilder()
.banModifier(Modifier.STATIC)
.returnTypeVoid()
.parameterCount(1)
.parameterExactType(MinecraftReflection.getPacketClass(), 0)
.build());
if (send.isEmpty()) {
ProtocolLogger.debug("Can't get packet listener send method");
return null;
}
return Accessors.getMethodAccessor(send.get(0));
}
private static MethodAccessor getPacketListenerDisconnect() {
FuzzyReflection packetListener = FuzzyReflection.fromClass(PREFERRED_PACKET_LISTENER_CLASS);
List<Method> disconnect = packetListener.getMethodList(FuzzyMethodContract.newBuilder()
.banModifier(Modifier.STATIC)
.returnTypeVoid()
.parameterCount(1)
.parameterExactType(MinecraftReflection.getIChatBaseComponentClass(), 0)
.build());
if (disconnect.isEmpty()) {
disconnect = packetListener.getMethodList(FuzzyMethodContract.newBuilder()
.banModifier(Modifier.STATIC)
.returnTypeVoid()
.nameRegex("disconnect.*")
.parameterCount(1)
.parameterExactType(String.class, 0)
.build());
}
if (disconnect.isEmpty()) {
ProtocolLogger.debug("Can't get packet listener disconnect method");
return null;
}
return Accessors.getMethodAccessor(disconnect.get(0));
}
private static boolean doesPacketListenerDisconnectUseComponent() {
if (PACKET_LISTENER_DISCONNECT != null) {
Parameter reason = PACKET_LISTENER_DISCONNECT.getMethod().getParameters()[0];
return MinecraftReflection.isIChatBaseComponent(reason.getClass());
}
return false;
}
private static MethodAccessor getNetworkManagerSend() {
FuzzyReflection networkManager = FuzzyReflection.fromClass(MinecraftReflection.getNetworkManagerClass());
Method send = networkManager.getMethod(FuzzyMethodContract.newBuilder()
.banModifier(Modifier.STATIC)
.returnTypeVoid()
.parameterCount(1)
.parameterExactType(MinecraftReflection.getPacketClass(), 0)
.build());
return Accessors.getMethodAccessor(send);
}
private static MethodAccessor getNetworkManagerRead() {
FuzzyReflection networkManager = FuzzyReflection.fromClass(MinecraftReflection.getNetworkManagerClass(), true);
Method read = networkManager
.getMethodByParameters("read", ChannelHandlerContext.class, MinecraftReflection.getPacketClass());
return Accessors.getMethodAccessor(read);
}
private static MethodAccessor getNetworkManagerDisconnect() {
FuzzyReflection networkManager = FuzzyReflection.fromClass(MinecraftReflection.getNetworkManagerClass());
Method disconnect = networkManager.getMethod(FuzzyMethodContract.newBuilder()
.banModifier(Modifier.STATIC)
.returnTypeVoid()
.parameterCount(1)
.parameterExactType(MinecraftReflection.getIChatBaseComponentClass(), 0)
.build());
return Accessors.getMethodAccessor(disconnect);
}
private static MethodAccessor getNetworkManagerPacketListener() {
FuzzyReflection networkManager = FuzzyReflection.fromClass(MinecraftReflection.getNetworkManagerClass());
Method packetListener = networkManager.getMethod(FuzzyMethodContract.newBuilder()
.banModifier(Modifier.STATIC)
.returnTypeExact(PACKET_LISTENER_CLASS)
.parameterCount(0)
.build());
return Accessors.getMethodAccessor(packetListener);
}
private final Object networkManager;
private final AtomicReference<Object> packetListener = new AtomicReference<>(null);
PacketListenerInvoker(Object networkManager) {
if (!MinecraftReflection.is(MinecraftReflection.getNetworkManagerClass(), networkManager)) {
throw new IllegalArgumentException("Given NetworkManager isn't an isntance of NetworkManager");
}
this.networkManager = networkManager;
}
/**
* Retrieves the current packet listener associated with this network manager.
*
* <p>This method ensures thread-safety and returns the packet listener only if it is an
* instance of the preferred class. If the packet listener has changed or does not match
* the preferred class, it returns {@code null}.
*
* @return the current packet listener if it meets the required criteria, otherwise {@code null}.
*/
private Object getPacketListener() {
// Retrieve the current packet listener from the network manager using reflection.
Object packetListener = NETWORK_MANAGER_PACKET_LISTENER.invoke(this.networkManager);
// Perform a thread-safe check to see if the packet listener has changed since the last retrieval.
if (!this.packetListener.compareAndSet(packetListener, packetListener)) {
// If the packet listener has changed, attempt to update the cached listener to the new instance,
// or invalidate the cached object if it does not match the preferred type.
if (PREFERRED_PACKET_LISTENER_CLASS.isInstance(packetListener)) {
this.packetListener.set(packetListener);
} else {
this.packetListener.set(null);
}
}
// Return the currently cached packet listener, which may be null if it does not match the preferred type.
return this.packetListener.get();
}
/**
* Sends a packet using the current packet listener if available and valid; otherwise,
* falls back to the network manager.
*
* @param packet The packet to be sent.
*/
public void send(Object packet) {
Object packetListener = this.getPacketListener();
if (PACKET_LISTENER_SEND != null && packetListener != null) {
PACKET_LISTENER_SEND.invoke(packetListener, packet);
} else {
NETWORK_MANAGER_SEND.invoke(this.networkManager, packet);
}
}
/**
* Reads a packet directly using the network manager.
*
* @param packet The packet to be read.
*/
public void read(Object packet) {
NETWORK_MANAGER_READ.invoke(this.networkManager, null, packet);
}
/**
* Disconnects the player using the current packet listener if available and valid; otherwise,
* falls back to the network manager.
*
* @param reason The reason for the disconnection.
*/
public void disconnect(String reason) {
Object packetListener = this.getPacketListener();
boolean hasPacketListener = PACKET_LISTENER_DISCONNECT != null && packetListener != null;
Object wrapped = reason;
if (!hasPacketListener || DOES_PACKET_LISTENER_DISCONNECT_USE_COMPONENT) {
wrapped = WrappedChatComponent.fromText(reason).getHandle();
}
if (hasPacketListener) {
PACKET_LISTENER_DISCONNECT.invoke(packetListener, wrapped);
} else {
NETWORK_MANAGER_DISCONNECT.invoke(this.networkManager, wrapped);
}
}
}

View File

@ -170,7 +170,7 @@ public class TemporaryPlayerFactory {
*/
private static Object sendMessage(Injector injector, String message) {
for (PacketContainer packet : ChatExtensions.createChatPackets(message)) {
injector.sendServerPacket(packet.getHandle(), null, false);
injector.sendClientboundPacket(packet.getHandle(), null, false);
}
return null;

View File

@ -374,7 +374,7 @@ public final class MinecraftReflection {
return false;
}
// check for accidential class objects
// check for accidental class objects
if (object instanceof Class) {
return clazz.isAssignableFrom((Class<?>) object);
}

View File

@ -1,53 +0,0 @@
/*
* ProtocolLib - Bukkit server library that allows access to the Minecraft protocol.
* Copyright (C) 2012 Kristian S. Stangeland
*
* This program is free software; you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation; either version 2 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
* 02111-1307 USA
*/
package com.comphenix.protocol.concurrency;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.jupiter.api.Test;
public class BlockingHashMapTest {
@Test
public void test() throws InterruptedException, ExecutionException {
final BlockingHashMap<Integer, String> map = BlockingHashMap.create();
ExecutorService service = Executors.newSingleThreadExecutor();
// Create a reader
Future<String> future = service.submit(() -> {
// Combine for easy reading
return map.get(0) + map.get(1);
});
// Wait a bit
Thread.sleep(50);
// Insert values
map.put(0, "hello ");
map.put(1, "world");
// Wait for the other thread to complete
assertEquals(future.get(), "hello world");
}
}

View File

@ -0,0 +1,21 @@
package com.comphenix.protocol.injector.netty.channel;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.comphenix.protocol.BukkitInitialization;
public class PacketListenerInvokerTest {
@BeforeAll
public static void beforeClass() {
BukkitInitialization.initializeAll();
}
@Test
public void test() {
assertDoesNotThrow(() -> PacketListenerInvoker.ensureStaticInitializedWithoutError());
}
}