Skip to content

Commit 91ee47d

Browse files
committed
[Remote Migration] Changes for Primary Relocation during migration (opensearch-project#12494)
Changes for Primary Relocation during migration --------- Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 41d11e1 commit 91ee47d

File tree

47 files changed

+746
-184
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+746
-184
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,8 @@ public static final IndexShard newIndexShard(
690690
null,
691691
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
692692
nodeId,
693-
null
693+
null,
694+
false
694695
);
695696
}
696697

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Ex
8989
Index index = resolveIndex(INDEX_NAME);
9090
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
9191
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
92-
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
93-
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true);
92+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
93+
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), true);
9494
}
9595

9696
public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception {
@@ -119,8 +119,8 @@ public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Ex
119119
Index index = resolveIndex(INDEX_NAME);
120120
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
121121
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
122-
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true);
123-
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
122+
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), true);
123+
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
124124
}
125125

126126
public void testReplicationTypesOverrideNotAllowed_IndexAPI() {

server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@
88

99
package org.opensearch.remotemigration;
1010

11+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
12+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
13+
import org.opensearch.cluster.metadata.RepositoryMetadata;
1114
import org.opensearch.common.settings.Settings;
1215
import org.opensearch.common.util.FeatureFlags;
16+
import org.opensearch.repositories.fs.ReloadableFsRepository;
1317
import org.opensearch.test.OpenSearchIntegTestCase;
1418

1519
import java.nio.file.Path;
20+
import java.util.concurrent.ExecutionException;
1621

17-
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
22+
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
23+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
1824

1925
public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
2026
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
@@ -35,16 +41,27 @@ protected Settings nodeSettings(int nodeOrdinal) {
3541
return Settings.builder()
3642
.put(super.nodeSettings(nodeOrdinal))
3743
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
38-
.put("discovery.initial_state_timeout", "500ms")
3944
.build();
4045
} else {
4146
logger.info("Adding docrep node");
42-
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build();
47+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
4348
}
4449
}
4550

