Skip to content

Commit 30c4210

Browse files
[Segment Replication] Add Segment Replication Specific Integration Tests (#11773)
* Run few tests with Segment Replication enabled. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Update reason for ignoring test. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * remove @ignore to resolve :server:forbiddenApisInternalClusterTest check. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * fix spotlessCheck. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * add conditional logic of force refresh. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Address comments on PR. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * fix failing errors. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Use parameterization for running segment replication tests. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Fix failing tests. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Fix failing test. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * add new waitForReplication() and refactor. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Address comments on PR and revert back changes made to SegmentReplication Tests. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * revert changes made to Segrep tests. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Refactor and address comments. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * fix failure of using forbidden api new Random(). Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Add comments to debug. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Remove non-critical tests from running with segrep. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Fix test to run with segrep. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * separate out refresh and waitForReplication into different methods. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * refactor with usage of waitForReplication(). Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * fix parameters passed in factory for IndexStatsIT. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Update IndexstatsIT to run with segrep Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> --------- Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
1 parent 7b99274 commit 30c4210

File tree

5 files changed

+150
-32
lines changed

5 files changed

+150
-32
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.opensearch.common.lease.Releasable;
2020
import org.opensearch.common.settings.Settings;
2121
import org.opensearch.core.index.Index;
22-
import org.opensearch.core.index.shard.ShardId;
2322
import org.opensearch.index.IndexModule;
2423
import org.opensearch.index.IndexService;
2524
import org.opensearch.index.SegmentReplicationShardStats;
@@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin
175174
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
176175
}
177176

178-
/**
179-
* Fetch IndexShard by shardId, multiple shards per node allowed.
180-
*/
181-
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
182-
final Index index = resolveIndex(indexName);
183-
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
184-
IndexService indexService = indicesService.indexServiceSafe(index);
185-
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
186-
return indexService.getShard(id.get());
187-
}
188-
189177
/**
190178
* Fetch IndexShard, assumes only a single shard per node.
191179
*/

server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107

108108
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
109109
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
110+
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
110111
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
111112
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
112113
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
@@ -130,7 +131,8 @@ public IndexStatsIT(Settings settings) {
130131
public static Collection<Object[]> parameters() {
131132
return Arrays.asList(
132133
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
133-
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
134+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
135+
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
134136
);
135137
}
136138

@@ -175,7 +177,7 @@ public void testFieldDataStats() throws InterruptedException {
175177
ensureGreen();
176178
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
177179
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
178-
client().admin().indices().prepareRefresh().execute().actionGet();
180+
refreshAndWaitForReplication();
179181
indexRandomForConcurrentSearch("test");
180182

181183
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
@@ -299,7 +301,7 @@ public void testClearAllCaches() throws Exception {
299301
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
300302
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
301303
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
302-
client().admin().indices().prepareRefresh().execute().actionGet();
304+
refreshAndWaitForReplication();
303305
indexRandomForConcurrentSearch("test");
304306

305307
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
@@ -667,7 +669,7 @@ public void testSimpleStats() throws Exception {
667669
client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
668670
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
669671
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
670-
refresh();
672+
refreshAndWaitForReplication();
671673

672674
NumShards test1 = getNumShards("test1");
673675
long test1ExpectedWrites = 2 * test1.dataCopies;
@@ -682,7 +684,13 @@ public void testSimpleStats() throws Exception {
682684
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L));
683685
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
684686
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
685-
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
687+
688+
// This assert should not be done on segrep enabled indices because we are asserting Indexing/Write operations count on
689+
// all primary and replica shards. But in case of segrep, Indexing/Write operation don't happen on replica shards. So we can
690+
// ignore this assert check for segrep enabled indices.
691+
if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) {
692+
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
693+
}
686694
assertThat(stats.getTotal().getStore(), notNullValue());
687695
assertThat(stats.getTotal().getMerge(), notNullValue());
688696
assertThat(stats.getTotal().getFlush(), notNullValue());
@@ -825,6 +833,7 @@ public void testMergeStats() {
825833
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
826834
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();
827835

836+
refreshAndWaitForReplication();
828837
assertThat(stats.getTotal().getMerge(), notNullValue());
829838
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L));
830839
}
@@ -851,7 +860,7 @@ public void testSegmentsStats() {
851860

852861
client().admin().indices().prepareFlush().get();
853862
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
854-
client().admin().indices().prepareRefresh().get();
863+
refreshAndWaitForReplication();
855864
stats = client().admin().indices().prepareStats().setSegments(true).get();
856865

857866
assertThat(stats.getTotal().getSegments(), notNullValue());
@@ -869,7 +878,7 @@ public void testAllFlags() throws Exception {
869878
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
870879
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
871880

872-
client().admin().indices().prepareRefresh().execute().actionGet();
881+
refreshAndWaitForReplication();
873882
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
874883
Flag[] values = CommonStatsFlags.Flag.values();
875884
for (Flag flag : values) {
@@ -1453,6 +1462,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
14531462
.get()
14541463
.status()
14551464
);
1465+
refreshAndWaitForReplication();
14561466
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
14571467
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
14581468
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);

server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java

+25-7
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.recovery;
3434

35+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
36+
3537
import org.apache.logging.log4j.LogManager;
3638
import org.apache.logging.log4j.Logger;
3739
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
@@ -52,10 +54,11 @@
5254
import org.opensearch.index.IndexSettings;
5355
import org.opensearch.index.shard.DocsStats;
5456
import org.opensearch.index.translog.Translog;
57+
import org.opensearch.indices.replication.common.ReplicationType;
5558
import org.opensearch.plugins.Plugin;
5659
import org.opensearch.search.sort.SortOrder;
5760
import org.opensearch.test.BackgroundIndexer;
58-
import org.opensearch.test.OpenSearchIntegTestCase;
61+
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
5962

6063
import java.util.Arrays;
6164
import java.util.Collection;
@@ -69,12 +72,26 @@
6972
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
7073
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
7174
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
75+
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
7276
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
7377
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
7478
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
7579
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;
7680

77-
public class RecoveryWhileUnderLoadIT extends OpenSearchIntegTestCase {
81+
public class RecoveryWhileUnderLoadIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
82+
83+
public RecoveryWhileUnderLoadIT(Settings settings) {
84+
super(settings);
85+
}
86+
87+
@ParametersFactory
88+
public static Collection<Object[]> parameters() {
89+
return Arrays.asList(
90+
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() },
91+
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
92+
);
93+
}
94+
7895
private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class);
7996

