Implement abillity to handle Http-Requests asyncrounously
This commit is contained in:
parent
22ed75c513
commit
1be7dd5746
|
@ -6,18 +6,28 @@ import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
|
import java.nio.channels.Channel;
|
||||||
import java.nio.channels.SelectableChannel;
|
import java.nio.channels.SelectableChannel;
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
public class HttpConnection implements SelectionConsumer {
|
public class HttpConnection implements SelectionConsumer {
|
||||||
|
|
||||||
private final HttpRequestHandler requestHandler;
|
private final HttpRequestHandler requestHandler;
|
||||||
|
private final Executor responseHandlerExecutor;
|
||||||
private HttpRequest request;
|
private HttpRequest request;
|
||||||
|
private CompletableFuture<HttpResponse> futureResponse;
|
||||||
private HttpResponse response;
|
private HttpResponse response;
|
||||||
|
|
||||||
public HttpConnection(HttpRequestHandler requestHandler) {
|
public HttpConnection(HttpRequestHandler requestHandler) {
|
||||||
|
this(requestHandler, Runnable::run); //run synchronously
|
||||||
|
}
|
||||||
|
|
||||||
|
public HttpConnection(HttpRequestHandler requestHandler, Executor responseHandlerExecutor) {
|
||||||
this.requestHandler = requestHandler;
|
this.requestHandler = requestHandler;
|
||||||
|
this.responseHandlerExecutor = responseHandlerExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -48,10 +58,22 @@ public class HttpConnection implements SelectionConsumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// process request
|
// process request
|
||||||
if (response == null) {
|
if (futureResponse == null) {
|
||||||
this.response = requestHandler.handle(request);
|
futureResponse = CompletableFuture.supplyAsync(
|
||||||
|
() -> requestHandler.handle(request),
|
||||||
|
responseHandlerExecutor
|
||||||
|
);
|
||||||
|
futureResponse.thenAccept(response -> {
|
||||||
|
try {
|
||||||
|
response.read(channel); // do an initial read to trigger response sending intent
|
||||||
|
this.response = response;
|
||||||
|
} catch (IOException e) {
|
||||||
|
handleIOException(channel, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (response == null) return;
|
||||||
if (!selectionKey.isValid()) return;
|
if (!selectionKey.isValid()) return;
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
|
@ -63,16 +85,24 @@ public class HttpConnection implements SelectionConsumer {
|
||||||
// reset to accept new request
|
// reset to accept new request
|
||||||
request.clear();
|
request.clear();
|
||||||
response.close();
|
response.close();
|
||||||
|
futureResponse = null;
|
||||||
response = null;
|
response = null;
|
||||||
selectionKey.interestOps(SelectionKey.OP_READ);
|
selectionKey.interestOps(SelectionKey.OP_READ);
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Logger.global.logDebug("Failed to process selection: " + e);
|
handleIOException(channel, e);
|
||||||
try {
|
}
|
||||||
channel.close();
|
}
|
||||||
} catch (IOException e2) {
|
|
||||||
Logger.global.logWarning("Failed to close channel" + e2);
|
private void handleIOException(Channel channel, IOException e) {
|
||||||
}
|
request.clear();
|
||||||
|
response = null;
|
||||||
|
|
||||||
|
Logger.global.logDebug("Failed to process selection: " + e);
|
||||||
|
try {
|
||||||
|
channel.close();
|
||||||
|
} catch (IOException e2) {
|
||||||
|
Logger.global.logWarning("Failed to close channel" + e2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class HttpResponse implements Closeable {
|
||||||
this.headers = new HashMap<>();
|
this.headers = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean read(WritableByteChannel channel) throws IOException {
|
public synchronized boolean read(WritableByteChannel channel) throws IOException {
|
||||||
if (complete) return true;
|
if (complete) return true;
|
||||||
|
|
||||||
// send headers
|
// send headers
|
||||||
|
|
|
@ -13,6 +13,10 @@ public class HttpServer extends Server {
|
||||||
@Override
|
@Override
|
||||||
public SelectionConsumer createConnectionHandler() {
|
public SelectionConsumer createConnectionHandler() {
|
||||||
return new HttpConnection(requestHandler);
|
return new HttpConnection(requestHandler);
|
||||||
|
|
||||||
|
// Enable async request handling ...
|
||||||
|
// TODO: maybe find a better/separate executor than using bluemap's common thread-pool
|
||||||
|
//return new HttpConnection(requestHandler, BlueMap.THREAD_POOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue