Fix race condition in BufferedRequest

This commit is contained in:
Luck 2018-09-23 19:52:42 +01:00
parent 57619dacb1
commit e1b16465f8
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B

View File

@ -101,19 +101,21 @@ public abstract class BufferedRequest<T> {
*/ */
protected abstract T perform(); protected abstract T perform();
private static class Processor<R> implements Runnable { private static class Processor<R> {
private Supplier<R> supplier; private Supplier<R> supplier;
private final long delay; private final long delay;
private final TimeUnit unit; private final TimeUnit unit;
private final SchedulerAdapter schedulerAdapter; private final SchedulerAdapter schedulerAdapter;
private SchedulerTask task;
private final Object[] mutex = new Object[0]; private final Object[] mutex = new Object[0];
private CompletableFuture<R> future = new CompletableFuture<>(); private CompletableFuture<R> future = new CompletableFuture<>();
private boolean usable = true; private boolean usable = true;
private SchedulerTask scheduledTask;
private BoundTask boundTask = null;
Processor(Supplier<R> supplier, long delay, TimeUnit unit, SchedulerAdapter schedulerAdapter) { Processor(Supplier<R> supplier, long delay, TimeUnit unit, SchedulerAdapter schedulerAdapter) {
this.supplier = supplier; this.supplier = supplier;
this.delay = delay; this.delay = delay;
@ -128,36 +130,14 @@ public abstract class BufferedRequest<T> {
if (!this.usable) { if (!this.usable) {
throw new IllegalStateException("Processor not usable"); throw new IllegalStateException("Processor not usable");
} }
if (this.task != null) { if (this.scheduledTask != null) {
this.task.cancel(); this.scheduledTask.cancel();
} }
this.task = this.schedulerAdapter.asyncLater(this, this.delay, this.unit); this.boundTask = new BoundTask();
this.scheduledTask = this.schedulerAdapter.asyncLater(this.boundTask, this.delay, this.unit);
} }
} }
@Override
public void run() {
synchronized (this.mutex) {
if (!this.usable) {
throw new IllegalStateException("Task has already ran");
}
this.usable = false;
}
// compute result
try {
R result = this.supplier.get();
this.future.complete(result);
} catch (Exception e) {
this.future.completeExceptionally(e);
}
// allow supplier and future to be GCed
this.task = null;
this.supplier = null;
this.future = null;
}
CompletableFuture<R> getFuture() { CompletableFuture<R> getFuture() {
return this.future; return this.future;
} }
@ -167,6 +147,39 @@ public abstract class BufferedRequest<T> {
return this.future; return this.future;
} }
private final class BoundTask implements Runnable {
@Override
public void run() {
synchronized (Processor.this.mutex) {
if (!Processor.this.usable) {
throw new IllegalStateException("Task has already ran");
}
// check that we're still the bound task.
// prevents a race condition between #run and #rescheduleTask
if (Processor.this.boundTask != this) {
return;
}
Processor.this.usable = false;
}
// compute result
try {
R result = Processor.this.supplier.get();
Processor.this.future.complete(result);
} catch (Exception e) {
Processor.this.future.completeExceptionally(e);
}
// allow supplier and future to be GCed
Processor.this.supplier = null;
Processor.this.future = null;
Processor.this.scheduledTask = null;
Processor.this.boundTask = null;
}
}
} }
} }