mirror of
https://github.com/DiscordSRV/Ascension.git
synced 2024-11-22 11:55:54 +01:00
Scheduler convenience methods, scheduler stuff cleanup
This commit is contained in:
parent
3ee90712e9
commit
8f90fea16e
@ -18,11 +18,14 @@
|
||||
|
||||
package com.discordsrv.bukkit.scheduler;
|
||||
|
||||
import com.discordsrv.common.function.CheckedSupplier;
|
||||
import com.discordsrv.common.future.util.CompletableFutureUtil;
|
||||
import com.discordsrv.common.scheduler.ServerScheduler;
|
||||
import org.bukkit.Server;
|
||||
import org.bukkit.command.CommandSender;
|
||||
import org.bukkit.plugin.Plugin;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public interface IBukkitScheduler extends ServerScheduler {
|
||||
@ -33,4 +36,12 @@ public interface IBukkitScheduler extends ServerScheduler {
|
||||
runOnMainThread(task);
|
||||
}
|
||||
|
||||
default CompletableFuture<Void> executeOnMainThread(CommandSender sender, Runnable runnable) {
|
||||
return CompletableFuture.runAsync(runnable, task -> runOnMainThread(sender, task));
|
||||
}
|
||||
|
||||
default <T> CompletableFuture<T> supplyOnMainThread(CommandSender sender, CheckedSupplier<T> supplier) {
|
||||
return CompletableFutureUtil.supplyAsync(supplier, task -> runOnMainThread(sender, task));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import org.bukkit.command.BlockCommandSender;
|
||||
import org.bukkit.command.CommandSender;
|
||||
import org.bukkit.command.ProxiedCommandSender;
|
||||
import org.bukkit.entity.Entity;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public interface IFoliaScheduler extends ServerScheduler, IBukkitScheduler {
|
||||
|
||||
@ -51,17 +52,17 @@ public interface IFoliaScheduler extends ServerScheduler, IBukkitScheduler {
|
||||
}
|
||||
|
||||
@Override
|
||||
default void runOnMainThread(Runnable task) {
|
||||
default void runOnMainThread(@NotNull Runnable task) {
|
||||
runWithArgs((server, plugin) -> server.getGlobalRegionScheduler().execute(plugin, task));
|
||||
}
|
||||
|
||||
@Override
|
||||
default void runOnMainThreadLaterInTicks(Runnable task, int ticks) {
|
||||
default void runOnMainThreadLaterInTicks(@NotNull Runnable task, int ticks) {
|
||||
runWithArgs((server, plugin) -> server.getGlobalRegionScheduler().runDelayed(plugin, r -> task.run(), ticks));
|
||||
}
|
||||
|
||||
@Override
|
||||
default void runOnMainThreadAtFixedRateInTicks(Runnable task, int initialTicks, int rateTicks) {
|
||||
default void runOnMainThreadAtFixedRateInTicks(@NotNull Runnable task, int initialTicks, int rateTicks) {
|
||||
runWithArgs((server, plugin) -> server.getGlobalRegionScheduler().execute(plugin, task));
|
||||
}
|
||||
}
|
||||
|
@ -35,20 +35,14 @@ public class BukkitGameCommandExecutionHelper implements GameCommandExecutionHel
|
||||
|
||||
if (PaperCommandMap.IS_AVAILABLE) {
|
||||
// If Paper's CommandMap is available we can list out 'root' commands
|
||||
CompletableFuture<List<String>> future = new CompletableFuture<>();
|
||||
discordSRV.scheduler().runOnMainThread(discordSRV.server().getConsoleSender(), () -> {
|
||||
try {
|
||||
return discordSRV.scheduler().supplyOnMainThread(discordSRV.server().getConsoleSender(), () -> {
|
||||
for (String cmd : PaperCommandMap.getKnownCommands(discordSRV.server())) {
|
||||
if (commandName == null || cmd.startsWith(commandName)) {
|
||||
suggestions.add(cmd);
|
||||
}
|
||||
}
|
||||
future.complete(suggestions);
|
||||
} catch (Throwable t) {
|
||||
future.completeExceptionally(t);
|
||||
}
|
||||
return suggestions;
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
return CompletableFuture.completedFuture(suggestions);
|
||||
@ -57,24 +51,17 @@ public class BukkitGameCommandExecutionHelper implements GameCommandExecutionHel
|
||||
|
||||
// Get the arguments minus the last one (if any)
|
||||
String prefix = parts.isEmpty() ? "" : String.join(" ", parts.subList(0, parts.size() - 1)) + " ";
|
||||
CompletableFuture<List<String>> future = new CompletableFuture<>();
|
||||
|
||||
CommandSender commandSender = discordSRV.server().getConsoleSender();
|
||||
discordSRV.scheduler().runOnMainThread(commandSender, () -> {
|
||||
try {
|
||||
return discordSRV.scheduler().supplyOnMainThread(commandSender, () -> {
|
||||
List<String> completions = command.tabComplete(commandSender, commandName, parts.toArray(new String[0]));
|
||||
|
||||
List<String> suggestions = new ArrayList<>();
|
||||
for (String suggestion : completions) {
|
||||
suggestions.add(commandName + " " + prefix + suggestion);
|
||||
}
|
||||
future.complete(suggestions);
|
||||
} catch (Throwable t) {
|
||||
future.completeExceptionally(t);
|
||||
}
|
||||
return suggestions;
|
||||
});
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -70,7 +70,7 @@ public class EssentialsXIntegration
|
||||
}
|
||||
|
||||
private CompletableFuture<User> getUser(UUID playerUUID) {
|
||||
return CompletableFuture.supplyAsync(() -> get().getUsers().loadUncachedUser(playerUUID), discordSRV.scheduler().executor());
|
||||
return discordSRV.scheduler().supply(() -> get().getUsers().loadUncachedUser(playerUUID));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -116,20 +116,11 @@ public class VaultIntegration extends PluginIntegration<BukkitDiscordSRV> implem
|
||||
}
|
||||
|
||||
private <T> CompletableFuture<T> supply(CheckedSupplier<T> supplier, boolean async) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
Runnable runnable = () -> {
|
||||
try {
|
||||
future.complete(supplier.get());
|
||||
} catch (Throwable e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
};
|
||||
if (async) {
|
||||
discordSRV.scheduler().run(runnable);
|
||||
return discordSRV.scheduler().supply(supplier);
|
||||
} else {
|
||||
discordSRV.scheduler().runOnMainThread(runnable);
|
||||
return discordSRV.scheduler().supplyOnMainThread(supplier);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
private OfflinePlayer offlinePlayer(UUID player) {
|
||||
|
@ -77,14 +77,14 @@ public class BukkitPlayerProvider extends ServerPlayerProvider<BukkitPlayer, Buk
|
||||
// IOfflinePlayer
|
||||
|
||||
private CompletableFuture<IOfflinePlayer> getFuture(Supplier<OfflinePlayer> provider) {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
return discordSRV.scheduler().supply(() -> {
|
||||
OfflinePlayer offlinePlayer = provider.get();
|
||||
if (offlinePlayer == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new BukkitOfflinePlayer(discordSRV, offlinePlayer);
|
||||
}, discordSRV.scheduler().executor());
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -19,6 +19,7 @@
|
||||
package com.discordsrv.bukkit.scheduler;
|
||||
|
||||
import com.discordsrv.bukkit.BukkitDiscordSRV;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public class BukkitScheduler extends AbstractBukkitScheduler {
|
||||
|
||||
@ -27,17 +28,17 @@ public class BukkitScheduler extends AbstractBukkitScheduler {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnMainThread(Runnable task) {
|
||||
public void runOnMainThread(@NotNull Runnable task) {
|
||||
checkDisable(task, (server, plugin) -> server.getScheduler().runTask(plugin, task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnMainThreadLaterInTicks(Runnable task, int ticks) {
|
||||
public void runOnMainThreadLaterInTicks(@NotNull Runnable task, int ticks) {
|
||||
checkDisable(task, (server, plugin) -> server.getScheduler().runTaskLater(plugin, task, ticks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnMainThreadAtFixedRateInTicks(Runnable task, int initialTicks, int rateTicks) {
|
||||
public void runOnMainThreadAtFixedRateInTicks(@NotNull Runnable task, int initialTicks, int rateTicks) {
|
||||
checkDisable(task, (server, plugin) -> server.getScheduler().runTaskTimer(plugin, task, initialTicks, rateTicks));
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
package com.discordsrv.bukkit.scheduler;
|
||||
|
||||
import com.discordsrv.bukkit.BukkitDiscordSRV;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public class FoliaScheduler extends AbstractBukkitScheduler implements IFoliaScheduler {
|
||||
|
||||
@ -27,17 +28,17 @@ public class FoliaScheduler extends AbstractBukkitScheduler implements IFoliaSch
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnMainThread(Runnable task) {
|
||||
public void runOnMainThread(@NotNull Runnable task) {
|
||||
checkDisable(task, (server, plugin) -> IFoliaScheduler.super.runOnMainThread(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnMainThreadLaterInTicks(Runnable task, int ticks) {
|
||||
public void runOnMainThreadLaterInTicks(@NotNull Runnable task, int ticks) {
|
||||
checkDisable(task, (server, plugin) -> IFoliaScheduler.super.runOnMainThreadLaterInTicks(task, ticks));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runOnMainThreadAtFixedRateInTicks(Runnable task, int initialTicks, int rateTicks) {
|
||||
public void runOnMainThreadAtFixedRateInTicks(@NotNull Runnable task, int initialTicks, int rateTicks) {
|
||||
checkDisable(task, (server, plugin) -> IFoliaScheduler.super.runOnMainThreadAtFixedRateInTicks(task, initialTicks, rateTicks));
|
||||
}
|
||||
}
|
||||
|
@ -538,7 +538,7 @@ public abstract class AbstractDiscordSRV<
|
||||
|
||||
@Override
|
||||
public final CompletableFuture<Void> invokeDisable() {
|
||||
return CompletableFuture.runAsync(this::disable, scheduler().executorService());
|
||||
return scheduler().execute(this::disable);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,7 +59,7 @@ public abstract class ServerDiscordSRV<
|
||||
}
|
||||
|
||||
public final CompletableFuture<Void> invokeServerStarted() {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
return scheduler().supply(() -> {
|
||||
if (status().isShutdown()) {
|
||||
// Already shutdown/shutting down, don't bother
|
||||
return null;
|
||||
@ -76,7 +76,7 @@ public abstract class ServerDiscordSRV<
|
||||
logger().error("Failed to start", t);
|
||||
}
|
||||
return null;
|
||||
}, scheduler().executorService());
|
||||
});
|
||||
}
|
||||
|
||||
@OverridingMethodsMustInvokeSuper
|
||||
|
@ -283,7 +283,7 @@ public class JDAConnectionManager implements DiscordConnectionManager {
|
||||
throw new IllegalStateException("Cannot reconnect, still active");
|
||||
}
|
||||
|
||||
return connectionFuture = CompletableFuture.runAsync(this::connectInternal, discordSRV.scheduler().executor());
|
||||
return connectionFuture = discordSRV.scheduler().execute(this::connectInternal);
|
||||
}
|
||||
|
||||
private void connectInternal() {
|
||||
@ -413,10 +413,10 @@ public class JDAConnectionManager implements DiscordConnectionManager {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> reconnect() {
|
||||
return CompletableFuture.runAsync(() -> {
|
||||
return discordSRV.scheduler().execute(() -> {
|
||||
shutdown().join();
|
||||
connect().join();
|
||||
}, discordSRV.scheduler().executor());
|
||||
});
|
||||
}
|
||||
|
||||
@Subscribe(priority = EventPriority.LATE)
|
||||
@ -427,7 +427,7 @@ public class JDAConnectionManager implements DiscordConnectionManager {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> shutdown(long timeoutMillis) {
|
||||
return CompletableFuture.runAsync(() -> shutdownInternal(timeoutMillis), discordSRV.scheduler().executor());
|
||||
return discordSRV.scheduler().execute(() -> shutdownInternal(timeoutMillis));
|
||||
}
|
||||
|
||||
@SuppressWarnings("BusyWait")
|
||||
|
@ -19,7 +19,7 @@
|
||||
package com.discordsrv.common.function;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface CheckedRunnable<T> {
|
||||
public interface CheckedRunnable {
|
||||
|
||||
T run() throws Throwable;
|
||||
void run() throws Throwable;
|
||||
}
|
||||
|
@ -19,12 +19,15 @@
|
||||
package com.discordsrv.common.future.util;
|
||||
|
||||
import com.discordsrv.common.DiscordSRV;
|
||||
import com.discordsrv.common.function.CheckedRunnable;
|
||||
import com.discordsrv.common.function.CheckedSupplier;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@ -48,20 +51,13 @@ public final class CompletableFutureUtil {
|
||||
|
||||
@SafeVarargs
|
||||
public static <T> CompletableFuture<List<T>> combine(CompletableFuture<T>... futures) {
|
||||
CompletableFuture<List<T>> future = new CompletableFuture<>();
|
||||
CompletableFuture.allOf(futures).whenComplete((v, t) -> {
|
||||
if (t != null) {
|
||||
future.completeExceptionally(t);
|
||||
return;
|
||||
}
|
||||
|
||||
return CompletableFuture.allOf(futures).thenApply(v -> {
|
||||
List<T> results = new ArrayList<>();
|
||||
for (CompletableFuture<T> aFuture : futures) {
|
||||
results.add(aFuture.join());
|
||||
}
|
||||
future.complete(results);
|
||||
return results;
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
public static <T> CompletableFuture<T> timeout(DiscordSRV discordSRV, CompletableFuture<T> future, Duration timeout) {
|
||||
@ -76,4 +72,28 @@ public final class CompletableFutureUtil {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static <T> CompletableFuture<T> supplyAsync(CheckedSupplier<T> supplier, Executor executor) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
executor.execute(() -> {
|
||||
if (future.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
future.complete(supplier.get());
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Throwable t) {
|
||||
future.completeExceptionally(t);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
public static CompletableFuture<Void> runAsync(CheckedRunnable runnable, Executor executor) {
|
||||
return supplyAsync(() -> {
|
||||
runnable.run();
|
||||
return null;
|
||||
}, executor);
|
||||
}
|
||||
}
|
||||
|
@ -26,8 +26,7 @@ public final class HttpUtil {
|
||||
}
|
||||
|
||||
public static <T> CompletableFuture<T> readJson(DiscordSRV discordSRV, Request request, Class<T> type) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
discordSRV.scheduler().run(() -> {
|
||||
return discordSRV.scheduler().supply(() -> {
|
||||
try (Response response = discordSRV.httpClient().newCall(request).execute()) {
|
||||
ResponseBody responseBody = checkIfResponseSuccessful(request, response);
|
||||
|
||||
@ -35,11 +34,8 @@ public final class HttpUtil {
|
||||
if (result == null) {
|
||||
throw new MessageException("Response json cannot be parsed");
|
||||
}
|
||||
future.complete(result);
|
||||
} catch (Throwable t) {
|
||||
future.completeExceptionally(t);
|
||||
return result;
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
@ -171,15 +171,7 @@ public class MinecraftAuthenticationLinker extends CachedLinkProvider implements
|
||||
Consumer<T> linked,
|
||||
Consumer<T> unlinked
|
||||
) {
|
||||
CompletableFuture<Optional<T>> authService = new CompletableFuture<>();
|
||||
|
||||
discordSRV.scheduler().run(() -> {
|
||||
try {
|
||||
authService.complete(authSupplier.get());
|
||||
} catch (Throwable t) {
|
||||
authService.completeExceptionally(t);
|
||||
}
|
||||
});
|
||||
CompletableFuture<Optional<T>> authService = discordSRV.scheduler().supply(authSupplier);
|
||||
if (!canCauseLink) {
|
||||
return authService;
|
||||
}
|
||||
|
@ -41,77 +41,58 @@ public class StorageLinker extends CachedLinkProvider implements LinkProvider, L
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<Long>> queryUserId(@NotNull UUID playerUUID, boolean canCauseLink) {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
return discordSRV.scheduler().supply(() -> {
|
||||
Long value = discordSRV.storage().getUserId(playerUUID);
|
||||
return Optional.ofNullable(value);
|
||||
}, discordSRV.scheduler().executor());
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<UUID>> queryPlayerUUID(long userId, boolean canCauseLink) {
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
return discordSRV.scheduler().supply(() -> {
|
||||
UUID value = discordSRV.storage().getPlayerUUID(userId);
|
||||
return Optional.ofNullable(value);
|
||||
}, discordSRV.scheduler().executor());
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> createLink(@NotNull UUID playerUUID, long userId) {
|
||||
return CompletableFuture.runAsync(
|
||||
() -> discordSRV.storage().createLink(playerUUID, userId),
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
return discordSRV.scheduler().execute(() -> discordSRV.storage().createLink(playerUUID, userId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> removeLink(@NotNull UUID playerUUID, long userId) {
|
||||
return CompletableFuture.runAsync(
|
||||
() -> discordSRV.storage().removeLink(playerUUID, userId),
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
return discordSRV.scheduler().execute(() -> discordSRV.storage().removeLink(playerUUID, userId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<UUID> getCodeLinking(long userId, @NotNull String code) {
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> discordSRV.storage().getLinkingCode(code),
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
return discordSRV.scheduler().supply(() -> discordSRV.storage().getLinkingCode(code));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> removeLinkingCode(@NotNull String code) {
|
||||
return CompletableFuture.runAsync(
|
||||
() -> {
|
||||
return discordSRV.scheduler().execute(() -> {
|
||||
UUID player = discordSRV.storage().getLinkingCode(code);
|
||||
discordSRV.storage().removeLinkingCode(player);
|
||||
},
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> removeLinkingCode(@NotNull UUID playerUUID) {
|
||||
return CompletableFuture.runAsync(
|
||||
() -> discordSRV.storage().removeLinkingCode(playerUUID),
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
return discordSRV.scheduler().execute(() -> discordSRV.storage().removeLinkingCode(playerUUID));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Integer> getLinkedAccountCount() {
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> discordSRV.storage().getLinkedAccountCount(),
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
return discordSRV.scheduler().supply(() -> discordSRV.storage().getLinkedAccountCount());
|
||||
}
|
||||
|
||||
private final SecureRandom secureRandom = new SecureRandom();
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MinecraftComponent> getLinkingInstructions(String username, UUID playerUUID, @Nullable Locale locale, @Nullable String requestReason) {
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> {
|
||||
return discordSRV.scheduler().supply(() -> {
|
||||
String code = null;
|
||||
while (code == null || discordSRV.storage().getLinkingCode(code) != null) {
|
||||
code = String.valueOf(secureRandom.nextInt(1000000));
|
||||
@ -122,8 +103,6 @@ public class StorageLinker extends CachedLinkProvider implements LinkProvider, L
|
||||
|
||||
discordSRV.storage().storeLinkingCode(playerUUID, code);
|
||||
return ComponentUtil.toAPI(Component.text(code));
|
||||
},
|
||||
discordSRV.scheduler().executor()
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,6 @@
|
||||
package com.discordsrv.common.linking.requirelinking.requirement;
|
||||
|
||||
import com.discordsrv.common.DiscordSRV;
|
||||
import com.discordsrv.common.function.CheckedSupplier;
|
||||
import me.minecraftauth.lib.AuthService;
|
||||
import me.minecraftauth.lib.account.platform.twitch.SubTier;
|
||||
import me.minecraftauth.lib.exception.LookupException;
|
||||
@ -165,23 +164,13 @@ public class MinecraftAuthRequirement<T> implements Requirement<MinecraftAuthReq
|
||||
public CompletableFuture<Boolean> isMet(Reference<T> atomicReference, UUID player, long userId) {
|
||||
String token = discordSRV.connectionConfig().minecraftAuth.token;
|
||||
T value = atomicReference.getValue();
|
||||
return discordSRV.scheduler().supply(() -> {
|
||||
if (value == null) {
|
||||
return supply(() -> test.test(token, player));
|
||||
return test.test(token, player);
|
||||
} else {
|
||||
return supply(() -> testSpecific.test(token, player, value));
|
||||
}
|
||||
}
|
||||
|
||||
private CompletableFuture<Boolean> supply(CheckedSupplier<Boolean> provider) {
|
||||
CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
|
||||
discordSRV.scheduler().run(() -> {
|
||||
try {
|
||||
completableFuture.complete(provider.get());
|
||||
} catch (Throwable t) {
|
||||
completableFuture.completeExceptionally(t);
|
||||
return testSpecific.test(token, player, value);
|
||||
}
|
||||
});
|
||||
return completableFuture;
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
|
@ -41,12 +41,11 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class JoinMessageModule extends AbstractGameMessageModule<IMessageConfig, JoinMessageReceiveEvent> {
|
||||
|
||||
private final Map<UUID, Runnable> delayedTasks = new HashMap<>();
|
||||
private final Map<UUID, Future<?>> delayedTasks = new HashMap<>();
|
||||
|
||||
public JoinMessageModule(DiscordSRV discordSRV) {
|
||||
super(discordSRV, "JOIN_MESSAGES");
|
||||
@ -79,24 +78,12 @@ public class JoinMessageModule extends AbstractGameMessageModule<IMessageConfig,
|
||||
|
||||
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
|
||||
synchronized (delayedTasks) {
|
||||
Future<?> future = discordSRV.scheduler().runLater(() -> {
|
||||
CompletableFuture<Void> forward = super.forwardToChannel(event, player, config);
|
||||
CompletableFuture<Void> future = discordSRV.scheduler()
|
||||
.supplyLater(() -> super.forwardToChannel(event, player, config), Duration.ofMillis(delay))
|
||||
.thenCompose(r -> r)
|
||||
.whenComplete((v, t) -> delayedTasks.remove(playerUUID));
|
||||
|
||||
synchronized (delayedTasks) {
|
||||
delayedTasks.remove(playerUUID);
|
||||
}
|
||||
try {
|
||||
completableFuture.complete(forward.get());
|
||||
} catch (ExecutionException e) {
|
||||
completableFuture.completeExceptionally(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}, Duration.ofMillis(delay));
|
||||
delayedTasks.put(playerUUID, () -> {
|
||||
completableFuture.complete(null);
|
||||
future.cancel(true);
|
||||
});
|
||||
delayedTasks.put(playerUUID, future);
|
||||
}
|
||||
return completableFuture;
|
||||
}
|
||||
@ -106,9 +93,9 @@ public class JoinMessageModule extends AbstractGameMessageModule<IMessageConfig,
|
||||
@Subscribe
|
||||
public void onPlayerDisconnected(PlayerDisconnectedEvent event) {
|
||||
IPlayer player = event.player();
|
||||
Runnable cancel = delayedTasks.remove(player.uniqueId());
|
||||
if (cancel != null) {
|
||||
cancel.run();
|
||||
Future<?> future = delayedTasks.remove(player.uniqueId());
|
||||
if (future != null) {
|
||||
future.cancel(true);
|
||||
logger().info(player.username() + " left within timeout period, join message will not be sent");
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,9 @@
|
||||
|
||||
package com.discordsrv.common.scheduler;
|
||||
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
import com.discordsrv.common.function.CheckedRunnable;
|
||||
import com.discordsrv.common.function.CheckedSupplier;
|
||||
import com.discordsrv.common.future.util.CompletableFutureUtil;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.time.Duration;
|
||||
@ -61,15 +63,37 @@ public interface Scheduler {
|
||||
*
|
||||
* @param task the task
|
||||
*/
|
||||
@NotNull
|
||||
Future<?> run(@NotNull Runnable task);
|
||||
|
||||
@NotNull
|
||||
default CompletableFuture<Void> execute(@NotNull CheckedRunnable task) {
|
||||
return CompletableFutureUtil.runAsync(task, this::run);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
default <T> CompletableFuture<T> supply(@NotNull CheckedSupplier<T> supplier) {
|
||||
return CompletableFutureUtil.supplyAsync(supplier, this::run);
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules the given task after the provided amount of milliseconds.
|
||||
*
|
||||
* @param task the task
|
||||
* @param delay the delay before executing the task
|
||||
*/
|
||||
ScheduledFuture<?> runLater(Runnable task, Duration delay);
|
||||
@NotNull
|
||||
ScheduledFuture<?> runLater(@NotNull Runnable task, @NotNull Duration delay);
|
||||
|
||||
@NotNull
|
||||
default CompletableFuture<Void> executeLater(@NotNull CheckedRunnable task, @NotNull Duration delay) {
|
||||
return CompletableFutureUtil.runAsync(task, t -> runLater(t, delay));
|
||||
}
|
||||
|
||||
@NotNull
|
||||
default <T> CompletableFuture<T> supplyLater(@NotNull CheckedSupplier<T> supplier, @NotNull Duration delay) {
|
||||
return CompletableFutureUtil.supplyAsync(supplier, task -> runLater(task, delay));
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules the given task at the given rate.
|
||||
@ -77,7 +101,7 @@ public interface Scheduler {
|
||||
* @param task the task
|
||||
* @param rate the rate in the given unit
|
||||
*/
|
||||
@ApiStatus.NonExtendable
|
||||
@NotNull
|
||||
default ScheduledFuture<?> runAtFixedRate(@NotNull Runnable task, Duration rate) {
|
||||
return runAtFixedRate(task, rate, rate);
|
||||
}
|
||||
@ -89,7 +113,7 @@ public interface Scheduler {
|
||||
* @param initialDelay the initial delay in the provided unit
|
||||
* @param rate the rate to run the task at in the given unit
|
||||
*/
|
||||
@ApiStatus.NonExtendable
|
||||
@NotNull
|
||||
ScheduledFuture<?> runAtFixedRate(@NotNull Runnable task, Duration initialDelay, Duration rate);
|
||||
|
||||
}
|
||||
|
@ -18,8 +18,12 @@
|
||||
|
||||
package com.discordsrv.common.scheduler;
|
||||
|
||||
import org.jetbrains.annotations.ApiStatus;
|
||||
import com.discordsrv.common.function.CheckedRunnable;
|
||||
import com.discordsrv.common.function.CheckedSupplier;
|
||||
import com.discordsrv.common.future.util.CompletableFutureUtil;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@SuppressWarnings("unused") // API
|
||||
@ -44,7 +48,17 @@ public interface ServerScheduler extends Scheduler {
|
||||
* Runs the provided task on the server's main thread as soon as possible.
|
||||
* @param task the task
|
||||
*/
|
||||
void runOnMainThread(Runnable task);
|
||||
void runOnMainThread(@NotNull Runnable task);
|
||||
|
||||
@NotNull
|
||||
default CompletableFuture<Void> supplyOnMainThread(@NotNull CheckedRunnable task) {
|
||||
return CompletableFutureUtil.runAsync(task, this::runOnMainThread);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
default <T> CompletableFuture<T> supplyOnMainThread(@NotNull CheckedSupplier<T> task) {
|
||||
return CompletableFutureUtil.supplyAsync(task, this::runOnMainThread);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the provided task in on the server's main thread in the provided amount of ticks.
|
||||
@ -53,7 +67,17 @@ public interface ServerScheduler extends Scheduler {
|
||||
* @see #TICKS_PER_SECOND
|
||||
* @see #timeToTicks(long, TimeUnit)
|
||||
*/
|
||||
void runOnMainThreadLaterInTicks(Runnable task, int ticks);
|
||||
void runOnMainThreadLaterInTicks(@NotNull Runnable task, int ticks);
|
||||
|
||||
@NotNull
|
||||
default CompletableFuture<Void> executeOnMainThreadLaterInTicks(@NotNull CheckedRunnable task, int ticks) {
|
||||
return CompletableFutureUtil.runAsync(task, t -> runOnMainThreadLaterInTicks(t, ticks));
|
||||
}
|
||||
|
||||
@NotNull
|
||||
default <T> CompletableFuture<T> supplyOnMainThreadLaterInTicks(@NotNull CheckedSupplier<T> task, int ticks) {
|
||||
return CompletableFutureUtil.supplyAsync(task, t -> runOnMainThreadLaterInTicks(t, ticks));
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the task on the server's main thread continuously at provided rate in ticks.
|
||||
@ -61,8 +85,7 @@ public interface ServerScheduler extends Scheduler {
|
||||
* @param task the task
|
||||
* @param rateTicks the rate in ticks
|
||||
*/
|
||||
@ApiStatus.NonExtendable
|
||||
default void runOnMainThreadAtFixedRateInTicks(Runnable task, int rateTicks) {
|
||||
default void runOnMainThreadAtFixedRateInTicks(@NotNull Runnable task, int rateTicks) {
|
||||
runOnMainThreadAtFixedRateInTicks(task, 0, rateTicks);
|
||||
}
|
||||
|
||||
@ -73,7 +96,7 @@ public interface ServerScheduler extends Scheduler {
|
||||
* @param initialTicks the initial delay in ticks
|
||||
* @param rateTicks the rate in ticks
|
||||
*/
|
||||
void runOnMainThreadAtFixedRateInTicks(Runnable task, int initialTicks, int rateTicks);
|
||||
void runOnMainThreadAtFixedRateInTicks(@NotNull Runnable task, int initialTicks, int rateTicks);
|
||||
|
||||
|
||||
}
|
||||
|
@ -120,17 +120,17 @@ public class StandardScheduler implements Scheduler {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> run(@NotNull Runnable task) {
|
||||
public @NotNull Future<?> run(@NotNull Runnable task) {
|
||||
return executorService.submit(wrap(task));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> runLater(Runnable task, Duration delay) {
|
||||
public @NotNull ScheduledFuture<?> runLater(@NotNull Runnable task, @NotNull Duration delay) {
|
||||
return scheduledExecutorService.schedule(wrap(task), delay.toMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> runAtFixedRate(@NotNull Runnable task, Duration initialDelay, Duration rate) {
|
||||
public @NotNull ScheduledFuture<?> runAtFixedRate(@NotNull Runnable task, Duration initialDelay, Duration rate) {
|
||||
return scheduledExecutorService.scheduleAtFixedRate(wrap(task), initialDelay.toMillis(), rate.toMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user