From ae60e7e4232975d5bb92d65aa60ee94aaeee7b21 Mon Sep 17 00:00:00 2001 From: Vankka Date: Sat, 29 Jun 2024 19:54:02 +0300 Subject: [PATCH] Improve console handling --- .../main/generic/DestinationConfig.java | 41 ++- .../config/main/generic/ThreadConfig.java | 23 ++ .../common/console/ConsoleModule.java | 37 ++- .../common/console/SingleConsoleHandler.java | 300 ++++++++++++------ 4 files changed, 279 insertions(+), 122 deletions(-) diff --git a/common/src/main/java/com/discordsrv/common/config/main/generic/DestinationConfig.java b/common/src/main/java/com/discordsrv/common/config/main/generic/DestinationConfig.java index 56835978..201d0ead 100644 --- a/common/src/main/java/com/discordsrv/common/config/main/generic/DestinationConfig.java +++ b/common/src/main/java/com/discordsrv/common/config/main/generic/DestinationConfig.java @@ -27,6 +27,7 @@ import org.spongepowered.configurate.objectmapping.meta.Setting; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; @ConfigSerializable public class DestinationConfig { @@ -45,23 +46,43 @@ public class DestinationConfig { @Setting("channel-id") public Long channelId = 0L; - @Setting("thread-name") - @Comment("If specified this destination will be a thread in the provided channel-id's channel, if left blank the destination will be the channel") - public String threadName = ""; - public boolean privateThread = false; + @Setting(nodeFromParent = true) + public ThreadConfig thread = new ThreadConfig(""); public DestinationConfig asDestination() { DestinationConfig config = new DestinationConfig(); - if (StringUtils.isEmpty(threadName)) { + if (thread == null || StringUtils.isEmpty(thread.threadName)) { config.channelIds.add(channelId); } else { - ThreadConfig threadConfig = new ThreadConfig(); - threadConfig.channelId = channelId; - threadConfig.threadName = threadName; - threadConfig.privateThread = privateThread; - config.threads.add(threadConfig); + config.threads.add(thread); } return config; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Single single = (Single) o; + return Objects.equals(channelId, single.channelId) && Objects.equals(thread, single.thread); + } + + @Override + public int hashCode() { + return Objects.hash(channelId, thread); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DestinationConfig that = (DestinationConfig) o; + return Objects.equals(channelIds, that.channelIds) && Objects.equals(threads, that.threads); + } + + @Override + public int hashCode() { + return Objects.hash(channelIds, threads); } } diff --git a/common/src/main/java/com/discordsrv/common/config/main/generic/ThreadConfig.java b/common/src/main/java/com/discordsrv/common/config/main/generic/ThreadConfig.java index efd2aa4d..e3090de6 100644 --- a/common/src/main/java/com/discordsrv/common/config/main/generic/ThreadConfig.java +++ b/common/src/main/java/com/discordsrv/common/config/main/generic/ThreadConfig.java @@ -21,9 +21,17 @@ package com.discordsrv.common.config.main.generic; import org.spongepowered.configurate.objectmapping.ConfigSerializable; import org.spongepowered.configurate.objectmapping.meta.Comment; +import java.util.Objects; + @ConfigSerializable public class ThreadConfig { + public ThreadConfig() {} + + public ThreadConfig(String name) { + this.threadName = name; + } + @Comment("Specify the text or forum channel id and the name of the thread (the thread will be automatically created if it doesn't exist)") public Long channelId = 0L; @@ -35,4 +43,19 @@ public class ThreadConfig { @Comment("Does not effect forums") public boolean privateThread = false; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ThreadConfig that = (ThreadConfig) o; + return unarchiveExisting == that.unarchiveExisting + && privateThread == that.privateThread + && Objects.equals(channelId, that.channelId) + && Objects.equals(threadName, that.threadName); + } + + @Override + public int hashCode() { + return Objects.hash(channelId, threadName, unarchiveExisting, privateThread); + } } diff --git a/common/src/main/java/com/discordsrv/common/console/ConsoleModule.java b/common/src/main/java/com/discordsrv/common/console/ConsoleModule.java index d41fdbbf..0b098c94 100644 --- a/common/src/main/java/com/discordsrv/common/console/ConsoleModule.java +++ b/common/src/main/java/com/discordsrv/common/console/ConsoleModule.java @@ -31,7 +31,6 @@ import com.discordsrv.common.logging.LogLevel; import com.discordsrv.common.logging.NamedLogger; import com.discordsrv.common.logging.backend.LoggingBackend; import com.discordsrv.common.module.type.AbstractModule; -import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -57,6 +56,11 @@ public class ConsoleModule extends AbstractModule implements LogAppe return Collections.emptySet(); } + @Override + public boolean canEnableBeforeReady() { + return discordSRV.config() != null; + } + @Override public void enable() { backend = discordSRV.console().loggingBackend(); @@ -65,15 +69,31 @@ public class ConsoleModule extends AbstractModule implements LogAppe @Override public void reload(Consumer resultConsumer) { - for (SingleConsoleHandler handler : handlers) { - handler.shutdown(); - } - handlers.clear(); - List configs = discordSRV.config().console; - for (ConsoleConfig config : configs) { + Set uncheckedConfigs = new LinkedHashSet<>(configs); + + for (int i = handlers.size() - 1; i >= 0; i--) { + SingleConsoleHandler handler = handlers.get(i); + + ConsoleConfig matchingConfig = null; + for (ConsoleConfig config : configs) { + if (config.channel.equals(handler.getConfig().channel)) { + matchingConfig = config; + break; + } + } + if (matchingConfig != null) { + handler.setConfig(matchingConfig); + uncheckedConfigs.remove(matchingConfig); + } else { + handlers.remove(i); + discordSRV.scheduler().run(handler::shutdown); + } + } + + for (ConsoleConfig config : uncheckedConfigs) { DestinationConfig.Single destination = config.channel; - if (destination.channelId == 0L && StringUtils.isEmpty(destination.threadName)) { + if (destination.channelId == 0L) { logger().debug("Skipping a console handler due to lack of channel"); continue; } @@ -84,6 +104,7 @@ public class ConsoleModule extends AbstractModule implements LogAppe handlers.add(new SingleConsoleHandler(discordSRV, logger(), config)); } + logger().debug(handlers.size() + " console handlers active"); } @Override diff --git a/common/src/main/java/com/discordsrv/common/console/SingleConsoleHandler.java b/common/src/main/java/com/discordsrv/common/console/SingleConsoleHandler.java index dd9f8824..16d6db17 100644 --- a/common/src/main/java/com/discordsrv/common/console/SingleConsoleHandler.java +++ b/common/src/main/java/com/discordsrv/common/console/SingleConsoleHandler.java @@ -43,13 +43,12 @@ import com.discordsrv.common.logging.Logger; import net.dv8tion.jda.api.entities.Message; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.Pair; import java.time.Duration; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; /** * The log appending and command handling for a single console channel. @@ -57,20 +56,22 @@ import java.util.concurrent.TimeUnit; public class SingleConsoleHandler { private static final int MESSAGE_MAX_LENGTH = Message.MAX_CONTENT_LENGTH; + private static final int SEND_QUEUE_MAX_SIZE = 6; private final DiscordSRV discordSRV; private final Logger logger; - private final ConsoleConfig config; - private final Queue queue; + private ConsoleConfig config; + private Queue messageQueue; + private Deque> sendQueue; private Future queueProcessingFuture; private boolean shutdown = false; // Editing - private final List messageCache; - private Long mostRecentMessageId; + private List messageCache; + private final AtomicLong mostRecentMessageId = new AtomicLong(0); - // Preventing concurrent sends - private final Object sendLock = new Object(); + // Sending + private boolean sentFirstBatch = false; private CompletableFuture sendFuture; // Don't annoy console users twice about using / @@ -79,11 +80,7 @@ public class SingleConsoleHandler { public SingleConsoleHandler(DiscordSRV discordSRV, Logger logger, ConsoleConfig config) { this.discordSRV = discordSRV; this.logger = logger; - this.config = config; - this.queue = config.appender.outputMode != ConsoleConfig.OutputMode.OFF ? new LinkedBlockingQueue<>() : null; - this.messageCache = config.appender.useEditing ? new ArrayList<>() : null; - - timeQueueProcess(); + setConfig(config); } public void handleDiscordMessageReceived(DiscordMessageReceiveEvent event) { @@ -108,7 +105,7 @@ public class SingleConsoleHandler { } DestinationConfig.Single destination = config.channel; - String threadName = destination.threadName; + String threadName = destination.thread.threadName; DiscordGuildChannel checkChannel; if (StringUtils.isNotEmpty(threadName)) { @@ -176,36 +173,88 @@ public class SingleConsoleHandler { if (messageCache != null) { messageCache.clear(); } - mostRecentMessageId = null; + synchronized (mostRecentMessageId) { + mostRecentMessageId.set(0); + } // Run the command discordSRV.console().runCommandWithLogging(discordSRV, user, command); } public void queue(LogEntry entry) { - if (queue == null) { + if (messageQueue == null) { return; } - queue.offer(entry); + messageQueue.offer(entry); } - @SuppressWarnings("SynchronizeOnNonFinalField") + public ConsoleConfig getConfig() { + return config; + } + + public void setConfig(ConsoleConfig config) { + if (queueProcessingFuture != null) { + queueProcessingFuture.cancel(false); + } + + this.config = config; + + boolean sendOn = config.appender.outputMode != ConsoleConfig.OutputMode.OFF; + if (sendOn) { + if (messageQueue == null) { + this.messageQueue = new LinkedBlockingQueue<>(); + this.sendQueue = new LinkedBlockingDeque<>(); + } + } else { + if (messageQueue != null) { + this.messageQueue = null; + this.sendQueue = null; + } + } + + boolean edit = config.appender.useEditing; + if (edit) { + if (messageCache == null) { + this.messageCache = new ArrayList<>(); + } + } else { + if (messageCache != null) { + this.messageCache = null; + } + } + + timeQueueProcess(); + } + + @SuppressWarnings({"BusyWait"}) public void shutdown() { shutdown = true; queueProcessingFuture.cancel(false); + processQueue(); + try { - synchronized (queueProcessingFuture) { - queueProcessingFuture.wait(TimeUnit.SECONDS.toMillis(3)); + long start = System.nanoTime(); + while (!sendFuture.isDone()) { + if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(3)) { + break; + } + Thread.sleep(50); } - } catch (InterruptedException e) { + sendFuture.cancel(true); + } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); } - queue.clear(); + if (messageQueue != null) { + messageQueue.clear(); + } + if (sendQueue != null) { + sendQueue.clear(); + } if (messageCache != null) { messageCache.clear(); } - mostRecentMessageId = null; + mostRecentMessageId.set(0); } private void timeQueueProcess() { @@ -215,56 +264,79 @@ public class SingleConsoleHandler { if (config.appender.outputMode == ConsoleConfig.OutputMode.OFF) { return; } - this.queueProcessingFuture = discordSRV.scheduler().runLater(this::processQueue, Duration.ofSeconds(2)); + this.queueProcessingFuture = discordSRV.scheduler().runLater(this::processQueue, Duration.ofMillis(1500)); } private void processQueue() { try { - ConsoleConfig.Appender appenderConfig = config.appender; - ConsoleConfig.OutputMode outputMode = appenderConfig.outputMode; + processMessageQueue(); + } catch (Exception e) { + logger.error("Failed to process console lines", e); + } - Queue currentBuffer = new LinkedBlockingQueue<>(); - LogEntry entry; - while ((entry = queue.poll()) != null) { - String level = entry.level().name(); - if (appenderConfig.levels.levels.contains(level) == appenderConfig.levels.blacklist) { - // Ignored level - continue; - } - - String loggerName = entry.loggerName(); - if (StringUtils.isEmpty(loggerName)) loggerName = "NONE"; - if (appenderConfig.loggers.loggers.contains(loggerName) == appenderConfig.loggers.blacklist) { - // Ignored logger - continue; - } - - List messages = formatEntry(entry, outputMode, config.appender.diffExceptions); - if (messages.size() == 1) { - LogMessage message = new LogMessage(entry, messages.get(0)); - currentBuffer.add(message); - } else { - clearBuffer(currentBuffer, outputMode); - for (String message : messages) { - send(message, true, outputMode); - } - } + int oversize = sendQueue.size() - SEND_QUEUE_MAX_SIZE; + if (sentFirstBatch && oversize > 0) { + int remove = oversize + 1; + for (int i = 0; i < remove; i++) { + sendQueue.pollLast(); } - clearBuffer(currentBuffer, outputMode); - } catch (Exception ex) { - logger.error("Failed to process console lines", ex); + + logger.warning("Skipping " + remove + " log messages because the send queue is backed up"); + } + + if (!shutdown && !discordSRV.isReady()) { + // Not ready yet + timeQueueProcess(); + return; + } + + try { + processSendQueue(); + } catch (Exception e) { + logger.error("Failed to send console lines", e); } if (sendFuture != null) { - sendFuture.whenComplete((__, ___) -> { - sendFuture = null; - timeQueueProcess(); - }); + sendFuture.whenComplete((v, t) -> timeQueueProcess()); } else { timeQueueProcess(); } } + private void processMessageQueue() { + ConsoleConfig.Appender appenderConfig = config.appender; + ConsoleConfig.OutputMode outputMode = appenderConfig.outputMode; + + Queue currentBuffer = new LinkedBlockingQueue<>(); + LogEntry entry; + while ((entry = messageQueue.poll()) != null) { + String level = entry.level().name(); + if (appenderConfig.levels.levels.contains(level) == appenderConfig.levels.blacklist) { + // Ignored level + continue; + } + + String loggerName = entry.loggerName(); + if (StringUtils.isEmpty(loggerName)) loggerName = "NONE"; + if (appenderConfig.loggers.loggers.contains(loggerName) == appenderConfig.loggers.blacklist) { + // Ignored logger + continue; + } + + List messages = formatEntry(entry, outputMode, config.appender.diffExceptions); + if (messages.size() == 1) { + LogMessage message = new LogMessage(entry, messages.get(0)); + currentBuffer.add(message); + } else { + clearBuffer(currentBuffer, outputMode); + for (String message : messages) { + queueMessage(message, true, outputMode); + } + } + } + clearBuffer(currentBuffer, outputMode); + } + private void clearBuffer(Queue currentBuffer, ConsoleConfig.OutputMode outputMode) { if (currentBuffer.isEmpty()) { return; @@ -272,7 +344,7 @@ public class SingleConsoleHandler { int blockLength = outputMode.blockLength(); - StringBuilder builder = new StringBuilder(); + StringBuilder builder = new StringBuilder(MESSAGE_MAX_LENGTH); if (messageCache != null) { for (LogMessage logMessage : messageCache) { builder.append(logMessage.formatted()); @@ -283,7 +355,7 @@ public class SingleConsoleHandler { while ((current = currentBuffer.poll()) != null) { String formatted = current.formatted(); if (formatted.length() + builder.length() + blockLength > MESSAGE_MAX_LENGTH) { - send(builder.toString(), true, outputMode); + queueMessage(builder.toString(), true, outputMode); builder.setLength(0); if (messageCache != null) { messageCache.clear(); @@ -297,56 +369,18 @@ public class SingleConsoleHandler { } if (builder.length() > 0) { - send(builder.toString(), false, outputMode); + queueMessage(builder.toString(), false, outputMode); } } - private void send(String message, boolean isFull, ConsoleConfig.OutputMode outputMode) { + private void queueMessage(String message, boolean lastEdit, ConsoleConfig.OutputMode outputMode) { SendableDiscordMessage sendableMessage = SendableDiscordMessage.builder() .setContent(outputMode.prefix() + message + outputMode.suffix()) .setSuppressedNotifications(config.appender.silentMessages) .setSuppressedEmbeds(config.appender.disableLinkEmbeds) .build(); - synchronized (sendLock) { - CompletableFuture future = sendFuture != null ? sendFuture : CompletableFuture.completedFuture(null); - - sendFuture = future - .thenCompose(__ -> - discordSRV.destinations() - .lookupDestination(config.channel.asDestination(), true, true) - ) - .thenApply(channels -> { - if (channels.isEmpty()) { - // Nowhere to send to - return null; - } - - DiscordGuildMessageChannel channel = channels.iterator().next(); - if (mostRecentMessageId != null) { - long messageId = mostRecentMessageId; - if (isFull) { - mostRecentMessageId = null; - } - return channel.editMessageById(messageId, sendableMessage); - } - - return channel.sendMessage(sendableMessage) - .whenComplete((receivedMessage, t) -> { - if (receivedMessage != null && messageCache != null) { - mostRecentMessageId = receivedMessage.getId(); - } - }); - }).exceptionally(ex -> { - String error = "Failed to send message to console channel"; - if (message.contains(error)) { - // Prevent infinite loop of the same error - return null; - } - logger.error(error, ex); - return null; - }); - } + sendQueue.offer(Pair.of(sendableMessage, lastEdit)); } private List formatEntry(LogEntry entry, ConsoleConfig.OutputMode outputMode, boolean diffExceptions) { @@ -477,4 +511,62 @@ public class SingleConsoleHandler { } return " "; } + + private void processSendQueue() { + Pair pair; + do { + pair = sendQueue.poll(); + if (pair == null) { + // *crickets* Nothing to send + continue; + } + SendableDiscordMessage sendableMessage = pair.getKey(); + boolean lastEdit = pair.getValue(); + + if (sendFuture == null) { + sendFuture = CompletableFuture.completedFuture(null); + } + + sendFuture = sendFuture + .thenCompose(__ -> discordSRV.destinations().lookupDestination(config.channel.asDestination(), true, true)) + .thenCompose(channels -> { + if (channels.isEmpty()) { + // Nowhere to send to + return null; + } + + DiscordGuildMessageChannel channel = channels.iterator().next(); + synchronized (mostRecentMessageId) { + long messageId = mostRecentMessageId.get(); + if (messageId != 0) { + if (lastEdit) { + mostRecentMessageId.set(0); + } + return channel.editMessageById(messageId, sendableMessage); + } + } + + return channel.sendMessage(sendableMessage); + }).thenApply(msg -> { + if (!lastEdit && msg != null && messageCache != null) { + synchronized (mostRecentMessageId) { + mostRecentMessageId.set(msg.getId()); + } + } + + sentFirstBatch = true; + return msg; + }).exceptionally(ex -> { + String error = "Failed to send message to console channel"; + String messageContent = sendableMessage.getContent(); + if (messageContent != null && messageContent.contains(error)) { + // Prevent infinite loop of the same error + return null; + } + + logger.error(error, ex); + return null; + }); + } while (pair != null); + } }