Remove Acquisition.java

This commit is contained in:
TheMode 2021-04-25 19:08:04 +02:00
parent fd17a63f7c
commit b47946bfed
6 changed files with 121 additions and 148 deletions

View File

@ -1,7 +1,7 @@
package net.minestom.server; package net.minestom.server;
import com.google.common.collect.Queues; import com.google.common.collect.Queues;
import net.minestom.server.acquirable.Acquisition; import net.minestom.server.acquirable.Acquirable;
import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance; import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager; import net.minestom.server.instance.InstanceManager;
@ -86,12 +86,12 @@ public final class UpdateManager {
// Monitoring // Monitoring
if (!tickMonitors.isEmpty()) { if (!tickMonitors.isEmpty()) {
final double acquisitionTimeMs = Acquisition.getCurrentWaitMonitoring() / 1e6D; final double acquisitionTimeMs = Acquirable.getAcquiringTime() / 1e6D;
final double tickTimeMs = tickTime / 1e6D; final double tickTimeMs = tickTime / 1e6D;
final TickMonitor tickMonitor = new TickMonitor(tickTimeMs, acquisitionTimeMs); final TickMonitor tickMonitor = new TickMonitor(tickTimeMs, acquisitionTimeMs);
this.tickMonitors.forEach(consumer -> consumer.accept(tickMonitor)); this.tickMonitors.forEach(consumer -> consumer.accept(tickMonitor));
Acquisition.resetWaitMonitoring(); Acquirable.resetAcquiringTime();
} }
// Flush all waiting packets // Flush all waiting packets

View File

@ -38,6 +38,25 @@ public interface Acquirable<T> {
AcquirableImpl.CURRENT_ENTITIES.set(entities); AcquirableImpl.CURRENT_ENTITIES.set(entities);
} }
/**
* Gets the time spent acquiring since last tick.
*
* @return the acquiring time
*/
static long getAcquiringTime() {
return AcquirableImpl.WAIT_COUNTER_NANO.get();
}
/**
* Resets {@link #getAcquiringTime()}.
* <p>
* Mostly for internal use.
*/
@ApiStatus.Internal
static void resetAcquiringTime() {
AcquirableImpl.WAIT_COUNTER_NANO.set(0);
}
/** /**
* Creates a new {@link Acquirable} object. * Creates a new {@link Acquirable} object.
* <p> * <p>
@ -69,7 +88,7 @@ public interface Acquirable<T> {
} else { } else {
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
final TickThread tickThread = getHandler().getTickThread(); final TickThread tickThread = getHandler().getTickThread();
var lock = Acquisition.acquireEnter(currentThread, tickThread); var lock = Acquired.acquireEnter(currentThread, tickThread);
return new Acquired<>(unwrap(), true, lock); return new Acquired<>(unwrap(), true, lock);
} }
} }

View File

@ -1,10 +1,10 @@
package net.minestom.server.acquirable; package net.minestom.server.acquirable;
import net.minestom.server.thread.TickThread;
import net.minestom.server.utils.async.AsyncUtils; import net.minestom.server.utils.async.AsyncUtils;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.Collection; import java.util.*;
import java.util.Iterator;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -17,7 +17,22 @@ public class AcquirableCollection<E> implements Collection<Acquirable<E>> {
} }
public void forEachSync(@NotNull Consumer<E> consumer) { public void forEachSync(@NotNull Consumer<E> consumer) {
Acquisition.acquireForEach(acquirableCollection, consumer); final Thread currentThread = Thread.currentThread();
var threadEntitiesMap = retrieveOptionalThreadMap(acquirableCollection, currentThread, consumer);
// Acquire all the threads one by one
{
for (var entry : threadEntitiesMap.entrySet()) {
final TickThread tickThread = entry.getKey();
final List<E> values = entry.getValue();
var lock = Acquired.acquireEnter(currentThread, tickThread);
for (E value : values) {
consumer.accept(value);
}
Acquired.acquireLeave(lock);
}
}
} }
public void forEachAsync(@NotNull Consumer<E> consumer) { public void forEachAsync(@NotNull Consumer<E> consumer) {
@ -95,4 +110,37 @@ public class AcquirableCollection<E> implements Collection<Acquirable<E>> {
public void clear() { public void clear() {
this.acquirableCollection.clear(); this.acquirableCollection.clear();
} }
/**
* Creates
*
* @param collection the acquirable collection
* @param currentThread the current thread
* @param consumer the consumer to execute when an element is already in the current thread
* @return a new Thread to acquirable elements map
*/
protected static <T> Map<TickThread, List<T>> retrieveOptionalThreadMap(@NotNull Collection<Acquirable<T>> collection,
@NotNull Thread currentThread,
@NotNull Consumer<T> consumer) {
// Separate a collection of acquirable elements into a map of thread->elements
// Useful to reduce the number of acquisition
Map<TickThread, List<T>> threadCacheMap = new HashMap<>();
for (var element : collection) {
final T value = element.unwrap();
final TickThread elementThread = element.getHandler().getTickThread();
if (currentThread == elementThread) {
// The element is managed in the current thread, consumer can be immediately called
consumer.accept(value);
} else {
// The element is manager in a different thread, cache it
List<T> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, tickThread -> new ArrayList<>());
threadCacheList.add(value);
}
}
return threadCacheMap;
}
} }

View File

