Skip to content

Commit 8ad0dc0

Browse files
authored
[Remote Store] Add RemoteStoreSettings class to handle remote store related settings (opensearch-project#12838)
* Add RemoteStoreSettings class to handle remote store related settings --------- Signed-off-by: Sachin Kale <kalsac@amazon.com> Co-authored-by: Sachin Kale <kalsac@amazon.com>
1 parent 6ddbdcd commit 8ad0dc0

19 files changed

+249
-136
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.opensearch.index.translog.TestTranslog;
8585
import org.opensearch.index.translog.Translog;
8686
import org.opensearch.index.translog.TranslogStats;
87+
import org.opensearch.indices.DefaultRemoteStoreSettings;
8788
import org.opensearch.indices.IndicesService;
8889
import org.opensearch.indices.recovery.RecoveryState;
8990
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
@@ -711,9 +712,9 @@ public static final IndexShard newIndexShard(
711712
SegmentReplicationCheckpointPublisher.EMPTY,
712713
null,
713714
null,
714-
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
715715
nodeId,
716716
null,
717+
DefaultRemoteStoreSettings.INSTANCE,
717718
false
718719
);
719720
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.index.shard.IndexShardClosedException;
3232
import org.opensearch.index.translog.Translog.Durability;
3333
import org.opensearch.indices.IndicesService;
34+
import org.opensearch.indices.RemoteStoreSettings;
3435
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
3536
import org.opensearch.indices.recovery.RecoverySettings;
3637
import org.opensearch.indices.recovery.RecoveryState;
@@ -56,7 +57,7 @@
5657

5758
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
5859
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
59-
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
60+
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
6061
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6162
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6263
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -189,7 +190,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
189190
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
190191

191192
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
192-
int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles();
193+
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
193194
// Delete is async.
194195
assertBusy(() -> {
195196
int actualFileCount = getFileCount(indexPath);
@@ -224,7 +225,7 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
224225

225226
public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
226227
Settings.Builder settings = Settings.builder()
227-
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
228+
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
228229
internalCluster().startNode(settings);
229230

230231
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
@@ -243,7 +244,7 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
243244

244245
public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
245246
Settings.Builder settings = Settings.builder()
246-
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
247+
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
247248
internalCluster().startNode(settings);
248249

249250
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
@@ -469,7 +470,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E
469470

470471
private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) {
471472
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
472-
assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval());
473+
assertEquals(expectedBufferInterval, indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval());
473474
}
474475

475476
private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.opensearch.indices.IndicesQueryCache;
118118
import org.opensearch.indices.IndicesRequestCache;
119119
import org.opensearch.indices.IndicesService;
120+
import org.opensearch.indices.RemoteStoreSettings;
120121
import org.opensearch.indices.ShardLimitValidator;
121122
import org.opensearch.indices.analysis.HunspellService;
122123
import org.opensearch.indices.breaker.BreakerSettings;
@@ -297,7 +298,6 @@ public void apply(Settings value, Settings current, Settings previous) {
297298
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
298299
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
299300
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
300-
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
301301
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
302302
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
303303
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
@@ -706,7 +706,6 @@ public void apply(Settings value, Settings current, Settings previous) {
706706
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
707707
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
708708
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
709-
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
710709
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
711710
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
712711
IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING,
@@ -723,7 +722,10 @@ public void apply(Settings value, Settings current, Settings previous) {
723722

724723
// Concurrent segment search settings
725724
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
726-
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
725+
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
726+
727+
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
728+
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
727729
)
728730
)
729731
);

server/src/main/java/org/opensearch/index/IndexModule.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.opensearch.index.store.remote.filecache.FileCache;
8080
import org.opensearch.index.translog.TranslogFactory;
8181
import org.opensearch.indices.IndicesQueryCache;
82+
import org.opensearch.indices.RemoteStoreSettings;
8283
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
8384
import org.opensearch.indices.mapper.MapperRegistry;
8485
import org.opensearch.indices.recovery.RecoverySettings;
@@ -604,8 +605,8 @@ public IndexService newIndexService(
604605
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
605606
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
606607
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
607-
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
608-
RecoverySettings recoverySettings
608+
RecoverySettings recoverySettings,
609+
RemoteStoreSettings remoteStoreSettings
609610
) throws IOException {
610611
final IndexEventListener eventListener = freeze();
611612
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -663,8 +664,8 @@ public IndexService newIndexService(
663664
recoveryStateFactory,
664665
translogFactorySupplier,
665666
clusterDefaultRefreshIntervalSupplier,
666-
clusterRemoteTranslogBufferIntervalSupplier,
667-
recoverySettings
667+
recoverySettings,
668+
remoteStoreSettings
668669
);
669670
success = true;
670671
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.opensearch.index.store.Store;
9595
import org.opensearch.index.translog.Translog;
9696
import org.opensearch.index.translog.TranslogFactory;
97+
import org.opensearch.indices.RemoteStoreSettings;
9798
import org.opensearch.indices.cluster.IndicesClusterStateService;
9899
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
99100
import org.opensearch.indices.mapper.MapperRegistry;
@@ -183,8 +184,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
183184
private final ValuesSourceRegistry valuesSourceRegistry;
184185
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
185186
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
186-
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;
187187
private final RecoverySettings recoverySettings;
188+
private final RemoteStoreSettings remoteStoreSettings;
188189

189190
public IndexService(
190191
IndexSettings indexSettings,
@@ -219,8 +220,8 @@ public IndexService(
219220
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
220221
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
221222
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
222-
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
223-
RecoverySettings recoverySettings
223+
RecoverySettings recoverySettings,
224+
RemoteStoreSettings remoteStoreSettings
224225
) {
225226
super(indexSettings);
226227
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -296,8 +297,8 @@ public IndexService(
296297
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
297298
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
298299
this.translogFactorySupplier = translogFactorySupplier;
299-
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
300300
this.recoverySettings = recoverySettings;
301+
this.remoteStoreSettings = remoteStoreSettings;
301302
updateFsyncTaskIfNecessary();
302303
}
303304

@@ -549,9 +550,9 @@ public synchronized IndexShard createShard(
549550
this.indexSettings.isSegRepEnabledOrRemoteNode() ? checkpointPublisher : null,
550551
remoteStore,
551552
remoteStoreStatsTrackerFactory,
552-
clusterRemoteTranslogBufferIntervalSupplier,
553553
nodeEnv.nodeId(),
554554
recoverySettings,
555+
remoteStoreSettings,
555556
seedRemote
556557
);
557558
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@
179179
import org.opensearch.index.warmer.WarmerStats;
180180
import org.opensearch.indices.IndexingMemoryController;
181181
import org.opensearch.indices.IndicesService;
182+
import org.opensearch.indices.RemoteStoreSettings;
182183
import org.opensearch.indices.cluster.IndicesClusterStateService;
183184
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
184185
import org.opensearch.indices.recovery.RecoveryFailedException;
@@ -349,6 +350,7 @@ Runnable getGlobalCheckpointSyncer() {
349350
private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
350351
private final RemoteStoreFileDownloader fileDownloader;
351352
private final RecoverySettings recoverySettings;
353+
private final RemoteStoreSettings remoteStoreSettings;
352354
/*
353355
On source doc rep node, It will be DOCREP_NON_MIGRATING.
354356
On source remote node , it will be REMOTE_MIGRATING_SEEDED when relocating from remote node
@@ -381,9 +383,9 @@ public IndexShard(
381383
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
382384
@Nullable final Store remoteStore,
383385
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
384-
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
385386
final String nodeId,
386387
final RecoverySettings recoverySettings,
388+
final RemoteStoreSettings remoteStoreSettings,
387389
boolean seedRemote
388390
) throws IOException {
389391
super(shardRouting.shardId(), indexSettings);
@@ -405,7 +407,7 @@ public IndexShard(
405407
threadPool,
406408
this::getEngine,
407409
indexSettings.isRemoteNode(),
408-
() -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier)
410+
() -> getRemoteTranslogUploadBufferInterval(remoteStoreSettings::getClusterRemoteTranslogBufferInterval)
409411
);
410412
this.mapperService = mapperService;
411413
this.indexCache = indexCache;
@@ -481,6 +483,7 @@ public boolean shouldCache(Query query) {
481483
: mapperService.documentMapper().mappers().containsTimeStampField();
482484
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
483485
this.recoverySettings = recoverySettings;
486+
this.remoteStoreSettings = remoteStoreSettings;
484487
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
485488
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
486489
}
@@ -598,6 +601,10 @@ public RecoverySettings getRecoverySettings() {
598601
return recoverySettings;
599602
}
600603

604+
public RemoteStoreSettings getRemoteStoreSettings() {
605+
return remoteStoreSettings;
606+
}
607+
601608
public RemoteStoreFileDownloader getFileDownloader() {
602609
return fileDownloader;
603610
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private boolean syncSegments() {
224224
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
225225
// This is done to avoid delete post each refresh.
226226
if (isRefreshAfterCommit()) {
227-
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
227+
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles());
228228
}
229229

230230
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices;
10+
11+
import org.opensearch.common.settings.ClusterSettings;
12+
import org.opensearch.common.settings.Settings;
13+
14+
/**
15+
* Utility to provide a {@link RemoteStoreSettings} instance containing all defaults
16+
*
17+
* @opensearch.internal
18+
*/
19+
public final class DefaultRemoteStoreSettings {
20+
private DefaultRemoteStoreSettings() {}
21+
22+
public static final RemoteStoreSettings INSTANCE = new RemoteStoreSettings(
23+
Settings.EMPTY,
24+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
25+
);
26+
}

server/src/main/java/org/opensearch/indices/IndicesService.java

+8-27
Original file line numberDiff line numberDiff line change
@@ -249,17 +249,6 @@ public class IndicesService extends AbstractLifecycleComponent
249249
Property.Final
250250
);
251251

252-
/**
253-
* Used to specify the default translog buffer interval for remote store backed indexes.
254-
*/
255-
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting(
256-
"cluster.remote_store.translog.buffer_interval",
257-
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
258-
IndexSettings.MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL,
259-
Property.NodeScope,
260-
Property.Dynamic
261-
);
262-
263252
/**
264253
* This setting is used to set the refresh interval when the {@code index.refresh_interval} index setting is not
265254
* provided during index creation or when the existing {@code index.refresh_interval} index setting is set as null.
@@ -366,7 +355,7 @@ public class IndicesService extends AbstractLifecycleComponent
366355
private volatile boolean idFieldDataEnabled;
367356
private volatile boolean allowExpensiveQueries;
368357
private final RecoverySettings recoverySettings;
369-
358+
private final RemoteStoreSettings remoteStoreSettings;
370359
@Nullable
371360
private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor;
372361
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
@@ -375,8 +364,6 @@ public class IndicesService extends AbstractLifecycleComponent
375364
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
376365
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
377366
private volatile TimeValue clusterDefaultRefreshInterval;
378-
private volatile TimeValue clusterRemoteTranslogBufferInterval;
379-
380367
private final SearchRequestStats searchRequestStats;
381368

382369
@Override
@@ -411,7 +398,8 @@ public IndicesService(
411398
SearchRequestStats searchRequestStats,
412399
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
413400
RecoverySettings recoverySettings,
414-
CacheService cacheService
401+
CacheService cacheService,
402+
RemoteStoreSettings remoteStoreSettings
415403
) {
416404
this.settings = settings;
417405
this.threadPool = threadPool;
@@ -515,10 +503,8 @@ protected void closeInternal() {
515503
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
516504
clusterService.getClusterSettings()
517505
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate);
518-
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings());
519-
clusterService.getClusterSettings()
520-
.addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval);
521506
this.recoverySettings = recoverySettings;
507+
this.remoteStoreSettings = remoteStoreSettings;
522508
}
523509

524510
/**
@@ -923,8 +909,8 @@ private synchronized IndexService createIndexService(
923909
remoteDirectoryFactory,
924910
translogFactorySupplier,
925911
this::getClusterDefaultRefreshInterval,
926-
this::getClusterRemoteTranslogBufferInterval,
927-
this.recoverySettings
912+
this.recoverySettings,
913+
this.remoteStoreSettings
928914
);
929915
}
930916

@@ -2044,12 +2030,7 @@ private TimeValue getClusterDefaultRefreshInterval() {
20442030
return this.clusterDefaultRefreshInterval;
20452031
}
20462032

2047-
// Exclusively for testing, please do not use it elsewhere.
2048-
public TimeValue getClusterRemoteTranslogBufferInterval() {
2049-
return clusterRemoteTranslogBufferInterval;
2050-
}
2051-
2052-
private void setClusterRemoteTranslogBufferInterval(TimeValue clusterRemoteTranslogBufferInterval) {
2053-
this.clusterRemoteTranslogBufferInterval = clusterRemoteTranslogBufferInterval;
2033+
public RemoteStoreSettings getRemoteStoreSettings() {
2034+
return this.remoteStoreSettings;
20542035
}
20552036
}

0 commit comments

Comments
 (0)