Skip to content

Commit 1ef59a0

Browse files
Making changes for replication and recovery flow for writable warm index
Signed-off-by: Nishant Goel <nisgoel@amazon.com>
1 parent f14b5c8 commit 1ef59a0

File tree

7 files changed

+239
-36
lines changed

7 files changed

+239
-36
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.indices.replication;
1010

11+
import java.util.Objects;
1112
import org.apache.lucene.index.SegmentInfos;
1213
import org.opensearch.action.search.SearchResponse;
1314
import org.opensearch.cluster.ClusterState;
@@ -49,8 +50,6 @@
4950
import java.util.stream.Collectors;
5051

5152
import static java.util.Arrays.asList;
52-
import static org.opensearch.test.OpenSearchIntegTestCase.client;
53-
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
5453
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
5554

5655
public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,8 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
245244
return closeable.get();
246245
}
247246
}
247+
248+
protected boolean warmIndexSegmentReplicationEnabled() {
249+
return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name());
250+
}
248251
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

+24-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.lucene.index.IndexWriterConfig;
2121
import org.apache.lucene.index.SegmentInfos;
2222
import org.apache.lucene.index.StandardDirectoryReader;
23+
import org.apache.lucene.store.Directory;
24+
import org.apache.lucene.store.FilterDirectory;
2325
import org.apache.lucene.tests.util.TestUtil;
2426
import org.apache.lucene.util.BytesRef;
2527
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
@@ -430,7 +432,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
430432
) {
431433
indexer.start(initialDocCount);
432434
waitForDocs(initialDocCount, indexer);
433-
434435
flush(INDEX_NAME);
435436
waitForSearchableDocs(initialDocCount, nodeA, nodeB);
436437

@@ -450,7 +451,10 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
450451
assertThat(forceMergeResponse.getFailedShards(), is(0));
451452
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
452453
refresh(INDEX_NAME);
453-
verifyStoreContent();
454+
// Skipping verifyStoreContent() here because the readLastCommittedSegmentsInfo files are not present in the latest remote store metadata.
455+
if(!warmIndexSegmentReplicationEnabled()) {
456+
verifyStoreContent();
457+
}
454458
}
455459
}
456460

