Allow tasks to be bound to the same thread for each run.

This commit is contained in:
jglrxavpok 2021-08-26 19:50:28 +02:00
parent f7cd0def20
commit 42938111d5
5 changed files with 149 additions and 12 deletions

View File

@ -68,6 +68,7 @@ public abstract class GLFWCapableBuffer {
} }
public void changeRenderingThreadToCurrent() { public void changeRenderingThreadToCurrent() {
System.out.println("Currently on thread "+Thread.currentThread().getId());
glfwMakeContextCurrent(glfwWindow); glfwMakeContextCurrent(glfwWindow);
GL.createCapabilities(); GL.createCapabilities();
} }
@ -90,6 +91,7 @@ public abstract class GLFWCapableBuffer {
render(rendering); render(rendering);
} }
}) })
.bindToSingleThread()
.repeat(period) .repeat(period)
.schedule(); .schedule();
} }

View File

@ -7,6 +7,7 @@ import net.minestom.server.MinecraftServer;
import net.minestom.server.extensions.Extension; import net.minestom.server.extensions.Extension;
import net.minestom.server.extensions.IExtensionObserver; import net.minestom.server.extensions.IExtensionObserver;
import net.minestom.server.utils.thread.MinestomThread; import net.minestom.server.utils.thread.MinestomThread;
import net.minestom.server.utils.thread.ThreadBindingExecutor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.Collection; import java.util.Collection;
@ -35,6 +36,8 @@ public final class SchedulerManager implements IExtensionObserver {
private final AtomicInteger shutdownCounter; private final AtomicInteger shutdownCounter;
//A threaded execution //A threaded execution
private final ExecutorService batchesPool; private final ExecutorService batchesPool;
//Thread execution, which always uses the same Thread for a given Task
private final ExecutorService threadBindingPool;
// A single threaded scheduled execution // A single threaded scheduled execution
private final ScheduledExecutorService timerExecutionService; private final ScheduledExecutorService timerExecutionService;
// All the registered tasks (task id = task) // All the registered tasks (task id = task)
@ -66,6 +69,7 @@ public final class SchedulerManager implements IExtensionObserver {
this.shutdownCounter = new AtomicInteger(); this.shutdownCounter = new AtomicInteger();
this.batchesPool = new MinestomThread(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER); this.batchesPool = new MinestomThread(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER);
this.threadBindingPool = new ThreadBindingExecutor(MinecraftServer.THREAD_COUNT_SCHEDULER, MinecraftServer.THREAD_NAME_SCHEDULER);
this.timerExecutionService = Executors.newSingleThreadScheduledExecutor(); this.timerExecutionService = Executors.newSingleThreadScheduledExecutor();
this.tasks = new Int2ObjectOpenHashMap<>(); this.tasks = new Int2ObjectOpenHashMap<>();
this.shutdownTasks = new Int2ObjectOpenHashMap<>(); this.shutdownTasks = new Int2ObjectOpenHashMap<>();
@ -183,15 +187,26 @@ public final class SchedulerManager implements IExtensionObserver {
} }
/** /**
* Gets the execution service for all the registered {@link Task}. * Gets the execution service for all the registered {@link Task}, which are not marked as thread-bound.
* *
* @return the execution service for all the registered {@link Task} * @return the execution service for all the registered {@link Task}, which are not marked as thread-bound
*/ */
@NotNull @NotNull
public ExecutorService getBatchesPool() { public ExecutorService getBatchesPool() {
return batchesPool; return batchesPool;
} }
/**
* Gets the execution service for all the registered {@link Task}, which are marked as thread-bound.
* The thread to which tasks are assigned depends on their runnable hashcode. Two (or more) tasks can be bound to the same Thread.
*
* @return the execution service for all the registered {@link Task}, which are marked as thread-bound
*/
@NotNull
public ExecutorService getThreadBindingPool() {
return threadBindingPool;
}
/** /**
* Gets the scheduled execution service for all the registered {@link Task}. * Gets the scheduled execution service for all the registered {@link Task}.
* *

View File

@ -6,6 +6,7 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -42,6 +43,12 @@ public class Task implements Runnable {
private ScheduledFuture<?> future; private ScheduledFuture<?> future;
// The thread of the task // The thread of the task
private volatile Thread currentThreadTask; private volatile Thread currentThreadTask;
// The executor service used for this task
private final ExecutorService executorService;
// Whether this task will always execute on the same thread
private final boolean boundToSingleThread;
// Action executed on the executor. Stored inside the Task to avoid changing the hashcode (which ThreadBindingExecutor relies on)
private final Runnable action;
/** /**
* Creates a task. * Creates a task.
@ -51,24 +58,21 @@ public class Task implements Runnable {
* @param shutdown Defines whether the task is a shutdown task * @param shutdown Defines whether the task is a shutdown task
* @param delay The time to delay * @param delay The time to delay
* @param repeat The time until the repetition * @param repeat The time until the repetition
* @param bindToSingleThread Whether to run the given task always on the same thread.
*/ */
public Task(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable, boolean shutdown, long delay, long repeat, boolean isTransient, @Nullable String owningExtension) { public Task(@NotNull SchedulerManager schedulerManager, @NotNull Runnable runnable, boolean shutdown, long delay, long repeat, boolean isTransient, @Nullable String owningExtension, boolean bindToSingleThread) {
this.schedulerManager = schedulerManager; this.schedulerManager = schedulerManager;
this.runnable = runnable; this.runnable = runnable;
this.shutdown = shutdown; this.shutdown = shutdown;
this.id = shutdown ? this.schedulerManager.getShutdownCounterIdentifier() : this.schedulerManager.getCounterIdentifier(); this.id = shutdown ? this.schedulerManager.getShutdownCounterIdentifier() : this.schedulerManager.getCounterIdentifier();
this.executorService = bindToSingleThread ? this.schedulerManager.getThreadBindingPool() : this.schedulerManager.getBatchesPool();
this.delay = delay; this.delay = delay;
this.repeat = repeat; this.repeat = repeat;
this.isTransient = isTransient; this.isTransient = isTransient;
this.boundToSingleThread = bindToSingleThread;
this.owningExtension = owningExtension; this.owningExtension = owningExtension;
}
/** this.action = () -> {
* Executes the task.
*/
@Override
public void run() {
this.schedulerManager.getBatchesPool().execute(() -> {
this.currentThreadTask = Thread.currentThread(); this.currentThreadTask = Thread.currentThread();
try { try {
this.runnable.run(); this.runnable.run();
@ -84,7 +88,15 @@ public class Task implements Runnable {
if (this.repeat == 0) this.finish(); if (this.repeat == 0) this.finish();
this.currentThreadTask = null; this.currentThreadTask = null;
} }
}); };
}
/**
* Executes the task.
*/
@Override
public void run() {
executorService.execute(action);
} }
/** /**

View File

@ -36,6 +36,12 @@ public class TaskBuilder {
*/ */
private boolean isTransient; private boolean isTransient;
/**
* Should this Task be run on the same thread for each run? Can be used for situations where context is bound to a
* single thread. (for instance OpenGL context)
*/
private boolean boundToSingleThread = false;
/** /**
* Creates a task builder. * Creates a task builder.
* <br> * <br>
@ -137,6 +143,15 @@ public class TaskBuilder {
return this; return this;
} }
/**
* Makes this Task be run on the same thread for each run. Can be used for situations where context is bound to a
* single thread. (for instance OpenGL context)
*/
public TaskBuilder bindToSingleThread() {
boundToSingleThread = true;
return this;
}
/** /**
* Builds a {@link Task}. * Builds a {@link Task}.
* *
@ -151,7 +166,8 @@ public class TaskBuilder {
this.delay, this.delay,
this.repeat, this.repeat,
this.isTransient, this.isTransient,
this.owningExtension); this.owningExtension,
this.boundToSingleThread);
} }
/** /**

View File

@ -0,0 +1,92 @@
package net.minestom.server.utils.thread;
import org.jetbrains.annotations.NotNull;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Executor service which will always give the same thread to a given Runnable.
* Uses {@link Runnable#hashCode()} to determine the thread to assign.
*/
public class ThreadBindingExecutor extends AbstractExecutorService {
private MinestomThread[] threadExecutors;
/**
* Creates a non-local thread-binding executor
*
* @param nThreads the number of threads
* @param name the name of the thread pool
*/
public ThreadBindingExecutor(int nThreads, String name) {
this(nThreads, name, false);
}
/**
* @param nThreads the number of threads
* @param name the name of the thread pool
* @param local set to true if this executor is only used inside a method and should *not* be kept in the internal list of executors
*/
public ThreadBindingExecutor(int nThreads, String name, boolean local) {
threadExecutors = new MinestomThread[nThreads];
for (int i = 0; i < nThreads; i++) {
threadExecutors[i] = new MinestomThread(1, name, local);
}
}
@Override
public void shutdown() {
for (MinestomThread t : threadExecutors) {
t.shutdown();
}
}
@NotNull
@Override
public List<Runnable> shutdownNow() {
List<Runnable> allTasks = new LinkedList<>();
for (MinestomThread t : threadExecutors) {
allTasks.addAll(t.shutdownNow());
}
return allTasks;
}
@Override
public boolean isShutdown() {
for (MinestomThread t : threadExecutors) {
if(!t.isShutdown())
return false;
}
return true;
}
@Override
public boolean isTerminated() {
for (MinestomThread t : threadExecutors) {
if(!t.isShutdown())
return false;
}
return true;
}
@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
boolean terminated = true;
for (MinestomThread t : threadExecutors) {
terminated &= t.awaitTermination(timeout, unit);
}
return terminated;
}
@Override
public void execute(@NotNull Runnable command) {
int hash = command.hashCode();
if(hash < 0) hash = -hash;
int bucket = hash % threadExecutors.length;
threadExecutors[bucket].execute(command);
}
}