Tidy up RabbitMQMessenger a bit

This commit is contained in:
Luck 2021-02-05 12:00:16 +00:00
parent d8aefd23d3
commit c8b89f245e
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
2 changed files with 77 additions and 69 deletions

View File

@ -51,6 +51,9 @@ public class RabbitMQMessenger implements Messenger {
private static final int DEFAULT_PORT = 5672;
private static final String EXCHANGE = "luckperms";
private static final String ROUTING_KEY = "luckperms:update";
private static final boolean CHANNEL_PROP_DURABLE = false;
private static final boolean CHANNEL_PROP_EXCLUSIVE = true;
private static final boolean CHANNEL_PROP_AUTO_DELETE = true;
private final LuckPermsPlugin plugin;
private final IncomingMessageConsumer consumer;
@ -77,7 +80,7 @@ public class RabbitMQMessenger implements Messenger {
this.connectionFactory.setUsername(username);
this.connectionFactory.setPassword(password);
this.sub = new Subscription(this);
this.sub = new Subscription();
this.plugin.getBootstrap().getScheduler().executeAsync(this.sub);
}
@ -103,20 +106,68 @@ public class RabbitMQMessenger implements Messenger {
}
}
private static class Subscription implements Runnable {
private final RabbitMQMessenger parent;
private boolean isClosed = false;
private boolean firstStartup = true;
/**
* Checks the connection, and re-opens it if necessary.
*
* @return true if the connection is now alive, false otherwise
*/
private boolean checkAndReopenConnection(boolean firstStartup) {
boolean connectionAlive = this.connection != null && this.connection.isOpen();
boolean channelAlive = this.channel != null && this.channel.isOpen();
private Subscription(RabbitMQMessenger parent) {
this.parent = parent;
if (connectionAlive && channelAlive) {
return true;
}
// cleanup existing
if (this.channel != null && this.channel.isOpen()) {
try {
this.channel.close();
} catch (Exception e) {
// ignore
}
}
if (this.connection != null && this.connection.isOpen()) {
try {
this.connection.close();
} catch (Exception e) {
// ignore
}
}
// (re)create
if (!firstStartup) {
this.plugin.getLogger().warn("RabbitMQ pubsub connection dropped, trying to re-open the connection");
}
try {
this.connection = this.connectionFactory.newConnection();
this.channel = this.connection.createChannel();
String queue = this.channel.queueDeclare("", CHANNEL_PROP_DURABLE, CHANNEL_PROP_EXCLUSIVE, CHANNEL_PROP_AUTO_DELETE, null).getQueue();
this.channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, CHANNEL_PROP_DURABLE, CHANNEL_PROP_AUTO_DELETE, null);
this.channel.queueBind(queue, EXCHANGE, ROUTING_KEY);
this.channel.basicConsume(queue, true, this.sub, tag -> {});
if (!firstStartup) {
this.plugin.getLogger().info("RabbitMQ pubsub connection re-established");
}
return true;
} catch (Exception ignored) {
return false;
}
}
private class Subscription implements Runnable, DeliverCallback {
private boolean isClosed = false;
@Override
public void run() {
boolean firstStartup = true;
while (!Thread.interrupted() && !this.isClosed) {
try {
if (!checkAndReopenConnection()) {
if (!checkAndReopenConnection(firstStartup)) {
// Sleep for 5 seconds to prevent massive spam in console
Thread.sleep(5000);
continue;
@ -124,61 +175,23 @@ public class RabbitMQMessenger implements Messenger {
// Check connection life every every 30 seconds
Thread.sleep(30_000);
} catch (InterruptedException ie) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
this.firstStartup = false;
firstStartup = false;
}
}
}
private boolean checkAndReopenConnection() {
boolean channelIsDead = this.parent.channel == null || !this.parent.channel.isOpen();
if (channelIsDead) {
boolean connectionIsDead = this.parent.connection == null || !this.parent.connection.isOpen();
if (connectionIsDead) {
if (!this.firstStartup) {
this.parent.plugin.getLogger().warn("RabbitMQ pubsub connection dropped, trying to re-open the connection");
}
try {
this.parent.connection = this.parent.connectionFactory.newConnection();
this.parent.channel = this.parent.connection.createChannel();
String queue = this.parent.channel.queueDeclare("", false, true, true, null).getQueue();
this.parent.channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC, false, true, null);
this.parent.channel.queueBind(queue, EXCHANGE, ROUTING_KEY);
this.parent.channel.basicConsume(queue, true, new ChannelListener(), (consumerTag) -> { });
if (!this.firstStartup) {
this.parent.plugin.getLogger().info("RabbitMQ pubsub connection re-established");
}
return true;
} catch (Exception ignored) {
return false;
}
} else {
try {
this.parent.channel = this.parent.connection.createChannel();
return true;
} catch (Exception ignored) {
return false;
}
}
}
return true;
}
private class ChannelListener implements DeliverCallback {
@Override
public void handle(String consumerTag, Delivery message) {
try {
byte[] data = message.getBody();
ByteArrayDataInput input = ByteStreams.newDataInput(data);
String msg = input.readUTF();
Subscription.this.parent.consumer.consumeIncomingMessageAsString(msg);
} catch (Exception e) {
e.printStackTrace();
}
@Override
public void handle(String consumerTag, Delivery message) {
try {
byte[] data = message.getBody();
ByteArrayDataInput input = ByteStreams.newDataInput(data);
String msg = input.readUTF();
RabbitMQMessenger.this.consumer.consumeIncomingMessageAsString(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -63,7 +63,7 @@ public class RedisMessenger implements Messenger {
this.jedisPool = new JedisPool(new JedisPoolConfig(), host, port, Protocol.DEFAULT_TIMEOUT, password, ssl);
this.sub = new Subscription(this);
this.sub = new Subscription();
this.plugin.getBootstrap().getScheduler().executeAsync(this.sub);
}
@ -82,26 +82,21 @@ public class RedisMessenger implements Messenger {
this.jedisPool.destroy();
}
private static class Subscription extends JedisPubSub implements Runnable {
private final RedisMessenger parent;
private Subscription(RedisMessenger parent) {
this.parent = parent;
}
private class Subscription extends JedisPubSub implements Runnable {
@Override
public void run() {
boolean wasBroken = false;
while (!Thread.interrupted() && !this.parent.jedisPool.isClosed()) {
try (Jedis jedis = this.parent.jedisPool.getResource()) {
while (!Thread.interrupted() && !RedisMessenger.this.jedisPool.isClosed()) {
try (Jedis jedis = RedisMessenger.this.jedisPool.getResource()) {
if (wasBroken) {
this.parent.plugin.getLogger().info("Redis pubsub connection re-established");
RedisMessenger.this.plugin.getLogger().info("Redis pubsub connection re-established");
wasBroken = false;
}
jedis.subscribe(this, CHANNEL);
} catch (Exception e) {
wasBroken = true;
this.parent.plugin.getLogger().warn("Redis pubsub connection dropped, trying to re-open the connection", e);
RedisMessenger.this.plugin.getLogger().warn("Redis pubsub connection dropped, trying to re-open the connection", e);
try {
unsubscribe();
} catch (Exception ignored) {
@ -123,7 +118,7 @@ public class RedisMessenger implements Messenger {
if (!channel.equals(CHANNEL)) {
return;
}
this.parent.consumer.consumeIncomingMessageAsString(msg);
RedisMessenger.this.consumer.consumeIncomingMessageAsString(msg);
}
}