Replace more instances with agnostic scheduler

This commit is contained in:
Dan Mulloy 2023-06-15 14:40:45 -05:00
parent 65a9ef5acf
commit a6122cbd24
No known key found for this signature in database
GPG Key ID: F379C293F178751F
12 changed files with 191 additions and 34 deletions

View File

@ -25,6 +25,10 @@ import com.comphenix.protocol.error.ReportType;
import com.comphenix.protocol.injector.InternalManager; import com.comphenix.protocol.injector.InternalManager;
import com.comphenix.protocol.injector.PacketFilterManager; import com.comphenix.protocol.injector.PacketFilterManager;
import com.comphenix.protocol.metrics.Statistics; import com.comphenix.protocol.metrics.Statistics;
import com.comphenix.protocol.scheduler.DefaultScheduler;
import com.comphenix.protocol.scheduler.FoliaScheduler;
import com.comphenix.protocol.scheduler.ProtocolScheduler;
import com.comphenix.protocol.scheduler.Task;
import com.comphenix.protocol.updater.Updater; import com.comphenix.protocol.updater.Updater;
import com.comphenix.protocol.updater.Updater.UpdateType; import com.comphenix.protocol.updater.Updater.UpdateType;
import com.comphenix.protocol.utility.*; import com.comphenix.protocol.utility.*;
@ -34,6 +38,7 @@ import com.google.common.collect.Iterables;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Handler; import java.util.logging.Handler;
@ -90,13 +95,12 @@ public class ProtocolLib extends JavaPlugin {
// these fields are only existing once, we can make them static // these fields are only existing once, we can make them static
private static Logger logger; private static Logger logger;
private static ProtocolConfig config; private static ProtocolConfig config;
private static InternalManager protocolManager; private static InternalManager protocolManager;
private static ErrorReporter reporter = new BasicErrorReporter(); private static ErrorReporter reporter = new BasicErrorReporter();
private Statistics statistics; private Statistics statistics;
private int packetTask = -1; private Task packetTask = null;
private int tickCounter = 0; private int tickCounter = 0;
private int configExpectedMod = -1; private int configExpectedMod = -1;
@ -104,6 +108,8 @@ public class ProtocolLib extends JavaPlugin {
private Updater updater; private Updater updater;
private Handler redirectHandler; private Handler redirectHandler;
private ProtocolScheduler scheduler;
// commands // commands
private CommandProtocol commandProtocol; private CommandProtocol commandProtocol;
private CommandPacket commandPacket; private CommandPacket commandPacket;
@ -155,6 +161,10 @@ public class ProtocolLib extends JavaPlugin {
} }
try { try {
this.scheduler = Util.isUsingFolia()
? new FoliaScheduler(this)
: new DefaultScheduler(this);
// Check for other versions // Check for other versions
this.checkConflictingVersions(); this.checkConflictingVersions();
@ -171,7 +181,7 @@ public class ProtocolLib extends JavaPlugin {
.minecraftVersion(version) .minecraftVersion(version)
.reporter(reporter) .reporter(reporter)
.build(); .build();
ProtocolLibrary.init(this, config, protocolManager, reporter); ProtocolLibrary.init(this, config, protocolManager, scheduler, reporter);
// Setup error reporter // Setup error reporter
detailedReporter.addGlobalParameter("manager", protocolManager); detailedReporter.addGlobalParameter("manager", protocolManager);
@ -483,12 +493,12 @@ public class ProtocolLib extends JavaPlugin {
private void createPacketTask(Server server) { private void createPacketTask(Server server) {
try { try {
if (this.packetTask >= 0) { if (this.packetTask != null) {
throw new IllegalStateException("Packet task has already been created"); throw new IllegalStateException("Packet task has already been created");
} }
// Attempt to create task // Attempt to create task
this.packetTask = SchedulerUtil.scheduleSyncRepeatingTask(this, () -> { this.packetTask = scheduler.scheduleSyncRepeatingTask(() -> {
AsyncFilterManager manager = (AsyncFilterManager) protocolManager.getAsynchronousManager(); AsyncFilterManager manager = (AsyncFilterManager) protocolManager.getAsynchronousManager();
// We KNOW we're on the main thread at the moment // We KNOW we're on the main thread at the moment
manager.sendProcessedPackets(ProtocolLib.this.tickCounter++, true); manager.sendProcessedPackets(ProtocolLib.this.tickCounter++, true);
@ -504,7 +514,7 @@ public class ProtocolLib extends JavaPlugin {
} catch (OutOfMemoryError e) { } catch (OutOfMemoryError e) {
throw e; throw e;
} catch (Throwable e) { } catch (Throwable e) {
if (this.packetTask == -1) { if (this.packetTask == null) {
reporter.reportDetailed(this, Report.newBuilder(REPORT_CANNOT_CREATE_TIMEOUT_TASK).error(e)); reporter.reportDetailed(this, Report.newBuilder(REPORT_CANNOT_CREATE_TIMEOUT_TASK).error(e));
} }
} }
@ -563,9 +573,9 @@ public class ProtocolLib extends JavaPlugin {
} }
// Clean up // Clean up
if (this.packetTask >= 0) { if (this.packetTask != null) {
SchedulerUtil.cancelTask(this, packetTask); packetTask.cancel();
this.packetTask = -1; this.packetTask = null;
} }
// And redirect handler too // And redirect handler too
@ -601,6 +611,10 @@ public class ProtocolLib extends JavaPlugin {
return config; return config;
} }
public ProtocolScheduler getScheduler() {
return scheduler;
}
// Different commands // Different commands
private enum ProtocolCommand { private enum ProtocolCommand {
FILTER, FILTER,

View File

@ -16,6 +16,7 @@ package com.comphenix.protocol;
import com.comphenix.protocol.error.BasicErrorReporter; import com.comphenix.protocol.error.BasicErrorReporter;
import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.scheduler.ProtocolScheduler;
import com.comphenix.protocol.utility.MinecraftVersion; import com.comphenix.protocol.utility.MinecraftVersion;
import java.util.List; import java.util.List;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -51,17 +52,20 @@ public class ProtocolLibrary {
private static Plugin plugin; private static Plugin plugin;
private static ProtocolConfig config; private static ProtocolConfig config;
private static ProtocolManager manager; private static ProtocolManager manager;
private static ProtocolScheduler scheduler;
private static ErrorReporter reporter = new BasicErrorReporter(); private static ErrorReporter reporter = new BasicErrorReporter();
private static boolean updatesDisabled; private static boolean updatesDisabled;
private static boolean initialized; private static boolean initialized;
protected static void init(Plugin plugin, ProtocolConfig config, ProtocolManager manager, ErrorReporter reporter) { protected static void init(Plugin plugin, ProtocolConfig config, ProtocolManager manager,
ProtocolScheduler scheduler, ErrorReporter reporter) {
Validate.isTrue(!initialized, "ProtocolLib has already been initialized."); Validate.isTrue(!initialized, "ProtocolLib has already been initialized.");
ProtocolLibrary.plugin = plugin; ProtocolLibrary.plugin = plugin;
ProtocolLibrary.config = config; ProtocolLibrary.config = config;
ProtocolLibrary.manager = manager; ProtocolLibrary.manager = manager;
ProtocolLibrary.reporter = reporter; ProtocolLibrary.reporter = reporter;
ProtocolLibrary.scheduler = scheduler;
initialized = true; initialized = true;
} }
@ -89,6 +93,10 @@ public class ProtocolLibrary {
return manager; return manager;
} }
public static ProtocolScheduler getScheduler() {
return scheduler;
}
/** /**
* Retrieve the current error reporter. * Retrieve the current error reporter.
* @return Current error reporter. * @return Current error reporter.

View File

@ -33,13 +33,13 @@ import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener; import com.comphenix.protocol.events.PacketListener;
import com.comphenix.protocol.injector.PrioritizedListener; import com.comphenix.protocol.injector.PrioritizedListener;
import com.comphenix.protocol.injector.SortedPacketListenerList; import com.comphenix.protocol.injector.SortedPacketListenerList;
import com.comphenix.protocol.scheduler.ProtocolScheduler;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.bukkit.entity.Player; import org.bukkit.entity.Player;
import org.bukkit.plugin.Plugin; import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
/** /**
* Represents a filter manager for asynchronous packets. * Represents a filter manager for asynchronous packets.
@ -67,7 +67,7 @@ public class AsyncFilterManager implements AsynchronousManager {
private final Thread mainThread; private final Thread mainThread;
// Default scheduler // Default scheduler
private final BukkitScheduler scheduler; private final ProtocolScheduler scheduler;
// Current packet index // Current packet index
private final AtomicInteger currentSendingIndex = new AtomicInteger(); private final AtomicInteger currentSendingIndex = new AtomicInteger();
@ -82,7 +82,7 @@ public class AsyncFilterManager implements AsynchronousManager {
* @param reporter - desired error reporter. * @param reporter - desired error reporter.
* @param scheduler - task scheduler. * @param scheduler - task scheduler.
*/ */
public AsyncFilterManager(ErrorReporter reporter, BukkitScheduler scheduler) { public AsyncFilterManager(ErrorReporter reporter, ProtocolScheduler scheduler) {
// Initialize timeout listeners // Initialize timeout listeners
this.serverTimeoutListeners = new SortedPacketListenerList(); this.serverTimeoutListeners = new SortedPacketListenerList();
this.clientTimeoutListeners = new SortedPacketListenerList(); this.clientTimeoutListeners = new SortedPacketListenerList();
@ -340,7 +340,7 @@ public class AsyncFilterManager implements AsynchronousManager {
* Retrieve the current task scheduler. * Retrieve the current task scheduler.
* @return Current task scheduler. * @return Current task scheduler.
*/ */
public BukkitScheduler getScheduler() { public ProtocolScheduler getScheduler() {
return scheduler; return scheduler;
} }

View File

@ -31,6 +31,7 @@ import com.comphenix.protocol.events.ListeningWhitelist;
import com.comphenix.protocol.events.PacketAdapter; import com.comphenix.protocol.events.PacketAdapter;
import com.comphenix.protocol.events.PacketEvent; import com.comphenix.protocol.events.PacketEvent;
import com.comphenix.protocol.events.PacketListener; import com.comphenix.protocol.events.PacketListener;
import com.comphenix.protocol.scheduler.Task;
import com.comphenix.protocol.timing.TimedListenerManager; import com.comphenix.protocol.timing.TimedListenerManager;
import com.comphenix.protocol.timing.TimedListenerManager.ListenerType; import com.comphenix.protocol.timing.TimedListenerManager.ListenerType;
import com.comphenix.protocol.timing.TimedTracker; import com.comphenix.protocol.timing.TimedTracker;
@ -91,13 +92,13 @@ public class AsyncListenerHandler {
private final Object stopLock = new Object(); private final Object stopLock = new Object();
// Processing task on the main thread // Processing task on the main thread
private int syncTask = -1; private Task syncTask = null;
// Minecraft main thread // Minecraft main thread
private Thread mainThread; private Thread mainThread;
// Warn plugins that the async listener handler must be started // Warn plugins that the async listener handler must be started
private int warningTask; private Task warningTask;
// Timing manager // Timing manager
private TimedListenerManager timedManager = TimedListenerManager.getInstance(); private TimedListenerManager timedManager = TimedListenerManager.getInstance();
@ -121,19 +122,16 @@ public class AsyncListenerHandler {
} }
private void startWarningTask() { private void startWarningTask() {
warningTask = filterManager.getScheduler().scheduleSyncDelayedTask(getPlugin(), () -> ProtocolLibrary.getErrorReporter().reportWarning(AsyncListenerHandler.this, Report. warningTask = filterManager.getScheduler().scheduleSyncDelayedTask(() -> ProtocolLibrary.getErrorReporter().reportWarning(AsyncListenerHandler.this, Report.
newBuilder(REPORT_HANDLER_NOT_STARTED). newBuilder(REPORT_HANDLER_NOT_STARTED).
messageParam(listener.getPlugin(), AsyncListenerHandler.this). messageParam(listener.getPlugin(), AsyncListenerHandler.this).
build()), 2 * TICKS_PER_SECOND); build()), 2 * TICKS_PER_SECOND);
} }
private void stopWarningTask() { private void stopWarningTask() {
int taskId = warningTask; if (warningTask != null) {
warningTask.cancel();
// Ensure we have a task to cancel warningTask = null;
if (warningTask >= 0) {
filterManager.getScheduler().cancelTask(taskId);
warningTask = -1;
} }
} }
@ -406,10 +404,10 @@ public class AsyncListenerHandler {
final long tickDelay = 1; final long tickDelay = 1;
final int workerID = nextID.incrementAndGet(); final int workerID = nextID.incrementAndGet();
if (syncTask < 0) { if (syncTask == null) {
stopWarningTask(); stopWarningTask();
syncTask = filterManager.getScheduler().scheduleSyncRepeatingTask(getPlugin(), () -> { syncTask = filterManager.getScheduler().scheduleSyncRepeatingTask(() -> {
long stopTime = System.nanoTime() + unit.convert(time, TimeUnit.NANOSECONDS); long stopTime = System.nanoTime() + unit.convert(time, TimeUnit.NANOSECONDS);
while (!cancelled) { while (!cancelled) {
@ -435,7 +433,7 @@ public class AsyncListenerHandler {
}, tickDelay, tickDelay); }, tickDelay, tickDelay);
// This is very bad - force the caller to handle it // This is very bad - force the caller to handle it
if (syncTask < 0) if (syncTask == null)
throw new IllegalStateException("Cannot start synchronous task."); throw new IllegalStateException("Cannot start synchronous task.");
else else
return true; return true;
@ -449,10 +447,9 @@ public class AsyncListenerHandler {
* @return TRUE if we stopped any processing tasks, FALSE if it has already been stopped. * @return TRUE if we stopped any processing tasks, FALSE if it has already been stopped.
*/ */
public synchronized boolean syncStop() { public synchronized boolean syncStop() {
if (syncTask > 0) { if (syncTask != null) {
filterManager.getScheduler().cancelTask(syncTask); syncTask.cancel();
syncTask = null;
syncTask = -1;
return true; return true;
} else { } else {
return false; return false;

View File

@ -1,5 +1,6 @@
package com.comphenix.protocol.injector; package com.comphenix.protocol.injector;
import com.comphenix.protocol.ProtocolLib;
import com.comphenix.protocol.async.AsyncFilterManager; import com.comphenix.protocol.async.AsyncFilterManager;
import com.comphenix.protocol.error.ErrorReporter; import com.comphenix.protocol.error.ErrorReporter;
import com.comphenix.protocol.utility.MinecraftVersion; import com.comphenix.protocol.utility.MinecraftVersion;
@ -10,7 +11,7 @@ import org.bukkit.plugin.Plugin;
public class PacketFilterBuilder { public class PacketFilterBuilder {
private Server server; private Server server;
private Plugin library; private ProtocolLib library;
private MinecraftVersion mcVersion; private MinecraftVersion mcVersion;
private ErrorReporter reporter; private ErrorReporter reporter;
private AsyncFilterManager asyncManager; private AsyncFilterManager asyncManager;
@ -32,7 +33,7 @@ public class PacketFilterBuilder {
* @param library - plugin instance. * @param library - plugin instance.
* @return This builder, for chaining. * @return This builder, for chaining.
*/ */
public PacketFilterBuilder library(@Nonnull Plugin library) { public PacketFilterBuilder library(@Nonnull ProtocolLib library) {
this.library = library; this.library = library;
return this; return this;
} }
@ -116,7 +117,7 @@ public class PacketFilterBuilder {
throw new IllegalArgumentException("reporter cannot be NULL."); throw new IllegalArgumentException("reporter cannot be NULL.");
} }
this.asyncManager = new AsyncFilterManager(this.reporter, this.server.getScheduler()); this.asyncManager = new AsyncFilterManager(this.reporter, this.library.getScheduler());
return new PacketFilterManager(this); return new PacketFilterManager(this);
} }
} }

View File

@ -0,0 +1,32 @@
package com.comphenix.protocol.scheduler;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
public class DefaultScheduler implements ProtocolScheduler {
private final Plugin plugin;
private final BukkitScheduler scheduler;
public DefaultScheduler(Plugin plugin) {
this.plugin = plugin;
this.scheduler = plugin.getServer().getScheduler();
}
@Override
public Task scheduleSyncRepeatingTask(Runnable task, long delay, long period) {
int taskId = scheduler.scheduleSyncRepeatingTask(plugin, task, delay, period);
return taskId >= 0 ? new DefaultTask(scheduler, taskId) : null;
}
@Override
public Task runTask(Runnable task) {
int taskId = scheduler.runTask(plugin, task).getTaskId();
return taskId >= 0 ? new DefaultTask(scheduler, taskId) : null;
}
@Override
public Task scheduleSyncDelayedTask(Runnable task, long delay) {
int taskId = scheduler.scheduleSyncDelayedTask(plugin, task, delay);
return taskId >= 0 ? new DefaultTask(scheduler, taskId) : null;
}
}

View File

@ -0,0 +1,19 @@
package com.comphenix.protocol.scheduler;
import org.bukkit.scheduler.BukkitScheduler;
import org.bukkit.scheduler.BukkitTask;
public class DefaultTask implements Task {
private final int taskId;
private final BukkitScheduler scheduler;
public DefaultTask(BukkitScheduler scheduler, int taskId) {
this.taskId = taskId;
this.scheduler = scheduler;
}
@Override
public void cancel() {
scheduler.cancelTask(taskId);
}
}

View File

@ -0,0 +1,51 @@
package com.comphenix.protocol.scheduler;
import com.comphenix.protocol.reflect.accessors.Accessors;
import com.comphenix.protocol.reflect.accessors.MethodAccessor;
import com.comphenix.protocol.utility.MinecraftReflection;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import java.util.function.Consumer;
public class FoliaScheduler implements ProtocolScheduler {
private final Object foliaScheduler;
private final MethodAccessor runAtFixedRate;
private final MethodAccessor runDelayed;
private final MethodAccessor execute;
private final MethodAccessor cancel;
private final Plugin plugin;
public FoliaScheduler(Plugin plugin) {
this.plugin = plugin;
MethodAccessor getScheduler = Accessors.getMethodAccessor(Bukkit.getServer().getClass(), "getGlobalRegionScheduler");
this.foliaScheduler = getScheduler.invoke(Bukkit.getServer());
this.runAtFixedRate = Accessors.getMethodAccessor(foliaScheduler.getClass(), "runAtFixedRate", Plugin.class,
Consumer.class, long.class, long.class);
this.execute = Accessors.getMethodAccessor(foliaScheduler.getClass(), "run", Plugin.class, Runnable.class);
this.runDelayed = Accessors.getMethodAccessor(foliaScheduler.getClass(), "runDelayed", Plugin.class, Runnable.class, long.class);
Class<?> taskClass = MinecraftReflection.getLibraryClass("io.papermc.paper.threadedregions.scheduler.ScheduledTask");
this.cancel = Accessors.getMethodAccessor(taskClass, "cancel");
}
@Override
public Task scheduleSyncRepeatingTask(Runnable task, long delay, long period) {
Object taskHandle = runAtFixedRate.invoke(foliaScheduler, plugin, (Consumer<Object>)(t -> task.run()), delay, period);
return new FoliaTask(cancel, taskHandle);
}
@Override
public Task runTask(Runnable task) {
Object taskHandle = execute.invoke(foliaScheduler, plugin, (Consumer<Object>)(t -> task.run()));
return new FoliaTask(cancel, taskHandle);
}
@Override
public Task scheduleSyncDelayedTask(Runnable task, long delay) {
Object taskHandle = runDelayed.invoke(foliaScheduler, plugin, (Consumer<Object>)(t -> task.run()), delay);
return new FoliaTask(cancel, taskHandle);
}
}

View File

@ -0,0 +1,18 @@
package com.comphenix.protocol.scheduler;
import com.comphenix.protocol.reflect.accessors.MethodAccessor;
public class FoliaTask implements Task {
private final MethodAccessor cancel;
private final Object taskHandle;
public FoliaTask(MethodAccessor cancel, Object taskHandle) {
this.cancel = cancel;
this.taskHandle = taskHandle;
}
@Override
public void cancel() {
cancel.invoke(taskHandle);
}
}

View File

@ -0,0 +1,12 @@
package com.comphenix.protocol.scheduler;
import com.comphenix.protocol.ProtocolLib;
import org.bukkit.plugin.Plugin;
public interface ProtocolScheduler {
Task scheduleSyncRepeatingTask(Runnable task, long delay, long period);
Task runTask(Runnable task);
Task scheduleSyncDelayedTask(Runnable task, long delay);
}

View File

@ -0,0 +1,5 @@
package com.comphenix.protocol.scheduler;
public interface Task {
void cancel();
}

View File

@ -81,7 +81,7 @@ public final class SpigotUpdater extends Updater {
} finally { } finally {
// Invoke the listeners on the main thread // Invoke the listeners on the main thread
for (Runnable listener : listeners) { for (Runnable listener : listeners) {
SchedulerUtil.execute(plugin, listener); ProtocolLibrary.getScheduler().runTask(listener);
} }
} }
} }