Skip to content

Commit bad49ef

Browse files
authored
Add implementation for remote store path types (#13103)
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent d202d90 commit bad49ef

File tree

14 files changed

+623
-63
lines changed

14 files changed

+623
-63
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java

+19-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.cluster.ClusterState;
2222
import org.opensearch.cluster.metadata.IndexMetadata;
2323
import org.opensearch.common.Nullable;
24+
import org.opensearch.common.blobstore.BlobPath;
2425
import org.opensearch.common.io.PathUtils;
2526
import org.opensearch.common.settings.Settings;
2627
import org.opensearch.common.util.io.IOUtils;
@@ -56,6 +57,10 @@
5657

5758
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
5859
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
60+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
61+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
62+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
63+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
5964
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING;
6065
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6166
import static org.hamcrest.Matchers.equalTo;
@@ -279,6 +284,11 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
279284
String restoredIndexName1version1 = indexName1 + "-restored-1";
280285
String restoredIndexName1version2 = indexName1 + "-restored-2";
281286

287+
client(clusterManagerNode).admin()
288+
.cluster()
289+
.prepareUpdateSettings()
290+
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
291+
.get();
282292
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
283293
Client client = client();
284294
Settings indexSettings = getIndexSettings(1, 0).build();
@@ -476,12 +486,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
476486
}
477487

478488
void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
479-
String indexUUID = client().admin().indices().prepareGetSettings(idx).get().getSetting(idx, IndexMetadata.SETTING_INDEX_UUID);
480-
481-
Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
482-
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");
483-
Path segmentMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/metadata");
484-
Path segmentDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/segments/data");
489+
Client client = client();
490+
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
491+
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
492+
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
493+
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
494+
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
495+
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
496+
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
497+
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);
485498

486499
try (
487500
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

+17-31
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.cluster.routing.RecoverySource;
2424
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
2525
import org.opensearch.common.Priority;
26+
import org.opensearch.common.blobstore.BlobPath;
2627
import org.opensearch.common.settings.Settings;
2728
import org.opensearch.common.unit.TimeValue;
2829
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
@@ -57,7 +58,11 @@
5758

5859
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
5960
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
61+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
62+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
63+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
6064
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
65+
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
6166
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6267
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6368
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -182,13 +187,9 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
182187
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
183188
int numberOfIterations = randomIntBetween(5, 15);
184189
indexData(numberOfIterations, true, INDEX_NAME);
185-
String indexUUID = client().admin()
186-
.indices()
187-
.prepareGetSettings(INDEX_NAME)
188-
.get()
189-
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
190-
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
191-
190+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
191+
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
192+
;
192193
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
193194
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
194195
// Delete is async.
@@ -212,12 +213,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
212213
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
213214
int numberOfIterations = randomIntBetween(5, 15);
214215
indexData(numberOfIterations, false, INDEX_NAME);
215-
String indexUUID = client().admin()
216-
.indices()
217-
.prepareGetSettings(INDEX_NAME)
218-
.get()
219-
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
220-
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
216+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
217+
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
221218
int actualFileCount = getFileCount(indexPath);
222219
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
223220
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
@@ -231,12 +228,8 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
231228
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
232229
int numberOfIterations = randomIntBetween(5, 15);
233230
indexData(numberOfIterations, true, INDEX_NAME);
234-
String indexUUID = client().admin()
235-
.indices()
236-
.prepareGetSettings(INDEX_NAME)
237-
.get()
238-
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
239-
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
231+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
232+
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
240233
int actualFileCount = getFileCount(indexPath);
241234
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
242235
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
@@ -250,12 +243,9 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
250243
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
251244
int numberOfIterations = randomIntBetween(12, 18);
252245
indexData(numberOfIterations, true, INDEX_NAME);
253-
String indexUUID = client().admin()
254-
.indices()
255-
.prepareGetSettings(INDEX_NAME)
256-
.get()
257-
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
258-
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
246+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
247+
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
248+
;
259249
int actualFileCount = getFileCount(indexPath);
260250
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
261251
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
@@ -589,12 +579,8 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
589579
flushAndRefresh(INDEX_NAME);
590580

591581
// 3. Delete data from remote segment store
592-
String indexUUID = client().admin()
593-
.indices()
594-
.prepareGetSettings(INDEX_NAME)
595-
.get()
596-
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
597-
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");
582+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
583+
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);
598584

599585
try (Stream<Path> files = Files.list(segmentDataPath)) {
600586
files.forEach(p -> {

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1212
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
1313
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
14+
import org.opensearch.common.blobstore.BlobPath;
1415
import org.opensearch.common.settings.Settings;
1516
import org.opensearch.test.OpenSearchIntegTestCase;
1617

@@ -22,7 +23,10 @@
2223
import java.util.Set;
2324
import java.util.concurrent.TimeUnit;
2425

26+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
27+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
2528
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
29+
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;
2630

2731
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2832
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
@@ -45,8 +49,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
4549
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
4650
assertEquals(1, response.getShards().length);
4751

52+
String indexName = response.getShards()[0].getShardRouting().index().getName();
4853
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
49-
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
54+
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
55+
Path segmentDataRepoPath = location.resolve(shardPath);
5056
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);
5157

5258
logger.info("--> Verify that the segment files are same on local and repository eventually");

server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@
1414
import org.opensearch.cluster.metadata.IndexMetadata;
1515
import org.opensearch.common.UUIDs;
1616
import org.opensearch.common.action.ActionFuture;
17+
import org.opensearch.common.blobstore.BlobContainer;
18+
import org.opensearch.common.blobstore.BlobPath;
1719
import org.opensearch.common.settings.Settings;
1820
import org.opensearch.common.unit.TimeValue;
21+
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
1922
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
23+
import org.opensearch.repositories.RepositoriesService;
24+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2025
import org.opensearch.test.OpenSearchIntegTestCase;
2126

2227
import java.nio.file.Path;
@@ -27,6 +32,8 @@
2732
import java.util.concurrent.TimeUnit;
2833
import java.util.stream.Stream;
2934

35+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
36+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES;
3037
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
3138
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3239
import static org.hamcrest.Matchers.comparesEqualTo;
@@ -307,7 +314,21 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
307314
SnapshotInfo snapshotInfo1 = createFullSnapshot(snapshotRepoName, "snap1");
308315
SnapshotInfo snapshotInfo2 = createFullSnapshot(snapshotRepoName, "snap2");
309316

310-
String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
317+
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
318+
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
319+
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
320+
client(),
321+
remoteStoreEnabledIndexName,
322+
remoteStoreRepository.basePath(),
323+
"0",
324+
SEGMENTS,
325+
LOCK_FILES
326+
);
327+
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
328+
String[] lockFiles;
329+
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
330+
lockFiles = lockDirectory.listAll();
331+
}
311332
assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);
312333

