Skip to content

Commit 41fd1ec

Browse files
Fixing github workflows and clearing up IT file
Signed-off-by: Nishant Goel <nisgoel@amazon.com>
1 parent 1ef59a0 commit 41fd1ec

File tree

6 files changed

+30
-98
lines changed

6 files changed

+30
-98
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.indices.replication;
1010

11-
import java.util.Objects;
1211
import org.apache.lucene.index.SegmentInfos;
1312
import org.opensearch.action.search.SearchResponse;
1413
import org.opensearch.cluster.ClusterState;
@@ -43,6 +42,7 @@
4342
import java.util.Collection;
4443
import java.util.List;
4544
import java.util.Map;
45+
import java.util.Objects;
4646
import java.util.Optional;
4747
import java.util.Set;
4848
import java.util.concurrent.CountDownLatch;
@@ -246,6 +246,9 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
246246
}
247247

248248
protected boolean warmIndexSegmentReplicationEnabled() {
249-
return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name());
249+
return Objects.equals(
250+
IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
251+
IndexModule.DataLocalityType.PARTIAL.name()
252+
);
250253
}
251254
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import org.apache.lucene.index.IndexWriterConfig;
2121
import org.apache.lucene.index.SegmentInfos;
2222
import org.apache.lucene.index.StandardDirectoryReader;
23-
import org.apache.lucene.store.Directory;
24-
import org.apache.lucene.store.FilterDirectory;
2523
import org.apache.lucene.tests.util.TestUtil;
2624
import org.apache.lucene.util.BytesRef;
2725
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
@@ -451,8 +449,9 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
451449
assertThat(forceMergeResponse.getFailedShards(), is(0));
452450
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
453451
refresh(INDEX_NAME);
454-
//skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store.
455-
if(!warmIndexSegmentReplicationEnabled()) {
452+
// skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote
453+
// store.
454+
if (!warmIndexSegmentReplicationEnabled()) {
456455
verifyStoreContent();
457456
}
458457
}
@@ -961,7 +960,8 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
961960
}
962961
ensureGreen(INDEX_NAME);
963962
waitForSearchableDocs(docCount, primaryNode, replicaNode);
964-
//skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote store.
963+
// skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote
964+
// store.
965965
if (!warmIndexSegmentReplicationEnabled()) {
966966
verifyStoreContent();
967967
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java

+12-83
Original file line numberDiff line numberDiff line change
@@ -8,40 +8,26 @@
88

99
package org.opensearch.indices.replication;
1010

11-
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
12-
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
13-
1411
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
15-
import java.nio.file.Path;
16-
import java.util.Locale;
17-
import java.util.Map;
18-
import java.util.stream.Collectors;
19-
import org.junit.After;
20-
import org.junit.Before;
21-
import org.opensearch.cluster.metadata.RepositoriesMetadata;
22-
import org.opensearch.cluster.metadata.RepositoryMetadata;
23-
import org.opensearch.cluster.node.DiscoveryNode;
24-
import org.opensearch.cluster.service.ClusterService;
12+
2513
import org.opensearch.common.settings.Settings;
2614
import org.opensearch.common.util.FeatureFlags;
2715
import org.opensearch.index.IndexModule;
2816
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
2917
import org.opensearch.index.store.remote.filecache.FileCache;
3018
import org.opensearch.node.Node;
31-
import org.opensearch.repositories.RepositoriesService;
32-
import org.opensearch.repositories.blobstore.BlobStoreRepository;
3319
import org.opensearch.test.OpenSearchIntegTestCase;
20+
import org.junit.After;
21+
import org.junit.Before;
22+
23+
import java.nio.file.Path;
3424

3525
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
3626
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
3727
public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {
3828

3929
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
40-
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";
41-
42-
protected Path segmentRepoPath;
43-
protected Path translogRepoPath;
44-
protected boolean clusterSettingsSuppliedByTest = false;
30+
protected Path absolutePath;
4531

4632
@Before
4733
private void setup() {
@@ -58,19 +44,13 @@ public Settings indexSettings() {
5844

5945
@Override
6046
protected Settings nodeSettings(int nodeOrdinal) {
61-
if (segmentRepoPath == null || translogRepoPath == null) {
62-
segmentRepoPath = randomRepoPath().toAbsolutePath();
63-
translogRepoPath = randomRepoPath().toAbsolutePath();
64-
}
65-
if (clusterSettingsSuppliedByTest) {
66-
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
67-
} else {
68-
return Settings.builder()
69-
.put(super.nodeSettings(nodeOrdinal))
70-
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
71-
//.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), -1)
72-
.build();
47+
if (absolutePath == null) {
48+
absolutePath = randomRepoPath().toAbsolutePath();
7349
}
50+
return Settings.builder()
51+
.put(super.nodeSettings(nodeOrdinal))
52+
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
53+
.build();
7454
}
7555

7656
@Override
@@ -92,62 +72,11 @@ protected boolean warmIndexSegmentReplicationEnabled() {
9272

9373
@After
9474
public void teardown() {
95-
clusterSettingsSuppliedByTest = false;
9675
for (String nodeName : internalCluster().getNodeNames()) {
9776
logger.info("file cache node name is {}", nodeName);
9877
FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
9978
fileCache.clear();
10079
}
101-
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
102-
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
10380
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
104-
clusterAdmin().prepareCleanupRepository(REPOSITORY_2_NAME).get();
10581
}
106-
107-
public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
108-
Map<String, String> nodeAttributes = node.getAttributes();
109-
String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
110-
111-
String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
112-
Map<String, String> settingsMap = node.getAttributes()
113-
.keySet()
114-
.stream()
115-
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
116-
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));
117-
118-
Settings.Builder settings = Settings.builder();
119-
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
120-
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);
121-
122-
return new RepositoryMetadata(name, type, settings.build());
123-
}
124-
125-
public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
126-
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
127-
.state()
128-
.metadata()
129-
.custom(RepositoriesMetadata.TYPE);
130-
RepositoryMetadata actualRepository = repositories.repository(repositoryName);
131-
132-
final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
133-
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);
134-
135-
for (String nodeName : internalCluster().getNodeNames()) {
136-
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
137-
DiscoveryNode node = clusterService.localNode();
138-
RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);
139-
140-
// Validated that all the restricted settings are entact on all the nodes.
141-
repository.getRestrictedSystemRepositorySettings()
142-
.stream()
143-
.forEach(
144-
setting -> assertEquals(
145-
String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
146-
setting.get(actualRepository.settings()),
147-
setting.get(expectedRepository.settings())
148-
)
149-
);
150-
}
151-
}
152-
15382
}

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
171171
// a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
172172
// In that case we still commit into the next local generation.
173173
if (incomingGeneration != this.lastReceivedPrimaryGen) {
174-
if(engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
174+
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
175175
flush(false, true);
176176
}
177177
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,11 @@ public void getSegmentFiles(
117117
final List<String> toDownloadSegmentNames = new ArrayList<>();
118118
for (StoreFileMetadata fileMetadata : filesToFetch) {
119119
String file = fileMetadata.name();
120-
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file;
120+
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial()
121+
: "Local store already contains the file " + file;
121122
toDownloadSegmentNames.add(file);
122123
}
123-
if(indexShard.indexSettings().isStoreLocalityPartial()) {
124+
if (indexShard.indexSettings().isStoreLocalityPartial()) {
124125
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
125126
return;
126127
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.opensearch.indices.replication;
1010

11-
import java.util.Map;
1211
import org.apache.logging.log4j.message.ParameterizedMessage;
1312
import org.apache.lucene.codecs.CodecUtil;
1413
import org.apache.lucene.index.CorruptIndexException;
@@ -39,6 +38,7 @@
3938
import java.io.UncheckedIOException;
4039
import java.util.List;
4140
import java.util.Locale;
41+
import java.util.Map;
4242
import java.util.Set;
4343
import java.util.stream.Collectors;
4444

@@ -205,7 +205,8 @@ public void startReplication(ActionListener<Void> listener) {
205205
}, listener::onFailure);
206206
}
207207

208-
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd) throws IOException {
208+
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd)
209+
throws IOException {
209210
cancellableThreads.checkForCancel();
210211
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
211212
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
@@ -223,9 +224,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo,
223224
.map(StoreFileMetadata::name)
224225
.collect(Collectors.toSet());
225226

226-
missingFiles = diff.missing.stream()
227-
.filter(md -> reuseFiles.contains(md.name()) == false)
228-
.collect(Collectors.toList());
227+
missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList());
229228

230229
logger.trace(
231230
() -> new ParameterizedMessage(

0 commit comments

Comments
 (0)