Skip to content

Commit 9a87624

Browse files
Fix NPE on restore searchable snapshot (#13911)
Signed-off-by: panguixin <panguixin@bytedance.com> Signed-off-by: Andrew Ross <andrross@amazon.com> Co-authored-by: Andrew Ross <andrross@amazon.com>
1 parent bb84ca7 commit 9a87624

File tree

17 files changed

+145
-58
lines changed

17 files changed

+145
-58
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4848
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
4949
- Don't return negative scores from `multi_match` query with `cross_fields` type ([#13829](https://github.com/opensearch-project/OpenSearch/pull/13829))
5050
- Pass parent filter to inner hit query ([#13903](https://github.com/opensearch-project/OpenSearch/pull/13903))
51+
- Fix NPE on restore searchable snapshot ([#13911](https://github.com/opensearch-project/OpenSearch/pull/13911))
5152

5253
### Security
5354

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

+51
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.node.Node;
4848
import org.opensearch.repositories.fs.FsRepository;
4949
import org.hamcrest.MatcherAssert;
50+
import org.junit.After;
5051

5152
import java.io.IOException;
5253
import java.nio.file.Files;
@@ -62,6 +63,10 @@
6263

6364
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
6465
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
66+
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
67+
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
68+
import static org.opensearch.test.NodeRoles.dataNode;
69+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6570
import static org.hamcrest.Matchers.contains;
6671
import static org.hamcrest.Matchers.containsString;
6772
import static org.hamcrest.Matchers.equalTo;
@@ -939,6 +944,52 @@ public void testRelocateSearchableSnapshotIndex() throws Exception {
939944
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
940945
}
941946

947+
public void testCreateSearchableSnapshotWithSpecifiedRemoteDataRatio() throws Exception {
948+
final String snapshotName = "test-snap";
949+
final String repoName = "test-repo";
950+
final String indexName1 = "test-idx-1";
951+
final String restoredIndexName1 = indexName1 + "-copy";
952+
final String indexName2 = "test-idx-2";
953+
final String restoredIndexName2 = indexName2 + "-copy";
954+
final int numReplicasIndex1 = 1;
955+
final int numReplicasIndex2 = 1;
956+
957+
Settings clusterManagerNodeSettings = clusterManagerOnlyNode();
958+
internalCluster().startNodes(2, clusterManagerNodeSettings);
959+
Settings dateNodeSettings = dataNode();
960+
internalCluster().startNodes(2, dateNodeSettings);
961+
createIndexWithDocsAndEnsureGreen(numReplicasIndex1, 100, indexName1);
962+
createIndexWithDocsAndEnsureGreen(numReplicasIndex2, 100, indexName2);
963+
964+
final Client client = client();
965+
assertAcked(
966+
client.admin()
967+
.cluster()
968+
.prepareUpdateSettings()
969+
.setTransientSettings(Settings.builder().put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5))
970+
);
971+
972+
createRepositoryWithSettings(null, repoName);
973+
takeSnapshot(client, snapshotName, repoName, indexName1, indexName2);
974+
975+
internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
976+
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
977+
978+
assertDocCount(restoredIndexName1, 100L);
979+
assertDocCount(restoredIndexName2, 100L);
980+
assertIndexDirectoryDoesNotExist(restoredIndexName1, restoredIndexName2);
981+
}
982+
983+
@After
984+
public void cleanup() throws Exception {
985+
assertAcked(
986+
client().admin()
987+
.cluster()
988+
.prepareUpdateSettings()
989+
.setTransientSettings(Settings.builder().putNull(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey()))
990+
);
991+
}
992+
942993
private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
943994
final Node node = internalCluster().getInstance(Node.class, nodeName);
944995
final ShardId shardId = new ShardId(index, 0);

server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.opensearch.core.action.ActionListener;
5151
import org.opensearch.core.common.io.stream.StreamInput;
5252
import org.opensearch.core.index.Index;
53-
import org.opensearch.index.IndexModule;
5453
import org.opensearch.threadpool.ThreadPool;
5554
import org.opensearch.transport.TransportService;
5655

@@ -59,8 +58,6 @@
5958
import java.util.Set;
6059
import java.util.stream.Stream;
6160

62-
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
63-
6461
/**
6562
* Transport action for updating index settings
6663
*
@@ -133,9 +130,7 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste
133130
for (Index index : requestIndices) {
134131
if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index.getName())) {
135132
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
136-
&& IndexModule.Type.REMOTE_SNAPSHOT.match(
137-
state.getMetadata().getIndexSafe(index).getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())
138-
);
133+
&& state.getMetadata().getIndexSafe(index).isRemoteSnapshot();
139134
}
140135
}
141136
// check if all settings in the request are in the allow list

server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.opensearch.core.common.io.stream.StreamInput;
4343
import org.opensearch.core.common.io.stream.StreamOutput;
4444
import org.opensearch.core.rest.RestStatus;
45-
import org.opensearch.index.IndexModule;
4645

4746
import java.io.IOException;
4847
import java.util.Collections;
@@ -399,7 +398,7 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
399398
if (IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetadata.getSettings())) {
400399
addIndexBlock(indexName, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
401400
}
402-
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
401+
if (indexMetadata.isRemoteSnapshot()) {
403402
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
404403
}
405404
return this;

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+7
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.opensearch.core.xcontent.XContentBuilder;
6666
import org.opensearch.core.xcontent.XContentParser;
6767
import org.opensearch.gateway.MetadataStateFormat;
68+
import org.opensearch.index.IndexModule;
6869
import org.opensearch.index.mapper.MapperService;
6970
import org.opensearch.index.seqno.SequenceNumbers;
7071
import org.opensearch.indices.replication.common.ReplicationType;
@@ -683,6 +684,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
683684
private final ActiveShardCount waitForActiveShards;
684685
private final Map<String, RolloverInfo> rolloverInfos;
685686
private final boolean isSystem;
687+
private final boolean isRemoteSnapshot;
686688

687689
private IndexMetadata(
688690
final Index index,
@@ -743,6 +745,7 @@ private IndexMetadata(
743745
this.waitForActiveShards = waitForActiveShards;
744746
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
745747
this.isSystem = isSystem;
748+
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
746749
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
747750
}
748751

@@ -1204,6 +1207,10 @@ public boolean isSystem() {
12041207
return isSystem;
12051208
}
12061209

1210+
public boolean isRemoteSnapshot() {
1211+
return isRemoteSnapshot;
1212+
}
1213+
12071214
public static Builder builder(String index) {
12081215
return new Builder(index);
12091216
}

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.opensearch.common.settings.Settings;
4545
import org.opensearch.core.common.Strings;
4646
import org.opensearch.core.index.shard.ShardId;
47-
import org.opensearch.index.IndexModule;
4847
import org.opensearch.index.IndexNotFoundException;
4948
import org.opensearch.node.ResponseCollectorService;
5049

@@ -242,9 +241,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
242241
final Set<ShardIterator> set = new HashSet<>(shards.size());
243242
for (IndexShardRoutingTable shard : shards) {
244243
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
245-
if (IndexModule.Type.REMOTE_SNAPSHOT.match(
246-
indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
247-
) && (preference == null || preference.isEmpty())) {
244+
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
248245
preference = Preference.PRIMARY.type();
249246
}
250247

server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.opensearch.cluster.metadata.IndexMetadata;
1212
import org.opensearch.cluster.node.DiscoveryNode;
1313
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
14-
import org.opensearch.common.settings.Settings;
15-
import org.opensearch.index.IndexModule;
1614

1715
/**
1816
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
@@ -60,10 +58,6 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
6058
* @return {@link RoutingPool} for the given index.
6159
*/
6260
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
63-
Settings indexSettings = indexMetadata.getSettings();
64-
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
65-
return REMOTE_CAPABLE;
66-
}
67-
return LOCAL_ONLY;
61+
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
6862
}
6963
}

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.opensearch.core.common.unit.ByteSizeValue;
5555
import org.opensearch.core.index.Index;
5656
import org.opensearch.core.index.shard.ShardId;
57+
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
5758
import org.opensearch.index.store.remote.filecache.FileCacheStats;
5859
import org.opensearch.snapshots.SnapshotShardSizeInfo;
5960

@@ -68,7 +69,6 @@
6869
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
6970
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
7071
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
71-
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
7272

7373
/**
7474
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -109,11 +109,13 @@ public class DiskThresholdDecider extends AllocationDecider {
109109

110110
private final DiskThresholdSettings diskThresholdSettings;
111111
private final boolean enableForSingleDataNode;
112+
private final FileCacheSettings fileCacheSettings;
112113

113114
public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) {
114115
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
115116
assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9";
116117
this.enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings);
118+
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
117119
}
118120

119121
/**
@@ -179,6 +181,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
179181
The following block enables allocation for remote shards within safeguard limits of the filecache.
180182
*/
181183
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
184+
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
185+
// we don't need to check the ratio
186+
if (dataToFileCacheSizeRatio <= 0.1f) {
187+
return Decision.YES;
188+
}
189+
182190
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
183191
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
184192
.collect(Collectors.toList());
@@ -199,7 +207,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
199207
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
200208
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
201209
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
202-
final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
203210
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
204211
return allocation.decision(
205212
Decision.NO,
@@ -208,6 +215,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
208215
);
209216
}
210217
return Decision.YES;
218+
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
219+
return Decision.NO;
211220
}
212221

