From 66776766e68e6515303d336b9483ed71f057e254 Mon Sep 17 00:00:00 2001 From: Gabriele C Date: Sun, 22 Nov 2020 15:15:31 +0100 Subject: [PATCH 1/3] Port #2005 --- src/main/java/fr/xephi/authme/AuthMe.java | 3 + .../authme/RegisterAdminCommand.java | 6 +- .../executable/authme/SetEmailCommand.java | 56 +++++------ .../authme/process/AsyncUserScheduler.java | 92 +++++++++++++++++++ .../fr/xephi/authme/process/Management.java | 37 ++++---- .../authme/process/join/AsynchronousJoin.java | 6 +- .../xephi/authme/service/BukkitService.java | 9 ++ 7 files changed, 163 insertions(+), 46 deletions(-) create mode 100644 src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java diff --git a/src/main/java/fr/xephi/authme/AuthMe.java b/src/main/java/fr/xephi/authme/AuthMe.java index 045d1ee974..f8590fed6c 100644 --- a/src/main/java/fr/xephi/authme/AuthMe.java +++ b/src/main/java/fr/xephi/authme/AuthMe.java @@ -21,6 +21,7 @@ import fr.xephi.authme.listener.ServerListener; import fr.xephi.authme.output.ConsoleLoggerFactory; import fr.xephi.authme.security.crypts.Sha256; +import fr.xephi.authme.process.AsyncUserScheduler; import fr.xephi.authme.service.BackupService; import fr.xephi.authme.service.BukkitService; import fr.xephi.authme.service.MigrationService; @@ -66,6 +67,7 @@ public class AuthMe extends JavaPlugin { private Settings settings; private DataSource database; private BukkitService bukkitService; + private AsyncUserScheduler asyncUserScheduler; private Injector injector; private BackupService backupService; private ConsoleLogger logger; @@ -244,6 +246,7 @@ private void initialize() { void instantiateServices(Injector injector) { database = injector.getSingleton(DataSource.class); bukkitService = injector.getSingleton(BukkitService.class); + asyncUserScheduler = injector.getSingleton(AsyncUserScheduler.class); commandHandler = injector.getSingleton(CommandHandler.class); backupService = injector.getSingleton(BackupService.class); diff --git a/src/main/java/fr/xephi/authme/command/executable/authme/RegisterAdminCommand.java b/src/main/java/fr/xephi/authme/command/executable/authme/RegisterAdminCommand.java index ada52966af..7fe125717c 100644 --- a/src/main/java/fr/xephi/authme/command/executable/authme/RegisterAdminCommand.java +++ b/src/main/java/fr/xephi/authme/command/executable/authme/RegisterAdminCommand.java @@ -6,6 +6,7 @@ import fr.xephi.authme.datasource.DataSource; import fr.xephi.authme.output.ConsoleLoggerFactory; import fr.xephi.authme.message.MessageKey; +import fr.xephi.authme.process.AsyncUserScheduler; import fr.xephi.authme.security.PasswordSecurity; import fr.xephi.authme.security.crypts.HashedPassword; import fr.xephi.authme.service.BukkitService; @@ -40,6 +41,9 @@ public class RegisterAdminCommand implements ExecutableCommand { @Inject private ValidationService validationService; + @Inject + private AsyncUserScheduler asyncUserScheduler; + @Override public void executeCommand(final CommandSender sender, List arguments) { // Get the player name and password @@ -54,7 +58,7 @@ public void executeCommand(final CommandSender sender, List arguments) { return; } - bukkitService.runTaskOptionallyAsync(() -> { + asyncUserScheduler.runTask(playerNameLowerCase, () -> { if (dataSource.isAuthAvailable(playerNameLowerCase)) { commonService.send(sender, MessageKey.NAME_ALREADY_REGISTERED); return; diff --git a/src/main/java/fr/xephi/authme/command/executable/authme/SetEmailCommand.java b/src/main/java/fr/xephi/authme/command/executable/authme/SetEmailCommand.java index 987338fa27..d6cb315917 100644 --- a/src/main/java/fr/xephi/authme/command/executable/authme/SetEmailCommand.java +++ b/src/main/java/fr/xephi/authme/command/executable/authme/SetEmailCommand.java @@ -5,6 +5,7 @@ import fr.xephi.authme.data.auth.PlayerCache; import fr.xephi.authme.datasource.DataSource; import fr.xephi.authme.message.MessageKey; +import fr.xephi.authme.process.AsyncUserScheduler; import fr.xephi.authme.service.BukkitService; import fr.xephi.authme.service.CommonService; import fr.xephi.authme.service.ValidationService; @@ -33,11 +34,14 @@ public class SetEmailCommand implements ExecutableCommand { @Inject private ValidationService validationService; + @Inject + private AsyncUserScheduler asyncUserScheduler; + @Override public void executeCommand(final CommandSender sender, List arguments) { // Get the player name and email address - final String playerName = arguments.get(0); - final String playerEmail = arguments.get(1); + String playerName = arguments.get(0); + String playerEmail = arguments.get(1); // Validate the email address if (!validationService.validateEmail(playerEmail)) { @@ -45,34 +49,32 @@ public void executeCommand(final CommandSender sender, List arguments) { return; } - bukkitService.runTaskOptionallyAsync(new Runnable() { - @Override - public void run() { - // Validate the user - PlayerAuth auth = dataSource.getAuth(playerName); - if (auth == null) { - commonService.send(sender, MessageKey.UNKNOWN_USER); - return; - } else if (!validationService.isEmailFreeForRegistration(playerEmail, sender)) { - commonService.send(sender, MessageKey.EMAIL_ALREADY_USED_ERROR); - return; - } - - // Set the email address - auth.setEmail(playerEmail); - if (!dataSource.updateEmail(auth)) { - commonService.send(sender, MessageKey.ERROR); - return; - } + String lowercasePlayerName = playerName.toLowerCase(); + asyncUserScheduler.runTask(lowercasePlayerName, (Runnable) () -> { + // Validate the user + PlayerAuth auth = dataSource.getAuth(playerName); + if (auth == null) { + commonService.send(sender, MessageKey.UNKNOWN_USER); + return; + } else if (!validationService.isEmailFreeForRegistration(playerEmail, sender)) { + commonService.send(sender, MessageKey.EMAIL_ALREADY_USED_ERROR); + return; + } - // Update the player cache - if (playerCache.getAuth(playerName) != null) { - playerCache.updatePlayer(auth); - } + // Set the email address + auth.setEmail(playerEmail); + if (!dataSource.updateEmail(auth)) { + commonService.send(sender, MessageKey.ERROR); + return; + } - // Show a status message - commonService.send(sender, MessageKey.EMAIL_CHANGED_SUCCESS); + // Update the player cache + if (playerCache.getAuth(playerName) != null) { + playerCache.updatePlayer(auth); } + + // Show a status message + commonService.send(sender, MessageKey.EMAIL_CHANGED_SUCCESS); }); } } diff --git a/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java b/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java new file mode 100644 index 0000000000..d245af8b6b --- /dev/null +++ b/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java @@ -0,0 +1,92 @@ +package fr.xephi.authme.process; + +import fr.xephi.authme.service.BukkitService; +import org.bukkit.entity.Player; +import org.bukkit.scheduler.BukkitTask; + +import javax.inject.Inject; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; + +/** + * Handles the queue of async tasks on a per-player basis. + */ +public class AsyncUserScheduler { + + @Inject + private BukkitService bukkitService; + + private KeyedSequentialScheduler asyncUserScheduler; + + AsyncUserScheduler() { + this.asyncUserScheduler = new KeyedSequentialScheduler<>(runnable -> + bukkitService.runTaskAsynchronously(runnable)); + } + + /** + * Adds a task to the player's async task queue. + * + * @param playerName the player name. + * @param runnable the task. + */ + public void runTask(String playerName, Runnable runnable) { + if (bukkitService.isUseAsyncTasks()) { + asyncUserScheduler.submit(playerName.toLowerCase(), runnable); + } else { + runnable.run(); + } + } + /** + * Adds a task to the player's async task queue. + * + * @param player the player. + * @param runnable the task. + */ + public void runTask(Player player, Runnable runnable) { + runTask(player.getName(), runnable); + } + + public class KeyedSequentialScheduler { + private Map executors; + private Function scheduler; + + public KeyedSequentialScheduler(Function scheduler) { + this.executors = new LinkedHashMap<>(); + this.scheduler = scheduler; + } + + public void submit(K key, Runnable runnable) { + executors.computeIfAbsent(key, k -> new SequentialExecutor(scheduler, () -> executors.remove(key))) + .submit(runnable); + } + } + + public class SequentialExecutor { + private Queue queue; + private Function scheduler; + private Runnable callback; + + private BukkitTask executor; + + public SequentialExecutor(Function scheduler, Runnable callback) { + this.queue = new LinkedBlockingQueue<>(); + this.scheduler = scheduler; + this.callback = callback; + } + + public void submit(Runnable task) { + queue.add(task); + if (executor == null || executor.isCancelled()) { + executor = scheduler.apply(() -> { + while (!queue.isEmpty()) { + queue.poll().run(); + } + callback.run(); + }); + } + } + } +} diff --git a/src/main/java/fr/xephi/authme/process/Management.java b/src/main/java/fr/xephi/authme/process/Management.java index 5260fed13b..f8b39df5c6 100644 --- a/src/main/java/fr/xephi/authme/process/Management.java +++ b/src/main/java/fr/xephi/authme/process/Management.java @@ -11,7 +11,6 @@ import fr.xephi.authme.process.register.executors.RegistrationMethod; import fr.xephi.authme.process.register.executors.RegistrationParameters; import fr.xephi.authme.process.unregister.AsynchronousUnregister; -import fr.xephi.authme.service.BukkitService; import org.bukkit.command.CommandSender; import org.bukkit.entity.Player; @@ -23,7 +22,7 @@ public class Management { @Inject - private BukkitService bukkitService; + private AsyncUserScheduler asyncUserScheduler; // Processes @Inject @@ -50,54 +49,58 @@ public class Management { public void performLogin(Player player, String password) { - runTask(() -> asynchronousLogin.login(player, password)); + runTask(player, () -> asynchronousLogin.login(player, password)); } public void forceLogin(Player player) { - runTask(() -> asynchronousLogin.forceLogin(player)); + runTask(player, () -> asynchronousLogin.forceLogin(player)); } public void performLogout(Player player) { - runTask(() -> asynchronousLogout.logout(player)); + runTask(player, () -> asynchronousLogout.logout(player)); } public

