Attempt to fix processing issues on large networks (#498):

- Moved from ArrayBlockingQueue implementation to ExecutorService
- Two ExecutorServices
  - max 2 thread critical service
  - max 6 thread non-critical service
- Critical ExecutorService is used for player data
  - Unprocessed saved on disable, should not cause freezes like old system
This commit is contained in:
Rsl1122 2018-04-01 13:53:33 +03:00
parent e44fc781e4
commit d624cd596f
45 changed files with 284 additions and 327 deletions

View File

@ -7,7 +7,7 @@ import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.info.connection.ConnectionSystem;
import com.djrapitops.plan.system.info.server.Server;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Permissions;
import com.djrapitops.plan.system.settings.locale.Locale;
import com.djrapitops.plan.system.settings.locale.Msg;
@ -49,7 +49,7 @@ public class AnalyzeCommand extends SubCommand {
public boolean onCommand(ISender sender, String commandLabel, String[] args) {
sender.sendMessage(Locale.get(Msg.CMD_INFO_FETCH_DATA).toString());
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
try {
Server server = getServer(args).orElseGet(ServerInfo::getServer);
UUID serverUUID = server.getUuid();
@ -59,7 +59,6 @@ public class AnalyzeCommand extends SubCommand {
sendWebUserNotificationIfNecessary(sender);
sendLink(server, sender);
} catch (DBException | WebException e) {
// TODO Exception handling
sender.sendMessage(ChatColor.RED + " Error occurred: " + e.toString());
Log.toLog(this.getClass(), e);
}

View File

@ -3,6 +3,7 @@ package com.djrapitops.plan.command.commands;
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.api.exceptions.database.FatalDBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.info.InspectCacheRequestProcessor;
import com.djrapitops.plan.system.settings.Permissions;
import com.djrapitops.plan.system.settings.locale.Locale;
@ -72,7 +73,7 @@ public class InspectCommand extends SubCommand {
sender.sendMessage(ChatColor.YELLOW + "[Plan] You might not have a web user, use /plan register <password>");
}
}
new InspectCacheRequestProcessor(uuid, sender, playerName).queue();
Processing.submit(new InspectCacheRequestProcessor(uuid, sender, playerName));
} catch (FatalDBException ex) {
Log.toLog(this.getClass(), ex);
sender.sendMessage(ChatColor.RED + "Fatal database exception occurred: " + ex.getMessage());

View File

@ -8,7 +8,7 @@ import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.info.request.CheckConnectionRequest;
import com.djrapitops.plan.system.info.server.Server;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Permissions;
import com.djrapitops.plan.system.settings.locale.Locale;
import com.djrapitops.plan.system.settings.locale.Msg;
@ -54,7 +54,7 @@ public class ManageConDebugCommand extends SubCommand {
return true;
}
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
testServers(sender);
});

View File

@ -2,7 +2,7 @@ package com.djrapitops.plan.command.commands.manage;
import com.djrapitops.plan.api.exceptions.connection.*;
import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Permissions;
import com.djrapitops.plan.system.settings.Settings;
import com.djrapitops.plan.system.settings.locale.Locale;
@ -59,7 +59,7 @@ public class ManageSetupCommand extends SubCommand {
}
private void requestSetup(ISender sender, String address) {
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
try {
Settings.BUNGEE_OVERRIDE_STANDALONE_MODE.set(false);
Settings.BUNGEE_COPY_CONFIG.set(true);

View File

@ -13,7 +13,7 @@ import com.djrapitops.plan.system.file.FileSystem;
import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.listeners.ListenerSystem;
import com.djrapitops.plan.system.processing.ProcessingQueue;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.config.ConfigSystem;
import com.djrapitops.plan.system.tasks.TaskSystem;
import com.djrapitops.plan.system.update.VersionCheckSystem;
@ -34,7 +34,7 @@ public abstract class PlanSystem implements SubSystem {
protected static PlanSystem testSystem;
// Initialized in this class
protected final ProcessingQueue processingQueue;
private Processing processing;
protected final WebServerSystem webServerSystem;
protected final CacheSystem cacheSystem;
@ -55,7 +55,7 @@ public abstract class PlanSystem implements SubSystem {
protected PlanAPI planAPI;
public PlanSystem() {
processingQueue = new ProcessingQueue();
processing = new Processing();
webServerSystem = new WebServerSystem();
cacheSystem = new CacheSystem(this);
}
@ -83,7 +83,7 @@ public abstract class PlanSystem implements SubSystem {
configSystem,
databaseSystem,
webServerSystem,
processingQueue,
processing,
serverInfo,
infoSystem,
cacheSystem,
@ -103,7 +103,7 @@ public abstract class PlanSystem implements SubSystem {
hookHandler,
cacheSystem,
listenerSystem,
processingQueue,
processing,
databaseSystem,
webServerSystem,
infoSystem,
@ -142,10 +142,6 @@ public abstract class PlanSystem implements SubSystem {
// Accessor methods.
public ProcessingQueue getProcessingQueue() {
return processingQueue;
}
public VersionCheckSystem getVersionCheckSystem() {
return versionCheckSystem;
}
@ -193,4 +189,8 @@ public abstract class PlanSystem implements SubSystem {
public PlanAPI getPlanAPI() {
return planAPI;
}
public Processing getProcessing() {
return processing;
}
}

View File

@ -5,7 +5,7 @@ import com.djrapitops.plan.data.container.Session;
import com.djrapitops.plan.system.PlanSystem;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.info.connection.WebExceptionLogger;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.utilities.MiscUtils;
import com.djrapitops.plugin.api.utility.log.Log;
import com.djrapitops.plugin.utilities.Verify;
@ -54,7 +54,7 @@ public class SessionCache {
public void cacheSession(UUID uuid, Session session) {
activeSessions.put(uuid, session);
Processor.queue(() -> WebExceptionLogger.logIfOccurs(this.getClass(), () ->
Processing.submitNonCritical(() -> WebExceptionLogger.logIfOccurs(this.getClass(), () ->
system.getInfoSystem().generateAndCachePlayerPage(uuid))
);
}

View File

@ -182,7 +182,6 @@ public class GeoInfoTable extends UserIDTable {
public List<String> getNetworkGeolocations() throws SQLException {
String subQuery = "SELECT " +
Col.USER_ID + ", " +
Col.GEOLOCATION + ", " +
"MAX(" + Col.LAST_USED + ") as max" +
" FROM " + tableName +
" GROUP BY " + Col.USER_ID;

View File

@ -11,7 +11,7 @@ import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.info.request.*;
import com.djrapitops.plan.system.info.server.Server;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Settings;
import com.djrapitops.plan.system.settings.locale.Locale;
import com.djrapitops.plan.system.settings.locale.Msg;
@ -39,7 +39,7 @@ public class BukkitConnectionSystem extends ConnectionSystem {
}
private void refreshServerMap() {
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
if (latestServerMapRefresh < MiscUtils.getTime() - TimeAmount.SECOND.ms() * 15L) {
try {
Database database = Database.getActive();

View File

@ -8,7 +8,7 @@ import com.djrapitops.plan.api.exceptions.connection.TransferDatabaseException;
import com.djrapitops.plan.api.exceptions.connection.WebException;
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Settings;
import com.djrapitops.plan.system.webserver.response.DefaultResponses;
import com.djrapitops.plan.system.webserver.response.Response;
@ -82,7 +82,7 @@ public class CacheAnalysisPageRequest implements CacheRequest {
private void cache(boolean export, UUID serverUUID, String html) {
ResponseCache.cacheResponse(PageId.SERVER.of(serverUUID), () -> new AnalysisPageResponse(html));
if (export) {
Processor.queue(() -> HtmlExport.exportServer(serverUUID));
Processing.submitNonCritical(() -> HtmlExport.exportServer(serverUUID));
}
}

View File

@ -9,7 +9,7 @@ import com.djrapitops.plan.api.exceptions.connection.WebException;
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Settings;
import com.djrapitops.plan.system.webserver.response.DefaultResponses;
import com.djrapitops.plan.system.webserver.response.Response;
@ -87,7 +87,7 @@ public class CacheInspectPageRequest implements CacheRequest {
private void cache(boolean export, UUID uuid, String html) {
ResponseCache.cacheResponse(PageId.PLAYER.of(uuid), () -> new InspectPageResponse(uuid, html));
if (export) {
Processor.queue(() -> HtmlExport.exportPlayer(uuid));
Processing.submitNonCritical(() -> HtmlExport.exportPlayer(uuid));
}
}

View File

@ -59,7 +59,9 @@ public class GenerateAnalysisPageRequest extends InfoRequestWithVariables implem
throw new BadRequestException("Requested Analysis page from wrong server.");
}
generateAndCache(serverUUID);
if (!Analysis.isAnalysisBeingRun()) {
generateAndCache(serverUUID);
}
return DefaultResponses.SUCCESS.get();
}

View File

@ -1,6 +1,7 @@
package com.djrapitops.plan.system.listeners.bukkit;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.player.NameProcessor;
import com.djrapitops.plugin.api.utility.log.Log;
import org.bukkit.entity.Player;
@ -40,7 +41,7 @@ public class ChatListener implements Listener {
sessionCache.firstSessionMessageSent(uuid);
}
new NameProcessor(uuid, name, displayName).queue();
Processing.submit(new NameProcessor(uuid, name, displayName));
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}

View File

@ -1,6 +1,7 @@
package com.djrapitops.plan.system.listeners.bukkit;
import com.djrapitops.plan.Plan;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.CommandProcessor;
import com.djrapitops.plan.system.settings.Permissions;
import com.djrapitops.plan.system.settings.Settings;
@ -57,7 +58,7 @@ public class CommandPreprocessListener implements Listener {
commandName = command.getName();
}
}
new CommandProcessor(commandName).queue();
Processing.submit(new CommandProcessor(commandName));
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}

View File

@ -2,7 +2,7 @@ package com.djrapitops.plan.system.listeners.bukkit;
import com.djrapitops.plan.data.container.Session;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.player.KillProcessor;
import com.djrapitops.plan.utilities.MiscUtils;
import com.djrapitops.plugin.api.utility.log.Log;
@ -37,7 +37,7 @@ public class DeathEventListener implements Listener {
if (dead instanceof Player) {
// Process Death
Processor.queue(() -> SessionCache.getCachedSession(dead.getUniqueId()).ifPresent(Session::died));
Processing.submitCritical(() -> SessionCache.getCachedSession(dead.getUniqueId()).ifPresent(Session::died));
}
try {
@ -65,7 +65,7 @@ public class DeathEventListener implements Listener {
processor = handleArrowKill(time, dead, (Arrow) killerEntity);
}
if (processor != null) {
processor.queue();
Processing.submit(processor);
}
}

View File

@ -2,7 +2,7 @@ package com.djrapitops.plan.system.listeners.bukkit;
import com.djrapitops.plan.data.container.Session;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.info.NetworkPageUpdateProcessor;
import com.djrapitops.plan.system.processing.processors.player.*;
import com.djrapitops.plan.system.tasks.TaskSystem;
@ -41,7 +41,7 @@ public class PlayerOnlineListener implements Listener {
UUID uuid = event.getPlayer().getUniqueId();
boolean op = event.getPlayer().isOp();
boolean banned = result == PlayerLoginEvent.Result.KICK_BANNED;
new BanAndOpProcessor(uuid, banned, op).queue();
Processing.submit(new BanAndOpProcessor(uuid, banned, op));
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}
@ -62,7 +62,7 @@ public class PlayerOnlineListener implements Listener {
return;
}
UUID uuid = event.getPlayer().getUniqueId();
new KickProcessor(uuid).queue();
Processing.submit(new KickProcessor(uuid));
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}
@ -96,13 +96,13 @@ public class PlayerOnlineListener implements Listener {
SessionCache.getInstance().cacheSession(uuid, new Session(time, world, gm));
Processor.queueMany(
Processing.submit(
new RegisterProcessor(uuid, player.getFirstPlayed(), time, playerName, playersOnline,
new IPUpdateProcessor(uuid, ip, time),
new NameProcessor(uuid, playerName, displayName)
),
new NetworkPageUpdateProcessor()
)
);
Processing.submit(new NetworkPageUpdateProcessor());
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}
@ -122,16 +122,14 @@ public class PlayerOnlineListener implements Listener {
Player player = event.getPlayer();
UUID uuid = player.getUniqueId();
Processor.queueMany(
new BanAndOpProcessor(uuid, player.isBanned(), player.isOp()),
new EndSessionProcessor(uuid, time),
new NetworkPageUpdateProcessor()
);
Processing.submit(new BanAndOpProcessor(uuid, player.isBanned(), player.isOp()));
Processing.submit(new EndSessionProcessor(uuid, time));
Processing.submit(new NetworkPageUpdateProcessor());
SessionCache sessionCache = SessionCache.getInstance();
if (sessionCache.isFirstSession(uuid)) {
int messagesSent = sessionCache.getFirstSessionMsgCount(uuid);
new FirstLeaveProcessor(uuid, time, messagesSent).queue();
Processing.submit(new FirstLeaveProcessor(uuid, time, messagesSent));
}
} catch (Exception e) {
Log.toLog(this.getClass(), e);

View File

@ -5,6 +5,7 @@
package com.djrapitops.plan.system.listeners.bungee;
import com.djrapitops.plan.PlanBungee;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.player.BungeePlayerRegisterProcessor;
import com.djrapitops.plan.utilities.MiscUtils;
import com.djrapitops.plugin.api.utility.log.Log;
@ -36,7 +37,7 @@ public class PlayerOnlineListener implements Listener {
String name = player.getName();
long now = MiscUtils.getTime();
plugin.getSystem().getProcessingQueue().queue(new BungeePlayerRegisterProcessor(uuid, name, now));
Processing.submit(new BungeePlayerRegisterProcessor(uuid, name, now));
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}

View File

@ -0,0 +1,6 @@
package com.djrapitops.plan.system.processing;
import java.util.concurrent.Callable;
public interface CriticalCallable<T> extends Callable<T> {
}

View File

@ -0,0 +1,4 @@
package com.djrapitops.plan.system.processing;
public interface CriticalRunnable extends Runnable {
}

View File

@ -0,0 +1,112 @@
package com.djrapitops.plan.system.processing;
import com.djrapitops.plan.PlanPlugin;
import com.djrapitops.plan.api.exceptions.EnableException;
import com.djrapitops.plan.system.PlanSystem;
import com.djrapitops.plan.system.SubSystem;
import com.djrapitops.plugin.StaticHolder;
import com.djrapitops.plugin.api.utility.log.Log;
import com.djrapitops.plugin.utilities.Verify;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Processing implements SubSystem {
private final ExecutorService nonCriticalExecutor;
private final ExecutorService criticalExecutor;
public Processing() {
nonCriticalExecutor = Executors.newFixedThreadPool(6);
criticalExecutor = Executors.newFixedThreadPool(2);
}
public static void submit(Runnable runnable) {
if (runnable instanceof CriticalRunnable) {
submitCritical(runnable);
}
submitNonCritical(runnable);
}
public static void saveInstance(Object obj) {
StaticHolder.saveInstance(obj.getClass(), PlanPlugin.getInstance().getClass());
}
public static void submitNonCritical(Runnable runnable) {
saveInstance(runnable);
getInstance().nonCriticalExecutor.submit(runnable);
}
public static void submitCritical(Runnable runnable) {
saveInstance(runnable);
getInstance().criticalExecutor.submit(runnable);
}
public static void submitNonCritical(Runnable... runnables) {
ExecutorService nonCriticalExecutor = getInstance().nonCriticalExecutor;
for (Runnable runnable : runnables) {
saveInstance(runnable);
nonCriticalExecutor.submit(runnable);
}
}
public static void submitCritical(Runnable... runnables) {
ExecutorService criticalExecutor = getInstance().criticalExecutor;
for (Runnable runnable : runnables) {
saveInstance(runnable);
criticalExecutor.submit(runnable);
}
}
public static <T> Future<T> submit(Callable<T> task) {
saveInstance(task);
if (task instanceof CriticalCallable) {
return submitCritical(task);
}
return submitNonCritical(task);
}
public static <T> Future<T> submitNonCritical(Callable<T> task) {
saveInstance(task);
return getInstance().nonCriticalExecutor.submit(task);
}
public static <T> Future<T> submitCritical(Callable<T> task) {
saveInstance(task);
return getInstance().criticalExecutor.submit(task);
}
public static Processing getInstance() {
Processing processing = PlanSystem.getInstance().getProcessing();
Verify.nullCheck(processing, () -> new IllegalStateException("Processing System has not been initialized."));
return processing;
}
@Override
public void enable() throws EnableException {
if (nonCriticalExecutor.isShutdown()) {
throw new EnableException("Non Critical ExecutorService was shut down on enable");
}
if (criticalExecutor.isShutdown()) {
throw new EnableException("Critical ExecutorService was shut down on enable");
}
}
@Override
public void disable() {
nonCriticalExecutor.shutdown();
List<Runnable> criticalTasks = criticalExecutor.shutdownNow();
Log.info("Processing critical unprocessed tasks. (" + criticalTasks.size() + ")");
for (Runnable runnable : criticalTasks) {
try {
runnable.run();
} catch (Exception | NoClassDefFoundError | NoSuchMethodError | NoSuchFieldError e) {
Log.toLog(this.getClass(), e);
}
}
Log.info("Processing complete.");
}
}

View File

@ -1,102 +0,0 @@
package com.djrapitops.plan.system.processing;
import com.djrapitops.plan.system.PlanSystem;
import com.djrapitops.plan.system.SubSystem;
import com.djrapitops.plan.utilities.queue.Consumer;
import com.djrapitops.plan.utilities.queue.Queue;
import com.djrapitops.plan.utilities.queue.Setup;
import com.djrapitops.plugin.api.Benchmark;
import com.djrapitops.plugin.api.utility.log.Log;
import com.djrapitops.plugin.utilities.Verify;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* This Class is starts the Process Queue Thread, that processes Processor
* objects.
*
* @author Rsl1122
* @since 3.0.0
*/
public class ProcessingQueue extends Queue<Processor> implements SubSystem {
public ProcessingQueue() {
super(new ArrayBlockingQueue<>(20000));
setup = new ProcessSetup(queue);
}
public static ProcessingQueue getInstance() {
ProcessingQueue processingQueue = PlanSystem.getInstance().getProcessingQueue();
Verify.nullCheck(processingQueue, () -> new IllegalStateException("ProcessingQueue has not been initialized."));
return processingQueue;
}
@Override
public void enable() {
setup.go();
}
@Override
public void disable() {
List<Processor> processors = stopAndReturnLeftovers();
Log.info("Processing unprocessed processors. (" + processors.size() + ")");
for (Processor processor : processors) {
try {
processor.process();
} catch (Exception e) {
Log.toLog(this.getClass(), e);
}
}
}
/**
* Used to add Processor object to be processed.
*
* @param processor processing object.
*/
public void queue(Processor processor) {
if (!queue.offer(processor)) {
Log.toLog(Processor.class, new IllegalStateException("Processor was not added to Queue"));
}
}
}
class ProcessConsumer extends Consumer<Processor> {
ProcessConsumer(BlockingQueue<Processor> q) {
super(q, "ProcessQueueConsumer");
}
@Override
protected void consume(Processor process) {
if (process == null) {
return;
}
try {
String benchName = "Processed " + process.getClass().getSimpleName() + ".";
Benchmark.start(benchName);
process.process();
Benchmark.stop(benchName);
} catch (Exception | NoClassDefFoundError | NoSuchFieldError | NoSuchMethodError e) {
Log.toLog(process.getClass(), e);
}
}
}
class ProcessSetup extends Setup<Processor> {
ProcessSetup(BlockingQueue<Processor> q) {
super(
new ProcessConsumer(q),
new ProcessConsumer(q),
new ProcessConsumer(q),
new ProcessConsumer(q),
new ProcessConsumer(q),
new ProcessConsumer(q),
new ProcessConsumer(q),
new ProcessConsumer(q)
);
}
}

View File

@ -1,36 +0,0 @@
/*
* Licence is provided in the jar as license.yml also here:
* https://github.com/Rsl1122/Plan-PlayerAnalytics/blob/master/Plan/src/main/resources/license.yml
*/
package com.djrapitops.plan.system.processing;
/**
* Interface for ProcessingQueue.
* <p>
* Allows lambda Processor creation.
*
* @author Rsl1122
*/
public interface Processor {
static void queueMany(Processor... processors) {
ProcessingQueue processingQueue = ProcessingQueue.getInstance();
for (Processor processor : processors) {
processingQueue.queue(processor);
}
}
/**
* A way to run code Async in ProcessingQueue.
* <p>
* Good for lambdas.
*
* @param processor Processor.
*/
static void queue(Processor processor) {
ProcessingQueue.getInstance().queue(processor);
}
void process();
}

View File

@ -6,6 +6,7 @@ package com.djrapitops.plan.system.processing.processors;
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plugin.api.utility.log.Log;
/**
@ -13,16 +14,18 @@ import com.djrapitops.plugin.api.utility.log.Log;
*
* @author Rsl1122
*/
public class CommandProcessor extends ObjectProcessor<String> {
public class CommandProcessor implements CriticalRunnable {
public CommandProcessor(String object) {
super(object);
private final String command;
public CommandProcessor(String command) {
this.command = command;
}
@Override
public void process() {
public void run() {
try {
Database.getActive().save().commandUsed(object);
Database.getActive().save().commandUsed(command);
} catch (DBException e) {
Log.toLog(this.getClass(), e);
}

View File

@ -1,31 +0,0 @@
/*
* Licence is provided in the jar as license.yml also here:
* https://github.com/Rsl1122/Plan-PlayerAnalytics/blob/master/Plan/src/main/resources/license.yml
*/
package com.djrapitops.plan.system.processing.processors;
import com.djrapitops.plan.system.processing.Processor;
/**
* Abstract class for processing different objects using Generics.
*
* @author Rsl1122
*/
public abstract class ObjectProcessor<T> implements Processor {
protected final T object;
public ObjectProcessor(T object) {
this.object = object;
}
@Override
public abstract void process();
protected T getObject() {
return object;
}
public void queue() {
Processor.queue(this);
}
}

View File

@ -8,6 +8,7 @@ import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.data.container.TPS;
import com.djrapitops.plan.data.container.builders.TPSBuilder;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plan.utilities.analysis.MathUtils;
import com.djrapitops.plugin.api.utility.log.Log;
@ -18,15 +19,17 @@ import java.util.List;
*
* @author Rsl1122
*/
public class TPSInsertProcessor extends ObjectProcessor<List<TPS>> {
public class TPSInsertProcessor implements CriticalRunnable {
public TPSInsertProcessor(List<TPS> object) {
super(object);
private final List<TPS> tpsList;
public TPSInsertProcessor(List<TPS> tpsList) {
this.tpsList = tpsList;
}
@Override
public void process() {
List<TPS> history = object;
public void run() {
List<TPS> history = tpsList;
final long lastDate = history.get(history.size() - 1).getDate();
final double averageTPS = MathUtils.round(MathUtils.averageDouble(history.stream().map(TPS::getTicksPerSecond)));
final int peakPlayersOnline = history.stream().mapToInt(TPS::getPlayers).max().orElse(0);

View File

@ -8,7 +8,6 @@ import com.djrapitops.plan.api.exceptions.connection.*;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.info.connection.ConnectionSystem;
import com.djrapitops.plan.system.processing.processors.player.PlayerProcessor;
import com.djrapitops.plan.system.settings.locale.Locale;
import com.djrapitops.plan.system.settings.locale.Msg;
import com.djrapitops.plugin.api.utility.log.Log;
@ -22,26 +21,26 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class InspectCacheRequestProcessor extends PlayerProcessor {
public class InspectCacheRequestProcessor implements Runnable {
private final UUID uuid;
private final ISender sender;
private final String playerName;
public InspectCacheRequestProcessor(UUID uuid, ISender sender, String playerName) {
super(uuid);
this.playerName = playerName;
this.uuid = uuid;
this.sender = sender;
this.playerName = playerName;
}
@Override
public void process() {
public void run() {
SessionCache.refreshActiveSessionsState();
try {
InfoSystem.getInstance().generateAndCachePlayerPage(getUUID());
InfoSystem.getInstance().generateAndCachePlayerPage(uuid);
sendInspectMsg(sender, playerName);
} catch (ConnectionFailException | UnsupportedTransferDatabaseException | UnauthorizedServerException
| NotFoundException | NoServersException e) {
// TODO Test if this is appropriate
sender.sendMessage("§c" + e.getMessage());
} catch (WebException e) {
Log.toLog(this.getClass(), e);

View File

@ -6,7 +6,6 @@ package com.djrapitops.plan.system.processing.processors.info;
import com.djrapitops.plan.api.exceptions.connection.WebException;
import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plugin.api.utility.log.Log;
/**
@ -14,10 +13,10 @@ import com.djrapitops.plugin.api.utility.log.Log;
*
* @author Rsl1122
*/
public class NetworkPageUpdateProcessor implements Processor {
public class NetworkPageUpdateProcessor implements Runnable {
@Override
public void process() {
public void run() {
try {
InfoSystem.getInstance().updateNetworkPage();
} catch (WebException e) {

View File

@ -16,24 +16,24 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class BanAndOpProcessor extends PlayerProcessor {
public class BanAndOpProcessor implements Runnable {
private final UUID uuid;
private final boolean banned;
private final boolean opped;
private final boolean op;
public BanAndOpProcessor(UUID uuid, boolean banned, boolean op) {
super(uuid);
this.uuid = uuid;
this.banned = banned;
opped = op;
this.op = op;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
try {
SaveOperations save = Database.getActive().save();
save.banStatus(uuid, banned);
save.opStatus(uuid, opped);
save.opStatus(uuid, op);
} catch (DBException e) {
Log.toLog(this.getClass(), e);
}

View File

@ -6,6 +6,7 @@ package com.djrapitops.plan.system.processing.processors.player;
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plugin.api.utility.log.Log;
import java.util.UUID;
@ -15,20 +16,20 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class BungeePlayerRegisterProcessor extends PlayerProcessor {
public class BungeePlayerRegisterProcessor implements CriticalRunnable {
private final UUID uuid;
private final String name;
private final long registered;
public BungeePlayerRegisterProcessor(UUID uuid, String name, long registered) {
super(uuid);
this.uuid = uuid;
this.name = name;
this.registered = registered;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
Database database = Database.getActive();
try {
if (database.check().isPlayerRegistered(uuid)) {

View File

@ -5,6 +5,7 @@
package com.djrapitops.plan.system.processing.processors.player;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import java.util.UUID;
@ -13,18 +14,18 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class EndSessionProcessor extends PlayerProcessor {
public class EndSessionProcessor implements CriticalRunnable {
private final UUID uuid;
private final long time;
public EndSessionProcessor(UUID uuid, long time) {
super(uuid);
this.uuid = uuid;
this.time = time;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
SessionCache.getInstance().endSession(uuid, time);
}
}

View File

@ -9,6 +9,7 @@ import com.djrapitops.plan.data.Actions;
import com.djrapitops.plan.data.container.Action;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plugin.api.utility.log.Log;
import java.util.UUID;
@ -19,18 +20,18 @@ import java.util.UUID;
* @author Rsl1122
* @since 4.0.0
*/
public class FirstLeaveProcessor extends PlayerProcessor {
public class FirstLeaveProcessor implements CriticalRunnable {
private final UUID uuid;
private final Action leaveAction;
public FirstLeaveProcessor(UUID uuid, long time, int messagesSent) {
super(uuid);
this.uuid = uuid;
leaveAction = new Action(time, Actions.FIRST_LOGOUT, "Messages sent: " + messagesSent);
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
try {
Database.getActive().save().action(uuid, leaveAction);
} catch (DBException e) {

View File

@ -8,6 +8,7 @@ import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.data.container.GeoInfo;
import com.djrapitops.plan.system.cache.GeolocationCache;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plugin.api.utility.log.Log;
import java.util.UUID;
@ -17,20 +18,20 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class IPUpdateProcessor extends PlayerProcessor {
public class IPUpdateProcessor implements CriticalRunnable {
private final UUID uuid;
private final String ip;
private final long time;
public IPUpdateProcessor(UUID uuid, String ip, long time) {
super(uuid);
this.uuid = uuid;
this.ip = ip;
this.time = time;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
String country = GeolocationCache.getCountry(ip);
try {
Database.getActive().save().geoInfo(uuid, new GeoInfo(ip, country, time));

View File

@ -6,6 +6,7 @@ package com.djrapitops.plan.system.processing.processors.player;
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plugin.api.utility.log.Log;
import java.util.UUID;
@ -15,14 +16,16 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class KickProcessor extends PlayerProcessor {
public class KickProcessor implements CriticalRunnable {
private final UUID uuid;
public KickProcessor(UUID uuid) {
super(uuid);
this.uuid = uuid;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
try {
Database.getActive().save().playerWasKicked(uuid);
} catch (DBException e) {

View File

@ -3,6 +3,7 @@ package com.djrapitops.plan.system.processing.processors.player;
import com.djrapitops.plan.data.container.PlayerKill;
import com.djrapitops.plan.data.container.Session;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import org.bukkit.entity.LivingEntity;
import org.bukkit.entity.Player;
@ -18,8 +19,9 @@ import java.util.UUID;
* @author Rsl1122
* @since 4.0.0
*/
public class KillProcessor extends PlayerProcessor {
public class KillProcessor implements CriticalRunnable {
private final UUID uuid;
private final LivingEntity dead;
private final String weaponName;
private final long time;
@ -33,16 +35,14 @@ public class KillProcessor extends PlayerProcessor {
* @param weaponName Weapon used.
*/
public KillProcessor(UUID uuid, long time, LivingEntity dead, String weaponName) {
super(uuid);
this.uuid = uuid;
this.time = time;
this.dead = dead;
this.weaponName = weaponName;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
Optional<Session> cachedSession = SessionCache.getCachedSession(uuid);
if (!cachedSession.isPresent()) {
return;

View File

@ -10,7 +10,8 @@ import com.djrapitops.plan.data.container.Action;
import com.djrapitops.plan.system.cache.DataCache;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.utilities.MiscUtils;
import com.djrapitops.plan.utilities.html.HtmlUtils;
import com.djrapitops.plugin.api.utility.log.Log;
@ -24,20 +25,20 @@ import java.util.UUID;
* @author Rsl1122
* @since 4.0.0
*/
public class NameProcessor extends PlayerProcessor {
public class NameProcessor implements CriticalRunnable {
private final UUID uuid;
private final String playerName;
private final String displayName;
public NameProcessor(UUID uuid, String playerName, String displayName) {
super(uuid);
this.uuid = uuid;
this.playerName = playerName;
this.displayName = displayName;
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
DataCache dataCache = DataCache.getInstance();
String cachedName = dataCache.getName(uuid);
String cachedDisplayName = dataCache.getDisplayName(uuid);
@ -68,7 +69,7 @@ public class NameProcessor extends PlayerProcessor {
long time = MiscUtils.getTime();
Processor.queue(() -> {
Processing.submitCritical(() -> {
String info = HtmlUtils.removeXSS(displayName);
Action action = new Action(time, Actions.NEW_NICKNAME, info);

View File

@ -1,30 +0,0 @@
/*
* Licence is provided in the jar as license.yml also here:
* https://github.com/Rsl1122/Plan-PlayerAnalytics/blob/master/Plan/src/main/resources/license.yml
*/
package com.djrapitops.plan.system.processing.processors.player;
import com.djrapitops.plan.system.processing.processors.ObjectProcessor;
import java.util.UUID;
/**
* Abstract Processor that takes UUID as a parameter.
* <p>
* Created to allow extending processors to use Generics.
*
* @author Rsl1122
*/
public abstract class PlayerProcessor extends ObjectProcessor<UUID> {
public PlayerProcessor(UUID uuid) {
super(uuid);
}
protected UUID getUUID() {
return object;
}
@Override
public abstract void process();
}

View File

@ -9,8 +9,8 @@ import com.djrapitops.plan.data.Actions;
import com.djrapitops.plan.data.container.Action;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.processors.ObjectProcessor;
import com.djrapitops.plan.system.processing.CriticalRunnable;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plugin.api.utility.log.Log;
import com.djrapitops.plugin.utilities.Verify;
@ -21,16 +21,17 @@ import java.util.UUID;
*
* @author Rsl1122
*/
public class RegisterProcessor extends PlayerProcessor {
public class RegisterProcessor implements CriticalRunnable {
private final UUID uuid;
private final long registered;
private final long time;
private final int playersOnline;
private final String name;
private final ObjectProcessor[] afterProcess;
private final Runnable[] afterProcess;
public RegisterProcessor(UUID uuid, long registered, long time, String name, int playersOnline, ObjectProcessor... afterProcess) {
super(uuid);
public RegisterProcessor(UUID uuid, long registered, long time, String name, int playersOnline, Runnable... afterProcess) {
this.uuid = uuid;
this.registered = registered;
this.time = time;
this.playersOnline = playersOnline;
@ -39,8 +40,7 @@ public class RegisterProcessor extends PlayerProcessor {
}
@Override
public void process() {
UUID uuid = getUUID();
public void run() {
Database db = Database.getActive();
Verify.nullCheck(uuid, () -> new IllegalStateException("UUID was null"));
try {
@ -58,7 +58,9 @@ public class RegisterProcessor extends PlayerProcessor {
} catch (DBException e) {
Log.toLog(this.getClass(), e);
} finally {
Processor.queueMany(afterProcess);
for (Runnable runnable : afterProcess) {
Processing.submit(runnable);
}
}
}
}

View File

@ -4,7 +4,7 @@
*/
package com.djrapitops.plan.system.settings;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.config.ConfigSystem;
import com.djrapitops.plugin.api.config.Config;
import com.djrapitops.plugin.api.config.ConfigNode;
@ -55,7 +55,7 @@ public class WorldAliasSettings {
String previousValue = aliasSect.getConfigNode(world).getValue();
if (Verify.isEmpty(previousValue)) {
aliasSect.set(world, world);
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
try {
aliasSect.save();
} catch (IOException e) {

View File

@ -8,7 +8,7 @@ import com.djrapitops.plan.api.exceptions.connection.UnsupportedTransferDatabase
import com.djrapitops.plan.api.exceptions.database.DBException;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.ServerSpecificSettings;
import com.djrapitops.plan.system.settings.Settings;
import com.djrapitops.plan.utilities.Base64Util;
@ -40,7 +40,7 @@ public class NetworkSettings {
return;
}
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
try {
new NetworkSettings().loadFromDatabase();
} catch (DBException | UnsupportedTransferDatabaseException e) {
@ -54,7 +54,7 @@ public class NetworkSettings {
return;
}
Processor.queue(() -> {
Processing.submitCritical(() -> {
try {
new NetworkSettings().placeToDatabase();
} catch (DBException | UnsupportedTransferDatabaseException e) {

View File

@ -2,7 +2,7 @@ package com.djrapitops.plan.system.tasks;
import com.djrapitops.plan.PlanPlugin;
import com.djrapitops.plan.data.container.TPS;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.processing.processors.TPSInsertProcessor;
import com.djrapitops.plan.utilities.MiscUtils;
import com.djrapitops.plugin.api.utility.log.Log;
@ -38,7 +38,7 @@ public abstract class TPSCountTimer<T extends PlanPlugin> extends AbsRunnable {
addNewTPSEntry(nanoTime, now);
if (history.size() >= 60) {
Processor.queue(new TPSInsertProcessor(new ArrayList<>(history)));
Processing.submit(new TPSInsertProcessor(new ArrayList<>(history)));
history.clear();
}
} catch (Exception | NoClassDefFoundError | NoSuchMethodError | NoSuchFieldError e) {

View File

@ -4,7 +4,7 @@ import com.djrapitops.plan.api.exceptions.connection.ConnectionFailException;
import com.djrapitops.plan.api.exceptions.connection.NoServersException;
import com.djrapitops.plan.api.exceptions.connection.WebException;
import com.djrapitops.plan.system.info.InfoSystem;
import com.djrapitops.plan.system.processing.Processor;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.webserver.pages.parsing.AnalysisPage;
import com.djrapitops.plan.system.webserver.response.Response;
import com.djrapitops.plan.system.webserver.response.cache.PageId;
@ -21,7 +21,7 @@ import java.util.UUID;
public class AnalysisPageResponse extends Response {
public static AnalysisPageResponse refreshNow(UUID serverUUID) {
Processor.queue(() -> {
Processing.submitNonCritical(() -> {
try {
InfoSystem.getInstance().generateAnalysisPage(serverUUID);
} catch (NoServersException | ConnectionFailException e) {

View File

@ -12,6 +12,7 @@ import com.djrapitops.plan.system.cache.DataCache;
import com.djrapitops.plan.system.cache.SessionCache;
import com.djrapitops.plan.system.database.databases.Database;
import com.djrapitops.plan.system.info.server.ServerInfo;
import com.djrapitops.plan.system.processing.Processing;
import com.djrapitops.plan.system.settings.Settings;
import com.djrapitops.plan.system.settings.locale.Locale;
import com.djrapitops.plan.system.settings.locale.Msg;
@ -23,12 +24,14 @@ import com.djrapitops.plugin.api.Benchmark;
import com.djrapitops.plugin.api.utility.log.Log;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
* @author Rsl1122
*/
public class Analysis {
public class Analysis implements Callable<AnalysisData> {
private static Long refreshDate;
private final UUID serverUUID;
@ -36,6 +39,9 @@ public class Analysis {
private static ServerProfile serverProfile;
private final DataCache dataCache;
private static Future<AnalysisData> future;
private boolean analysingThisServer;
private Analysis(UUID serverUUID, Database database, DataCache dataCache) {
@ -50,7 +56,8 @@ public class Analysis {
}
public static AnalysisData runAnalysisFor(UUID serverUUID, Database database, DataCache dataCache) throws Exception {
return new Analysis(serverUUID, database, dataCache).runAnalysis();
future = future != null ? future : Processing.submit(new Analysis(serverUUID, database, dataCache));
return future.get();
}
/**
@ -62,6 +69,11 @@ public class Analysis {
return serverProfile;
}
@Override
public AnalysisData call() throws Exception {
return runAnalysis();
}
private AnalysisData runAnalysis() throws Exception {
((BukkitTaskSystem) TaskSystem.getInstance()).cancelBootAnalysis();

View File

@ -26,7 +26,7 @@ public class SessionCacheTest {
@BeforeClass
public static void setUpClass() throws Exception {
SystemMockUtil.setUp(temporaryFolder.getRoot())
.enableProcessingQueue();
.enableProcessing();
}
@Before

View File

@ -863,7 +863,7 @@ public class SQLiteTest {
processors.add(new RegisterProcessor(playerUUID, 500L, 1000L, "name", 4));
}
for (RegisterProcessor processor : processors) {
processor.process();
processor.run();
}
System.out.println("- RegisterProcessors Run -\n");
assertTrue(db.getUsersTable().isRegistered(playerUUID));

View File

@ -64,10 +64,16 @@ public class HTTPSWebServerAuthTest {
@After
public void tearDown() {
Teardown.resetSettingsTempValues();
}
@AfterClass
public static void tearDownClass() {
if (bukkitSystem != null) {
bukkitSystem.disable();
}
Teardown.resetSettingsTempValues();
bukkitSystem.disable();
}
private static final TrustManager[] trustAllCerts = new TrustManager[]{

View File

@ -51,8 +51,8 @@ public class SystemMockUtil {
return this;
}
public SystemMockUtil enableProcessingQueue() {
bukkitSystem.getProcessingQueue().enable();
public SystemMockUtil enableProcessing() throws EnableException {
bukkitSystem.getProcessing().enable();
return this;
}