Experimental and potentially unstable: async packet sending

This commit is contained in:
filoghost 2024-02-19 23:11:34 +01:00
parent ed3bbad1fb
commit 0754e0ac33
26 changed files with 248 additions and 93 deletions

View File

@ -24,6 +24,7 @@ import me.filoghost.holographicdisplays.core.placeholder.tracking.ActivePlacehol
import me.filoghost.holographicdisplays.core.tick.TickClock;
import me.filoghost.holographicdisplays.core.tick.TickingTask;
import me.filoghost.holographicdisplays.core.tracking.LineTrackerManager;
import me.filoghost.holographicdisplays.core.tracking.PacketSenderExecutor;
import me.filoghost.holographicdisplays.nms.common.NMSManager;
import org.bukkit.Bukkit;
import org.bukkit.entity.Player;
@ -47,6 +48,8 @@ public class HolographicDisplaysCore {
throw new PluginEnableException(t, "Couldn't initialize the NMS manager.");
}
PacketSenderExecutor.start();
PlaceholderRegistry placeholderRegistry = new PlaceholderRegistry();
TickClock tickClock = new TickClock();
ActivePlaceholderTracker placeholderTracker = new ActivePlaceholderTracker(placeholderRegistry, tickClock);
@ -103,6 +106,8 @@ public class HolographicDisplaysCore {
nmsManager.uninjectPacketListener(player);
}
}
PacketSenderExecutor.stopGracefully();
}
}

View File

@ -85,7 +85,11 @@ public abstract class ClickableLineTracker<T extends Viewer> extends LineTracker
@Override
protected void sendSpawnPackets(Viewers<T> viewers) {
if (spawnClickableEntity) {
viewers.sendPackets(clickableEntity.newSpawnPackets(getClickableEntityPosition()));
// Copy for async use
PositionCoordinates clickableEntityPosition = getClickableEntityPosition();
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(clickableEntity.newSpawnPackets(clickableEntityPosition));
});
}
}
@ -93,7 +97,9 @@ public abstract class ClickableLineTracker<T extends Viewer> extends LineTracker
@Override
protected void sendDestroyPackets(Viewers<T> viewers) {
if (spawnClickableEntity) {
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(clickableEntity.newDestroyPackets());
});
}
}
@ -104,9 +110,15 @@ public abstract class ClickableLineTracker<T extends Viewer> extends LineTracker
if (spawnClickableEntityChanged) {
if (spawnClickableEntity) {
viewers.sendPackets(clickableEntity.newSpawnPackets(getClickableEntityPosition()));
// Copy for async use
PositionCoordinates clickableEntityPosition = getClickableEntityPosition();
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(clickableEntity.newSpawnPackets(clickableEntityPosition));
});
} else {
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(clickableEntity.newDestroyPackets());
});
}
}
}
@ -115,7 +127,11 @@ public abstract class ClickableLineTracker<T extends Viewer> extends LineTracker
@Override
protected void sendPositionChangePackets(Viewers<T> viewers) {
if (spawnClickableEntity) {
viewers.sendPackets(clickableEntity.newTeleportPackets(getClickableEntityPosition()));
// Copy for async use
PositionCoordinates clickableEntityPosition = getClickableEntityPosition();
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(clickableEntity.newTeleportPackets(clickableEntityPosition));
});
}
}

View File

@ -5,6 +5,7 @@
*/
package me.filoghost.holographicdisplays.core.tracking;
import me.filoghost.holographicdisplays.common.PositionCoordinates;
import me.filoghost.holographicdisplays.core.base.BaseItemHologramLine;
import me.filoghost.holographicdisplays.core.listener.LineClickListener;
import me.filoghost.holographicdisplays.core.tick.CachedPlayer;
@ -106,7 +107,12 @@ public class ItemLineTracker extends ClickableLineTracker<Viewer> {
super.sendSpawnPackets(viewers);
if (spawnItemEntity) {
// Copy for async use
PositionCoordinates positionCoordinates = this.positionCoordinates;
ItemStack itemStack = this.itemStack;
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(itemEntity.newSpawnPackets(positionCoordinates, itemStack));
});
}
}
@ -116,7 +122,9 @@ public class ItemLineTracker extends ClickableLineTracker<Viewer> {
super.sendDestroyPackets(viewers);
if (spawnItemEntity) {
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(itemEntity.newDestroyPackets());
});
}
}
@ -127,13 +135,24 @@ public class ItemLineTracker extends ClickableLineTracker<Viewer> {
if (spawnItemEntityChanged) {
if (spawnItemEntity) {
// Copy for async use
PositionCoordinates positionCoordinates = this.positionCoordinates;
ItemStack itemStack = this.itemStack;
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(itemEntity.newSpawnPackets(positionCoordinates, itemStack));
});
} else {
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(itemEntity.newDestroyPackets());
});
}
} else if (itemStackChanged) {
// Only send item changes if full spawn/destroy packets were not sent
// Copy for async use
ItemStack itemStack = this.itemStack;
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(itemEntity.newChangePackets(itemStack));
});
}
}
@ -143,7 +162,11 @@ public class ItemLineTracker extends ClickableLineTracker<Viewer> {
super.sendPositionChangePackets(viewers);
if (spawnItemEntity) {
// Copy for async use
PositionCoordinates positionCoordinates = this.positionCoordinates;
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(itemEntity.newTeleportPackets(positionCoordinates));
});
}
}

