Skip to content

Commit a8008e2

Browse files
author
Bhumika Saini
authored
Integrate local recovery with remote store seeding during migration (#12922)
Signed-off-by: Bhumika Saini <sabhumik@amazon.com>
1 parent 9e9ab6b commit a8008e2

File tree

15 files changed

+285
-47
lines changed

15 files changed

+285
-47
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotemigration;
10+
11+
import org.opensearch.action.admin.indices.stats.ShardStats;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.cluster.node.DiscoveryNode;
14+
import org.opensearch.cluster.node.DiscoveryNodes;
15+
import org.opensearch.cluster.routing.ShardRouting;
16+
import org.opensearch.common.blobstore.BlobPath;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.unit.TimeValue;
19+
import org.opensearch.core.util.FileSystemUtils;
20+
import org.opensearch.index.remote.RemoteSegmentStats;
21+
import org.opensearch.index.translog.RemoteTranslogStats;
22+
import org.opensearch.test.InternalTestCluster;
23+
import org.opensearch.test.OpenSearchIntegTestCase;
24+
25+
import java.nio.file.Path;
26+
import java.util.Arrays;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.Set;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.stream.Collectors;
32+
33+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
34+
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
35+
import static org.opensearch.index.store.RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR;
36+
37+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
38+
public class RemotePrimaryLocalRecoveryIT extends MigrationBaseTestCase {
39+
String indexName = "idx1";
40+
int numOfNodes = randomIntBetween(6, 9);
41+
42+
/**
43+
* Tests local recovery sanity in the happy path flow
44+
*/
45+
public void testLocalRecoveryRollingRestart() throws Exception {
46+
triggerRollingRestartForRemoteMigration(0);
47+
internalCluster().stopAllNodes();
48+
}
49+
50+
/**
51+
* Tests local recovery sanity during remote migration with a node restart in between
52+
*/
53+
public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
54+
triggerRollingRestartForRemoteMigration(0);
55+
56+
DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
57+
DiscoveryNode nodeToRestart = (DiscoveryNode) discoveryNodes.getDataNodes().values().toArray()[randomIntBetween(0, numOfNodes - 4)];
58+
internalCluster().restartNode(nodeToRestart.getName());
59+
60+
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
61+
for (Map.Entry<ShardRouting, ShardStats> entry : shardStatsMap.entrySet()) {
62+
ShardRouting shardRouting = entry.getKey();
63+
ShardStats shardStats = entry.getValue();
64+
if (nodeToRestart.equals(shardRouting.currentNodeId())) {
65+
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
66+
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
67+
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
68+
}
69+
70+
assertBusy(() -> {
71+
String shardPath = getShardLevelBlobPath(
72+
client(),
73+
indexName,
74+
new BlobPath(),
75+
String.valueOf(shardRouting.getId()),
76+
SEGMENTS,
77+
DATA
78+
).buildAsString();
79+
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
80+
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
81+
.filter(path -> path.getFileName().toString().contains("segments_"))
82+
.map(path -> path.getFileName().toString())
83+
.collect(Collectors.toList());
84+
Set<String> expectedUniqueSegmentsNFiles = segmentsNFilesInRepo.stream()
85+
.map(fileName -> fileName.split(SEGMENT_NAME_UUID_SEPARATOR)[0])
86+
.collect(Collectors.toSet());
87+
assertEquals(
88+
"Expected no duplicate segments_N files in remote but duplicates were found " + segmentsNFilesInRepo,
89+
expectedUniqueSegmentsNFiles.size(),
90+
segmentsNFilesInRepo.size()
91+
);
92+
}, 90, TimeUnit.SECONDS);
93+
}
94+
95+
internalCluster().stopAllNodes();
96+
}
97+
98+
/**
99+
* Tests local recovery flow sanity in the happy path flow with replicas in place
100+
*/
101+
public void testLocalRecoveryFlowWithReplicas() throws Exception {
102+
triggerRollingRestartForRemoteMigration(randomIntBetween(1, 2));
103+
internalCluster().stopAllNodes();
104+
}
105+
106+
/**
107+
* Helper method to run a rolling restart for migration to remote backed cluster
108+
*/
109+
private void triggerRollingRestartForRemoteMigration(int replicaCount) throws Exception {
110+
internalCluster().startClusterManagerOnlyNodes(3);
111+
internalCluster().startNodes(numOfNodes - 3);
112+
113+
// create index
114+
Settings indexSettings = Settings.builder()
115+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount)
116+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 10))
117+
.build();
118+
createIndex(indexName, indexSettings);
119+
ensureGreen(indexName);
120+
indexBulk(indexName, randomIntBetween(100, 10000));
121+
refresh(indexName);
122+
indexBulk(indexName, randomIntBetween(100, 10000));
123+
124+
initDocRepToRemoteMigration();
125+
126+
// rolling restart
127+
final Settings remoteNodeAttributes = remoteStoreClusterSettings(
128+
REPOSITORY_NAME,
129+
segmentRepoPath,
130+
REPOSITORY_2_NAME,
131+
translogRepoPath
132+
);
133+
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
134+
// Update remote attributes
135+
@Override
136+
public Settings onNodeStopped(String nodeName) {
137+
return remoteNodeAttributes;
138+
}
139+
});
140+
ensureStableCluster(numOfNodes);
141+
ensureGreen(TimeValue.timeValueSeconds(90), indexName);
142+
assertEquals(internalCluster().size(), numOfNodes);
143+
144+
// Assert on remote uploads
145+
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
146+
DiscoveryNodes discoveryNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
147+
shardStatsMap.forEach((shardRouting, shardStats) -> {
148+
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
149+
RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
150+
assertTrue(remoteSegmentStats.getTotalUploadTime() > 0);
151+
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
152+
}
153+
});
154+
155+
// Assert on new remote uploads after seeding
156+
indexBulk(indexName, randomIntBetween(100, 10000));
157+
refresh(indexName);
158+
indexBulk(indexName, randomIntBetween(100, 10000));
159+
Map<ShardRouting, ShardStats> newShardStatsMap = internalCluster().client().admin().indices().prepareStats(indexName).get().asMap();
160+
newShardStatsMap.forEach((shardRouting, shardStats) -> {
161+
if (discoveryNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode() && shardRouting.primary()) {
162+
RemoteSegmentStats prevRemoteSegmentStats = shardStatsMap.get(shardRouting)
163+
.getStats()
164+
.getSegments()
165+
.getRemoteSegmentStats();
166+
RemoteSegmentStats newRemoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats();
167+
assertTrue(newRemoteSegmentStats.getTotalUploadTime() > prevRemoteSegmentStats.getTotalUploadTime());
168+
assertTrue(newRemoteSegmentStats.getUploadBytesSucceeded() > prevRemoteSegmentStats.getUploadBytesSucceeded());
169+
170+
RemoteTranslogStats prevRemoteTranslogStats = shardStatsMap.get(shardRouting)
171+
.getStats()
172+
.getTranslog()
173+
.getRemoteTranslogStats();
174+
RemoteTranslogStats newRemoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats();
175+
assertTrue(newRemoteTranslogStats.getUploadBytesSucceeded() > prevRemoteTranslogStats.getUploadBytesSucceeded());
176+
}
177+
});
178+
}
179+
}

