Skip to content

Commit f480d24

Browse files
committed
Add remote publish unit tests
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent afc033c commit f480d24

File tree

3 files changed

+238
-15
lines changed

3 files changed

+238
-15
lines changed

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,19 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
228228
}
229229
}
230230

231-
private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
231+
// package private for testing
232+
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
232233
if (transportService.getLocalNode().equals(request.getSourceNode())) {
233234
return acceptRemoteStateOnLocalNode(request);
234235
}
236+
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
235237
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
236238
request.getClusterUUID(),
237239
request.getManifestFile()
238240
);
241+
if (manifest == null) {
242+
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
243+
}
239244
boolean applyFullState = false;
240245
final ClusterState lastSeen = lastSeenClusterState.get();
241246
if (lastSeen == null) {
@@ -309,6 +314,13 @@ private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishReques
309314
if (publishRequest == null
310315
|| publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term
311316
|| publishRequest.getAcceptedState().version() != remotePublishRequest.version) {
317+
logger.debug(
318+
() -> new ParameterizedMessage(
319+
"Publication failure for current publish request : {} and remote publish request: {}",
320+
publishRequest,
321+
remotePublishRequest
322+
)
323+
);
312324
throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
313325
}
314326
PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest);
@@ -334,6 +346,16 @@ public PublicationContext newPublicationContext(
334346
return publicationContext;
335347
}
336348

