Implement Messaging Service using SQL (#534)

This commit is contained in:
Luck 2018-03-18 16:02:04 +00:00
parent 926195efc4
commit da797f154d
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
8 changed files with 254 additions and 1 deletions

View File

@ -197,6 +197,9 @@ watch-files: true
# for LuckPerms to poll the database for changes.
#
# - Possible options:
# => sql Uses the SQL database to form a queue system for communication. Will only work when
# 'storage-method' is set to MySQL or MariaDB. This is chosen by default if the
# option is set to 'none' and SQL storage is in use. Set to 'notsql' to disable this.
# => bungee Uses the plugin messaging channels to communicate with the proxy.
# LuckPerms must be installed on your proxy & all connected servers backend servers.
# Won't work if you have more than one BungeeCord proxy.

View File

@ -194,6 +194,10 @@ watch-files: true
# for LuckPerms to poll the database for changes.
#
# - Possible options:
# => sql Uses the SQL database to form a queue system for communication. Will only work
# when 'storage-method' is set to MySQL or MariaDB. This is chosen by default if
# the option is set to 'none' and SQL storage is in use. Set to 'notsql' to
# disable this.
# => bungee Uses the plugin messaging channels to communicate with the proxy.
# LuckPerms must be installed on your proxy & all connected servers backend
# servers. Won't work if you have more than one BungeeCord proxy.

View File

@ -25,12 +25,18 @@
package me.lucko.luckperms.common.messaging;
import com.google.common.base.Preconditions;
import me.lucko.luckperms.api.messenger.IncomingMessageConsumer;
import me.lucko.luckperms.api.messenger.Messenger;
import me.lucko.luckperms.api.messenger.MessengerProvider;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.messaging.redis.RedisMessenger;
import me.lucko.luckperms.common.messaging.sql.SqlMessenger;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.storage.dao.sql.SqlDao;
import me.lucko.luckperms.common.storage.dao.sql.connection.hikari.MariaDbConnectionFactory;
import me.lucko.luckperms.common.storage.dao.sql.connection.hikari.MySqlConnectionFactory;
import javax.annotation.Nonnull;
@ -51,7 +57,14 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
messagingType = "redis";
}
if (messagingType.equals("none")) {
if (messagingType.equals("none") && this.plugin.getStorage().getDao() instanceof SqlDao) {
SqlDao dao = (SqlDao) this.plugin.getStorage().getDao();
if (dao.getProvider() instanceof MySqlConnectionFactory || dao.getProvider() instanceof MariaDbConnectionFactory) {
messagingType = "sql";
}
}
if (messagingType.equals("none") || messagingType.equals("notsql")) {
return null;
}
@ -77,6 +90,12 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
} else {
this.plugin.getLogger().warn("Messaging Service was set to redis, but redis is not enabled!");
}
} else if (messagingType.equals("sql")) {
try {
return new LuckPermsMessagingService(this.plugin, new SqlMessengerProvider());
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
@ -99,4 +118,24 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
}
}
private class SqlMessengerProvider implements MessengerProvider {
@Nonnull
@Override
public String getName() {
return "Sql";
}
@Nonnull
@Override
public Messenger obtain(@Nonnull IncomingMessageConsumer incomingMessageConsumer) {
SqlDao dao = (SqlDao) getPlugin().getStorage().getDao();
Preconditions.checkState(dao.getProvider() instanceof MySqlConnectionFactory || dao.getProvider() instanceof MariaDbConnectionFactory, "not a supported sql type");
SqlMessenger sql = new SqlMessenger(getPlugin(), dao, incomingMessageConsumer);
sql.init();
return sql;
}
}
}

View File

