Skip to content

Commit c987d6b

Browse files
authored
Revert "Update checkpoint from remote nodes replicas (opensearch-project#13888)" (opensearch-project#14056)
This reverts commit c89a17c. Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent 1852b7e commit c987d6b

File tree

6 files changed

+88
-41
lines changed

6 files changed

+88
-41
lines changed

server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,10 @@ private Thread getIndexingThread() {
186186
indexSingleDoc(indexName);
187187
long currentDocCount = indexedDocs.incrementAndGet();
188188
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
189+
logger.info("--> [iteration {}] flushing index", currentDocCount);
189190
if (rarely()) {
190-
logger.info("--> [iteration {}] flushing index", currentDocCount);
191191
client().admin().indices().prepareFlush(indexName).get();
192192
} else {
193-
logger.info("--> [iteration {}] refreshing index", currentDocCount);
194193
client().admin().indices().prepareRefresh(indexName).get();
195194
}
196195
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java

+29-11
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88

99
package org.opensearch.remotemigration;
1010

11+
import org.opensearch.action.DocWriteResponse;
1112
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
1213
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1314
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
1415
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
1516
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
17+
import org.opensearch.action.delete.DeleteResponse;
18+
import org.opensearch.action.index.IndexResponse;
1619
import org.opensearch.client.Client;
1720
import org.opensearch.client.Requests;
1821
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
@@ -63,8 +66,8 @@ public void testRemotePrimaryRelocation() throws Exception {
6366

6467
AtomicInteger numAutoGenDocs = new AtomicInteger();
6568
final AtomicBoolean finished = new AtomicBoolean(false);
66-
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
67-
asyncIndexingService.startIndexing();
69+
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
70+
6871
refresh("test");
6972

7073
// add remote node in mixed mode cluster
@@ -138,19 +141,17 @@ public void testRemotePrimaryRelocation() throws Exception {
138141
logger.info("--> relocation from remote to remote complete");
139142

140143
finished.set(true);
141-
asyncIndexingService.stopIndexing();
144+
indexingThread.join();
142145
refresh("test");
143-
OpenSearchAssertions.assertHitCount(
144-
client().prepareSearch("test").setTrackTotalHits(true).get(),
145-
asyncIndexingService.getIndexedDocs()
146-
);
146+
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
147147
OpenSearchAssertions.assertHitCount(
148148
client().prepareSearch("test")
149149
.setTrackTotalHits(true)// extra paranoia ;)
150150
.setQuery(QueryBuilders.termQuery("auto", true))
151151
.get(),
152-
asyncIndexingService.getIndexedDocs()
152+
numAutoGenDocs.get()
153153
);
154+
154155
}
155156

156157
public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
@@ -164,8 +165,9 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
164165
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
165166
ensureGreen("test");
166167

167-
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
168-
asyncIndexingService.startIndexing();
168+
AtomicInteger numAutoGenDocs = new AtomicInteger();
169+
final AtomicBoolean finished = new AtomicBoolean(false);
170+
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
169171

170172
refresh("test");
171173

@@ -207,11 +209,27 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
207209
assertEquals(actionGet.getRelocatingShards(), 0);
208210
assertEquals(docRepNode, primaryNodeName("test"));
209211

210-
asyncIndexingService.stopIndexing();
212+
finished.set(true);
213+
indexingThread.join();
211214
client().admin()
212215
.cluster()
213216
.prepareUpdateSettings()
214217
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
215218
.get();
216219
}
220+
221+
private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
222+
Thread indexingThread = new Thread(() -> {
223+
while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
224+
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
225+
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
226+
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
227+
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
228+
client().prepareIndex("test").setSource("auto", true).get();
229+
numAutoGenDocs.incrementAndGet();
230+
}
231+
});
232+
indexingThread.start();
233+
return indexingThread;
234+
}
217235
}

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java

+54-25
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,32 @@
88

99
package org.opensearch.remotemigration;
1010

11+
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
12+
13+
import org.opensearch.action.DocWriteResponse;
1114
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
1215
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
13-
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
1416
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
17+
import org.opensearch.action.delete.DeleteResponse;
18+
import org.opensearch.action.index.IndexResponse;
1519
import org.opensearch.cluster.metadata.IndexMetadata;
1620
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
1721
import org.opensearch.common.Priority;
1822
import org.opensearch.common.settings.Settings;
1923
import org.opensearch.common.unit.TimeValue;
20-
import org.opensearch.index.SegmentReplicationPerGroupStats;
2124
import org.opensearch.index.query.QueryBuilders;
2225
import org.opensearch.test.OpenSearchIntegTestCase;
2326
import org.opensearch.test.hamcrest.OpenSearchAssertions;
2427

25-
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
2630

2731
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
2832
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
2933
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3034

