Add Nats messenger (#3525)

This commit is contained in:
Kacper Krzychała 2022-11-26 19:06:15 +01:00 committed by GitHub
parent 5ff9a12a72
commit 0193c06f16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 297 additions and 6 deletions

View File

@ -34,6 +34,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -44,6 +44,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -241,6 +241,8 @@ watch-files: true
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@ -266,6 +268,14 @@ redis:
username: ''
password: ''
# Settings for Nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats:
enabled: false
address: localhost
username: ''
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:

View File

@ -36,6 +36,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -239,6 +239,8 @@ watch-files: true
# the RedisBungee plugin installed.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@ -264,6 +266,14 @@ redis:
username: ''
password: ''
# Settings for Nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats:
enabled: false
address: localhost
username: ''
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:

View File

@ -72,6 +72,7 @@ dependencies {
}
api 'com.zaxxer:HikariCP:4.0.3'
api 'redis.clients:jedis:3.5.2'
api 'io.nats:jnats:2.16.4'
api 'com.rabbitmq:amqp-client:5.12.0'
api 'org.mongodb:mongodb-driver-legacy:4.5.0'
api 'org.yaml:snakeyaml:1.28'

View File

@ -651,6 +651,31 @@ public final class ConfigKeys {
*/
public static final ConfigKey<Boolean> REDIS_SSL = notReloadable(booleanKey("redis.ssl", false));
/**
* If nats messaging is enabled
*/
public static final ConfigKey<Boolean> NATS_ENABLED = notReloadable(booleanKey("nats.enabled", false));
/**
* The address of the nats server
*/
public static final ConfigKey<String> NATS_ADDRESS = notReloadable(stringKey("nats.address", null));
/**
* The username to connect with, or an empty string if it should use default
*/
public static final ConfigKey<String> NATS_USERNAME = notReloadable(stringKey("nats.username", ""));
/**
* The password in use by the nats server, or an empty string if there is no password
*/
public static final ConfigKey<String> NATS_PASSWORD = notReloadable(stringKey("nats.password", ""));
/**
* If the nats connection should use SSL
*/
public static final ConfigKey<Boolean> NATS_SSL = notReloadable(booleanKey("nats.ssl", false));
/**
* If rabbitmq messaging is enabled
*/

View File

@ -245,6 +245,13 @@ public enum Dependency {
Relocation.of("jedis", "redis{}clients{}jedis"),
Relocation.of("commonspool2", "org{}apache{}commons{}pool2")
),
NATS(
"io.nats",
"jnats",
"2.16.4",
"/WZgFi9iJToRGIiGoShlwE5aHwstOuNGZyr4UaBRilM=",
Relocation.of("nats", "io{}nats{}client")
),
RABBITMQ(
"com{}rabbitmq",
"amqp-client",

View File

@ -128,8 +128,8 @@ public class DependencyManager {
}
}
public void loadStorageDependencies(Set<StorageType> storageTypes, boolean redis, boolean rabbitmq) {
loadDependencies(this.registry.resolveStorageDependencies(storageTypes, redis, rabbitmq));
public void loadStorageDependencies(Set<StorageType> storageTypes, boolean redis, boolean rabbitmq, boolean nats) {
loadDependencies(this.registry.resolveStorageDependencies(storageTypes, redis, rabbitmq, nats));
}
public void loadDependencies(Set<Dependency> dependencies) {

View File

@ -72,7 +72,7 @@ public class DependencyRegistry {
this.platformType = platformType;
}
public Set<Dependency> resolveStorageDependencies(Set<StorageType> storageTypes, boolean redis, boolean rabbitmq) {
public Set<Dependency> resolveStorageDependencies(Set<StorageType> storageTypes, boolean redis, boolean rabbitmq, boolean nats) {
Set<Dependency> dependencies = new LinkedHashSet<>();
for (StorageType storageType : storageTypes) {
dependencies.addAll(STORAGE_DEPENDENCIES.get(storageType));
@ -85,6 +85,10 @@ public class DependencyRegistry {
dependencies.add(Dependency.SLF4J_SIMPLE);
}
if (nats) {
dependencies.add(Dependency.NATS);
}
if (rabbitmq) {
dependencies.add(Dependency.RABBITMQ);
}

View File

@ -27,6 +27,7 @@ package me.lucko.luckperms.common.messaging;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.config.LuckPermsConfiguration;
import me.lucko.luckperms.common.messaging.nats.NatsMessenger;
import me.lucko.luckperms.common.messaging.rabbitmq.RabbitMQMessenger;
import me.lucko.luckperms.common.messaging.redis.RedisMessenger;
import me.lucko.luckperms.common.messaging.sql.SqlMessenger;
@ -67,6 +68,8 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
messagingType = "redis";
} else if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) {
messagingType = "rabbitmq";
} else if (this.plugin.getConfiguration().get(ConfigKeys.NATS_ENABLED)) {
messagingType = "nats";
} else {
for (StorageImplementation implementation : this.plugin.getStorage().getImplementations()) {
if (implementation instanceof SqlStorage) {
@ -84,13 +87,16 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
return null;
}
this.plugin.getLogger().info("Loading messaging service... [" + messagingType.toUpperCase(Locale.ROOT) + "]");
if (messagingType.equals("custom")) {
this.plugin.getLogger().info("Messaging service is set to custom. No service is initialized at this stage yet.");
return null;
}
this.plugin.getLogger().info("Loading messaging service... [" + messagingType.toUpperCase(Locale.ROOT) + "]");
InternalMessagingService service = getServiceFor(messagingType);
if (service != null) {
return service;
}
this.plugin.getLogger().warn("Messaging service '" + messagingType + "' not recognised.");
return null;
}
@ -106,6 +112,16 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
} else {
this.plugin.getLogger().warn("Messaging Service was set to redis, but redis is not enabled!");
}
} else if (messagingType.equals("nats")) {
if (this.plugin.getConfiguration().get(ConfigKeys.NATS_ENABLED)) {
try {
return new LuckPermsMessagingService(this.plugin, new NatsMesengerProvider());
} catch (Exception e) {
getPlugin().getLogger().severe("Exception occurred whilst enabling Nats messaging service", e);
}
} else {
this.plugin.getLogger().warn("Messaging Service was set to nats, but nats is not enabled!");
}
} else if (messagingType.equals("rabbitmq")) {
if (this.plugin.getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)) {
try {
@ -127,6 +143,34 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
return null;
}
private class NatsMesengerProvider implements MessengerProvider {
@Override
public @NonNull String getName() {
return "Nats";
}
@Override
public @NonNull Messenger obtain(@NonNull IncomingMessageConsumer incomingMessageConsumer) {
NatsMessenger natsMessenger = new NatsMessenger(getPlugin(), incomingMessageConsumer);
LuckPermsConfiguration configuration = getPlugin().getConfiguration();
String address = configuration.get(ConfigKeys.NATS_ADDRESS);
String username = configuration.get(ConfigKeys.NATS_USERNAME);
String password = configuration.get(ConfigKeys.NATS_PASSWORD);
if (password.isEmpty()) {
password = null;
}
if (username.isEmpty()) {
username = null;
}
boolean ssl = configuration.get(ConfigKeys.NATS_SSL);
natsMessenger.init(address, username, password, ssl);
return natsMessenger;
}
}
private class RedisMessengerProvider implements MessengerProvider {
@Override

View File

@ -0,0 +1,128 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.nats;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.util.Throwing;
import net.luckperms.api.messenger.IncomingMessageConsumer;
import net.luckperms.api.messenger.Messenger;
import net.luckperms.api.messenger.message.OutgoingMessage;
import org.checkerframework.checker.nullness.qual.NonNull;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Options.Builder;
import java.time.Duration;
/**
* An implementation of Messenger for Nats messaging client.
*/
public class NatsMessenger implements Messenger {
private static final String CHANNEL = "luckperms:update";
private final LuckPermsPlugin plugin;
private final IncomingMessageConsumer consumer;
private Connection connection;
private Dispatcher messageDispatcher;
public NatsMessenger(LuckPermsPlugin plugin, IncomingMessageConsumer consumer) {
this.plugin = plugin;
this.consumer = consumer;
}
@Override
public void sendOutgoingMessage(@NonNull OutgoingMessage outgoingMessage) {
ByteArrayDataOutput output = ByteStreams.newDataOutput();
output.writeUTF(outgoingMessage.asEncodedString());
this.connection.publish(CHANNEL, output.toByteArray());
}
public void init(String address, String username, String password, boolean ssl) {
String[] addressSplit = address.split(":");
String host = addressSplit[0];
int port = addressSplit.length > 1 ? Integer.parseInt(addressSplit[1]) : Options.DEFAULT_PORT;
this.connection = createConnection(builder -> {
builder.server("nats://" + host + ":" + port)
.reconnectWait(Duration.ofSeconds(5))
.maxReconnects(Integer.MAX_VALUE)
.connectionName("LuckPerms");
if (username != null && password != null) {
builder.userInfo(username, password);
}
if (ssl) {
builder.secure();
}
});
this.messageDispatcher = this.connection.createDispatcher(new Handler()).subscribe(CHANNEL);
}
private Connection createConnection(Throwing.Consumer<Builder> config) {
try {
Builder builder = new Builder();
config.accept(builder);
return Nats.connect(builder.build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
try {
this.connection.closeDispatcher(this.messageDispatcher);
this.connection.close();
} catch (InterruptedException e) {
this.plugin.getLogger().warn("An error occurred during closing messenger.", e);
}
}
private class Handler implements MessageHandler {
@Override
public void onMessage(Message message) {
byte[] data = message.getData();
ByteArrayDataInput input = ByteStreams.newDataInput(data);
String messageAsString = input.readUTF();
NatsMessenger.this.consumer.consumeIncomingMessageAsString(messageAsString);
}
}
}

View File

@ -162,7 +162,8 @@ public abstract class AbstractLuckPermsPlugin implements LuckPermsPlugin {
this.dependencyManager.loadStorageDependencies(
storageFactory.getRequiredTypes(),
getConfiguration().get(ConfigKeys.REDIS_ENABLED),
getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED)
getConfiguration().get(ConfigKeys.RABBITMQ_ENABLED),
getConfiguration().get(ConfigKeys.NATS_ENABLED)
);
// register listeners

View File

@ -66,6 +66,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -244,6 +244,8 @@ watch-files = true
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service = "auto"
@ -270,6 +272,15 @@ redis {
password = ""
}
# Settings for nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats {
enabled = false
address = "localhost"
username = ""
password = ""
}
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq {

View File

@ -65,6 +65,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -35,6 +35,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -236,6 +236,8 @@ watch-files: true
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@ -261,6 +263,14 @@ redis:
username: ''
password: ''
# Settings for Nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats:
enabled: false
address: localhost
username: ''
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:

View File

@ -47,6 +47,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -244,6 +244,8 @@ watch-files = true
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service = "auto"
@ -270,6 +272,15 @@ redis {
password = ""
}
# Settings for nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats {
enabled = false
address = "localhost"
username = ""
password = ""
}
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq {

View File

@ -30,6 +30,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -226,6 +226,8 @@ watch-files: true
# below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@ -251,6 +253,14 @@ redis:
username: ''
password: ''
# Settings for Nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats:
enabled: false
address: localhost
username: ''
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq:

View File

@ -41,6 +41,7 @@ shadowJar {
relocate 'com.mongodb', 'me.lucko.luckperms.lib.mongodb'
relocate 'org.bson', 'me.lucko.luckperms.lib.bson'
relocate 'redis.clients.jedis', 'me.lucko.luckperms.lib.jedis'
relocate 'io.nats.client', 'me.lucko.luckperms.lib.nats'
relocate 'com.rabbitmq', 'me.lucko.luckperms.lib.rabbitmq'
relocate 'org.apache.commons.pool2', 'me.lucko.luckperms.lib.commonspool2'
relocate 'ninja.leaping.configurate', 'me.lucko.luckperms.lib.configurate'

View File

@ -230,6 +230,8 @@ watch-files: true
# configured below.
# => rabbitmq Uses RabbitMQ pub-sub to push changes. Your server connection info must be
# configured below.
# => nats Uses Nats pub-sub to push changes. Your server connection info must be
# configured below.
# => custom Uses a messaging service provided using the LuckPerms API.
# => auto Attempts to automatically setup a messaging service using redis or sql.
messaging-service: auto
@ -255,6 +257,14 @@ redis:
username: ''
password: ''
# Settings for Nats.
# Port 4222 is used by default; set address to "host:port" if differs
nats:
enabled: false
address: localhost
username: ''
password: ''
# Settings for RabbitMQ.
# Port 5672 is used by default; set address to "host:port" if differs
rabbitmq: