mirror of
https://github.com/plan-player-analytics/Plan.git
synced 2024-12-27 19:47:49 +01:00
Reduced chance of duplicate refresh processes
- Access lock prevents duplicate processes being placed if two threads enter the same method in such a way that both get before the first one puts the update Future in the Map. - Update threshold prevents all calls without timestamp causing a new refresh process from being created - Bad request 400 prevents timestamp from being too far in the future to avoid bad actor increasing timestamp to create new refresh processes
This commit is contained in:
parent
05a2c20fdd
commit
899d29f2d2
@ -18,7 +18,9 @@ package com.djrapitops.plan.delivery.webserver.resolver.json;
|
|||||||
|
|
||||||
import com.djrapitops.plan.delivery.webserver.cache.DataID;
|
import com.djrapitops.plan.delivery.webserver.cache.DataID;
|
||||||
import com.djrapitops.plan.processing.Processing;
|
import com.djrapitops.plan.processing.Processing;
|
||||||
|
import com.djrapitops.plan.settings.config.PlanConfig;
|
||||||
import com.djrapitops.plan.storage.json.JSONStorage;
|
import com.djrapitops.plan.storage.json.JSONStorage;
|
||||||
|
import com.djrapitops.plan.utilities.UnitSemaphoreAccessLock;
|
||||||
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import javax.inject.Singleton;
|
import javax.inject.Singleton;
|
||||||
@ -28,6 +30,7 @@ import java.util.UUID;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
@ -39,19 +42,26 @@ import java.util.function.Supplier;
|
|||||||
@Singleton
|
@Singleton
|
||||||
public class AsyncJSONResolverService {
|
public class AsyncJSONResolverService {
|
||||||
|
|
||||||
|
private final PlanConfig config;
|
||||||
private final Processing processing;
|
private final Processing processing;
|
||||||
private final JSONStorage jsonStorage;
|
private final JSONStorage jsonStorage;
|
||||||
private final Map<String, Future<JSONStorage.StoredJSON>> currentlyProcessing;
|
private final Map<String, Future<JSONStorage.StoredJSON>> currentlyProcessing;
|
||||||
|
private final Map<String, Long> previousUpdates;
|
||||||
|
private final UnitSemaphoreAccessLock accessLock; // Access lock prevents double processing same resource
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public AsyncJSONResolverService(
|
public AsyncJSONResolverService(
|
||||||
|
PlanConfig config,
|
||||||
Processing processing,
|
Processing processing,
|
||||||
JSONStorage jsonStorage
|
JSONStorage jsonStorage
|
||||||
) {
|
) {
|
||||||
|
this.config = config;
|
||||||
this.processing = processing;
|
this.processing = processing;
|
||||||
this.jsonStorage = jsonStorage;
|
this.jsonStorage = jsonStorage;
|
||||||
|
|
||||||
currentlyProcessing = new ConcurrentHashMap<>();
|
currentlyProcessing = new ConcurrentHashMap<>();
|
||||||
|
previousUpdates = new ConcurrentHashMap<>();
|
||||||
|
accessLock = new UnitSemaphoreAccessLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> JSONStorage.StoredJSON resolve(long newerThanTimestamp, DataID dataID, UUID serverUUID, Function<UUID, T> creator) {
|
public <T> JSONStorage.StoredJSON resolve(long newerThanTimestamp, DataID dataID, UUID serverUUID, Function<UUID, T> creator) {
|
||||||
@ -64,17 +74,26 @@ public class AsyncJSONResolverService {
|
|||||||
}
|
}
|
||||||
// No new enough version, let's refresh and send old version of the file
|
// No new enough version, let's refresh and send old version of the file
|
||||||
|
|
||||||
|
long updateThreshold = TimeUnit.MINUTES.toMillis(1L); // TODO make configurable
|
||||||
|
|
||||||
// Check if the json is already being created
|
// Check if the json is already being created
|
||||||
Future<JSONStorage.StoredJSON> updatedJSON = currentlyProcessing.get(identifier);
|
Future<JSONStorage.StoredJSON> updatedJSON;
|
||||||
if (updatedJSON == null) {
|
accessLock.enter();
|
||||||
// Submit a task to refresh the data if the json is old
|
try {
|
||||||
updatedJSON = processing.submitNonCritical(() -> {
|
updatedJSON = currentlyProcessing.get(identifier);
|
||||||
JSONStorage.StoredJSON created = jsonStorage.storeJson(identifier, creator.apply(serverUUID));
|
if (updatedJSON == null && previousUpdates.getOrDefault(identifier, 0L) < newerThanTimestamp - updateThreshold) {
|
||||||
currentlyProcessing.remove(identifier);
|
// Submit a task to refresh the data if the json is old
|
||||||
jsonStorage.invalidateOlder(identifier, created.timestamp);
|
updatedJSON = processing.submitNonCritical(() -> {
|
||||||
return created;
|
JSONStorage.StoredJSON created = jsonStorage.storeJson(identifier, creator.apply(serverUUID));
|
||||||
});
|
currentlyProcessing.remove(identifier);
|
||||||
currentlyProcessing.put(identifier, updatedJSON);
|
jsonStorage.invalidateOlder(identifier, created.timestamp);
|
||||||
|
previousUpdates.put(identifier, created.timestamp);
|
||||||
|
return created;
|
||||||
|
});
|
||||||
|
currentlyProcessing.put(identifier, updatedJSON);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
accessLock.exit();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get an old version from cache
|
// Get an old version from cache
|
||||||
@ -84,6 +103,8 @@ public class AsyncJSONResolverService {
|
|||||||
} else {
|
} else {
|
||||||
// If there is no version available, block thread until the new finishes being generated.
|
// If there is no version available, block thread until the new finishes being generated.
|
||||||
try {
|
try {
|
||||||
|
// updatedJSON is not null in this case ever because previousUpdates.getOrDefault(..., 0L) gets 0.
|
||||||
|
//noinspection ConstantConditions
|
||||||
return updatedJSON.get();
|
return updatedJSON.get();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
@ -104,17 +125,26 @@ public class AsyncJSONResolverService {
|
|||||||
}
|
}
|
||||||
// No new enough version, let's refresh and send old version of the file
|
// No new enough version, let's refresh and send old version of the file
|
||||||
|
|
||||||
|
long updateThreshold = TimeUnit.MINUTES.toMillis(1L); // TODO make configurable
|
||||||
|
|
||||||
// Check if the json is already being created
|
// Check if the json is already being created
|
||||||
Future<JSONStorage.StoredJSON> updatedJSON = currentlyProcessing.get(identifier);
|
Future<JSONStorage.StoredJSON> updatedJSON;
|
||||||
if (updatedJSON == null) {
|
accessLock.enter();
|
||||||
// Submit a task to refresh the data if the json is old
|
try {
|
||||||
updatedJSON = processing.submitNonCritical(() -> {
|
updatedJSON = currentlyProcessing.get(identifier);
|
||||||
JSONStorage.StoredJSON created = jsonStorage.storeJson(identifier, creator.get());
|
if (updatedJSON == null && previousUpdates.getOrDefault(identifier, 0L) < newerThanTimestamp - updateThreshold) {
|
||||||
currentlyProcessing.remove(identifier);
|
// Submit a task to refresh the data if the json is old
|
||||||
jsonStorage.invalidateOlder(identifier, created.timestamp);
|
updatedJSON = processing.submitNonCritical(() -> {
|
||||||
return created;
|
JSONStorage.StoredJSON created = jsonStorage.storeJson(identifier, creator.get());
|
||||||
});
|
currentlyProcessing.remove(identifier);
|
||||||
currentlyProcessing.put(identifier, updatedJSON);
|
jsonStorage.invalidateOlder(identifier, created.timestamp);
|
||||||
|
previousUpdates.put(identifier, created.timestamp);
|
||||||
|
return created;
|
||||||
|
});
|
||||||
|
currentlyProcessing.put(identifier, updatedJSON);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
accessLock.exit();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get an old version from cache
|
// Get an old version from cache
|
||||||
@ -124,6 +154,8 @@ public class AsyncJSONResolverService {
|
|||||||
} else {
|
} else {
|
||||||
// If there is no version available, block thread until the new finishes being generated.
|
// If there is no version available, block thread until the new finishes being generated.
|
||||||
try {
|
try {
|
||||||
|
// updatedJSON is not null in this case ever because previousUpdates.getOrDefault(..., 0L) gets 0.
|
||||||
|
//noinspection ConstantConditions
|
||||||
return updatedJSON.get();
|
return updatedJSON.get();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -28,6 +28,7 @@ import com.djrapitops.plan.identification.Identifiers;
|
|||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -72,9 +73,14 @@ public class ServerTabJSONResolver<T> implements Resolver {
|
|||||||
|
|
||||||
private long getTimestamp(Request request) {
|
private long getTimestamp(Request request) {
|
||||||
try {
|
try {
|
||||||
return request.getQuery().get("timestamp")
|
long currentTime = System.currentTimeMillis();
|
||||||
|
long timestamp = request.getQuery().get("timestamp")
|
||||||
.map(Long::parseLong)
|
.map(Long::parseLong)
|
||||||
.orElseGet(System::currentTimeMillis);
|
.orElse(currentTime);
|
||||||
|
if (currentTime + TimeUnit.SECONDS.toMillis(10L) < timestamp) {
|
||||||
|
throw new BadRequestException("Attempt to get data from the future! " + timestamp + " > " + currentTime);
|
||||||
|
}
|
||||||
|
return timestamp;
|
||||||
} catch (NumberFormatException nonNumberTimestamp) {
|
} catch (NumberFormatException nonNumberTimestamp) {
|
||||||
throw new BadRequestException("'timestamp' was not a number: " + nonNumberTimestamp.getMessage());
|
throw new BadRequestException("'timestamp' was not a number: " + nonNumberTimestamp.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* This file is part of Player Analytics (Plan).
|
||||||
|
*
|
||||||
|
* Plan is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Lesser General Public License v3 as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* Plan is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Lesser General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Lesser General Public License
|
||||||
|
* along with Plan. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
package com.djrapitops.plan.utilities;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronizes a critical section of code so that only a single thread can access it at a time.
|
||||||
|
*
|
||||||
|
* @author Rsl1122
|
||||||
|
*/
|
||||||
|
public class UnitSemaphoreAccessLock {
|
||||||
|
|
||||||
|
private final AtomicBoolean accessing;
|
||||||
|
private final Object lockObject;
|
||||||
|
|
||||||
|
public UnitSemaphoreAccessLock() {
|
||||||
|
accessing = new AtomicBoolean(false);
|
||||||
|
lockObject = new Object();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void enter() {
|
||||||
|
try {
|
||||||
|
if (accessing.get()) {
|
||||||
|
synchronized (lockObject) {
|
||||||
|
lockObject.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
accessing.set(true);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void exit() {
|
||||||
|
accessing.set(false);
|
||||||
|
synchronized (lockObject) {
|
||||||
|
lockObject.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user