Skip to content

Commit f5c3ef9

Browse files
authored
Replacing InboundMessage with NativeInboundMessage for deprecation (opensearch-project#13126)
* Replacing InboundMessage with NativeInboundMessage for deprecation Signed-off-by: Vacha Shah <vachshah@amazon.com> * Removing InboundMessage class Signed-off-by: Vacha Shah <vachshah@amazon.com> --------- Signed-off-by: Vacha Shah <vachshah@amazon.com>
1 parent ba25c23 commit f5c3ef9

10 files changed

+80
-158
lines changed

server/src/main/java/org/opensearch/transport/InboundAggregator.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.core.common.bytes.BytesArray;
4141
import org.opensearch.core.common.bytes.BytesReference;
4242
import org.opensearch.core.common.bytes.CompositeBytesReference;
43+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4344

4445
import java.io.IOException;
4546
import java.util.ArrayList;
@@ -113,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) {
113114
}
114115
}
115116

116-
public InboundMessage finishAggregation() throws IOException {
117+
public NativeInboundMessage finishAggregation() throws IOException {
117118
ensureOpen();
118119
final ReleasableBytesReference releasableContent;
119120
if (isFirstContent()) {
@@ -127,7 +128,7 @@ public InboundMessage finishAggregation() throws IOException {
127128
}
128129

129130
final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
130-
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
131+
final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl);
131132
boolean success = false;
132133
try {
133134
if (aggregated.getHeader().needsToReadVariableHeader()) {
@@ -142,7 +143,7 @@ public InboundMessage finishAggregation() throws IOException {
142143
if (isShortCircuited()) {
143144
aggregated.close();
144145
success = true;
145-
return new InboundMessage(aggregated.getHeader(), aggregationException);
146+
return new NativeInboundMessage(aggregated.getHeader(), aggregationException);
146147
} else {
147148
success = true;
148149
return aggregated;

server/src/main/java/org/opensearch/transport/InboundMessage.java

-108
This file was deleted.

server/src/main/java/org/opensearch/transport/NativeMessageHandler.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.telemetry.tracing.Tracer;
5252
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
5353
import org.opensearch.threadpool.ThreadPool;
54+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
5455

5556
import java.io.EOFException;
5657
import java.io.IOException;
@@ -111,7 +112,7 @@ public void messageReceived(
111112
long slowLogThresholdMs,
112113
TransportMessageListener messageListener
113114
) throws IOException {
114-
InboundMessage inboundMessage = (InboundMessage) message;
115+
NativeInboundMessage inboundMessage = (NativeInboundMessage) message;
115116
TransportLogger.logInboundMessage(channel, inboundMessage);
116117
if (inboundMessage.isPing()) {
117118
keepAlive.receiveKeepAlive(channel);
@@ -122,7 +123,7 @@ public void messageReceived(
122123

123124
private void handleMessage(
124125
TcpChannel channel,
125-
InboundMessage message,
126+
NativeInboundMessage message,
126127
long startTime,
127128
long slowLogThresholdMs,
128129
TransportMessageListener messageListener
@@ -194,7 +195,7 @@ private Map<String, Collection<String>> extractHeaders(Map<String, String> heade
194195
private <T extends TransportRequest> void handleRequest(
195196
TcpChannel channel,
196197
Header header,
197-
InboundMessage message,
198+
NativeInboundMessage message,
198199
TransportMessageListener messageListener
199200
) throws IOException {
200201
final String action = header.getActionName();

server/src/main/java/org/opensearch/transport/TcpTransport.java

-12
Original file line numberDiff line numberDiff line change
@@ -761,18 +761,6 @@ protected void serverAcceptedChannel(TcpChannel channel) {
761761
*/
762762
protected abstract void stopInternal();
763763

764-
/**
765-
* @deprecated use {@link #inboundMessage(TcpChannel, ProtocolInboundMessage)}
766-
* Handles inbound message that has been decoded.
767-
*
768-
* @param channel the channel the message is from
769-
* @param message the message
770-
*/
771-
@Deprecated(since = "2.14.0", forRemoval = true)
772-
public void inboundMessage(TcpChannel channel, InboundMessage message) {
773-
inboundMessage(channel, (ProtocolInboundMessage) message);
774-
}
775-
776764
/**
777765
* Handles inbound message that has been decoded.
778766
*

server/src/main/java/org/opensearch/transport/TransportLogger.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
4141
import org.opensearch.core.common.io.stream.StreamInput;
4242
import org.opensearch.core.compress.CompressorRegistry;
43+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4344

4445
import java.io.IOException;
4546

@@ -64,7 +65,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
6465
}
6566
}
6667

67-
static void logInboundMessage(TcpChannel channel, InboundMessage message) {
68+
static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) {
6869
if (logger.isTraceEnabled()) {
6970
try {
7071
String logMessage = format(channel, message, "READ");
@@ -136,7 +137,7 @@ private static String format(TcpChannel channel, BytesReference message, String
136137
return sb.toString();
137138
}
138139

139-
private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
140+
private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException {
140141
final StringBuilder sb = new StringBuilder();
141142
sb.append(channel);
142143

server/src/main/java/org/opensearch/transport/nativeprotocol/NativeInboundBytesHandler.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.opensearch.transport.InboundAggregator;
1717
import org.opensearch.transport.InboundBytesHandler;
1818
import org.opensearch.transport.InboundDecoder;
19-
import org.opensearch.transport.InboundMessage;
2019
import org.opensearch.transport.ProtocolInboundMessage;
2120
import org.opensearch.transport.StatsTracker;
2221
import org.opensearch.transport.TcpChannel;
@@ -32,7 +31,7 @@
3231
public class NativeInboundBytesHandler implements InboundBytesHandler {
3332

3433
private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
35-
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
34+
private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true);
3635

3736
private final ArrayDeque<ReleasableBytesReference> pending;
3837
private final InboundDecoder decoder;
@@ -152,7 +151,7 @@ private void forwardFragments(
152151
messageHandler.accept(channel, PING_MESSAGE);
153152
} else if (fragment == InboundDecoder.END_CONTENT) {
154153
assert aggregator.isAggregating();
155-
try (InboundMessage aggregated = aggregator.finishAggregation()) {
154+
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
156155
statsTracker.markMessageReceived();
157156
messageHandler.accept(channel, aggregated);
158157
}

server/src/test/java/org/opensearch/transport/InboundAggregatorTests.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.core.common.breaker.CircuitBreakingException;
4343
import org.opensearch.core.common.bytes.BytesArray;
4444
import org.opensearch.test.OpenSearchTestCase;
45+
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
4546
import org.junit.Before;
4647

4748
import java.io.IOException;
@@ -107,7 +108,7 @@ public void testInboundAggregation() throws IOException {
107108
}
108109

109110
// Signal EOS
110-
InboundMessage aggregated = aggregator.finishAggregation();
111+
NativeInboundMessage aggregated = aggregator.finishAggregation();
111112

112113
assertThat(aggregated, notNullValue());
113114
assertFalse(aggregated.isPing());
@@ -138,7 +139,7 @@ public void testInboundUnknownAction() throws IOException {
138139
assertEquals(0, content.refCount());
139140

140141
// Signal EOS
141-
InboundMessage aggregated = aggregator.finishAggregation();
142+
NativeInboundMessage aggregated = aggregator.finishAggregation();
142143

143144
assertThat(aggregated, notNullValue());
144145
assertTrue(aggregated.isShortCircuit());
@@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException {
161162
content1.close();
162163

163164
// Signal EOS
164-
InboundMessage aggregated1 = aggregator.finishAggregation();
165+
NativeInboundMessage aggregated1 = aggregator.finishAggregation();
165166

166167
assertEquals(0, content1.refCount());
167168
assertThat(aggregated1, notNullValue());
@@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException {
180181
content2.close();
181182

182183
// Signal EOS
183-
InboundMessage aggregated2 = aggregator.finishAggregation();
184+
NativeInboundMessage aggregated2 = aggregator.finishAggregation();
184185

185186
assertEquals(1, content2.refCount());
186187
assertThat(aggregated2, notNullValue());
@@ -199,7 +200,7 @@ public void testCircuitBreak() throws IOException {
199200
content3.close();
200201

201202
// Signal EOS
202-
InboundMessage aggregated3 = aggregator.finishAggregation();
203+
NativeInboundMessage aggregated3 = aggregator.finishAggregation();
203204

204205
assertEquals(1, content3.refCount());
205206
assertThat(aggregated3, notNullValue());
@@ -263,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
263264
content.close();
264265

265266
// Signal EOS
266-
InboundMessage aggregated = aggregator.finishAggregation();
267+
NativeInboundMessage aggregated = aggregator.finishAggregation();
267268

268269
assertThat(aggregated, notNullValue());
269270
assertFalse(header.needsToReadVariableHeader());

0 commit comments

Comments
 (0)