Don't attempt further poll operations after SQL messenger has closed (#2516)

This commit is contained in:
Luck 2020-08-13 10:49:18 +01:00
parent ee3bb22c2c
commit 6f90e545fc
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B

View File

@ -36,6 +36,8 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* An implementation of {@link Messenger} using SQL. * An implementation of {@link Messenger} using SQL.
@ -45,6 +47,9 @@ public abstract class AbstractSqlMessenger implements Messenger {
private final IncomingMessageConsumer consumer; private final IncomingMessageConsumer consumer;
private long lastId = -1; private long lastId = -1;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private boolean closed = false;
protected AbstractSqlMessenger(IncomingMessageConsumer consumer) { protected AbstractSqlMessenger(IncomingMessageConsumer consumer) {
this.consumer = consumer; this.consumer = consumer;
} }
@ -82,6 +87,12 @@ public abstract class AbstractSqlMessenger implements Messenger {
@Override @Override
public void sendOutgoingMessage(@NonNull OutgoingMessage outgoingMessage) { public void sendOutgoingMessage(@NonNull OutgoingMessage outgoingMessage) {
this.lock.readLock().lock();
if (this.closed) {
this.lock.readLock().unlock();
return;
}
try (Connection c = getConnection()) { try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement("INSERT INTO `" + getTableName() + "` (`time`, `msg`) VALUES(NOW(), ?)")) { try (PreparedStatement ps = c.prepareStatement("INSERT INTO `" + getTableName() + "` (`time`, `msg`) VALUES(NOW(), ?)")) {
ps.setString(1, outgoingMessage.asEncodedString()); ps.setString(1, outgoingMessage.asEncodedString());
@ -89,10 +100,18 @@ public abstract class AbstractSqlMessenger implements Messenger {
} }
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
this.lock.readLock().unlock();
} }
} }
public void pollMessages() { public void pollMessages() {
this.lock.readLock().lock();
if (this.closed) {
this.lock.readLock().unlock();
return;
}
try (Connection c = getConnection()) { try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement("SELECT `id`, `msg` FROM `" + getTableName() + "` WHERE `id` > ? AND (NOW() - `time` < 30)")) { try (PreparedStatement ps = c.prepareStatement("SELECT `id`, `msg` FROM `" + getTableName() + "` WHERE `id` > ? AND (NOW() - `time` < 30)")) {
ps.setLong(1, this.lastId); ps.setLong(1, this.lastId);
@ -108,18 +127,36 @@ public abstract class AbstractSqlMessenger implements Messenger {
} }
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
this.lock.readLock().unlock();
} }
} }
public void runHousekeeping() { public void runHousekeeping() {
this.lock.readLock().lock();
if (this.closed) {
this.lock.readLock().unlock();
return;
}
try (Connection c = getConnection()) { try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement("DELETE FROM `" + getTableName() + "` WHERE (NOW() - `time` > 60)")) { try (PreparedStatement ps = c.prepareStatement("DELETE FROM `" + getTableName() + "` WHERE (NOW() - `time` > 60)")) {
ps.execute(); ps.execute();
} }
} catch (SQLException e) { } catch (SQLException e) {
e.printStackTrace(); e.printStackTrace();
} finally {
this.lock.readLock().unlock();
} }
} }
@Override
public void close() {
this.lock.writeLock().lock();
try {
this.closed = true;
} finally {
this.lock.writeLock().unlock();
}
}
} }