37
37
import org .opensearch .core .xcontent .NamedXContentRegistry ;
38
38
import org .opensearch .http .HttpChannel ;
39
39
import org .opensearch .http .HttpHandlingSettings ;
40
+ import org .opensearch .http .HttpServerTransport ;
40
41
import org .opensearch .http .netty4 .Netty4HttpServerTransport ;
41
- import org .opensearch .plugins .SecureTransportSettingsProvider ;
42
+ import org .opensearch .plugins .SecureHttpTransportSettingsProvider ;
43
+ import org .opensearch .plugins .TransportExceptionHandler ;
42
44
import org .opensearch .telemetry .tracing .Tracer ;
43
45
import org .opensearch .threadpool .ThreadPool ;
44
46
import org .opensearch .transport .SharedGroupFactory ;
47
+ import org .opensearch .transport .TransportAdapterProvider ;
45
48
import org .opensearch .transport .netty4 .ssl .SslUtils ;
46
49
47
50
import javax .net .ssl .SSLEngine ;
48
51
52
+ import java .util .List ;
53
+ import java .util .Optional ;
54
+ import java .util .stream .Collectors ;
55
+
49
56
import io .netty .channel .Channel ;
50
57
import io .netty .channel .ChannelHandler ;
58
+ import io .netty .channel .ChannelInboundHandlerAdapter ;
51
59
import io .netty .handler .codec .DecoderException ;
52
60
import io .netty .handler .ssl .SslHandler ;
53
61
54
62
/**
55
63
* @see <a href="https://github.com/opensearch-project/security/blob/d526c9f6c2a438c14db8b413148204510b9fe2e2/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java">SecuritySSLNettyHttpServerTransport</a>
56
64
*/
57
65
public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport {
66
+ public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier" ;
67
+ public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor" ;
68
+
58
69
private static final Logger logger = LogManager .getLogger (SecureNetty4HttpServerTransport .class );
59
- private final SecureTransportSettingsProvider secureTransportSettingsProvider ;
60
- private final SecureTransportSettingsProvider .ServerExceptionHandler exceptionHandler ;
70
+ private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider ;
71
+ private final TransportExceptionHandler exceptionHandler ;
72
+ private final ChannelInboundHandlerAdapter headerVerifier ;
73
+ private final TransportAdapterProvider <HttpServerTransport > decompressorProvider ;
61
74
62
75
public SecureNetty4HttpServerTransport (
63
76
final Settings settings ,
@@ -68,7 +81,7 @@ public SecureNetty4HttpServerTransport(
68
81
final Dispatcher dispatcher ,
69
82
final ClusterSettings clusterSettings ,
70
83
final SharedGroupFactory sharedGroupFactory ,
71
- final SecureTransportSettingsProvider secureTransportSettingsProvider ,
84
+ final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider ,
72
85
final Tracer tracer
73
86
) {
74
87
super (
@@ -82,9 +95,45 @@ public SecureNetty4HttpServerTransport(
82
95
sharedGroupFactory ,
83
96
tracer
84
97
);
85
- this .secureTransportSettingsProvider = secureTransportSettingsProvider ;
86
- this .exceptionHandler = secureTransportSettingsProvider .buildHttpServerExceptionHandler (settings , this )
87
- .orElse (SecureTransportSettingsProvider .ServerExceptionHandler .NOOP );
98
+
99
+ this .secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider ;
100
+ this .exceptionHandler = secureHttpTransportSettingsProvider .buildHttpServerExceptionHandler (settings , this )
101
+ .orElse (TransportExceptionHandler .NOOP );
102
+
103
+ final List <ChannelInboundHandlerAdapter > headerVerifiers = secureHttpTransportSettingsProvider .getHttpTransportAdapterProviders (
104
+ settings
105
+ )
106
+ .stream ()
107
+ .filter (p -> REQUEST_HEADER_VERIFIER .equalsIgnoreCase (p .name ()))
108
+ .map (p -> p .create (settings , this , ChannelInboundHandlerAdapter .class ))
109
+ .filter (Optional ::isPresent )
110
+ .map (Optional ::get )
111
+ .collect (Collectors .toList ());
112
+
113
+ if (headerVerifiers .size () > 1 ) {
114
+ throw new IllegalArgumentException ("Cannot have more than one header verifier configured, supplied " + headerVerifiers .size ());
115
+ }
116
+
117
+ final Optional <TransportAdapterProvider <HttpServerTransport >> decompressorProviderOpt = secureHttpTransportSettingsProvider
118
+ .getHttpTransportAdapterProviders (settings )
119
+ .stream ()
120
+ .filter (p -> REQUEST_DECOMPRESSOR .equalsIgnoreCase (p .name ()))
121
+ .findFirst ();
122
+ // There could be multiple request decompressor providers configured, using the first one
123
+ decompressorProviderOpt .ifPresent (p -> logger .debug ("Using request decompressor provider: {}" , p ));
124
+
125
+ this .headerVerifier = headerVerifiers .isEmpty () ? null : headerVerifiers .get (0 );
126
+ this .decompressorProvider = decompressorProviderOpt .orElseGet (() -> new TransportAdapterProvider <HttpServerTransport >() {
127
+ @ Override
128
+ public String name () {
129
+ return REQUEST_DECOMPRESSOR ;
130
+ }
131
+
132
+ @ Override
133
+ public <C > Optional <C > create (Settings settings , HttpServerTransport transport , Class <C > adapterClass ) {
134
+ return Optional .empty ();
135
+ }
136
+ });
88
137
}
89
138
90
139
@ Override
@@ -114,7 +163,7 @@ protected SslHttpChannelHandler(final Netty4HttpServerTransport transport, final
114
163
protected void initChannel (Channel ch ) throws Exception {
115
164
super .initChannel (ch );
116
165
117
- final SSLEngine sslEngine = secureTransportSettingsProvider .buildSecureHttpServerEngine (
166
+ final SSLEngine sslEngine = secureHttpTransportSettingsProvider .buildSecureHttpServerEngine (
118
167
settings ,
119
168
SecureNetty4HttpServerTransport .this
120
169
).orElseGet (SslUtils ::createDefaultServerSSLEngine );
@@ -123,4 +172,17 @@ protected void initChannel(Channel ch) throws Exception {
123
172
ch .pipeline ().addFirst ("ssl_http" , sslHandler );
124
173
}
125
174
}
175
+
176
+ protected ChannelInboundHandlerAdapter createHeaderVerifier () {
177
+ if (headerVerifier != null ) {
178
+ return headerVerifier ;
179
+ } else {
180
+ return super .createHeaderVerifier ();
181
+ }
182
+ }
183
+
184
+ @ Override
185
+ protected ChannelInboundHandlerAdapter createDecompressor () {
186
+ return decompressorProvider .create (settings , this , ChannelInboundHandlerAdapter .class ).orElseGet (super ::createDecompressor );
187
+ }
126
188
}
0 commit comments