Improve console handling

This commit is contained in:
Vankka 2024-06-29 19:54:02 +03:00
parent 3c7d4074dc
commit ae60e7e423
No known key found for this signature in database
GPG Key ID: 62E48025ED4E7EBB
4 changed files with 279 additions and 122 deletions

View File

@ -27,6 +27,7 @@ import org.spongepowered.configurate.objectmapping.meta.Setting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ConfigSerializable
public class DestinationConfig {
@ -45,23 +46,43 @@ public class DestinationConfig {
@Setting("channel-id")
public Long channelId = 0L;
@Setting("thread-name")
@Comment("If specified this destination will be a thread in the provided channel-id's channel, if left blank the destination will be the channel")
public String threadName = "";
public boolean privateThread = false;
@Setting(nodeFromParent = true)
public ThreadConfig thread = new ThreadConfig("");
public DestinationConfig asDestination() {
DestinationConfig config = new DestinationConfig();
if (StringUtils.isEmpty(threadName)) {
if (thread == null || StringUtils.isEmpty(thread.threadName)) {
config.channelIds.add(channelId);
} else {
ThreadConfig threadConfig = new ThreadConfig();
threadConfig.channelId = channelId;
threadConfig.threadName = threadName;
threadConfig.privateThread = privateThread;
config.threads.add(threadConfig);
config.threads.add(thread);
}
return config;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Single single = (Single) o;
return Objects.equals(channelId, single.channelId) && Objects.equals(thread, single.thread);
}
@Override
public int hashCode() {
return Objects.hash(channelId, thread);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DestinationConfig that = (DestinationConfig) o;
return Objects.equals(channelIds, that.channelIds) && Objects.equals(threads, that.threads);
}
@Override
public int hashCode() {
return Objects.hash(channelIds, threads);
}
}

View File

@ -21,9 +21,17 @@ package com.discordsrv.common.config.main.generic;
import org.spongepowered.configurate.objectmapping.ConfigSerializable;
import org.spongepowered.configurate.objectmapping.meta.Comment;
import java.util.Objects;
@ConfigSerializable
public class ThreadConfig {
public ThreadConfig() {}
public ThreadConfig(String name) {
this.threadName = name;
}
@Comment("Specify the text or forum channel id and the name of the thread (the thread will be automatically created if it doesn't exist)")
public Long channelId = 0L;
@ -35,4 +43,19 @@ public class ThreadConfig {
@Comment("Does not effect forums")
public boolean privateThread = false;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadConfig that = (ThreadConfig) o;
return unarchiveExisting == that.unarchiveExisting
&& privateThread == that.privateThread
&& Objects.equals(channelId, that.channelId)
&& Objects.equals(threadName, that.threadName);
}
@Override
public int hashCode() {
return Objects.hash(channelId, threadName, unarchiveExisting, privateThread);
}
}

View File

@ -31,7 +31,6 @@ import com.discordsrv.common.logging.LogLevel;
import com.discordsrv.common.logging.NamedLogger;
import com.discordsrv.common.logging.backend.LoggingBackend;
import com.discordsrv.common.module.type.AbstractModule;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -57,6 +56,11 @@ public class ConsoleModule extends AbstractModule<DiscordSRV> implements LogAppe
return Collections.emptySet();
}
@Override
public boolean canEnableBeforeReady() {
return discordSRV.config() != null;
}
@Override
public void enable() {
backend = discordSRV.console().loggingBackend();
@ -65,15 +69,31 @@ public class ConsoleModule extends AbstractModule<DiscordSRV> implements LogAppe
@Override
public void reload(Consumer<DiscordSRVApi.ReloadResult> resultConsumer) {
for (SingleConsoleHandler handler : handlers) {
handler.shutdown();
}
handlers.clear();
List<ConsoleConfig> configs = discordSRV.config().console;
for (ConsoleConfig config : configs) {
Set<ConsoleConfig> uncheckedConfigs = new LinkedHashSet<>(configs);
for (int i = handlers.size() - 1; i >= 0; i--) {
SingleConsoleHandler handler = handlers.get(i);
ConsoleConfig matchingConfig = null;
for (ConsoleConfig config : configs) {
if (config.channel.equals(handler.getConfig().channel)) {
matchingConfig = config;
break;
}
}
if (matchingConfig != null) {
handler.setConfig(matchingConfig);
uncheckedConfigs.remove(matchingConfig);
} else {
handlers.remove(i);
discordSRV.scheduler().run(handler::shutdown);
}
}
for (ConsoleConfig config : uncheckedConfigs) {
DestinationConfig.Single destination = config.channel;
if (destination.channelId == 0L && StringUtils.isEmpty(destination.threadName)) {
if (destination.channelId == 0L) {
logger().debug("Skipping a console handler due to lack of channel");
continue;
}
@ -84,6 +104,7 @@ public class ConsoleModule extends AbstractModule<DiscordSRV> implements LogAppe
handlers.add(new SingleConsoleHandler(discordSRV, logger(), config));
}
logger().debug(handlers.size() + " console handlers active");
}
@Override

View File

@ -43,13 +43,12 @@ import com.discordsrv.common.logging.Logger;
import net.dv8tion.jda.api.entities.Message;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* The log appending and command handling for a single console channel.
@ -57,20 +56,22 @@ import java.util.concurrent.TimeUnit;
public class SingleConsoleHandler {
private static final int MESSAGE_MAX_LENGTH = Message.MAX_CONTENT_LENGTH;
private static final int SEND_QUEUE_MAX_SIZE = 6;
private final DiscordSRV discordSRV;
private final Logger logger;
private final ConsoleConfig config;
private final Queue<LogEntry> queue;
private ConsoleConfig config;
private Queue<LogEntry> messageQueue;
private Deque<Pair<SendableDiscordMessage, Boolean>> sendQueue;
private Future<?> queueProcessingFuture;
private boolean shutdown = false;
// Editing
private final List<LogMessage> messageCache;
private Long mostRecentMessageId;
private List<LogMessage> messageCache;
private final AtomicLong mostRecentMessageId = new AtomicLong(0);
// Preventing concurrent sends
private final Object sendLock = new Object();
// Sending
private boolean sentFirstBatch = false;
private CompletableFuture<?> sendFuture;
// Don't annoy console users twice about using /
@ -79,11 +80,7 @@ public class SingleConsoleHandler {
public SingleConsoleHandler(DiscordSRV discordSRV, Logger logger, ConsoleConfig config) {
this.discordSRV = discordSRV;
this.logger = logger;
this.config = config;
this.queue = config.appender.outputMode != ConsoleConfig.OutputMode.OFF ? new LinkedBlockingQueue<>() : null;
this.messageCache = config.appender.useEditing ? new ArrayList<>() : null;
timeQueueProcess();
setConfig(config);
}
public void handleDiscordMessageReceived(DiscordMessageReceiveEvent event) {
@ -108,7 +105,7 @@ public class SingleConsoleHandler {
}
DestinationConfig.Single destination = config.channel;
String threadName = destination.threadName;
String threadName = destination.thread.threadName;
DiscordGuildChannel checkChannel;
if (StringUtils.isNotEmpty(threadName)) {
@ -176,36 +173,88 @@ public class SingleConsoleHandler {
if (messageCache != null) {
messageCache.clear();
}
mostRecentMessageId = null;
synchronized (mostRecentMessageId) {
mostRecentMessageId.set(0);
}
// Run the command
discordSRV.console().runCommandWithLogging(discordSRV, user, command);
}
public void queue(LogEntry entry) {
if (queue == null) {
if (messageQueue == null) {
return;
}
queue.offer(entry);
messageQueue.offer(entry);
}
@SuppressWarnings("SynchronizeOnNonFinalField")
public ConsoleConfig getConfig() {
return config;
}
public void setConfig(ConsoleConfig config) {
if (queueProcessingFuture != null) {
queueProcessingFuture.cancel(false);
}
this.config = config;
boolean sendOn = config.appender.outputMode != ConsoleConfig.OutputMode.OFF;
if (sendOn) {
if (messageQueue == null) {
this.messageQueue = new LinkedBlockingQueue<>();
this.sendQueue = new LinkedBlockingDeque<>();
}
} else {
if (messageQueue != null) {
this.messageQueue = null;
this.sendQueue = null;
}
}
boolean edit = config.appender.useEditing;
if (edit) {
if (messageCache == null) {
this.messageCache = new ArrayList<>();
}
} else {
if (messageCache != null) {
this.messageCache = null;
}
}
timeQueueProcess();
}
@SuppressWarnings({"BusyWait"})
public void shutdown() {
shutdown = true;
queueProcessingFuture.cancel(false);
processQueue();
try {
synchronized (queueProcessingFuture) {
queueProcessingFuture.wait(TimeUnit.SECONDS.toMillis(3));
long start = System.nanoTime();
while (!sendFuture.isDone()) {
if (System.nanoTime() - start > TimeUnit.SECONDS.toNanos(3)) {
break;
}
Thread.sleep(50);
}
} catch (InterruptedException e) {
sendFuture.cancel(true);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
queue.clear();
if (messageQueue != null) {
messageQueue.clear();
}
if (sendQueue != null) {
sendQueue.clear();
}
if (messageCache != null) {
messageCache.clear();
}
mostRecentMessageId = null;
mostRecentMessageId.set(0);
}
private void timeQueueProcess() {
@ -215,56 +264,79 @@ public class SingleConsoleHandler {
if (config.appender.outputMode == ConsoleConfig.OutputMode.OFF) {
return;
}
this.queueProcessingFuture = discordSRV.scheduler().runLater(this::processQueue, Duration.ofSeconds(2));
this.queueProcessingFuture = discordSRV.scheduler().runLater(this::processQueue, Duration.ofMillis(1500));
}
private void processQueue() {
try {
ConsoleConfig.Appender appenderConfig = config.appender;
ConsoleConfig.OutputMode outputMode = appenderConfig.outputMode;
processMessageQueue();
} catch (Exception e) {
logger.error("Failed to process console lines", e);
}
Queue<LogMessage> currentBuffer = new LinkedBlockingQueue<>();
LogEntry entry;
while ((entry = queue.poll()) != null) {
String level = entry.level().name();
if (appenderConfig.levels.levels.contains(level) == appenderConfig.levels.blacklist) {
// Ignored level
continue;
}
String loggerName = entry.loggerName();
if (StringUtils.isEmpty(loggerName)) loggerName = "NONE";
if (appenderConfig.loggers.loggers.contains(loggerName) == appenderConfig.loggers.blacklist) {
// Ignored logger
continue;
}
List<String> messages = formatEntry(entry, outputMode, config.appender.diffExceptions);
if (messages.size() == 1) {
LogMessage message = new LogMessage(entry, messages.get(0));
currentBuffer.add(message);
} else {
clearBuffer(currentBuffer, outputMode);
for (String message : messages) {
send(message, true, outputMode);
}
}
int oversize = sendQueue.size() - SEND_QUEUE_MAX_SIZE;
if (sentFirstBatch && oversize > 0) {
int remove = oversize + 1;
for (int i = 0; i < remove; i++) {
sendQueue.pollLast();
}
clearBuffer(currentBuffer, outputMode);
} catch (Exception ex) {
logger.error("Failed to process console lines", ex);
logger.warning("Skipping " + remove + " log messages because the send queue is backed up");
}
if (!shutdown && !discordSRV.isReady()) {
// Not ready yet
timeQueueProcess();
return;
}
try {
processSendQueue();
} catch (Exception e) {
logger.error("Failed to send console lines", e);
}
if (sendFuture != null) {
sendFuture.whenComplete((__, ___) -> {
sendFuture = null;
timeQueueProcess();
});
sendFuture.whenComplete((v, t) -> timeQueueProcess());
} else {
timeQueueProcess();
}
}
private void processMessageQueue() {
ConsoleConfig.Appender appenderConfig = config.appender;
ConsoleConfig.OutputMode outputMode = appenderConfig.outputMode;
Queue<LogMessage> currentBuffer = new LinkedBlockingQueue<>();
LogEntry entry;
while ((entry = messageQueue.poll()) != null) {
String level = entry.level().name();
if (appenderConfig.levels.levels.contains(level) == appenderConfig.levels.blacklist) {
// Ignored level
continue;
}
String loggerName = entry.loggerName();
if (StringUtils.isEmpty(loggerName)) loggerName = "NONE";
if (appenderConfig.loggers.loggers.contains(loggerName) == appenderConfig.loggers.blacklist) {
// Ignored logger
continue;
}
List<String> messages = formatEntry(entry, outputMode, config.appender.diffExceptions);
if (messages.size() == 1) {
LogMessage message = new LogMessage(entry, messages.get(0));
currentBuffer.add(message);
} else {
clearBuffer(currentBuffer, outputMode);
for (String message : messages) {
queueMessage(message, true, outputMode);
}
}
}
clearBuffer(currentBuffer, outputMode);
}
private void clearBuffer(Queue<LogMessage> currentBuffer, ConsoleConfig.OutputMode outputMode) {
if (currentBuffer.isEmpty()) {
return;
@ -272,7 +344,7 @@ public class SingleConsoleHandler {
int blockLength = outputMode.blockLength();
StringBuilder builder = new StringBuilder();
StringBuilder builder = new StringBuilder(MESSAGE_MAX_LENGTH);
if (messageCache != null) {
for (LogMessage logMessage : messageCache) {
builder.append(logMessage.formatted());
@ -283,7 +355,7 @@ public class SingleConsoleHandler {
while ((current = currentBuffer.poll()) != null) {
String formatted = current.formatted();
if (formatted.length() + builder.length() + blockLength > MESSAGE_MAX_LENGTH) {
send(builder.toString(), true, outputMode);
queueMessage(builder.toString(), true, outputMode);
builder.setLength(0);
if (messageCache != null) {
messageCache.clear();
@ -297,56 +369,18 @@ public class SingleConsoleHandler {
}
if (builder.length() > 0) {
send(builder.toString(), false, outputMode);
queueMessage(builder.toString(), false, outputMode);
}
}
private void send(String message, boolean isFull, ConsoleConfig.OutputMode outputMode) {
private void queueMessage(String message, boolean lastEdit, ConsoleConfig.OutputMode outputMode) {
SendableDiscordMessage sendableMessage = SendableDiscordMessage.builder()
.setContent(outputMode.prefix() + message + outputMode.suffix())
.setSuppressedNotifications(config.appender.silentMessages)
.setSuppressedEmbeds(config.appender.disableLinkEmbeds)
.build();
synchronized (sendLock) {
CompletableFuture<?> future = sendFuture != null ? sendFuture : CompletableFuture.completedFuture(null);
sendFuture = future
.thenCompose(__ ->
discordSRV.destinations()
.lookupDestination(config.channel.asDestination(), true, true)
)
.thenApply(channels -> {
if (channels.isEmpty()) {
// Nowhere to send to
return null;
}
DiscordGuildMessageChannel channel = channels.iterator().next();
if (mostRecentMessageId != null) {
long messageId = mostRecentMessageId;
if (isFull) {
mostRecentMessageId = null;
}
return channel.editMessageById(messageId, sendableMessage);
}
return channel.sendMessage(sendableMessage)
.whenComplete((receivedMessage, t) -> {
if (receivedMessage != null && messageCache != null) {
mostRecentMessageId = receivedMessage.getId();
}
});
}).exceptionally(ex -> {
String error = "Failed to send message to console channel";
if (message.contains(error)) {
// Prevent infinite loop of the same error
return null;
}
logger.error(error, ex);
return null;
});
}
sendQueue.offer(Pair.of(sendableMessage, lastEdit));
}
private List<String> formatEntry(LogEntry entry, ConsoleConfig.OutputMode outputMode, boolean diffExceptions) {
@ -477,4 +511,62 @@ public class SingleConsoleHandler {
}
return " ";
}
private void processSendQueue() {
Pair<SendableDiscordMessage, Boolean> pair;
do {
pair = sendQueue.poll();
if (pair == null) {
// *crickets* Nothing to send
continue;
}
SendableDiscordMessage sendableMessage = pair.getKey();
boolean lastEdit = pair.getValue();
if (sendFuture == null) {
sendFuture = CompletableFuture.completedFuture(null);
}
sendFuture = sendFuture
.thenCompose(__ -> discordSRV.destinations().lookupDestination(config.channel.asDestination(), true, true))
.thenCompose(channels -> {
if (channels.isEmpty()) {
// Nowhere to send to
return null;
}
DiscordGuildMessageChannel channel = channels.iterator().next();
synchronized (mostRecentMessageId) {
long messageId = mostRecentMessageId.get();
if (messageId != 0) {
if (lastEdit) {
mostRecentMessageId.set(0);
}
return channel.editMessageById(messageId, sendableMessage);
}
}
return channel.sendMessage(sendableMessage);
}).thenApply(msg -> {
if (!lastEdit && msg != null && messageCache != null) {
synchronized (mostRecentMessageId) {
mostRecentMessageId.set(msg.getId());
}
}
sentFirstBatch = true;
return msg;
}).exceptionally(ex -> {
String error = "Failed to send message to console channel";
String messageContent = sendableMessage.getContent();
if (messageContent != null && messageContent.contains(error)) {
// Prevent infinite loop of the same error
return null;
}
logger.error(error, ex);
return null;
});
} while (pair != null);
}
}