Add support for redis cluster (#3670)

This commit is contained in:
Dominik 2023-06-24 00:30:22 +02:00 committed by GitHub
parent 9f1e74fa7c
commit 7a59536450
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 104 additions and 32 deletions

View File

@ -262,6 +262,7 @@ broadcast-received-log-entries: true
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis:
enabled: false
address: localhost

View File

@ -260,6 +260,7 @@ broadcast-received-log-entries: false
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis:
enabled: false
address: localhost

View File

@ -74,7 +74,7 @@ dependencies {
transitive = false
}
compileOnly 'com.zaxxer:HikariCP:4.0.3'
compileOnly 'redis.clients:jedis:3.5.2'
compileOnly 'redis.clients:jedis:4.4.3'
compileOnly 'io.nats:jnats:2.16.4'
compileOnly 'com.rabbitmq:amqp-client:5.12.0'
compileOnly 'org.mongodb:mongodb-driver-legacy:4.5.0'

View File

@ -73,6 +73,7 @@ import static me.lucko.luckperms.common.config.generic.key.ConfigKeyFactory.lowe
import static me.lucko.luckperms.common.config.generic.key.ConfigKeyFactory.mapKey;
import static me.lucko.luckperms.common.config.generic.key.ConfigKeyFactory.notReloadable;
import static me.lucko.luckperms.common.config.generic.key.ConfigKeyFactory.stringKey;
import static me.lucko.luckperms.common.config.generic.key.ConfigKeyFactory.stringListKey;
/**
* All of the {@link ConfigKey}s used by LuckPerms.
@ -639,6 +640,11 @@ public final class ConfigKeys {
*/
public static final ConfigKey<String> REDIS_ADDRESS = notReloadable(stringKey("redis.address", null));
/**
* The addresses of the redis servers (only for redis clusters)
*/
public static final ConfigKey<List<String>> REDIS_ADDRESSES = notReloadable(stringListKey("redis.addresses", ImmutableList.of()));
/**
* The username to connect with, or an empty string if it should use default
*/

View File

@ -28,6 +28,7 @@ package me.lucko.luckperms.common.config.generic.key;
import com.google.common.collect.ImmutableMap;
import me.lucko.luckperms.common.config.generic.adapter.ConfigurationAdapter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
@ -36,6 +37,7 @@ public interface ConfigKeyFactory<T> {
ConfigKeyFactory<Boolean> BOOLEAN = ConfigurationAdapter::getBoolean;
ConfigKeyFactory<String> STRING = ConfigurationAdapter::getString;
ConfigKeyFactory<List<String>> STRING_LIST = ConfigurationAdapter::getStringList;
ConfigKeyFactory<String> LOWERCASE_STRING = (adapter, path, def) -> adapter.getString(path, def).toLowerCase(Locale.ROOT);
ConfigKeyFactory<Map<String, String>> STRING_MAP = (config, path, def) -> ImmutableMap.copyOf(config.getStringMap(path, ImmutableMap.of()));
@ -56,6 +58,10 @@ public interface ConfigKeyFactory<T> {
return key(new Bound<>(STRING, path, def));
}
static SimpleConfigKey<List<String>> stringListKey(String path, List<String> def) {
return key(new Bound<>(STRING_LIST, path, def));
}
static SimpleConfigKey<String> lowercaseStringKey(String path, String def) {
return key(new Bound<>(LOWERCASE_STRING, path, def));
}

View File

@ -239,8 +239,8 @@ public enum Dependency {
JEDIS(
"redis.clients",
"jedis",
"3.5.2",
"jX3340YaYjHFQN2sA+GCo33LB4FuIYKgQUPUv2MK/Xo=",
"4.4.3",
"wwwoCDPCywcfoNwpvwP95kXYusXSTtXhuVrB31sxE0k=",
Relocation.of("jedis", "redis{}clients{}jedis"),
Relocation.of("commonspool2", "org{}apache{}commons{}pool2")
),

View File

@ -43,6 +43,8 @@ import net.luckperms.api.messenger.Messenger;
import net.luckperms.api.messenger.MessengerProvider;
import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
public class MessagingFactory<P extends LuckPermsPlugin> {
@ -194,6 +196,7 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
LuckPermsConfiguration config = getPlugin().getConfiguration();
String address = config.get(ConfigKeys.REDIS_ADDRESS);
List<String> addresses = config.get(ConfigKeys.REDIS_ADDRESSES);
String username = config.get(ConfigKeys.REDIS_USERNAME);
String password = config.get(ConfigKeys.REDIS_PASSWORD);
if (password.isEmpty()) {
@ -204,7 +207,18 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
}
boolean ssl = config.get(ConfigKeys.REDIS_SSL);
if (!addresses.isEmpty()) {
// redis cluster
addresses = new ArrayList<>(addresses);
if (address != null) {
addresses.add(address);
}
redis.init(addresses, username, password, ssl);
} else {
// redis pool
redis.init(address, username, password, ssl);
}
return redis;
}
}

View File

@ -30,11 +30,21 @@ import net.luckperms.api.messenger.IncomingMessageConsumer;
import net.luckperms.api.messenger.Messenger;
import net.luckperms.api.messenger.message.OutgoingMessage;
import org.checkerframework.checker.nullness.qual.NonNull;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* An implementation of {@link Messenger} using Redis.
@ -45,7 +55,7 @@ public class RedisMessenger implements Messenger {
private final LuckPermsPlugin plugin;
private final IncomingMessageConsumer consumer;
private /* final */ JedisPool jedisPool;
private /* final */ UnifiedJedis jedis;
private /* final */ Subscription sub;
private boolean closing = false;
@ -54,57 +64,74 @@ public class RedisMessenger implements Messenger {
this.consumer = consumer;
}
public void init(List<String> addresses, String username, String password, boolean ssl) {
Set<HostAndPort> hosts = addresses.stream().map(RedisMessenger::parseAddress).collect(Collectors.toSet());
this.init(new JedisCluster(hosts, jedisConfig(username, password, ssl)));
}
public void init(String address, String username, String password, boolean ssl) {
this.init(new JedisPooled(parseAddress(address), jedisConfig(username, password, ssl)));
}
private void init(UnifiedJedis jedis) {
this.jedis = jedis;
this.sub = new Subscription(this);
this.plugin.getBootstrap().getScheduler().executeAsync(this.sub);
}
private static JedisClientConfig jedisConfig(String username, String password, boolean ssl) {
return DefaultJedisClientConfig.builder()
.user(username)
.password(password)
.ssl(ssl)
.timeoutMillis(Protocol.DEFAULT_TIMEOUT)
.build();
}
private static HostAndPort parseAddress(String address) {
String[] addressSplit = address.split(":");
String host = addressSplit[0];
int port = addressSplit.length > 1 ? Integer.parseInt(addressSplit[1]) : Protocol.DEFAULT_PORT;
if (username == null) {
this.jedisPool = new JedisPool(new JedisPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, password, ssl);
} else {
this.jedisPool = new JedisPool(new JedisPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, username, password, ssl);
}
this.sub = new Subscription();
this.plugin.getBootstrap().getScheduler().executeAsync(this.sub);
return new HostAndPort(host, port);
}
@Override
public void sendOutgoingMessage(@NonNull OutgoingMessage outgoingMessage) {
try (Jedis jedis = this.jedisPool.getResource()) {
jedis.publish(CHANNEL, outgoingMessage.asEncodedString());
} catch (Exception e) {
e.printStackTrace();
}
this.jedis.publish(CHANNEL, outgoingMessage.asEncodedString());
}
@Override
public void close() {
this.closing = true;
this.sub.unsubscribe();
this.jedisPool.destroy();
this.jedis.close();
}
private class Subscription extends JedisPubSub implements Runnable {
private static class Subscription extends JedisPubSub implements Runnable {
private final RedisMessenger messenger;
private Subscription(RedisMessenger messenger) {
this.messenger = messenger;
}
@Override
public void run() {
boolean first = true;
while (!RedisMessenger.this.closing && !Thread.interrupted() && !RedisMessenger.this.jedisPool.isClosed()) {
try (Jedis jedis = RedisMessenger.this.jedisPool.getResource()) {
while (!this.messenger.closing && !Thread.interrupted() && this.isRedisAlive()) {
try {
if (first) {
first = false;
} else {
RedisMessenger.this.plugin.getLogger().info("Redis pubsub connection re-established");
this.messenger.plugin.getLogger().info("Redis pubsub connection re-established");
}
jedis.subscribe(this, CHANNEL); // blocking call
this.messenger.jedis.subscribe(this, CHANNEL); // blocking call
} catch (Exception e) {
if (RedisMessenger.this.closing) {
if (this.messenger.closing) {
return;
}
RedisMessenger.this.plugin.getLogger().warn("Redis pubsub connection dropped, trying to re-open the connection", e);
this.messenger.plugin.getLogger().warn("Redis pubsub connection dropped, trying to re-open the connection", e);
try {
unsubscribe();
} catch (Exception ignored) {
@ -126,8 +153,19 @@ public class RedisMessenger implements Messenger {
if (!channel.equals(CHANNEL)) {
return;
}
RedisMessenger.this.consumer.consumeIncomingMessageAsString(msg);
}
this.messenger.consumer.consumeIncomingMessageAsString(msg);
}
private boolean isRedisAlive() {
UnifiedJedis jedis = this.messenger.jedis;
if (jedis instanceof JedisPooled) {
return !((JedisPooled) jedis).getPool().isClosed();
} else if (jedis instanceof JedisCluster) {
return !((JedisCluster) jedis).getClusterNodes().isEmpty();
} else {
throw new RuntimeException("Unknown jedis type: " + jedis.getClass().getName());
}
}
}
}

View File

@ -265,6 +265,7 @@ broadcast-received-log-entries = true
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis {
enabled = false
address = "localhost"

View File

@ -263,6 +263,7 @@ broadcast-received-log-entries = true
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis {
enabled = false
address = "localhost"

View File

@ -257,6 +257,7 @@ broadcast-received-log-entries: true
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis:
enabled: false
address: localhost

View File

@ -265,6 +265,7 @@ broadcast-received-log-entries = true
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis {
enabled = false
address = "localhost"

View File

@ -29,7 +29,7 @@ dependencies {
testImplementation 'org.awaitility:awaitility:4.2.0'
testImplementation 'com.zaxxer:HikariCP:4.0.3'
testImplementation 'redis.clients:jedis:3.5.2'
testImplementation 'redis.clients:jedis:4.4.3'
testImplementation 'io.nats:jnats:2.16.4'
testImplementation 'com.rabbitmq:amqp-client:5.12.0'
testImplementation 'org.postgresql:postgresql:42.6.0'

View File

@ -247,6 +247,7 @@ broadcast-received-log-entries: true
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis:
enabled: false
address: localhost

View File

@ -251,6 +251,7 @@ broadcast-received-log-entries: false
# Settings for Redis.
# Port 6379 is used by default; set address to "host:port" if differs
# Multiple Redis nodes can be specified in the same format as a string list under the name "addresses".
redis:
enabled: false
address: localhost