Skip to content

Commit 2dc3f74

Browse files
authored
Add version details in remote index path file with code enhancements (opensearch-project#13386)
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent a47e123 commit 2dc3f74

File tree

9 files changed

+183
-105
lines changed

9 files changed

+183
-105
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2929
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
3030
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
3131
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
32+
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
33+
- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386))
3234

3335
### Dependencies
3436
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java

+21-18
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
import org.opensearch.common.settings.Settings;
1414
import org.opensearch.core.util.FileSystemUtils;
1515
import org.opensearch.index.remote.RemoteIndexPath;
16+
import org.opensearch.index.remote.RemoteIndexPathUploader;
1617
import org.opensearch.index.remote.RemoteStoreEnums;
1718
import org.opensearch.test.OpenSearchIntegTestCase;
1819

1920
import java.io.IOException;
20-
import java.util.Locale;
21+
import java.nio.file.Path;
22+
import java.util.Arrays;
2123
import java.util.concurrent.ExecutionException;
2224

2325
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
@@ -81,28 +83,29 @@ public void testRemoteIndexPathFileCreation() throws ExecutionException, Interru
8183

8284
}
8385

84-
private void validateRemoteIndexPathFile(boolean exists) {
86+
private void validateRemoteIndexPathFile(boolean exists) throws IOException {
8587
String indexUUID = client().admin()
8688
.indices()
8789
.prepareGetSettings(INDEX_NAME)
8890
.get()
8991
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
90-
92+
String fileName = generatePartFileName(indexUUID);
9193
assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
92-
assertEquals(
93-
exists,
94-
FileSystemUtils.exists(
95-
translogRepoPath.resolve(RemoteIndexPath.DIR)
96-
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
97-
)
98-
);
99-
assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
100-
assertEquals(
101-
exists,
102-
FileSystemUtils.exists(
103-
segmentRepoPath.resolve(RemoteIndexPath.DIR)
104-
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
105-
)
106-
);
94+
if (exists) {
95+
Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR));
96+
assertEquals(1, files.length);
97+
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileName)));
98+
String translogPathFile = files[0].toString();
99+
assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
100+
files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR));
101+
assertEquals(1, files.length);
102+
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileName)));
103+
String segmentPathFile = files[0].toString();
104+
assertNotEquals(translogPathFile, segmentPathFile);
105+
}
106+
}
107+
108+
private String generatePartFileName(String indexUUID) {
109+
return String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "2", RemoteIndexPath.DEFAULT_VERSION);
107110
}
108111
}

server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.threadpool.ThreadPool;
1414

1515
import java.util.List;
16+
import java.util.Map;
1617
import java.util.Objects;
1718
import java.util.concurrent.ExecutorService;
1819

@@ -41,9 +42,17 @@ public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName)
4142
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
4243
* @param actionListener listener to be invoked on success or failure.
4344
*/
44-
public final void onNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
45-
executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener));
45+
public final void onUpload(
46+
List<IndexMetadata> indexMetadataList,
47+
Map<String, IndexMetadata> prevIndexMetadataByName,
48+
ActionListener<Void> actionListener
49+
) {
50+
executorService.execute(() -> doOnUpload(indexMetadataList, prevIndexMetadataByName, actionListener));
4651
}
4752

48-
protected abstract void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
53+
protected abstract void doOnUpload(
54+
List<IndexMetadata> indexMetadataList,
55+
Map<String, IndexMetadata> prevIndexMetadataByName,
56+
ActionListener<Void> actionListener
57+
);
4958
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+25-20
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
242242
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
243243
clusterState,
244244
toUpload,
245-
ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList()
245+
Collections.emptyMap()
246246
);
247247
final ClusterMetadataManifest manifest = uploadManifest(
248248
clusterState,
@@ -307,9 +307,9 @@ public ClusterMetadataManifest writeIncrementalMetadata(
307307
}
308308

309309
// Write Index Metadata
310-
final Map<String, Long> previousStateIndexMetadataVersionByName = new HashMap<>();
310+
final Map<String, IndexMetadata> previousStateIndexMetadataByName = new HashMap<>();
311311
for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
312-
previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
312+
previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata);
313313
}
314314

