Fix issue with task queueing

This commit is contained in:
Jesse Boyd 2018-05-15 16:21:08 +10:00
parent fb744cdb21
commit 5d076ef4b2
No known key found for this signature in database
GPG Key ID: 59F1DE6293AF6E1F
5 changed files with 129 additions and 105 deletions

View File

@ -21,25 +21,7 @@ public class Cancel extends FaweCommand {
if (player == null) {
return false;
}
UUID uuid = player.getUUID();
Collection<FaweQueue> queues = SetQueue.IMP.getAllQueues();
int cancelled = 0;
player.clearActions();
for (FaweQueue queue : queues) {
Collection<EditSession> sessions = queue.getEditSessions();
for (EditSession session : sessions) {
FawePlayer currentPlayer = session.getPlayer();
if (currentPlayer == player) {
if (session.cancel()) {
cancelled++;
}
}
}
}
VirtualWorld world = player.getSession().getVirtualWorld();
if (world != null) {
world.clear();
}
int cancelled = player.cancel(false);
BBC.WORLDEDIT_CANCEL_COUNT.send(player, cancelled);
return true;
}

View File

@ -6,7 +6,7 @@ import com.boydti.fawe.config.Settings;
import com.boydti.fawe.logging.rollback.RollbackOptimizedHistory;
import com.boydti.fawe.object.RunnableVal;
import com.boydti.fawe.object.changeset.DiskStorageHistory;
import com.boydti.fawe.object.task.SingleThreadNotifyQueue;
import com.boydti.fawe.object.task.AsyncNotifyQueue;
import com.boydti.fawe.util.MainUtil;
import com.boydti.fawe.util.TaskManager;
import com.sk89q.worldedit.Vector;
@ -23,7 +23,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
public class RollbackDatabase extends SingleThreadNotifyQueue {
public class RollbackDatabase extends AsyncNotifyQueue {
private final String prefix;
private final File dbLocation;

View File

@ -8,6 +8,7 @@ import com.boydti.fawe.config.Settings;
import com.boydti.fawe.object.brush.visualization.VirtualWorld;
import com.boydti.fawe.object.clipboard.DiskOptimizedClipboard;
import com.boydti.fawe.object.exception.FaweException;
import com.boydti.fawe.object.task.SimpleAsyncNotifyQueue;
import com.boydti.fawe.regions.FaweMaskManager;
import com.boydti.fawe.util.*;
import com.boydti.fawe.wrappers.FakePlayer;
@ -29,12 +30,12 @@ import com.sk89q.worldedit.session.ClipboardHolder;
import com.sk89q.worldedit.world.World;
import com.sk89q.worldedit.world.registry.WorldData;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class FawePlayer<T> extends Metadatable {
@ -127,6 +128,35 @@ public abstract class FawePlayer<T> extends Metadatable {
}
}
public int cancel(boolean close) {
Collection<FaweQueue> queues = SetQueue.IMP.getAllQueues();
int cancelled = 0;
clearActions();
for (FaweQueue queue : queues) {
Collection<EditSession> sessions = queue.getEditSessions();
for (EditSession session : sessions) {
FawePlayer currentPlayer = session.getPlayer();
if (currentPlayer == this) {
if (session.cancel()) {
cancelled++;
}
}
}
}
VirtualWorld world = getSession().getVirtualWorld();
if (world != null) {
if (close) {
try {
world.close(false);
} catch (IOException e) {
e.printStackTrace();
}
}
else world.clear();
}
return cancelled;
}
public void checkConfirmation(String command, int times, int limit) throws RegionOperationException {
if (command == null || getMeta("cmdConfirmRunning", false)) {
return;
@ -198,14 +228,11 @@ public abstract class FawePlayer<T> extends Metadatable {
if (confirm == null) {
return false;
}
queueAction(new Runnable() {
@Override
public void run() {
setMeta("cmdConfirmRunning", true);
CommandEvent event = new CommandEvent(getPlayer(), confirm);
CommandManager.getInstance().handleCommandOnCurrentThread(event);
setMeta("cmdConfirmRunning", false);
}
queueAction(() -> {
setMeta("cmdConfirmRunning", true);
CommandEvent event = new CommandEvent(getPlayer(), confirm);
CommandManager.getInstance().handleCommandOnCurrentThread(event);
setMeta("cmdConfirmRunning", false);
});
return true;
}
@ -220,62 +247,16 @@ public abstract class FawePlayer<T> extends Metadatable {
}
}
private AtomicInteger runningCount = new AtomicInteger();
/**
* Queue an action to run async
* @param run
*/
public void queueAction(final Runnable run) {
Runnable wrappedTask = new Runnable() {
@Override
public void run() {
try {
run.run();
} catch (Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
if (e instanceof WorldEditException) {
sendMessage(BBC.getPrefix() + e.getLocalizedMessage());
} else {
FaweException fe = FaweException.get(e);
if (fe != null) {
sendMessage(fe.getMessage());
} else {
e.printStackTrace();
}
}
}
runningCount.decrementAndGet();
Runnable next = getActions().poll();
if (next != null) {
next.run();
}
}
};
getActions().add(wrappedTask);
FaweLimit limit = getLimit();
if (runningCount.getAndIncrement() < limit.MAX_ACTIONS) {
Runnable task = getActions().poll();
if (task != null) {
task.run();
}
}
runAction(run, false, true);
}
public void clearActions() {
while (getActions().poll() != null) {
runningCount.decrementAndGet();
}
}
private ConcurrentLinkedDeque<Runnable> getActions() {
ConcurrentLinkedDeque<Runnable> adder = getMeta("fawe_action_v2");
if (adder == null) {
adder = new ConcurrentLinkedDeque();
ConcurrentLinkedDeque<Runnable> previous = (ConcurrentLinkedDeque<Runnable>) getAndSetMeta("fawe_action_v2", adder);
if (previous != null) {
setMeta("fawe_action_v2", adder = previous);
}
}
return adder;
asyncNotifyQueue.clear();
}
public boolean runAsyncIfFree(Runnable r) {
@ -286,29 +267,52 @@ public abstract class FawePlayer<T> extends Metadatable {
return runAction(r, true, false);
}
public boolean runAction(final Runnable ifFree, boolean checkFree, boolean async) {
long[] actionTime = getMeta("lastActionTime");
if (actionTime == null) {
setMeta("lastActionTime", actionTime = new long[2]);
}
actionTime[1] = actionTime[0];
actionTime[0] = Fawe.get().getTimer().getTick();
if (checkFree) {
if (async) {
TaskManager.IMP.taskNow(new Runnable() {
@Override
public void run() {
queueAction(ifFree);
}
}, async);
} else {
queueAction(ifFree);
// Queue for async tasks
private AtomicInteger runningCount = new AtomicInteger();
private SimpleAsyncNotifyQueue asyncNotifyQueue = new SimpleAsyncNotifyQueue(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
if (e instanceof WorldEditException) {
sendMessage(BBC.getPrefix() + e.getLocalizedMessage());
} else {
FaweException fe = FaweException.get(e);
if (fe != null) {
sendMessage(fe.getMessage());
} else {
e.printStackTrace();
}
}
return true;
} else {
TaskManager.IMP.taskNow(ifFree, async);
}
return false;
});
/**
* Run a task either async, or on the current thread
* @param ifFree
* @param checkFree Whether to first check if a task is running
* @param async
* @return false if the task was ran or queued
*/
public boolean runAction(final Runnable ifFree, boolean checkFree, boolean async) {
if (checkFree) {
if (runningCount.get() != 0) return false;
}
Runnable wrapped = () -> {
try {
runningCount.addAndGet(1);
ifFree.run();
} finally {
runningCount.decrementAndGet();
}
};
if (async) {
asyncNotifyQueue.queue(wrapped);
} else {
TaskManager.IMP.taskNow(wrapped, false);
}
return true;
}
public boolean checkAction() {
@ -609,6 +613,7 @@ public abstract class FawePlayer<T> extends Metadatable {
* - Usually called on logout
*/
public void unregister() {
cancel(true);
if (Settings.IMP.HISTORY.DELETE_ON_LOGOUT) {
session = getSession();
WorldEdit.getInstance().removeSession(toWorldEditPlayer());

View File

@ -3,12 +3,12 @@ package com.boydti.fawe.object.task;
import com.boydti.fawe.util.TaskManager;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class SingleThreadNotifyQueue {
public abstract class AsyncNotifyQueue {
private Object lock = new Object();
private final Runnable task;
private final AtomicBoolean running = new AtomicBoolean();
public SingleThreadNotifyQueue() {
public AsyncNotifyQueue() {
this.task = new Runnable() {
@Override
public void run() {

View File

@ -0,0 +1,37 @@
package com.boydti.fawe.object.task;
import java.util.concurrent.ConcurrentLinkedQueue;
public class SimpleAsyncNotifyQueue extends AsyncNotifyQueue {
private ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
private Thread.UncaughtExceptionHandler handler;
public SimpleAsyncNotifyQueue(Thread.UncaughtExceptionHandler handler) {
this.handler = handler;
}
@Override
public boolean hasQueued() {
return !tasks.isEmpty();
}
@Override
public void operate() {
while (!tasks.isEmpty()) {
Runnable task = tasks.poll();
try {
if (task != null) task.run();
} catch (Throwable e) {
if (handler != null) handler.uncaughtException(Thread.currentThread(), e);
}
}
}
public int getSize() {
return tasks.size();
}
public void clear() {
tasks.clear();
}
}