Skip to content

Commit 270e81f

Browse files
authored
[Remote Store] Add RemoteStoreSettings class to handle remote store r… (#12962)
Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 28c3dec commit 270e81f

19 files changed

+250
-137
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.opensearch.index.translog.TestTranslog;
8585
import org.opensearch.index.translog.Translog;
8686
import org.opensearch.index.translog.TranslogStats;
87+
import org.opensearch.indices.DefaultRemoteStoreSettings;
8788
import org.opensearch.indices.IndicesService;
8889
import org.opensearch.indices.recovery.RecoveryState;
8990
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
@@ -688,9 +689,9 @@ public static final IndexShard newIndexShard(
688689
SegmentReplicationCheckpointPublisher.EMPTY,
689690
null,
690691
null,
691-
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
692692
nodeId,
693693
null,
694+
DefaultRemoteStoreSettings.INSTANCE,
694695
false
695696
);
696697
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.index.shard.IndexShardClosedException;
3232
import org.opensearch.index.translog.Translog.Durability;
3333
import org.opensearch.indices.IndicesService;
34+
import org.opensearch.indices.RemoteStoreSettings;
3435
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
3536
import org.opensearch.indices.recovery.RecoverySettings;
3637
import org.opensearch.indices.recovery.RecoveryState;
@@ -56,7 +57,7 @@
5657

5758
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
5859
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
59-
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
60+
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
6061
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6162
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6263
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -189,7 +190,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
189190
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
190191

191192
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
192-
int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles();
193+
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
193194
// Delete is async.
194195
assertBusy(() -> {
195196
int actualFileCount = getFileCount(indexPath);
@@ -224,7 +225,7 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
224225

225226
public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
226227
Settings.Builder settings = Settings.builder()
227-
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
228+
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
228229
internalCluster().startNode(settings);
229230

230231
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
@@ -243,7 +244,7 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
243244

244245
public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
245246
Settings.Builder settings = Settings.builder()
246-
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
247+
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
247248
internalCluster().startNode(settings);
248249

249250
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
@@ -469,7 +470,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E
469470

470471
private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) {
471472
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
472-
assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval());
473+
assertEquals(expectedBufferInterval, indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval());
473474
}
474475

475476
private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) {

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import org.opensearch.indices.IndicesQueryCache;
119119
import org.opensearch.indices.IndicesRequestCache;
120120
import org.opensearch.indices.IndicesService;
121+
import org.opensearch.indices.RemoteStoreSettings;
121122
import org.opensearch.indices.ShardLimitValidator;
122123
import org.opensearch.indices.analysis.HunspellService;
123124
import org.opensearch.indices.breaker.BreakerSettings;
@@ -298,7 +299,6 @@ public void apply(Settings value, Settings current, Settings previous) {
298299
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
299300
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
300301
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
301-
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
302302
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
303303
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
304304
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
@@ -708,23 +708,25 @@ public void apply(Settings value, Settings current, Settings previous) {
708708
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
709709
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
710710
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
711-
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
711+
712712
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
713713
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
714714

715715
// Concurrent segment search settings
716716
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
717717
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,
718-
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
718+
719719
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
720720
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
721721
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
722722
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
723723
CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT,
724724
IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
725725
IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
726-
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT
726+
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,
727727

728+
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
729+
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
728730
)
729731
)
730732
);

server/src/main/java/org/opensearch/index/IndexModule.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.opensearch.index.store.remote.filecache.FileCache;
8080
import org.opensearch.index.translog.TranslogFactory;
8181
import org.opensearch.indices.IndicesQueryCache;
82+
import org.opensearch.indices.RemoteStoreSettings;
8283
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
8384
import org.opensearch.indices.mapper.MapperRegistry;
8485
import org.opensearch.indices.recovery.RecoverySettings;
@@ -604,8 +605,8 @@ public IndexService newIndexService(
604605
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
605606
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
606607
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
607-
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
608-
RecoverySettings recoverySettings
608+
RecoverySettings recoverySettings,
609+
RemoteStoreSettings remoteStoreSettings
609610
) throws IOException {
610611
final IndexEventListener eventListener = freeze();
611612
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -663,8 +664,8 @@ public IndexService newIndexService(
663664
recoveryStateFactory,
664665
translogFactorySupplier,
665666
clusterDefaultRefreshIntervalSupplier,
666-
clusterRemoteTranslogBufferIntervalSupplier,
667-
recoverySettings
667+
recoverySettings,
668+
remoteStoreSettings
668669
);
669670
success = true;
670671
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.opensearch.index.store.Store;
9595
import org.opensearch.index.translog.Translog;
9696
import org.opensearch.index.translog.TranslogFactory;
97+
import org.opensearch.indices.RemoteStoreSettings;
9798
import org.opensearch.indices.cluster.IndicesClusterStateService;
9899
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
99100
import org.opensearch.indices.mapper.MapperRegistry;
@@ -183,8 +184,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
183184
private final ValuesSourceRegistry valuesSourceRegistry;
184185
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
185186
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
186-
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;
187187
private final RecoverySettings recoverySettings;
188+
private final RemoteStoreSettings remoteStoreSettings;
188189

189190
public IndexService(
190191
IndexSettings indexSettings,
@@ -219,8 +220,8 @@ public IndexService(
219220
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
220221
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
221222
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
222-
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
223-
RecoverySettings recoverySettings
223+
RecoverySettings recoverySettings,
224+
RemoteStoreSettings remoteStoreSettings
224225
) {
225226
super(indexSettings);
226227
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -296,8 +297,8 @@ public IndexService(
296297
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
297298
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
298299
this.translogFactorySupplier = translogFactorySupplier;
299-
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
300300
this.recoverySettings = recoverySettings;
301+
this.remoteStoreSettings = remoteStoreSettings;
301302
updateFsyncTaskIfNecessary();
302303
}
303304

@@ -548,9 +549,9 @@ public synchronized IndexShard createShard(
548549
this.indexSettings.isSegRepEnabledOrRemoteNode() ? checkpointPublisher : null,
549550
remoteStore,
550551
remoteStoreStatsTrackerFactory,
551-
clusterRemoteTranslogBufferIntervalSupplier,
552552
nodeEnv.nodeId(),
553553
recoverySettings,
554+
remoteStoreSettings,
554555
seedRemote
555556
);
556557
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@
180180
import org.opensearch.index.warmer.WarmerStats;
181181
import org.opensearch.indices.IndexingMemoryController;
182182
import org.opensearch.indices.IndicesService;
183+
import org.opensearch.indices.RemoteStoreSettings;
183184
import org.opensearch.indices.cluster.IndicesClusterStateService;
184185
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
185186
import org.opensearch.indices.recovery.RecoveryFailedException;
@@ -351,6 +352,7 @@ Runnable getGlobalCheckpointSyncer() {
351352
private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
352353
private final RemoteStoreFileDownloader fileDownloader;
353354
private final RecoverySettings recoverySettings;
355+
private final RemoteStoreSettings remoteStoreSettings;
354356
/*
355357
On source doc rep node, It will be DOCREP_NON_MIGRATING.
356358
On source remote node , it will be REMOTE_MIGRATING_SEEDED when relocating from remote node
@@ -383,9 +385,9 @@ public IndexShard(
383385
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
384386
@Nullable final Store remoteStore,
385387
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
386-
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
387388
final String nodeId,
388389
final RecoverySettings recoverySettings,
390+
final RemoteStoreSettings remoteStoreSettings,
389391
boolean seedRemote
390392
) throws IOException {
391393
super(shardRouting.shardId(), indexSettings);
@@ -407,7 +409,7 @@ public IndexShard(
407409
threadPool,
408410
this::getEngine,
409411
indexSettings.isRemoteNode(),
410-
() -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier)
412+
() -> getRemoteTranslogUploadBufferInterval(remoteStoreSettings::getClusterRemoteTranslogBufferInterval)
411413
);
412414
this.mapperService = mapperService;
413415
this.indexCache = indexCache;
@@ -483,6 +485,7 @@ public boolean shouldCache(Query query) {
483485
: mapperService.documentMapper().mappers().containsTimeStampField();
484486
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
485487
this.recoverySettings = recoverySettings;
488+
this.remoteStoreSettings = remoteStoreSettings;
486489
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
487490
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
488491
}
@@ -600,6 +603,10 @@ public RecoverySettings getRecoverySettings() {
600603
return recoverySettings;
601604
}
602605

606+
public RemoteStoreSettings getRemoteStoreSettings() {
607+
return remoteStoreSettings;
608+
}
609+
603610
public RemoteStoreFileDownloader getFileDownloader() {
604611
return fileDownloader;
605612
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private boolean syncSegments() {
224224
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
225225
// This is done to avoid delete post each refresh.
226226
if (isRefreshAfterCommit()) {
227-
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
227+
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles());
228228
}
229229

230230
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices;
10+
11+
import org.opensearch.common.settings.ClusterSettings;
12+
import org.opensearch.common.settings.Settings;
13+
14+
/**
15+
* Utility to provide a {@link RemoteStoreSettings} instance containing all defaults
16+
*
17+
* @opensearch.internal
18+
*/
19+
public final class DefaultRemoteStoreSettings {
20+
private DefaultRemoteStoreSettings() {}
21+
22+
public static final RemoteStoreSettings INSTANCE = new RemoteStoreSettings(
23+
Settings.EMPTY,
24+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
25+
);
26+
}

server/src/main/java/org/opensearch/indices/IndicesService.java

+8-27
Original file line numberDiff line numberDiff line change
@@ -250,17 +250,6 @@ public class IndicesService extends AbstractLifecycleComponent
250250
Property.Final
251251
);
252252

253-
/**
254-
* Used to specify the default translog buffer interval for remote store backed indexes.
255-
*/
256-
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting(
257-
"cluster.remote_store.translog.buffer_interval",
258-
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
259-
IndexSettings.MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL,
260-
Property.NodeScope,
261-
Property.Dynamic
262-
);
263-
264253
/**
265254
* This setting is used to set the refresh interval when the {@code index.refresh_interval} index setting is not
266255
* provided during index creation or when the existing {@code index.refresh_interval} index setting is set as null.
@@ -355,7 +344,7 @@ public class IndicesService extends AbstractLifecycleComponent
355344
private volatile boolean idFieldDataEnabled;
356345
private volatile boolean allowExpensiveQueries;
357346
private final RecoverySettings recoverySettings;
358-
347+
private final RemoteStoreSettings remoteStoreSettings;
359348
@Nullable
360349
private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor;
361350
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
@@ -364,8 +353,6 @@ public class IndicesService extends AbstractLifecycleComponent
364353
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
365354
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
366355
private volatile TimeValue clusterDefaultRefreshInterval;
367-
private volatile TimeValue clusterRemoteTranslogBufferInterval;
368-
369356
private final SearchRequestStats searchRequestStats;
370357

371358
@Override
@@ -400,7 +387,8 @@ public IndicesService(
400387
SearchRequestStats searchRequestStats,
401388
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
402389
RecoverySettings recoverySettings,
403-
CacheService cacheService
390+
CacheService cacheService,
391+
RemoteStoreSettings remoteStoreSettings
404392
) {
405393
this.settings = settings;
406394
this.threadPool = threadPool;
@@ -504,10 +492,8 @@ protected void closeInternal() {
504492
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
505493
clusterService.getClusterSettings()
506494
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate);
507-
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings());
508-
clusterService.getClusterSettings()
509-
.addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval);
510495
this.recoverySettings = recoverySettings;
496+
this.remoteStoreSettings = remoteStoreSettings;
511497
}
512498

513499
/**
@@ -913,8 +899,8 @@ private synchronized IndexService createIndexService(
913899
remoteDirectoryFactory,
914900
translogFactorySupplier,
915901
this::getClusterDefaultRefreshInterval,
916-
this::getClusterRemoteTranslogBufferInterval,
917-
this.recoverySettings
902+
this.recoverySettings,
903+
this.remoteStoreSettings
918904
);
919905
}
920906

@@ -2034,12 +2020,7 @@ private TimeValue getClusterDefaultRefreshInterval() {
20342020
return this.clusterDefaultRefreshInterval;
20352021
}
20362022

2037-
// Exclusively for testing, please do not use it elsewhere.
2038-
public TimeValue getClusterRemoteTranslogBufferInterval() {
2039-
return clusterRemoteTranslogBufferInterval;
2040-
}
2041-
2042-
private void setClusterRemoteTranslogBufferInterval(TimeValue clusterRemoteTranslogBufferInterval) {
2043-
this.clusterRemoteTranslogBufferInterval = clusterRemoteTranslogBufferInterval;
2023+
public RemoteStoreSettings getRemoteStoreSettings() {
2024+
return this.remoteStoreSettings;
20442025
}
20452026
}

0 commit comments

Comments
 (0)