315315
int numIndicesUpdated = 0;
@@ -319,9 +319,12 @@ public ClusterMetadataManifest writeIncrementalMetadata(
319319
.collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));
320320

321321
List<IndexMetadata> toUpload = new ArrayList<>();
322-
List<IndexMetadata> newIndexMetadataList = new ArrayList<>();
322+
// We prepare a map that contains the previous index metadata for the indexes for which version has changed.
323+
Map<String, IndexMetadata> prevIndexMetadataByName = new HashMap<>();
323324
for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
324-
final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
325+
String indexName = indexMetadata.getIndex().getName();
326+
final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName);
327+
Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null;
325328
if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
326329
logger.debug(
327330
"updating metadata for [{}], changing version from [{}] to [{}]",
@@ -331,22 +334,19 @@ public ClusterMetadataManifest writeIncrementalMetadata(
331334
);
332335
numIndicesUpdated++;
333336
toUpload.add(indexMetadata);
337+
prevIndexMetadataByName.put(indexName, prevIndexMetadata);
334338
} else {
335339
numIndicesUnchanged++;
336340
}
337-
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
338-
// Adding the indexMetadata to newIndexMetadataList if there is no previous version present for the index.
339-
if (previousVersion == null) {
340-
newIndexMetadataList.add(indexMetadata);
341-
}
341+
previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName());
342342
}
343343

344-
List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
344+
List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, prevIndexMetadataByName);
345345
uploadedIndexMetadataList.forEach(
346346
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
347347
);
348348

349-
for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
349+
for (String removedIndexName : previousStateIndexMetadataByName.keySet()) {
350350
allUploadedIndexMetadata.remove(removedIndexName);
351351
}
352352
final ClusterMetadataManifest manifest = uploadManifest(
@@ -452,7 +452,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
452452
private List<UploadedIndexMetadata> writeIndexMetadataParallel(
453453
ClusterState clusterState,
454454
List<IndexMetadata> toUpload,
455-
List<IndexMetadata> newIndexMetadataList
455+
Map<String, IndexMetadata> prevIndexMetadataByName
456456
) throws IOException {
457457
assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null";
458458
int latchCount = toUpload.size() + indexMetadataUploadListeners.size();
@@ -482,7 +482,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
482482
writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
483483
}
484484

485-
invokeIndexMetadataUploadListeners(newIndexMetadataList, latch, exceptionList);
485+
invokeIndexMetadataUploadListeners(toUpload, prevIndexMetadataByName, latch, exceptionList);
486486

487487
try {
488488
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
@@ -527,22 +527,25 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
527527
* Invokes the index metadata upload listener but does not wait for the execution to complete.
528528
*/
529529
private void invokeIndexMetadataUploadListeners(
530-
List<IndexMetadata> newIndexMetadataList,
530+
List<IndexMetadata> updatedIndexMetadataList,
531+
Map<String, IndexMetadata> prevIndexMetadataByName,
531532
CountDownLatch latch,
532533
List<Exception> exceptionList
533534
) {
534535
for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) {
535536
String listenerName = listener.getClass().getSimpleName();
536-
listener.onNewIndexUpload(
537-
newIndexMetadataList,
538-
getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName)
537+
listener.onUpload(
538+
updatedIndexMetadataList,
539+
prevIndexMetadataByName,
540+
getIndexMetadataUploadActionListener(updatedIndexMetadataList, prevIndexMetadataByName, latch, exceptionList, listenerName)
539541
);
540542
}
541543

542544
}
543545

544546
private ActionListener<Void> getIndexMetadataUploadActionListener(
545547
List<IndexMetadata> newIndexMetadataList,
548+
Map<String, IndexMetadata> prevIndexMetadataByName,
546549
CountDownLatch latch,
547550
List<Exception> exceptionList,
548551
String listenerName
@@ -552,18 +555,20 @@ private ActionListener<Void> getIndexMetadataUploadActionListener(
552555
ActionListener.wrap(
553556
ignored -> logger.trace(
554557
new ParameterizedMessage(
555-
"{} : Invoked listener={} successfully tookTimeNs={}",
558+
"listener={} : Invoked successfully with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}",
556559
listenerName,
557560
newIndexMetadataList,
561+
prevIndexMetadataByName.values(),
558562
(System.nanoTime() - startTime)
559563
)
560564
),
561565
ex -> {
562566
logger.error(
563567
new ParameterizedMessage(
564-
"{} : Exception during invocation of listener={} tookTimeNs={}",
568+
"listener={} : Exception during invocation with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}",
565569
listenerName,
566570
newIndexMetadataList,
571+
prevIndexMetadataByName.values(),
567572
(System.nanoTime() - startTime)
568573
),
569574
ex

server/src/main/java/org/opensearch/index/IndexSettings.java

+2-17
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,8 @@
4848
import org.opensearch.core.common.unit.ByteSizeUnit;
4949
import org.opensearch.core.common.unit.ByteSizeValue;
5050
import org.opensearch.core.index.Index;
51-
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
52-
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
5351
import org.opensearch.index.remote.RemoteStorePathStrategy;
52+
import org.opensearch.index.remote.RemoteStoreUtils;
5453
import org.opensearch.index.translog.Translog;
5554
import org.opensearch.indices.replication.common.ReplicationType;
5655
import org.opensearch.ingest.IngestService;
@@ -62,8 +61,6 @@
6261
import java.util.Collections;
6362
import java.util.List;
6463
import java.util.Locale;
65-
import java.util.Map;
66-
import java.util.Objects;
6764
import java.util.Optional;
6865
import java.util.concurrent.TimeUnit;
6966
import java.util.function.Consumer;
@@ -990,7 +987,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
990987
*/
991988
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
992989
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
993-
remoteStorePathStrategy = determineRemoteStorePathStrategy();
990+
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata);
994991

995992
setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
996993
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
@@ -1911,18 +1908,6 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
19111908
this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability;
19121909
}
19131910

1914-
private RemoteStorePathStrategy determineRemoteStorePathStrategy() {
1915-
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
1916-
assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME);
1917-
if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) {
1918-
PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME));
1919-
String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME);
1920-
PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null;
1921-
return new RemoteStorePathStrategy(pathType, hashAlgorithm);
1922-
}
1923-
return new RemoteStorePathStrategy(PathType.FIXED);
1924-
}
1925-
19261911
public RemoteStorePathStrategy getRemoteStorePathStrategy() {
19271912
return remoteStorePathStrategy;
19281913
}