@ -3,11 +3,13 @@ package net.minestom.server.acquirable;
import net.minestom.server.entity.Entity; import net.minestom.server.entity.Entity;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream; import java.util.stream.Stream;
class AcquirableImpl<T> implements Acquirable<T> { class AcquirableImpl<T> implements Acquirable<T> {
protected static final ThreadLocal<Stream<Entity>> CURRENT_ENTITIES = ThreadLocal.withInitial(Stream::empty); protected static final ThreadLocal<Stream<Entity>> CURRENT_ENTITIES = ThreadLocal.withInitial(Stream::empty);
protected static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong();
private final T value; private final T value;
private final Acquirable.Handler handler; private final Acquirable.Handler handler;

View File

@ -1,5 +1,6 @@
package net.minestom.server.acquirable; package net.minestom.server.acquirable;
import net.minestom.server.thread.TickThread;
import net.minestom.server.utils.validate.Check; import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -8,6 +9,11 @@ import java.util.concurrent.locks.ReentrantLock;
public class Acquired<T> { public class Acquired<T> {
/**
* Global lock used for synchronization.
*/
private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private final T value; private final T value;
private final boolean locked; private final boolean locked;
@ -32,7 +38,45 @@ public class Acquired<T> {
this.unlocked = true; this.unlocked = true;
if (!locked) if (!locked)
return; return;
Acquisition.acquireLeave(lock); acquireLeave(lock);
}
protected static @Nullable ReentrantLock acquireEnter(@Nullable Thread currentThread, @Nullable TickThread elementThread) {
// Monitoring
long time = System.nanoTime();
ReentrantLock currentLock;
{
final TickThread current = currentThread instanceof TickThread ?
(TickThread) currentThread : null;
currentLock = current != null && current.getLock().isHeldByCurrentThread() ?
current.getLock() : null;
}
if (currentLock != null)
currentLock.unlock();
GLOBAL_LOCK.lock();
if (currentLock != null)
currentLock.lock();
final var lock = elementThread != null ? elementThread.getLock() : null;
final boolean acquired = lock == null || lock.isHeldByCurrentThread();
if (!acquired) {
lock.lock();
}
// Monitoring
AcquirableImpl.WAIT_COUNTER_NANO.addAndGet(System.nanoTime() - time);
return !acquired ? lock : null;
}
protected static void acquireLeave(@Nullable ReentrantLock lock) {
if (lock != null) {
lock.unlock();
}
GLOBAL_LOCK.unlock();
} }
private void checkLock() { private void checkLock() {

View File

@ -1,140 +0,0 @@
package net.minestom.server.acquirable;
import net.minestom.server.thread.TickThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
public final class Acquisition {
/**
* Global lock used for synchronization.
*/
private static final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
private static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong();
/**
* Acquires a {@link Collection}.
* <p>
* Order is not guaranteed.
*
* @param collection the collection to acquire
* @param consumer the consumer called for each of the collection element
*/
protected static <T> void acquireForEach(@NotNull Collection<Acquirable<T>> collection,
@NotNull Consumer<T> consumer) {
final Thread currentThread = Thread.currentThread();
var threadEntitiesMap = retrieveOptionalThreadMap(collection, currentThread, consumer);
// Acquire all the threads one by one
{
for (var entry : threadEntitiesMap.entrySet()) {
final TickThread tickThread = entry.getKey();
final List<T> values = entry.getValue();
acquire(currentThread, tickThread, () -> {
for (T value : values) {
consumer.accept(value);
}
});
}
}
}
/**
* Ensures that {@code callback} is safely executed inside the batch thread.
*/
protected static void acquire(@NotNull Thread currentThread, @Nullable TickThread elementThread,
@NotNull Runnable callback) {
if (Objects.equals(currentThread, elementThread)) {
callback.run();
} else {
var lock = acquireEnter(currentThread, elementThread);
callback.run();
acquireLeave(lock);
}
}
protected static ReentrantLock acquireEnter(Thread currentThread, TickThread elementThread) {
// Monitoring
long time = System.nanoTime();
ReentrantLock currentLock;
{
final TickThread current = currentThread instanceof TickThread ?
(TickThread) currentThread : null;
currentLock = current != null && current.getLock().isHeldByCurrentThread() ?
current.getLock() : null;
}
if (currentLock != null)
currentLock.unlock();
GLOBAL_LOCK.lock();
if (currentLock != null)
currentLock.lock();
final var lock = elementThread != null ? elementThread.getLock() : null;
final boolean acquired = lock == null || lock.isHeldByCurrentThread();
if (!acquired) {
lock.lock();
}
// Monitoring
WAIT_COUNTER_NANO.addAndGet(System.nanoTime() - time);
return !acquired ? lock : null;
}
protected static void acquireLeave(ReentrantLock lock) {
if (lock != null) {
lock.unlock();
}
GLOBAL_LOCK.unlock();
}
/**
* Creates
*
* @param collection the acquirable collection
* @param currentThread the current thread
* @param consumer the consumer to execute when an element is already in the current thread
* @return a new Thread to acquirable elements map
*/
protected static <T> Map<TickThread, List<T>> retrieveOptionalThreadMap(@NotNull Collection<Acquirable<T>> collection,
@NotNull Thread currentThread,
@NotNull Consumer<T> consumer) {
// Separate a collection of acquirable elements into a map of thread->elements
// Useful to reduce the number of acquisition
Map<TickThread, List<T>> threadCacheMap = new HashMap<>();
for (var element : collection) {
final T value = element.unwrap();
final TickThread elementThread = element.getHandler().getTickThread();
if (currentThread == elementThread) {
// The element is managed in the current thread, consumer can be immediately called
consumer.accept(value);
} else {
// The element is manager in a different thread, cache it
List<T> threadCacheList = threadCacheMap.computeIfAbsent(elementThread, tickThread -> new ArrayList<>());
threadCacheList.add(value);
}
}
return threadCacheMap;
}
public static long getCurrentWaitMonitoring() {
return WAIT_COUNTER_NANO.get();
}
public static void resetWaitMonitoring() {
WAIT_COUNTER_NANO.set(0);
}
}