Skip to content

Commit df6c12b

Browse files
shiv0408soosinha
andauthored
[Backport 2.x] Make Remote Publication a dynamic setting (opensearch-project#15937) (opensearch-project#16362)
* Make Remote Publication a dynamic setting (opensearch-project#15937) Signed-off-by: Shivansh Arora <hishiv@amazon.com> Co-authored-by: Sooraj Sinha <soosinha@amazon.com>
1 parent 12df195 commit df6c12b

File tree

10 files changed

+392
-57
lines changed

10 files changed

+392
-57
lines changed

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

+338-5
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

-10
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.Set;
5454

5555
import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
56-
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
5756
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
5857

5958
/**
@@ -81,7 +80,6 @@ public class CoordinationState {
8180
private VotingConfiguration lastPublishedConfiguration;
8281
private VoteCollection publishVotes;
8382
private final boolean isRemoteStateEnabled;
84-
private boolean isRemotePublicationEnabled;
8583

8684
public CoordinationState(
8785
DiscoveryNode localNode,
@@ -105,14 +103,6 @@ public CoordinationState(
105103
.getLastAcceptedConfiguration();
106104
this.publishVotes = new VoteCollection();
107105
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
108-
// ToDo: revisit this check while making the setting dynamic
109-
this.isRemotePublicationEnabled = isRemoteStateEnabled
110-
&& REMOTE_PUBLICATION_SETTING.get(settings)
111-
&& localNode.isRemoteStatePublicationEnabled();
112-
}
113-
114-
public boolean isRemotePublicationEnabled() {
115-
return isRemotePublicationEnabled;
116106
}
117107

118108
public long getCurrentTerm() {

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
189189
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
190190
private final NodeHealthService nodeHealthService;
191191
private final PersistedStateRegistry persistedStateRegistry;
192+
private final RemoteClusterStateService remoteClusterStateService;
192193
private final RemoteStoreNodeService remoteStoreNodeService;
193194
private NodeConnectionsService nodeConnectionsService;
194-
private final RemoteClusterStateService remoteClusterStateService;
195+
private final ClusterSettings clusterSettings;
195196

196197
/**
197198
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -316,6 +317,7 @@ public Coordinator(
316317
this.localNodeCommissioned = true;
317318
this.remoteStoreNodeService = remoteStoreNodeService;
318319
this.remoteClusterStateService = remoteClusterStateService;
320+
this.clusterSettings = clusterSettings;
319321
}
320322

321323
private ClusterFormationState getClusterFormationState() {
@@ -1365,7 +1367,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
13651367

13661368
final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
13671369
clusterChangedEvent,
1368-
coordinationState.get().isRemotePublicationEnabled(),
1370+
this.isRemotePublicationEnabled(),
13691371
persistedStateRegistry
13701372
);
13711373
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());
@@ -1900,8 +1902,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) {
19001902
}
19011903

19021904
public boolean isRemotePublicationEnabled() {
1903-
if (coordinationState.get() != null) {
1904-
return coordinationState.get().isRemotePublicationEnabled();
1905+
if (remoteClusterStateService != null) {
1906+
return remoteClusterStateService.isRemotePublicationEnabled();
19051907
}
19061908
return false;
19071909
}

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import java.util.function.Supplier;
5050
import java.util.stream.Collectors;
5151

52-
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
52+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
5353

5454
/**
5555
* A Service which provides APIs to upload and download routing table from remote store.
@@ -76,7 +76,7 @@ public InternalRemoteRoutingTableService(
7676
ThreadPool threadpool,
7777
String clusterName
7878
) {
79-
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
79+
assert isRemoteRoutingTableConfigured(settings) : "Remote routing table is not enabled";
8080
this.repositoriesService = repositoriesService;
8181
this.settings = settings;
8282
this.threadPool = threadpool;
@@ -233,7 +233,7 @@ protected void doClose() throws IOException {
233233

234234
@Override
235235
protected void doStart() {
236-
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
236+
assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled";
237237
final String remoteStoreRepo = settings.get(
238238
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
239239
);

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import java.util.function.Supplier;
1717

18-
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
18+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
1919

2020
/**
2121
* Factory to provide impl for RemoteRoutingTableService based on settings.
@@ -37,7 +37,7 @@ public static RemoteRoutingTableService getService(
3737
ThreadPool threadPool,
3838
String clusterName
3939
) {
40-
if (isRemoteRoutingTableEnabled(settings)) {
40+
if (isRemoteRoutingTableConfigured(settings)) {
4141
return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName);
4242
}
4343
return new NoopRemoteRoutingTableService();

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+32-17
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.concurrent.ConcurrentHashMap;
8484
import java.util.concurrent.CountDownLatch;
8585
import java.util.concurrent.TimeUnit;
86+
import java.util.concurrent.atomic.AtomicBoolean;
8687
import java.util.concurrent.atomic.AtomicReference;
8788
import java.util.function.Function;
8889
import java.util.function.LongSupplier;
@@ -112,6 +113,8 @@
112113
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
113114
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX;
114115
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
116+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
117+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
115118

116119
/**
117120
* A Service which provides APIs to upload and download cluster metadata from remote store.
@@ -132,7 +135,7 @@ public class RemoteClusterStateService implements Closeable {
132135
REMOTE_PUBLICATION_SETTING_KEY,
133136
false,
134137
Property.NodeScope,
135-
Property.Final
138+
Property.Dynamic
136139
);
137140

138141
/**
@@ -232,7 +235,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
232235
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
233236
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
234237
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
235-
private final boolean isPublicationEnabled;
238+
private volatile AtomicBoolean isPublicationEnabled;
236239
private final String remotePathPrefix;
237240

238241
private final RemoteClusterStateCache remoteClusterStateCache;
@@ -273,9 +276,12 @@ public RemoteClusterStateService(
273276
this.remoteStateStats = new RemotePersistenceStats();
274277
this.namedWriteableRegistry = namedWriteableRegistry;
275278
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
276-
this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings)
277-
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
278-
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
279+
this.isPublicationEnabled = new AtomicBoolean(
280+
clusterSettings.get(REMOTE_PUBLICATION_SETTING)
281+
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
282+
&& RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured(settings)
283+
);
284+
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
279285
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
280286
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
281287
repositoriesService,
@@ -303,19 +309,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
303309
return null;
304310
}
305311

312+
boolean publicationEnabled = isPublicationEnabled.get();
306313
UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel(
307314
clusterState,
308315
new ArrayList<>(clusterState.metadata().indices().values()),
309316
emptyMap(),
310-
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
317+
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled),
311318
true,
312319
true,
313320
true,
314-
isPublicationEnabled,
315-
isPublicationEnabled,
316-
isPublicationEnabled,
317-
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
318-
isPublicationEnabled,
321+
publicationEnabled,
322+
publicationEnabled,
323+
publicationEnabled,
324+
publicationEnabled ? clusterState.customs() : Collections.emptyMap(),
325+
publicationEnabled,
319326
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()),
320327
null
321328
);
@@ -394,9 +401,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
394401
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();
395402

396403
final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
397-
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
404+
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get());
398405
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
399-
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
406+
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false);
400407
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
401408
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
402409
previousManifest.getClusterStateCustomMap()
@@ -461,10 +468,10 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
461468
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
462469
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
463470

464-
final boolean updateDiscoveryNodes = isPublicationEnabled
471+
final boolean updateDiscoveryNodes = isPublicationEnabled.get()
465472
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
466-
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
467-
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
473+
final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks());
474+
final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get()
468475
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
469476

470477
uploadedMetadataResults = writeMetadataInParallel(
@@ -1115,6 +1122,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl
11151122
this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
11161123
}
11171124

1125+
private void setRemotePublicationSetting(boolean remotePublicationSetting) {
1126+
if (remotePublicationSetting == false) {
1127+
this.isPublicationEnabled.set(false);
1128+
} else {
1129+
this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableConfigured(settings));
1130+
}
1131+
}
1132+
11181133
// Package private for unit test
11191134
RemoteRoutingTableService getRemoteRoutingTableService() {
11201135
return this.remoteRoutingTableService;
@@ -1830,7 +1845,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
18301845
}
18311846

18321847
public boolean isRemotePublicationEnabled() {
1833-
return this.isPublicationEnabled;
1848+
return this.isPublicationEnabled.get();
18341849
}
18351850

18361851
public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {

server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ public class RemotePersistenceStats {
2121
RemoteDownloadStats remoteDiffDownloadStats;
2222
RemoteDownloadStats remoteFullDownloadStats;
2323

24-
final String FULL_DOWNLOAD_STATS = "remote_full_download";
25-
final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
24+
public static final String FULL_DOWNLOAD_STATS = "remote_full_download";
25+
public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download";
2626

2727
public RemotePersistenceStats() {
2828
remoteUploadStats = new RemoteUploadStats();

server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {
202202
.isEmpty() == false;
203203
}
204204

205-
public static boolean isRemoteRoutingTableEnabled(Settings settings) {
205+
public static boolean isRemoteRoutingTableConfigured(Settings settings) {
206206
return isRemoteRoutingTableAttributePresent(settings);
207207
}
208208

server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java

-11
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import static java.util.Collections.emptyMap;
6969
import static java.util.Collections.emptySet;
7070
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
71-
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
7271
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
7372
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
7473
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
@@ -1268,16 +1267,6 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe
12681267
verifyNoInteractions(remoteClusterStateService);
12691268
}
12701269

1271-
public void testIsRemotePublicationEnabled_WithInconsistentSettings() {
1272-
// create settings with remote state disabled but publication enabled
1273-
Settings settings = Settings.builder()
1274-
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false)
1275-
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
1276-
.build();
1277-
CoordinationState coordinationState = createCoordinationState(psr1, node1, settings);
1278-
assertFalse(coordinationState.isRemotePublicationEnabled());
1279-
}
1280-
12811270
public static CoordinationState createCoordinationState(
12821271
PersistedStateRegistry persistedStateRegistry,
12831272
DiscoveryNode localNode,

server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@
151151
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
152152
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
153153
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
154-
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
154+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured;
155155
import static org.hamcrest.Matchers.anEmptyMap;
156156
import static org.hamcrest.Matchers.equalTo;
157157
import static org.hamcrest.Matchers.is;
@@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
370370
// TODO Make the publication flag parameterized
371371
publicationEnabled = true;
372372
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build();
373+
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
374+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
373375
remoteClusterStateService = new RemoteClusterStateService(
374376
"test-node-id",
375377
repositoriesServiceSupplier,
@@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
388390
),
389391
writableRegistry()
390392
);
393+
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
391394
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager())
392395
.customs(
393396
Map.of(
@@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
747750
public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
748751
publicationEnabled = true;
749752
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build();
753+
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
754+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
750755
remoteClusterStateService = new RemoteClusterStateService(
751756
"test-node-id",
752757
repositoriesServiceSupplier,
@@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
765770
),
766771
writableRegistry()
767772
);
773+
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
768774
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
769775
mockBlobStoreObjects();
770776
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
@@ -2749,7 +2755,7 @@ public void testRemoteStateUploadStats() throws IOException {
27492755
}
27502756

27512757
public void testRemoteRoutingTableNotInitializedWhenDisabled() {
2752-
if (isRemoteRoutingTableEnabled(settings)) {
2758+
if (isRemoteRoutingTableConfigured(settings)) {
27532759
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService);
27542760
} else {
27552761
assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService);

0 commit comments

Comments
 (0)