From 06be8ea6f4fd85da9125eb641c0568528c4f4f70 Mon Sep 17 00:00:00 2001 From: Gabriele C Date: Fri, 26 Jun 2020 22:23:50 +0200 Subject: [PATCH] Port thread safety/performance optimizations from #1992 --- .../executable/authme/AccountsCommand.java | 54 +++++++++---------- .../executable/authme/ConverterCommand.java | 15 +++--- .../executable/authme/PurgePlayerCommand.java | 2 +- .../authme/initialization/OnStartupTasks.java | 14 +++-- .../xephi/authme/listener/PlayerListener.java | 9 +++- .../process/login/ProcessSyncPlayerLogin.java | 6 +-- .../xephi/authme/service/AntiBotService.java | 52 ++++++++---------- .../xephi/authme/task/purge/PurgeService.java | 2 +- .../authme/util/AtomicIntervalCounter.java | 53 ++++++++++++++++++ .../authme/PurgePlayerCommandTest.java | 7 +-- .../authme/listener/PlayerListenerTest.java | 19 ++++--- .../authme/service/AntiBotServiceTest.java | 2 + .../authme/task/purge/PurgeServiceTest.java | 2 +- 13 files changed, 146 insertions(+), 91 deletions(-) create mode 100644 src/main/java/fr/xephi/authme/util/AtomicIntervalCounter.java diff --git a/src/main/java/fr/xephi/authme/command/executable/authme/AccountsCommand.java b/src/main/java/fr/xephi/authme/command/executable/authme/AccountsCommand.java index 9f24592f3..2a87802eb 100644 --- a/src/main/java/fr/xephi/authme/command/executable/authme/AccountsCommand.java +++ b/src/main/java/fr/xephi/authme/command/executable/authme/AccountsCommand.java @@ -32,40 +32,34 @@ public class AccountsCommand implements ExecutableCommand { // Assumption: a player name cannot contain '.' if (playerName.contains(".")) { - bukkitService.runTaskAsynchronously(new Runnable() { - @Override - public void run() { - List accountList = dataSource.getAllAuthsByIp(playerName); - if (accountList.isEmpty()) { - sender.sendMessage("[AuthMe] This IP does not exist in the database."); - } else if (accountList.size() == 1) { - sender.sendMessage("[AuthMe] " + playerName + " is a single account player"); - } else { - outputAccountsList(sender, playerName, accountList); - } + bukkitService.runTaskAsynchronously(() -> { + List accountList = dataSource.getAllAuthsByIp(playerName); + if (accountList.isEmpty()) { + sender.sendMessage("[AuthMe] This IP does not exist in the database."); + } else if (accountList.size() == 1) { + sender.sendMessage("[AuthMe] " + playerName + " is a single account player"); + } else { + outputAccountsList(sender, playerName, accountList); } }); } else { - bukkitService.runTaskAsynchronously(new Runnable() { - @Override - public void run() { - PlayerAuth auth = dataSource.getAuth(playerName.toLowerCase()); - if (auth == null) { - commonService.send(sender, MessageKey.UNKNOWN_USER); - return; - } else if (auth.getLastIp() == null) { - sender.sendMessage("No known last IP address for player"); - return; - } + bukkitService.runTaskAsynchronously(() -> { + PlayerAuth auth = dataSource.getAuth(playerName.toLowerCase()); + if (auth == null) { + commonService.send(sender, MessageKey.UNKNOWN_USER); + return; + } else if (auth.getLastIp() == null) { + sender.sendMessage("No known last IP address for player"); + return; + } - List accountList = dataSource.getAllAuthsByIp(auth.getLastIp()); - if (accountList.isEmpty()) { - commonService.send(sender, MessageKey.UNKNOWN_USER); - } else if (accountList.size() == 1) { - sender.sendMessage("[AuthMe] " + playerName + " is a single account player"); - } else { - outputAccountsList(sender, playerName, accountList); - } + List accountList = dataSource.getAllAuthsByIp(auth.getLastIp()); + if (accountList.isEmpty()) { + commonService.send(sender, MessageKey.UNKNOWN_USER); + } else if (accountList.size() == 1) { + sender.sendMessage("[AuthMe] " + playerName + " is a single account player"); + } else { + outputAccountsList(sender, playerName, accountList); } }); } diff --git a/src/main/java/fr/xephi/authme/command/executable/authme/ConverterCommand.java b/src/main/java/fr/xephi/authme/command/executable/authme/ConverterCommand.java index b95eda17d..fd0a73513 100644 --- a/src/main/java/fr/xephi/authme/command/executable/authme/ConverterCommand.java +++ b/src/main/java/fr/xephi/authme/command/executable/authme/ConverterCommand.java @@ -55,15 +55,12 @@ public class ConverterCommand implements ExecutableCommand { final Converter converter = converterFactory.newInstance(converterClass); // Run the convert job - bukkitService.runTaskAsynchronously(new Runnable() { - @Override - public void run() { - try { - converter.execute(sender); - } catch (Exception e) { - commonService.send(sender, MessageKey.ERROR); - logger.logException("Error during conversion:", e); - } + bukkitService.runTaskAsynchronously(() -> { + try { + converter.execute(sender); + } catch (Exception e) { + commonService.send(sender, MessageKey.ERROR); + logger.logException("Error during conversion:", e); } }); diff --git a/src/main/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommand.java b/src/main/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommand.java index 0905458a8..ded4dd9d7 100644 --- a/src/main/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommand.java +++ b/src/main/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommand.java @@ -29,7 +29,7 @@ public class PurgePlayerCommand implements ExecutableCommand { @Override public void executeCommand(CommandSender sender, List arguments) { String option = arguments.size() > 1 ? arguments.get(1) : null; - bukkitService.runTaskOptionallyAsync( + bukkitService.runTaskAsynchronously( () -> executeCommand(sender, arguments.get(0), option)); } diff --git a/src/main/java/fr/xephi/authme/initialization/OnStartupTasks.java b/src/main/java/fr/xephi/authme/initialization/OnStartupTasks.java index 59c7ccaa2..93cdd41bd 100644 --- a/src/main/java/fr/xephi/authme/initialization/OnStartupTasks.java +++ b/src/main/java/fr/xephi/authme/initialization/OnStartupTasks.java @@ -19,6 +19,7 @@ import org.bukkit.Bukkit; import org.bukkit.entity.Player; import javax.inject.Inject; +import java.util.List; import java.util.logging.Logger; import static fr.xephi.authme.service.BukkitService.TICKS_PER_MINUTE; @@ -94,12 +95,15 @@ public class OnStartupTasks { return; } bukkitService.runTaskTimerAsynchronously(() -> { - for (String playerWithoutMail : dataSource.getLoggedPlayersWithEmptyMail()) { - Player player = bukkitService.getPlayerExact(playerWithoutMail); - if (player != null) { - messages.send(player, MessageKey.ADD_EMAIL_MESSAGE); + List loggedPlayersWithEmptyMail = dataSource.getLoggedPlayersWithEmptyMail(); + bukkitService.runTask(() -> { + for (String playerWithoutMail : loggedPlayersWithEmptyMail) { + Player player = bukkitService.getPlayerExact(playerWithoutMail); + if (player != null) { + messages.send(player, MessageKey.ADD_EMAIL_MESSAGE); + } } - } + }); }, 1, TICKS_PER_MINUTE * settings.getProperty(EmailSettings.DELAY_RECALL)); } } diff --git a/src/main/java/fr/xephi/authme/listener/PlayerListener.java b/src/main/java/fr/xephi/authme/listener/PlayerListener.java index 61008b1e4..33d875edd 100644 --- a/src/main/java/fr/xephi/authme/listener/PlayerListener.java +++ b/src/main/java/fr/xephi/authme/listener/PlayerListener.java @@ -112,7 +112,6 @@ public class PlayerListener implements Listener { // Non-blocking checks try { - onJoinVerifier.checkSingleSession(name); onJoinVerifier.checkIsValidName(name); } catch (FailedVerificationException e) { event.setKickMessage(messages.retrieveSingle(name, e.getReason(), e.getArgs())); @@ -161,6 +160,14 @@ public class PlayerListener implements Listener { final Player player = event.getPlayer(); final String name = player.getName(); + try { + onJoinVerifier.checkSingleSession(name); + } catch (FailedVerificationException e) { + event.setKickMessage(messages.retrieveSingle(name, e.getReason(), e.getArgs())); + event.setResult(PlayerLoginEvent.Result.KICK_OTHER); + return; + } + if (validationService.isUnrestricted(name)) { return; } diff --git a/src/main/java/fr/xephi/authme/process/login/ProcessSyncPlayerLogin.java b/src/main/java/fr/xephi/authme/process/login/ProcessSyncPlayerLogin.java index eb2aad817..8501e85cb 100644 --- a/src/main/java/fr/xephi/authme/process/login/ProcessSyncPlayerLogin.java +++ b/src/main/java/fr/xephi/authme/process/login/ProcessSyncPlayerLogin.java @@ -1,9 +1,9 @@ package fr.xephi.authme.process.login; import fr.xephi.authme.data.auth.PlayerAuth; +import fr.xephi.authme.data.auth.PlayerCache; import fr.xephi.authme.data.limbo.LimboPlayer; import fr.xephi.authme.data.limbo.LimboService; -import fr.xephi.authme.datasource.DataSource; import fr.xephi.authme.events.LoginEvent; import fr.xephi.authme.events.RestoreInventoryEvent; import fr.xephi.authme.permission.PermissionsManager; @@ -40,7 +40,7 @@ public class ProcessSyncPlayerLogin implements SynchronousProcess { private TeleportationService teleportationService; @Inject - private DataSource dataSource; + private PlayerCache playerCache; @Inject private CommandManager commandManager; @@ -88,7 +88,7 @@ public class ProcessSyncPlayerLogin implements SynchronousProcess { restoreInventory(player); } - final PlayerAuth auth = dataSource.getAuth(name); + final PlayerAuth auth = playerCache.getAuth(name); teleportationService.teleportOnLogin(player, auth, limbo); // We can now display the join message (if delayed) diff --git a/src/main/java/fr/xephi/authme/service/AntiBotService.java b/src/main/java/fr/xephi/authme/service/AntiBotService.java index b65400992..47d8b9007 100644 --- a/src/main/java/fr/xephi/authme/service/AntiBotService.java +++ b/src/main/java/fr/xephi/authme/service/AntiBotService.java @@ -7,6 +7,7 @@ import fr.xephi.authme.permission.AdminPermission; import fr.xephi.authme.permission.PermissionsManager; import fr.xephi.authme.settings.Settings; import fr.xephi.authme.settings.properties.ProtectionSettings; +import fr.xephi.authme.util.AtomicIntervalCounter; import org.bukkit.scheduler.BukkitTask; import javax.inject.Inject; @@ -29,14 +30,11 @@ public class AntiBotService implements SettingsDependent { private final CopyOnWriteArrayList antibotKicked = new CopyOnWriteArrayList<>(); // Settings private int duration; - private int sensibility; - private int interval; // Service status private AntiBotStatus antiBotStatus; private boolean startup; private BukkitTask disableTask; - private Instant lastFlaggedJoin; - private int flagged = 0; + private AtomicIntervalCounter flaggedCounter; @Inject AntiBotService(Settings settings, Messages messages, PermissionsManager permissionsManager, @@ -47,7 +45,6 @@ public class AntiBotService implements SettingsDependent { this.bukkitService = bukkitService; // Initial status disableTask = null; - flagged = 0; antiBotStatus = AntiBotStatus.DISABLED; startup = true; // Load settings and start if required @@ -58,8 +55,9 @@ public class AntiBotService implements SettingsDependent { public void reload(Settings settings) { // Load settings duration = settings.getProperty(ProtectionSettings.ANTIBOT_DURATION); - sensibility = settings.getProperty(ProtectionSettings.ANTIBOT_SENSIBILITY); - interval = settings.getProperty(ProtectionSettings.ANTIBOT_INTERVAL); + int sensibility = settings.getProperty(ProtectionSettings.ANTIBOT_SENSIBILITY); + int interval = settings.getProperty(ProtectionSettings.ANTIBOT_INTERVAL); + flaggedCounter = new AtomicIntervalCounter(sensibility, interval); // Stop existing protection stopProtection(); @@ -83,19 +81,25 @@ public class AntiBotService implements SettingsDependent { } } + /** + * Transitions the anti bot service to an active status. + */ private void startProtection() { - // Disable existing antibot session - stopProtection(); - // Enable the new session - antiBotStatus = AntiBotStatus.ACTIVE; - - // Inform admins - bukkitService.getOnlinePlayers().stream() - .filter(player -> permissionsManager.hasPermission(player, AdminPermission.ANTIBOT_MESSAGES)) - .forEach(player -> messages.send(player, MessageKey.ANTIBOT_AUTO_ENABLED_MESSAGE)); - + if (antiBotStatus == AntiBotStatus.ACTIVE) { + return; // Already activating/active + } + if (disableTask != null) { + disableTask.cancel(); + } // Schedule auto-disable disableTask = bukkitService.runTaskLater(this::stopProtection, duration * TICKS_PER_MINUTE); + antiBotStatus = AntiBotStatus.ACTIVE; + bukkitService.scheduleSyncTaskFromOptionallyAsyncTask(() -> { + // Inform admins + bukkitService.getOnlinePlayers().stream() + .filter(player -> permissionsManager.hasPermission(player, AdminPermission.ANTIBOT_MESSAGES)) + .forEach(player -> messages.send(player, MessageKey.ANTIBOT_AUTO_ENABLED_MESSAGE)); + }); } /** @@ -108,7 +112,7 @@ public class AntiBotService implements SettingsDependent { // Change status antiBotStatus = AntiBotStatus.LISTENING; - flagged = 0; + flaggedCounter.reset(); antibotKicked.clear(); // Cancel auto-disable task @@ -158,17 +162,7 @@ public class AntiBotService implements SettingsDependent { return true; } - if (lastFlaggedJoin == null) { - lastFlaggedJoin = Instant.now(); - } - if (ChronoUnit.SECONDS.between(lastFlaggedJoin, Instant.now()) <= interval) { - flagged++; - } else { - // reset to 1 because this player is also count as not registered - flagged = 1; - lastFlaggedJoin = null; - } - if (flagged > sensibility) { + if (flaggedCounter.handle()) { startProtection(); return true; } diff --git a/src/main/java/fr/xephi/authme/task/purge/PurgeService.java b/src/main/java/fr/xephi/authme/task/purge/PurgeService.java index 489262883..880d51185 100644 --- a/src/main/java/fr/xephi/authme/task/purge/PurgeService.java +++ b/src/main/java/fr/xephi/authme/task/purge/PurgeService.java @@ -99,7 +99,7 @@ public class PurgeService { isPurging = true; PurgeTask purgeTask = new PurgeTask(this, permissionsManager, sender, names, players); - bukkitService.runTaskTimer(purgeTask, 0, 1); + bukkitService.runTaskTimerAsynchronously(purgeTask, 0, 1); } /** diff --git a/src/main/java/fr/xephi/authme/util/AtomicIntervalCounter.java b/src/main/java/fr/xephi/authme/util/AtomicIntervalCounter.java new file mode 100644 index 000000000..2a50e660a --- /dev/null +++ b/src/main/java/fr/xephi/authme/util/AtomicIntervalCounter.java @@ -0,0 +1,53 @@ +package fr.xephi.authme.util; + +/** + * A thread-safe interval counter, allows to detect if an event happens more than 'threshold' times + * in the given 'interval'. + */ +public class AtomicIntervalCounter { + private final int threshold; + private final int interval; + private int count; + private long lastInsert; + + /** + * Constructs a new counter. + * + * @param threshold the threshold value of the counter. + * @param interval the counter interval in milliseconds. + */ + public AtomicIntervalCounter(int threshold, int interval) { + this.threshold = threshold; + this.interval = interval; + reset(); + } + + /** + * Resets the counter count. + */ + public synchronized void reset() { + count = 0; + lastInsert = 0; + } + + /** + * Increments the counter and returns true if the current count has reached the threshold value + * in the given interval, this will also reset the count value. + * + * @return true if the count has reached the threshold value. + */ + public synchronized boolean handle() { + long now = System.currentTimeMillis(); + if (now - lastInsert > interval) { + count = 1; + } else { + count++; + } + if (count > threshold) { + reset(); + return true; + } + lastInsert = now; + return false; + } +} diff --git a/src/test/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommandTest.java b/src/test/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommandTest.java index 370ba1294..846c4cabb 100644 --- a/src/test/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommandTest.java +++ b/src/test/java/fr/xephi/authme/command/executable/authme/PurgePlayerCommandTest.java @@ -11,6 +11,7 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import static fr.xephi.authme.service.BukkitServiceTestHelper.setBukkitServiceToRunTaskAsynchronously; import static fr.xephi.authme.service.BukkitServiceTestHelper.setBukkitServiceToRunTaskOptionallyAsync; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -45,7 +46,7 @@ public class PurgePlayerCommandTest { String name = "Bobby"; given(dataSource.isAuthAvailable(name)).willReturn(true); CommandSender sender = mock(CommandSender.class); - setBukkitServiceToRunTaskOptionallyAsync(bukkitService); + setBukkitServiceToRunTaskAsynchronously(bukkitService); // when command.executeCommand(sender, singletonList(name)); @@ -63,7 +64,7 @@ public class PurgePlayerCommandTest { OfflinePlayer player = mock(OfflinePlayer.class); given(bukkitService.getOfflinePlayer(name)).willReturn(player); CommandSender sender = mock(CommandSender.class); - setBukkitServiceToRunTaskOptionallyAsync(bukkitService); + setBukkitServiceToRunTaskAsynchronously(bukkitService); // when command.executeCommand(sender, singletonList(name)); @@ -80,7 +81,7 @@ public class PurgePlayerCommandTest { OfflinePlayer player = mock(OfflinePlayer.class); given(bukkitService.getOfflinePlayer(name)).willReturn(player); CommandSender sender = mock(CommandSender.class); - setBukkitServiceToRunTaskOptionallyAsync(bukkitService); + setBukkitServiceToRunTaskAsynchronously(bukkitService); // when command.executeCommand(sender, asList(name, "force")); diff --git a/src/test/java/fr/xephi/authme/listener/PlayerListenerTest.java b/src/test/java/fr/xephi/authme/listener/PlayerListenerTest.java index 1a1640753..c2459a652 100644 --- a/src/test/java/fr/xephi/authme/listener/PlayerListenerTest.java +++ b/src/test/java/fr/xephi/authme/listener/PlayerListenerTest.java @@ -646,7 +646,7 @@ public class PlayerListenerTest { } @Test - public void shouldNotInterfereWithUnrestrictedUser() { + public void shouldNotInterfereWithUnrestrictedUser() throws FailedVerificationException { // given String name = "Player01"; Player player = mockPlayerWithName(name); @@ -658,12 +658,13 @@ public class PlayerListenerTest { // then verify(validationService).isUnrestricted(name); + verify(onJoinVerifier).checkSingleSession(name); verifyNoModifyingCalls(event); - verifyNoInteractions(onJoinVerifier); + verifyNoMoreInteractions(onJoinVerifier); } @Test - public void shouldStopHandlingForFullServer() { + public void shouldStopHandlingForFullServer() throws FailedVerificationException { // given String name = "someone"; Player player = mockPlayerWithName(name); @@ -676,12 +677,14 @@ public class PlayerListenerTest { // then verify(validationService).isUnrestricted(name); - verify(onJoinVerifier, only()).refusePlayerForFullServer(event); + verify(onJoinVerifier).checkSingleSession(name); + verify(onJoinVerifier).refusePlayerForFullServer(event); + verifyNoMoreInteractions(onJoinVerifier); verifyNoModifyingCalls(event); } @Test - public void shouldStopHandlingEventForBadResult() { + public void shouldStopHandlingEventForBadResult() throws FailedVerificationException { // given String name = "someone"; Player player = mockPlayerWithName(name); @@ -696,7 +699,8 @@ public class PlayerListenerTest { // then verify(validationService).isUnrestricted(name); - verify(onJoinVerifier, only()).refusePlayerForFullServer(event); + verify(onJoinVerifier).checkSingleSession(name); + verify(onJoinVerifier).refusePlayerForFullServer(event); verifyNoModifyingCalls(event); } @@ -715,14 +719,13 @@ public class PlayerListenerTest { // then verify(validationService).isUnrestricted(name); - verify(onJoinVerifier).checkSingleSession(name); verify(onJoinVerifier).checkIsValidName(name); verifyNoInteractions(dataSource); verifyNoModifyingCalls(preLoginEvent); } @Test - public void shouldKickPreLoginLowestUnresolvedHostname() throws FailedVerificationException { + public void shouldKickPreLoginLowestUnresolvedHostname() { // given String name = "someone"; UUID uniqueId = UUID.fromString("753493c9-33ba-4a4a-bf61-1bce9d3c9a71"); diff --git a/src/test/java/fr/xephi/authme/service/AntiBotServiceTest.java b/src/test/java/fr/xephi/authme/service/AntiBotServiceTest.java index 8a41671a3..643792547 100644 --- a/src/test/java/fr/xephi/authme/service/AntiBotServiceTest.java +++ b/src/test/java/fr/xephi/authme/service/AntiBotServiceTest.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; import static fr.xephi.authme.service.BukkitServiceTestHelper.setBukkitServiceToScheduleSyncDelayedTaskWithDelay; +import static fr.xephi.authme.service.BukkitServiceTestHelper.setBukkitServiceToScheduleSyncTaskFromOptionallyAsyncTask; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -160,6 +161,7 @@ public class AntiBotServiceTest { given(bukkitService.getOnlinePlayers()).willReturn(players); given(permissionsManager.hasPermission(players.get(0), AdminPermission.ANTIBOT_MESSAGES)).willReturn(false); given(permissionsManager.hasPermission(players.get(1), AdminPermission.ANTIBOT_MESSAGES)).willReturn(true); + setBukkitServiceToScheduleSyncTaskFromOptionallyAsyncTask(bukkitService); // when antiBotService.overrideAntiBotStatus(true); diff --git a/src/test/java/fr/xephi/authme/task/purge/PurgeServiceTest.java b/src/test/java/fr/xephi/authme/task/purge/PurgeServiceTest.java index 8b0864e59..8823f1aa1 100644 --- a/src/test/java/fr/xephi/authme/task/purge/PurgeServiceTest.java +++ b/src/test/java/fr/xephi/authme/task/purge/PurgeServiceTest.java @@ -189,7 +189,7 @@ public class PurgeServiceTest { private void verifyScheduledPurgeTask(UUID senderUuid, Set names) { ArgumentCaptor captor = ArgumentCaptor.forClass(PurgeTask.class); - verify(bukkitService).runTaskTimer(captor.capture(), eq(0L), eq(1L)); + verify(bukkitService).runTaskTimerAsynchronously(captor.capture(), eq(0L), eq(1L)); PurgeTask task = captor.getValue(); Object senderInTask = ReflectionTestUtils.getFieldValue(PurgeTask.class, task, "sender");