Skip to content

Commit 4ccbf9d

Browse files
mch2dreamer-89
andauthored
Fix bug where ReplicationListeners would not complete on cancellation. (opensearch-project#8478)
* [Segment Replication] Fix bug where ReplicationListeners would not complete on target cancellation. This change updates cancellation with Segment Replication to ensure all listeners are resolved. It does this by requesting cancellation before shard closure instead of using ReplicationCollection's cancelForShard which immediately removes it from the replicationCollection. This would cause the underlying ReplicationListener to never get invoked on close. This change includes new tests using suite scope to catch for any open tasks. This caught other locations where this was possible: 1. On a replica during force sync if the shard was closed while resolving its listeners, it would never call back to the primary if an exception was caught in the onDone method. - Fixed by refactoring those paths to use a ChannelActionListener and always reply to primary. 2. On the primary during forceSync, the primary would not successfully cancel before shard close during a forceSync, Fixed by wrapping the synchronous recoveryTarget::forceSync call in cancellableThreads. Signed-off-by: Marc Handalian <handalm@amazon.com> PR cleanup. Signed-off-by: Marc Handalian <handalm@amazon.com> Update log message Signed-off-by: Marc Handalian <handalm@amazon.com> * PR feedback. Signed-off-by: Marc Handalian <handalm@amazon.com> * Update server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java Co-authored-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: Marc Handalian <handalm@amazon.com> * Add more tests. Signed-off-by: Marc Handalian <handalm@amazon.com> --------- Signed-off-by: Marc Handalian <handalm@amazon.com> Co-authored-by: Suraj Singh <surajrider@gmail.com>
1 parent 542041f commit 4ccbf9d

12 files changed

+623
-255
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

+26-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.opensearch.index.IndexService;
2525
import org.opensearch.index.SegmentReplicationPerGroupStats;
2626
import org.opensearch.index.SegmentReplicationShardStats;
27+
import org.opensearch.index.engine.Engine;
2728
import org.opensearch.index.shard.IndexShard;
29+
import org.opensearch.index.shard.ShardId;
2830
import org.opensearch.index.store.Store;
2931
import org.opensearch.index.store.StoreFileMetadata;
3032
import org.opensearch.indices.IndicesService;
@@ -158,13 +160,16 @@ protected void verifyStoreContent() throws Exception {
158160
final String indexName = primaryRouting.getIndexName();
159161
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
160162
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
163+
final int primaryDocCount = getDocCountFromShard(primaryShard);
161164
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
162165
for (ShardRouting replica : replicaRouting) {
163166
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
164167
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
165168
primarySegmentMetadata,
166169
replicaShard.getSegmentMetadataMap()
167170
);
171+
final int replicaDocCount = getDocCountFromShard(replicaShard);
172+
assertEquals("Doc counts should match", primaryDocCount, replicaDocCount);
168173
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
169174
fail(
170175
"Expected no missing or different segments between primary and replica but diff was missing: "
@@ -185,10 +190,30 @@ protected void verifyStoreContent() throws Exception {
185190
}, 1, TimeUnit.MINUTES);
186191
}
187192

193+
private int getDocCountFromShard(IndexShard shard) {
194+
try (final Engine.Searcher searcher = shard.acquireSearcher("test")) {
195+
return searcher.getDirectoryReader().numDocs();
196+
}
197+
}
198+
188199
private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
189-
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
200+
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
201+
}
202+
203+
/**
204+
* Fetch IndexShard by shardId, multiple shards per node allowed.
205+
*/
206+
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
207+
final Index index = resolveIndex(indexName);
208+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
209+
IndexService indexService = indicesService.indexServiceSafe(index);
210+
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
211+
return indexService.getShard(id.get());
190212
}
191213