8097
public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
@@ -150,7 +167,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
150167
logger.info("--> indexing threads stopped");
151168

152169
logger.info("--> refreshing the index");
153-
refreshAndAssert();
170+
assertAfterRefreshAndWaitForReplication();
154171
logger.info("--> verifying indexed content");
155172
iterateAssertCount(numberOfShards, 10, indexer.getIds());
156173
}
@@ -211,7 +228,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() thr
211228
logger.info("--> indexing threads stopped");
212229

213230
logger.info("--> refreshing the index");
214-
refreshAndAssert();
231+
assertAfterRefreshAndWaitForReplication();
215232
logger.info("--> verifying indexed content");
216233
iterateAssertCount(numberOfShards, 10, indexer.getIds());
217234
}
@@ -325,7 +342,7 @@ public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception
325342
);
326343

327344
logger.info("--> refreshing the index");
328-
refreshAndAssert();
345+
assertAfterRefreshAndWaitForReplication();
329346
logger.info("--> verifying indexed content");
330347
iterateAssertCount(numberOfShards, 10, indexer.getIds());
331348
}
@@ -375,7 +392,7 @@ public void testRecoverWhileRelocating() throws Exception {
375392
ensureGreen(TimeValue.timeValueMinutes(5));
376393

377394
logger.info("--> refreshing the index");
378-
refreshAndAssert();
395+
assertAfterRefreshAndWaitForReplication();
379396
logger.info("--> verifying indexed content");
380397
iterateAssertCount(numShards, 10, indexer.getIds());
381398
}
@@ -474,10 +491,11 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat
474491
);
475492
}
476493

477-
private void refreshAndAssert() throws Exception {
494+
private void assertAfterRefreshAndWaitForReplication() throws Exception {
478495
assertBusy(() -> {
479496
RefreshResponse actionGet = client().admin().indices().prepareRefresh().get();
480497
assertAllSuccessful(actionGet);
481498
}, 5, TimeUnit.MINUTES);
499+
waitForReplication();
482500
}
483501
}

0 commit comments

Comments
 (0)