3135
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
36+
3237
public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {
3338

3439
protected int maximumNumberOfShards() {
@@ -58,8 +63,10 @@ public void testReplicaRecovery() throws Exception {
5863
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
5964
String replicaNode = internalCluster().startNode();
6065
ensureGreen("test");
61-
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
62-
asyncIndexingService.startIndexing();
66+
67+
AtomicInteger numAutoGenDocs = new AtomicInteger();
68+
final AtomicBoolean finished = new AtomicBoolean(false);
69+
Thread indexingThread = getThread(finished, numAutoGenDocs);
6370

6471
refresh("test");
6572

@@ -71,10 +78,12 @@ public void testReplicaRecovery() throws Exception {
7178
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
7279
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
7380

74-
internalCluster().startNode();
81+
String remoteNode2 = internalCluster().startNode();
7582
internalCluster().validateClusterFormed();
7683

7784
// identify the primary
85+
86+
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
7887
logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode);
7988
client().admin()
8089
.cluster()
@@ -93,6 +102,7 @@ public void testReplicaRecovery() throws Exception {
93102

94103
assertEquals(0, clusterHealthResponse.getRelocatingShards());
95104
logger.info("--> relocation of primary from docrep to remote complete");
105+
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
96106

97107
logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
98108
// Increase replica count to 3
@@ -119,33 +129,52 @@ public void testReplicaRecovery() throws Exception {
119129
logger.info("--> replica is up now on another docrep now as well as remote node");
120130

121131
assertEquals(0, clusterHealthResponse.getRelocatingShards());
122-
asyncIndexingService.stopIndexing();
123-
refresh("test");
124132

125-
// segrep lag should be zero
126-
assertBusy(() -> {
127-
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
128-
.indices()
129-
.prepareSegmentReplicationStats("test")
130-
.setDetailed(true)
131-
.execute()
132-
.actionGet();
133-
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0);
134-
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
135-
perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0));
136-
}, 20, TimeUnit.SECONDS);
133+
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
137134

138-
OpenSearchAssertions.assertHitCount(
139-
client().prepareSearch("test").setTrackTotalHits(true).get(),
140-
asyncIndexingService.getIndexedDocs()
141-
);
135+
// Stop replicas on docrep now.
136+
// ToDo : Remove once we have dual replication enabled
137+
client().admin()
138+
.indices()
139+
.updateSettings(
140+
new UpdateSettingsRequest("test").settings(
141+
Settings.builder()
142+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
143+
.put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode)
144+
.build()
145+
)
146+
)
147+
.get();
148+
149+
finished.set(true);
150+
indexingThread.join();
151+
refresh("test");
152+
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
142153
OpenSearchAssertions.assertHitCount(
143154
client().prepareSearch("test")
144155
.setTrackTotalHits(true)// extra paranoia ;)
145156
.setQuery(QueryBuilders.termQuery("auto", true))
157+
// .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2))
146158
.get(),
147-
asyncIndexingService.getIndexedDocs()
159+
numAutoGenDocs.get()
148160
);
149161

150162
}
163+
164+
private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
165+
Thread indexingThread = new Thread(() -> {
166+
while (finished.get() == false && numAutoGenDocs.get() < 100) {
167+
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
168+
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
169+
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
170+
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
171+
client().prepareIndex("test").setSource("auto", true).get();
172+
numAutoGenDocs.incrementAndGet();
173+
logger.info("Indexed {} docs here", numAutoGenDocs.get());
174+
}
175+
});
176+
indexingThread.start();
177+
return indexingThread;
178+
}
179+
151180
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+1
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ public boolean shouldSeedRemoteStore() {
523523
public Function<String, Boolean> isShardOnRemoteEnabledNode = nodeId -> {
524524
DiscoveryNode node = discoveryNodes.get(nodeId);
525525
if (node != null) {
526+
logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode());
526527
return node.isRemoteStoreNode();
527528
}
528529
return false;

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai
384384
protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) {
385385
// Update replication checkpoint on source via transport call only supported for remote store integration. For node-
386386
// node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call
387-
if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) {
387+
if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) {
388388
return;
389389
}
390390
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();

server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn
9191
// make sure the store is not released until we are done.
9292
this.cancellableThreads = new CancellableThreads();
9393
store.incRef();
94-
if (indexShard.indexSettings().isAssignedOnRemoteNode()) {
94+
if (indexShard.indexSettings().isRemoteStoreEnabled()) {
9595
indexShard.remoteStore().incRef();
9696
}
9797
}
@@ -284,7 +284,7 @@ protected void closeInternal() {
284284
try {
285285
store.decRef();
286286
} finally {
287-
if (indexShard.indexSettings().isAssignedOnRemoteNode()) {
287+
if (indexShard.indexSettings().isRemoteStoreEnabled()) {
288288
indexShard.remoteStore().decRef();
289289
}
290290
}

0 commit comments

Comments
 (0)