Skip to content

Commit 96745e0

Browse files
committed
Addressing comments
Signed-off-by: Vacha Shah <vachshah@amazon.com>
1 parent 8cbab5c commit 96745e0

15 files changed

+440
-245
lines changed

server/src/main/java/org/opensearch/transport/Header.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,19 @@ public int getNetworkMessageSize() {
7575
return networkMessageSize;
7676
}
7777

78-
Version getVersion() {
78+
public Version getVersion() {
7979
return version;
8080
}
8181

82-
long getRequestId() {
82+
public long getRequestId() {
8383
return requestId;
8484
}
8585

8686
byte getStatus() {
8787
return status;
8888
}
8989

90-
boolean isRequest() {
90+
public boolean isRequest() {
9191
return TransportStatus.isRequest(status);
9292
}
9393

@@ -99,7 +99,7 @@ boolean isError() {
9999
return TransportStatus.isError(status);
100100
}
101101

102-
boolean isHandshake() {
102+
public boolean isHandshake() {
103103
return TransportStatus.isHandshake(status);
104104
}
105105

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
import org.opensearch.common.bytes.ReleasableBytesReference;
12+
13+
import java.io.IOException;
14+
15+
public interface InboundBytesHandler {
16+
17+
public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException;
18+
}

server/src/main/java/org/opensearch/transport/InboundDecoder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050
*/
5151
public class InboundDecoder implements Releasable {
5252

53-
static final Object PING = new Object();
54-
static final Object END_CONTENT = new Object();
53+
public static final Object PING = new Object();
54+
public static final Object END_CONTENT = new Object();
5555

5656
private final Version version;
5757
private final PageCacheRecycler recycler;

server/src/main/java/org/opensearch/transport/InboundHandler.java

+21-35
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.threadpool.ThreadPool;
3939

4040
import java.io.IOException;
41+
import java.util.Map;
4142

4243
/**
4344
* Handler for inbound data
@@ -47,18 +48,12 @@
4748
public class InboundHandler {
4849

4950
private final ThreadPool threadPool;
50-
private final OutboundHandler outboundHandler;
51-
private final NamedWriteableRegistry namedWriteableRegistry;
52-
private final TransportHandshaker handshaker;
53-
private final TransportKeepAlive keepAlive;
54-
private final Transport.ResponseHandlers responseHandlers;
55-
private final Transport.RequestHandlers requestHandlers;
5651

5752
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
5853

5954
private volatile long slowLogThresholdMs = Long.MAX_VALUE;
6055

61-
private final Tracer tracer;
56+
private final Map<String, ProtocolMessageHandler> protocolMessageHandlers;
6257

6358
InboundHandler(
6459
ThreadPool threadPool,
@@ -71,13 +66,19 @@ public class InboundHandler {
7166
Tracer tracer
7267
) {
7368
this.threadPool = threadPool;
74-
this.outboundHandler = outboundHandler;
75-
this.namedWriteableRegistry = namedWriteableRegistry;
76-
this.handshaker = handshaker;
77-
this.keepAlive = keepAlive;
78-
this.requestHandlers = requestHandlers;
79-
this.responseHandlers = responseHandlers;
80-
this.tracer = tracer;
69+
this.protocolMessageHandlers = Map.of(
70+
ProtocolInboundMessage.NATIVE_PROTOCOL,
71+
new NativeMessageHandler(
72+
threadPool,
73+
outboundHandler,
74+
namedWriteableRegistry,
75+
handshaker,
76+
requestHandlers,
77+
responseHandlers,
78+
tracer,
79+
keepAlive
80+
)
81+
);
8182
}
8283

8384
void setMessageListener(TransportMessageListener listener) {
@@ -92,32 +93,17 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
9293
this.slowLogThresholdMs = slowLogThreshold.getMillis();
9394
}
9495

95-
void inboundMessage(TcpChannel channel, BaseInboundMessage message) throws Exception {
96+
void inboundMessage(TcpChannel channel, ProtocolInboundMessage message) throws Exception {
9697
final long startTime = threadPool.relativeTimeInMillis();
9798
channel.getChannelStats().markAccessed(startTime);
9899
messageReceivedFromPipeline(channel, message, startTime);
99100
}
100101

101-
private void messageReceivedFromPipeline(TcpChannel channel, BaseInboundMessage message, long startTime) throws IOException {
102-
if ((message.getProtocol()).equals(BaseInboundMessage.PROTOBUF_PROTOCOL)) {
103-
// protobuf messages logic can be added here
104-
} else {
105-
InboundMessage inboundMessage = (InboundMessage) message;
106-
TransportLogger.logInboundMessage(channel, inboundMessage);
107-
if (inboundMessage.isPing()) {
108-
keepAlive.receiveKeepAlive(channel);
109-
} else {
110-
NativeMessageHandler nativeInboundHandler = new NativeMessageHandler(
111-
threadPool,
112-
outboundHandler,
113-
namedWriteableRegistry,
114-
handshaker,
115-
requestHandlers,
116-
responseHandlers,
117-
tracer
118-
);
119-
nativeInboundHandler.messageReceived(channel, inboundMessage, startTime, slowLogThresholdMs, messageListener);
120-
}
102+
private void messageReceivedFromPipeline(TcpChannel channel, ProtocolInboundMessage message, long startTime) throws IOException {
103+
ProtocolMessageHandler protocolMessageHandler = protocolMessageHandlers.get(message.getProtocol());
104+
if (protocolMessageHandler == null) {
105+
throw new IllegalStateException("No protocol message handler found for protocol: " + message.getProtocol());
121106
}
107+
protocolMessageHandler.messageReceived(channel, message, startTime, slowLogThresholdMs, messageListener);
122108
}
123109
}

server/src/main/java/org/opensearch/transport/InboundMessage.java

+17-62
Original file line numberDiff line numberDiff line change
@@ -32,116 +32,71 @@
3232

3333
package org.opensearch.transport;
3434

35-
import org.opensearch.common.annotation.PublicApi;
3635
import org.opensearch.common.bytes.ReleasableBytesReference;
3736
import org.opensearch.common.lease.Releasable;
38-
import org.opensearch.common.lease.Releasables;
39-
import org.opensearch.common.util.io.IOUtils;
4037
import org.opensearch.core.common.io.stream.StreamInput;
38+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4139

4240
import java.io.IOException;
4341

4442
/**
4543
* Inbound data as a message
46-
*
44+
* This api is deprecated, please use {@link org.opensearch.transport.nativeprotocol.NativeInboundMessage} instead.
4745
* @opensearch.api
4846
*/
49-
@PublicApi(since = "1.0.0")
50-
public class InboundMessage implements Releasable, BaseInboundMessage {
47+
@Deprecated
48+
public class InboundMessage implements Releasable, ProtocolInboundMessage {
5149

52-
private final Header header;
53-
private final ReleasableBytesReference content;
54-
private final Exception exception;
55-
private final boolean isPing;
56-
private Releasable breakerRelease;
57-
private StreamInput streamInput;
58-
private String protocol;
50+
private NativeInboundMessage nativeInboundMessage;
5951

6052
public InboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
61-
this.header = header;
62-
this.content = content;
63-
this.breakerRelease = breakerRelease;
64-
this.exception = null;
65-
this.isPing = false;
53+
this.nativeInboundMessage = new NativeInboundMessage(header, content, breakerRelease);
6654
}
6755

6856
public InboundMessage(Header header, Exception exception) {
69-
this.header = header;
70-
this.content = null;
71-
this.breakerRelease = null;
72-
this.exception = exception;
73-
this.isPing = false;
57+
this.nativeInboundMessage = new NativeInboundMessage(header, exception);
7458
}
7559

7660
public InboundMessage(Header header, boolean isPing) {
77-
this.header = header;
78-
this.content = null;
79-
this.breakerRelease = null;
80-
this.exception = null;
81-
this.isPing = isPing;
61+
this.nativeInboundMessage = new NativeInboundMessage(header, isPing);
8262
}
8363

8464
public Header getHeader() {
85-
return header;
65+
return this.nativeInboundMessage.getHeader();
8666
}
8767

8868
public int getContentLength() {
89-
if (content == null) {
90-
return 0;
91-
} else {
92-
return content.length();
93-
}
69+
return this.nativeInboundMessage.getContentLength();
9470
}
9571

9672
public Exception getException() {
97-
return exception;
73+
return this.nativeInboundMessage.getException();
9874
}
9975

10076
public boolean isPing() {
101-
return isPing;
77+
return this.nativeInboundMessage.isPing();
10278
}
10379

10480
public boolean isShortCircuit() {
105-
return exception != null;
81+
return this.nativeInboundMessage.getException() != null;
10682
}
10783

10884
public Releasable takeBreakerReleaseControl() {
109-
final Releasable toReturn = breakerRelease;
110-
breakerRelease = null;
111-
if (toReturn != null) {
112-
return toReturn;
113-
} else {
114-
return () -> {};
115-
}
85+
return this.nativeInboundMessage.takeBreakerReleaseControl();
11686
}
11787

11888
public StreamInput openOrGetStreamInput() throws IOException {
119-
assert isPing == false && content != null;
120-
if (streamInput == null) {
121-
streamInput = content.streamInput();
122-
streamInput.setVersion(header.getVersion());
123-
}
124-
return streamInput;
89+
return this.nativeInboundMessage.openOrGetStreamInput();
12590
}
12691

12792
@Override
12893
public void close() {
129-
IOUtils.closeWhileHandlingException(streamInput);
130-
Releasables.closeWhileHandlingException(content, breakerRelease);
94+
this.nativeInboundMessage.close();
13195
}
13296

13397
@Override
13498
public String toString() {
135-
return "InboundMessage{" + header + "}";
136-
}
137-
138-
@Override
139-
public String getProtocol() {
140-
return NATIVE_PROTOCOL;
99+
return this.nativeInboundMessage.toString();
141100
}
142101

143-
@Override
144-
public void setProtocol() {
145-
this.protocol = NATIVE_PROTOCOL;
146-
}
147102
}

0 commit comments

Comments
 (0)