diff --git a/src/main/java/net/minecraft/server/EntityPlayer.java b/src/main/java/net/minecraft/server/EntityPlayer.java index 04628ddb24..6481e0d05b 100644 --- a/src/main/java/net/minecraft/server/EntityPlayer.java +++ b/src/main/java/net/minecraft/server/EntityPlayer.java @@ -7,6 +7,7 @@ import java.util.Set; // CraftBukkit start import org.bukkit.Bukkit; +import org.bukkit.craftbukkit.ChunkCompressionThread; import org.bukkit.craftbukkit.CraftWorld; import org.bukkit.craftbukkit.inventory.CraftItemStack; import org.bukkit.event.entity.EntityDeathEvent; @@ -209,7 +210,7 @@ public class EntityPlayer extends EntityHuman implements ICrafting { if (chunkcoordintpair != null) { boolean flag1 = false; - if (this.netServerHandler.b() < 4) { + if (this.netServerHandler.b() + ChunkCompressionThread.getPlayerQueueSize(this) < 4) { // CraftBukkit - Add check against Chunk Packets in the ChunkCompressionThread. flag1 = true; } diff --git a/src/main/java/net/minecraft/server/NetServerHandler.java b/src/main/java/net/minecraft/server/NetServerHandler.java index 4744063d56..43b4564782 100644 --- a/src/main/java/net/minecraft/server/NetServerHandler.java +++ b/src/main/java/net/minecraft/server/NetServerHandler.java @@ -7,6 +7,7 @@ import java.util.logging.Logger; // CraftBukkit start import org.bukkit.ChatColor; +import org.bukkit.craftbukkit.ChunkCompressionThread; import org.bukkit.craftbukkit.command.ColouredConsoleSender; import org.bukkit.Location; import org.bukkit.command.CommandException; @@ -629,6 +630,10 @@ public class NetServerHandler extends NetHandler implements ICommandListener { this.networkManager.queue(new Packet3Chat(line)); } packet = null; + } else if (packet.k == true) { + // Reroute all low-priority packets through to compression thread. + ChunkCompressionThread.sendPacket(this.player, packet); + packet = null; } if (packet != null) this.networkManager.queue(packet); // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index c2ca83bdd2..3bf163f621 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -16,7 +16,8 @@ public class Packet51MapChunk extends Packet { public int e; public int f; public byte[] g; - private int h; + public int h; // CraftBukkit - private -> public + public byte[] rawData; // CraftBukkit public Packet51MapChunk() { this.k = true; @@ -36,6 +37,7 @@ public class Packet51MapChunk extends Packet { this.d = l; this.e = i1; this.f = j1; + /* CraftBukkit - Moved compression into its own method. byte[] abyte = data; // CraftBukkit - uses data from above constructor Deflater deflater = new Deflater(-1); @@ -46,7 +48,8 @@ public class Packet51MapChunk extends Packet { this.h = deflater.deflate(this.g); } finally { deflater.end(); - } + }*/ + this.rawData = data; // CraftBukkit } public void a(DataInputStream datainputstream) throws IOException { // CraftBukkit - throws IOEXception diff --git a/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java b/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java new file mode 100644 index 0000000000..7490002f6d --- /dev/null +++ b/src/main/java/org/bukkit/craftbukkit/ChunkCompressionThread.java @@ -0,0 +1,135 @@ +package org.bukkit.craftbukkit; + +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.zip.Deflater; + +import net.minecraft.server.EntityPlayer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet51MapChunk; + +public final class ChunkCompressionThread implements Runnable { + + private static final ChunkCompressionThread instance = new ChunkCompressionThread(); + private static boolean isRunning = false; + + private final int QUEUE_CAPACITY = 1024 * 10; + private final HashMap queueSizePerPlayer = new HashMap(); + private final BlockingQueue packetQueue = new LinkedBlockingQueue(QUEUE_CAPACITY); + + private final int CHUNK_SIZE = 16 * 128 * 16 * 5 / 2; + private final int REDUCED_DEFLATE_THRESHOLD = CHUNK_SIZE / 4; + private final int DEFLATE_LEVEL_CHUNKS = 6; + private final int DEFLATE_LEVEL_PARTS = 1; + + private final Deflater deflater = new Deflater(); + private byte[] deflateBuffer = new byte[CHUNK_SIZE + 100]; + + public static void startThread() { + if (!isRunning) { + isRunning = true; + new Thread(instance).start(); + } + } + + public void run() { + while (true) { + try { + handleQueuedPacket(packetQueue.take()); + } catch (InterruptedException ie) { + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private void handleQueuedPacket(QueuedPacket queuedPacket) { + addToPlayerQueueSize(queuedPacket.player, -1); + // Compress the packet if necessary. + if (queuedPacket.compress) { + handleMapChunk(queuedPacket); + } + sendToNetworkQueue(queuedPacket); + } + + private void handleMapChunk(QueuedPacket queuedPacket) { + Packet51MapChunk packet = (Packet51MapChunk) queuedPacket.packet; + + // If 'packet.g' is set then this packet has already been compressed. + if (packet.g != null) { + return; + } + + int dataSize = packet.rawData.length; + if (deflateBuffer.length < dataSize + 100) { + deflateBuffer = new byte[dataSize + 100]; + } + + deflater.reset(); + deflater.setLevel(dataSize < REDUCED_DEFLATE_THRESHOLD ? DEFLATE_LEVEL_PARTS : DEFLATE_LEVEL_CHUNKS); + deflater.setInput(packet.rawData); + deflater.finish(); + int size = deflater.deflate(deflateBuffer); + if (size == 0) { + size = deflater.deflate(deflateBuffer); + } + + // copy compressed data to packet + packet.g = new byte[size]; + packet.h = size; + System.arraycopy(deflateBuffer, 0, packet.g, 0, size); + } + + private void sendToNetworkQueue(QueuedPacket queuedPacket) { + queuedPacket.player.netServerHandler.networkManager.queue(queuedPacket.packet); + } + + public static void sendPacket(EntityPlayer player, Packet packet) { + if (packet instanceof Packet51MapChunk) { + // MapChunk Packets need compressing. + instance.addQueuedPacket(new QueuedPacket(player, packet, true)); + } else { + // Other Packets don't. + instance.addQueuedPacket(new QueuedPacket(player, packet, false)); + } + } + + private void addToPlayerQueueSize(EntityPlayer player, int amount) { + synchronized (queueSizePerPlayer) { + Integer count = queueSizePerPlayer.get(player); + queueSizePerPlayer.put(player, (count == null ? 0 : count) + amount); + } + } + + public static int getPlayerQueueSize(EntityPlayer player) { + synchronized (instance.queueSizePerPlayer) { + Integer count = instance.queueSizePerPlayer.get(player); + return count == null ? 0 : count; + } + } + + private void addQueuedPacket(QueuedPacket task) { + addToPlayerQueueSize(task.player, +1); + + while (true) { + try { + packetQueue.put(task); + return; + } catch (InterruptedException e) { + } + } + } + + private static class QueuedPacket { + final EntityPlayer player; + final Packet packet; + final boolean compress; + + QueuedPacket(EntityPlayer player, Packet packet, boolean compress) { + this.player = player; + this.packet = packet; + this.compress = compress; + } + } +} \ No newline at end of file diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java index 692fdd6d11..c4a87b8079 100644 --- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java +++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java @@ -96,6 +96,8 @@ public final class CraftServer implements Server { loadConfig(); loadPlugins(); enablePlugins(PluginLoadOrder.STARTUP); + + ChunkCompressionThread.startThread(); } private void loadConfig() {