4651
@Override
4752
protected Settings featureFlagSettings() {
4853
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
4954
}
55+
56+
protected void setFailRate(String repoName, int value) throws ExecutionException, InterruptedException {
57+
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
58+
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
59+
RepositoryMetadata rmd = res.repositories().get(0);
60+
Settings.Builder settings = Settings.builder()
61+
.put("location", rmd.settings().get("location"))
62+
.put(REPOSITORIES_FAILRATE_SETTING.getKey(), value);
63+
assertAcked(
64+
client().admin().cluster().preparePutRepository(repoName).setType(ReloadableFsRepository.TYPE).setSettings(settings).get()
65+
);
66+
}
5067
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotemigration;
10+
11+
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
12+
13+
import org.opensearch.action.DocWriteResponse;
14+
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
15+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
16+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
17+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
18+
import org.opensearch.action.delete.DeleteResponse;
19+
import org.opensearch.action.index.IndexResponse;
20+
import org.opensearch.client.Client;
21+
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
22+
import org.opensearch.common.Priority;
23+
import org.opensearch.common.settings.Settings;
24+
import org.opensearch.common.unit.TimeValue;
25+
import org.opensearch.index.query.QueryBuilders;
26+
import org.opensearch.plugins.Plugin;
27+
import org.opensearch.test.OpenSearchIntegTestCase;
28+
import org.opensearch.test.hamcrest.OpenSearchAssertions;
29+
import org.opensearch.test.transport.MockTransportService;
30+
31+
import java.util.Collection;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
35+
import static java.util.Arrays.asList;
36+
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
37+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
38+
39+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
40+
public class RemotePrimaryRelocationIT extends MigrationBaseTestCase {
41+
protected int maximumNumberOfShards() {
42+
return 1;
43+
}
44+
45+
// ToDo : Fix me when we support migration of replicas
46+
protected int maximumNumberOfReplicas() {
47+
return 0;
48+
}
49+
50+
protected Collection<Class<? extends Plugin>> nodePlugins() {
51+
return asList(MockTransportService.TestPlugin.class);
52+
}
53+
54+
public void testMixedModeRelocation() throws Exception {
55+
String docRepNode = internalCluster().startNode();
56+
Client client = internalCluster().client(docRepNode);
57+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
58+
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
59+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
60+
61+
// create shard with 0 replica and 1 shard
62+
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
63+
ensureGreen("test");
64+
65+
AtomicInteger numAutoGenDocs = new AtomicInteger();
66+
final AtomicBoolean finished = new AtomicBoolean(false);
67+
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
68+
69+
refresh("test");
70+
71+
// add remote node in mixed mode cluster
72+
addRemote = true;
73+
String remoteNode = internalCluster().startNode();
74+
internalCluster().validateClusterFormed();
75+
76+
String remoteNode2 = internalCluster().startNode();
77+
internalCluster().validateClusterFormed();
78+
79+
// assert repo gets registered
80+
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
81+
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
82+
assertEquals(1, getRepositoriesResponse.repositories().size());
83+
84+
// Index some more docs
85+
int currentDoc = numAutoGenDocs.get();
86+
int finalCurrentDoc1 = currentDoc;
87+
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);
88+
89+
logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
90+
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
91+
ClusterHealthResponse clusterHealthResponse = client().admin()
92+
.cluster()
93+
.prepareHealth()
94+
.setTimeout(TimeValue.timeValueSeconds(60))
95+
.setWaitForEvents(Priority.LANGUID)
96+
.setWaitForNoRelocatingShards(true)
97+
.execute()
98+
.actionGet();
99+
100+
assertEquals(0, clusterHealthResponse.getRelocatingShards());
101+
assertEquals(remoteNode, primaryNodeName("test"));
102+
logger.info("--> relocation from docrep to remote complete");
103+
104+
// Index some more docs
105+
currentDoc = numAutoGenDocs.get();
106+
int finalCurrentDoc = currentDoc;
107+
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc + 5);
108+
109+
client().admin()
110+
.cluster()
111+
.prepareReroute()
112+
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
113+
.execute()
114+
.actionGet();
115+
clusterHealthResponse = client().admin()
116+
.cluster()
117+
.prepareHealth()
118+
.setTimeout(TimeValue.timeValueSeconds(60))
119+
.setWaitForEvents(Priority.LANGUID)
120+
.setWaitForNoRelocatingShards(true)
121+
.execute()
122+
.actionGet();
123+
124+
assertEquals(0, clusterHealthResponse.getRelocatingShards());
125+
assertEquals(remoteNode2, primaryNodeName("test"));
126+
127+
logger.info("--> relocation from remote to remote complete");
128+
129+
finished.set(true);
130+
indexingThread.join();
131+
refresh("test");
132+
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
133+
OpenSearchAssertions.assertHitCount(
134+
client().prepareSearch("test")
135+
.setTrackTotalHits(true)// extra paranoia ;)
136+
.setQuery(QueryBuilders.termQuery("auto", true))
137+
.get(),
138+
numAutoGenDocs.get()
139+
);
140+
141+
}
142+
143+
public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
144+
String docRepNode = internalCluster().startNode();
145+
Client client = internalCluster().client(docRepNode);
146+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
147+
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
148+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
149+
150+
// create shard with 0 replica and 1 shard
151+
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
152+
ensureGreen("test");
153+
154+
AtomicInteger numAutoGenDocs = new AtomicInteger();
155+
final AtomicBoolean finished = new AtomicBoolean(false);
156+
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
157+
158+
refresh("test");
159+
160+
// add remote node in mixed mode cluster
161+
addRemote = true;
162+
String remoteNode = internalCluster().startNode();
163+
internalCluster().validateClusterFormed();
164+
165+
// assert repo gets registered
166+
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { REPOSITORY_NAME });
167+
GetRepositoriesResponse getRepositoriesResponse = client.admin().cluster().getRepositories(gr).actionGet();
168+
assertEquals(1, getRepositoriesResponse.repositories().size());
169+
170+
setFailRate(REPOSITORY_NAME, 100);
171+
172+
logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
173+
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
174+
ClusterHealthResponse clusterHealthResponse = client().admin()
175+
.cluster()
176+
.prepareHealth()
177+
.setTimeout(TimeValue.timeValueSeconds(5))
178+
.setWaitForEvents(Priority.LANGUID)
179+
.setWaitForNoRelocatingShards(true)
180+
.execute()
181+
.actionGet();
182+
183+
assertTrue(clusterHealthResponse.getRelocatingShards() == 1);
184+
setFailRate(REPOSITORY_NAME, 0);
185+
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
186+
clusterHealthResponse = client().admin()
187+
.cluster()
188+
.prepareHealth()
189+
.setTimeout(TimeValue.timeValueSeconds(45))
190+
.setWaitForEvents(Priority.LANGUID)
191+
.setWaitForNoRelocatingShards(true)
192+
.execute()
193+
.actionGet();
194+
assertTrue(clusterHealthResponse.getRelocatingShards() == 0);
195+
logger.info("--> remote to remote relocation complete");
196+
finished.set(true);
197+
indexingThread.join();
198+
refresh("test");
199+
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
200+
OpenSearchAssertions.assertHitCount(
201+
client().prepareSearch("test")
202+
.setTrackTotalHits(true)// extra paranoia ;)
203+
.setQuery(QueryBuilders.termQuery("auto", true))
204+
.get(),
205+
numAutoGenDocs.get()
206+
);
207+
}
208+
209+
private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
210+
Thread indexingThread = new Thread(() -> {
211+
while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
212+
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
213+
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
214+
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
215+
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
216+
client().prepareIndex("test").setSource("auto", true).get();
217+
numAutoGenDocs.incrementAndGet();
218+
}
219+
});
220+
indexingThread.start();
221+
return indexingThread;
222+
}
223+
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java

-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public Settings indexSettings() {
4444
.build();
4545
}
4646

47-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191")
4847
public void testPrimaryRelocationWhileIndexing() throws Exception {
4948
internalCluster().startClusterManagerOnlyNode();
5049
super.testPrimaryRelocationWhileIndexing();

0 commit comments

Comments
 (0)