From 3b7353300d2afffbf5917e88fd6288bc0b086e5e Mon Sep 17 00:00:00 2001 From: TheMode Date: Sat, 17 Apr 2021 02:50:33 +0200 Subject: [PATCH] Acquisition rework --- .../net/minestom/server/UpdateManager.java | 21 +- .../net/minestom/server/entity/Player.java | 8 + .../net/minestom/server/lock/Acquirable.java | 43 +-- .../net/minestom/server/lock/Acquisition.java | 278 ++++++------------ .../minestom/server/thread/BatchQueue.java | 34 --- .../minestom/server/thread/BatchThread.java | 89 ++---- .../server/thread/ThreadProvider.java | 2 + 7 files changed, 151 insertions(+), 324 deletions(-) delete mode 100644 src/main/java/net/minestom/server/thread/BatchQueue.java diff --git a/src/main/java/net/minestom/server/UpdateManager.java b/src/main/java/net/minestom/server/UpdateManager.java index e67ea74f9..9cb9723b8 100644 --- a/src/main/java/net/minestom/server/UpdateManager.java +++ b/src/main/java/net/minestom/server/UpdateManager.java @@ -4,10 +4,12 @@ import com.google.common.collect.Queues; import net.minestom.server.instance.Chunk; import net.minestom.server.instance.Instance; import net.minestom.server.instance.InstanceManager; +import net.minestom.server.lock.Acquirable; import net.minestom.server.lock.Acquisition; import net.minestom.server.monitoring.TickMonitor; import net.minestom.server.network.ConnectionManager; import net.minestom.server.network.player.NettyPlayerConnection; +import net.minestom.server.thread.BatchThread; import net.minestom.server.thread.PerInstanceThreadProvider; import net.minestom.server.thread.ThreadProvider; import net.minestom.server.utils.async.AsyncUtils; @@ -22,8 +24,7 @@ import java.util.function.LongConsumer; /** * Manager responsible for the server ticks. *

- * The {@link ThreadProvider} manages the multi-thread aspect for {@link Instance} ticks, - * it can be modified with {@link #setThreadProvider(ThreadProvider)}. + * The {@link ThreadProvider} manages the multi-thread aspect of chunk ticks. */ public final class UpdateManager { @@ -120,10 +121,18 @@ public final class UpdateManager { final CountDownLatch countDownLatch = threadProvider.update(tickStart); // Wait tick end - try { - countDownLatch.await(); - } catch (InterruptedException e) { - MinecraftServer.getExceptionManager().handleException(e); + while (countDownLatch.getCount() != 0) { + this.threadProvider.getThreads().forEach(batchThread -> { + BatchThread waitingOn = batchThread.waitingOn; + if (waitingOn != null && !waitingOn.getMainRunnable().isInTick()) { + BatchThread waitingOn2 = waitingOn.waitingOn; + if(waitingOn2 != null){ + Acquisition.processMonitored(waitingOn2); + }else{ + Acquisition.processMonitored(waitingOn); + } + } + }); } // Clear removed entities & update threads diff --git a/src/main/java/net/minestom/server/entity/Player.java b/src/main/java/net/minestom/server/entity/Player.java index 637a41bbb..478331bb3 100644 --- a/src/main/java/net/minestom/server/entity/Player.java +++ b/src/main/java/net/minestom/server/entity/Player.java @@ -325,6 +325,14 @@ public class Player extends LivingEntity implements CommandSender, Localizable, packet.process(this); } + if (username.equals("TheMode911")) + for (Player p1 : MinecraftServer.getConnectionManager().getOnlinePlayers()) { + p1.getAcquiredElement().acquire(o -> { + //for (Player p2 : MinecraftServer.getConnectionManager().getOnlinePlayers()) + // p2.getAcquiredElement().acquire(o2 -> { }); + }); + } + super.update(time); // Super update (item pickup/fire management) // Target block stage diff --git a/src/main/java/net/minestom/server/lock/Acquirable.java b/src/main/java/net/minestom/server/lock/Acquirable.java index e2dcca1de..9d3d39747 100644 --- a/src/main/java/net/minestom/server/lock/Acquirable.java +++ b/src/main/java/net/minestom/server/lock/Acquirable.java @@ -8,8 +8,7 @@ import org.jetbrains.annotations.NotNull; import java.util.Collection; import java.util.Collections; -import java.util.List; -import java.util.concurrent.Phaser; +import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; /** @@ -39,39 +38,17 @@ public interface Acquirable { * and execute {@code consumer} as a callback with the acquired object. * * @param consumer the consumer of the acquired object - * @return true if the acquisition happened without synchonization, false otherwise + * @return true if the acquisition happened without synchronization, false otherwise */ default boolean acquire(@NotNull Consumer consumer) { final Thread currentThread = Thread.currentThread(); - Acquisition.AcquisitionData data = new Acquisition.AcquisitionData(); final Handler handler = getHandler(); final BatchThread elementThread = handler.getBatchThread(); - final boolean sameThread = Acquisition.acquire(currentThread, elementThread, data); + Acquisition.acquire(currentThread, elementThread, () -> consumer.accept(unwrap())); - final T unwrap = unwrap(); - if (sameThread) { - consumer.accept(unwrap); - } else { - synchronized (unwrap) { - consumer.accept(unwrap); - } - - // Remove the previously acquired thread from the local list - List acquiredThreads = data.getAcquiredThreads(); - if (acquiredThreads != null) { - acquiredThreads.remove(elementThread); - } - - // Notify the end of the task if required - Phaser phaser = data.getPhaser(); - if (phaser != null) { - phaser.arriveAndDeregister(); - } - } - - return sameThread; + return true; } /** @@ -85,11 +62,9 @@ public interface Acquirable { Acquisition.scheduledAcquireRequest(this, consumer); } - @NotNull - T unwrap(); + @NotNull T unwrap(); - @NotNull - Handler getHandler(); + @NotNull Handler getHandler(); class Handler { @@ -116,8 +91,12 @@ public interface Acquirable { public void acquisitionTick() { if (batchThread == null) return; - Acquisition.processQueue(batchThread.getQueue()); + Acquisition.process(batchThread); } } + class Request { + public CountDownLatch localLatch, processLatch; + } + } \ No newline at end of file diff --git a/src/main/java/net/minestom/server/lock/Acquisition.java b/src/main/java/net/minestom/server/lock/Acquisition.java index 495bd5009..a519c05d8 100644 --- a/src/main/java/net/minestom/server/lock/Acquisition.java +++ b/src/main/java/net/minestom/server/lock/Acquisition.java @@ -1,88 +1,25 @@ package net.minestom.server.lock; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import net.minestom.server.MinecraftServer; -import net.minestom.server.thread.BatchQueue; +import com.google.common.util.concurrent.Monitor; import net.minestom.server.thread.BatchThread; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Phaser; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.function.Supplier; public final class Acquisition { - private static final ExecutorService ACQUISITION_CONTENTION_SERVICE = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("Deadlock detection").build() - ); - private static final ThreadLocal> ACQUIRED_THREADS = ThreadLocal.withInitial(ArrayList::new); - + private static final Map> REQUEST_MAP = new ConcurrentHashMap<>(); private static final ThreadLocal SCHEDULED_ACQUISITION = ThreadLocal.withInitial(ScheduledAcquisition::new); + private static final Monitor GLOBAL_MONITOR = new Monitor(); + private static final AtomicLong WAIT_COUNTER_NANO = new AtomicLong(); - static { - // The goal of the contention service it is manage the situation where two threads are waiting for each other - ACQUISITION_CONTENTION_SERVICE.execute(() -> { - while (true) { - final List threads = MinecraftServer.getUpdateManager().getThreadProvider().getThreads(); - - for (BatchThread batchThread : threads) { - final BatchThread waitingThread = (BatchThread) batchThread.getQueue().getWaitingThread(); - if (waitingThread != null) { - if (waitingThread.getState() == Thread.State.WAITING && - batchThread.getState() == Thread.State.WAITING) { - processQueue(waitingThread.getQueue()); - } - } - } - } - }); - } - - public static > void acquireCollection(@NotNull Collection collection, - @NotNull Supplier> collectionSupplier, - @NotNull Consumer> consumer) { - final Thread currentThread = Thread.currentThread(); - Collection result = collectionSupplier.get(); - - Map> threadCacheMap = retrieveThreadMap(collection, currentThread, result::add); - - // Acquire all the threads - { - List phasers = new ArrayList<>(); - - for (Map.Entry> entry : threadCacheMap.entrySet()) { - final BatchThread batchThread = entry.getKey(); - final List elements = entry.getValue(); - - AcquisitionData data = new AcquisitionData(); - - acquire(currentThread, batchThread, data); - - // Retrieve all elements - result.addAll(elements); - - final Phaser phaser = data.getPhaser(); - if (phaser != null) { - phasers.add(phaser); - } - } - - // Give result and deregister phasers - consumer.accept(result); - for (Phaser phaser : phasers) { - phaser.arriveAndDeregister(); - } - - } - } - public static > void acquireForEach(@NotNull Collection collection, @NotNull Consumer consumer) { final Thread currentThread = Thread.currentThread(); @@ -94,58 +31,16 @@ public final class Acquisition { final BatchThread batchThread = entry.getKey(); final List elements = entry.getValue(); - AcquisitionData data = new AcquisitionData(); - - acquire(currentThread, batchThread, data); - - // Execute the consumer for all waiting elements - for (E element : elements) { - synchronized (element) { + acquire(currentThread, batchThread, () -> { + for (E element : elements) { consumer.accept(element); } - } - - final Phaser phaser = data.getPhaser(); - if (phaser != null) { - phaser.arriveAndDeregister(); - } + }); } } } - /** - * Notifies all the locks and wait for them to return using a {@link Phaser}. - *

- * Currently called during instance/chunk/entity ticks - * and in {@link BatchThread.BatchRunnable#run()} after every thread-tick. - * - * @param queue the queue to empty containing the locks to notify - * @see #acquire(Thread, BatchThread, AcquisitionData) - */ - public static void processQueue(@NotNull BatchQueue queue) { - Queue acquisitionQueue = queue.getQueue(); - - if (acquisitionQueue.isEmpty()) - return; - - Phaser phaser = new Phaser(1); - synchronized (queue) { - AcquisitionData lock; - while ((lock = acquisitionQueue.poll()) != null) { - lock.phaser = phaser; - phaser.register(); - } - - queue.setWaitingThread(null); - queue.notifyAll(); - } - - phaser.arriveAndAwaitAdvance(); - } - - public static void processThreadTick(@NotNull BatchQueue queue) { - processQueue(queue); - + public static void processThreadTick() { ScheduledAcquisition scheduledAcquisition = SCHEDULED_ACQUISITION.get(); final List> acquirableElements = scheduledAcquisition.acquirableElements; @@ -167,74 +62,93 @@ public final class Acquisition { } /** - * Checks if the {@link Acquirable} update tick is in the same thread as {@link Thread#currentThread()}. - * If yes return immediately, otherwise a lock will be created and added to {@link BatchQueue#getQueue()} - * to be executed later during {@link #processQueue(BatchQueue)}. - * - * @param data the object containing data about the acquisition - * @return true if the acquisition didn't require any synchronization - * @see #processQueue(BatchQueue) + * Ensure that {@code callback} is safely executed inside the batch thread. */ - protected static boolean acquire(@NotNull Thread currentThread, @Nullable BatchThread elementThread, @NotNull AcquisitionData data) { - if (elementThread == null) { - // Element didn't get assigned a thread yet (meaning that the element is not part of any thread) - // Returns false in order to force synchronization (useful if this element is acquired multiple time) - return false; - } + protected static void acquire(@NotNull Thread currentThread, @Nullable BatchThread elementThread, Runnable callback) { + if (elementThread == null || elementThread == currentThread) { + callback.run(); + } else { + final Monitor currentMonitor = currentThread instanceof BatchThread ? ((BatchThread) currentThread).monitor : null; - if (currentThread == elementThread) { - // Element can be acquired without any wait/block because threads are the same - return true; - } - - if (!elementThread.getMainRunnable().isInTick()) { - // Element tick has ended and can therefore be directly accessed (with synchronization) - return false; - } - - final List acquiredThread = ACQUIRED_THREADS.get(); - if (acquiredThread.contains(elementThread)) { - // This thread is already acquiring the element thread - return true; - } - - // Element needs to be synchronized, forward a request - { - // Prevent most of contentions, the rest in handled in the acquisition scheduled service - if (currentThread instanceof BatchThread) { - BatchThread batchThread = (BatchThread) currentThread; - Acquisition.processQueue(batchThread.getQueue()); + boolean enter = false; + if (currentMonitor != null && currentMonitor.isOccupiedByCurrentThread()) { + process((BatchThread) currentThread); + currentMonitor.leave(); + enter = true; } + Monitor monitor = elementThread.monitor; + + //System.out.println("acq " + System.currentTimeMillis() + " " + currentThread); + if (monitor.isOccupiedByCurrentThread()) { + //System.out.println("already"); + callback.run(); + process(elementThread); + } else if (GLOBAL_MONITOR.isOccupiedByCurrentThread()) { + callback.run(); + } else if (monitor.tryEnter()) { + //System.out.println("enter"); + callback.run(); + + process(elementThread); + + monitor.leave(); + } else { + // Thread is not available, forward request + + final BatchThread currentBatch = (BatchThread) currentThread; + + while (!GLOBAL_MONITOR.tryEnter()) + processMonitored(currentBatch); + //System.out.println("yes " + elementThread + " " + elementThread.getMainRunnable().isInTick()); + var requests = getRequests(elementThread); + + Acquirable.Request request = new Acquirable.Request(); + request.localLatch = new CountDownLatch(1); + request.processLatch = new CountDownLatch(1); + requests.add(request); + + try { + currentBatch.waitingOn = elementThread; + processMonitored(currentBatch); + request.localLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + currentBatch.waitingOn = null; + //System.out.println("end wait"); + callback.run(); + request.processLatch.countDown(); + + GLOBAL_MONITOR.leave(); + } + + if (currentMonitor != null && enter) { + currentMonitor.enter(); + } + } + } + + public static void process(@NotNull BatchThread thread) { + var requests = getRequests(thread); + requests.forEach(request -> { + request.localLatch.countDown(); try { - final boolean monitoring = MinecraftServer.hasWaitMonitoring(); - long time = 0; - if (monitoring) { - time = System.nanoTime(); - } - - final BatchQueue periodQueue = elementThread.getQueue(); - synchronized (periodQueue) { - acquiredThread.add(elementThread); - data.acquiredThreads = acquiredThread; // Shared to remove the element when the acquisition is done - - periodQueue.setWaitingThread(elementThread); - periodQueue.getQueue().add(data); - periodQueue.wait(); - } - - acquiredThread.remove(elementThread); - - if (monitoring) { - time = System.nanoTime() - time; - WAIT_COUNTER_NANO.addAndGet(time); - } + request.processLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } + }); + } - return false; - } + public static void processMonitored(@NotNull BatchThread thread) { + thread.monitor.enter(); + process(thread); + thread.monitor.leave(); + } + + private static @NotNull Collection getRequests(@NotNull BatchThread thread) { + return REQUEST_MAP.computeIfAbsent(thread, batchThread -> ConcurrentHashMap.newKeySet()); } protected synchronized static void scheduledAcquireRequest(@NotNull Acquirable acquirable, Consumer consumer) { @@ -275,22 +189,6 @@ public final class Acquisition { WAIT_COUNTER_NANO.set(0); } - public static final class AcquisitionData { - - private volatile Phaser phaser; - private volatile List acquiredThreads; - - @Nullable - public Phaser getPhaser() { - return phaser; - } - - @Nullable - public List getAcquiredThreads() { - return acquiredThreads; - } - } - private static class ScheduledAcquisition { private final List> acquirableElements = new ArrayList<>(); private final Map>> callbacks = new HashMap<>(); diff --git a/src/main/java/net/minestom/server/thread/BatchQueue.java b/src/main/java/net/minestom/server/thread/BatchQueue.java deleted file mode 100644 index 0d48d9567..000000000 --- a/src/main/java/net/minestom/server/thread/BatchQueue.java +++ /dev/null @@ -1,34 +0,0 @@ -package net.minestom.server.thread; - -import com.google.common.collect.Queues; -import net.minestom.server.lock.Acquisition; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import java.util.Queue; - -/** - * Represents the data of a {@link BatchThread} involved in acquisition. - *

- * Used as a lock until an acquirable element is available. - */ -public class BatchQueue { - - private final Queue acquisitionDataQueue = Queues.newConcurrentLinkedQueue(); - - private volatile Thread waitingThread; - - @NotNull - public Queue getQueue() { - return acquisitionDataQueue; - } - - @Nullable - public Thread getWaitingThread() { - return waitingThread; - } - - public void setWaitingThread(@Nullable Thread waitingThread) { - this.waitingThread = waitingThread; - } -} \ No newline at end of file diff --git a/src/main/java/net/minestom/server/thread/BatchThread.java b/src/main/java/net/minestom/server/thread/BatchThread.java index 18be12ea6..c6db61de1 100644 --- a/src/main/java/net/minestom/server/thread/BatchThread.java +++ b/src/main/java/net/minestom/server/thread/BatchThread.java @@ -1,6 +1,8 @@ package net.minestom.server.thread; +import com.google.common.util.concurrent.Monitor; import net.minestom.server.MinecraftServer; +import net.minestom.server.lock.Acquirable; import net.minestom.server.lock.Acquisition; import net.minestom.server.utils.validate.Check; import org.jetbrains.annotations.NotNull; @@ -9,17 +11,18 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; public class BatchThread extends Thread { private final BatchRunnable runnable; - private final BatchQueue queue; + public final Monitor monitor = new Monitor(); + public volatile BatchThread waitingOn; public BatchThread(@NotNull BatchRunnable runnable, int number) { super(runnable, MinecraftServer.THREAD_NAME_TICK + "-" + number); this.runnable = runnable; - this.queue = new BatchQueue(); this.runnable.setLinkedThread(this); } @@ -29,16 +32,9 @@ public class BatchThread extends Thread { return runnable; } - @NotNull - public BatchQueue getQueue() { - return queue; - } - public void shutdown() { - synchronized (runnable.tickLock) { - this.runnable.stop = true; - this.runnable.tickLock.notifyAll(); - } + this.runnable.stop = true; + LockSupport.unpark(this); } public static class BatchRunnable implements Runnable { @@ -51,82 +47,51 @@ public class BatchThread extends Thread { private final Queue queue = new ConcurrentLinkedQueue<>(); - private final Object tickLock = new Object(); - @Override public void run() { Check.notNull(batchThread, "The linked BatchThread cannot be null!"); while (!stop) { + LockSupport.park(batchThread); + if (stop) + break; CountDownLatch localCountDownLatch = this.countDownLatch.get(); // The latch is necessary to control the tick rates if (localCountDownLatch == null) { - if(!waitTickLock()){ - break; - } continue; } - synchronized (tickLock) { - this.inTick = true; + this.inTick = true; - // Execute all pending runnable - Runnable runnable; - while ((runnable = queue.poll()) != null) { - runnable.run(); - } - - // Execute waiting acquisition - { - Acquisition.processThreadTick(batchThread.getQueue()); - } - - localCountDownLatch.countDown(); - boolean successful = this.countDownLatch.compareAndSet(localCountDownLatch, null); - - this.inTick = false; - - // new task should be available - if (!successful) { - continue; - } - - // Wait for the next notify (game tick) - if(!waitTickLock()){ - break; - } + // Execute all pending runnable + Runnable runnable; + while ((runnable = queue.poll()) != null) { + runnable.run(); } + + // Execute waiting acquisition + { + Acquisition.processMonitored(batchThread); + } + + localCountDownLatch.countDown(); + this.countDownLatch.compareAndSet(localCountDownLatch, null); + + // Wait for the next notify (game tick) + this.inTick = false; } } public synchronized void startTick(@NotNull CountDownLatch countDownLatch, @NotNull Runnable runnable) { this.countDownLatch.set(countDownLatch); this.queue.add(runnable); - synchronized (tickLock) { - this.tickLock.notifyAll(); - } + LockSupport.unpark(batchThread); } public boolean isInTick() { return inTick; } - private boolean waitTickLock() { - synchronized (tickLock) { - // Wait for the next notify (game tick) - try { - if (stop) { - return false; - } - - this.tickLock.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return true; - } - private void setLinkedThread(BatchThread batchThread) { this.batchThread = batchThread; } diff --git a/src/main/java/net/minestom/server/thread/ThreadProvider.java b/src/main/java/net/minestom/server/thread/ThreadProvider.java index 6b568e0b3..08d2fa34b 100644 --- a/src/main/java/net/minestom/server/thread/ThreadProvider.java +++ b/src/main/java/net/minestom/server/thread/ThreadProvider.java @@ -107,7 +107,9 @@ public abstract class ThreadProvider { chunkEntries.forEach(chunkEntry -> { chunkEntry.chunk.tick(time); chunkEntry.entities.forEach(entity -> { + thread.monitor.enter(); entity.tick(time); + thread.monitor.leave(); }); }); });