Improve buffering code

This commit is contained in:
Luck 2018-06-03 19:36:55 +01:00
parent bcb4e5ca64
commit 8cf0f7da5f
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
5 changed files with 105 additions and 77 deletions

View File

@ -25,125 +25,148 @@
package me.lucko.luckperms.common.buffers; package me.lucko.luckperms.common.buffers;
import java.lang.ref.WeakReference; import me.lucko.luckperms.common.plugin.SchedulerTask;
import me.lucko.luckperms.common.plugin.scheduler.SchedulerAdapter;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
* Thread-safe request buffer. * Thread-safe request buffer.
* *
* Waits for the buffer time to pass before performing the operation. If the task is called again in that time, the * Waits for the buffer time to pass before performing the operation.
* buffer time is reset. * If the request is called again in that time, the buffer time is reset.
* *
* @param <T> the return type * @param <T> the return type
*/ */
public abstract class BufferedRequest<T> { public abstract class BufferedRequest<T> {
private final long bufferTimeMillis;
private final long sleepInterval;
private final Executor executor;
private WeakReference<Processor<T>> processor = null; /** The buffer time */
private final ReentrantLock lock = new ReentrantLock(); private final long bufferTime;
private final TimeUnit unit;
private final SchedulerAdapter schedulerAdapter;
public BufferedRequest(long bufferTimeMillis, long sleepInterval, Executor executor) { /** The active processor task, if present */
this.bufferTimeMillis = bufferTimeMillis; private Processor<T> processor = null;
this.sleepInterval = sleepInterval;
this.executor = executor; /** Mutex to guard processor */
private final Object[] mutex = new Object[0];
/**
* Creates a new buffer with the given timeout millis
*
* @param bufferTime the timeout
* @param unit the unit of the timeout
*/
public BufferedRequest(long bufferTime, TimeUnit unit, SchedulerAdapter schedulerAdapter) {
this.bufferTime = bufferTime;
this.unit = unit;
this.schedulerAdapter = schedulerAdapter;
} }
/**
* Makes a request to the buffer
*
* @return the future
*/
public CompletableFuture<T> request() { public CompletableFuture<T> request() {
this.lock.lock(); synchronized (this.mutex) {
try {
if (this.processor != null) { if (this.processor != null) {
Processor<T> p = this.processor.get(); try {
if (p != null && p.isUsable()) { return this.processor.extendAndGetFuture();
return p.getAndExtend(); } catch (IllegalStateException e) {
// ignore
} }
} }
Processor<T> p = new Processor<>(this.bufferTimeMillis, this.sleepInterval, this::perform); Processor<T> p = this.processor = new Processor<>(this::perform, this.bufferTime, this.unit, this.schedulerAdapter);
this.executor.execute(p); return p.getFuture();
this.processor = new WeakReference<>(p);
return p.get();
} finally {
this.lock.unlock();
} }
} }
/**
* Requests the value, bypassing the buffer
*
* @return the value
*/
public T requestDirectly() { public T requestDirectly() {
return perform(); return perform();
} }
/**
* Performs the buffered task
*
* @return the result
*/
protected abstract T perform(); protected abstract T perform();
private static class Processor<R> implements Runnable { private static class Processor<R> implements Runnable {
private final long delayMillis; private Supplier<R> supplier;
private final long sleepMillis;
private final Supplier<R> supplier;
private final ReentrantLock lock = new ReentrantLock();
private final CompletableFuture<R> future = new CompletableFuture<>();
private boolean usable = true;
private long executionTime;
public Processor(long delayMillis, long sleepMillis, Supplier<R> supplier) { private final long delay;
this.delayMillis = delayMillis; private final TimeUnit unit;
this.sleepMillis = sleepMillis;
private SchedulerAdapter schedulerAdapter;
private SchedulerTask task;
private final Object[] mutex = new Object[0];
private CompletableFuture<R> future = new CompletableFuture<>();
private boolean usable = true;
Processor(Supplier<R> supplier, long delay, TimeUnit unit, SchedulerAdapter schedulerAdapter) {
this.supplier = supplier; this.supplier = supplier;
this.delay = delay;
this.unit = unit;
this.schedulerAdapter = schedulerAdapter;
rescheduleTask();
}
private void rescheduleTask() {
synchronized (this.mutex) {
if (!this.usable) {
throw new IllegalStateException("Processor not usable");
}
if (this.task != null) {
this.task.cancel();
}
this.task = this.schedulerAdapter.asyncLater(this, this.delay, this.unit);
}
} }
@Override @Override
public void run() { public void run() {
this.lock.lock(); synchronized (this.mutex) {
if (!this.usable) {
throw new IllegalStateException("Task has already ran");
}
this.usable = false;
}
// compute result
try { try {
this.executionTime = System.currentTimeMillis() + this.delayMillis; R result = this.supplier.get();
} finally { this.future.complete(result);
this.lock.unlock(); } catch (Exception e) {
this.future.completeExceptionally(e);
} }
while (true) { // allow supplier and future to be GCed
this.lock.lock(); this.task = null;
try { this.supplier = null;
if (System.currentTimeMillis() > this.executionTime) { this.future = null;
this.usable = false;
break;
}
} finally {
this.lock.unlock();
}
try {
Thread.sleep(this.sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
R result = this.supplier.get();
this.future.complete(result);
} }
public CompletableFuture<R> get() { CompletableFuture<R> getFuture() {
return this.future; return this.future;
} }
public CompletableFuture<R> getAndExtend() { CompletableFuture<R> extendAndGetFuture() {
this.lock.lock(); rescheduleTask();
try {
this.executionTime = System.currentTimeMillis() + this.delayMillis;
} finally {
this.lock.unlock();
}
return this.future; return this.future;
} }
public boolean isUsable() {
return this.usable;
}
} }
} }

View File

@ -28,11 +28,13 @@ package me.lucko.luckperms.common.buffers;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin; import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.tasks.UpdateTask; import me.lucko.luckperms.common.tasks.UpdateTask;
import java.util.concurrent.TimeUnit;
public class UpdateTaskBuffer extends BufferedRequest<Void> { public class UpdateTaskBuffer extends BufferedRequest<Void> {
private final LuckPermsPlugin plugin; private final LuckPermsPlugin plugin;
public UpdateTaskBuffer(LuckPermsPlugin plugin) { public UpdateTaskBuffer(LuckPermsPlugin plugin) {
super(250L, 50L, plugin.getBootstrap().getScheduler().async()); super(250L, TimeUnit.MILLISECONDS, plugin.getBootstrap().getScheduler());
this.plugin = plugin; this.plugin = plugin;
} }

View File

@ -47,6 +47,7 @@ import java.util.HashSet;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
@ -207,7 +208,7 @@ public class LuckPermsMessagingService implements InternalMessagingService, Inco
private final class PushUpdateBuffer extends BufferedRequest<Void> { private final class PushUpdateBuffer extends BufferedRequest<Void> {
public PushUpdateBuffer(LuckPermsPlugin plugin) { public PushUpdateBuffer(LuckPermsPlugin plugin) {
super(2000L, 200L, plugin.getBootstrap().getScheduler().async()); super(2, TimeUnit.SECONDS, plugin.getBootstrap().getScheduler());
} }
@Override @Override

View File

@ -47,6 +47,7 @@ import java.nio.file.Path;
import java.util.Queue; import java.util.Queue;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public class FileActionLogger { public class FileActionLogger {
@ -166,7 +167,7 @@ public class FileActionLogger {
private final class SaveBuffer extends BufferedRequest<Void> { private final class SaveBuffer extends BufferedRequest<Void> {
public SaveBuffer(LuckPermsPlugin plugin) { public SaveBuffer(LuckPermsPlugin plugin) {
super(2000L, 500L, plugin.getBootstrap().getScheduler().async()); super(2, TimeUnit.SECONDS, plugin.getBootstrap().getScheduler());
} }
@Override @Override

View File

@ -41,6 +41,7 @@ import org.spongepowered.api.service.permission.Subject;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** /**
* A simple persistable Subject implementation * A simple persistable Subject implementation
@ -164,7 +165,7 @@ public class PersistedSubject extends CalculatedSubject implements LPSubject {
private final class SaveBuffer extends BufferedRequest<Void> { private final class SaveBuffer extends BufferedRequest<Void> {
public SaveBuffer(LuckPermsPlugin plugin) { public SaveBuffer(LuckPermsPlugin plugin) {
super(1000L, 500L, plugin.getBootstrap().getScheduler().async()); super(1, TimeUnit.SECONDS, plugin.getBootstrap().getScheduler());
} }
@Override @Override