349+
// package private for testing
350+
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
351+
this.currentPublishRequestToSelf.set(publishRequest);
352+
}
353+
354+
// package private for testing
355+
void setLastSeenClusterState(ClusterState clusterState) {
356+
this.lastSeenClusterState.set(clusterState);
357+
}
358+
337359
private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
338360
final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> {
339361
stream.writeBoolean(true);
@@ -447,7 +469,8 @@ public void onFailure(Exception e) {
447469
} else {
448470
responseActionListener = listener;
449471
}
450-
if (sendRemoteState && destination.isRemoteClusterStateEnabled() && destination.isRemoteRoutingTableEnabled()) {
472+
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
473+
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
451474
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
452475
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
453476
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

+8
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,14 @@ public boolean isRemoteRoutingTableEnabled() {
491491
return this.getAttributes().keySet().stream().anyMatch(key -> key.equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY));
492492
}
493493

494+
/**
495+
* Returns whether remote cluster state publication is enabled on this node
496+
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
497+
*/
498+
public boolean isRemoteStatePublicationEnabled() {
499+
return isRemoteClusterStateEnabled() && isRemoteRoutingTableEnabled();
500+
}
501+
494502
/**
495503
* Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name.
496504
* <p>

server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java

+205-13
Original file line numberDiff line numberDiff line change
@@ -37,33 +37,59 @@
3737
import org.opensearch.cluster.ClusterState;
3838
import org.opensearch.cluster.Diff;
3939
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
40+
import org.opensearch.cluster.metadata.Metadata;
4041
import org.opensearch.cluster.node.DiscoveryNode;
4142
import org.opensearch.cluster.node.DiscoveryNodes;
4243
import org.opensearch.common.settings.ClusterSettings;
4344
import org.opensearch.common.settings.Settings;
4445
import org.opensearch.core.common.io.stream.StreamOutput;
46+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
47+
import org.opensearch.gateway.remote.RemoteClusterStateService;
4548
import org.opensearch.node.Node;
4649
import org.opensearch.telemetry.tracing.noop.NoopTracer;
4750
import org.opensearch.test.OpenSearchTestCase;
4851
import org.opensearch.test.transport.CapturingTransport;
4952
import org.opensearch.transport.TransportService;
53+
import org.junit.Before;
5054

5155
import java.io.IOException;
5256
import java.util.Collections;
57+
import java.util.Optional;
58+
import java.util.function.Function;
59+
60+
import org.mockito.Mockito;
5361

5462
import static org.hamcrest.Matchers.containsString;
5563
import static org.hamcrest.Matchers.instanceOf;
64+
import static org.hamcrest.Matchers.is;
65+
import static org.mockito.Mockito.mock;
66+
import static org.mockito.Mockito.times;
67+
import static org.mockito.Mockito.when;
5668

5769
public class PublicationTransportHandlerTests extends OpenSearchTestCase {
5870

59-
public void testDiffSerializationFailure() {
60-
DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
71+
private static final long TERM = 5;
72+
private static final long VERSION = 5;
73+
private static final String CLUSTER_NAME = "test-cluster";
74+
private static final String CLUSTER_UUID = "test-cluster-UUID";
75+
private static final String MANIFEST_FILE = "path/to/manifest";
76+
private static final String LOCAL_NODE_ID = "localNode";
77+
78+
private DeterministicTaskQueue deterministicTaskQueue;
79+
private TransportService transportService;
80+
private DiscoveryNode localNode;
81+
private DiscoveryNode secondNode;
82+
83+
@Before
84+
public void setup() {
85+
deterministicTaskQueue = new DeterministicTaskQueue(
6186
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(),
6287
random()
6388
);
6489
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
65-
final DiscoveryNode localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT);
66-
final TransportService transportService = new CapturingTransport().createTransportService(
90+
localNode = new DiscoveryNode(LOCAL_NODE_ID, buildNewFakeTransportAddress(), Version.CURRENT);
91+
secondNode = new DiscoveryNode("secondNode", buildNewFakeTransportAddress(), Version.CURRENT);
92+
transportService = new CapturingTransport().createTransportService(
6793
Settings.EMPTY,
6894
deterministicTaskQueue.getThreadPool(),
6995
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
@@ -72,15 +98,10 @@ public void testDiffSerializationFailure() {
7298
Collections.emptySet(),
7399
NoopTracer.INSTANCE
74100
);
75-
final PublicationTransportHandler handler = new PublicationTransportHandler(
76-
transportService,
77-
writableRegistry(),
78-
pu -> null,
79-
(pu, l) -> {},
80-
null
81-
);
82-
transportService.start();
83-
transportService.acceptIncomingRequests();
101+
}
102+
103+
public void testDiffSerializationFailure() {
104+
final PublicationTransportHandler handler = getPublicationTransportHandler(p -> null, null);
84105

85106
final DiscoveryNode otherNode = new DiscoveryNode("otherNode", buildNewFakeTransportAddress(), Version.CURRENT);
86107
final ClusterState clusterState = CoordinationStateTests.clusterState(
@@ -118,4 +139,175 @@ public void writeTo(StreamOutput out) throws IOException {
118139
assertThat(e.getCause(), instanceOf(IOException.class));
119140
assertThat(e.getCause().getMessage(), containsString("Simulated failure of diff serialization"));
120141
}
142+
143+
public void testHandleRemoteIncomingPublishRequestWhenNoCurrentPublishRequest() {
144+
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
145+
146+
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
147+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
148+
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
149+
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
150+
localNode,
151+
TERM,
152+
VERSION,
153+
CLUSTER_NAME,
154+
CLUSTER_UUID,
155+
MANIFEST_FILE
156+
);
157+
158+
IllegalStateException e = expectThrows(
159+
IllegalStateException.class,
160+
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
161+
);
162+
assertThat(e.getMessage(), containsString("publication to self failed"));
163+
Mockito.verifyNoInteractions(remoteClusterStateService);
164+
}
165+
166+
public void testHandleRemoteIncomingPublishRequestWhenTermMismatch() {
167+
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
168+
169+
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
170+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
171+
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
172+
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
173+
localNode,
174+
TERM,
175+
VERSION,
176+
CLUSTER_NAME,
177+
CLUSTER_UUID,
178+
MANIFEST_FILE
179+
);
180+
ClusterState clusterState = buildClusterState(6L, VERSION);
181+
PublishRequest publishRequest = new PublishRequest(clusterState);
182+
handler.setCurrentPublishRequestToSelf(publishRequest);
183+
IllegalStateException e = expectThrows(
184+
IllegalStateException.class,
185+
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
186+
);
187+
assertThat(e.getMessage(), containsString("publication to self failed"));
188+
Mockito.verifyNoInteractions(remoteClusterStateService);
189+
}
190+
191+
public void testHandleRemoteIncomingPublishRequestWhenVersionMismatch() {
192+
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
193+
194+
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
195+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
196+
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
197+
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
198+
localNode,
199+
TERM,
200+
VERSION,
201+
CLUSTER_NAME,
202+
CLUSTER_UUID,
203+
MANIFEST_FILE
204+
);
205+
ClusterState clusterState = buildClusterState(TERM, 11L);
206+
PublishRequest publishRequest = new PublishRequest(clusterState);
207+
handler.setCurrentPublishRequestToSelf(publishRequest);
208+
IllegalStateException e = expectThrows(
209+
IllegalStateException.class,
210+
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
211+
);
212+
assertThat(e.getMessage(), containsString("publication to self failed"));
213+
Mockito.verifyNoInteractions(remoteClusterStateService);
214+
}
215+
216+
public void testHandleRemoteIncomingPublishRequestForLocalNode() throws IOException {
217+
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
218+
219+
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
220+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
221+
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
222+
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
223+
localNode,
224+
TERM,
225+
VERSION,
226+
CLUSTER_NAME,
227+
CLUSTER_UUID,
228+
MANIFEST_FILE
229+
);
230+
ClusterState clusterState = buildClusterState(TERM, VERSION);
231+
PublishRequest publishRequest = new PublishRequest(clusterState);
232+
handler.setCurrentPublishRequestToSelf(publishRequest);
233+
PublishWithJoinResponse publishWithJoinResponse = handler.handleIncomingRemotePublishRequest(remotePublishRequest);
234+
assertThat(publishWithJoinResponse, is(expectedPublishResponse));
235+
Mockito.verifyNoInteractions(remoteClusterStateService);
236+
}
237+
238+
public void testHandleRemoteIncomingPublishRequestWhenManifestNotFound() throws IOException {
239+
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
240+
241+
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
242+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
243+
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
244+
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
245+
secondNode,
246+
TERM,
247+
VERSION,
248+
CLUSTER_NAME,
249+
CLUSTER_UUID,
250+
MANIFEST_FILE
251+
);
252+
when(remoteClusterStateService.getClusterMetadataManifestByFileName(CLUSTER_UUID, MANIFEST_FILE)).thenReturn(null);
253+
ClusterState clusterState = buildClusterState(TERM, VERSION);
254+
PublishRequest publishRequest = new PublishRequest(clusterState);
255+
handler.setCurrentPublishRequestToSelf(publishRequest);
256+
IllegalStateException e = expectThrows(
257+
IllegalStateException.class,
258+
() -> handler.handleIncomingRemotePublishRequest(remotePublishRequest)
259+
);
260+
assertThat(e.getMessage(), containsString("Publication failed as manifest was not found for"));
261+
Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any());
262+
}
263+
264+
public void testHandleRemoteIncomingPublishRequestWhenNoLastSeenState() throws IOException {
265+
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
266+
267+
PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
268+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
269+
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
270+
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
271+
secondNode,
272+
TERM,
273+
VERSION,
274+
CLUSTER_NAME,
275+
CLUSTER_UUID,
276+
MANIFEST_FILE
277+
);
278+
ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(TERM).stateVersion(VERSION).build();
279+
when(remoteClusterStateService.getClusterMetadataManifestByFileName(CLUSTER_UUID, MANIFEST_FILE)).thenReturn(manifest);
280+
when(remoteClusterStateService.getClusterStateForManifest(CLUSTER_NAME, manifest, LOCAL_NODE_ID, true)).thenReturn(
281+
buildClusterState(TERM, VERSION)
282+
);
283+
ClusterState clusterState = buildClusterState(TERM, VERSION);
284+
PublishRequest publishRequest = new PublishRequest(clusterState);
285+
handler.setCurrentPublishRequestToSelf(publishRequest);
286+
PublishWithJoinResponse publishWithJoinResponse = handler.handleIncomingRemotePublishRequest(remotePublishRequest);
287+
assertThat(publishWithJoinResponse, is(expectedPublishResponse));
288+
Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any());
289+
}
290+
291+
private PublicationTransportHandler getPublicationTransportHandler(
292+
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
293+
RemoteClusterStateService remoteClusterStateService
294+
) {
295+
final PublicationTransportHandler handler = new PublicationTransportHandler(
296+
transportService,
297+
writableRegistry(),
298+
handlePublishRequest,
299+
(pu, l) -> {},
300+
remoteClusterStateService
301+
);
302+
transportService.start();
303+
transportService.acceptIncomingRequests();
304+
return handler;
305+
}
306+
307+
private ClusterState buildClusterState(long term, long version) {
308+
CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term);
309+
Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build();
310+
DiscoveryNodes nodes = DiscoveryNodes.builder().add(localNode).add(secondNode).localNodeId(LOCAL_NODE_ID).build();
311+
return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build();
312+
}
121313
}

0 commit comments

Comments
 (0)