Fix concurrency issues with json cache and database
This commit is contained in:
parent
9622f6a614
commit
5ba6e0dc9f
|
@ -22,6 +22,7 @@ import com.djrapitops.plan.delivery.formatting.Formatters;
|
|||
import com.djrapitops.plan.settings.config.PlanConfig;
|
||||
import com.djrapitops.plan.settings.config.paths.WebserverSettings;
|
||||
import com.djrapitops.plan.storage.file.PlanFiles;
|
||||
import com.djrapitops.plan.utilities.ReentrantLockHelper;
|
||||
import net.playeranalytics.plugin.scheduling.RunnableFactory;
|
||||
import net.playeranalytics.plugin.scheduling.TimeAmount;
|
||||
import net.playeranalytics.plugin.server.PluginLogger;
|
||||
|
@ -57,6 +58,7 @@ public class JSONFileStorage implements JSONStorage {
|
|||
|
||||
private final Path jsonDirectory;
|
||||
|
||||
private final ReentrantLockHelper readWriteProtectionLock = new ReentrantLockHelper();
|
||||
private final Pattern timestampRegex = Pattern.compile(".*-([0-9]*).json");
|
||||
private static final String JSON_FILE_EXTENSION = ".json";
|
||||
|
||||
|
@ -71,7 +73,6 @@ public class JSONFileStorage implements JSONStorage {
|
|||
this.logger = logger;
|
||||
|
||||
dateFormatter = formatters.yearLong();
|
||||
|
||||
jsonDirectory = files.getJSONStorageDirectory();
|
||||
}
|
||||
|
||||
|
@ -90,15 +91,21 @@ public class JSONFileStorage implements JSONStorage {
|
|||
public StoredJSON storeJson(String identifier, String json, long timestamp) {
|
||||
Path writingTo = jsonDirectory.resolve(identifier + '-' + timestamp + JSON_FILE_EXTENSION);
|
||||
String jsonToWrite = addMissingTimestamp(json, timestamp);
|
||||
try {
|
||||
if (!Files.isSymbolicLink(jsonDirectory)) Files.createDirectories(jsonDirectory);
|
||||
Files.write(writingTo, jsonToWrite.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Could not write a file to " + writingTo.toFile().getAbsolutePath() + ": " + e.getMessage());
|
||||
}
|
||||
write(writingTo, jsonToWrite);
|
||||
return new StoredJSON(jsonToWrite, timestamp);
|
||||
}
|
||||
|
||||
private void write(Path writingTo, String jsonToWrite) {
|
||||
readWriteProtectionLock.performWriteOperation(() -> {
|
||||
try {
|
||||
if (!Files.isSymbolicLink(jsonDirectory)) Files.createDirectories(jsonDirectory);
|
||||
Files.write(writingTo, jsonToWrite.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Could not write a file to " + writingTo.toFile().getAbsolutePath() + ": " + e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private String addMissingTimestamp(String json, long timestamp) {
|
||||
String writtenJSON;
|
||||
if (!json.startsWith("{\"") || json.contains("timestamp")) {
|
||||
|
@ -132,22 +139,24 @@ public class JSONFileStorage implements JSONStorage {
|
|||
}
|
||||
|
||||
private StoredJSON readStoredJSON(File from) {
|
||||
Matcher timestampMatch = timestampRegex.matcher(from.getName());
|
||||
if (timestampMatch.find()) {
|
||||
try (Stream<String> lines = Files.lines(from.toPath())) {
|
||||
long timestamp = Long.parseLong(timestampMatch.group(1));
|
||||
StringBuilder json = new StringBuilder();
|
||||
lines.forEach(json::append);
|
||||
return new StoredJSON(json.toString(), timestamp);
|
||||
} catch (IOException e) {
|
||||
logger.warn(jsonDirectory.toFile().getAbsolutePath() + " file '" + from.getName() + "' could not be read: " + e.getMessage());
|
||||
} catch (NumberFormatException e) {
|
||||
logger.warn(jsonDirectory.toFile().getAbsolutePath() + " contained a file '" + from.getName() + "' with improperly formatted -timestamp (could not parse number). This file was not placed there by Plan!");
|
||||
return readWriteProtectionLock.performReadOperation(() -> {
|
||||
Matcher timestampMatch = timestampRegex.matcher(from.getName());
|
||||
if (timestampMatch.find()) {
|
||||
try (Stream<String> lines = Files.lines(from.toPath())) {
|
||||
long timestamp = Long.parseLong(timestampMatch.group(1));
|
||||
StringBuilder json = new StringBuilder();
|
||||
lines.forEach(json::append);
|
||||
return new StoredJSON(json.toString(), timestamp);
|
||||
} catch (IOException e) {
|
||||
logger.warn(jsonDirectory.toFile().getAbsolutePath() + " file '" + from.getName() + "' could not be read: " + e.getMessage());
|
||||
} catch (NumberFormatException e) {
|
||||
logger.warn(jsonDirectory.toFile().getAbsolutePath() + " contained a file '" + from.getName() + "' with improperly formatted -timestamp (could not parse number). This file was not placed there by Plan!");
|
||||
}
|
||||
} else {
|
||||
logger.warn(jsonDirectory.toFile().getAbsolutePath() + " contained a file '" + from.getName() + "' that has no -timestamp. This file was not placed there by Plan!");
|
||||
}
|
||||
} else {
|
||||
logger.warn(jsonDirectory.toFile().getAbsolutePath() + " contained a file '" + from.getName() + "' that has no -timestamp. This file was not placed there by Plan!");
|
||||
}
|
||||
return null;
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -249,14 +258,16 @@ public class JSONFileStorage implements JSONStorage {
|
|||
}
|
||||
|
||||
private void deleteFiles(List<File> toDelete) {
|
||||
for (File fileToDelete : toDelete) {
|
||||
try {
|
||||
Files.delete(fileToDelete.toPath());
|
||||
} catch (IOException e) {
|
||||
// Failed to delete, set for deletion on next server shutdown.
|
||||
fileToDelete.deleteOnExit();
|
||||
readWriteProtectionLock.performWriteOperation(() -> {
|
||||
for (File fileToDelete : toDelete) {
|
||||
try {
|
||||
Files.delete(fileToDelete.toPath());
|
||||
} catch (IOException e) {
|
||||
// Failed to delete, set for deletion on next server shutdown.
|
||||
fileToDelete.deleteOnExit();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
|
|
@ -33,7 +33,7 @@ public abstract class AbstractDatabase implements Database {
|
|||
|
||||
protected AbstractDatabase() {
|
||||
state = State.CLOSED;
|
||||
accessLock = new DBAccessLock();
|
||||
accessLock = new DBAccessLock(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,9 +34,11 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||
*/
|
||||
public class DBAccessLock {
|
||||
|
||||
private final Database database;
|
||||
private final ReentrantLock reentrantLock;
|
||||
|
||||
public DBAccessLock() {
|
||||
public DBAccessLock(Database database) {
|
||||
this.database = database;
|
||||
reentrantLock = new ReentrantLock();
|
||||
}
|
||||
|
||||
|
@ -59,16 +61,28 @@ public class DBAccessLock {
|
|||
operation.apply();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
reentrantLock.lockInterruptibly();
|
||||
if (isDatabasePatching()) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
reentrantLock.lockInterruptibly();
|
||||
operation.apply();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
if (!interrupted) {
|
||||
reentrantLock.unlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
operation.apply();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
reentrantLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDatabasePatching() {
|
||||
return database.getState() != Database.State.OPEN && database.getState() != Database.State.CLOSING;
|
||||
}
|
||||
|
||||
public <T, E extends Exception> T performDatabaseOperation(ThrowingSupplier<T, E> operation) throws E {
|
||||
return performDatabaseOperation(operation, false);
|
||||
}
|
||||
|
@ -81,14 +95,22 @@ public class DBAccessLock {
|
|||
if (isOperationCriticalTransaction) {
|
||||
return operation.get();
|
||||
}
|
||||
try {
|
||||
reentrantLock.lockInterruptibly();
|
||||
if (isDatabasePatching()) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
reentrantLock.lockInterruptibly();
|
||||
return operation.get();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
Thread.currentThread().interrupt();
|
||||
throw new DBClosedException("Operation interrupted");
|
||||
} finally {
|
||||
if (!interrupted) {
|
||||
reentrantLock.unlock();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return operation.get();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new DBClosedException("Operation interrupted");
|
||||
} finally {
|
||||
reentrantLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* This file is part of Player Analytics (Plan).
|
||||
*
|
||||
* Plan is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Lesser General Public License v3 as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Plan is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Lesser General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with Plan. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package com.djrapitops.plan.utilities;
|
||||
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Allows using ReentrantLock with lambdas.
|
||||
*
|
||||
* @author AuroraLS3
|
||||
*/
|
||||
public class ReentrantLockHelper {
|
||||
|
||||
private final ReentrantReadWriteLock.WriteLock writeLock;
|
||||
private final ReentrantReadWriteLock.ReadLock readLock;
|
||||
|
||||
public ReentrantLockHelper() {
|
||||
var lock = new ReentrantReadWriteLock();
|
||||
writeLock = lock.writeLock();
|
||||
readLock = lock.readLock();
|
||||
}
|
||||
|
||||
public void performWriteOperation(Runnable runnable) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
writeLock.lockInterruptibly();
|
||||
runnable.run();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
Thread.currentThread().interrupt();
|
||||
} finally {
|
||||
if (!interrupted) {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T performReadOperation(Supplier<T> supplier) {
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
readLock.lockInterruptibly();
|
||||
return supplier.get();
|
||||
} catch (InterruptedException e) {
|
||||
interrupted = true;
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException("Thread continued after being interrupted, this error should never happen.", e);
|
||||
} finally {
|
||||
if (!interrupted) {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue