Skip to content

Commit 386adba

Browse files
retapranikum
authored andcommitted
Support for HTTP/2 (server-side) (opensearch-project#3847)
* Support for HTTP/2 (server-side) Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Addressing code review comments Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Added HTTP/1.1 channel configuration Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Addressing code review comments Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Update pul request URL in CHANGELOG.md Signed-off-by: Andriy Redko <andriy.redko@aiven.io> Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent 0a38021 commit 386adba

12 files changed

+428
-32
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
88
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
99
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
1010
- Label configuration for dependabot PRs ([#4348](https://github.com/opensearch-project/OpenSearch/pull/4348))
11+
- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847))
1112

1213
### Changed
1314
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))

modules/transport-netty4/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ dependencies {
5858
api "io.netty:netty-buffer:${versions.netty}"
5959
api "io.netty:netty-codec:${versions.netty}"
6060
api "io.netty:netty-codec-http:${versions.netty}"
61+
api "io.netty:netty-codec-http2:${versions.netty}"
6162
api "io.netty:netty-common:${versions.netty}"
6263
api "io.netty:netty-handler:${versions.netty}"
6364
api "io.netty:netty-resolver:${versions.netty}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
0eeffab0cd5efb699d5e4ab9b694d32fef6694b3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.netty4;
10+
11+
import io.netty.handler.codec.http.FullHttpResponse;
12+
import io.netty.util.ReferenceCounted;
13+
import org.opensearch.OpenSearchNetty4IntegTestCase;
14+
import org.opensearch.common.transport.TransportAddress;
15+
import org.opensearch.http.HttpServerTransport;
16+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
17+
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
18+
19+
import java.util.Collection;
20+
import java.util.Locale;
21+
import java.util.stream.IntStream;
22+
23+
import static org.hamcrest.Matchers.containsInAnyOrder;
24+
import static org.hamcrest.Matchers.hasSize;
25+
26+
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
27+
public class Netty4Http2IT extends OpenSearchNetty4IntegTestCase {
28+
29+
@Override
30+
protected boolean addMockHttpTransport() {
31+
return false; // enable http
32+
}
33+
34+
public void testThatNettyHttpServerSupportsHttp2() throws Exception {
35+
String[] requests = new String[] { "/", "/_nodes/stats", "/", "/_cluster/state", "/" };
36+
37+
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
38+
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
39+
TransportAddress transportAddress = randomFrom(boundAddresses);
40+
41+
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) {
42+
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
43+
try {
44+
assertThat(responses, hasSize(5));
45+
46+
Collection<String> opaqueIds = Netty4HttpClient.returnOpaqueIds(responses);
47+
assertOpaqueIdsInAnyOrder(opaqueIds);
48+
} finally {
49+
responses.forEach(ReferenceCounted::release);
50+
}
51+
}
52+
}
53+
54+
private void assertOpaqueIdsInAnyOrder(Collection<String> opaqueIds) {
55+
// check if opaque ids are present in any order, since for HTTP/2 we use streaming (no head of line blocking)
56+
// and responses may come back at any order
57+
int i = 0;
58+
String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be in any order, got [%s]", opaqueIds);
59+
assertThat(msg, opaqueIds, containsInAnyOrder(IntStream.range(0, 5).mapToObj(Integer::toString).toArray()));
60+
}
61+
62+
}

modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4HttpRequestSizeLimitIT.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void testLimitsInFlightRequests() throws Exception {
100100
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
101101
TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());
102102