server/src/main/java/org/opensearch/index/IndexService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -500,13 +500,14 @@ public synchronized IndexShard createShard(
500500
if (this.indexSettings.isRemoteStoreEnabled()) {
501501
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
502502
} else {
503-
if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) {
503+
if (sourceNode == null || sourceNode.isRemoteStoreNode() == false) {
504504
if (routing.primary() == false) {
505505
throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId());
506506
}
507507
logger.info("DocRep shard {} is migrating to remote", shardId);
508508
seedRemote = true;
509509
}
510+
510511
remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
511512
RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
512513
this.indexSettings.getUUID(),

server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -1106,12 +1106,11 @@ private ReplicationGroup calculateReplicationGroup() {
11061106
} else {
11071107
newVersion = replicationGroup.getVersion() + 1;
11081108
}
1109-
assert indexSettings.isRemoteTranslogStoreEnabled()
1110-
// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
1111-
|| (replicationGroup != null
1112-
&& replicationGroup.getReplicationTargets()
1113-
.stream()
1114-
.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId())))
1109+
assert newVersion == 0 || indexSettings.isRemoteTranslogStoreEnabled()
1110+
// Handle migration cases. Ignore assertion if any of the shard copies in the replication group is assigned to a remote node
1111+
|| replicationGroup.getReplicationTargets()
1112+
.stream()
1113+
.anyMatch(shardRouting -> isShardOnRemoteEnabledNode.apply(shardRouting.currentNodeId()))
11151114
|| checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).allMatch(e -> e.getValue().replicated)
11161115
: "In absence of remote translog store, all tracked shards must have replication mode as LOGICAL_REPLICATION";
11171116

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ public IndexShard(
436436
logger.debug("state: [CREATED]");
437437

438438
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
439-
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
439+
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId, seedRemote);
440440
final String aId = shardRouting.allocationId().getId();
441441
final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
442442
this.pendingPrimaryTerm = primaryTerm;
@@ -5000,7 +5000,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
50005000
shardPath().resolveTranslog(),
50015001
indexSettings.getRemoteStorePathStrategy(),
50025002
remoteStoreSettings,
5003-
logger
5003+
logger,
5004+
shouldSeedRemoteStore()
50045005
);
50055006
}
50065007

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