214+
/**
215+
* Fetch IndexShard, assumes only a single shard per node.
216+
*/
192217
protected IndexShard getIndexShard(String node, String indexName) {
193218
final Index index = resolveIndex(indexName);
194219
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.replication;
10+
11+
import org.junit.Before;
12+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
13+
import org.opensearch.cluster.metadata.IndexMetadata;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.indices.replication.common.ReplicationType;
16+
import org.opensearch.test.OpenSearchIntegTestCase;
17+
18+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
19+
public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT {
20+
21+
@Before
22+
public void setup() {
23+
internalCluster().startClusterManagerOnlyNode();
24+
createIndex(INDEX_NAME);
25+
}
26+
27+
@Override
28+
public Settings indexSettings() {
29+
final Settings.Builder builder = Settings.builder()
30+
.put(super.indexSettings())
31+
// reset shard & replica count to random values set by OpenSearchIntegTestCase.
32+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards())
33+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas())
34+
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
35+
36+
// TODO: Randomly enable remote store on these tests.
37+
return builder.build();
38+
}
39+
40+
public void testBasicReplication() throws Exception {
41+
final int docCount = scaledRandomIntBetween(10, 200);
42+
for (int i = 0; i < docCount; i++) {
43+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
44+
}
45+
refresh();
46+
ensureGreen(INDEX_NAME);
47+
verifyStoreContent();
48+
}
49+
50+
public void testDropRandomNodeDuringReplication() throws Exception {
51+
internalCluster().ensureAtLeastNumDataNodes(2);
52+
internalCluster().startClusterManagerOnlyNodes(1);
53+
54+
final int docCount = scaledRandomIntBetween(10, 200);
55+
for (int i = 0; i < docCount; i++) {
56+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
57+
}
58+
refresh();
59+
60+
internalCluster().restartRandomDataNode();
61+
62+
ensureYellow(INDEX_NAME);
63+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get();
64+
internalCluster().startDataOnlyNode();
65+
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
66+
}
67+
68+
public void testDeleteIndexWhileReplicating() throws Exception {
69+
internalCluster().startClusterManagerOnlyNode();
70+
final int docCount = scaledRandomIntBetween(10, 200);
71+
for (int i = 0; i < docCount; i++) {
72+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
73+
}
74+
refresh(INDEX_NAME);
75+
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
76+
}
77+
78+
public void testFullRestartDuringReplication() throws Exception {
79+
internalCluster().startNode();
80+
final int docCount = scaledRandomIntBetween(10, 200);
81+
for (int i = 0; i < docCount; i++) {
82+
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
83+
}
84+
refresh(INDEX_NAME);
85+
internalCluster().fullRestart();
86+
ensureGreen(INDEX_NAME);
87+
}
88+
}

server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
835835
} else {
836836
// Force round of segment replication to update its checkpoint to primary's
837837
if (shard.indexSettings().isSegRepEnabled()) {
838-
recoveryTarget.forceSegmentFileSync();
838+
cancellableThreads.execute(recoveryTarget::forceSegmentFileSync);
839839
}
840840
}
841841
stopWatch.stop();

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java

+1-10
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ public enum Stage {
4545
GET_CHECKPOINT_INFO((byte) 3),
4646
FILE_DIFF((byte) 4),
4747
GET_FILES((byte) 5),
48-
FINALIZE_REPLICATION((byte) 6),
49-
CANCELLED((byte) 7);
48+
FINALIZE_REPLICATION((byte) 6);
5049

5150
private static final Stage[] STAGES = new Stage[Stage.values().length];
5251

@@ -245,14 +244,6 @@ public void setStage(Stage stage) {
245244
overallTimer.stop();
246245
timingData.put("OVERALL", overallTimer.time());
247246
break;
248-
case CANCELLED:
249-
if (this.stage == Stage.DONE) {
250-
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
251-
}
252-
this.stage = Stage.CANCELLED;
253-
overallTimer.stop();
254-
timingData.put("OVERALL", overallTimer.time());
255-
break;
256247
default:
257248
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
258249
}

0 commit comments

Comments
 (0)