Skip to content

Commit c2b26a2

Browse files
committed
Make Remote Publication a dynamic setting
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 8ddb3ee commit c2b26a2

File tree

12 files changed

+333
-34
lines changed

12 files changed

+333
-34
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

+276-3
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
3939
import org.opensearch.cluster.metadata.Metadata;
4040
import org.opensearch.cluster.node.DiscoveryNode;
41+
import org.opensearch.common.settings.ClusterSettings;
4142
import org.opensearch.common.settings.Settings;
4243
import org.opensearch.common.util.io.IOUtils;
4344
import org.opensearch.gateway.remote.ClusterMetadataManifest;
@@ -87,7 +88,8 @@ public CoordinationState(
8788
DiscoveryNode localNode,
8889
PersistedStateRegistry persistedStateRegistry,
8990
ElectionStrategy electionStrategy,
90-
Settings settings
91+
Settings settings,
92+
ClusterSettings clusterSettings
9193
) {
9294
this.localNode = localNode;
9395

@@ -105,10 +107,10 @@ public CoordinationState(
105107
.getLastAcceptedConfiguration();
106108
this.publishVotes = new VoteCollection();
107109
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
108-
// ToDo: revisit this check while making the setting dynamic
109110
this.isRemotePublicationEnabled = isRemoteStateEnabled
110111
&& REMOTE_PUBLICATION_SETTING.get(settings)
111-
&& localNode.isRemoteStatePublicationEnabled();
112+
&& localNode.isRemoteStatePublicationConfigured();
113+
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
112114
}
113115

114116
public boolean isRemotePublicationEnabled() {
@@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() {
651653
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
652654
}
653655

656+
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
657+
if (remotePublicationSetting == false) {
658+
this.isRemotePublicationEnabled = false;
659+
} else {
660+
this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured();
661+
}
662+
663+
}
664+
654665
/**
655666
* Pluggable persistence layer for {@link CoordinationState}.
656667
*

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
187187
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
188188
private final NodeHealthService nodeHealthService;
189189
private final PersistedStateRegistry persistedStateRegistry;
190+
private final RemoteClusterStateService remoteClusterStateService;
190191
private final RemoteStoreNodeService remoteStoreNodeService;
191192
private NodeConnectionsService nodeConnectionsService;
193+
private final ClusterSettings clusterSettings;
192194

193195
/**
194196
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -312,6 +314,8 @@ public Coordinator(
312314
this.persistedStateRegistry = persistedStateRegistry;
313315
this.localNodeCommissioned = true;
314316
this.remoteStoreNodeService = remoteStoreNodeService;
317+
this.remoteClusterStateService = remoteClusterStateService;
318+
this.clusterSettings = clusterSettings;
315319
}
316320

317321
private ClusterFormationState getClusterFormationState() {
@@ -867,7 +871,9 @@ boolean publicationInProgress() {
867871
@Override
868872
protected void doStart() {
869873
synchronized (mutex) {
870-
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
874+
coordinationState.set(
875+
new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings)
876+
);
871877
peerFinder.setCurrentTerm(getCurrentTerm());
872878
configuredHostsResolver.start();
873879
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
@@ -912,9 +918,9 @@ public DiscoveryStats stats() {
912918
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
913919
}
914920
});
915-
if (coordinationState.get().isRemotePublicationEnabled()) {
916-
stats.add(publicationHandler.getFullDownloadStats());
917-
stats.add(publicationHandler.getDiffDownloadStats());
921+
if (remoteClusterStateService != null) {
922+
stats.add(remoteClusterStateService.getFullDownloadStats());
923+
stats.add(remoteClusterStateService.getDiffDownloadStats());
918924
}
919925
clusterStateStats.setPersistenceStats(stats);
920926
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -513,10 +513,10 @@ private static void ensureRemoteClusterStateNodesCompatibility(DiscoveryNode joi
513513

514514
assert existingNodes.isEmpty() == false;
515515
Optional<DiscoveryNode> remotePublicationNode = existingNodes.stream()
516-
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
516+
.filter(DiscoveryNode::isRemoteStatePublicationConfigured)
517517
.findFirst();
518518

519-
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationEnabled()) {
519+
if (remotePublicationNode.isPresent() && joiningNode.isRemoteStatePublicationConfigured()) {
520520
ensureRepositoryCompatibility(joiningNode, remotePublicationNode.get(), REMOTE_CLUSTER_PUBLICATION_REPO_NAME_ATTRIBUTES);
521521
}
522522
}

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -178,14 +178,6 @@ public PublishClusterStateStats stats() {
178178
);
179179
}
180180

181-
public PersistedStateStats getFullDownloadStats() {
182-
return remoteClusterStateService.getFullDownloadStats();
183-
}
184-
185-
public PersistedStateStats getDiffDownloadStats() {
186-
return remoteClusterStateService.getDiffDownloadStats();
187-
}
188-
189181
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
190182
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
191183
ClusterState incomingState;
@@ -356,7 +348,7 @@ public PublicationContext newPublicationContext(
356348
) {
357349
if (isRemotePublicationEnabled == true) {
358350
if (allNodesRemotePublicationEnabled.get() == false) {
359-
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
351+
if (validateRemotePublicationConfiguredOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
360352
allNodesRemotePublicationEnabled.set(true);
361353
}
362354
}
@@ -374,11 +366,11 @@ public PublicationContext newPublicationContext(
374366
return publicationContext;
375367
}
376368

377-
private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
369+
private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
378370
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
379371
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
380372
// if a node is non-remote then created local publication context
381-
if (node.isRemoteStatePublicationEnabled() == false) {
373+
if (node.isRemoteStatePublicationConfigured() == false) {
382374
return false;
383375
}
384376
}

server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,7 @@ public boolean isRemoteStoreNode() {
518518
* Returns whether remote cluster state publication is enabled on this node
519519
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
520520
*/
521-
public boolean isRemoteStatePublicationEnabled() {
521+
public boolean isRemoteStatePublicationConfigured() {
522522
return this.getAttributes()
523523
.keySet()
524524
.stream()

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
112112
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
113113
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
114+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
114115
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
115116

116117
/**
@@ -132,7 +133,7 @@ public class RemoteClusterStateService implements Closeable {
132133
REMOTE_PUBLICATION_SETTING_KEY,
133134
false,
134135
Property.NodeScope,
135-
Property.Final
136+
Property.Dynamic
136137
);
137138

138139
/**
@@ -232,7 +233,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
232233
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
233234
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
234235
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
235-
private final boolean isPublicationEnabled;
236+
private boolean isPublicationEnabled;
236237
private final String remotePathPrefix;
237238

238239
private final RemoteClusterStateCache remoteClusterStateCache;
@@ -273,9 +274,10 @@ public RemoteClusterStateService(
273274
this.remoteStateStats = new RemotePersistenceStats();
274275
this.namedWriteableRegistry = namedWriteableRegistry;
275276
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
276-
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
277+
this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING)
277278
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
278279
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
280+
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
279281
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
280282
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
281283
repositoriesService,
@@ -1109,6 +1111,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
11091111
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
11101112
}
11111113

1114+
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
1115+
if (remotePublicationSetting == false) {
1116+
this.isPublicationEnabled = false;
1117+
} else {
1118+
this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
1119+
}
1120+
}
1121+
11121122
// Package private for unit test
11131123
RemoteRoutingTableService getRemoteRoutingTableService() {
11141124
return this.remoteRoutingTableService;

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ public class RemotePersistenceStats {
2121
RemoteDownloadStats remoteDiffDownloadStats;
2222
RemoteDownloadStats remoteFullDownloadStats;
2323

24-
final String FULL_DOWNLOAD_STATS = "remote_full_download";
25-
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
24+
public static final String FULL_DOWNLOAD_STATS = "remote_full_download";
25+
public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
2626

2727
public RemotePersistenceStats() {
2828
remoteUploadStats = new RemoteUploadStats();

server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState(
12831283
DiscoveryNode localNode,
12841284
Settings settings
12851285
) {
1286-
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings);
1286+
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null);
12871287
}
12881288

12891289
public static ClusterState clusterState(

server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() {
302302
localNode,
303303
persistedStateRegistry,
304304
ElectionStrategy.DEFAULT_INSTANCE,
305-
Settings.EMPTY
305+
Settings.EMPTY,
306+
null
306307
);
307308

308309
final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);

server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,13 @@ class MockNode {
9494
);
9595
PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
9696
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState));
97-
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY);
97+
coordinationState = new CoordinationState(
98+
localNode,
99+
persistedStateRegistry,
100+
ElectionStrategy.DEFAULT_INSTANCE,
101+
Settings.EMPTY,
102+
null
103+
);
98104
}
99105

100106
final DiscoveryNode localNode;

test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ static class ClusterNode {
150150
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
151151

152152
this.electionStrategy = electionStrategy;
153-
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
153+
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
154154
}
155155

156156
void reboot() {
@@ -189,7 +189,7 @@ void reboot() {
189189
localNode.getVersion()
190190
);
191191

192-
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
192+
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
193193
}
194194

195195
void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {

0 commit comments

Comments
 (0)