+8
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,14 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
650650
indexShard.recoveryState().getIndex().setFileDetailsComplete();
651651
}
652652
indexShard.openEngineAndRecoverFromTranslog();
653+
if (indexShard.shouldSeedRemoteStore()) {
654+
indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> {
655+
logger.info("Attempting to seed Remote Store via local recovery for {}", indexShard.shardId());
656+
indexShard.refresh("remote store migration");
657+
});
658+
indexShard.waitForRemoteStoreSync();
659+
logger.info("Remote Store is now seeded via local recovery for {}", indexShard.shardId());
660+
}
653661
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
654662
indexShard.finalizeRecovery();
655663
indexShard.postRecovery("post recovery from shard_store");

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public RemoteFsTranslog(
120120
remoteStoreSettings
121121
);
122122
try {
123-
download(translogTransferManager, location, logger);
123+
download(translogTransferManager, location, logger, config.shouldSeedRemote());
124124
Checkpoint checkpoint = readCheckpoint(location);
125125
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
126126
this.readers.addAll(recoverFromFiles(checkpoint));
@@ -168,7 +168,8 @@ public static void download(
168168
Path location,
169169
RemoteStorePathStrategy pathStrategy,
170170
RemoteStoreSettings remoteStoreSettings,
171-
Logger logger
171+
Logger logger,
172+
boolean seedRemote
172173
) throws IOException {
173174
assert repository instanceof BlobStoreRepository : String.format(
174175
Locale.ROOT,
@@ -189,11 +190,12 @@ public static void download(
189190
pathStrategy,
190191
remoteStoreSettings
191192
);
192-
RemoteFsTranslog.download(translogTransferManager, location, logger);
193+
RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
193194
logger.trace(remoteTranslogTransferTracker.toString());
194195
}
195196

196-
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
197+
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
198+
throws IOException {
197199
/*
198200
In Primary to Primary relocation, there can be concurrent upload and download of translog.
199201
While translog files are getting downloaded by new primary, it might hence be deleted by the primary
@@ -206,7 +208,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
206208
boolean success = false;
207209
long startTimeMs = System.currentTimeMillis();
208210
try {
209-
downloadOnce(translogTransferManager, location, logger);
211+
downloadOnce(translogTransferManager, location, logger, seedRemote);
210212
success = true;
211213
return;
212214
} catch (FileNotFoundException | NoSuchFileException e) {
@@ -220,7 +222,8 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
220222
throw ex;
221223
}
222224

223-
private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
225+
private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger, boolean seedRemote)
226+
throws IOException {
224227
logger.debug("Downloading translog files from remote");
225228
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
226229
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
@@ -262,7 +265,9 @@ private static void downloadOnce(TranslogTransferManager translogTransferManager
262265
logger.debug("No translog files found on remote, checking local filesystem for cleanup");
263266
if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) {
264267
final Checkpoint checkpoint = readCheckpoint(location);
265-
if (isEmptyTranslog(checkpoint) == false) {
268+
if (seedRemote) {
269+
logger.debug("Remote migration ongoing. Retaining the translog on local, skipping clean-up");
270+
} else if (isEmptyTranslog(checkpoint) == false) {
266271
logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files");
267272
// Creating an empty translog will clean up the older un-referenced translog files, so we don't have to explicitly delete them
268273
Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint);

server/src/main/java/org/opensearch/index/translog/TranslogConfig.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,25 @@ public final class TranslogConfig {
5959
private final Path translogPath;
6060
private final ByteSizeValue bufferSize;
6161
private final String nodeId;
62+
private final boolean seedRemote;
6263

6364
/**
6465
* Creates a new TranslogConfig instance
6566
* @param shardId the shard ID this translog belongs to
6667
* @param translogPath the path to use for the transaction log files
6768
* @param indexSettings the index settings used to set internal variables
6869
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
70+
* @param seedRemote boolean denoting whether remote store needs to be seeded as part of remote migration
6971
*/
70-
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
71-
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
72+
public TranslogConfig(
73+
ShardId shardId,
74+
Path translogPath,
75+
IndexSettings indexSettings,
76+
BigArrays bigArrays,
77+
String nodeId,
78+
boolean seedRemote
79+
) {
80+
// Delegates to the full constructor, supplying DEFAULT_BUFFER_SIZE as the buffer size.
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId, seedRemote);
7281
}
7382

7483
TranslogConfig(
@@ -77,14 +86,16 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet
7786
IndexSettings indexSettings,
7887
BigArrays bigArrays,
7988
ByteSizeValue bufferSize,
80-
String nodeId
89+
String nodeId,
90+
boolean seedRemote
8191
) {
8292
this.bufferSize = bufferSize;
8393
this.indexSettings = indexSettings;
8494
this.shardId = shardId;
8595
this.translogPath = translogPath;
8696
this.bigArrays = bigArrays;
8797
this.nodeId = nodeId;
98+
this.seedRemote = seedRemote;
8899
}
89100

90101
/**
@@ -125,4 +136,8 @@ public ByteSizeValue getBufferSize() {
125136
public String getNodeId() {
126137
return nodeId;
127138
}
139+
140+
/**
 * Returns whether remote store needs to be seeded as part of remote migration.
 *
 * @return the {@code seedRemote} flag this config was constructed with
 */
public boolean shouldSeedRemote() {
141+
return seedRemote;
142+
}
128143
}

server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
195195
translogPath,
196196
indexSettings,
197197
BigArrays.NON_RECYCLING_INSTANCE,
198-
""
198+
"",
199+
false
199200
);
200201
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
201202
// We open translog to check for corruption, do not clean anything.

0 commit comments

Comments
 (0)