Send a different type of update ping for user changes, and only apply the change if the user is loaded

This commit is contained in:
Luck 2017-12-28 13:37:55 +00:00
parent 3d6aa69ca1
commit 3413adf16f
No known key found for this signature in database
GPG Key ID: EFA9B3EC5FD90F8B
5 changed files with 138 additions and 40 deletions

View File

@ -35,8 +35,9 @@ public interface MessagingService {
/**
* Uses the messaging service to inform other servers about changes.
*
* <p>This will push the update asynchronously, and this method will return immediately. Calling this method is
* equivalent to running "/lp networksync", except will not sync this server.</p>
* <p>This will push the update asynchronously, and this method will return
* immediately. Calling this method is equivalent to running "/lp networksync",
* except will not sync this server.</p>
*/
void pushUpdate();

View File

@ -349,10 +349,10 @@ public class ExtendedLogEntry implements LogEntry {
}
}
public static JsonObject serializeWithId(UUID id, LogEntry entry) {
public static JsonObject serializeWithId(String id, LogEntry entry) {
JsonObject data = new JsonObject();
data.add("id", new JsonPrimitive(id.toString()));
data.add("id", new JsonPrimitive(id));
data.add("actor", new JsonPrimitive(entry.getActor().toString()));
data.add("actorName", new JsonPrimitive(entry.getActorName()));
data.add("type", new JsonPrimitive(entry.getType().name()));
@ -365,10 +365,10 @@ public class ExtendedLogEntry implements LogEntry {
return data;
}
public static Map.Entry<UUID, ExtendedLogEntry> deserialize(JsonObject object) {
public static Map.Entry<String, ExtendedLogEntry> deserialize(JsonObject object) {
ExtendedLogEntryBuilder builder = build();
UUID id = UUID.fromString(object.get("id").getAsString());
String id = object.get("id").getAsString();
builder.actor(UUID.fromString(object.get("actor").getAsString()));
builder.actorName(object.get("actorName").getAsString());

View File

@ -190,7 +190,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
if (!sender.isImport()) {
Optional<ExtendedMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
messagingService.get().pushUserUpdate(user);
}
}
}

View File

@ -27,6 +27,7 @@ package me.lucko.luckperms.common.messaging;
import lombok.Getter;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@ -34,8 +35,11 @@ import me.lucko.luckperms.api.LogEntry;
import me.lucko.luckperms.common.actionlog.ExtendedLogEntry;
import me.lucko.luckperms.common.buffers.BufferedRequest;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.model.User;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@ -49,6 +53,10 @@ import java.util.function.Consumer;
public abstract class AbstractMessagingService implements ExtendedMessagingService {
protected static final String CHANNEL = "lpuc";
private static final String UPDATE_HEADER = "update:";
private static final String USER_UPDATE_HEADER = "userupdate:";
private static final String LOG_HEADER = "log";
@Getter
private final LuckPermsPlugin plugin;
@ -72,19 +80,21 @@ public abstract class AbstractMessagingService implements ExtendedMessagingServi
protected abstract void sendMessage(String message);
protected void onMessage(String msg, Consumer<String> callback) {
if (msg.startsWith("update:") && msg.length() > "update:".length()) {
UUID uuid = parseUpdateMessage(msg);
if (uuid == null) {
if (msg.startsWith(UPDATE_HEADER) && msg.length() > UPDATE_HEADER.length()) {
String content = msg.substring(UPDATE_HEADER.length());
UUID requestId = uuidFromString(content);
if (requestId == null) {
return;
}
if (!receivedMessages.add(uuid)) {
if (!receivedMessages.add(requestId)) {
return;
}
plugin.getLog().info("[" + name + " Messaging] Received update ping with id: " + uuid.toString());
plugin.getLog().info("[" + name + " Messaging] Received update ping with id: " + requestId.toString());
if (plugin.getApiProvider().getEventFactory().handleNetworkPreSync(false, uuid)) {
if (plugin.getApiProvider().getEventFactory().handleNetworkPreSync(false, requestId)) {
return;
}
@ -94,25 +104,62 @@ public abstract class AbstractMessagingService implements ExtendedMessagingServi
callback.accept(msg);
}
} else if (msg.startsWith("log:") && msg.length() > "log:".length()) {
String logData = msg.substring("log:".length());
Map.Entry<UUID, ExtendedLogEntry> entry = null;
try {
entry = ExtendedLogEntry.deserialize(gson.fromJson(logData, JsonObject.class));
} catch (Exception e) {
plugin.getLog().warn("Error whilst deserializing log: " + logData);
e.printStackTrace();
}
} else if (msg.startsWith(USER_UPDATE_HEADER) && msg.length() > USER_UPDATE_HEADER.length()) {
String content = msg.substring(USER_UPDATE_HEADER.length());
Map.Entry<UUID, UUID> entry = uuidsFromString(content);
if (entry == null) {
return;
}
if (!receivedMessages.add(entry.getKey())) {
UUID requestId = entry.getKey();
UUID userUuid = entry.getValue();
if (!receivedMessages.add(requestId)) {
return;
}
plugin.getApiProvider().getEventFactory().handleLogReceive(entry.getKey(), entry.getValue());
User user = plugin.getUserManager().getIfLoaded(userUuid);
if (user == null) {
return;
}
plugin.getLog().info("[" + name + " Messaging] Received user update ping for '" + user.getFriendlyName() + "' with id: " + requestId.toString());
if (plugin.getApiProvider().getEventFactory().handleNetworkPreSync(false, requestId)) {
return;
}
plugin.getStorage().loadUser(user.getUuid(), null);
if (callback != null) {
callback.accept(msg);
}
} else if (msg.startsWith(LOG_HEADER) && msg.length() > LOG_HEADER.length()) {
String content = msg.substring(LOG_HEADER.length());
Map.Entry<String, ExtendedLogEntry> entry;
try {
entry = ExtendedLogEntry.deserialize(gson.fromJson(content, JsonObject.class));
} catch (Exception e) {
return;
}
if (entry.getKey() == null) {
return;
}
UUID requestId = uuidFromString(entry.getKey());
if (requestId == null) {
return;
}
if (!receivedMessages.add(requestId)) {
return;
}
plugin.getApiProvider().getEventFactory().handleLogReceive(requestId, entry.getValue());
plugin.getLogDispatcher().dispatchFromRemote(entry.getValue());
if (callback != null) {
@ -122,26 +169,39 @@ public abstract class AbstractMessagingService implements ExtendedMessagingServi
}
@Override
public void pushLog(LogEntry logEntry) {
public void pushUpdate() {
plugin.getScheduler().doAsync(() -> {
UUID id = generatePingId();
UUID requestId = generatePingId();
String strId = uuidToString(requestId);
if (plugin.getApiProvider().getEventFactory().handleLogNetworkPublish(!plugin.getConfiguration().get(ConfigKeys.PUSH_LOG_ENTRIES), id, logEntry)) {
return;
}
plugin.getLog().info("[" + name + " Messaging] Sending log with id: " + id.toString());
sendMessage("log:" + gson.toJson(ExtendedLogEntry.serializeWithId(id, logEntry)));
plugin.getLog().info("[" + name + " Messaging] Sending ping with id: " + strId);
sendMessage("update:" + strId);
});
}
@Override
public void pushUpdate() {
public void pushUserUpdate(User user) {
plugin.getScheduler().doAsync(() -> {
UUID id = generatePingId();
plugin.getLog().info("[" + name + " Messaging] Sending ping with id: " + id.toString());
UUID requestId = generatePingId();
String strId = uuidToString(requestId);
sendMessage("update:" + id.toString());
plugin.getLog().info("[" + name + " Messaging] Sending user ping for '" + user.getFriendlyName() + "' with id: " + strId);
sendMessage("userupdate:" + uuidsToString(requestId, user.getUuid()));
});
}
@Override
public void pushLog(LogEntry logEntry) {
plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
String strId = uuidToString(requestId);
if (plugin.getApiProvider().getEventFactory().handleLogNetworkPublish(!plugin.getConfiguration().get(ConfigKeys.PUSH_LOG_ENTRIES), requestId, logEntry)) {
return;
}
plugin.getLog().info("[" + name + " Messaging] Sending log with id: " + strId);
sendMessage("log:" + gson.toJson(ExtendedLogEntry.serializeWithId(strId, logEntry)));
});
}
@ -151,10 +211,39 @@ public abstract class AbstractMessagingService implements ExtendedMessagingServi
return uuid;
}
private static UUID parseUpdateMessage(String msg) {
String requestId = msg.substring("update:".length());
private static String uuidToString(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 2);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static UUID uuidFromString(String s) {
try {
return UUID.fromString(requestId);
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
return new UUID(buf.getLong(), buf.getLong());
} catch (IllegalArgumentException e) {
return null;
}
}
private static String uuidsToString(UUID uuid1, UUID uuid2) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 4);
buf.putLong(uuid1.getMostSignificantBits());
buf.putLong(uuid1.getLeastSignificantBits());
buf.putLong(uuid2.getMostSignificantBits());
buf.putLong(uuid2.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static Map.Entry<UUID, UUID> uuidsFromString(String s) {
try {
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
UUID uuid1 = new UUID(buf.getLong(), buf.getLong());
UUID uuid2 = new UUID(buf.getLong(), buf.getLong());
return Maps.immutableEntry(uuid1, uuid2);
} catch (IllegalArgumentException e) {
return null;
}
@ -162,7 +251,7 @@ public abstract class AbstractMessagingService implements ExtendedMessagingServi
private final class PushUpdateBuffer extends BufferedRequest<Void> {
public PushUpdateBuffer(LuckPermsPlugin plugin) {
super(3000L, 200L, plugin.getScheduler().async());
super(2000L, 200L, plugin.getScheduler().async());
}
@Override

View File

@ -28,6 +28,7 @@ package me.lucko.luckperms.common.messaging;
import me.lucko.luckperms.api.LogEntry;
import me.lucko.luckperms.api.MessagingService;
import me.lucko.luckperms.common.buffers.BufferedRequest;
import me.lucko.luckperms.common.model.User;
public interface ExtendedMessagingService extends MessagingService {
@ -50,6 +51,13 @@ public interface ExtendedMessagingService extends MessagingService {
*/
BufferedRequest<Void> getUpdateBuffer();
/**
* Pushes an update for a specific user.
*
* @param user the user
*/
void pushUserUpdate(User user);
/**
* Pushes a log entry to connected servers.
*