@@ -623,7 +627,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
623627
// this test stubs transport calls specific to node-node replication.
624628
assumeFalse(
625629
"Skipping the test as its not compatible with segment replication with remote store.",
626-
segmentReplicationWithRemoteEnabled()
630+
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
627631
);
628632
final String primaryNode = internalCluster().startDataOnlyNode();
629633
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -957,7 +961,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
957961
}
958962
ensureGreen(INDEX_NAME);
959963
waitForSearchableDocs(docCount, primaryNode, replicaNode);
960-
verifyStoreContent();
964+
// Skipping verifyStoreContent() here because the readLastCommittedSegmentsInfo files are not present in the latest remote store metadata.
965+
if (!warmIndexSegmentReplicationEnabled()) {
966+
verifyStoreContent();
967+
}
961968
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
962969
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
963970
}
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
10681075
* @throws Exception when issue is encountered
10691076
*/
10701077
public void testScrollCreatedOnReplica() throws Exception {
1078+
// Skipping this test in case of remote store enabled warm index
1079+
assumeFalse(
1080+
"Skipping the test as its not compatible with segment replication with remote store.",
1081+
warmIndexSegmentReplicationEnabled()
1082+
);
1083+
10711084
// create the cluster with one primary node containing primary shard and replica node containing replica shard
10721085
final String primary = internalCluster().startDataOnlyNode();
10731086
prepareCreate(
@@ -1179,7 +1192,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
11791192
// this test stubs transport calls specific to node-node replication.
11801193
assumeFalse(
11811194
"Skipping the test as its not compatible with segment replication with remote store.",
1182-
segmentReplicationWithRemoteEnabled()
1195+
segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
11831196
);
11841197

11851198
// create the cluster with one primary node containing primary shard and replica node containing replica shard
@@ -1306,6 +1319,12 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
13061319
}
13071320

13081321
public void testPitCreatedOnReplica() throws Exception {
1322+
// Skipping this test in case of remote store enabled warm index
1323+
assumeFalse(
1324+
"Skipping the test as its not compatible with segment replication with remote store.",
1325+
warmIndexSegmentReplicationEnabled()
1326+
);
1327+
13091328
final String primary = internalCluster().startDataOnlyNode();
13101329
createIndex(INDEX_NAME);
13111330
ensureYellowAndNoInitializingShards(INDEX_NAME);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
12+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
13+
14+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
15+
import java.nio.file.Path;
16+
import java.util.Locale;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
import org.junit.After;
20+
import org.junit.Before;
21+
import org.opensearch.cluster.metadata.RepositoriesMetadata;
22+
import org.opensearch.cluster.metadata.RepositoryMetadata;
23+
import org.opensearch.cluster.node.DiscoveryNode;
24+
import org.opensearch.cluster.service.ClusterService;
25+
import org.opensearch.common.settings.Settings;
26+
import org.opensearch.common.util.FeatureFlags;
27+
import org.opensearch.index.IndexModule;
28+
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
29+
import org.opensearch.index.store.remote.filecache.FileCache;
30+
import org.opensearch.node.Node;
31+
import org.opensearch.repositories.RepositoriesService;
32+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
33+
import org.opensearch.test.OpenSearchIntegTestCase;
34+
35+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
36+
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
37+
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {
38+
39+
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
40+
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";
41+
42+
protected Path segmentRepoPath;
43+
protected Path translogRepoPath;
44+
protected boolean clusterSettingsSuppliedByTest = false;
45+
46+
@Before
47+
private void setup() {
48+
internalCluster().startClusterManagerOnlyNode();
49+
}
50+
51+
@Override
52+
public Settings indexSettings() {
53+
return Settings.builder()
54+
.put(super.indexSettings())
55+
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
56+
.build();
57+
}
58+
59+
@Override
60+
protected Settings nodeSettings(int nodeOrdinal) {
61+
if (segmentRepoPath == null || translogRepoPath == null) {
62+
segmentRepoPath = randomRepoPath().toAbsolutePath();
63+
translogRepoPath = randomRepoPath().toAbsolutePath();
64+
}
65+
if (clusterSettingsSuppliedByTest) {
66+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
67+
} else {
68+
return Settings.builder()
69+
.put(super.nodeSettings(nodeOrdinal))
70+
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
71+
//.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1)
72+
.build();
73+
}
74+
}
75+
76+
@Override
77+
protected Settings featureFlagSettings() {
78+
Settings.Builder featureSettings = Settings.builder();
79+
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
80+
81+
return featureSettings.build();
82+
}
83+
84+
@Override
85+
protected boolean addMockIndexStorePlugin() {
86+
return false;
87+
}
88+
89+
protected boolean warmIndexSegmentReplicationEnabled() {
90+
return true;
91+
}
92+
93+
@After
94+
public void teardown() {
95+
clusterSettingsSuppliedByTest = false;
96+
for (String nodeName : internalCluster().getNodeNames()) {
97+
logger.info("file cache node name is {}", nodeName);
98+
FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
99+
fileCache.clear();
100+
}
101+
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
102+
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
103+
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
104+
clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
105+
}
106+
107+
public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
108+
Map<String, String> nodeAttributes = node.getAttributes();
109+
String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
110+
111+
String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
112+
Map<String, String> settingsMap = node.getAttributes()
113+
.keySet()
114+
.stream()
115+
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
116+
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));
117+
118+
Settings.Builder settings = Settings.builder();
119+
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
120+
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);
121+
122+
return new RepositoryMetadata(name, type, settings.build());
123+
}
124+
125+
public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
126+
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
127+
.state()
128+
.metadata()
129+
.custom(RepositoriesMetadata.TYPE);
130+
RepositoryMetadata actualRepository = repositories.repository(repositoryName);
131+
132+
final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
133+
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);
134+
135+
for (String nodeName : internalCluster().getNodeNames()) {
136+
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
137+
DiscoveryNode node = clusterService.localNode();
138+
RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);
139+
140+
// Validated that all the restricted settings are entact on all the nodes.
141+
repository.getRestrictedSystemRepositorySettings()
142+
.stream()
143+
.forEach(
144+
setting -> assertEquals(
145+
String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
146+
setting.get(actualRepository.settings()),
147+
setting.get(expectedRepository.settings())
148+
)
149+
);
150+
}
151+
}
152+
153+
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
171171
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
172172
// In that case we still commit into the next local generation.
173173
if (incomingGeneration != this.lastReceivedPrimaryGen) {
174-
flush(false, true);
174+
if(engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
175+
flush(false, true);
176+
}
175177
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
176178
translogManager.rollTranslogGeneration();
177179
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
50305030
*/
50315031
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
50325032
boolean syncSegmentSuccess = false;
5033+
boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
5034+
50335035
long startTimeMs = System.currentTimeMillis();
50345036
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
50355037
logger.trace("Downloading segments from remote segment store");
@@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
50525054
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
50535055
for (String file : uploadedSegments.keySet()) {
50545056
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
5055-
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
5057+
if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
50565058
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
50575059
} else {
50585060
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
50615063
} else {
50625064
storeDirectory = store.directory();
50635065
}
5064-
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5066+
if (indexSettings.isStoreLocalityPartial() == false) {
5067+
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
5068+
}
50655069

50665070
if (remoteSegmentMetadata != null) {
50675071
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
50715075
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
50725076
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
50735077
// Extra segments will be wiped on engine open.
5074-
for (String file : List.of(store.directory().listAll())) {
5075-
if (file.startsWith(IndexFileNames.SEGMENTS)) {
5076-
store.deleteQuiet(file);
5078+
if (indexSettings.isStoreLocalityPartial() == false) {
5079+
for (String file : List.of(store.directory().listAll())) {
5080+
if (file.startsWith(IndexFileNames.SEGMENTS)) {
5081+
store.deleteQuiet(file);
5082+
}
50775083
}
5084+
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5085+
: "There should not be any segments file in the dir";
50785086
}
5079-
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
5080-
: "There should not be any segments file in the dir";
50815087
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
50825088
}
50835089
syncSegmentSuccess = true;

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,13 @@ public void getSegmentFiles(
117117
final List<String> toDownloadSegmentNames = new ArrayList<>();
118118
for (StoreFileMetadata fileMetadata : filesToFetch) {
119119
String file = fileMetadata.name();
120-
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
120+
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file;
121121
toDownloadSegmentNames.add(file);
122122
}
123+
if(indexShard.indexSettings().isStoreLocalityPartial()) {
124+
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
125+
return;
126+
}
123127
indexShard.getFileDownloader()
124128
.downloadAsync(
125129
cancellableThreads,

0 commit comments

Comments
 (0)