mirror of
https://github.com/SpigotMC/BungeeCord.git
synced 2024-09-29 22:27:29 +02:00
#3392: Consolidate flushes to reduce syscalls, improves performance
Based on Netty FlushConsolidationHandler
This commit is contained in:
parent
ee02d98cb2
commit
0376eacc04
5
pom.xml
5
pom.xml
@ -22,6 +22,11 @@
|
||||
<url>https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
<license>
|
||||
<name>Apache License Version 2.0</name>
|
||||
<url>https://www.apache.org/licenses/LICENSE-2.0</url>
|
||||
<comments>Applies only to original versions of files in proxy/src/main/java/net/md_5/bungee/netty/flush/</comments>
|
||||
</license>
|
||||
</licenses>
|
||||
|
||||
<developers>
|
||||
|
@ -388,6 +388,10 @@ public class ServerConnector extends PacketHandler
|
||||
user.setServer( server );
|
||||
ch.getHandle().pipeline().get( HandlerBoss.class ).setHandler( new DownstreamBridge( bungee, user, server ) );
|
||||
|
||||
// Updates the flush handler connections (the get/set methods add the channel handlers if needed)
|
||||
ch.setFlushSignalingTarget( user.getCh().getFlushConsolidationHandler( false ) );
|
||||
user.getCh().setFlushSignalingTarget( ch.getFlushConsolidationHandler( true ) );
|
||||
|
||||
bungee.getPluginManager().callEvent( new ServerSwitchEvent( user, from ) );
|
||||
|
||||
thisState = State.FINISHED;
|
||||
|
@ -347,7 +347,7 @@ public final class UserConnection implements ProxiedPlayer
|
||||
|
||||
pendingConnects.add( target );
|
||||
|
||||
ChannelInitializer initializer = new ChannelInitializer()
|
||||
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>()
|
||||
{
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception
|
||||
|
@ -11,6 +11,8 @@ import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import net.md_5.bungee.compress.PacketCompressor;
|
||||
import net.md_5.bungee.compress.PacketDecompressor;
|
||||
import net.md_5.bungee.netty.flush.BungeeFlushConsolidationHandler;
|
||||
import net.md_5.bungee.netty.flush.FlushSignalingHandler;
|
||||
import net.md_5.bungee.protocol.DefinedPacket;
|
||||
import net.md_5.bungee.protocol.MinecraftDecoder;
|
||||
import net.md_5.bungee.protocol.MinecraftEncoder;
|
||||
@ -69,6 +71,37 @@ public class ChannelWrapper
|
||||
ch.pipeline().get( MinecraftEncoder.class ).setProtocolVersion( protocol );
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link FlushSignalingHandler} target. If the handler is absent, one will be added.
|
||||
* @param target the (new) target for the flush signaling handler
|
||||
*/
|
||||
public void setFlushSignalingTarget(BungeeFlushConsolidationHandler target)
|
||||
{
|
||||
FlushSignalingHandler handler = ch.pipeline().get( FlushSignalingHandler.class );
|
||||
if ( handler == null )
|
||||
{
|
||||
ch.pipeline().addFirst( PipelineUtils.FLUSH_SIGNALING, new FlushSignalingHandler( target ) );
|
||||
} else
|
||||
{
|
||||
handler.setTarget( target );
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the flush consolidation handler of this channel. If none is present, one will be added.
|
||||
* @param toClient whether this channel is a bungee-client connection
|
||||
* @return the flush consolidation handler for this channel
|
||||
*/
|
||||
public BungeeFlushConsolidationHandler getFlushConsolidationHandler(boolean toClient)
|
||||
{
|
||||
BungeeFlushConsolidationHandler handler = ch.pipeline().get( BungeeFlushConsolidationHandler.class );
|
||||
if ( handler == null )
|
||||
{
|
||||
ch.pipeline().addFirst( PipelineUtils.FLUSH_CONSOLIDATION, handler = BungeeFlushConsolidationHandler.newInstance( toClient ) );
|
||||
}
|
||||
return handler;
|
||||
}
|
||||
|
||||
public int getEncodeVersion()
|
||||
{
|
||||
return ch.pipeline().get( MinecraftEncoder.class ).getProtocolVersion();
|
||||
|
@ -105,6 +105,8 @@ public class PipelineUtils
|
||||
public static final String FRAME_PREPENDER = "frame-prepender";
|
||||
public static final String LEGACY_DECODER = "legacy-decoder";
|
||||
public static final String LEGACY_KICKER = "legacy-kick";
|
||||
public static final String FLUSH_CONSOLIDATION = "flush-consolidation";
|
||||
public static final String FLUSH_SIGNALING = "flush-signaling";
|
||||
|
||||
private static boolean epoll;
|
||||
private static boolean io_uring;
|
||||
|
@ -0,0 +1,255 @@
|
||||
/*
|
||||
* The original file is licensed under the following license:
|
||||
*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
* ---
|
||||
*
|
||||
* This file is based on the io/netty/handler/flush/FlushConsolidationHandler.java file from Netty (v4.1).
|
||||
* It was modified to fit to bungee's use of forwarded connections.
|
||||
* All modifications are licensed under the "Modified BSD 3-Clause License" to be found at
|
||||
* https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE
|
||||
*/
|
||||
package net.md_5.bungee.netty.flush;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelOutboundHandler;
|
||||
import io.netty.channel.ChannelOutboundInvoker;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
* {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
|
||||
* operations (which also includes
|
||||
* {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
|
||||
* {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
|
||||
* {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
|
||||
* <p>
|
||||
* Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
|
||||
* in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
|
||||
* as much as possible.
|
||||
* <p>
|
||||
* If a {@link FlushSignalingHandler} signalises a read loop is currently ongoing,
|
||||
* {@link #flush(ChannelHandlerContext)} will not be passed on to the next {@link ChannelOutboundHandler} in the
|
||||
* {@link ChannelPipeline}, as it will pick up any pending flushes when
|
||||
* {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
|
||||
* If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
|
||||
* <ul>
|
||||
* <li>if {@code false}, flushes are passed on to the next handler directly;</li>
|
||||
* <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
|
||||
* high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
|
||||
* batching multiple flushes into one.</li>
|
||||
* </ul>
|
||||
* If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
|
||||
* while batching outside of a read loop).
|
||||
* <p>
|
||||
* If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
|
||||
* <p>
|
||||
* The {@link BungeeFlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
|
||||
* {@link ChannelPipeline} to have the best effect.
|
||||
*/
|
||||
public final class BungeeFlushConsolidationHandler extends ChannelDuplexHandler
|
||||
{
|
||||
private final int explicitFlushAfterFlushes;
|
||||
private final boolean consolidateWhenNoReadInProgress;
|
||||
private final Runnable flushTask;
|
||||
private int flushPendingCount;
|
||||
boolean readInProgress;
|
||||
ChannelHandlerContext ctx;
|
||||
private Future<?> nextScheduledFlush;
|
||||
|
||||
/**
|
||||
* The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
|
||||
* read loop, or while batching outside of a read loop).
|
||||
*/
|
||||
public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
|
||||
|
||||
/**
|
||||
* Creates a new instance with bungee's default values
|
||||
*
|
||||
* @param toClient whether this handler is for a bungee-client connection
|
||||
* @return a new instance of BungeeFlushConsolidationHandler
|
||||
*/
|
||||
public static BungeeFlushConsolidationHandler newInstance(boolean toClient)
|
||||
{
|
||||
// Currently the toClient boolean is ignored. It is present in case we find different parameters neccessary
|
||||
// for client and server connections.
|
||||
return new BungeeFlushConsolidationHandler( 20, false );
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
|
||||
* operations at the latest.
|
||||
*/
|
||||
private BungeeFlushConsolidationHandler()
|
||||
{
|
||||
this( DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false );
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new instance which doesn't consolidate flushes when no read is in progress.
|
||||
*
|
||||
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
|
||||
*/
|
||||
private BungeeFlushConsolidationHandler(int explicitFlushAfterFlushes)
|
||||
{
|
||||
this( explicitFlushAfterFlushes, false );
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new instance.
|
||||
*
|
||||
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
|
||||
* @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
|
||||
* ongoing.
|
||||
*/
|
||||
private BungeeFlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress)
|
||||
{
|
||||
this.explicitFlushAfterFlushes = ObjectUtil.checkPositive( explicitFlushAfterFlushes, "explicitFlushAfterFlushes" );
|
||||
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
|
||||
this.flushTask = consolidateWhenNoReadInProgress ? new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
if ( flushPendingCount > 0 && !readInProgress )
|
||||
{
|
||||
flushPendingCount = 0;
|
||||
nextScheduledFlush = null;
|
||||
ctx.flush();
|
||||
} // else we'll flush when the read completes
|
||||
}
|
||||
} : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
|
||||
{
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx) throws Exception
|
||||
{
|
||||
if ( readInProgress )
|
||||
{
|
||||
// If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
|
||||
// we only need to flush if we reach the explicitFlushAfterFlushes limit.
|
||||
if ( ++flushPendingCount == explicitFlushAfterFlushes )
|
||||
{
|
||||
flushNow( ctx );
|
||||
}
|
||||
} else if ( consolidateWhenNoReadInProgress )
|
||||
{
|
||||
// Flush immediately if we reach the threshold, otherwise schedule
|
||||
if ( ++flushPendingCount == explicitFlushAfterFlushes )
|
||||
{
|
||||
flushNow( ctx );
|
||||
} else
|
||||
{
|
||||
scheduleFlush( ctx );
|
||||
}
|
||||
} else
|
||||
{
|
||||
// Always flush directly
|
||||
flushNow( ctx );
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
|
||||
{
|
||||
// To ensure we not miss to flush anything, do it now.
|
||||
resetReadAndFlushIfNeeded( ctx );
|
||||
ctx.fireExceptionCaught( cause );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
|
||||
{
|
||||
// Try to flush one last time if flushes are pending before disconnect the channel.
|
||||
resetReadAndFlushIfNeeded( ctx );
|
||||
ctx.disconnect( promise );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
|
||||
{
|
||||
// Try to flush one last time if flushes are pending before close the channel.
|
||||
resetReadAndFlushIfNeeded( ctx );
|
||||
ctx.close( promise );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
|
||||
{
|
||||
if ( !ctx.channel().isWritable() )
|
||||
{
|
||||
// The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
|
||||
flushIfNeeded( ctx );
|
||||
}
|
||||
ctx.fireChannelWritabilityChanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
|
||||
{
|
||||
flushIfNeeded( ctx );
|
||||
}
|
||||
|
||||
void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx)
|
||||
{
|
||||
readInProgress = false;
|
||||
flushIfNeeded( ctx );
|
||||
}
|
||||
|
||||
void flushIfNeeded(ChannelHandlerContext ctx)
|
||||
{
|
||||
if ( flushPendingCount > 0 )
|
||||
{
|
||||
flushNow( ctx );
|
||||
}
|
||||
}
|
||||
|
||||
private void flushNow(ChannelHandlerContext ctx)
|
||||
{
|
||||
cancelScheduledFlush();
|
||||
flushPendingCount = 0;
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
private void scheduleFlush(final ChannelHandlerContext ctx)
|
||||
{
|
||||
if ( nextScheduledFlush == null )
|
||||
{
|
||||
// Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
|
||||
nextScheduledFlush = ctx.channel().eventLoop().submit( flushTask );
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelScheduledFlush()
|
||||
{
|
||||
if ( nextScheduledFlush != null )
|
||||
{
|
||||
nextScheduledFlush.cancel( false );
|
||||
nextScheduledFlush = null;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,94 @@
|
||||
/*
|
||||
* The original file is licensed under the following license:
|
||||
*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
* ---
|
||||
*
|
||||
* This file is partly based on the io/netty/handler/flush/FlushConsolidationHandler.java file from Netty (v4.1).
|
||||
* It was modified to fit to bungee's use of forwarded connections.
|
||||
* All modifications are licensed under the "Modified BSD 3-Clause License" to be found at https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE
|
||||
*/
|
||||
package net.md_5.bungee.netty.flush;
|
||||
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
|
||||
/**
|
||||
* Signalises a read loop is currently ongoing to {@link BungeeFlushConsolidationHandler}.
|
||||
*/
|
||||
public final class FlushSignalingHandler extends ChannelDuplexHandler
|
||||
{
|
||||
private BungeeFlushConsolidationHandler target;
|
||||
|
||||
public FlushSignalingHandler(BungeeFlushConsolidationHandler target)
|
||||
{
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
public void setTarget(BungeeFlushConsolidationHandler target)
|
||||
{
|
||||
// flush old target
|
||||
this.target.resetReadAndFlushIfNeeded( this.target.ctx );
|
||||
// set new target
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
|
||||
{
|
||||
// This may be the last event in the read loop, so flush now!
|
||||
target.resetReadAndFlushIfNeeded( target.ctx );
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
|
||||
{
|
||||
target.readInProgress = true;
|
||||
ctx.fireChannelRead( msg );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
|
||||
{
|
||||
// To ensure we not miss to flush anything, do it now.
|
||||
target.resetReadAndFlushIfNeeded( target.ctx );
|
||||
ctx.fireExceptionCaught( cause );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
|
||||
{
|
||||
// Try to flush one last time if flushes are pending before disconnect the channel.
|
||||
target.resetReadAndFlushIfNeeded( target.ctx );
|
||||
ctx.disconnect( promise );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
|
||||
{
|
||||
// Try to flush one last time if flushes are pending before close the channel.
|
||||
target.resetReadAndFlushIfNeeded( target.ctx );
|
||||
ctx.close( promise );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
|
||||
{
|
||||
target.flushIfNeeded( target.ctx );
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user