From 6bc7f1fca24b01a2ecef949326932fa0fd9a5327 Mon Sep 17 00:00:00 2001 From: Luck Date: Mon, 4 Sep 2017 22:24:07 +0100 Subject: [PATCH] Refactor Importer/Exporter to use multiple threads (#317) --- .../luckperms/common/backup/Exporter.java | 56 ++--- .../luckperms/common/backup/Importer.java | 208 ++++++++++++------ .../common/backup/ImporterSender.java | 10 +- .../luckperms/common/commands/utils/Util.java | 9 + .../common/logging/ProgressLogger.java | 2 +- .../lucko/luckperms/common/utils/Cycle.java | 79 +++++++ 6 files changed, 256 insertions(+), 108 deletions(-) create mode 100644 common/src/main/java/me/lucko/luckperms/common/utils/Cycle.java diff --git a/common/src/main/java/me/lucko/luckperms/common/backup/Exporter.java b/common/src/main/java/me/lucko/luckperms/common/backup/Exporter.java index 2c525a13d..e16afa1e6 100644 --- a/common/src/main/java/me/lucko/luckperms/common/backup/Exporter.java +++ b/common/src/main/java/me/lucko/luckperms/common/backup/Exporter.java @@ -35,6 +35,7 @@ import me.lucko.luckperms.common.model.User; import me.lucko.luckperms.common.node.NodeFactory; import me.lucko.luckperms.common.plugin.LuckPermsPlugin; import me.lucko.luckperms.common.storage.Storage; +import me.lucko.luckperms.common.utils.Cycle; import java.io.BufferedWriter; import java.io.IOException; @@ -44,7 +45,6 @@ import java.nio.file.Path; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; -import java.util.function.Function; /** * Handles export operations @@ -62,15 +61,6 @@ import java.util.function.Function; public class Exporter implements Runnable { private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z"); - // number of users --> the value to divide the list by. base value of 1000 per thread, with a max of 10 threads. so, 3000 = 3 threads, 10000+ = 10 threads - private static final Function THREAD_COUNT_FUNCTION = usersCount -> { - // how many threads to make. must be 1 <= x <= 15 - int i = Math.max(1, Math.min(15, usersCount / 1000)); - - // then work out the value to split at. e.g. if we have 1,000 users to export and 2 threads, we should split at every 500 users. - return Math.max(1, usersCount / i); - }; - private static void write(BufferedWriter writer, String s) { try { writer.write(s); @@ -109,27 +99,27 @@ public class Exporter implements Runnable { // Create the actual groups first write(writer, "# Create groups"); - plugin.getGroupManager().getAll().values().stream() - .sorted((o1, o2) -> { - int i = Integer.compare(o2.getWeight().orElse(0), o1.getWeight().orElse(0)); - return i != 0 ? i : o1.getName().compareToIgnoreCase(o2.getName()); - }) - .filter(g -> !g.getName().equals("default")) - .forEach(group -> { - write(writer, "/luckperms creategroup " + group.getName()); - }); - - write(writer, ""); AtomicInteger groupCount = new AtomicInteger(0); - // export groups in order of weight plugin.getGroupManager().getAll().values().stream() + // export groups in order of weight .sorted((o1, o2) -> { int i = Integer.compare(o2.getWeight().orElse(0), o1.getWeight().orElse(0)); return i != 0 ? i : o1.getName().compareToIgnoreCase(o2.getName()); }) + // create all groups initially + .peek(group -> { + if (!group.getName().equals("default")) { + write(writer, "/luckperms creategroup " + group.getName()); + } + }) + // then export the content of each group .forEach(group -> { + if (groupCount.get() == 0) { + write(writer, ""); + } + write(writer, "# Export group: " + group.getName()); for (Node node : group.getEnduringNodes().values()) { write(writer, NodeFactory.nodeAsCommand(node, group.getName(), true, true)); @@ -187,17 +177,13 @@ public class Exporter implements Runnable { write(writer, "# Export users"); - List> subUsers; - AtomicInteger userCount = new AtomicInteger(0); - - // not really that many users, so it's not really worth spreading the load. - if (users.size() < 1500) { - subUsers = Collections.singletonList(new ArrayList<>(users)); - } else { - subUsers = Util.divideList(users, THREAD_COUNT_FUNCTION.apply(users.size())); + // divide into 16 pools. + Cycle> userPools = new Cycle<>(Util.nInstances(16, ArrayList::new)); + for (UUID uuid : users) { + userPools.next().add(uuid); } - log.log("Split users into " + subUsers.size() + " threads for export."); + log.log("Split users into " + userPools.getBacking().size() + " threads for export."); // Setup a file writing lock. We don't want multiple threads writing at the same time. // The write function accepts a list of strings, as we want a user's data to be grouped together. @@ -217,8 +203,10 @@ public class Exporter implements Runnable { // A set of futures, which are really just the threads we need to wait for. Set> futures = new HashSet<>(); + AtomicInteger userCount = new AtomicInteger(0); + // iterate through each user sublist. - for (List subList : subUsers) { + for (List subList : userPools.getBacking()) { // register and start a new thread to process the sublist futures.add(CompletableFuture.runAsync(() -> { @@ -229,7 +217,7 @@ public class Exporter implements Runnable { // actually export the user. this output will be fed to the writing function when we have all of the user's data. List output = new ArrayList<>(); - plugin.getStorage().loadUser(uuid, "null").join(); + plugin.getStorage().loadUser(uuid, null).join(); User user = plugin.getUserManager().getIfLoaded(uuid); output.add("# Export user: " + user.getUuid().toString() + " - " + user.getName().orElse("unknown username")); diff --git a/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java b/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java index 77bee9f1f..8f0b80930 100644 --- a/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java +++ b/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java @@ -25,7 +25,6 @@ package me.lucko.luckperms.common.backup; -import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -37,13 +36,17 @@ import me.lucko.luckperms.common.commands.CommandResult; import me.lucko.luckperms.common.commands.sender.Sender; import me.lucko.luckperms.common.commands.utils.Util; import me.lucko.luckperms.common.locale.Message; -import me.lucko.luckperms.common.utils.DateUtil; +import me.lucko.luckperms.common.plugin.LuckPermsPlugin; +import me.lucko.luckperms.common.utils.Cycle; import java.util.ArrayList; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -51,16 +54,11 @@ import java.util.stream.Collectors; * Handles import operations */ public class Importer implements Runnable { - private static final int PROGRESS_REPORT_SECONDS = 10; private final CommandManager commandManager; private final Set notify; private final List commands; - private final Map cmdResult; - private final ImporterSender fake; - - private long lastMsg = 0; - private int executing = -1; + private final List toExecute; public Importer(CommandManager commandManager, Sender executor, List commands) { this.commandManager = commandManager; @@ -77,11 +75,10 @@ public class Importer implements Runnable { .filter(s -> !s.startsWith("//")) .map(s -> s.startsWith("/") ? s.substring("/".length()) : s) .map(s -> s.startsWith("perms ") ? s.substring("perms ".length()) : s) + .map(s -> s.startsWith("lp ") ? s.substring("lp ".length()) : s) .map(s -> s.startsWith("luckperms ") ? s.substring("luckperms ".length()) : s) .collect(Collectors.toList()); - - this.cmdResult = new HashMap<>(); - this.fake = new ImporterSender(commandManager.getPlugin(), this::logMessage); + this.toExecute = new ArrayList<>(); } @Override @@ -89,35 +86,83 @@ public class Importer implements Runnable { long startTime = System.currentTimeMillis(); notify.forEach(s -> Message.IMPORT_START.send(s)); + // form instances for all commands, and register them int index = 1; for (String command : commands) { - long time = DateUtil.unixSecondsNow(); - if (lastMsg < (time - PROGRESS_REPORT_SECONDS)) { - lastMsg = time; - - sendProgress(index); - } - - executing = index; - try { - CommandResult result = commandManager.onCommand( - fake, - "lp", - Util.stripQuotes(Splitter.on(CommandManager.COMMAND_SEPARATOR_PATTERN).omitEmptyStrings().splitToList(command)) - ).get(); - getResult(index, command).setResult(result); - - } catch (Exception e) { - getResult(index, command).setResult(CommandResult.FAILURE); - e.printStackTrace(); - } + toExecute.add(new ImportCommand(commandManager.getPlugin(), index, command)); index++; } + // divide commands up into pools + Cycle> commandPools = new Cycle<>(Util.nInstances(16, ArrayList::new)); + + String lastTarget = null; + for (ImportCommand cmd : toExecute) { + // if the last target isn't the same, skip to a new pool + if (lastTarget == null || !lastTarget.equals(cmd.getTarget())) { + commandPools.next(); + } + + commandPools.current().add(cmd); + lastTarget = cmd.getTarget(); + } + + // A set of futures, which are really just the threads we need to wait for. + Set> futures = new HashSet<>(); + + AtomicInteger processedCount = new AtomicInteger(0); + + // iterate through each user sublist. + for (List subList : commandPools.getBacking()) { + + // register and start a new thread to process the sublist + futures.add(CompletableFuture.runAsync(() -> { + + // iterate through each user in the sublist, and grab their data. + for (ImportCommand cmd : subList) { + try { + CommandResult result = commandManager.onCommand( + cmd, + "lp", + Util.stripQuotes(Splitter.on(CommandManager.COMMAND_SEPARATOR_PATTERN).omitEmptyStrings().splitToList(cmd.getCommand())) + ).get(); + cmd.setResult(result); + + } catch (Exception e) { + cmd.setResult(CommandResult.FAILURE); + e.printStackTrace(); + } + + processedCount.incrementAndGet(); + } + }, commandManager.getPlugin().getScheduler().async())); + } + + // all of the threads have been scheduled now and are running. we just need to wait for them all to complete + CompletableFuture overallFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); + + while (true) { + try { + overallFuture.get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException e) { + // abnormal error - just break + e.printStackTrace(); + break; + } catch (TimeoutException e) { + // still executing - send a progress report and continue waiting + sendProgress(processedCount.get()); + continue; + } + + // process is complete + break; + } + + long endTime = System.currentTimeMillis(); double seconds = (endTime - startTime) / 1000; - int errors = (int) cmdResult.values().stream().filter(v -> v.getResult() != null && !v.getResult().asBoolean()).count(); + int errors = (int) toExecute.stream().filter(v -> !v.getResult().asBoolean()).count(); if (errors == 0) { notify.forEach(s -> Message.IMPORT_END_COMPLETE.send(s, seconds)); @@ -128,11 +173,11 @@ public class Importer implements Runnable { } AtomicInteger errIndex = new AtomicInteger(1); - for (Map.Entry e : cmdResult.entrySet()) { - if (e.getValue().getResult() != null && !e.getValue().getResult().asBoolean()) { + for (ImportCommand e : toExecute) { + if (e.getResult() != null && !e.getResult().asBoolean()) { notify.forEach(s -> { - Message.IMPORT_END_ERROR_HEADER.send(s, errIndex.get(), e.getKey(), e.getValue().getCommand(), e.getValue().getResult().toString()); - for (String out : e.getValue().getOutput()) { + Message.IMPORT_END_ERROR_HEADER.send(s, errIndex.get(), e.getId(), e.getCommand(), e.getResult().toString()); + for (String out : e.getOutput()) { Message.IMPORT_END_ERROR_CONTENT.send(s, out); } Message.IMPORT_END_ERROR_FOOTER.send(s); @@ -143,50 +188,77 @@ public class Importer implements Runnable { } } - private void sendProgress(int executing) { - int percent = (executing * 100) / commands.size(); - int errors = (int) cmdResult.values().stream().filter(v -> v.getResult() != null && !v.getResult().asBoolean()).count(); + private void sendProgress(int processedCount) { + int percent = (processedCount * 100) / commands.size(); + int errors = (int) toExecute.stream().filter(v -> !v.getResult().asBoolean()).count(); if (errors == 1) { - notify.forEach(s -> Message.IMPORT_PROGRESS_SIN.send(s, percent, executing, commands.size(), errors)); + notify.forEach(s -> Message.IMPORT_PROGRESS_SIN.send(s, percent, processedCount, commands.size(), errors)); } else { - notify.forEach(s -> Message.IMPORT_PROGRESS.send(s, percent, executing, commands.size(), errors)); - } - } - - private Result getResult(int executing, String command) { - return cmdResult.compute(executing, (i, r) -> { - if (r == null) { - r = new Result(command); - } else { - if (!command.equals("") && r.getCommand().equals("")) { - r.setCommand(command); - } - } - - return r; - }); - } - - private void logMessage(String msg) { - if (executing != -1) { - getResult(executing, "").getOutput().add(Util.stripColor(msg)); + notify.forEach(s -> Message.IMPORT_PROGRESS.send(s, percent, processedCount, commands.size(), errors)); } } @Getter - @Setter - private static class Result { + private static class ImportCommand extends ImporterSender { + private static final Splitter SPACE_SPLIT = Splitter.on(" "); + + private final int id; + private final String command; + + private final String target; - @Setter(AccessLevel.NONE) private final List output = new ArrayList<>(); - private String command; + @Setter private CommandResult result = CommandResult.FAILURE; - private Result(String command) { + ImportCommand(LuckPermsPlugin plugin, int id, String command) { + super(plugin); + this.id = id; this.command = command; + this.target = determineTarget(command); } + + @Override + protected void consumeMessage(String s) { + output.add(s); + } + + private static String determineTarget(String command) { + if (command.startsWith("user ") && command.length() > "user ".length()) { + String subCmd = command.substring("user ".length()); + if (!subCmd.contains(" ")) { + return null; + } + + String targetUser = SPACE_SPLIT.split(subCmd).iterator().next(); + return "u:" + targetUser; + } + + if (command.startsWith("group ") && command.length() > "group ".length()) { + String subCmd = command.substring("group ".length()); + if (!subCmd.contains(" ")) { + return null; + } + + String targetGroup = SPACE_SPLIT.split(subCmd).iterator().next(); + return "g:" + targetGroup; + } + + if (command.startsWith("track ") && command.length() > "track ".length()) { + String subCmd = command.substring("track ".length()); + if (!subCmd.contains(" ")) { + return null; + } + + String targetTrack = SPACE_SPLIT.split(subCmd).iterator().next(); + return "t:" + targetTrack; + } + + return null; + } + } } diff --git a/common/src/main/java/me/lucko/luckperms/common/backup/ImporterSender.java b/common/src/main/java/me/lucko/luckperms/common/backup/ImporterSender.java index cdbf7d86e..ed133d552 100644 --- a/common/src/main/java/me/lucko/luckperms/common/backup/ImporterSender.java +++ b/common/src/main/java/me/lucko/luckperms/common/backup/ImporterSender.java @@ -37,12 +37,12 @@ import me.lucko.luckperms.common.utils.TextUtils; import net.kyori.text.Component; import java.util.UUID; -import java.util.function.Consumer; @AllArgsConstructor -public class ImporterSender implements Sender { +public abstract class ImporterSender implements Sender { private final LuckPermsPlugin plugin; - private final Consumer messageConsumer; + + protected abstract void consumeMessage(String s); @Override public LuckPermsPlugin getPlatform() { @@ -61,13 +61,13 @@ public class ImporterSender implements Sender { @Override public void sendMessage(String s) { - messageConsumer.accept(s); + consumeMessage(s); } @SuppressWarnings("deprecation") @Override public void sendMessage(Component message) { - messageConsumer.accept(TextUtils.toLegacy(message)); + consumeMessage(TextUtils.toLegacy(message)); } @Override diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/utils/Util.java b/common/src/main/java/me/lucko/luckperms/common/commands/utils/Util.java index bf4ed4424..f8dd45a22 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/utils/Util.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/utils/Util.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.UUID; +import java.util.function.Supplier; import java.util.regex.Pattern; @UtilityClass @@ -111,6 +112,14 @@ public class Util { return s == null ? null : STRIP_COLOR_PATTERN.matcher(s).replaceAll(""); } + public static List nInstances(int count, Supplier supplier) { + List ret = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + ret.add(supplier.get()); + } + return ret; + } + public static List> divideList(Iterable source, int size) { List> lists = new ArrayList<>(); Iterator it = source.iterator(); diff --git a/common/src/main/java/me/lucko/luckperms/common/logging/ProgressLogger.java b/common/src/main/java/me/lucko/luckperms/common/logging/ProgressLogger.java index 13535d72a..bf688af72 100644 --- a/common/src/main/java/me/lucko/luckperms/common/logging/ProgressLogger.java +++ b/common/src/main/java/me/lucko/luckperms/common/logging/ProgressLogger.java @@ -38,7 +38,7 @@ import java.util.Set; @RequiredArgsConstructor public class ProgressLogger { - private static final int NOTIFY_FREQUENCY = 100; + private static final int NOTIFY_FREQUENCY = 500; private final String pluginName; private final Message logMessage; diff --git a/common/src/main/java/me/lucko/luckperms/common/utils/Cycle.java b/common/src/main/java/me/lucko/luckperms/common/utils/Cycle.java new file mode 100644 index 000000000..09cae7da4 --- /dev/null +++ b/common/src/main/java/me/lucko/luckperms/common/utils/Cycle.java @@ -0,0 +1,79 @@ +/* + * This file is part of LuckPerms, licensed under the MIT License. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package me.lucko.luckperms.common.utils; + +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * A cycle of elements, backed by a list. All operations are thread safe. + * + * @param the element type + */ +public class Cycle { + protected final List objects; + protected int index = 0; + + public Cycle(List objects) { + if (objects == null || objects.isEmpty()) { + throw new IllegalArgumentException("List of objects cannot be null/empty."); + } + this.objects = ImmutableList.copyOf(objects); + } + + public int getIndex() { + return index; + } + + public E current() { + synchronized (this) { + return objects.get(index); + } + } + + public E next() { + synchronized (this) { + index++; + index = index > objects.size() - 1 ? 0 : index; + + return objects.get(index); + } + } + + public E back() { + synchronized (this) { + index--; + index = index == -1 ? objects.size() - 1 : index; + + return objects.get(index); + } + } + + public List getBacking() { + return objects; + } +}