Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Populate RecoveryState details for shallow snapshot restore (#15353) #15566

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
Expand All @@ -31,6 +33,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -63,6 +66,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -579,6 +584,37 @@ public void testRestoreShallowSnapshotRepository() throws ExecutionException, In
ensureGreen(restoredIndexName1);
assertDocsPresentInIndex(client, restoredIndexName1, numDocsInIndex1);

// ensure recovery details are non-zero
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(restoredIndexName1).execute().actionGet();
assertEquals(1, recoveryResponse.getTotalShards());
assertEquals(1, recoveryResponse.getSuccessfulShards());
assertEquals(0, recoveryResponse.getFailedShards());
assertEquals(1, recoveryResponse.shardRecoveryStates().size());
assertTrue(recoveryResponse.shardRecoveryStates().containsKey(restoredIndexName1));
assertEquals(1, recoveryResponse.shardRecoveryStates().get(restoredIndexName1).size());

RecoveryState recoveryState = recoveryResponse.shardRecoveryStates().get(restoredIndexName1).get(0);
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
assertEquals(0, recoveryState.getShardId().getId());
assertTrue(recoveryState.getPrimary());
assertEquals(RecoverySource.Type.SNAPSHOT, recoveryState.getRecoverySource().getType());
assertThat(recoveryState.getIndex().time(), greaterThanOrEqualTo(0L));

// ensure populated file details
assertTrue(recoveryState.getIndex().totalFileCount() > 0);
assertTrue(recoveryState.getIndex().totalRecoverFiles() > 0);
assertTrue(recoveryState.getIndex().recoveredFileCount() > 0);
assertThat(recoveryState.getIndex().recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(recoveryState.getIndex().recoveredFilesPercent(), lessThanOrEqualTo(100.0f));
assertFalse(recoveryState.getIndex().fileDetails().isEmpty());

// ensure populated bytes details
assertTrue(recoveryState.getIndex().recoveredBytes() > 0L);
assertTrue(recoveryState.getIndex().totalBytes() > 0L);
assertTrue(recoveryState.getIndex().totalRecoverBytes() > 0L);
assertThat(recoveryState.getIndex().recoveredBytesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(recoveryState.getIndex().recoveredBytesPercent(), lessThanOrEqualTo(100.0f));

// indexing some new docs and validating
indexDocuments(client, restoredIndexName1, numDocsInIndex1, numDocsInIndex1 + 2);
ensureGreen(restoredIndexName1);
Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -5130,10 +5130,23 @@
}
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = sourceRemoteDirectory
.getSegmentsUploadedToRemoteStore();
final Directory storeDirectory = store.directory();
store.incRef();

try {
final Directory storeDirectory;
if (recoveryState.getStage() == RecoveryState.Stage.INDEX) {
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());

Check warning on line 5137 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5137

Added line #L5137 was not covered by tests
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());

Check warning on line 5139 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5139

Added line #L5139 was not covered by tests
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);

Check warning on line 5141 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5141

Added line #L5141 was not covered by tests
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);

Check warning on line 5143 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5143

Added line #L5143 was not covered by tests
}
}

Check warning on line 5145 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L5145

Added line #L5145 was not covered by tests
} else {
storeDirectory = store.directory();
}

String segmentsNFile = copySegmentFiles(
storeDirectory,
sourceRemoteDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2834,9 +2834,9 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
RecoverySource.ExistingStoreRecoverySource.INSTANCE
);
routing = ShardRoutingHelper.newWithRestoreSource(routing, new RecoverySource.EmptyStoreRecoverySource());

target = reinitShard(target, routing);

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("from snapshot", new RecoveryState(routing, localNode, null));
target.syncSegmentsFromGivenRemoteSegmentStore(false, tempRemoteSegmentDirectory, primaryTerm, commitGeneration);
RemoteSegmentStoreDirectory remoteStoreDirectory = ((RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) target
.remoteStore()
Expand Down
Loading