Fix race condition in the import process (#833)

This commit is contained in:
Luck 2018-03-14 22:01:52 +00:00
parent dbfc524180
commit 4773934481
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
13 changed files with 134 additions and 101 deletions

View File

@ -26,21 +26,26 @@
package me.lucko.luckperms.common.backup;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.MultimapBuilder;
import me.lucko.luckperms.common.command.CommandManager;
import me.lucko.luckperms.common.command.CommandResult;
import me.lucko.luckperms.common.locale.message.Message;
import me.lucko.luckperms.common.sender.DummySender;
import me.lucko.luckperms.common.sender.Sender;
import me.lucko.luckperms.common.utils.Cycle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@ -54,8 +59,8 @@ public class Importer implements Runnable {
private final CommandManager commandManager;
private final Set<Sender> notify;
private final List<String> commands;
private final List<ImportCommand> toExecute;
private final List<String> commandList;
private final List<ImportCommand> commands;
public Importer(CommandManager commandManager, Sender executor, List<String> commands) {
this.commandManager = commandManager;
@ -65,7 +70,7 @@ public class Importer implements Runnable {
} else {
this.notify = ImmutableSet.of(executor, commandManager.getPlugin().getConsoleSender());
}
this.commands = commands.stream()
this.commandList = commands.stream()
.map(String::trim)
.filter(s -> !s.isEmpty())
.filter(s -> !s.startsWith("#"))
@ -73,7 +78,7 @@ public class Importer implements Runnable {
.map(s -> s.startsWith("/luckperms ") ? s.substring("/luckperms ".length()) : s)
.map(s -> s.startsWith("/lp ") ? s.substring("/lp ".length()) : s)
.collect(Collectors.toList());
this.toExecute = new ArrayList<>();
this.commands = new ArrayList<>();
}
@Override
@ -81,11 +86,14 @@ public class Importer implements Runnable {
long startTime = System.currentTimeMillis();
this.notify.forEach(s -> Message.IMPORT_START.send(s));
// start an update task in the background - we'll #join this later
CompletableFuture<Void> updateTask = CompletableFuture.runAsync(() -> this.commandManager.getPlugin().getUpdateTaskBuffer().requestDirectly());
// form instances for all commands, and register them
int index = 1;
for (String command : this.commands) {
for (String command : this.commandList) {
ImportCommand cmd = new ImportCommand(this.commandManager, index, command);
this.toExecute.add(cmd);
this.commands.add(cmd);
if (cmd.getCommand().startsWith("creategroup ") || cmd.getCommand().startsWith("createtrack ")) {
cmd.process(); // process immediately
@ -94,37 +102,37 @@ public class Importer implements Runnable {
index++;
}
// divide commands up into pools
Cycle<List<ImportCommand>> commandPools = new Cycle<>(nInstances(128, ArrayList::new));
String lastTarget = null;
for (ImportCommand cmd : this.toExecute) {
// if the last target isn't the same, skip to a new pool
if (lastTarget == null || !lastTarget.equals(cmd.getTarget())) {
commandPools.next();
}
commandPools.current().add(cmd);
lastTarget = cmd.getTarget();
// split data up into sections for each holder
// holder id --> commands
ListMultimap<String, ImportCommand> sections = MultimapBuilder.linkedHashKeys().arrayListValues().build();
for (ImportCommand cmd : this.commands) {
String target = Strings.nullToEmpty(cmd.getTarget());
sections.put(target, cmd);
}
// join the update task future before scheduling command executions
updateTask.join();
// build a list of commands to be executed by each thread
ExecutorService executor = Executors.newFixedThreadPool(128);
// A set of futures, which are really just the threads we need to wait for.
Set<CompletableFuture<Void>> futures = new HashSet<>();
AtomicInteger processedCount = new AtomicInteger(0);
// iterate through each user sublist.
for (List<ImportCommand> subList : commandPools.getBacking()) {
for (Collection<ImportCommand> subList : sections.asMap().values()) {
// register and start a new thread to process the sublist
futures.add(CompletableFuture.runAsync(() -> {
futures.add(CompletableFuture.completedFuture(subList).thenAcceptAsync(sl -> {
// iterate through each user in the sublist, and grab their data.
for (ImportCommand cmd : subList) {
for (ImportCommand cmd : sl) {
cmd.process();
processedCount.incrementAndGet();
}
}, this.commandManager.getPlugin().getBootstrap().getScheduler().async()));
}, executor));
}
// all of the threads have been scheduled now and are running. we just need to wait for them all to complete
@ -132,7 +140,7 @@ public class Importer implements Runnable {
while (true) {
try {
overallFuture.get(10, TimeUnit.SECONDS);
overallFuture.get(2, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
// abnormal error - just break
e.printStackTrace();
@ -147,11 +155,12 @@ public class Importer implements Runnable {
break;
}
executor.shutdown();
long endTime = System.currentTimeMillis();
double seconds = (endTime - startTime) / 1000;
int errors = (int) this.toExecute.stream().filter(v -> v.getResult().wasFailure()).count();
int errors = (int) this.commands.stream().filter(v -> v.getResult().wasFailure()).count();
switch (errors) {
case 0:
@ -166,7 +175,7 @@ public class Importer implements Runnable {
}
AtomicInteger errIndex = new AtomicInteger(1);
for (ImportCommand e : this.toExecute) {
for (ImportCommand e : this.commands) {
if (e.getResult() != null && e.getResult().wasFailure()) {
this.notify.forEach(s -> {
Message.IMPORT_END_ERROR_HEADER.send(s, errIndex.get(), e.getId(), e.getCommand(), e.getResult().toString());
@ -182,8 +191,8 @@ public class Importer implements Runnable {
}
private void sendProgress(int processedCount) {
int percent = (processedCount * 100) / this.commands.size();
int errors = (int) this.toExecute.stream().filter(v -> v.isCompleted() && v.getResult().wasFailure()).count();
int percent = (processedCount * 100) / this.commandList.size();
int errors = (int) this.commands.stream().filter(v -> v.isCompleted() && v.getResult().wasFailure()).count();
if (errors == 1) {
this.notify.forEach(s -> Message.IMPORT_PROGRESS_SIN.send(s, percent, processedCount, this.commands.size(), errors));

View File

@ -42,7 +42,64 @@ import java.util.Optional;
*/
public final class StorageAssistant {
public static Group loadGroup(String target, Sender sender, LuckPermsPlugin plugin, boolean auditTemporary) {
// special handling for the importer
if (sender.isImport()) {
Group group = plugin.getGroupManager().getIfLoaded(target);
if (group == null) {
Message.GROUP_NOT_FOUND.send(sender, target);
}
return group;
}
Group group = plugin.getStorage().loadGroup(target).join().orElse(null);
if (group == null) {
// failed to load, but it might be a display name.
group = plugin.getGroupManager().getByDisplayName(target);
// nope, not a display name
if (group == null) {
Message.GROUP_NOT_FOUND.send(sender, target);
return null;
}
// it was a display name, we need to reload
plugin.getStorage().loadGroup(group.getName()).join();
}
if (auditTemporary) {
group.auditTemporaryPermissions();
}
return group;
}
public static Track loadTrack(String target, Sender sender, LuckPermsPlugin plugin) {
Track track;
// special handling for the importer
if (sender.isImport()) {
track = plugin.getTrackManager().getIfLoaded(target);
} else {
track = plugin.getStorage().loadTrack(target).join().orElse(null);
}
if (track == null) {
Message.TRACK_NOT_FOUND.send(sender, target);
return null;
}
return track;
}
public static void save(User user, Sender sender, LuckPermsPlugin plugin) {
// special handling for the importer
if (sender.isImport()) {
// join calls to save users - as we always load them
plugin.getStorage().saveUser(user).join();
return;
}
try {
plugin.getStorage().noBuffer().saveUser(user).get();
} catch (Exception e) {
@ -51,21 +108,22 @@ public final class StorageAssistant {
return;
}
if (sender.isImport()) {
user.getRefreshBuffer().request();
} else {
user.getRefreshBuffer().requestDirectly();
}
user.getRefreshBuffer().requestDirectly();
if (!sender.isImport()) {
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().pushUserUpdate(user);
}
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().pushUserUpdate(user);
}
}
public static void save(Group group, Sender sender, LuckPermsPlugin plugin) {
// special handling for the importer
if (sender.isImport()) {
// allow the buffer to handle things
plugin.getStorage().saveGroup(group);
return;
}
try {
plugin.getStorage().noBuffer().saveGroup(group).get();
} catch (Exception e) {
@ -74,21 +132,22 @@ public final class StorageAssistant {
return;
}
if (sender.isImport()) {
plugin.getUpdateTaskBuffer().request();
} else {
plugin.getUpdateTaskBuffer().requestDirectly();
}
plugin.getUpdateTaskBuffer().requestDirectly();
if (!sender.isImport()) {
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
}
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
}
}
public static void save(Track track, Sender sender, LuckPermsPlugin plugin) {
// special handling for the importer
if (sender.isImport()) {
// allow the buffer to handle things
plugin.getStorage().saveTrack(track);
return;
}
try {
plugin.getStorage().noBuffer().saveTrack(track).get();
} catch (Exception e) {
@ -97,17 +156,11 @@ public final class StorageAssistant {
return;
}
if (sender.isImport()) {
plugin.getUpdateTaskBuffer().request();
} else {
plugin.getUpdateTaskBuffer().requestDirectly();
}
plugin.getUpdateTaskBuffer().requestDirectly();
if (!sender.isImport()) {
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
}
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
}
}

View File

@ -65,9 +65,8 @@ public class ParentAdd extends SharedSubCommand {
String groupName = ArgumentParser.parseName(0, args);
MutableContextSet context = ArgumentParser.parseContext(1, args, plugin);
Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null);
Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false);
if (group == null) {
Message.DOES_NOT_EXIST.send(sender, groupName);
return CommandResult.INVALID_ARGS;
}

View File

@ -72,9 +72,8 @@ public class ParentAddTemp extends SharedSubCommand {
TemporaryModifier modifier = ArgumentParser.parseTemporaryModifier(2, args).orElseGet(() -> plugin.getConfiguration().get(ConfigKeys.TEMPORARY_ADD_BEHAVIOUR));
MutableContextSet context = ArgumentParser.parseContext(2, args, plugin);
Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null);
Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false);
if (group == null) {
Message.DOES_NOT_EXIST.send(sender, groupName);
return CommandResult.INVALID_ARGS;
}

View File

@ -68,9 +68,8 @@ public class ParentClearTrack extends SharedSubCommand {
return CommandResult.INVALID_ARGS;
}
Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null);
Track track = StorageAssistant.loadTrack(trackName, sender, plugin);
if (track == null) {
Message.DOES_NOT_EXIST.send(sender, trackName);
return CommandResult.LOADING_ERROR;
}

View File

@ -65,9 +65,8 @@ public class ParentSet extends SharedSubCommand {
String groupName = ArgumentParser.parseName(0, args);
MutableContextSet context = ArgumentParser.parseContext(1, args, plugin);
Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null);
Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false);
if (group == null) {
Message.DOES_NOT_EXIST.send(sender, groupName);
return CommandResult.LOADING_ERROR;
}

View File

@ -70,9 +70,8 @@ public class ParentSetTrack extends SharedSubCommand {
return CommandResult.INVALID_ARGS;
}
Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null);
Track track = StorageAssistant.loadTrack(trackName, sender, plugin);
if (track == null) {
Message.DOES_NOT_EXIST.send(sender, trackName);
return CommandResult.LOADING_ERROR;
}
@ -100,9 +99,8 @@ public class ParentSetTrack extends SharedSubCommand {
MutableContextSet context = ArgumentParser.parseContext(2, args, plugin);
Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null);
Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false);
if (group == null) {
Message.DOES_NOT_EXIST.send(sender, groupName);
return CommandResult.LOADING_ERROR;
}

View File

@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
import me.lucko.luckperms.common.command.abstraction.Command;
import me.lucko.luckperms.common.command.abstraction.MainCommand;
import me.lucko.luckperms.common.command.utils.StorageAssistant;
import me.lucko.luckperms.common.commands.generic.meta.CommandMeta;
import me.lucko.luckperms.common.commands.generic.other.HolderClear;
import me.lucko.luckperms.common.commands.generic.other.HolderEditor;
@ -39,7 +40,6 @@ import me.lucko.luckperms.common.commands.generic.parent.CommandParent;
import me.lucko.luckperms.common.commands.generic.permission.CommandPermission;
import me.lucko.luckperms.common.locale.LocaleManager;
import me.lucko.luckperms.common.locale.command.CommandSpec;
import me.lucko.luckperms.common.locale.message.Message;
import me.lucko.luckperms.common.model.Group;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.sender.Sender;
@ -84,20 +84,7 @@ public class GroupMainCommand extends MainCommand<Group, String> {
@Override
protected Group getTarget(String target, LuckPermsPlugin plugin, Sender sender) {
Group group = plugin.getStorage().loadGroup(target).join().orElse(null);
if (group == null) {
// failed to load, but it might be a display name.
group = plugin.getGroupManager().getByDisplayName(target);
// nope, not a display name
if (group == null) {
Message.GROUP_NOT_FOUND.send(sender, target);
return null;
}
}
group.auditTemporaryPermissions();
return group;
return StorageAssistant.loadGroup(target, sender, plugin, true);
}
@Override

View File

@ -58,9 +58,8 @@ public class TrackAppend extends SubCommand<Track> {
return CommandResult.INVALID_ARGS;
}
Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null);
Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false);
if (group == null) {
Message.DOES_NOT_EXIST.send(sender, groupName);
return CommandResult.LOADING_ERROR;
}

View File

@ -66,9 +66,8 @@ public class TrackInsert extends SubCommand<Track> {
return CommandResult.INVALID_ARGS;
}
Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null);
Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false);
if (group == null) {
Message.DOES_NOT_EXIST.send(sender, groupName);
return CommandResult.LOADING_ERROR;
}

View File

@ -31,9 +31,9 @@ import com.google.common.collect.ImmutableList;
import me.lucko.luckperms.common.command.abstraction.Command;
import me.lucko.luckperms.common.command.abstraction.MainCommand;
import me.lucko.luckperms.common.command.utils.StorageAssistant;
import me.lucko.luckperms.common.locale.LocaleManager;
import me.lucko.luckperms.common.locale.command.CommandSpec;
import me.lucko.luckperms.common.locale.message.Message;
import me.lucko.luckperms.common.model.Track;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.sender.Sender;
@ -73,13 +73,7 @@ public class TrackMainCommand extends MainCommand<Track, String> {
@Override
protected Track getTarget(String target, LuckPermsPlugin plugin, Sender sender) {
Track track = plugin.getStorage().loadTrack(target).join().orElse(null);
if (track == null) {
Message.TRACK_NOT_FOUND.send(sender, target);
return null;
}
return track;
return StorageAssistant.loadTrack(target, sender, plugin);
}
@Override

View File

@ -73,9 +73,8 @@ public class UserDemote extends SubCommand<User> {
return CommandResult.INVALID_ARGS;
}
Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null);
Track track = StorageAssistant.loadTrack(trackName, sender, plugin);
if (track == null) {
Message.DOES_NOT_EXIST.send(sender, trackName);
return CommandResult.LOADING_ERROR;
}

View File

@ -71,9 +71,8 @@ public class UserPromote extends SubCommand<User> {
return CommandResult.INVALID_ARGS;
}
Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null);
Track track = StorageAssistant.loadTrack(trackName, sender, plugin);
if (track == null) {
Message.DOES_NOT_EXIST.send(sender, trackName);
return CommandResult.LOADING_ERROR;
}