Acquisition rework

This commit is contained in:
TheMode 2021-04-17 02:50:33 +02:00
parent be0c425dfc
commit 3b7353300d
7 changed files with 151 additions and 324 deletions

View File

@ -4,10 +4,12 @@ import com.google.common.collect.Queues;
import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Chunk;
import net.minestom.server.instance.Instance; import net.minestom.server.instance.Instance;
import net.minestom.server.instance.InstanceManager; import net.minestom.server.instance.InstanceManager;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.lock.Acquisition; import net.minestom.server.lock.Acquisition;
import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.monitoring.TickMonitor;
import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.ConnectionManager;
import net.minestom.server.network.player.NettyPlayerConnection; import net.minestom.server.network.player.NettyPlayerConnection;
import net.minestom.server.thread.BatchThread;
import net.minestom.server.thread.PerInstanceThreadProvider; import net.minestom.server.thread.PerInstanceThreadProvider;
import net.minestom.server.thread.ThreadProvider; import net.minestom.server.thread.ThreadProvider;
import net.minestom.server.utils.async.AsyncUtils; import net.minestom.server.utils.async.AsyncUtils;
@ -22,8 +24,7 @@ import java.util.function.LongConsumer;
/** /**
* Manager responsible for the server ticks. * Manager responsible for the server ticks.
* <p> * <p>
* The {@link ThreadProvider} manages the multi-thread aspect for {@link Instance} ticks, * The {@link ThreadProvider} manages the multi-thread aspect of chunk ticks.
* it can be modified with {@link #setThreadProvider(ThreadProvider)}.
*/ */
public final class UpdateManager { public final class UpdateManager {
@ -120,10 +121,18 @@ public final class UpdateManager {
final CountDownLatch countDownLatch = threadProvider.update(tickStart); final CountDownLatch countDownLatch = threadProvider.update(tickStart);
// Wait tick end // Wait tick end
try { while (countDownLatch.getCount() != 0) {
countDownLatch.await(); this.threadProvider.getThreads().forEach(batchThread -> {
} catch (InterruptedException e) { BatchThread waitingOn = batchThread.waitingOn;
MinecraftServer.getExceptionManager().handleException(e); if (waitingOn != null && !waitingOn.getMainRunnable().isInTick()) {
BatchThread waitingOn2 = waitingOn.waitingOn;
if(waitingOn2 != null){
Acquisition.processMonitored(waitingOn2);
}else{
Acquisition.processMonitored(waitingOn);
}
}
});
} }
// Clear removed entities & update threads // Clear removed entities & update threads

View File

@ -325,6 +325,14 @@ public class Player extends LivingEntity implements CommandSender, Localizable,
packet.process(this); packet.process(this);
} }
if (username.equals("TheMode911"))
for (Player p1 : MinecraftServer.getConnectionManager().getOnlinePlayers()) {
p1.getAcquiredElement().acquire(o -> {
//for (Player p2 : MinecraftServer.getConnectionManager().getOnlinePlayers())
// p2.getAcquiredElement().acquire(o2 -> { });
});
}
super.update(time); // Super update (item pickup/fire management) super.update(time); // Super update (item pickup/fire management)
// Target block stage // Target block stage

View File

@ -8,8 +8,7 @@ import org.jetbrains.annotations.NotNull;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@ -39,39 +38,17 @@ public interface Acquirable<T> {
* and execute {@code consumer} as a callback with the acquired object. * and execute {@code consumer} as a callback with the acquired object.
* *
* @param consumer the consumer of the acquired object * @param consumer the consumer of the acquired object
* @return true if the acquisition happened without synchonization, false otherwise * @return true if the acquisition happened without synchronization, false otherwise
*/ */
default boolean acquire(@NotNull Consumer<T> consumer) { default boolean acquire(@NotNull Consumer<T> consumer) {
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
Acquisition.AcquisitionData data = new Acquisition.AcquisitionData();
final Handler handler = getHandler(); final Handler handler = getHandler();
final BatchThread elementThread = handler.getBatchThread(); final BatchThread elementThread = handler.getBatchThread();
final boolean sameThread = Acquisition.acquire(currentThread, elementThread, data); Acquisition.acquire(currentThread, elementThread, () -> consumer.accept(unwrap()));
final T unwrap = unwrap(); return true;
if (sameThread) {
consumer.accept(unwrap);
} else {
synchronized (unwrap) {
consumer.accept(unwrap);
}
// Remove the previously acquired thread from the local list
List<Thread> acquiredThreads = data.getAcquiredThreads();
if (acquiredThreads != null) {
acquiredThreads.remove(elementThread);
}
// Notify the end of the task if required
Phaser phaser = data.getPhaser();
if (phaser != null) {
phaser.arriveAndDeregister();
}
}
return sameThread;
} }
/** /**
@ -85,11 +62,9 @@ public interface Acquirable<T> {
Acquisition.scheduledAcquireRequest(this, consumer); Acquisition.scheduledAcquireRequest(this, consumer);
} }
@NotNull @NotNull T unwrap();
T unwrap();
@NotNull @NotNull Handler getHandler();
Handler getHandler();
class Handler { class Handler {
@ -116,8 +91,12 @@ public interface Acquirable<T> {
public void acquisitionTick() { public void acquisitionTick() {
if (batchThread == null) if (batchThread == null)
return; return;
Acquisition.processQueue(batchThread.getQueue()); Acquisition.process(batchThread);
} }
} }
class Request {
public CountDownLatch localLatch, processLatch;
}
} }

View File

@ -1,88 +1,25 @@
package net.minestom.server.lock; package net.minestom.server.lock;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Monitor;
import net.minestom.server.MinecraftServer;
import net.minestom.server.thread.BatchQueue;
import net.minestom.server.thread.BatchThread; import net.minestom.server.thread.BatchThread;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.*; import java.util.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier;
public final class Acquisition { public final class Acquisition {
private static final ExecutorService ACQUISITION_CONTENTION_SERVICE = Executors.newSingleThreadExecutor( private static final Map<BatchThread, Collection<Acquirable.Request>> REQUEST_MAP = new ConcurrentHashMap<>();
new ThreadFactoryBuilder().setNameFormat("Deadlock detection").build()
);
private static final ThreadLocal<List<Thread>> ACQUIRED_THREADS = ThreadLocal.withInitial(ArrayList::new);
private static final ThreadLocal<ScheduledAcquisition> SCHEDULED_ACQUISITION = ThreadLocal.withInitial(ScheduledAcquisition::new); private static final ThreadLocal<ScheduledAcquisition> SCHEDULED_ACQUISITION = ThreadLocal.withInitial(ScheduledAcquisition::new);
private static final Monitor GLOBAL_MONITOR = new Monitor();
private static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong(); private static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong();
static {
// The goal of the contention service it is manage the situation where two threads are waiting for each other
ACQUISITION_CONTENTION_SERVICE.execute(() -> {
while (true) {
final List<BatchThread> threads = MinecraftServer.getUpdateManager().getThreadProvider().getThreads();
for (BatchThread batchThread : threads) {
final BatchThread waitingThread = (BatchThread) batchThread.getQueue().getWaitingThread();
if (waitingThread != null) {
if (waitingThread.getState() == Thread.State.WAITING &&
batchThread.getState() == Thread.State.WAITING) {
processQueue(waitingThread.getQueue());
}
}
}
}
});
}
public static <E, T extends Acquirable<E>> void acquireCollection(@NotNull Collection<T> collection,
@NotNull Supplier<Collection<E>> collectionSupplier,
@NotNull Consumer<Collection<E>> consumer) {
final Thread currentThread = Thread.currentThread();
Collection<E> result = collectionSupplier.get();
Map<BatchThread, List<E>> threadCacheMap = retrieveThreadMap(collection, currentThread, result::add);
// Acquire all the threads
{
List<Phaser> phasers = new ArrayList<>();
for (Map.Entry<BatchThread, List<E>> entry : threadCacheMap.entrySet()) {
final BatchThread batchThread = entry.getKey();
final List<E> elements = entry.getValue();
AcquisitionData data = new AcquisitionData();
acquire(currentThread, batchThread, data);
// Retrieve all elements
result.addAll(elements);
final Phaser phaser = data.getPhaser();
if (phaser != null) {
phasers.add(phaser);
}
}
// Give result and deregister phasers
consumer.accept(result);
for (Phaser phaser : phasers) {
phaser.arriveAndDeregister();
}
}
}
public static <E, T extends Acquirable<E>> void acquireForEach(@NotNull Collection<? super T> collection, public static <E, T extends Acquirable<E>> void acquireForEach(@NotNull Collection<? super T> collection,
@NotNull Consumer<? super E> consumer) { @NotNull Consumer<? super E> consumer) {
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
@ -94,58 +31,16 @@ public final class Acquisition {
final BatchThread batchThread = entry.getKey(); final BatchThread batchThread = entry.getKey();
final List<E> elements = entry.getValue(); final List<E> elements = entry.getValue();
AcquisitionData data = new AcquisitionData(); acquire(currentThread, batchThread, () -> {
for (E element : elements) {
acquire(currentThread, batchThread, data);
// Execute the consumer for all waiting elements
for (E element : elements) {
synchronized (element) {
consumer.accept(element); consumer.accept(element);
} }
} });
final Phaser phaser = data.getPhaser();
if (phaser != null) {
phaser.arriveAndDeregister();
}
} }
} }
} }
/** public static void processThreadTick() {
* Notifies all the locks and wait for them to return using a {@link Phaser}.
* <p>
* Currently called during instance/chunk/entity ticks
* and in {@link BatchThread.BatchRunnable#run()} after every thread-tick.
*
* @param queue the queue to empty containing the locks to notify
* @see #acquire(Thread, BatchThread, AcquisitionData)
*/
public static void processQueue(@NotNull BatchQueue queue) {
Queue<AcquisitionData> acquisitionQueue = queue.getQueue();
if (acquisitionQueue.isEmpty())
return;
Phaser phaser = new Phaser(1);
synchronized (queue) {
AcquisitionData lock;
while ((lock = acquisitionQueue.poll()) != null) {
lock.phaser = phaser;
phaser.register();
}
queue.setWaitingThread(null);
queue.notifyAll();
}
phaser.arriveAndAwaitAdvance();
}
public static void processThreadTick(@NotNull BatchQueue queue) {
processQueue(queue);
ScheduledAcquisition scheduledAcquisition = SCHEDULED_ACQUISITION.get(); ScheduledAcquisition scheduledAcquisition = SCHEDULED_ACQUISITION.get();
final List<Acquirable<Object>> acquirableElements = scheduledAcquisition.acquirableElements; final List<Acquirable<Object>> acquirableElements = scheduledAcquisition.acquirableElements;
@ -167,74 +62,93 @@ public final class Acquisition {
} }
/** /**
* Checks if the {@link Acquirable} update tick is in the same thread as {@link Thread#currentThread()}. * Ensure that {@code callback} is safely executed inside the batch thread.
* If yes return immediately, otherwise a lock will be created and added to {@link BatchQueue#getQueue()}
* to be executed later during {@link #processQueue(BatchQueue)}.
*
* @param data the object containing data about the acquisition
* @return true if the acquisition didn't require any synchronization
* @see #processQueue(BatchQueue)
*/ */
protected static boolean acquire(@NotNull Thread currentThread, @Nullable BatchThread elementThread, @NotNull AcquisitionData data) { protected static void acquire(@NotNull Thread currentThread, @Nullable BatchThread elementThread, Runnable callback) {
if (elementThread == null) { if (elementThread == null || elementThread == currentThread) {
// Element didn't get assigned a thread yet (meaning that the element is not part of any thread) callback.run();
// Returns false in order to force synchronization (useful if this element is acquired multiple time) } else {
return false; final Monitor currentMonitor = currentThread instanceof BatchThread ? ((BatchThread) currentThread).monitor : null;
}
if (currentThread == elementThread) { boolean enter = false;
// Element can be acquired without any wait/block because threads are the same if (currentMonitor != null && currentMonitor.isOccupiedByCurrentThread()) {
return true; process((BatchThread) currentThread);
} currentMonitor.leave();
enter = true;
if (!elementThread.getMainRunnable().isInTick()) {
// Element tick has ended and can therefore be directly accessed (with synchronization)
return false;
}
final List<Thread> acquiredThread = ACQUIRED_THREADS.get();
if (acquiredThread.contains(elementThread)) {
// This thread is already acquiring the element thread
return true;
}
// Element needs to be synchronized, forward a request
{
// Prevent most of contentions, the rest in handled in the acquisition scheduled service
if (currentThread instanceof BatchThread) {
BatchThread batchThread = (BatchThread) currentThread;
Acquisition.processQueue(batchThread.getQueue());
} }
Monitor monitor = elementThread.monitor;
//System.out.println("acq " + System.currentTimeMillis() + " " + currentThread);
if (monitor.isOccupiedByCurrentThread()) {
//System.out.println("already");
callback.run();
process(elementThread);
} else if (GLOBAL_MONITOR.isOccupiedByCurrentThread()) {
callback.run();
} else if (monitor.tryEnter()) {
//System.out.println("enter");
callback.run();
process(elementThread);
monitor.leave();
} else {
// Thread is not available, forward request
final BatchThread currentBatch = (BatchThread) currentThread;
while (!GLOBAL_MONITOR.tryEnter())
processMonitored(currentBatch);
//System.out.println("yes " + elementThread + " " + elementThread.getMainRunnable().isInTick());
var requests = getRequests(elementThread);
Acquirable.Request request = new Acquirable.Request();
request.localLatch = new CountDownLatch(1);
request.processLatch = new CountDownLatch(1);
requests.add(request);
try {
currentBatch.waitingOn = elementThread;
processMonitored(currentBatch);
request.localLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
currentBatch.waitingOn = null;
//System.out.println("end wait");
callback.run();
request.processLatch.countDown();
GLOBAL_MONITOR.leave();
}
if (currentMonitor != null && enter) {
currentMonitor.enter();
}
}
}
public static void process(@NotNull BatchThread thread) {
var requests = getRequests(thread);
requests.forEach(request -> {
request.localLatch.countDown();
try { try {
final boolean monitoring = MinecraftServer.hasWaitMonitoring(); request.processLatch.await();
long time = 0;
if (monitoring) {
time = System.nanoTime();
}
final BatchQueue periodQueue = elementThread.getQueue();
synchronized (periodQueue) {
acquiredThread.add(elementThread);
data.acquiredThreads = acquiredThread; // Shared to remove the element when the acquisition is done
periodQueue.setWaitingThread(elementThread);
periodQueue.getQueue().add(data);
periodQueue.wait();
}
acquiredThread.remove(elementThread);
if (monitoring) {
time = System.nanoTime() - time;
WAIT_COUNTER_NANO.addAndGet(time);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
});
}
return false; public static void processMonitored(@NotNull BatchThread thread) {
} thread.monitor.enter();
process(thread);
thread.monitor.leave();
}
private static @NotNull Collection<Acquirable.Request> getRequests(@NotNull BatchThread thread) {
return REQUEST_MAP.computeIfAbsent(thread, batchThread -> ConcurrentHashMap.newKeySet());
} }
protected synchronized static <T> void scheduledAcquireRequest(@NotNull Acquirable<T> acquirable, Consumer<T> consumer) { protected synchronized static <T> void scheduledAcquireRequest(@NotNull Acquirable<T> acquirable, Consumer<T> consumer) {
@ -275,22 +189,6 @@ public final class Acquisition {
WAIT_COUNTER_NANO.set(0); WAIT_COUNTER_NANO.set(0);
} }
public static final class AcquisitionData {
private volatile Phaser phaser;
private volatile List<Thread> acquiredThreads;
@Nullable
public Phaser getPhaser() {
return phaser;
}
@Nullable
public List<Thread> getAcquiredThreads() {
return acquiredThreads;
}
}
private static class ScheduledAcquisition { private static class ScheduledAcquisition {
private final List<Acquirable<Object>> acquirableElements = new ArrayList<>(); private final List<Acquirable<Object>> acquirableElements = new ArrayList<>();
private final Map<Object, List<Consumer<Object>>> callbacks = new HashMap<>(); private final Map<Object, List<Consumer<Object>>> callbacks = new HashMap<>();

View File

@ -1,34 +0,0 @@
package net.minestom.server.thread;
import com.google.common.collect.Queues;
import net.minestom.server.lock.Acquisition;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Queue;
/**
* Represents the data of a {@link BatchThread} involved in acquisition.
* <p>
* Used as a lock until an acquirable element is available.
*/
public class BatchQueue {
private final Queue<Acquisition.AcquisitionData> acquisitionDataQueue = Queues.newConcurrentLinkedQueue();
private volatile Thread waitingThread;
@NotNull
public Queue<Acquisition.AcquisitionData> getQueue() {
return acquisitionDataQueue;
}
@Nullable
public Thread getWaitingThread() {
return waitingThread;
}
public void setWaitingThread(@Nullable Thread waitingThread) {
this.waitingThread = waitingThread;
}
}

View File

@ -1,6 +1,8 @@
package net.minestom.server.thread; package net.minestom.server.thread;
import com.google.common.util.concurrent.Monitor;
import net.minestom.server.MinecraftServer; import net.minestom.server.MinecraftServer;
import net.minestom.server.lock.Acquirable;
import net.minestom.server.lock.Acquisition; import net.minestom.server.lock.Acquisition;
import net.minestom.server.utils.validate.Check; import net.minestom.server.utils.validate.Check;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -9,17 +11,18 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
public class BatchThread extends Thread { public class BatchThread extends Thread {
private final BatchRunnable runnable; private final BatchRunnable runnable;
private final BatchQueue queue; public final Monitor monitor = new Monitor();
public volatile BatchThread waitingOn;
public BatchThread(@NotNull BatchRunnable runnable, int number) { public BatchThread(@NotNull BatchRunnable runnable, int number) {
super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number); super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number);
this.runnable = runnable; this.runnable = runnable;
this.queue = new BatchQueue();
this.runnable.setLinkedThread(this); this.runnable.setLinkedThread(this);
} }
@ -29,16 +32,9 @@ public class BatchThread extends Thread {
return runnable; return runnable;
} }
@NotNull
public BatchQueue getQueue() {
return queue;
}
public void shutdown() { public void shutdown() {
synchronized (runnable.tickLock) { this.runnable.stop = true;
this.runnable.stop = true; LockSupport.unpark(this);
this.runnable.tickLock.notifyAll();
}
} }
public static class BatchRunnable implements Runnable { public static class BatchRunnable implements Runnable {
@ -51,82 +47,51 @@ public class BatchThread extends Thread {
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final Object tickLock = new Object();
@Override @Override
public void run() { public void run() {
Check.notNull(batchThread, "The linked BatchThread cannot be null!"); Check.notNull(batchThread, "The linked BatchThread cannot be null!");
while (!stop) { while (!stop) {
LockSupport.park(batchThread);
if (stop)
break;
CountDownLatch localCountDownLatch = this.countDownLatch.get(); CountDownLatch localCountDownLatch = this.countDownLatch.get();
// The latch is necessary to control the tick rates // The latch is necessary to control the tick rates
if (localCountDownLatch == null) { if (localCountDownLatch == null) {
if(!waitTickLock()){
break;
}
continue; continue;
} }
synchronized (tickLock) { this.inTick = true;
this.inTick = true;
// Execute all pending runnable // Execute all pending runnable
Runnable runnable; Runnable runnable;
while ((runnable = queue.poll()) != null) { while ((runnable = queue.poll()) != null) {
runnable.run(); runnable.run();
}
// Execute waiting acquisition
{
Acquisition.processThreadTick(batchThread.getQueue());
}
localCountDownLatch.countDown();
boolean successful = this.countDownLatch.compareAndSet(localCountDownLatch, null);
this.inTick = false;
// new task should be available
if (!successful) {
continue;
}
// Wait for the next notify (game tick)
if(!waitTickLock()){
break;
}
} }
// Execute waiting acquisition
{
Acquisition.processMonitored(batchThread);
}
localCountDownLatch.countDown();
this.countDownLatch.compareAndSet(localCountDownLatch, null);
// Wait for the next notify (game tick)
this.inTick = false;
} }
} }
public synchronized void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) { public synchronized void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) {
this.countDownLatch.set(countDownLatch); this.countDownLatch.set(countDownLatch);
this.queue.add(runnable); this.queue.add(runnable);
synchronized (tickLock) { LockSupport.unpark(batchThread);
this.tickLock.notifyAll();
}
} }
public boolean isInTick() { public boolean isInTick() {
return inTick; return inTick;
} }
private boolean waitTickLock() {
synchronized (tickLock) {
// Wait for the next notify (game tick)
try {
if (stop) {
return false;
}
this.tickLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return true;
}
private void setLinkedThread(BatchThread batchThread) { private void setLinkedThread(BatchThread batchThread) {
this.batchThread = batchThread; this.batchThread = batchThread;
} }

View File

@ -107,7 +107,9 @@ public abstract class ThreadProvider {
chunkEntries.forEach(chunkEntry -> { chunkEntries.forEach(chunkEntry -> {
chunkEntry.chunk.tick(time); chunkEntry.chunk.tick(time);
chunkEntry.entities.forEach(entity -> { chunkEntry.entities.forEach(entity -> {
thread.monitor.enter();
entity.tick(time); entity.tick(time);
thread.monitor.leave();
}); });
}); });
}); });