From 5ead47ba2fc3bf94b0b3066d31abadd21786ef1d Mon Sep 17 00:00:00 2001 From: Jesse Boyd Date: Sat, 4 Mar 2017 18:22:11 +1100 Subject: [PATCH] Possibly fixes #438 --- .../fawe/bukkit/v1_10/BukkitQueue_1_10.java | 4 +-- .../fawe/bukkit/v1_11/BukkitQueue_1_11.java | 4 +-- .../fawe/example/DefaultFaweQueueMap.java | 28 +++++++++++++------ .../boydti/fawe/example/IFaweQueueMap.java | 3 +- .../boydti/fawe/example/MappedFaweQueue.java | 5 ++-- .../boydti/fawe/example/WeakFaweQueueMap.java | 24 +++++++++++----- .../boydti/fawe/jnbt/anvil/MCAQueueMap.java | 3 +- .../com/boydti/fawe/object/FaweQueue.java | 4 +-- .../boydti/fawe/util/DelegateFaweQueue.java | 5 ++-- .../java/com/boydti/fawe/util/SetQueue.java | 9 ++++-- 10 files changed, 55 insertions(+), 34 deletions(-) diff --git a/bukkit/src/main/java/com/boydti/fawe/bukkit/v1_10/BukkitQueue_1_10.java b/bukkit/src/main/java/com/boydti/fawe/bukkit/v1_10/BukkitQueue_1_10.java index e8fd2283..6ea3f7df 100644 --- a/bukkit/src/main/java/com/boydti/fawe/bukkit/v1_10/BukkitQueue_1_10.java +++ b/bukkit/src/main/java/com/boydti/fawe/bukkit/v1_10/BukkitQueue_1_10.java @@ -247,8 +247,8 @@ public class BukkitQueue_1_10 extends BukkitQueue_0 item = iter.next(); FaweChunk chunk = item.getValue(); if (skip && chunk == lastWrappedChunk) { i--; - added--; continue; } iter.remove(); parent.start(chunk); - pool.submit(chunk); + service.submit(chunk); + added++; } // if result, then submitted = amount if (result) { @@ -165,14 +170,19 @@ public class DefaultFaweQueueMap implements IFaweQueueMap { } iter.remove(); parent.start(chunk); - pool.submit(chunk); - FaweChunk fc = ((FaweChunk) pool.take().get()); - parent.end(fc); + service.submit(chunk); + Future future = service.poll(50, TimeUnit.MILLISECONDS); + if (future != null) { + FaweChunk fc = (FaweChunk) future.get(); + parent.end(fc); + } } } } - for (int i = 0; i < added; i++) { - FaweChunk fc = ((FaweChunk) pool.take().get()); + pool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + Future future; + while ((future = service.poll()) != null) { + FaweChunk fc = (FaweChunk) future.get(); parent.end(fc); } } diff --git a/core/src/main/java/com/boydti/fawe/example/IFaweQueueMap.java b/core/src/main/java/com/boydti/fawe/example/IFaweQueueMap.java index e20d1b41..6633b075 100644 --- a/core/src/main/java/com/boydti/fawe/example/IFaweQueueMap.java +++ b/core/src/main/java/com/boydti/fawe/example/IFaweQueueMap.java @@ -3,7 +3,6 @@ package com.boydti.fawe.example; import com.boydti.fawe.object.FaweChunk; import com.boydti.fawe.object.RunnableVal; import java.util.Collection; -import java.util.concurrent.ExecutorCompletionService; public interface IFaweQueueMap { @@ -21,5 +20,5 @@ public interface IFaweQueueMap { int size(); - boolean next(int size, ExecutorCompletionService dispatcher, long time); + boolean next(int size, long time); } diff --git a/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java b/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java index 0ccd63d7..234a3bf4 100644 --- a/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java +++ b/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java @@ -21,7 +21,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.UUID; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -213,8 +212,8 @@ public abstract class MappedFaweQueue exte } @Override - public boolean next(int amount, ExecutorCompletionService pool, long time) { - return map.next(amount, pool, time); + public boolean next(int amount, long time) { + return map.next(amount, time); } public void start(FaweChunk chunk) { diff --git a/core/src/main/java/com/boydti/fawe/example/WeakFaweQueueMap.java b/core/src/main/java/com/boydti/fawe/example/WeakFaweQueueMap.java index 45e48644..6912645f 100644 --- a/core/src/main/java/com/boydti/fawe/example/WeakFaweQueueMap.java +++ b/core/src/main/java/com/boydti/fawe/example/WeakFaweQueueMap.java @@ -14,6 +14,9 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class WeakFaweQueueMap implements IFaweQueueMap { @@ -149,7 +152,7 @@ public class WeakFaweQueueMap implements IFaweQueueMap { private int lastZ = Integer.MIN_VALUE; @Override - public boolean next(int amount, ExecutorCompletionService pool, long time) { + public boolean next(int amount, long time) { synchronized (blocks) { try { boolean skip = parent.getStage() == SetQueue.QueueStage.INACTIVE; @@ -179,6 +182,8 @@ public class WeakFaweQueueMap implements IFaweQueueMap { } while (System.currentTimeMillis() - start < time); return !blocks.isEmpty(); } + ExecutorCompletionService service = SetQueue.IMP.getCompleterService(); + ForkJoinPool pool = SetQueue.IMP.getForkJoinPool(); boolean result = true; // amount = 8; for (int i = 0; i < amount && (result = iter.hasNext());) { @@ -191,7 +196,7 @@ public class WeakFaweQueueMap implements IFaweQueueMap { iter.remove(); if (chunk != null) { parent.start(chunk); - pool.submit(chunk); + service.submit(chunk); added++; i++; } else { @@ -212,15 +217,20 @@ public class WeakFaweQueueMap implements IFaweQueueMap { iter.remove(); if (chunk != null) { parent.start(chunk); - pool.submit(chunk); - FaweChunk fc = ((FaweChunk) pool.take().get()); - parent.end(fc); + service.submit(chunk); + Future future = service.poll(50, TimeUnit.MILLISECONDS); + if (future != null) { + FaweChunk fc = (FaweChunk) future.get(); + parent.end(fc); + } } } } } - for (int i = 0; i < added; i++) { - FaweChunk fc = ((FaweChunk) pool.take().get()); + pool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + Future future; + while ((future = service.poll()) != null) { + FaweChunk fc = (FaweChunk) future.get(); parent.end(fc); } } catch (Throwable e) { diff --git a/core/src/main/java/com/boydti/fawe/jnbt/anvil/MCAQueueMap.java b/core/src/main/java/com/boydti/fawe/jnbt/anvil/MCAQueueMap.java index b2c4409f..fe808b8d 100644 --- a/core/src/main/java/com/boydti/fawe/jnbt/anvil/MCAQueueMap.java +++ b/core/src/main/java/com/boydti/fawe/jnbt/anvil/MCAQueueMap.java @@ -14,7 +14,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorCompletionService; public class MCAQueueMap implements IFaweQueueMap { @@ -150,7 +149,7 @@ public class MCAQueueMap implements IFaweQueueMap { } @Override - public boolean next(int size, ExecutorCompletionService dispatcher, long time) { + public boolean next(int size, long time) { lastX = Integer.MIN_VALUE; lastZ = Integer.MIN_VALUE; lastFileX = Integer.MIN_VALUE; diff --git a/core/src/main/java/com/boydti/fawe/object/FaweQueue.java b/core/src/main/java/com/boydti/fawe/object/FaweQueue.java index 2f9e2ad6..6a4b83c2 100644 --- a/core/src/main/java/com/boydti/fawe/object/FaweQueue.java +++ b/core/src/main/java/com/boydti/fawe/object/FaweQueue.java @@ -314,14 +314,14 @@ public abstract class FaweQueue implements HasFaweQueue { int amount = Settings.IMP.QUEUE.PARALLEL_THREADS; ExecutorCompletionService service = SetQueue.IMP.getCompleterService(); long time = 20; // 30ms - return next(amount, service, time); + return next(amount, time); } /** * Gets the FaweChunk and sets the requested blocks * @return */ - public abstract boolean next(int amount, ExecutorCompletionService pool, long time); + public abstract boolean next(int amount, long time); public void saveMemory() { MainUtil.sendAdmin(BBC.OOM.s()); diff --git a/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java b/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java index 6ffb0245..8f588201 100644 --- a/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java +++ b/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorCompletionService; import javax.annotation.Nullable; public class DelegateFaweQueue extends FaweQueue { @@ -232,8 +231,8 @@ public class DelegateFaweQueue extends FaweQueue { } @Override - public boolean next(int amount, ExecutorCompletionService pool, long time) { - return parent.next(amount, pool, time); + public boolean next(int amount, long time) { + return parent.next(amount, time); } @Override diff --git a/core/src/main/java/com/boydti/fawe/util/SetQueue.java b/core/src/main/java/com/boydti/fawe/util/SetQueue.java index 7fa00dea..62778c7f 100644 --- a/core/src/main/java/com/boydti/fawe/util/SetQueue.java +++ b/core/src/main/java/com/boydti/fawe/util/SetQueue.java @@ -54,6 +54,11 @@ public class SetQueue { return completer; } + @Deprecated + public ForkJoinPool getForkJoinPool() { + return pool; + } + public void runMiscTasks() { while (Fawe.get().getTimer().isAbove(targetTPS)) { Runnable task = tasks.poll(); @@ -116,7 +121,7 @@ public class SetQueue { boolean parallel = Settings.IMP.QUEUE.PARALLEL_THREADS > 1; queue.startSet(parallel); try { - if (!queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, getCompleterService(), time) && queue.getStage() == QueueStage.ACTIVE) { + if (!queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, time) && queue.getStage() == QueueStage.ACTIVE) { queue.setStage(QueueStage.NONE); queue.runTasks(); } @@ -216,7 +221,7 @@ public class SetQueue { public void flush(FaweQueue queue) { queue.startSet(Settings.IMP.QUEUE.PARALLEL_THREADS > 1); try { - queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, getCompleterService(), Long.MAX_VALUE); + queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, Long.MAX_VALUE); } catch (Throwable e) { pool.awaitQuiescence(Settings.IMP.QUEUE.DISCARD_AFTER_MS, TimeUnit.MILLISECONDS); completer = new ExecutorCompletionService(pool);