server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,16 @@ public class RemoteIndexPath implements ToXContentFragment {
5252
combinedPath.putAll(SEGMENT_PATH);
5353
COMBINED_PATH = Collections.unmodifiableMap(combinedPath);
5454
}
55-
private static final String DEFAULT_VERSION = "1";
55+
public static final String DEFAULT_VERSION = "1";
5656
public static final String DIR = "remote-index-path";
5757
public static final String FILE_NAME_FORMAT = "remote_path_%s";
5858
static final String KEY_VERSION = "version";
5959
static final String KEY_INDEX_UUID = "index_uuid";
6060
static final String KEY_SHARD_COUNT = "shard_count";
6161
static final String KEY_PATH_CREATION_MAP = "path_creation_map";
6262
static final String KEY_PATHS = "paths";
63+
64+
private final String version;
6365
private final String indexUUID;
6466
private final int shardCount;
6567
private final Iterable<String> basePath;
@@ -109,6 +111,7 @@ public RemoteIndexPath(
109111
.getFormattedMessage()
110112
);
111113
}
114+
this.version = DEFAULT_VERSION;
112115
this.indexUUID = indexUUID;
113116
this.shardCount = shardCount;
114117
this.basePath = basePath;
@@ -119,7 +122,7 @@ public RemoteIndexPath(
119122

120123
@Override
121124
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
122-
builder.field(KEY_VERSION, DEFAULT_VERSION);
125+
builder.field(KEY_VERSION, version);
123126
builder.field(KEY_INDEX_UUID, indexUUID);
124127
builder.field(KEY_SHARD_COUNT, shardCount);
125128
builder.field(PathType.NAME, pathType.name());
@@ -156,4 +159,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
156159
public static RemoteIndexPath fromXContent(XContentParser ignored) {
157160
throw new UnsupportedOperationException("RemoteIndexPath.fromXContent() is not supported");
158161
}
162+
163+
String getVersion() {
164+
return version;
165+
}
159166
}

0 commit comments

Comments
 (0)