Skip to content

Commit 5008552

Browse files
committed
removed isRemotePublicationEnabled flag from CoordinationState
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 6ca49aa commit 5008552

File tree

8 files changed

+38
-69
lines changed

8 files changed

+38
-69
lines changed

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

+1-22
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
3939
import org.opensearch.cluster.metadata.Metadata;
4040
import org.opensearch.cluster.node.DiscoveryNode;
41-
import org.opensearch.common.settings.ClusterSettings;
4241
import org.opensearch.common.settings.Settings;
4342
import org.opensearch.common.util.io.IOUtils;
4443
import org.opensearch.gateway.remote.ClusterMetadataManifest;
@@ -54,7 +53,6 @@
5453
import java.util.Set;
5554

5655
import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
57-
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
5856
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
5957

6058
/**
@@ -82,14 +80,12 @@ public class CoordinationState {
8280
private VotingConfiguration lastPublishedConfiguration;
8381
private VoteCollection publishVotes;
8482
private final boolean isRemoteStateEnabled;
85-
private boolean isRemotePublicationEnabled;
8683

8784
public CoordinationState(
8885
DiscoveryNode localNode,
8986
PersistedStateRegistry persistedStateRegistry,
9087
ElectionStrategy electionStrategy,
91-
Settings settings,
92-
ClusterSettings clusterSettings
88+
Settings settings
9389
) {
9490
this.localNode = localNode;
9591

@@ -107,14 +103,6 @@ public CoordinationState(
107103
.getLastAcceptedConfiguration();
108104
this.publishVotes = new VoteCollection();
109105
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
110-
this.isRemotePublicationEnabled = isRemoteStateEnabled
111-
&& REMOTE_PUBLICATION_SETTING.get(settings)
112-
&& localNode.isRemoteStatePublicationConfigured();
113-
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
114-
}
115-
116-
public boolean isRemotePublicationEnabled() {
117-
return isRemotePublicationEnabled;
118106
}
119107

120108
public long getCurrentTerm() {
@@ -653,15 +641,6 @@ private boolean shouldCommitRemotePersistedState() {
653641
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
654642
}
655643

656-
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
657-
if (remotePublicationSetting == false) {
658-
this.isRemotePublicationEnabled = false;
659-
} else {
660-
this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured();
661-
}
662-
663-
}
664-
665644
/**
666645
* Pluggable persistence layer for {@link CoordinationState}.
667646
*

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -871,9 +871,7 @@ boolean publicationInProgress() {
871871
@Override
872872
protected void doStart() {
873873
synchronized (mutex) {
874-
coordinationState.set(
875-
new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings)
876-
);
874+
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
877875
peerFinder.setCurrentTerm(getCurrentTerm());
878876
configuredHostsResolver.start();
879877
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
@@ -1362,7 +1360,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13621360

13631361
final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
13641362
clusterChangedEvent,
1365-
coordinationState.get().isRemotePublicationEnabled(),
1363+
this.isRemotePublicationEnabled(),
13661364
persistedStateRegistry
13671365
);
13681366
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());
@@ -1891,8 +1889,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) {
18911889
}
18921890

18931891
public boolean isRemotePublicationEnabled() {
1894-
if (coordinationState.get() != null) {
1895-
return coordinationState.get().isRemotePublicationEnabled();
1892+
if (remoteClusterStateService != null) {
1893+
return remoteClusterStateService.isRemotePublicationEnabled();
18961894
}
18971895
return false;
18981896
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+22-18
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.concurrent.ConcurrentHashMap;
8484
import java.util.concurrent.CountDownLatch;
8585
import java.util.concurrent.TimeUnit;
86+
import java.util.concurrent.atomic.AtomicBoolean;
8687
import java.util.concurrent.atomic.AtomicReference;
8788
import java.util.function.Function;
8889
import java.util.function.LongSupplier;
@@ -233,7 +234,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
233234
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
234235
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
235236
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
236-
private boolean isPublicationEnabled;
237+
private AtomicBoolean isPublicationEnabled;
237238
private final String remotePathPrefix;
238239

239240
private final RemoteClusterStateCache remoteClusterStateCache;
@@ -274,9 +275,11 @@ public RemoteClusterStateService(
274275
this.remoteStateStats = new RemotePersistenceStats();
275276
this.namedWriteableRegistry = namedWriteableRegistry;
276277
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
277-
this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING)
278-
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
279-
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
278+
this.isPublicationEnabled = new AtomicBoolean(
279+
clusterSettings.get(REMOTE_PUBLICATION_SETTING)
280+
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
281+
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings)
282+
);
280283
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
281284
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
282285
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
@@ -305,19 +308,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
305308
return null;
306309
}
307310

311+
boolean publicationEnabled = isPublicationEnabled.get();
308312
UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel(
309313
clusterState,
310314
new ArrayList<>(clusterState.metadata().indices().values()),
311315
emptyMap(),
312-
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
316+
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled),
313317
true,
314318
true,
315319
true,
316-
isPublicationEnabled,
317-
isPublicationEnabled,
318-
isPublicationEnabled,
319-
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
320-
isPublicationEnabled,
320+
publicationEnabled,
321+
publicationEnabled,
322+
publicationEnabled,
323+
publicationEnabled ? clusterState.customs() : Collections.emptyMap(),
324+
publicationEnabled,
321325
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()),
322326
null
323327
);
@@ -396,9 +400,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
396400
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
397401

398402
final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
399-
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
403+
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get());
400404
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
401-
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
405+
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false);
402406
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
403407
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
404408
previousManifest.getClusterStateCustomMap()
@@ -463,10 +467,10 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
463467
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
464468
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
465469

466-
final boolean updateDiscoveryNodes = isPublicationEnabled
470+
final boolean updateDiscoveryNodes = isPublicationEnabled.get()
467471
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
468-
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
469-
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
472+
final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks());
473+
final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get()
470474
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
471475

472476
uploadedMetadataResults = writeMetadataInParallel(
@@ -1119,9 +1123,9 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
11191123

11201124
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
11211125
if (remotePublicationSetting == false) {
1122-
this.isPublicationEnabled = false;
1126+
this.isPublicationEnabled.set(false);
11231127
} else {
1124-
this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
1128+
this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings));
11251129
}
11261130
}
11271131

@@ -1840,7 +1844,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
18401844
}
18411845

18421846
public boolean isRemotePublicationEnabled() {
1843-
return this.isPublicationEnabled;
1847+
return this.isPublicationEnabled.get();
18441848
}
18451849

18461850
public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {

server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import static java.util.Collections.emptyMap;
6969
import static java.util.Collections.emptySet;
7070
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
71-
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
7271
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
7372
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
7473
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
@@ -1268,22 +1267,12 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe
12681267
verifyNoInteractions(remoteClusterStateService);
12691268
}
12701269

1271-
public void testIsRemotePublicationEnabled_WithInconsistentSettings() {
1272-
// create settings with remote state disabled but publication enabled
1273-
Settings settings = Settings.builder()
1274-
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false)
1275-
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
1276-
.build();
1277-
CoordinationState coordinationState = createCoordinationState(psr1, node1, settings);
1278-
assertFalse(coordinationState.isRemotePublicationEnabled());
1279-
}
1280-
12811270
public static CoordinationState createCoordinationState(
12821271
PersistedStateRegistry persistedStateRegistry,
12831272
DiscoveryNode localNode,
12841273
Settings settings
12851274
) {
1286-
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null);
1275+
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings);
12871276
}
12881277

12891278
public static ClusterState clusterState(

server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ public void testPrevotingIndicatesElectionSuccess() {
302302
localNode,
303303
persistedStateRegistry,
304304
ElectionStrategy.DEFAULT_INSTANCE,
305-
Settings.EMPTY,
306-
null
305+
Settings.EMPTY
307306
);
308307

309308
final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);

server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,7 @@ class MockNode {
9494
);
9595
PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
9696
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState));
97-
coordinationState = new CoordinationState(
98-
localNode,
99-
persistedStateRegistry,
100-
ElectionStrategy.DEFAULT_INSTANCE,
101-
Settings.EMPTY,
102-
null
103-
);
97+
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY);
10498
}
10599

106100
final DiscoveryNode localNode;

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

+6
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
370370
// TODO Make the publication flag parameterized
371371
publicationEnabled = true;
372372
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build();
373+
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
374+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
373375
remoteClusterStateService = new RemoteClusterStateService(
374376
"test-node-id",
375377
repositoriesServiceSupplier,
@@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
388390
),
389391
writableRegistry()
390392
);
393+
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
391394
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager())
392395
.customs(
393396
Map.of(
@@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
747750
public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
748751
publicationEnabled = true;
749752
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build();
753+
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
754+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
750755
remoteClusterStateService = new RemoteClusterStateService(
751756
"test-node-id",
752757
repositoriesServiceSupplier,
@@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
765770
),
766771
writableRegistry()
767772
);
773+
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
768774
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
769775
mockBlobStoreObjects();
770776
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();

test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ static class ClusterNode {
150150
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);
151151

152152
this.electionStrategy = electionStrategy;
153-
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
153+
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
154154
}
155155

156156
void reboot() {
@@ -189,7 +189,7 @@ void reboot() {
189189
localNode.getVersion()
190190
);
191191

192-
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
192+
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
193193
}
194194

195195
void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {

0 commit comments

Comments
 (0)