8
8
9
9
package org .opensearch .http .reactor .netty4 ;
10
10
11
+ import org .opensearch .common .Nullable ;
11
12
import org .opensearch .common .network .NetworkService ;
12
13
import org .opensearch .common .settings .ClusterSettings ;
13
14
import org .opensearch .common .settings .Setting ;
23
24
import org .opensearch .http .HttpChannel ;
24
25
import org .opensearch .http .HttpReadTimeoutException ;
25
26
import org .opensearch .http .HttpServerChannel ;
27
+ import org .opensearch .http .reactor .netty4 .ssl .SslUtils ;
28
+ import org .opensearch .plugins .SecureHttpTransportSettingsProvider ;
26
29
import org .opensearch .telemetry .tracing .Tracer ;
27
30
import org .opensearch .threadpool .ThreadPool ;
28
31
import org .opensearch .transport .reactor .SharedGroupFactory ;
29
32
import org .opensearch .transport .reactor .netty4 .Netty4Utils ;
30
33
34
+ import javax .net .ssl .SSLEngine ;
35
+ import javax .net .ssl .SSLException ;
36
+ import javax .net .ssl .SSLSessionContext ;
37
+
31
38
import java .net .InetSocketAddress ;
32
39
import java .net .SocketOption ;
33
40
import java .time .Duration ;
41
+ import java .util .Arrays ;
42
+ import java .util .List ;
34
43
44
+ import io .netty .buffer .ByteBufAllocator ;
35
45
import io .netty .channel .ChannelOption ;
36
46
import io .netty .channel .socket .nio .NioChannelOption ;
37
47
import io .netty .handler .codec .http .DefaultLastHttpContent ;
38
48
import io .netty .handler .codec .http .FullHttpResponse ;
39
49
import io .netty .handler .codec .http .HttpContent ;
50
+ import io .netty .handler .ssl .ApplicationProtocolNegotiator ;
51
+ import io .netty .handler .ssl .SslContext ;
40
52
import io .netty .handler .timeout .ReadTimeoutException ;
53
+ import io .netty .util .ReferenceCountUtil ;
41
54
import org .reactivestreams .Publisher ;
42
55
import reactor .core .publisher .Mono ;
43
56
import reactor .core .scheduler .Scheduler ;
@@ -116,6 +129,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor
116
129
private final ByteSizeValue maxInitialLineLength ;
117
130
private final ByteSizeValue maxHeaderSize ;
118
131
private final ByteSizeValue maxChunkSize ;
132
+ private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider ;
119
133
private volatile SharedGroupFactory .SharedGroup sharedGroup ;
120
134
private volatile DisposableServer disposableServer ;
121
135
private volatile Scheduler scheduler ;
@@ -142,6 +156,45 @@ public ReactorNetty4HttpServerTransport(
142
156
ClusterSettings clusterSettings ,
143
157
SharedGroupFactory sharedGroupFactory ,
144
158
Tracer tracer
159
+ ) {
160
+ this (
161
+ settings ,
162
+ networkService ,
163
+ bigArrays ,
164
+ threadPool ,
165
+ xContentRegistry ,
166
+ dispatcher ,
167
+ clusterSettings ,
168
+ sharedGroupFactory ,
169
+ null ,
170
+ tracer
171
+ );
172
+ }
173
+
174
+ /**
175
+ * Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
176
+ * @param settings settings
177
+ * @param networkService network service
178
+ * @param bigArrays big array allocator
179
+ * @param threadPool thread pool instance
180
+ * @param xContentRegistry XContent registry instance
181
+ * @param dispatcher dispatcher instance
182
+ * @param clusterSettings cluster settings
183
+ * @param sharedGroupFactory shared group factory
184
+ * @param secureHttpTransportSettingsProvider secure HTTP transport settings provider
185
+ * @param tracer tracer instance
186
+ */
187
+ public ReactorNetty4HttpServerTransport (
188
+ Settings settings ,
189
+ NetworkService networkService ,
190
+ BigArrays bigArrays ,
191
+ ThreadPool threadPool ,
192
+ NamedXContentRegistry xContentRegistry ,
193
+ Dispatcher dispatcher ,
194
+ ClusterSettings clusterSettings ,
195
+ SharedGroupFactory sharedGroupFactory ,
196
+ @ Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider ,
197
+ Tracer tracer
145
198
) {
146
199
super (settings , networkService , bigArrays , threadPool , xContentRegistry , dispatcher , clusterSettings , tracer );
147
200
Netty4Utils .setAvailableProcessors (OpenSearchExecutors .NODE_PROCESSORS_SETTING .get (settings ));
@@ -152,6 +205,7 @@ public ReactorNetty4HttpServerTransport(
152
205
this .maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE .get (settings );
153
206
this .maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE .get (settings );
154
207
this .maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH .get (settings );
208
+ this .secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider ;
155
209
}
156
210
157
211
/**
@@ -160,7 +214,7 @@ public ReactorNetty4HttpServerTransport(
160
214
*/
161
215
@ Override
162
216
protected HttpServerChannel bind (InetSocketAddress socketAddress ) throws Exception {
163
- final HttpServer server = configureChannelOptions (
217
+ final HttpServer server = configure (
164
218
HttpServer .create ()
165
219
.httpFormDecoder (builder -> builder .scheduler (scheduler ))
166
220
.idleTimeout (Duration .ofMillis (connectTimeoutMillis ))
@@ -173,16 +227,15 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
173
227
.maxHeaderSize (maxHeaderSize .bytesAsInt ())
174
228
.maxInitialLineLength (maxInitialLineLength .bytesAsInt ())
175
229
)
176
- .protocol (HttpProtocol .HTTP11 , HttpProtocol .H2C )
177
230
.handle ((req , res ) -> incomingRequest (req , res ))
178
231
);
179
232
180
233
disposableServer = server .bindNow ();
181
234
return new ReactorNetty4HttpServerChannel (disposableServer .channel ());
182
235
}
183
236
184
- private HttpServer configureChannelOptions (final HttpServer server1 ) {
185
- HttpServer configured = server1 .childOption (ChannelOption .TCP_NODELAY , SETTING_HTTP_TCP_NO_DELAY .get (settings ))
237
+ private HttpServer configure (final HttpServer server ) throws Exception {
238
+ HttpServer configured = server .childOption (ChannelOption .TCP_NODELAY , SETTING_HTTP_TCP_NO_DELAY .get (settings ))
186
239
.childOption (ChannelOption .SO_KEEPALIVE , SETTING_HTTP_TCP_KEEP_ALIVE .get (settings ));
187
240
188
241
if (SETTING_HTTP_TCP_KEEP_ALIVE .get (settings )) {
@@ -229,6 +282,65 @@ private HttpServer configureChannelOptions(final HttpServer server1) {
229
282
configured = configured .option (ChannelOption .SO_REUSEADDR , reuseAddress );
230
283
configured = configured .childOption (ChannelOption .SO_REUSEADDR , reuseAddress );
231
284
285
+ // Configure SSL context if available
286
+ if (secureHttpTransportSettingsProvider != null ) {
287
+ final SSLEngine engine = secureHttpTransportSettingsProvider .buildSecureHttpServerEngine (settings , this )
288
+ .orElseGet (SslUtils ::createDefaultServerSSLEngine );
289
+
290
+ try {
291
+ final List <String > cipherSuites = Arrays .asList (engine .getEnabledCipherSuites ());
292
+ final List <String > applicationProtocols = Arrays .asList (engine .getSSLParameters ().getApplicationProtocols ());
293
+
294
+ configured = configured .secure (spec -> spec .sslContext (new SslContext () {
295
+ @ Override
296
+ public SSLSessionContext sessionContext () {
297
+ throw new UnsupportedOperationException (); /* server only, should never be called */
298
+ }
299
+
300
+ @ Override
301
+ public SSLEngine newEngine (ByteBufAllocator alloc , String peerHost , int peerPort ) {
302
+ throw new UnsupportedOperationException (); /* server only, should never be called */
303
+ }
304
+
305
+ @ Override
306
+ public SSLEngine newEngine (ByteBufAllocator alloc ) {
307
+ try {
308
+ return secureHttpTransportSettingsProvider .buildSecureHttpServerEngine (
309
+ settings ,
310
+ ReactorNetty4HttpServerTransport .this
311
+ ).orElseGet (SslUtils ::createDefaultServerSSLEngine );
312
+ } catch (final SSLException ex ) {
313
+ throw new UnsupportedOperationException ("Unable to create SSLEngine" , ex );
314
+ }
315
+ }
316
+
317
+ @ Override
318
+ public boolean isClient () {
319
+ return false ; /* server only */
320
+ }
321
+
322
+ @ Override
323
+ public List <String > cipherSuites () {
324
+ return cipherSuites ;
325
+ }
326
+
327
+ @ Override
328
+ public ApplicationProtocolNegotiator applicationProtocolNegotiator () {
329
+ return new ApplicationProtocolNegotiator () {
330
+ @ Override
331
+ public List <String > protocols () {
332
+ return applicationProtocols ;
333
+ }
334
+ };
335
+ }
336
+ }).build ()).protocol (HttpProtocol .HTTP11 , HttpProtocol .H2 );
337
+ } finally {
338
+ ReferenceCountUtil .release (engine );
339
+ }
340
+ } else {
341
+ configured = configured .protocol (HttpProtocol .HTTP11 , HttpProtocol .H2C );
342
+ }
343
+
232
344
return configured ;
233
345
}
234
346
@@ -302,6 +414,11 @@ protected void doStart() {
302
414
}
303
415
}
304
416
417
+ /**
418
+ * Exception handler
419
+ * @param channel HTTP channel
420
+ * @param cause exception occurred
421
+ */
305
422
@ Override
306
423
public void onException (HttpChannel channel , Exception cause ) {
307
424
if (cause instanceof ReadTimeoutException ) {
0 commit comments