Skip to content

Commit 14f1c43

Browse files
authored
Abstracting outbound side of transport (opensearch-project#13293)
* Abstracting outbound side of transport Signed-off-by: Vacha Shah <vachshah@amazon.com> * Making outbound handler protocol dependent via inbound handler Signed-off-by: Vacha Shah <vachshah@amazon.com> * Fixing precommit Signed-off-by: Vacha Shah <vachshah@amazon.com> * Addressing comments Signed-off-by: Vacha Shah <vachshah@amazon.com> * Fixing precommit Signed-off-by: Vacha Shah <vachshah@amazon.com> * Cleaning up code Signed-off-by: Vacha Shah <vachshah@amazon.com> * Addressing comments Signed-off-by: Vacha Shah <vachshah@amazon.com> * Cleaning up Signed-off-by: Vacha Shah <vachshah@amazon.com> * Addressing comments Signed-off-by: Vacha Shah <vachshah@amazon.com> * Abstracting InboundHandlerTests Signed-off-by: Vacha Shah <vachshah@amazon.com> * Abstracting TransportLoggerTests Signed-off-by: Vacha Shah <vachshah@amazon.com> --------- Signed-off-by: Vacha Shah <vachshah@amazon.com>
1 parent 079cef5 commit 14f1c43

28 files changed

+1061
-645
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
## [Unreleased 2.x]
77
### Added
88
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
9+
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
910

1011
### Dependencies
1112
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

server/src/main/java/org/opensearch/transport/InboundHandler.java

+13
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232

3333
package org.opensearch.transport;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.common.unit.TimeValue;
37+
import org.opensearch.common.util.BigArrays;
3638
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
3739
import org.opensearch.telemetry.tracing.Tracer;
3840
import org.opensearch.threadpool.ThreadPool;
@@ -57,7 +59,12 @@ public class InboundHandler {
5759
private final Map<String, ProtocolMessageHandler> protocolMessageHandlers;
5860

5961
InboundHandler(
62+
String nodeName,
63+
Version version,
64+
String[] features,
65+
StatsTracker statsTracker,
6066
ThreadPool threadPool,
67+
BigArrays bigArrays,
6168
OutboundHandler outboundHandler,
6269
NamedWriteableRegistry namedWriteableRegistry,
6370
TransportHandshaker handshaker,
@@ -70,7 +77,12 @@ public class InboundHandler {
7077
this.protocolMessageHandlers = Map.of(
7178
NativeInboundMessage.NATIVE_PROTOCOL,
7279
new NativeMessageHandler(
80+
nodeName,
81+
version,
82+
features,
83+
statsTracker,
7384
threadPool,
85+
bigArrays,
7486
outboundHandler,
7587
namedWriteableRegistry,
7688
handshaker,
@@ -83,6 +95,7 @@ public class InboundHandler {
8395
}
8496

8597
void setMessageListener(TransportMessageListener listener) {
98+
protocolMessageHandlers.values().forEach(handler -> handler.setMessageListener(listener));
8699
if (messageListener == TransportMessageListener.NOOP_LISTENER) {
87100
messageListener = listener;
88101
} else {

server/src/main/java/org/opensearch/transport/NativeMessageHandler.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.apache.lucene.util.BytesRef;
3939
import org.opensearch.Version;
40+
import org.opensearch.common.util.BigArrays;
4041
import org.opensearch.common.util.concurrent.AbstractRunnable;
4142
import org.opensearch.common.util.concurrent.ThreadContext;
4243
import org.opensearch.core.common.io.stream.ByteBufferStreamInput;
@@ -52,6 +53,7 @@
5253
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
5354
import org.opensearch.threadpool.ThreadPool;
5455
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
56+
import org.opensearch.transport.nativeprotocol.NativeOutboundHandler;
5557

5658
import java.io.EOFException;
5759
import java.io.IOException;
@@ -72,7 +74,7 @@ public class NativeMessageHandler implements ProtocolMessageHandler {
7274
private static final Logger logger = LogManager.getLogger(NativeMessageHandler.class);
7375

7476
private final ThreadPool threadPool;
75-
private final OutboundHandler outboundHandler;
77+
private final NativeOutboundHandler outboundHandler;
7678
private final NamedWriteableRegistry namedWriteableRegistry;
7779
private final TransportHandshaker handshaker;
7880
private final TransportKeepAlive keepAlive;
@@ -82,7 +84,12 @@ public class NativeMessageHandler implements ProtocolMessageHandler {
8284
private final Tracer tracer;
8385

8486
NativeMessageHandler(
87+
String nodeName,
88+
Version version,
89+
String[] features,
90+
StatsTracker statsTracker,
8591
ThreadPool threadPool,
92+
BigArrays bigArrays,
8693
OutboundHandler outboundHandler,
8794
NamedWriteableRegistry namedWriteableRegistry,
8895
TransportHandshaker handshaker,
@@ -92,7 +99,7 @@ public class NativeMessageHandler implements ProtocolMessageHandler {
9299
TransportKeepAlive keepAlive
93100
) {
94101
this.threadPool = threadPool;
95-
this.outboundHandler = outboundHandler;
102+
this.outboundHandler = new NativeOutboundHandler(nodeName, version, features, statsTracker, threadPool, bigArrays, outboundHandler);
96103
this.namedWriteableRegistry = namedWriteableRegistry;
97104
this.handshaker = handshaker;
98105
this.requestHandlers = requestHandlers;
@@ -492,4 +499,9 @@ public void onFailure(Exception e) {
492499
}
493500
}
494501

502+
@Override
503+
public void setMessageListener(TransportMessageListener listener) {
504+
outboundHandler.setMessageListener(listener);
505+
}
506+
495507
}

server/src/main/java/org/opensearch/transport/OutboundHandler.java

+13-158
Original file line numberDiff line numberDiff line change
@@ -35,164 +35,47 @@
3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
38-
import org.opensearch.Version;
39-
import org.opensearch.cluster.node.DiscoveryNode;
4038
import org.opensearch.common.CheckedSupplier;
41-
import org.opensearch.common.io.stream.ReleasableBytesStreamOutput;
4239
import org.opensearch.common.lease.Releasable;
4340
import org.opensearch.common.lease.Releasables;
4441
import org.opensearch.common.network.CloseableChannel;
4542
import org.opensearch.common.transport.NetworkExceptionHelper;
46-
import org.opensearch.common.util.BigArrays;
4743
import org.opensearch.common.util.concurrent.ThreadContext;
48-
import org.opensearch.common.util.io.IOUtils;
4944
import org.opensearch.core.action.ActionListener;
5045
import org.opensearch.core.action.NotifyOnceListener;
5146
import org.opensearch.core.common.bytes.BytesReference;
52-
import org.opensearch.core.common.transport.TransportAddress;
53-
import org.opensearch.core.transport.TransportResponse;
5447
import org.opensearch.threadpool.ThreadPool;
5548

5649
import java.io.IOException;
57-
import java.util.Set;
5850

5951
/**
6052
* Outbound data handler
6153
*
6254
* @opensearch.internal
6355
*/
64-
final class OutboundHandler {
56+
public final class OutboundHandler {
6557

6658
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
6759

68-
private final String nodeName;
69-
private final Version version;
70-
private final String[] features;
7160
private final StatsTracker statsTracker;
7261
private final ThreadPool threadPool;
73-
private final BigArrays bigArrays;
74-
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
7562

76-
OutboundHandler(
77-
String nodeName,
78-
Version version,
79-
String[] features,
80-
StatsTracker statsTracker,
81-
ThreadPool threadPool,
82-
BigArrays bigArrays
83-
) {
84-
this.nodeName = nodeName;
85-
this.version = version;
86-
this.features = features;
63+
public OutboundHandler(StatsTracker statsTracker, ThreadPool threadPool) {
8764
this.statsTracker = statsTracker;
8865
this.threadPool = threadPool;
89-
this.bigArrays = bigArrays;
9066
}
9167

9268
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
93-
SendContext sendContext = new SendContext(channel, () -> bytes, listener);
69+
SendContext sendContext = new SendContext(statsTracker, channel, () -> bytes, listener);
9470
try {
95-
internalSend(channel, sendContext);
71+
sendBytes(channel, sendContext);
9672
} catch (IOException e) {
9773
// This should not happen as the bytes are already serialized
9874
throw new AssertionError(e);
9975
}
10076
}
10177

102-
/**
103-
* Sends the request to the given channel. This method should be used to send {@link TransportRequest}
104-
* objects back to the caller.
105-
*/
106-
void sendRequest(
107-
final DiscoveryNode node,
108-
final TcpChannel channel,
109-
final long requestId,
110-
final String action,
111-
final TransportRequest request,
112-
final TransportRequestOptions options,
113-
final Version channelVersion,
114-
final boolean compressRequest,
115-
final boolean isHandshake
116-
) throws IOException, TransportException {
117-
Version version = Version.min(this.version, channelVersion);
118-
OutboundMessage.Request message = new OutboundMessage.Request(
119-
threadPool.getThreadContext(),
120-
features,
121-
request,
122-
version,
123-
action,
124-
requestId,
125-
isHandshake,
126-
compressRequest
127-
);
128-
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onRequestSent(node, requestId, action, request, options));
129-
sendMessage(channel, message, listener);
130-
}
131-
132-
/**
133-
* Sends the response to the given channel. This method should be used to send {@link TransportResponse}
134-
* objects back to the caller.
135-
*
136-
* @see #sendErrorResponse(Version, Set, TcpChannel, long, String, Exception) for sending error responses
137-
*/
138-
void sendResponse(
139-
final Version nodeVersion,
140-
final Set<String> features,
141-
final TcpChannel channel,
142-
final long requestId,
143-
final String action,
144-
final TransportResponse response,
145-
final boolean compress,
146-
final boolean isHandshake
147-
) throws IOException {
148-
Version version = Version.min(this.version, nodeVersion);
149-
OutboundMessage.Response message = new OutboundMessage.Response(
150-
threadPool.getThreadContext(),
151-
features,
152-
response,
153-
version,
154-
requestId,
155-
isHandshake,
156-
compress
157-
);
158-
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response));
159-
sendMessage(channel, message, listener);
160-
}
161-
162-
/**
163-
* Sends back an error response to the caller via the given channel
164-
*/
165-
void sendErrorResponse(
166-
final Version nodeVersion,
167-
final Set<String> features,
168-
final TcpChannel channel,
169-
final long requestId,
170-
final String action,
171-
final Exception error
172-
) throws IOException {
173-
Version version = Version.min(this.version, nodeVersion);
174-
TransportAddress address = new TransportAddress(channel.getLocalAddress());
175-
RemoteTransportException tx = new RemoteTransportException(nodeName, address, action, error);
176-
OutboundMessage.Response message = new OutboundMessage.Response(
177-
threadPool.getThreadContext(),
178-
features,
179-
tx,
180-
version,
181-
requestId,
182-
false,
183-
false
184-
);
185-
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, error));
186-
sendMessage(channel, message, listener);
187-
}
188-
189-
private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
190-
MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays);
191-
SendContext sendContext = new SendContext(channel, serializer, listener, serializer);
192-
internalSend(channel, sendContext);
193-
}
194-
195-
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
78+
public void sendBytes(TcpChannel channel, SendContext sendContext) throws IOException {
19679
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
19780
BytesReference reference = sendContext.get();
19881
// stash thread context so that channel event loop is not polluted by thread context
@@ -205,59 +88,30 @@ private void internalSend(TcpChannel channel, SendContext sendContext) throws IO
20588
}
20689
}
20790

208-
void setMessageListener(TransportMessageListener listener) {
209-
if (messageListener == TransportMessageListener.NOOP_LISTENER) {
210-
messageListener = listener;
211-
} else {
212-
throw new IllegalStateException("Cannot set message listener twice");
213-
}
214-
}
215-
21691
/**
21792
* Internal message serializer
21893
*
21994
* @opensearch.internal
22095
*/
221-
private static class MessageSerializer implements CheckedSupplier<BytesReference, IOException>, Releasable {
222-
223-
private final OutboundMessage message;
224-
private final BigArrays bigArrays;
225-
private volatile ReleasableBytesStreamOutput bytesStreamOutput;
226-
227-
private MessageSerializer(OutboundMessage message, BigArrays bigArrays) {
228-
this.message = message;
229-
this.bigArrays = bigArrays;
230-
}
231-
232-
@Override
233-
public BytesReference get() throws IOException {
234-
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
235-
return message.serialize(bytesStreamOutput);
236-
}
237-
238-
@Override
239-
public void close() {
240-
IOUtils.closeWhileHandlingException(bytesStreamOutput);
241-
}
242-
}
243-
244-
private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
245-
96+
public static class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
97+
private final StatsTracker statsTracker;
24698
private final TcpChannel channel;
24799
private final CheckedSupplier<BytesReference, IOException> messageSupplier;
248100
private final ActionListener<Void> listener;
249101
private final Releasable optionalReleasable;
250102
private long messageSize = -1;
251103

252-
private SendContext(
104+
SendContext(
105+
StatsTracker statsTracker,
253106
TcpChannel channel,
254107
CheckedSupplier<BytesReference, IOException> messageSupplier,
255108
ActionListener<Void> listener
256109
) {
257-
this(channel, messageSupplier, listener, null);
110+
this(statsTracker, channel, messageSupplier, listener, null);
258111
}
259112

260-
private SendContext(
113+
public SendContext(
114+
StatsTracker statsTracker,
261115
TcpChannel channel,
262116
CheckedSupplier<BytesReference, IOException> messageSupplier,
263117
ActionListener<Void> listener,
@@ -267,6 +121,7 @@ private SendContext(
267121
this.messageSupplier = messageSupplier;
268122
this.listener = listener;
269123
this.optionalReleasable = optionalReleasable;
124+
this.statsTracker = statsTracker;
270125
}
271126

272127
public BytesReference get() throws IOException {

server/src/main/java/org/opensearch/transport/ProtocolMessageHandler.java

+14
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,25 @@
1717
*/
1818
public interface ProtocolMessageHandler {
1919

20+
/**
21+
* Handles the message received on the channel.
22+
* @param channel the channel on which the message was received
23+
* @param message the message received
24+
* @param startTime the start time
25+
* @param slowLogThresholdMs the threshold for slow logs
26+
* @param messageListener the message listener
27+
*/
2028
public void messageReceived(
2129
TcpChannel channel,
2230
ProtocolInboundMessage message,
2331
long startTime,
2432
long slowLogThresholdMs,
2533
TransportMessageListener messageListener
2634
) throws IOException;
35+
36+
/**
37+
* Sets the message listener to be used by the handler.
38+
* @param listener the message listener
39+
*/
40+
public void setMessageListener(TransportMessageListener listener);
2741
}

0 commit comments

Comments
 (0)