Refactor file watcher

This commit is contained in:
Luck 2020-02-24 18:26:52 +00:00
parent 2ed45c92a6
commit 5ce8217cd5
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
8 changed files with 340 additions and 252 deletions

View File

@ -50,7 +50,7 @@ import me.lucko.luckperms.common.sender.Sender;
import me.lucko.luckperms.common.storage.Storage;
import me.lucko.luckperms.common.storage.StorageFactory;
import me.lucko.luckperms.common.storage.StorageType;
import me.lucko.luckperms.common.storage.implementation.file.FileWatcher;
import me.lucko.luckperms.common.storage.implementation.file.watcher.FileWatcher;
import me.lucko.luckperms.common.tasks.SyncTask;
import me.lucko.luckperms.common.treeview.PermissionRegistry;
import me.lucko.luckperms.common.verbose.VerboseHandler;
@ -232,7 +232,7 @@ public abstract class AbstractLuckPermsPlugin implements LuckPermsPlugin {
// close file watcher
if (this.fileWatcher != null) {
this.fileWatcher.close();
//this.fileWatcher.close();
}
// unregister api

View File

@ -49,7 +49,7 @@ import me.lucko.luckperms.common.plugin.logging.PluginLogger;
import me.lucko.luckperms.common.plugin.util.AbstractConnectionListener;
import me.lucko.luckperms.common.sender.Sender;
import me.lucko.luckperms.common.storage.Storage;
import me.lucko.luckperms.common.storage.implementation.file.FileWatcher;
import me.lucko.luckperms.common.storage.implementation.file.watcher.FileWatcher;
import me.lucko.luckperms.common.tasks.SyncTask;
import me.lucko.luckperms.common.treeview.PermissionRegistry;
import me.lucko.luckperms.common.verbose.VerboseHandler;

View File

@ -30,6 +30,7 @@ import me.lucko.luckperms.common.bulkupdate.comparison.Constraint;
import me.lucko.luckperms.common.node.model.HeldNodeImpl;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.storage.implementation.file.loader.ConfigurateLoader;
import me.lucko.luckperms.common.storage.implementation.file.watcher.FileWatcher;
import me.lucko.luckperms.common.util.Iterators;
import net.luckperms.api.node.HeldNode;

View File

@ -1,209 +0,0 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.storage.implementation.file;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.util.Iterators;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class FileWatcher {
@SuppressWarnings("unchecked")
private static final WatchEvent.Kind<Path>[] KINDS = new WatchEvent.Kind[]{
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_DELETE,
StandardWatchEventKinds.ENTRY_MODIFY
};
private final Path basePath;
private final Map<Path, WatchedLocation> watchedLocations;
// the watchservice instance
private final WatchService watchService;
private boolean initialised = false;
public FileWatcher(LuckPermsPlugin plugin, Path basePath) throws IOException {
this.watchedLocations = Collections.synchronizedMap(new HashMap<>());
this.basePath = basePath;
this.watchService = basePath.getFileSystem().newWatchService();
plugin.getBootstrap().getScheduler().asyncLater(this::initLocations, 5, TimeUnit.SECONDS);
plugin.getBootstrap().getScheduler().asyncRepeating(this::tick, 1, TimeUnit.SECONDS);
}
public WatchedLocation getWatcher(Path path) {
Path relativePath = this.basePath.relativize(path);
return this.watchedLocations.computeIfAbsent(relativePath, p -> new WatchedLocation(this, p));
}
public void close() {
if (this.watchService == null) {
return;
}
try {
this.watchService.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void initLocations() {
for (WatchedLocation loc : this.watchedLocations.values()) {
try {
loc.setup();
} catch (IOException e) {
e.printStackTrace();
}
}
this.initialised = true;
}
private void tick() {
List<Path> expired = new ArrayList<>();
for (Map.Entry<Path, WatchedLocation> ent : this.watchedLocations.entrySet()) {
boolean valid = ent.getValue().tick();
if (!valid) {
new RuntimeException("WatchKey no longer valid: " + ent.getKey().toString()).printStackTrace();
expired.add(ent.getKey());
}
}
expired.forEach(this.watchedLocations::remove);
}
private static boolean isFileTemporary(String fileName) {
return fileName.endsWith(".tmp") || fileName.endsWith(".swp") || fileName.endsWith(".swx") || fileName.endsWith(".swpz");
}
/**
* Encapsulates a "watcher" in a specific directory.
*/
public final class WatchedLocation {
// the parent watcher
private final FileWatcher watcher;
// the absolute path to the directory being watched
private final Path absolutePath;
// the times of recent changes
private final Map<String, Long> lastChange = Collections.synchronizedMap(new HashMap<>());
// if the key is registered
private boolean ready = false;
// the watch key
private WatchKey key = null;
// the callback functions
private final List<Consumer<Path>> callbacks = new CopyOnWriteArrayList<>();
private WatchedLocation(FileWatcher watcher, Path relativePath) {
this.watcher = watcher;
this.absolutePath = this.watcher.basePath.resolve(relativePath);
}
private synchronized void setup() throws IOException {
if (this.ready) {
return;
}
this.key = this.absolutePath.register(this.watcher.watchService, KINDS);
this.ready = true;
}
private boolean tick() {
if (!this.ready) {
// await init
if (!FileWatcher.this.initialised) {
return true;
}
try {
setup();
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
// remove old change entries.
long expireTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(4);
this.lastChange.values().removeIf(lastChange -> lastChange < expireTime);
List<WatchEvent<?>> watchEvents = this.key.pollEvents();
for (WatchEvent<?> event : watchEvents) {
Path context = (Path) event.context();
if (context == null) {
continue;
}
String fileName = context.toString();
// ignore temporary changes
if (isFileTemporary(fileName)) {
continue;
}
// ignore changes already registered to the system
if (this.lastChange.containsKey(fileName)) {
continue;
}
this.lastChange.put(fileName, System.currentTimeMillis());
// process the change
Iterators.tryIterate(this.callbacks, cb -> cb.accept(context));
}
// reset the watch key.
return this.key.reset();
}
public void recordChange(String fileName) {
this.lastChange.put(fileName, System.currentTimeMillis());
}
public void addListener(Consumer<Path> updateConsumer) {
this.callbacks.add(updateConsumer);
}
}
}

View File

@ -31,6 +31,7 @@ import me.lucko.luckperms.common.model.User;
import me.lucko.luckperms.common.node.model.HeldNodeImpl;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.storage.implementation.file.loader.ConfigurateLoader;
import me.lucko.luckperms.common.storage.implementation.file.watcher.FileWatcher;
import me.lucko.luckperms.common.util.Iterators;
import me.lucko.luckperms.common.util.MoreFiles;
import me.lucko.luckperms.common.util.Uuids;

View File

@ -0,0 +1,189 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.storage.implementation.file.watcher;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystem;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* Utility for "watching" for file changes using a {@link WatchService}.
*/
public abstract class AbstractFileWatcher implements AutoCloseable {
/**
* Get a {@link WatchKey} from the given {@link WatchService} in the given {@link Path directory}.
*
* @param watchService the watch service
* @param directory the directory
* @return the watch key
* @throws IOException if unable to register
*/
private static WatchKey register(WatchService watchService, Path directory) throws IOException {
return directory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
}
/** The watch service */
private final WatchService service;
/** A map of all registered watch keys */
private final Map<WatchKey, Path> keys = Collections.synchronizedMap(new HashMap<>());
/** If this file watcher should discover directories */
private final boolean autoRegisterNewSubDirectories;
/** The thread currently being used to wait for & process watch events */
private AtomicReference<Thread> processingThread = new AtomicReference<>();
public AbstractFileWatcher(FileSystem fileSystem, boolean autoRegisterNewSubDirectories) throws IOException {
this.service = fileSystem.newWatchService();
this.autoRegisterNewSubDirectories = autoRegisterNewSubDirectories;
}
/**
* Register a watch key in the given directory.
*
* @param directory the directory
* @throws IOException if unable to register a key
*/
public void register(Path directory) throws IOException {
final WatchKey key = register(this.service, directory);
this.keys.put(key, directory);
}
/**
* Register a watch key recursively in the given directory.
*
* @param root the root directory
* @throws IOException if unable to register a key
*/
public void registerRecursively(Path root) throws IOException {
Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
register(dir);
return super.preVisitDirectory(dir, attrs);
}
});
}
/**
* Process an observed watch event.
*
* @param event the event
* @param path the resolved event context
*/
protected abstract void processEvent(WatchEvent<Path> event, Path path);
/**
* Processes {@link WatchEvent}s from the watch service until it is closed, or until
* the thread is interrupted.
*/
public final void runEventProcessingLoop() {
if (!this.processingThread.compareAndSet(null, Thread.currentThread())) {
throw new IllegalStateException("A thread is already processing events for this watcher.");
}
while (true) {
// poll for a key from the watch service
WatchKey key;
try {
key = this.service.take();
} catch (InterruptedException e) {
e.printStackTrace();
break;
} catch (ClosedWatchServiceException e) {
break;
}
// find the directory the key is watching
Path directory = this.keys.get(key);
if (directory == null) {
key.cancel();
continue;
}
// process each watch event the key has
for (WatchEvent<?> ev : key.pollEvents()) {
@SuppressWarnings("unchecked")
WatchEvent<Path> event = (WatchEvent<Path>) ev;
Path context = event.context();
// ignore contexts with a name count of zero
if (context.getNameCount() == 0) {
continue;
}
// resolve the context of the event against the directory being watched
Path file = directory.resolve(context);
// if the file is a regular file, send the event on to be processed
if (Files.isRegularFile(file)) {
processEvent(event, file);
}
// handle recursive directory creation
if (this.autoRegisterNewSubDirectories && event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
try {
if (Files.isDirectory(file, LinkOption.NOFOLLOW_LINKS)) {
registerRecursively(file);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
// reset the key
boolean valid = key.reset();
if (!valid) {
this.keys.remove(key);
}
}
this.processingThread.compareAndSet(Thread.currentThread(), null);
}
@Override
public void close() throws IOException {
this.service.close();
}
}

View File

@ -0,0 +1,143 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.storage.implementation.file.watcher;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.util.ExpiringSet;
import me.lucko.luckperms.common.util.Iterators;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Simple implementation of {@link AbstractFileWatcher} for LuckPerms data files.
*/
public class FileWatcher extends AbstractFileWatcher {
/** The base watched path */
private final Path basePath;
/** A map of watched locations with corresponding listeners */
private final Map<Path, WatchedLocation> watchedLocations;
public FileWatcher(LuckPermsPlugin plugin, Path basePath) throws IOException {
super(basePath.getFileSystem(), true);
this.watchedLocations = Collections.synchronizedMap(new HashMap<>());
this.basePath = basePath;
super.registerRecursively(basePath);
plugin.getBootstrap().getScheduler().executeAsync(super::runEventProcessingLoop);
}
/**
* Gets a {@link WatchedLocation} instance for a given path.
*
* @param path the path to get a watcher for
* @return the watched location
*/
public WatchedLocation getWatcher(Path path) {
if (path.isAbsolute()) {
path = this.basePath.relativize(path);
}
return this.watchedLocations.computeIfAbsent(path, WatchedLocation::new);
}
@Override
protected void processEvent(WatchEvent<Path> event, Path path) {
// get the relative path of the event
Path relativePath = this.basePath.relativize(path);
if (relativePath.getNameCount() == 0) {
return;
}
// pass the event onto all watched locations that match
for (Map.Entry<Path, WatchedLocation> entry : this.watchedLocations.entrySet()) {
if (relativePath.startsWith(entry.getKey())) {
entry.getValue().onEvent(event, relativePath);
}
}
}
/**
* Encapsulates a "watcher" in a specific directory.
*/
public static final class WatchedLocation {
/** The directory being watched by this instance. */
private final Path path;
/** A set of files which have been modified recently */
private final Set<String> recentlyModifiedFiles = new ExpiringSet<>(4, TimeUnit.SECONDS);
/** The listener callback functions */
private final List<Consumer<Path>> callbacks = new CopyOnWriteArrayList<>();
WatchedLocation(Path path) {
this.path = path;
}
void onEvent(WatchEvent<Path> event, Path path) {
// get the relative path of the modified file
Path relativePath = this.path.relativize(path);
// check if the file has been modified recently
String fileName = relativePath.toString();
if (!this.recentlyModifiedFiles.add(fileName)) {
return;
}
// pass the event onto registered listeners
Iterators.tryIterate(this.callbacks, cb -> cb.accept(relativePath));
}
/**
* Record that a file has been changed recently.
*
* @param fileName the name of the file
*/
public void recordChange(String fileName) {
this.recentlyModifiedFiles.add(fileName);
}
/**
* Register a listener.
*
* @param listener the listener
*/
public void addListener(Consumer<Path> listener) {
this.callbacks.add(listener);
}
}
}

View File

@ -28,9 +28,7 @@ package me.lucko.luckperms.common.util;
import com.github.benmanes.caffeine.cache.Cache;
import com.google.common.collect.ForwardingSet;
import org.checkerframework.checker.nullness.qual.NonNull;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -40,46 +38,11 @@ import java.util.concurrent.TimeUnit;
* @param <E> element type
*/
public class ExpiringSet<E> extends ForwardingSet<E> {
private final Cache<E, Boolean> cache;
private final Set<E> setView;
public ExpiringSet(long duration, TimeUnit unit) {
this.cache = CaffeineFactory.newBuilder().expireAfterAccess(duration, unit).build();
this.setView = this.cache.asMap().keySet();
}
@Override
public boolean add(@NonNull E element) {
this.cache.put(element, Boolean.TRUE);
// we don't care about the return value
return true;
}
@Override
public boolean addAll(@NonNull Collection<? extends E> collection) {
for (E element : collection) {
add(element);
}
// we don't care about the return value
return true;
}
@Override
public boolean remove(@NonNull Object key) {
this.cache.invalidate(key);
// we don't care about the return value
return true;
}
@Override
public boolean removeAll(@NonNull Collection<?> keys) {
this.cache.invalidateAll(keys);
// we don't care about the return value
return true;
Cache<E, Boolean> cache = CaffeineFactory.newBuilder().expireAfterAccess(duration, unit).build();
this.setView = Collections.newSetFromMap(cache.asMap());
}
@Override