103-
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
103+
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
104104
Collection<FullHttpResponse> singleResponse = nettyHttpClient.post(transportAddress.address(), requests.subList(0, 1));
105105
try {
106106
assertThat(singleResponse, hasSize(1));
@@ -130,7 +130,7 @@ public void testDoesNotLimitExcludedRequests() throws Exception {
130130
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
131131
TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());
132132

133-
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
133+
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
134134
Collection<FullHttpResponse> responses = nettyHttpClient.put(transportAddress.address(), requestUris);
135135
try {
136136
assertThat(responses, hasSize(requestUris.size()));

modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4PipeliningIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testThatNettyHttpServerSupportsPipelining() throws Exception {
6161
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
6262
TransportAddress transportAddress = randomFrom(boundAddresses);
6363

64-
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
64+
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
6565
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
6666
try {
6767
assertThat(responses, hasSize(5));

modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpChannel.java

+13
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@
3333
package org.opensearch.http.netty4;
3434

3535
import io.netty.channel.Channel;
36+
import io.netty.channel.ChannelPipeline;
37+
3638
import org.opensearch.action.ActionListener;
39+
import org.opensearch.common.Nullable;
3740
import org.opensearch.common.concurrent.CompletableContext;
3841
import org.opensearch.http.HttpChannel;
3942
import org.opensearch.http.HttpResponse;
@@ -45,9 +48,15 @@ public class Netty4HttpChannel implements HttpChannel {
4548

4649
private final Channel channel;
4750
private final CompletableContext<Void> closeContext = new CompletableContext<>();
51+
private final ChannelPipeline inboundPipeline;
4852

4953
Netty4HttpChannel(Channel channel) {
54+
this(channel, null);
55+
}
56+
57+
Netty4HttpChannel(Channel channel, ChannelPipeline inboundPipeline) {
5058
this.channel = channel;
59+
this.inboundPipeline = inboundPipeline;
5160
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
5261
}
5362

@@ -81,6 +90,10 @@ public void close() {
8190
channel.close();
8291
}
8392

93+
public @Nullable ChannelPipeline inboundPipeline() {
94+
return inboundPipeline;
95+
}
96+
8497
public Channel getNettyChannel() {
8598
return channel;
8699
}

modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java

+146-14
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,36 @@
4040
import io.netty.channel.ChannelInboundHandlerAdapter;
4141
import io.netty.channel.ChannelInitializer;
4242
import io.netty.channel.ChannelOption;
43+
import io.netty.channel.ChannelPipeline;
4344
import io.netty.channel.FixedRecvByteBufAllocator;
4445
import io.netty.channel.RecvByteBufAllocator;
46+
import io.netty.channel.SimpleChannelInboundHandler;
4547
import io.netty.channel.socket.nio.NioChannelOption;
4648
import io.netty.handler.codec.ByteToMessageDecoder;
4749
import io.netty.handler.codec.http.HttpContentCompressor;
4850
import io.netty.handler.codec.http.HttpContentDecompressor;
51+
import io.netty.handler.codec.http.HttpMessage;
4952
import io.netty.handler.codec.http.HttpObjectAggregator;
5053
import io.netty.handler.codec.http.HttpRequestDecoder;
5154
import io.netty.handler.codec.http.HttpResponseEncoder;
55+
import io.netty.handler.codec.http.HttpServerCodec;
56+
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
57+
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodec;
58+
import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeCodecFactory;
59+
import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler;
60+
import io.netty.handler.codec.http2.Http2CodecUtil;
61+
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
62+
import io.netty.handler.codec.http2.Http2MultiplexHandler;
63+
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
64+
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
65+
import io.netty.handler.logging.LogLevel;
66+
import io.netty.handler.logging.LoggingHandler;
5267
import io.netty.handler.timeout.ReadTimeoutException;
5368
import io.netty.handler.timeout.ReadTimeoutHandler;
69+
import io.netty.util.AsciiString;
5470
import io.netty.util.AttributeKey;
71+
import io.netty.util.ReferenceCountUtil;
72+
5573
import org.apache.logging.log4j.LogManager;
5674
import org.apache.logging.log4j.Logger;
5775
import org.opensearch.ExceptionsHelper;
@@ -335,38 +353,152 @@ protected HttpChannelHandler(final Netty4HttpServerTransport transport, final Ht
335353
this.responseCreator = new Netty4HttpResponseCreator();
336354
}
337355

356+
public ChannelHandler getRequestHandler() {
357+
return requestHandler;
358+
}
359+
338360
@Override
339361
protected void initChannel(Channel ch) throws Exception {
340362
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
341363
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
342364
ch.pipeline().addLast("byte_buf_sizer", byteBufSizer);
343365
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
366+
367+
configurePipeline(ch);
368+
transport.serverAcceptedChannel(nettyHttpChannel);
369+
}
370+
371+
@Override
372+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
373+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
374+
super.exceptionCaught(ctx, cause);
375+
}
376+
377+
protected void configurePipeline(Channel ch) {
378+
final UpgradeCodecFactory upgradeCodecFactory = new UpgradeCodecFactory() {
379+
@Override
380+
public UpgradeCodec newUpgradeCodec(CharSequence protocol) {
381+
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
382+
return new Http2ServerUpgradeCodec(
383+
Http2FrameCodecBuilder.forServer().build(),
384+
new Http2MultiplexHandler(createHttp2ChannelInitializer(ch.pipeline()))
385+
);
386+
} else {
387+
return null;
388+
}
389+
}
390+
};
391+
392+
final HttpServerCodec sourceCodec = new HttpServerCodec(
393+
handlingSettings.getMaxInitialLineLength(),
394+
handlingSettings.getMaxHeaderSize(),
395+
handlingSettings.getMaxChunkSize()
396+
);
397+
398+
final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
399+
final CleartextHttp2ServerUpgradeHandler cleartextUpgradeHandler = new CleartextHttp2ServerUpgradeHandler(
400+
sourceCodec,
401+
upgradeHandler,
402+
createHttp2ChannelInitializerPriorKnowledge()
403+
);
404+
405+
ch.pipeline().addLast(cleartextUpgradeHandler).addLast(new SimpleChannelInboundHandler<HttpMessage>() {
406+
@Override
407+
protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {
408+
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
409+
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
410+
411+
// If this handler is hit then no upgrade has been attempted and the client is just talking HTTP
412+
final ChannelPipeline pipeline = ctx.pipeline();
413+
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
414+
pipeline.replace(this, "aggregator", aggregator);
415+
416+
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
417+
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
418+
if (handlingSettings.isCompression()) {
419+
ch.pipeline()
420+
.addAfter("aggregator", "encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
421+
}
422+
ch.pipeline().addBefore("handler", "request_creator", requestCreator);
423+
ch.pipeline().addBefore("handler", "response_creator", responseCreator);
424+
ch.pipeline()
425+
.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
426+
427+
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
428+
}
429+
});
430+
}
431+
432+
protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) {
344433
final HttpRequestDecoder decoder = new HttpRequestDecoder(
345434
handlingSettings.getMaxInitialLineLength(),
346435
handlingSettings.getMaxHeaderSize(),
347436
handlingSettings.getMaxChunkSize()
348437
);
349438
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
350-
ch.pipeline().addLast("decoder", decoder);
351-
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
352-
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
439+
pipeline.addLast("decoder", decoder);
440+
pipeline.addLast("decoder_compress", new HttpContentDecompressor());
441+
pipeline.addLast("encoder", new HttpResponseEncoder());
353442
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
354443
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
355-
ch.pipeline().addLast("aggregator", aggregator);
444+
pipeline.addLast("aggregator", aggregator);
356445
if (handlingSettings.isCompression()) {
357-
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
446+
pipeline.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
358447
}
359-
ch.pipeline().addLast("request_creator", requestCreator);
360-
ch.pipeline().addLast("response_creator", responseCreator);
361-
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
362-
ch.pipeline().addLast("handler", requestHandler);
363-
transport.serverAcceptedChannel(nettyHttpChannel);
448+
pipeline.addLast("request_creator", requestCreator);
449+
pipeline.addLast("response_creator", responseCreator);
450+
pipeline.addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
451+
pipeline.addLast("handler", requestHandler);
364452
}
365453

366-
@Override
367-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
368-
ExceptionsHelper.maybeDieOnAnotherThread(cause);
369-
super.exceptionCaught(ctx, cause);
454+
protected void configureDefaultHttp2Pipeline(ChannelPipeline pipeline) {
455+
pipeline.addLast(Http2FrameCodecBuilder.forServer().build())
456+
.addLast(new Http2MultiplexHandler(createHttp2ChannelInitializer(pipeline)));
457+
}
458+
459+
private ChannelInitializer<Channel> createHttp2ChannelInitializerPriorKnowledge() {
460+
return new ChannelInitializer<Channel>() {
461+
@Override
462+
protected void initChannel(Channel childChannel) throws Exception {
463+
configureDefaultHttp2Pipeline(childChannel.pipeline());
464+
}
465+
};
466+
}
467+
468+
/**
469+
* Http2MultiplexHandler creates new pipeline, we are preserving the old one in case some handlers need to be
470+
* access (like for example opensearch-security plugin which accesses SSL handlers).
471+
*/
472+
private ChannelInitializer<Channel> createHttp2ChannelInitializer(ChannelPipeline inboundPipeline) {
473+
return new ChannelInitializer<Channel>() {
474+
@Override
475+
protected void initChannel(Channel childChannel) throws Exception {
476+
final Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(childChannel, inboundPipeline);
477+
childChannel.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
478+
479+
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
480+
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
481+
482+
childChannel.pipeline()
483+
.addLast(new LoggingHandler(LogLevel.DEBUG))
484+
.addLast(new Http2StreamFrameToHttpObjectCodec(true))
485+
.addLast("byte_buf_sizer", byteBufSizer)
486+
.addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS))
487+
.addLast("decoder_decompress", new HttpContentDecompressor());
488+
489+
if (handlingSettings.isCompression()) {
490+
childChannel.pipeline()
491+
.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
492+
}
493+
494+
childChannel.pipeline()
495+
.addLast("aggregator", aggregator)
496+
.addLast("request_creator", requestCreator)
497+
.addLast("response_creator", responseCreator)
498+
.addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents))
499+
.addLast("handler", getRequestHandler());
500+
}
501+
};
370502
}
371503
}
372504

modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4BadRequestTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
117117
httpServerTransport.start();
118118
final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());
119119

120-
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
120+
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http()) {
121121
final Collection<FullHttpResponse> responses = nettyHttpClient.get(
122122
transportAddress.address(),
123123
"/_cluster/settings?pretty=%"

0 commit comments

Comments
 (0)