313334
// delete remote store index
@@ -320,7 +341,9 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
320341
.get();
321342
assertAcked(deleteSnapshotResponse);
322343

323-
lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
344+
try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) {
345+
lockFiles = lockDirectory.listAll();
346+
}
324347
assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);
325348
assertTrue(lockFiles[0].contains(snapshotInfo2.snapshotId().getUUID()));
326349

server/src/main/java/org/opensearch/common/blobstore/BlobPath.java

+9
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ public BlobPath add(String path) {
7979
return new BlobPath(Collections.unmodifiableList(paths));
8080
}
8181

82+
/**
83+
* Add additional level of paths to the existing path and returns new {@link BlobPath} with the updated paths.
84+
*/
85+
public BlobPath add(Iterable<String> paths) {
86+
List<String> updatedPaths = new ArrayList<>(this.paths);
87+
paths.iterator().forEachRemaining(updatedPaths::add);
88+
return new BlobPath(Collections.unmodifiableList(updatedPaths));
89+
}
90+
8291
public String buildAsString() {
8392
String p = String.join(SEPARATOR, paths);
8493
if (p.isEmpty() || p.endsWith(SEPARATOR)) {

server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,27 @@ boolean requiresHashAlgorithm() {
103103
HASHED_PREFIX(1) {
104104
@Override
105105
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
106-
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
107-
// throw new UnsupportedOperationException("Not implemented"); --> Not using this for unblocking couple of tests.
106+
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
107+
return BlobPath.cleanPath()
108+
.add(hashAlgorithm.hash(pathInput))
109+
.add(pathInput.basePath())
110+
.add(pathInput.indexUUID())
111+
.add(pathInput.shardId())
112+
.add(pathInput.dataCategory().getName())
113+
.add(pathInput.dataType().getName());
114+
}
115+
116+
@Override
117+
boolean requiresHashAlgorithm() {
118+
return true;
119+
}
120+
},
121+
HASHED_INFIX(2) {
122+
@Override
123+
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
124+
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
108125
return pathInput.basePath()
126+
.add(hashAlgorithm.hash(pathInput))
109127
.add(pathInput.indexUUID())
110128
.add(pathInput.shardId())
111129
.add(pathInput.dataCategory().getName())
@@ -200,10 +218,11 @@ public enum PathHashAlgorithm {
200218

201219
FNV_1A(0) {
202220
@Override
203-
long hash(PathInput pathInput) {
221+
String hash(PathInput pathInput) {
204222
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
205223
.getName();
206-
return FNV1a.hash32(input);
224+
long hash = FNV1a.hash64(input);
225+
return RemoteStoreUtils.longToUrlBase64(hash);
207226
}
208227
};
209228

@@ -218,6 +237,7 @@ public int getCode() {
218237
}
219238

220239
private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;
240+
221241
static {
222242
PathHashAlgorithm[] values = values();
223243
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
@@ -240,7 +260,7 @@ public static PathHashAlgorithm fromCode(int code) {
240260
return CODE_TO_ENUM.get(code);
241261
}
242262

243-
abstract long hash(PathInput pathInput);
263+
abstract String hash(PathInput pathInput);
244264

245265
public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
246266
try {

server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java

+15
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
import org.opensearch.common.collect.Tuple;
1212

13+
import java.nio.ByteBuffer;
1314
import java.util.Arrays;
15+
import java.util.Base64;
1416
import java.util.HashMap;
1517
import java.util.List;
1618
import java.util.Map;
@@ -101,4 +103,17 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
101103
});
102104
}
103105

106+
/**
107+
* Converts an input hash which occupies 64 bits of space into Base64 (6 bits per character) String. This must not
108+
* be changed as it is used for creating path for storing remote store data on the remote store.
109+
* This converts the byte array to base 64 string. `/` is replaced with `_`, `+` is replaced with `-` and `=`
110+
* which is padded at the last is also removed. These characters are either used as delimiter or special character
111+
* requiring special handling in some vendors. The characters present in this base64 version are [A-Za-z0-9_-].
112+
* This must not be changed as it is used for creating path for storing remote store data on the remote store.
113+
*/
114+
static String longToUrlBase64(long value) {
115+
byte[] hashBytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
116+
String base64Str = Base64.getUrlEncoder().encodeToString(hashBytes);
117+
return base64Str.substring(0, base64Str.length() - 1);
118+
}
104119
}

0 commit comments

Comments
 (0)