Adds concurrent island leveling.

New config.yml entry. Can do concurrent checks up to amount admin
decides.
Fixes issue with reloading where the queue was not disabled and
continued to run.
This commit is contained in:
tastybento 2020-07-18 13:39:10 -07:00
parent 35da2246c5
commit 3cc887b6d3
6 changed files with 96 additions and 24 deletions

View File

@ -65,7 +65,7 @@
<!-- Do not change unless you want different name for local builds. -->
<build.number>-LOCAL</build.number>
<!-- This allows to change between versions. -->
<build.version>2.3.2</build.version>
<build.version>2.3.3</build.version>
</properties>
<!-- Profiles will allow to automatically change build version. -->

View File

@ -100,7 +100,32 @@ public class Level extends Addon implements Listener {
@EventHandler
public void onBentoBoxReady(BentoBoxReadyEvent e) {
manager.loadTopTens();
/*
* DEBUG code to generate fake islands and then try to level them all.
Bukkit.getScheduler().runTaskLater(getPlugin(), () -> {
getPlugin().getAddonsManager().getGameModeAddons().stream()
.filter(gm -> !settings.getGameModes().contains(gm.getDescription().getName()))
.forEach(gm -> {
for (int i = 0; i < 1000; i++) {
try {
NewIsland.builder().addon(gm).player(User.getInstance(UUID.randomUUID())).name("default").reason(Reason.CREATE).noPaste().build();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
});
// Queue all islands DEBUG
getIslands().getIslands().stream().filter(Island::isOwned).forEach(is -> {
this.getManager().calculateLevel(is.getOwner(), is).thenAccept(r ->
log("Result for island calc " + r.getLevel() + " at " + is.getCenter()));
});
}, 60L);*/
}
private void registerPlaceholders(GameModeAddon gm) {
if (getPlugin().getPlaceholdersManager() == null) return;
@ -172,6 +197,9 @@ public class Level extends Addon implements Listener {
@Override
public void onDisable() {
// Stop the pipeline
this.getPipeliner().stop();
// Save player data and the top tens
if (manager != null) {
manager.save();
}

View File

@ -121,7 +121,6 @@ public class LevelsManager {
addon.getPipeliner().addIsland(island).thenAccept(r -> {
// Results are irrelevant because the island is unowned or deleted, or IslandLevelCalcEvent is cancelled
if (r == null || fireIslandLevelCalcEvent(targetPlayer, island, r)) {
addon.logWarning("Island calcs stopped due to event cancelation");
result.complete(null);
}
// Save result

View File

@ -1,6 +1,8 @@
package world.bentobox.level.calculators;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -19,9 +21,9 @@ import world.bentobox.level.Level;
public class Pipeliner {
private static final int START_DURATION = 10; // 10 seconds
private final Queue<IslandLevelCalculator> processQueue;
private final Queue<IslandLevelCalculator> toProcessQueue;
private final Set<IslandLevelCalculator> inProcessQueue;
private final BukkitTask task;
private boolean inProcess;
private final Level addon;
private long time;
private long count;
@ -31,25 +33,26 @@ public class Pipeliner {
*/
public Pipeliner(Level addon) {
this.addon = addon;
processQueue = new ConcurrentLinkedQueue<>();
toProcessQueue = new ConcurrentLinkedQueue<>();
inProcessQueue = new HashSet<>();
// Loop continuously - check every tick if there is an island to scan
task = Bukkit.getScheduler().runTaskTimer(BentoBox.getInstance(), () -> {
if (!BentoBox.getInstance().isEnabled()) {
cancel();
return;
}
// One island at a time
if (inProcess || processQueue.isEmpty()) return;
IslandLevelCalculator iD = processQueue.poll();
// Complete the current to Process queue first
if (!inProcessQueue.isEmpty() || toProcessQueue.isEmpty()) return;
for (int j = 0; j < addon.getSettings().getConcurrentIslandCalcs() && !toProcessQueue.isEmpty(); j++) {
IslandLevelCalculator iD = toProcessQueue.poll();
// Ignore deleted or unonwed islands
if (iD.getIsland().isDeleted() || iD.getIsland().isUnowned()) return;
// Start the process
inProcess = true;
if (!iD.getIsland().isDeleted() && !iD.getIsland().isUnowned()) {
inProcessQueue.add(iD);
// Start the scanning of a island with the first chunk
scanChunk(iD);
}, 1L, 1L);
}
}
}, 1L, 10L);
}
private void cancel() {
@ -60,7 +63,7 @@ public class Pipeliner {
* @return number of islands currently in the queue or in process
*/
public int getIslandsInQueue() {
return inProcess ? processQueue.size() + 1 : processQueue.size();
return inProcessQueue.size() + toProcessQueue.size();
}
/**
@ -68,10 +71,9 @@ public class Pipeliner {
* @param iD
*/
private void scanChunk(IslandLevelCalculator iD) {
if (iD.getIsland().isDeleted() || iD.getIsland().isUnowned()) {
if (iD.getIsland().isDeleted() || iD.getIsland().isUnowned() || task.isCancelled()) {
// Island is deleted, so finish early with nothing
addon.log("Canceling island level calculation - island has been deleted, or has become unowned.");
inProcess = false;
inProcessQueue.remove(iD);
iD.getR().complete(null);
return;
}
@ -80,12 +82,12 @@ public class Pipeliner {
if (!Bukkit.isPrimaryThread()) {
addon.getPlugin().logError("scanChunk not on Primary Thread!");
}
if (Boolean.TRUE.equals(r)) {
if (Boolean.TRUE.equals(r) || task.isCancelled()) {
// scanNextChunk returns true if there are more chunks to scan
scanChunk(iD);
} else {
// Done
inProcess = false;
inProcessQueue.remove(iD);
iD.getR().complete(iD.getResults());
}
});
@ -100,7 +102,12 @@ public class Pipeliner {
*/
public CompletableFuture<Results> addIsland(Island island) {
CompletableFuture<Results> r = new CompletableFuture<>();
processQueue.add(new IslandLevelCalculator(addon, island, r));
// Check if queue already contains island
/*
if (processQueue.parallelStream().map(IslandLevelCalculator::getIsland).anyMatch(island::equals)) {
return CompletableFuture.completedFuture(null);
}*/
toProcessQueue.add(new IslandLevelCalculator(addon, island, r));
count++;
return r;
}
@ -122,5 +129,15 @@ public class Pipeliner {
this.time += time;
}
/**
* Stop the current queue.
*/
public void stop() {
addon.log("Stopping Level queue");
task.cancel();
this.inProcessQueue.clear();
this.toProcessQueue.clear();
}
}

View File

@ -19,6 +19,12 @@ public class ConfigSettings implements ConfigObject {
@ConfigEntry(path = "disabled-game-modes")
private List<String> gameModes = Collections.emptyList();
@ConfigComment("")
@ConfigComment("Number of concurrent island calculations")
@ConfigComment("If your CPU can handle it, you can run parallel island calcs if there are more than one in the queue")
@ConfigEntry(path = "concurrent-island-calcs")
private int concurrentIslandCalcs = 1;
@ConfigComment("")
@ConfigComment("Calculate island level on login")
@ConfigComment("This silently calculates the player's island level when they login")
@ -292,6 +298,24 @@ public class ConfigSettings implements ConfigObject {
}
/**
* @return the concurrentIslandCalcs
*/
public int getConcurrentIslandCalcs() {
if (concurrentIslandCalcs < 1) concurrentIslandCalcs = 1;
return concurrentIslandCalcs;
}
/**
* @param concurrentIslandCalcs the concurrentIslandCalcs to set
*/
public void setConcurrentIslandCalcs(int concurrentIslandCalcs) {
if (concurrentIslandCalcs < 1) concurrentIslandCalcs = 1;
this.concurrentIslandCalcs = concurrentIslandCalcs;
}

View File

@ -6,6 +6,10 @@
disabled-game-modes:
- AOneBlock
#
# Number of concurrent island calculations
# If your CPU can handle it, you can run parallel island calcs if there are more than one in the queue
concurrent-island-calcs: 1
#
# Calculate island level on login
# This silently calculates the player's island level when they login
# This applies to all islands the player has on the server, e.g., BSkyBlock, AcidIsland