void performRegister(RegistrationMethod

variant, P parameters) { - runTask(() -> asyncRegister.register(variant, parameters)); + runTask(parameters.getPlayer(), () -> asyncRegister.register(variant, parameters)); } public void performUnregister(Player player, String password) { - runTask(() -> asynchronousUnregister.unregister(player, password)); + runTask(player, () -> asynchronousUnregister.unregister(player, password)); } public void performUnregisterByAdmin(CommandSender initiator, String name, Player player) { - runTask(() -> asynchronousUnregister.adminUnregister(initiator, name, player)); + runTask(player, () -> asynchronousUnregister.adminUnregister(initiator, name, player)); } public void performJoin(Player player) { - runTask(() -> asynchronousJoin.processJoin(player)); + runTask(player, () -> asynchronousJoin.processJoin(player)); } public void performQuit(Player player) { - runTask(() -> asynchronousQuit.processQuit(player)); + runTask(player, () -> asynchronousQuit.processQuit(player)); } public void performAddEmail(Player player, String newEmail) { - runTask(() -> asyncAddEmail.addEmail(player, newEmail)); + runTask(player, () -> asyncAddEmail.addEmail(player, newEmail)); } public void performChangeEmail(Player player, String oldEmail, String newEmail) { - runTask(() -> asyncChangeEmail.changeEmail(player, oldEmail, newEmail)); + runTask(player, () -> asyncChangeEmail.changeEmail(player, oldEmail, newEmail)); } public void performPasswordChange(Player player, String oldPassword, String newPassword) { - runTask(() -> asyncChangePassword.changePassword(player, oldPassword, newPassword)); + runTask(player, () -> asyncChangePassword.changePassword(player, oldPassword, newPassword)); } - public void performPasswordChangeAsAdmin(CommandSender sender, String playerName, String newPassword) { - runTask(() -> asyncChangePassword.changePasswordAsAdmin(sender, playerName, newPassword)); + public void performPasswordChangeAsAdmin(CommandSender sender, String name, String newPassword) { + runTask(name, () -> asyncChangePassword.changePasswordAsAdmin(sender, name, newPassword)); } - private void runTask(Runnable runnable) { - bukkitService.runTaskOptionallyAsync(runnable); + private void runTask(Player player, Runnable runnable) { + runTask(player.getName(), runnable); + } + + private void runTask(String playerName, Runnable runnable) { + asyncUserScheduler.runTask(playerName.toLowerCase(), runnable); } } diff --git a/src/main/java/fr/xephi/authme/process/join/AsynchronousJoin.java b/src/main/java/fr/xephi/authme/process/join/AsynchronousJoin.java index 3db023118a..95541220c8 100644 --- a/src/main/java/fr/xephi/authme/process/join/AsynchronousJoin.java +++ b/src/main/java/fr/xephi/authme/process/join/AsynchronousJoin.java @@ -7,6 +7,7 @@ import fr.xephi.authme.output.ConsoleLoggerFactory; import fr.xephi.authme.message.MessageKey; import fr.xephi.authme.permission.PlayerStatePermission; +import fr.xephi.authme.process.AsyncUserScheduler; import fr.xephi.authme.process.AsynchronousProcess; import fr.xephi.authme.process.login.AsynchronousLogin; import fr.xephi.authme.service.BukkitService; @@ -57,6 +58,9 @@ public class AsynchronousJoin implements AsynchronousProcess { @Inject private BukkitService bukkitService; + @Inject + private AsyncUserScheduler asyncUserScheduler; + @Inject private AsynchronousLogin asynchronousLogin; @@ -126,7 +130,7 @@ public void processJoin(final Player player) { // Run commands bukkitService.scheduleSyncTaskFromOptionallyAsyncTask( () -> commandManager.runCommandsOnSessionLogin(player)); - bukkitService.runTaskOptionallyAsync(() -> asynchronousLogin.forceLogin(player)); + asyncUserScheduler.runTask(name, () -> asynchronousLogin.forceLogin(player)); return; } } else if (!service.getProperty(RegistrationSettings.FORCE)) { diff --git a/src/main/java/fr/xephi/authme/service/BukkitService.java b/src/main/java/fr/xephi/authme/service/BukkitService.java index c5425b3750..c47d4c9059 100644 --- a/src/main/java/fr/xephi/authme/service/BukkitService.java +++ b/src/main/java/fr/xephi/authme/service/BukkitService.java @@ -349,4 +349,13 @@ public Optional isBungeeCordConfiguredForSpigot() { public String getIp() { return Bukkit.getServer().getIp(); } + + /** + * Returns if async tasks are enabled. + * + * @return true if async tasks are enabled. + */ + public boolean isUseAsyncTasks() { + return useAsyncTasks; + } } From 8a126858a771ed442f80c64bcbab55a8b95e538b Mon Sep 17 00:00:00 2001 From: Gabriele C Date: Mon, 23 Nov 2020 19:38:28 +0100 Subject: [PATCH 2/3] Fix /authme unregister --- src/main/java/fr/xephi/authme/process/Management.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/fr/xephi/authme/process/Management.java b/src/main/java/fr/xephi/authme/process/Management.java index f8b39df5c6..e926944277 100644 --- a/src/main/java/fr/xephi/authme/process/Management.java +++ b/src/main/java/fr/xephi/authme/process/Management.java @@ -69,7 +69,7 @@ public void performUnregister(Player player, String password) { } public void performUnregisterByAdmin(CommandSender initiator, String name, Player player) { - runTask(player, () -> asynchronousUnregister.adminUnregister(initiator, name, player)); + runTask(name, () -> asynchronousUnregister.adminUnregister(initiator, name, player)); } public void performJoin(Player player) { From d8c09ab6d1300f5990b7420c398a85270aed6770 Mon Sep 17 00:00:00 2001 From: Gabriele C Date: Wed, 25 Nov 2020 18:59:50 +0100 Subject: [PATCH 3/3] Switch to KeySequentialExecutor Use https://github.com/jano7/executor --- pom.xml | 12 ++++ .../authme/process/AsyncUserScheduler.java | 59 +++---------------- 2 files changed, 19 insertions(+), 52 deletions(-) diff --git a/pom.xml b/pom.xml index 0315c70f0b..958b53c3e7 100644 --- a/pom.xml +++ b/pom.xml @@ -308,6 +308,10 @@ ch.jalu fr.xephi.authme.libs.ch.jalu + + com.jano7.executor + fr.xephi.authme.libs.com.jano7.executor + com.zaxxer.hikari fr.xephi.authme.libs.com.zaxxer.hikari @@ -556,6 +560,14 @@ provided + + + com.jano7 + executor + 2.0.2 + true + + com.zaxxer diff --git a/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java b/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java index d245af8b6b..b11fbd9412 100644 --- a/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java +++ b/src/main/java/fr/xephi/authme/process/AsyncUserScheduler.java @@ -1,15 +1,10 @@ package fr.xephi.authme.process; +import com.jano7.executor.KeySequentialRunner; import fr.xephi.authme.service.BukkitService; import org.bukkit.entity.Player; -import org.bukkit.scheduler.BukkitTask; import javax.inject.Inject; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Function; /** * Handles the queue of async tasks on a per-player basis. @@ -19,74 +14,34 @@ public class AsyncUserScheduler { @Inject private BukkitService bukkitService; - private KeyedSequentialScheduler asyncUserScheduler; + private KeySequentialRunner asyncUserScheduler; AsyncUserScheduler() { - this.asyncUserScheduler = new KeyedSequentialScheduler<>(runnable -> - bukkitService.runTaskAsynchronously(runnable)); + this.asyncUserScheduler = new KeySequentialRunner<>(command -> bukkitService.runTaskAsynchronously(command)); } /** * Adds a task to the player's async task queue. * * @param playerName the player name. - * @param runnable the task. + * @param runnable the task. */ public void runTask(String playerName, Runnable runnable) { if (bukkitService.isUseAsyncTasks()) { - asyncUserScheduler.submit(playerName.toLowerCase(), runnable); + asyncUserScheduler.run(playerName.toLowerCase(), runnable); } else { runnable.run(); } } + /** * Adds a task to the player's async task queue. * - * @param player the player. + * @param player the player. * @param runnable the task. */ public void runTask(Player player, Runnable runnable) { runTask(player.getName(), runnable); } - public class KeyedSequentialScheduler { - private Map executors; - private Function scheduler; - - public KeyedSequentialScheduler(Function scheduler) { - this.executors = new LinkedHashMap<>(); - this.scheduler = scheduler; - } - - public void submit(K key, Runnable runnable) { - executors.computeIfAbsent(key, k -> new SequentialExecutor(scheduler, () -> executors.remove(key))) - .submit(runnable); - } - } - - public class SequentialExecutor { - private Queue queue; - private Function scheduler; - private Runnable callback; - - private BukkitTask executor; - - public SequentialExecutor(Function scheduler, Runnable callback) { - this.queue = new LinkedBlockingQueue<>(); - this.scheduler = scheduler; - this.callback = callback; - } - - public void submit(Runnable task) { - queue.add(task); - if (executor == null || executor.isCancelled()) { - executor = scheduler.apply(() -> { - while (!queue.isEmpty()) { - queue.poll().run(); - } - callback.run(); - }); - } - } - } }