View File

@ -14,14 +14,14 @@ import org.bukkit.entity.Player;
import org.jetbrains.annotations.MustBeInvokedByOverriders;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public abstract class LineTracker<T extends Viewer> {
private final Map<Player, T> viewers;
private final ConcurrentMap<Player, T> viewers;
private final Viewers<T> iterableViewers;
private String positionWorldName;
@ -32,7 +32,7 @@ public abstract class LineTracker<T extends Viewer> {
private int lastVisibilitySettingsVersion;
protected LineTracker() {
this.viewers = new HashMap<>();
this.viewers = new ConcurrentHashMap<>();
this.iterableViewers = new DelegateViewers<>(viewers.values());
}

View File

@ -15,7 +15,7 @@ public class MutableViewers<T extends Viewer> implements Viewers<T> {
private T viewer;
private List<T> additionalViewers;
public void add(T viewer) {
public synchronized void add(T viewer) {
if (this.viewer == null) {
this.viewer = viewer;
} else {
@ -27,7 +27,7 @@ public class MutableViewers<T extends Viewer> implements Viewers<T> {
}
@Override
public void forEach(Consumer<? super T> action) {
public synchronized void forEach(Consumer<? super T> action) {
if (viewer != null) {
action.accept(viewer);
if (additionalViewers != null) {

View File

@ -0,0 +1,69 @@
/*
* Copyright (C) filoghost and contributors
*
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package me.filoghost.holographicdisplays.core.tracking;
import me.filoghost.fcommons.logging.Log;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* This is a quick but ugly helper class for creating and sending packets async.
* Static classes like this should be avoided.
*/
public class PacketSenderExecutor {
private static volatile BlockingQueue<Runnable> tasks;
private static volatile Thread thread;
private static final Runnable STOP_MARKER_TASK = () -> {};
static {
tasks = new LinkedBlockingQueue<>();
thread = new Thread(() -> {
while (true) {
try {
Runnable task = tasks.take();
task.run();
} catch (Throwable t) {
Log.severe("Error in packet sender task", t);
}
}
});
thread.setName("Holographic Displays async packets");
thread.start();
}
public static void execute(Runnable task) {
tasks.add(task);
}
public static void start() {
tasks = new LinkedBlockingQueue<>();
thread = new Thread(() -> {
while (true) {
try {
Runnable task = tasks.take();
if (task == STOP_MARKER_TASK) {
return;
}
task.run();
} catch (Throwable t) {
Log.severe("Error in packet sender task", t);
}
}
});
thread.setName("Holographic Displays packet sender");
thread.start();
}
public static void stopGracefully() {
if (tasks != null) {
tasks.add(STOP_MARKER_TASK);
}
}
}

View File

@ -5,6 +5,7 @@
*/
package me.filoghost.holographicdisplays.core.tracking;
import me.filoghost.holographicdisplays.common.PositionCoordinates;
import me.filoghost.holographicdisplays.core.base.BaseTextHologramLine;
import me.filoghost.holographicdisplays.core.listener.LineClickListener;
import me.filoghost.holographicdisplays.core.placeholder.tracking.ActivePlaceholderTracker;
@ -84,15 +85,22 @@ public class TextLineTracker extends ClickableLineTracker<TextLineViewer> {
protected void sendSpawnPackets(Viewers<TextLineViewer> viewers) {
super.sendSpawnPackets(viewers);
// Copy for async use
PositionCoordinates positionCoordinates = this.positionCoordinates;
viewers.forEach(TextLineViewer::updateNextTextToSend);
PacketSenderExecutor.execute(() -> {
IndividualTextPacketGroup spawnPackets = textEntity.newSpawnPackets(positionCoordinates);
viewers.forEach(viewer -> viewer.sendTextPackets(spawnPackets));
});
}
@MustBeInvokedByOverriders
@Override
protected void sendDestroyPackets(Viewers<TextLineViewer> viewers) {
super.sendDestroyPackets(viewers);
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(textEntity.newDestroyPackets());
});
}
@Override
@ -100,8 +108,11 @@ public class TextLineTracker extends ClickableLineTracker<TextLineViewer> {
super.sendChangesPackets(viewers);
if (displayTextChanged) {
viewers.forEach(TextLineViewer::updateNextTextToSend);
PacketSenderExecutor.execute(() -> {
IndividualTextPacketGroup changePackets = textEntity.newChangePackets();
viewers.forEach(viewer -> viewer.sendTextPacketsIfNecessary(changePackets));
});
}
}
@ -109,7 +120,11 @@ public class TextLineTracker extends ClickableLineTracker<TextLineViewer> {
@Override
protected void sendPositionChangePackets(Viewers<TextLineViewer> viewers) {
super.sendPositionChangePackets(viewers);
// Copy for async use
PositionCoordinates positionCoordinates = this.positionCoordinates;
PacketSenderExecutor.execute(() -> {
viewers.sendPackets(textEntity.newTeleportPackets(positionCoordinates));
});
}
@Override

View File

@ -5,9 +5,8 @@
*/
package me.filoghost.holographicdisplays.core.tracking;
import me.filoghost.holographicdisplays.nms.common.IndividualTextPacketGroup;
import me.filoghost.holographicdisplays.core.tick.CachedPlayer;
import org.jetbrains.annotations.Nullable;
import me.filoghost.holographicdisplays.nms.common.IndividualTextPacketGroup;
import java.util.Objects;
@ -15,8 +14,10 @@ class TextLineViewer extends Viewer {
private final DisplayText displayText;
// Access to these variables must be synchronized, they are accessed from multiple threads
private String individualText;
private String lastSentText;
private String nextTextToSend;
TextLineViewer(CachedPlayer player, DisplayText displayText) {
super(player);
@ -24,34 +25,41 @@ class TextLineViewer extends Viewer {
}
public void sendTextPackets(IndividualTextPacketGroup packets) {
String text = getOrComputeText();
String text;
synchronized (this) {
text = nextTextToSend;
this.lastSentText = text;
}
sendIndividualPackets(packets, text);
}
public void sendTextPacketsIfNecessary(IndividualTextPacketGroup packets) {
String text = getOrComputeText();
String text;
synchronized (this) {
text = nextTextToSend;
if (Objects.equals(lastSentText, text)) {
return; // Avoid sending unnecessary packets
}
this.lastSentText = text;
}
sendIndividualPackets(packets, text);
}
private @Nullable String getOrComputeText() {
public synchronized void updateNextTextToSend() {
if (displayText.containsIndividualPlaceholders()) {
if (individualText == null) {
individualText = displayText.computeIndividualText(this);
}
return individualText;
nextTextToSend = individualText;
} else {
individualText = null;
return displayText.getGlobalText();
nextTextToSend = displayText.getGlobalText();
}
}
public boolean updateIndividualText() {
String individualText = displayText.computeIndividualText(this);
synchronized (this) {
if (!Objects.equals(this.individualText, individualText)) {
this.individualText = individualText;
return true;
@ -59,5 +67,6 @@ class TextLineViewer extends Viewer {
return false;
}
}
}
}

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
void writeBoolean(boolean flag) {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -12,13 +12,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.io.IOException;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {

View File

@ -14,13 +14,14 @@ import java.util.UUID;
class PacketByteBuffer {
private static final PacketByteBuffer INSTANCE = new PacketByteBuffer();
private static final ThreadLocal<PacketByteBuffer> LOCAL_INSTANCE = ThreadLocal.withInitial(PacketByteBuffer::new);
private final PacketDataSerializer serializer;
static PacketByteBuffer get() {
INSTANCE.clear();
return INSTANCE;
PacketByteBuffer instance = LOCAL_INSTANCE.get();
instance.clear();
return instance;
}
private PacketByteBuffer() {