mirror of
https://github.com/PaperMC/Paper.git
synced 2025-01-07 16:57:42 +01:00
Fix race conditions in flush allowing for previously scheduled tasks to execute later than the flush call (#2548)
This commit is contained in:
parent
859d398ff8
commit
7b3f64a452
@ -1053,7 +1053,7 @@ index 000000000..4f10a8311
|
||||
+}
|
||||
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
|
||||
new file mode 100644
|
||||
index 000000000..c3ca3c4a1
|
||||
index 000000000..78bd238f4
|
||||
--- /dev/null
|
||||
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java
|
||||
@@ -0,0 +0,0 @@
|
||||
@ -1160,6 +1160,24 @@ index 000000000..c3ca3c4a1
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Returns whether this queue may have tasks queued.
|
||||
+ * <p>
|
||||
+ * This operation is not atomic, but is MT-Safe.
|
||||
+ * </p>
|
||||
+ * @return {@code true} if tasks may be queued, {@code false} otherwise
|
||||
+ */
|
||||
+ public boolean hasTasks() {
|
||||
+ for (int i = 0; i < TOTAL_PRIORITIES; ++i) {
|
||||
+ final ConcurrentLinkedQueue<T> queue = this.queues[i];
|
||||
+
|
||||
+ if (queue.peek() != null) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * Prevent further additions to this queue. Attempts to add after this call has completed (potentially during) will
|
||||
+ * result in {@link IllegalStateException} being thrown.
|
||||
+ * <p>
|
||||
@ -1317,7 +1335,7 @@ index 000000000..c3ca3c4a1
|
||||
+}
|
||||
diff --git a/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
||||
new file mode 100644
|
||||
index 000000000..f127ef236
|
||||
index 000000000..ee906b594
|
||||
--- /dev/null
|
||||
+++ b/src/main/java/com/destroystokyo/paper/io/QueueExecutorThread.java
|
||||
@@ -0,0 +0,0 @@
|
||||
@ -1342,11 +1360,7 @@ index 000000000..f127ef236
|
||||
+ protected final AtomicBoolean parked = new AtomicBoolean();
|
||||
+
|
||||
+ protected volatile ConcurrentLinkedQueue<Thread> flushQueue = new ConcurrentLinkedQueue<>();
|
||||
+
|
||||
+ // this is required to synchronize LockSupport#park()
|
||||
+ // LockSupport explicitly states that it will only follow ordering with respect to volatile access
|
||||
+ // see flush() for more details
|
||||
+ protected volatile long flushCounter;
|
||||
+ protected volatile long flushCycles;
|
||||
+
|
||||
+ public QueueExecutorThread(final PrioritizedTaskQueue<T> queue) {
|
||||
+ this(queue, (int)(1.e6)); // 1.0ms
|
||||
@ -1392,20 +1406,14 @@ index 000000000..f127ef236
|
||||
+ }
|
||||
+
|
||||
+ this.parked.set(true);
|
||||
+
|
||||
+ // We need to parse here to avoid a race condition where a thread queues a task before we set parked to true
|
||||
+ // (i.e it will not notify us)
|
||||
+
|
||||
+ // it also resolves race condition where we've overriden a concurrent thread's flush call which set parked to false
|
||||
+ // the important ordering: (volatile guarantees we cannot re-order the below events)
|
||||
+ // us: parked -> true, parse tasks -> writeCounter + 1 -> drain flush queue
|
||||
+ // them: read write counter -> add to flush queue -> write parked to false -> park loop
|
||||
+
|
||||
+ // if we overwrite their set parked to false call then they're in the park loop or about to be, and we're about to
|
||||
+ // drain the flush queue
|
||||
+ if (this.pollTasks(true)) {
|
||||
+ this.parked.set(false);
|
||||
+ continue;
|
||||
+ }
|
||||
+
|
||||
+ if (this.handleClose()) {
|
||||
+ return;
|
||||
+ }
|
||||
@ -1452,19 +1460,22 @@ index 000000000..f127ef236
|
||||
+ }
|
||||
+
|
||||
+ protected void handleFlushThreads(final boolean shutdown) {
|
||||
+ final ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue; // Note: this can be a plain read
|
||||
+ Thread parking;
|
||||
+ ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
|
||||
+ do {
|
||||
+ ++flushCycles; // may be plain read opaque write
|
||||
+ while ((parking = flushQueue.poll()) != null) {
|
||||
+ LockSupport.unpark(parking);
|
||||
+ }
|
||||
+ } while (this.pollTasks(false));
|
||||
+
|
||||
+ if (shutdown) {
|
||||
+ this.flushQueue = null; // Note: this can be a release write
|
||||
+ }
|
||||
+ this.flushQueue = null;
|
||||
+
|
||||
+ Thread current;
|
||||
+
|
||||
+ while ((current = flushQueue.poll()) != null) {
|
||||
+ this.pollTasks(false);
|
||||
+ // increment flush counter so threads will wake up after being unparked()
|
||||
+ //noinspection NonAtomicOperationOnVolatileField
|
||||
+ ++this.flushCounter; // may be plain read plain write if we order before poll() (also would need to re-order pollTasks)
|
||||
+ LockSupport.unpark(current);
|
||||
+ // defend against a race condition where a flush thread double-checks right before we set to null
|
||||
+ while ((parking = flushQueue.poll()) != null) {
|
||||
+ LockSupport.unpark(parking);
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
@ -1485,7 +1496,6 @@ index 000000000..f127ef236
|
||||
+ this.notifyTasks();
|
||||
+ }
|
||||
+
|
||||
+
|
||||
+ /**
|
||||
+ * Waits until this thread's queue is empty.
|
||||
+ *
|
||||
@ -1501,42 +1511,47 @@ index 000000000..f127ef236
|
||||
+
|
||||
+ // order is important
|
||||
+
|
||||
+ long flushCounter = this.flushCounter;
|
||||
+ int successes = 0;
|
||||
+ long lastCycle = -1L;
|
||||
+
|
||||
+ ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
|
||||
+ do {
|
||||
+ final ConcurrentLinkedQueue<Thread> flushQueue = this.flushQueue;
|
||||
+ if (flushQueue == null) {
|
||||
+ return;
|
||||
+ }
|
||||
+
|
||||
+ // it's important to read the flush queue after the flush counter to ensure that if we proceed from here
|
||||
+ // we have a flush counter that would be different from the final flush counter if the queue executor shuts down
|
||||
+ // the double read of the flush queue is not enough to account for this since
|
||||
+ if (flushQueue == null) {
|
||||
+ return; // queue executor has received shutdown and emptied queue
|
||||
+ }
|
||||
+ flushQueue.add(currentThread);
|
||||
+
|
||||
+ flushQueue.add(currentThread);
|
||||
+ // double check flush queue
|
||||
+ if (this.flushQueue == null) {
|
||||
+ return;
|
||||
+ }
|
||||
+
|
||||
+ // re-check null flush queue, we need to guarantee the executor is not shutting down before parking
|
||||
+ final long currentCycle = this.flushCycles; // may be opaque read
|
||||
+
|
||||
+ if (this.flushQueue == null) {
|
||||
+ // cannot guarantee state of flush queue now, the executor is done though
|
||||
+ return;
|
||||
+ }
|
||||
+ if (currentCycle == lastCycle) {
|
||||
+ Thread.yield();
|
||||
+ continue;
|
||||
+ }
|
||||
+
|
||||
+ // force a response from the IO thread, we're not sure of its state currently
|
||||
+ this.parked.set(false);
|
||||
+ LockSupport.unpark(this);
|
||||
+ // force response
|
||||
+ this.parked.set(false);
|
||||
+ LockSupport.unpark(this);
|
||||
+
|
||||
+ // Note: see the run() function for handling of a race condition where the queue executor overwrites our parked write
|
||||
+ LockSupport.park("flushing queue executor thread");
|
||||
+
|
||||
+ boolean interrupted = false; // preserve interrupted status
|
||||
+ // returns whether there are tasks queued, does not return whether there are tasks executing
|
||||
+ // this is why we cycle twice twice through flush (we know a pollTask call is made after a flush cycle)
|
||||
+ // we really only need to guarantee that the tasks this thread has queued has gone through, and can leave
|
||||
+ // tasks queued concurrently that are unsychronized with this thread as undefined behavior
|
||||
+ if (this.queue.hasTasks()) {
|
||||
+ successes = 0;
|
||||
+ } else {
|
||||
+ ++successes;
|
||||
+ }
|
||||
+
|
||||
+ while (this.flushCounter == flushCounter) {
|
||||
+ interrupted |= Thread.interrupted();
|
||||
+ LockSupport.park();
|
||||
+ }
|
||||
+ } while (successes != 2);
|
||||
+
|
||||
+ if (interrupted) {
|
||||
+ Thread.currentThread().interrupt();
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
|
Loading…
Reference in New Issue
Block a user