213222
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@
116116
import org.opensearch.index.ShardIndexingPressureStore;
117117
import org.opensearch.index.remote.RemoteStorePressureSettings;
118118
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
119-
import org.opensearch.index.store.remote.filecache.FileCache;
119+
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
120120
import org.opensearch.indices.IndexingMemoryController;
121121
import org.opensearch.indices.IndicesQueryCache;
122122
import org.opensearch.indices.IndicesRequestCache;
@@ -689,7 +689,7 @@ public void apply(Settings value, Settings current, Settings previous) {
689689

690690
// Settings related to Searchable Snapshots
691691
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
692-
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
692+
FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
693693

694694
// Settings related to Remote Refresh Segment Pressure
695695
RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,

server/src/main/java/org/opensearch/index/IndexSettings.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,6 @@ public static IndexMergePolicy fromString(String text) {
728728
private volatile TimeValue remoteTranslogUploadBufferInterval;
729729
private final String remoteStoreTranslogRepository;
730730
private final String remoteStoreRepository;
731-
private final boolean isRemoteSnapshot;
732731
private int remoteTranslogKeepExtraGen;
733732
private Version extendedCompatibilitySnapshotVersion;
734733

@@ -919,9 +918,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
919918
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
920919
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
921920
this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings);
922-
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
923921

924-
if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
922+
if (isRemoteSnapshot() && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
925923
extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
926924
} else {
927925
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
@@ -1278,7 +1276,7 @@ public String getRemoteStoreTranslogRepository() {
12781276
* Returns true if this is remote/searchable snapshot
12791277
*/
12801278
public boolean isRemoteSnapshot() {
1281-
return isRemoteSnapshot;
1279+
return indexMetadata.isRemoteSnapshot();
12821280
}
12831281

12841282
/**

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

-16
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import org.apache.lucene.store.IndexInput;
1212
import org.opensearch.common.annotation.PublicApi;
13-
import org.opensearch.common.settings.Setting;
1413
import org.opensearch.core.common.breaker.CircuitBreaker;
1514
import org.opensearch.core.common.breaker.CircuitBreakingException;
1615
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
@@ -52,21 +51,6 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {
5251

5352
private final CircuitBreaker circuitBreaker;
5453

55-
/**
56-
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
57-
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
58-
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
59-
* This is designed to be a safeguard to prevent oversubscribing a cluster.
60-
* Specify a value of zero for no limit, which is the default for compatibility reasons.
61-
*/
62-
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
63-
"cluster.filecache.remote_data_ratio",
64-
0.0,
65-
0.0,
66-
Setting.Property.NodeScope,
67-
Setting.Property.Dynamic
68-
);
69-
7054
public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
7155
this.theCache = cache;
7256
this.circuitBreaker = circuitBreaker;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store.remote.filecache;
10+
11+
import org.opensearch.common.settings.ClusterSettings;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.common.settings.Settings;
14+
15+
/**
16+
* Settings relate to file cache
17+
*
18+
* @opensearch.internal
19+
*/
20+
public class FileCacheSettings {
21+
/**
22+
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
23+
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
24+
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
25+
* This is designed to be a safeguard to prevent oversubscribing a cluster.
26+
* Specify a value of zero for no limit, which is the default for compatibility reasons.
27+
*/
28+
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
29+
"cluster.filecache.remote_data_ratio",
30+
0.0,
31+
0.0,
32+
Setting.Property.NodeScope,
33+
Setting.Property.Dynamic
34+
);
35+
36+
private volatile double remoteDataRatio;
37+
38+
public FileCacheSettings(Settings settings, ClusterSettings clusterSettings) {
39+
setRemoteDataRatio(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(settings));
40+
clusterSettings.addSettingsUpdateConsumer(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, this::setRemoteDataRatio);
41+
}
42+
43+
public void setRemoteDataRatio(double remoteDataRatio) {
44+
this.remoteDataRatio = remoteDataRatio;
45+
}
46+
47+
public double getRemoteDataRatio() {
48+
return remoteDataRatio;
49+
}
50+
}

0 commit comments

Comments
 (0)