diff --git a/src/lwjgl/java/net/minestom/server/map/framebuffers/GLFWCapableBuffer.java b/src/lwjgl/java/net/minestom/server/map/framebuffers/GLFWCapableBuffer.java index 92dc949f0..18dfb866f 100644 --- a/src/lwjgl/java/net/minestom/server/map/framebuffers/GLFWCapableBuffer.java +++ b/src/lwjgl/java/net/minestom/server/map/framebuffers/GLFWCapableBuffer.java @@ -68,6 +68,7 @@ public abstract class GLFWCapableBuffer { } public void changeRenderingThreadToCurrent() { + System.out.println("Currently on thread "+Thread.currentThread().getId()); glfwMakeContextCurrent(glfwWindow); GL.createCapabilities(); } @@ -90,6 +91,7 @@ public abstract class GLFWCapableBuffer { render(rendering); } }) + .bindToSingleThread() .repeat(period) .schedule(); } diff --git a/src/main/java/net/minestom/server/timer/SchedulerManager.java b/src/main/java/net/minestom/server/timer/SchedulerManager.java index bddfd29b8..698044727 100644 --- a/src/main/java/net/minestom/server/timer/SchedulerManager.java +++ b/src/main/java/net/minestom/server/timer/SchedulerManager.java @@ -7,6 +7,7 @@ import net.minestom.server.MinecraftServer; import net.minestom.server.extensions.Extension; import net.minestom.server.extensions.IExtensionObserver; import net.minestom.server.utils.thread.MinestomThread; +import net.minestom.server.utils.thread.ThreadBindingExecutor; import org.jetbrains.annotations.NotNull; import java.util.Collection; @@ -35,6 +36,8 @@ public final class SchedulerManager implements IExtensionObserver { private final AtomicInteger shutdownCounter; //A threaded execution private final ExecutorService batchesPool; + //Thread execution, which always uses the same Thread for a given Task + private final ExecutorService threadBindingPool; // A single threaded scheduled execution private final ScheduledExecutorService timerExecutionService; // All the registered tasks (task id = task) @@ -66,6 +69,7 @@ public final class SchedulerManager implements IExtensionObserver { this.shutdownCounter = new AtomicInteger(); this.batchesPool = new MinestomThread(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER); + this.threadBindingPool = new ThreadBindingExecutor(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER); this.timerExecutionService = Executors.newSingleThreadScheduledExecutor(); this.tasks = new Int2ObjectOpenHashMap<>(); this.shutdownTasks = new Int2ObjectOpenHashMap<>(); @@ -183,15 +187,26 @@ public final class SchedulerManager implements IExtensionObserver { } /** - * Gets the execution service for all the registered {@link Task}. + * Gets the execution service for all the registered {@link Task}, which are not marked as thread-bound. * - * @return the execution service for all the registered {@link Task} + * @return the execution service for all the registered {@link Task}, which are not marked as thread-bound */ @NotNull public ExecutorService getBatchesPool() { return batchesPool; } + /** + * Gets the execution service for all the registered {@link Task}, which are marked as thread-bound. + * The thread to which tasks are assigned depends on their runnable hashcode. Two (or more) tasks can be bound to the same Thread. + * + * @return the execution service for all the registered {@link Task}, which are marked as thread-bound + */ + @NotNull + public ExecutorService getThreadBindingPool() { + return threadBindingPool; + } + /** * Gets the scheduled execution service for all the registered {@link Task}. * diff --git a/src/main/java/net/minestom/server/timer/Task.java b/src/main/java/net/minestom/server/timer/Task.java index 8b7e8218a..2e3d40e7d 100644 --- a/src/main/java/net/minestom/server/timer/Task.java +++ b/src/main/java/net/minestom/server/timer/Task.java @@ -6,6 +6,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -42,6 +43,12 @@ public class Task implements Runnable { private ScheduledFuture future; // The thread of the task private volatile Thread currentThreadTask; + // The executor service used for this task + private final ExecutorService executorService; + // Whether this task will always execute on the same thread + private final boolean boundToSingleThread; + // Action executed on the executor. Stored inside the Task to avoid changing the hashcode (which ThreadBindingExecutor relies on) + private final Runnable action; /** * Creates a task. @@ -51,24 +58,21 @@ public class Task implements Runnable { * @param shutdown Defines whether the task is a shutdown task * @param delay The time to delay * @param repeat The time until the repetition + * @param bindToSingleThread Whether to run the given task always on the same thread. */ - public Task(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable, boolean shutdown, long delay, long repeat, boolean isTransient, @Nullable String owningExtension) { + public Task(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable, boolean shutdown, long delay, long repeat, boolean isTransient, @Nullable String owningExtension, boolean bindToSingleThread) { this.schedulerManager = schedulerManager; this.runnable = runnable; this.shutdown = shutdown; this.id = shutdown ? this.schedulerManager.getShutdownCounterIdentifier() : this.schedulerManager.getCounterIdentifier(); + this.executorService = bindToSingleThread ? this.schedulerManager.getThreadBindingPool() : this.schedulerManager.getBatchesPool(); this.delay = delay; this.repeat = repeat; this.isTransient = isTransient; + this.boundToSingleThread = bindToSingleThread; this.owningExtension = owningExtension; - } - /** - * Executes the task. - */ - @Override - public void run() { - this.schedulerManager.getBatchesPool().execute(() -> { + this.action = () -> { this.currentThreadTask = Thread.currentThread(); try { this.runnable.run(); @@ -84,7 +88,15 @@ public class Task implements Runnable { if (this.repeat == 0) this.finish(); this.currentThreadTask = null; } - }); + }; + } + + /** + * Executes the task. + */ + @Override + public void run() { + executorService.execute(action); } /** diff --git a/src/main/java/net/minestom/server/timer/TaskBuilder.java b/src/main/java/net/minestom/server/timer/TaskBuilder.java index f1387f0c7..287e3d3b9 100644 --- a/src/main/java/net/minestom/server/timer/TaskBuilder.java +++ b/src/main/java/net/minestom/server/timer/TaskBuilder.java @@ -36,6 +36,12 @@ public class TaskBuilder { */ private boolean isTransient; + /** + * Should this Task be run on the same thread for each run? Can be used for situations where context is bound to a + * single thread. (for instance OpenGL context) + */ + private boolean boundToSingleThread = false; + /** * Creates a task builder. *
@@ -137,6 +143,15 @@ public class TaskBuilder { return this; } + /** + * Makes this Task be run on the same thread for each run. Can be used for situations where context is bound to a + * single thread. (for instance OpenGL context) + */ + public TaskBuilder bindToSingleThread() { + boundToSingleThread = true; + return this; + } + /** * Builds a {@link Task}. * @@ -151,7 +166,8 @@ public class TaskBuilder { this.delay, this.repeat, this.isTransient, - this.owningExtension); + this.owningExtension, + this.boundToSingleThread); } /** diff --git a/src/main/java/net/minestom/server/utils/thread/ThreadBindingExecutor.java b/src/main/java/net/minestom/server/utils/thread/ThreadBindingExecutor.java new file mode 100644 index 000000000..3b8c119cb --- /dev/null +++ b/src/main/java/net/minestom/server/utils/thread/ThreadBindingExecutor.java @@ -0,0 +1,92 @@ +package net.minestom.server.utils.thread; + +import org.jetbrains.annotations.NotNull; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Executor service which will always give the same thread to a given Runnable. + * Uses {@link Runnable#hashCode()} to determine the thread to assign. + */ +public class ThreadBindingExecutor extends AbstractExecutorService { + + private MinestomThread[] threadExecutors; + + /** + * Creates a non-local thread-binding executor + * + * @param nThreads the number of threads + * @param name the name of the thread pool + */ + public ThreadBindingExecutor(int nThreads, String name) { + this(nThreads, name, false); + } + + /** + * @param nThreads the number of threads + * @param name the name of the thread pool + * @param local set to true if this executor is only used inside a method and should *not* be kept in the internal list of executors + */ + public ThreadBindingExecutor(int nThreads, String name, boolean local) { + threadExecutors = new MinestomThread[nThreads]; + for (int i = 0; i < nThreads; i++) { + threadExecutors[i] = new MinestomThread(1, name, local); + } + } + + @Override + public void shutdown() { + for (MinestomThread t : threadExecutors) { + t.shutdown(); + } + } + + @NotNull + @Override + public List shutdownNow() { + List allTasks = new LinkedList<>(); + for (MinestomThread t : threadExecutors) { + allTasks.addAll(t.shutdownNow()); + } + return allTasks; + } + + @Override + public boolean isShutdown() { + for (MinestomThread t : threadExecutors) { + if(!t.isShutdown()) + return false; + } + return true; + } + + @Override + public boolean isTerminated() { + for (MinestomThread t : threadExecutors) { + if(!t.isShutdown()) + return false; + } + return true; + } + + @Override + public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException { + boolean terminated = true; + for (MinestomThread t : threadExecutors) { + terminated &= t.awaitTermination(timeout, unit); + } + return terminated; + } + + @Override + public void execute(@NotNull Runnable command) { + int hash = command.hashCode(); + if(hash < 0) hash = -hash; + int bucket = hash % threadExecutors.length; + + threadExecutors[bucket].execute(command); + } +}