Skip to content

Commit 330b249

Browse files
authored
Add restriction to have a single repository with shallow snapshot v2 setting (opensearch-project#15901)
* Add restriction to have a single repository with shallow snapshot v2 setting Signed-off-by: Sachin Kale <sachinpkale@gmail.com> * Do not allow shallow snapshot v2 repo name to contain SNAPSHOT_PINNED_TIMESTAMP_DELIMITER Signed-off-by: Sachin Kale <sachinpkale@gmail.com> --------- Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
1 parent 12ff5ed commit 330b249

File tree

3 files changed

+228
-6
lines changed

3 files changed

+228
-6
lines changed

server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java

+27-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.io.ByteArrayInputStream;
3131
import java.io.Closeable;
3232
import java.io.IOException;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
3335
import java.util.List;
3436
import java.util.Locale;
3537
import java.util.Map;
@@ -75,25 +77,46 @@ public RemoteStorePinnedTimestampService(
7577
* and starts the asynchronous update task.
7678
*/
7779
public void start() {
78-
validateRemoteStoreConfiguration();
80+
blobContainer = validateAndCreateBlobContainer(settings, repositoriesService.get());
7981
startAsyncUpdateTask(RemoteStoreSettings.getPinnedTimestampsSchedulerInterval());
8082
}
8183

82-
private void validateRemoteStoreConfiguration() {
84+
private static BlobContainer validateAndCreateBlobContainer(Settings settings, RepositoriesService repositoriesService) {
8385
final String remoteStoreRepo = settings.get(
8486
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
8587
);
8688
assert remoteStoreRepo != null : "Remote Segment Store repository is not configured";
87-
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
89+
final Repository repository = repositoriesService.repository(remoteStoreRepo);
8890
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
8991
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
90-
blobContainer = blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN));
92+
return blobStoreRepository.blobStore().blobContainer(blobStoreRepository.basePath().add(PINNED_TIMESTAMPS_PATH_TOKEN));
9193
}
9294

9395
private void startAsyncUpdateTask(TimeValue pinnedTimestampsSchedulerInterval) {
9496
asyncUpdatePinnedTimestampTask = new AsyncUpdatePinnedTimestampTask(logger, threadPool, pinnedTimestampsSchedulerInterval, true);
9597
}
9698

99+
public static Map<String, Set<Long>> fetchPinnedTimestamps(Settings settings, RepositoriesService repositoriesService)
100+
throws IOException {
101+
BlobContainer blobContainer = validateAndCreateBlobContainer(settings, repositoriesService);
102+
Set<String> pinnedTimestamps = blobContainer.listBlobs().keySet();
103+
Map<String, Set<Long>> pinningEntityTimestampMap = new HashMap<>();
104+
for (String pinnedTimestamp : pinnedTimestamps) {
105+
try {
106+
String[] tokens = pinnedTimestamp.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR);
107+
Long timestamp = Long.parseLong(tokens[tokens.length - 1]);
108+
String pinningEntity = pinnedTimestamp.substring(0, pinnedTimestamp.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR));
109+
if (pinningEntityTimestampMap.containsKey(pinningEntity) == false) {
110+
pinningEntityTimestampMap.put(pinningEntity, new HashSet<>());
111+
}
112+
pinningEntityTimestampMap.get(pinningEntity).add(timestamp);
113+
} catch (NumberFormatException e) {
114+
logger.error("Exception while parsing pinned timestamp from {}, skipping this entry", pinnedTimestamp);
115+
}
116+
}
117+
return pinningEntityTimestampMap;
118+
}
119+
97120
/**
98121
* Pins a timestamp in the remote store.
99122
*

server/src/main/java/org/opensearch/repositories/RepositoriesService.java

+77-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@
6868
import org.opensearch.common.util.io.IOUtils;
6969
import org.opensearch.core.action.ActionListener;
7070
import org.opensearch.core.common.Strings;
71+
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
7172
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
73+
import org.opensearch.snapshots.SnapshotsService;
7274
import org.opensearch.threadpool.ThreadPool;
7375
import org.opensearch.transport.TransportService;
7476

@@ -84,6 +86,7 @@
8486
import java.util.stream.Stream;
8587

8688
import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
89+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
8790
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
8891

8992
/**
@@ -123,6 +126,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
123126
private final RepositoriesStatsArchive repositoriesStatsArchive;
124127
private final ClusterManagerTaskThrottler.ThrottlingKey putRepositoryTaskKey;
125128
private final ClusterManagerTaskThrottler.ThrottlingKey deleteRepositoryTaskKey;
129+
private final Settings settings;
126130

127131
public RepositoriesService(
128132
Settings settings,
@@ -132,6 +136,7 @@ public RepositoriesService(
132136
Map<String, Repository.Factory> internalTypesRegistry,
133137
ThreadPool threadPool
134138
) {
139+
this.settings = settings;
135140
this.typesRegistry = typesRegistry;
136141
this.internalTypesRegistry = internalTypesRegistry;
137142
this.clusterService = clusterService;
@@ -173,7 +178,7 @@ public void registerOrUpdateRepository(final PutRepositoryRequest request, final
173178
CryptoMetadata.fromRequest(request.cryptoSettings())
174179
);
175180
validate(request.name());
176-
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings());
181+
validateRepositoryMetadataSettings(clusterService, request.name(), request.settings(), repositories, settings, this);
177182
if (newRepositoryMetadata.cryptoMetadata() != null) {
178183
validate(newRepositoryMetadata.cryptoMetadata().keyProviderName());
179184
}
@@ -684,7 +689,10 @@ public static void validate(final String identifier) {
684689
public static void validateRepositoryMetadataSettings(
685690
ClusterService clusterService,
686691
final String repositoryName,
687-
final Settings repositoryMetadataSettings
692+
final Settings repositoryMetadataSettings,
693+
Map<String, Repository> repositories,
694+
Settings settings,
695+
RepositoriesService repositoriesService
688696
) {
689697
// We can add more validations here for repository settings in the future.
690698
Version minVersionInCluster = clusterService.state().getNodes().getMinNodeVersion();
@@ -699,6 +707,51 @@ public static void validateRepositoryMetadataSettings(
699707
+ minVersionInCluster
700708
);
701709
}
710+
if (SHALLOW_SNAPSHOT_V2.get(repositoryMetadataSettings)) {
711+
if (minVersionInCluster.onOrAfter(Version.V_2_17_0) == false) {
712+
throw new RepositoryException(
713+
repositoryName,
714+
"setting "
715+
+ SHALLOW_SNAPSHOT_V2.getKey()
716+
+ " cannot be enabled as some of the nodes in cluster are on version older than "
717+
+ Version.V_2_17_0
718+
+ ". Minimum node version in cluster is: "
719+
+ minVersionInCluster
720+
);
721+
}
722+
if (repositoryName.contains(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER)) {
723+
throw new RepositoryException(
724+
repositoryName,
725+
"setting "
726+
+ SHALLOW_SNAPSHOT_V2.getKey()
727+
+ " cannot be enabled for repository with "
728+
+ SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER
729+
+ " in the name as this delimiter is used to create pinning entity"
730+
);
731+
}
732+
if (repositoryWithShallowV2Exists(repositories)) {
733+
throw new RepositoryException(
734+
repositoryName,
735+
"setting "
736+
+ SHALLOW_SNAPSHOT_V2.getKey()
737+
+ " cannot be enabled as this setting can be enabled only on one repository "
738+
+ " and one or more repositories in the cluster have the setting as enabled"
739+
);
740+
}
741+
try {
742+
if (pinnedTimestampExistsWithDifferentRepository(repositoryName, settings, repositoriesService)) {
743+
throw new RepositoryException(
744+
repositoryName,
745+
"setting "
746+
+ SHALLOW_SNAPSHOT_V2.getKey()
747+
+ " cannot be enabled if there are existing snapshots created with shallow V2 "
748+
+ "setting using different repository."
749+
);
750+
}
751+
} catch (IOException e) {
752+
throw new RepositoryException(repositoryName, "Exception while fetching pinned timestamp details");
753+
}
754+
}
702755
// Validation to not allow users to create system repository via put repository call.
703756
if (isSystemRepositorySettingPresent(repositoryMetadataSettings)) {
704757
throw new RepositoryException(
@@ -710,6 +763,28 @@ public static void validateRepositoryMetadataSettings(
710763
}
711764
}
712765

766+
private static boolean repositoryWithShallowV2Exists(Map<String, Repository> repositories) {
767+
return repositories.values().stream().anyMatch(repo -> SHALLOW_SNAPSHOT_V2.get(repo.getMetadata().settings()));
768+
}
769+
770+
private static boolean pinnedTimestampExistsWithDifferentRepository(
771+
String newRepoName,
772+
Settings settings,
773+
RepositoriesService repositoriesService
774+
) throws IOException {
775+
Map<String, Set<Long>> pinningEntityTimestampMap = RemoteStorePinnedTimestampService.fetchPinnedTimestamps(
776+
settings,
777+
repositoriesService
778+
);
779+
for (String pinningEntity : pinningEntityTimestampMap.keySet()) {
780+
String repoNameWithPinnedTimestamps = pinningEntity.split(SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER)[0];
781+
if (repoNameWithPinnedTimestamps.equals(newRepoName) == false) {
782+
return true;
783+
}
784+
}
785+
return false;
786+
}
787+
713788
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
714789
if (isRepositoryInUse(clusterState, repository)) {
715790
throw new IllegalStateException("trying to modify or unregister repository that is currently used");

server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRemoteIndexTests.java

+124
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
package org.opensearch.repositories.blobstore;
3434

3535
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
36+
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
37+
import org.opensearch.action.support.master.AcknowledgedResponse;
3638
import org.opensearch.client.Client;
3739
import org.opensearch.cluster.metadata.RepositoryMetadata;
3840
import org.opensearch.common.settings.Settings;
@@ -41,13 +43,16 @@
4143
import org.opensearch.gateway.remote.RemoteClusterStateService;
4244
import org.opensearch.index.IndexSettings;
4345
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
46+
import org.opensearch.indices.RemoteStoreSettings;
4447
import org.opensearch.indices.replication.common.ReplicationType;
4548
import org.opensearch.repositories.IndexId;
4649
import org.opensearch.repositories.RepositoriesService;
4750
import org.opensearch.repositories.RepositoryData;
51+
import org.opensearch.repositories.RepositoryException;
4852
import org.opensearch.repositories.fs.FsRepository;
4953
import org.opensearch.snapshots.SnapshotId;
5054
import org.opensearch.snapshots.SnapshotInfo;
55+
import org.opensearch.snapshots.SnapshotsService;
5156
import org.opensearch.test.OpenSearchIntegTestCase;
5257

5358
import java.io.IOException;
@@ -64,6 +69,9 @@
6469
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
6570
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
6671
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
72+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY;
73+
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;
74+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6775
import static org.hamcrest.Matchers.equalTo;
6876

6977
/**
@@ -81,6 +89,7 @@ protected Settings nodeSettings() {
8189
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
8290
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
8391
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), tempDir.getParent())
92+
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
8493
.build();
8594
}
8695

@@ -373,4 +382,119 @@ public void testRetrieveShallowCopySnapshotCase2() throws IOException {
373382
assertThat(snapshotIds, equalTo(originalSnapshots));
374383
}
375384

385+
public void testRepositoryCreationShallowV2() throws Exception {
386+
Client client = client();
387+
388+
Settings snapshotRepoSettings1 = Settings.builder()
389+
.put(node().settings())
390+
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
391+
.put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
392+
.put(SHALLOW_SNAPSHOT_V2.getKey(), true)
393+
.build();
394+
395+
String invalidRepoName = "test" + SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + "repo-1";
396+
try {
397+
createRepository(client, invalidRepoName, snapshotRepoSettings1);
398+
} catch (RepositoryException e) {
399+
assertEquals(
400+
"["
401+
+ invalidRepoName
402+
+ "] setting shallow_snapshot_v2 cannot be enabled for repository with __ in the name as this delimiter is used to create pinning entity",
403+
e.getMessage()
404+
);
405+
}
406+
407+
// Create repo with shallow snapshot V2 enabled
408+
createRepository(client, "test-repo-1", snapshotRepoSettings1);
409+
410+
logger.info("--> verify the repository");
411+
VerifyRepositoryResponse verifyRepositoryResponse = client.admin().cluster().prepareVerifyRepository("test-repo-1").get();
412+
assertNotNull(verifyRepositoryResponse.getNodes());
413+
414+
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
415+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
416+
417+
Settings snapshotRepoSettings2 = Settings.builder()
418+
.put(node().settings())
419+
.put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings()))
420+
.put(REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
421+
.put(SHALLOW_SNAPSHOT_V2.getKey(), true)
422+
.build();
423+
424+
// Create another repo with shallow snapshot V2 enabled, this should fail.
425+
try {
426+
createRepository(client, "test-repo-2", snapshotRepoSettings2);
427+
} catch (RepositoryException e) {
428+
assertEquals(
429+
"[test-repo-2] setting shallow_snapshot_v2 cannot be enabled as this setting can be enabled only on one repository and one or more repositories in the cluster have the setting as enabled",
430+
e.getMessage()
431+
);
432+
}
433+
434+
// Disable shallow snapshot V2 setting on test-repo-1
435+
updateRepository(
436+
client,
437+
"test-repo-1",
438+
Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
439+
);
440+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
441+
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
442+
443+
// Create test-repo-2 with shallow snapshot V2 enabled, this should pass now.
444+
createRepository(client, "test-repo-2", snapshotRepoSettings2);
445+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get();
446+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
447+
448+
final String indexName = "test-idx";
449+
createIndex(indexName);
450+
ensureGreen();
451+
indexDocuments(client, indexName);
452+
453+
// Create pinned timestamp snapshot in test-repo-2
454+
SnapshotInfo snapshotInfo = createSnapshot("test-repo-2", "test-snap-2", new ArrayList<>());
455+
assertNotNull(snapshotInfo.snapshotId());
456+
457+
// As snapshot is present, even after disabling shallow snapshot setting in test-repo-2, we will not be able to
458+
// enable shallow snapshot v2 setting in test-repo-1
459+
updateRepository(
460+
client,
461+
"test-repo-2",
462+
Settings.builder().put(snapshotRepoSettings2).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
463+
);
464+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-2").get();
465+
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
466+
467+
try {
468+
updateRepository(client, "test-repo-1", snapshotRepoSettings1);
469+
} catch (RepositoryException e) {
470+
assertEquals(
471+
"[test-repo-1] setting shallow_snapshot_v2 cannot be enabled if there are existing snapshots created with shallow V2 setting using different repository.",
472+
e.getMessage()
473+
);
474+
}
475+
476+
// After deleting the snapshot, we will be able to enable shallow snapshot v2 setting in test-repo-1
477+
AcknowledgedResponse deleteSnapshotResponse = client().admin().cluster().prepareDeleteSnapshot("test-repo-2", "test-snap-2").get();
478+
479+
assertAcked(deleteSnapshotResponse);
480+
481+
updateRepository(client, "test-repo-1", snapshotRepoSettings1);
482+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
483+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
484+
485+
// Having a snapshot in the same repo should allow disabling and re-enabling shallow snapshot v2 setting
486+
snapshotInfo = createSnapshot("test-repo-1", "test-snap-1", new ArrayList<>());
487+
assertNotNull(snapshotInfo.snapshotId());
488+
updateRepository(
489+
client,
490+
"test-repo-1",
491+
Settings.builder().put(snapshotRepoSettings1).put(SHALLOW_SNAPSHOT_V2.getKey(), false).build()
492+
);
493+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
494+
assertFalse(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
495+
496+
updateRepository(client, "test-repo-1", snapshotRepoSettings1);
497+
getRepositoriesResponse = client.admin().cluster().prepareGetRepositories("test-repo-1").get();
498+
assertTrue(SHALLOW_SNAPSHOT_V2.get(getRepositoriesResponse.repositories().get(0).settings()));
499+
}
376500
}

0 commit comments

Comments
 (0)