Skip to content

Commit 9de21d1

Browse files
authored
Using RemoteDirectory#delete to clear all segments during migration (#17021)
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 38e4b33 commit 9de21d1

File tree

4 files changed

+89
-2
lines changed

4 files changed

+89
-2
lines changed

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java

+86
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
import org.opensearch.common.settings.Settings;
1919
import org.opensearch.common.unit.TimeValue;
2020
import org.opensearch.index.query.QueryBuilders;
21+
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
2122
import org.opensearch.indices.recovery.RecoverySettings;
2223
import org.opensearch.plugins.Plugin;
2324
import org.opensearch.test.OpenSearchIntegTestCase;
2425
import org.opensearch.test.hamcrest.OpenSearchAssertions;
2526
import org.opensearch.test.transport.MockTransportService;
27+
import org.opensearch.transport.TransportService;
2628
import org.opensearch.transport.client.Client;
2729
import org.opensearch.transport.client.Requests;
2830

31+
import java.io.IOException;
2932
import java.util.Collection;
3033
import java.util.List;
3134
import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,4 +198,87 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
195198
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
196199
.get();
197200
}
201+
202+
public void testMixedModeRelocation_FailInFinalize() throws Exception {
203+
String docRepNode = internalCluster().startNode();
204+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
205+
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
206+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
207+
208+
// create shard with 0 replica and 1 shard
209+
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
210+
ensureGreen("test");
211+
212+
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
213+
asyncIndexingService.startIndexing();
214+
215+
refresh("test");
216+
217+
// add remote node in mixed mode cluster
218+
setAddRemote(true);
219+
String remoteNode = internalCluster().startNode();
220+
internalCluster().validateClusterFormed();
221+
222+
AtomicBoolean failFinalize = new AtomicBoolean(true);
223+
224+
MockTransportService remoteNodeTransportService = (MockTransportService) internalCluster().getInstance(
225+
TransportService.class,
226+
remoteNode
227+
);
228+
229+
remoteNodeTransportService.addRequestHandlingBehavior(
230+
PeerRecoveryTargetService.Actions.FINALIZE,
231+
(handler, request, channel, task) -> {
232+
if (failFinalize.get()) {
233+
throw new IOException("Failing finalize");
234+
} else {
235+
handler.messageReceived(request, channel, task);
236+
}
237+
}
238+
);
239+
240+
client().admin()
241+
.cluster()
242+
.prepareUpdateSettings()
243+
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), "40s"))
244+
.get();
245+
246+
// Change direction to remote store
247+
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
248+
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
249+
250+
logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
251+
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
252+
ClusterHealthResponse clusterHealthResponse = client().admin()
253+
.cluster()
254+
.prepareHealth()
255+
.setTimeout(TimeValue.timeValueSeconds(5))
256+
.setWaitForEvents(Priority.LANGUID)
257+
.setWaitForNoRelocatingShards(true)
258+
.execute()
259+
.actionGet();
260+
261+
assertTrue(clusterHealthResponse.getRelocatingShards() == 1);
262+
263+
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest()
264+
.waitForNoRelocatingShards(true)
265+
.waitForNoInitializingShards(true);
266+
ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet();
267+
assertEquals(actionGet.getRelocatingShards(), 0);
268+
assertEquals(docRepNode, primaryNodeName("test"));
269+
270+
// now unblock it
271+
logger.info("Unblocking the finalize recovery now");
272+
failFinalize.set(false);
273+
274+
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
275+
waitForRelocation();
276+
277+
asyncIndexingService.stopIndexing();
278+
client().admin()
279+
.cluster()
280+
.prepareUpdateSettings()
281+
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
282+
.get();
283+
}
198284
}

server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
225225
}
226226

227227
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
228+
Files.createDirectories(path);
228229
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
229230
final int bufferSize = blobStore.bufferSizeInBytes();
230231
org.opensearch.common.util.io.Streams.copy(

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5056,7 +5056,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException {
50565056
*/
50575057
public void deleteRemoteStoreContents() throws IOException {
50585058
deleteTranslogFilesFromRemoteTranslog();
5059-
getRemoteDirectory().deleteStaleSegments(0);
5059+
// This commit swaps deleteStaleSegments(0) for delete() on the remote directory.
// NOTE(review): presumably getRemoteDirectory() returns a RemoteSegmentStoreDirectory, whose
// delete() returns a boolean; that result is ignored here — confirm whether a failed delete
// should be surfaced to the caller.
getRemoteDirectory().delete();
50605060
}
50615061

50625062
public void syncTranslogFilesFromRemoteTranslog() throws IOException {

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ private boolean deleteIfEmpty() throws IOException {
10611061
return delete();
10621062
}
10631063

1064-
private boolean delete() {
1064+
public boolean delete() {
10651065
try {
10661066
remoteDataDirectory.delete();
10671067
remoteMetadataDirectory.delete();

0 commit comments

Comments
 (0)