Ensure ticket updates are processed for task queue reference count

It is possible that when the reference count is retrieved that
the ticket updates have not yet been processed, which means
that the region section at the queued position may not exist or may
become nonexistant while or after the task is being added.

This restructures the reference counting code to now refer to
both a reference counter and a flag indicating whether ticket
updates are processed.

Fixes https://github.com/PaperMC/Folia/issues/262
This issue shows the race condition where the region becomes
nonexistant.
This commit is contained in:
Spottedleaf 2024-12-03 02:06:00 -08:00
parent f50f36339c
commit e1601057b3

View File

@ -2129,10 +2129,10 @@ index 0000000000000000000000000000000000000000..fc053ded0c14b76a1c6c82b59d3fd320
+}
diff --git a/src/main/java/io/papermc/paper/threadedregions/RegionizedTaskQueue.java b/src/main/java/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983d0c65eba
index 0000000000000000000000000000000000000000..a2313b5b4c37e8536973a8ea0b371557ea912473
--- /dev/null
+++ b/src/main/java/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
@@ -0,0 +1,803 @@
@@ -0,0 +1,807 @@
+package io.papermc.paper.threadedregions;
+
+import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
@ -2208,7 +2208,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ public static final class WorldRegionTaskData {
+ private final ServerLevel world;
+ private final MultiThreadedQueue<Runnable> globalChunkTask = new MultiThreadedQueue<>();
+ private final ConcurrentLong2ReferenceChainedHashTable<AtomicLong> referenceCounters = new ConcurrentLong2ReferenceChainedHashTable<>();
+ private final ConcurrentLong2ReferenceChainedHashTable<ReferenceCountData> referenceCounters = new ConcurrentLong2ReferenceChainedHashTable<>();
+
+ public WorldRegionTaskData(final ServerLevel world) {
+ this.world = world;
@ -2258,96 +2258,95 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.processTicketUpdates(CoordinateUtils.getChunkX(coord), CoordinateUtils.getChunkZ(coord));
+ }
+
+ private void decrementReference(final AtomicLong reference, final long coord) {
+ final long val = reference.decrementAndGet();
+ if (val == 0L) {
+ final int chunkX = CoordinateUtils.getChunkX(coord);
+ final int chunkZ = CoordinateUtils.getChunkZ(coord);
+ final ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock.Node ticketLock = this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.lock(chunkX, chunkZ);
+ try {
+ if (this.referenceCounters.remove(coord, reference) == reference) {
+ WorldRegionTaskData.this.removeTicket(coord);
+ } // else: race condition, something replaced our reference - not our issue anymore
+ } finally {
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.unlock(ticketLock);
+ }
+ } else if (val < 0L) {
+ throw new IllegalStateException("Reference count < 0: " + val);
+ // note: only call on acquired referenceCountData
+ private void ensureTicketAdded(final long coord, final ReferenceCountData referenceCountData) {
+ if (!referenceCountData.addedTicket) {
+ // fine if multiple threads do this, no removeTicket may be called for this coord due to reference count inc
+ this.addTicket(coord);
+ this.processTicketUpdates(coord);
+ referenceCountData.addedTicket = true;
+ }
+ }
+
+ private AtomicLong incrementReference(final long coord) {
+ final AtomicLong ret = this.referenceCounters.get(coord);
+ if (ret != null) {
+ // try to fast acquire counter
+ int failures = 0;
+ for (long curr = ret.get();;) {
+ if (curr == 0L) {
+ // failed to fast acquire as reference expired
+ break;
+ }
+ private void decrementReference(final ReferenceCountData referenceCountData, final long coord) {
+ if (!referenceCountData.decreaseReferenceCount()) {
+ return;
+ } // else: need to remove ticket
+
+ for (int i = 0; i < failures; ++i) {
+ ConcurrentUtil.backoff();
+ }
+
+ if (curr == (curr = ret.compareAndExchange(curr, curr + 1L))) {
+ return ret;
+ }
+
+ ++failures;
+ // note: it is possible that another thread increments and then removes the reference before we can, so
+ // use ifPresent
+ this.referenceCounters.computeIfPresent(coord, (final long keyInMap, final ReferenceCountData valueInMap) -> {
+ if (valueInMap.referenceCount.get() != 0L) {
+ return valueInMap;
+ }
+
+ // note: valueInMap may not be referenceCountData
+
+ // possible to invoke this outside of the compute call, but not required and requires additional logic
+ WorldRegionTaskData.this.removeTicket(keyInMap);
+
+ return null;
+ });
+ }
+
+ private ReferenceCountData incrementReference(final long coord) {
+ ReferenceCountData referenceCountData = this.referenceCounters.get(coord);
+
+ if (referenceCountData != null && referenceCountData.addCount()) {
+ this.ensureTicketAdded(coord, referenceCountData);
+ return referenceCountData;
+ }
+
+ // slow acquire
+ final int chunkX = CoordinateUtils.getChunkX(coord);
+ final int chunkZ = CoordinateUtils.getChunkZ(coord);
+ final ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock.Node ticketLock = this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.lock(chunkX, chunkZ);
+ final AtomicLong ret2;
+ final boolean processTicketUpdates;
+ try {
+ final AtomicLong replace = new AtomicLong(1L);
+ final AtomicLong valueInMap = this.referenceCounters.putIfAbsent(coord, replace);
+ referenceCountData = this.referenceCounters.compute(coord, (final long keyInMap, final ReferenceCountData valueInMap) -> {
+ if (valueInMap == null) {
+ // replaced, we should usually be here
+ this.addTicket(coord);
+ ret2 = replace;
+ processTicketUpdates = true;
+ } else {
+ processTicketUpdates = false;
+ int failures = 0;
+ for (long curr = valueInMap.get();;) {
+ if (curr == 0L) {
+ // don't need to add ticket here, since ticket is only removed during the lock
+ // we just need to replace the value in the map so that the thread removing fails and doesn't
+ // remove the ticket (see decrementReference)
+ this.referenceCounters.put(coord, replace);
+ ret2 = replace;
+ break;
+ }
+
+ for (int i = 0; i < failures; ++i) {
+ ConcurrentUtil.backoff();
+ }
+
+ if (curr == (curr = valueInMap.compareAndExchange(curr, curr + 1L))) {
+ // acquired
+ ret2 = valueInMap;
+ break;
+ }
+
+ ++failures;
+ }
+ // sets reference count to 1
+ return new ReferenceCountData();
+ }
+ } finally {
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.unlock(ticketLock);
+ }
+ // OK if we add from 0, the remove call will use compute() and catch this race condition
+ valueInMap.referenceCount.getAndIncrement();
+
+ if (processTicketUpdates) {
+ this.processTicketUpdates(coord);
+ }
+ return valueInMap;
+ });
+
+ return ret2;
+ this.ensureTicketAdded(coord, referenceCountData);
+
+ return referenceCountData;
+ }
+ }
+
+ private static final class ReferenceCountData {
+
+ public final AtomicLong referenceCount = new AtomicLong(1L);
+ public volatile boolean addedTicket;
+
+ // returns false if reference count is 0, otherwise increments ref count
+ public boolean addCount() {
+ int failures = 0;
+ for (long curr = this.referenceCount.get();;) {
+ for (int i = 0; i < failures; ++i) {
+ Thread.onSpinWait();
+ }
+
+ if (curr == 0L) {
+ return false;
+ }
+
+ if (curr == (curr = this.referenceCount.compareAndExchange(curr, curr + 1L))) {
+ return true;
+ }
+
+ ++failures;
+ }
+ }
+
+ // returns true if new reference count is 0
+ public boolean decreaseReferenceCount() {
+ final long res = this.referenceCount.decrementAndGet();
+ if (res >= 0L) {
+ return res == 0L;
+ } else {
+ throw new IllegalStateException("Negative reference count");
+ }
+ }
+ }
+
@ -2544,7 +2543,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ final ArrayDeque<ChunkBasedPriorityTask>[] queues = this.queues;
+ final int max = Priority.IDLE.priority;
+ ChunkBasedPriorityTask task = null;
+ AtomicLong referenceCounter = null;
+ ReferenceCountData referenceCounter = null;
+ synchronized (this) {
+ if (this.isDestroyed) {
+ throw new IllegalStateException("Attempting to poll from dead queue");
@ -2576,7 +2575,10 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+
+ private static final class ChunkBasedPriorityTask implements PrioritisedExecutor.PrioritisedTask {
+
+ private static final AtomicLong REFERENCE_COUNTER_NOT_SET = new AtomicLong(-1L);
+ private static final ReferenceCountData REFERENCE_COUNTER_NOT_SET = new ReferenceCountData();
+ static {
+ REFERENCE_COUNTER_NOT_SET.referenceCount.set((long)Integer.MIN_VALUE);
+ }
+
+ private final WorldRegionTaskData world;
+ private final int chunkX;
@ -2584,8 +2586,8 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ private final long sectionLowerLeftCoord; // chunk coordinate
+ private final boolean isChunkTask;
+
+ private volatile AtomicLong referenceCounter;
+ private static final VarHandle REFERENCE_COUNTER_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "referenceCounter", AtomicLong.class);
+ private volatile ReferenceCountData referenceCounter;
+ private static final VarHandle REFERENCE_COUNTER_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "referenceCounter", ReferenceCountData.class);
+ private Runnable run;
+ private volatile Priority priority;
+ private static final VarHandle PRIORITY_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "priority", Priority.class);
@ -2622,16 +2624,16 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ return (Priority)PRIORITY_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private void setReferenceCounterPlain(final AtomicLong value) {
+ private void setReferenceCounterPlain(final ReferenceCountData value) {
+ REFERENCE_COUNTER_HANDLE.set(this, value);
+ }
+
+ private AtomicLong getReferenceCounterVolatile() {
+ return (AtomicLong)REFERENCE_COUNTER_HANDLE.get(this);
+ private ReferenceCountData getReferenceCounterVolatile() {
+ return (ReferenceCountData)REFERENCE_COUNTER_HANDLE.get(this);
+ }
+
+ private AtomicLong compareAndExchangeReferenceCounter(final AtomicLong expect, final AtomicLong update) {
+ return (AtomicLong)REFERENCE_COUNTER_HANDLE.compareAndExchange(this, expect, update);
+ private ReferenceCountData compareAndExchangeReferenceCounter(final ReferenceCountData expect, final ReferenceCountData update) {
+ return (ReferenceCountData)REFERENCE_COUNTER_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private void executeInternal() {
@ -2648,7 +2650,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+
+ private boolean tryComplete(final boolean cancel) {
+ int failures = 0;
+ for (AtomicLong curr = this.getReferenceCounterVolatile();;) {
+ for (ReferenceCountData curr = this.getReferenceCounterVolatile();;) {
+ if (curr == null) {
+ return false;
+ }
@ -2697,7 +2699,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ return false;
+ }
+
+ final AtomicLong referenceCounter = this.world.incrementReference(this.sectionLowerLeftCoord);
+ final ReferenceCountData referenceCounter = this.world.incrementReference(this.sectionLowerLeftCoord);
+ if (this.compareAndExchangeReferenceCounter(REFERENCE_COUNTER_NOT_SET, referenceCounter) != REFERENCE_COUNTER_NOT_SET) {
+ // we don't expect race conditions here, so it is OK if we have to needlessly reference count
+ this.world.decrementReference(referenceCounter, this.sectionLowerLeftCoord);
@ -2747,7 +2749,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ }
+ }
+
+ private AtomicLong trySetCompleting(final int minPriority) {
+ private ReferenceCountData trySetCompleting(final int minPriority) {
+ // first, try to set priority to EXECUTING
+ for (Priority curr = this.getPriorityVolatile();;) {
+ if (curr.isLowerPriority(minPriority)) {
@ -2759,7 +2761,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ } // else: continue
+ }
+
+ for (AtomicLong curr = this.getReferenceCounterVolatile();;) {
+ for (ReferenceCountData curr = this.getReferenceCounterVolatile();;) {
+ if (curr == null) {
+ // something acquired before us
+ return null;
@ -2772,6 +2774,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ if (curr != (curr = this.compareAndExchangeReferenceCounter(curr, null))) {
+ continue;
+ }
+
+ return curr;
+ }
+ }
@ -2779,7 +2782,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ private void updatePriorityInQueue() {
+ boolean synchronise = false;
+ for (;;) {
+ final AtomicLong referenceCount = this.getReferenceCounterVolatile();
+ final ReferenceCountData referenceCount = this.getReferenceCounterVolatile();
+ if (referenceCount == REFERENCE_COUNTER_NOT_SET || referenceCount == null) {
+ // cancelled or not queued
+ return;
@ -2798,6 +2801,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ if (queue == null) {
+ if (!synchronise) {
+ // may be incorrectly null when unsynchronised
+ synchronise = true;
+ continue;
+ }
+ // must have been removed
@ -2873,7 +2877,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+
+ @Override
+ public boolean setPriorityAndSubOrder(final Priority priority, final long subOrder) {
+ throw new UnsupportedOperationException();
+ return this.setPriority(priority);
+ }
+
+ @Override
@ -3887,7 +3891,7 @@ index 0000000000000000000000000000000000000000..74ac328bf8d5f762f7060a6c5d49089d
+}
diff --git a/src/main/java/io/papermc/paper/threadedregions/ThreadedRegionizer.java b/src/main/java/io/papermc/paper/threadedregions/ThreadedRegionizer.java
new file mode 100644
index 0000000000000000000000000000000000000000..ce388e0ef231d7d73f75f5778c58eb40f6402f0f
index 0000000000000000000000000000000000000000..8e1b1df1c889d9235b10b86fc4cedbc06b7885c2
--- /dev/null
+++ b/src/main/java/io/papermc/paper/threadedregions/ThreadedRegionizer.java
@@ -0,0 +1,1405 @@
@ -4878,7 +4882,7 @@ index 0000000000000000000000000000000000000000..ce388e0ef231d7d73f75f5778c58eb40
+
+ return this.state == STATE_READY;
+ } catch (final Throwable throwable) {
+ LOGGER.error("Failed to acquire region " + this, throwable);
+ LOGGER.error("Failed to release region " + this, throwable);
+ SneakyThrow.sneaky(throwable);
+ return false; // unreachable
+ } finally {