@ -0,0 +1,112 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.sql;
import me.lucko.luckperms.api.messenger.IncomingMessageConsumer;
import me.lucko.luckperms.api.messenger.Messenger;
import me.lucko.luckperms.api.messenger.message.OutgoingMessage;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.annotation.Nonnull;
/**
* An implementation of {@link Messenger} using SQL.
*/
public abstract class AbstractSqlMessenger implements Messenger {
private final IncomingMessageConsumer consumer;
private long lastId = -1;
protected AbstractSqlMessenger(IncomingMessageConsumer consumer) {
this.consumer = consumer;
}
protected abstract Connection getConnection() throws SQLException;
public void init() throws SQLException {
try (Connection c = getConnection()) {
// init table
try (PreparedStatement ps = c.prepareStatement("CREATE TABLE IF NOT EXISTS `luckperms_messages` (`id` INT AUTO_INCREMENT NOT NULL, `time` TIMESTAMP NOT NULL, `msg` TEXT NOT NULL, PRIMARY KEY (`id`))")) {
ps.execute();
}
// pull last id
try (PreparedStatement ps = c.prepareStatement("SELECT MAX(`id`) as `latest` FROM `luckperms_messages`")) {
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
this.lastId = rs.getLong("latest");
}
}
}
}
}
@Override
public void sendOutgoingMessage(@Nonnull OutgoingMessage outgoingMessage) {
try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement("INSERT INTO luckperms_messages(`time`, `msg`) VALUES(NOW(), ?)")) {
ps.setString(1, outgoingMessage.asEncodedString());
ps.execute();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public void pollMessages() {
try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement("SELECT `id`, `msg` FROM luckperms_messages WHERE `id` > ? AND (NOW() - `time` > 60)")) {
ps.setLong(1, this.lastId);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
long id = rs.getLong("id");
this.lastId = Math.max(this.lastId, id);
String message = rs.getString("msg");
this.consumer.consumeIncomingMessageAsString(message);
}
}
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public void runHousekeeping() {
try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement("DELETE FROM luckperms_messages WHERE (NOW() - `time` > 60)")) {
ps.execute();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,85 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.sql;
import me.lucko.luckperms.api.messenger.IncomingMessageConsumer;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.plugin.SchedulerAdapter;
import me.lucko.luckperms.common.plugin.SchedulerTask;
import me.lucko.luckperms.common.storage.dao.sql.SqlDao;
import java.sql.Connection;
import java.sql.SQLException;
public class SqlMessenger extends AbstractSqlMessenger {
private final LuckPermsPlugin plugin;
private final SqlDao sqlDao;
private SchedulerTask pollTask;
private SchedulerTask housekeepingTask;
public SqlMessenger(LuckPermsPlugin plugin, SqlDao sqlDao, IncomingMessageConsumer consumer) {
super(consumer);
this.plugin = plugin;
this.sqlDao = sqlDao;
}
@Override
public void init() {
try {
super.init();
} catch (SQLException e) {
throw new RuntimeException(e);
}
// schedule poll tasks
SchedulerAdapter scheduler = this.plugin.getBootstrap().getScheduler();
this.pollTask = scheduler.asyncRepeating(this::pollMessages, 20L);
this.housekeepingTask = scheduler.asyncRepeating(this::runHousekeeping, 20L * 30);
}
@Override
public void close() {
SchedulerTask task = this.pollTask;
if (task != null) {
task.cancel();
}
task = this.housekeepingTask;
if (task != null) {
task.cancel();
}
this.pollTask = null;
this.housekeepingTask = null;
super.close();
}
@Override
protected Connection getConnection() throws SQLException {
return this.sqlDao.getProvider().getConnection();
}
}

View File

@ -129,6 +129,10 @@ public class SqlDao extends AbstractDao {
return this.gson;
}
public AbstractConnectionFactory getProvider() {
return this.provider;
}
public Function<String, String> getPrefix() {
return this.prefix;
}

View File

@ -197,6 +197,9 @@ watch-files: true
# for LuckPerms to poll the database for changes.
#
# - Possible options:
# => sql Uses the SQL database to form a queue system for communication. Will only work when
# 'storage-method' is set to MySQL or MariaDB. This is chosen by default if the
# option is set to 'none' and SQL storage is in use. Set to 'notsql' to disable this.
# => redis Uses Redis pub-sub to push changes. Your server connection info must be configured
# below.
# => none Disables the service.

View File

@ -201,6 +201,9 @@ watch-files = true
# for LuckPerms to poll the database for changes.
#
# - Possible options:
# => sql Uses the SQL database to form a queue system for communication. Will only work when
# 'storage-method' is set to MySQL or MariaDB. This is chosen by default if the
# option is set to 'none' and SQL storage is in use. Set to 'notsql' to disable this.
# => bungee Uses the plugin messaging channels to communicate with the proxy.
# LuckPerms must be installed on your proxy & all connected servers backend servers.
# Won't work if you have more than one BungeeCord proxy.