Skip to content

Commit edcbdaf

Browse files
authored
[Remote Store] Upload translog checkpoint as object metadata to translog (#13637) (#13795)
* Upload translog checkpoint as object metadata to translog Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com>
1 parent 03c13cb commit edcbdaf

File tree

56 files changed

+1287
-385
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1287
-385
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
99
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
1010
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
11+
- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))
1112

1213
### Dependencies
1314
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
import org.opensearch.common.blobstore.BlobPath;
7979
import org.opensearch.common.blobstore.BlobStoreException;
8080
import org.opensearch.common.blobstore.DeleteResult;
81-
import org.opensearch.common.blobstore.FetchBlobResult;
81+
import org.opensearch.common.blobstore.InputStreamWithMetadata;
8282
import org.opensearch.common.blobstore.stream.read.ReadContext;
8383
import org.opensearch.common.blobstore.stream.write.WriteContext;
8484
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -143,9 +143,9 @@ public boolean blobExists(String blobName) {
143143

144144
@ExperimentalApi
145145
@Override
146-
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
146+
public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException {
147147
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
148-
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
148+
return new InputStreamWithMetadata(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
149149
}
150150

151151
@Override

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

+5
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,11 @@ public Map<Metric, Map<String, Long>> extendedStats() {
244244
return extendedStats;
245245
}
246246

247+
@Override
248+
public boolean isBlobMetadataEnabled() {
249+
return true;
250+
}
251+
247252
public ObjectCannedACL getCannedACL() {
248253
return cannedACL;
249254
}

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java

+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.index.query.TermsQueryBuilder;
5353
import org.opensearch.indices.recovery.RecoverySettings;
5454
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
55+
import org.opensearch.test.OpenSearchIntegTestCase;
5556
import org.opensearch.test.VersionUtils;
5657

5758
import java.util.concurrent.ExecutionException;
@@ -60,6 +61,7 @@
6061
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6162
import static org.hamcrest.Matchers.equalTo;
6263

64+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
6365
public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase {
6466

6567
@Override
@@ -139,6 +141,7 @@ public void testCreateCloneIndex() {
139141
}
140142

141143
public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
144+
asyncUploadMockFsRepo = false;
142145
Version version = VersionUtils.randomIndexCompatibleVersion(random());
143146
int numPrimaryShards = 1;
144147
prepareCreate("source").setSettings(

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteShrinkIndexIT.java

+9
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
import org.opensearch.indices.IndicesService;
4949
import org.opensearch.indices.replication.common.ReplicationType;
5050
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
51+
import org.opensearch.test.OpenSearchIntegTestCase;
5152
import org.opensearch.test.VersionUtils;
53+
import org.junit.Before;
5254

5355
import java.util.Arrays;
5456
import java.util.Map;
@@ -61,12 +63,18 @@
6163
import static org.hamcrest.Matchers.equalTo;
6264
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
6365

66+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
6467
public class RemoteShrinkIndexIT extends RemoteStoreBaseIntegTestCase {
6568
@Override
6669
protected boolean forbidPrivateIndexSettings() {
6770
return false;
6871
}
6972

73+
@Before
74+
public void setup() {
75+
asyncUploadMockFsRepo = false;
76+
}
77+
7078
public Settings indexSettings() {
7179
return Settings.builder()
7280
.put(super.indexSettings())
@@ -84,6 +92,7 @@ public void testCreateShrinkIndexToN() {
8492
int[] shardSplits = randomFrom(possibleShardSplits);
8593
assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
8694
assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);
95+
8796
internalCluster().ensureAtLeastNumDataNodes(2);
8897
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get();
8998
for (int i = 0; i < 20; i++) {

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteSplitIndexIT.java

+2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.opensearch.indices.IndicesService;
6868
import org.opensearch.indices.replication.common.ReplicationType;
6969
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
70+
import org.opensearch.test.OpenSearchIntegTestCase;
7071
import org.opensearch.test.VersionUtils;
7172

7273
import java.io.IOException;
@@ -86,6 +87,7 @@
8687
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
8788
import static org.hamcrest.Matchers.equalTo;
8889

90+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
8991
public class RemoteSplitIndexIT extends RemoteStoreBaseIntegTestCase {
9092

9193
@Override

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.repositories.RepositoriesService;
1919
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2020
import org.opensearch.test.OpenSearchIntegTestCase;
21+
import org.junit.Before;
2122

2223
import java.nio.charset.StandardCharsets;
2324
import java.util.Base64;
@@ -32,6 +33,11 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
3233

3334
private static String INDEX_NAME = "test-index";
3435

36+
@Before
37+
public void setup() {
38+
asyncUploadMockFsRepo = false;
39+
}
40+
3541
@Override
3642
protected Settings nodeSettings(int nodeOrdinal) {
3743
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();

server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -512,5 +512,6 @@ private void assertCustomIndexMetadata(String index) {
512512
logger.info("---> Asserting custom index metadata");
513513
IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index);
514514
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY));
515+
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY));
515516
}
516517
}

server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@
1111
import org.opensearch.action.index.IndexResponse;
1212
import org.opensearch.common.settings.Settings;
1313
import org.opensearch.plugins.Plugin;
14+
import org.opensearch.test.OpenSearchIntegTestCase;
1415
import org.opensearch.test.transport.MockTransportService;
1516

16-
import java.util.Arrays;
1717
import java.util.Collection;
1818
import java.util.Map;
1919
import java.util.concurrent.TimeUnit;
20+
import java.util.stream.Collectors;
21+
import java.util.stream.Stream;
2022

2123
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
2224

25+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
2326
public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
2427
static final String INDEX_NAME = "remote-store-test-idx-1";
2528
static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2";
@@ -39,7 +42,7 @@ public Settings indexSettings(int shards, int replicas) {
3942

4043
@Override
4144
protected Collection<Class<? extends Plugin>> nodePlugins() {
42-
return Arrays.asList(MockTransportService.TestPlugin.class);
45+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
4346
}
4447

4548
protected void restore(String... indices) {

server/src/internalClusterTest/java/org/opensearch/remotestore/PrimaryTermValidationIT.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.junit.Before;
3131

3232
import java.nio.file.Path;
33-
import java.util.Arrays;
3433
import java.util.Collection;
3534
import java.util.HashSet;
3635
import java.util.Set;
@@ -50,7 +49,7 @@ public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase {
5049

5150
@Override
5251
protected Collection<Class<? extends Plugin>> nodePlugins() {
53-
return Arrays.asList(MockTransportService.TestPlugin.class);
52+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
5453
}
5554

5655
@Before

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

+43-4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
import org.opensearch.index.shard.IndexShard;
3737
import org.opensearch.indices.IndicesService;
3838
import org.opensearch.indices.replication.common.ReplicationType;
39+
import org.opensearch.plugins.Plugin;
40+
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
41+
import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
3942
import org.opensearch.repositories.RepositoriesService;
4043
import org.opensearch.repositories.blobstore.BlobStoreRepository;
4144
import org.opensearch.repositories.fs.ReloadableFsRepository;
@@ -48,13 +51,15 @@
4851
import java.nio.file.Path;
4952
import java.nio.file.SimpleFileVisitor;
5053
import java.nio.file.attribute.BasicFileAttributes;
54+
import java.util.Collection;
5155
import java.util.HashMap;
5256
import java.util.List;
5357
import java.util.Locale;
5458
import java.util.Map;
5559
import java.util.concurrent.ExecutionException;
5660
import java.util.concurrent.atomic.AtomicInteger;
5761
import java.util.stream.Collectors;
62+
import java.util.stream.Stream;
5863

5964
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
6065
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
@@ -74,6 +79,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
7479
protected Path segmentRepoPath;
7580
protected Path translogRepoPath;
7681
protected boolean clusterSettingsSuppliedByTest = false;
82+
protected boolean asyncUploadMockFsRepo = randomBoolean();
83+
private boolean metadataSupportedType = randomBoolean();
7784
private final List<String> documentKeys = List.of(
7885
randomAlphaOfLength(5),
7986
randomAlphaOfLength(5),
@@ -129,6 +136,19 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
129136
return indexingStats;
130137
}
131138

139+
@Override
140+
protected Collection<Class<? extends Plugin>> nodePlugins() {
141+
if (!clusterSettingsSuppliedByTest && asyncUploadMockFsRepo) {
142+
if (metadataSupportedType) {
143+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsMetadataSupportedRepositoryPlugin.class))
144+
.collect(Collectors.toList());
145+
} else {
146+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
147+
}
148+
}
149+
return super.nodePlugins();
150+
}
151+
132152
@Override
133153
protected Settings nodeSettings(int nodeOrdinal) {
134154
if (segmentRepoPath == null || translogRepoPath == null) {
@@ -138,10 +158,27 @@ protected Settings nodeSettings(int nodeOrdinal) {
138158
if (clusterSettingsSuppliedByTest) {
139159
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
140160
} else {
141-
return Settings.builder()
142-
.put(super.nodeSettings(nodeOrdinal))
143-
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
144-
.build();
161+
if (asyncUploadMockFsRepo) {
162+
String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE;
163+
return Settings.builder()
164+
.put(super.nodeSettings(nodeOrdinal))
165+
.put(
166+
remoteStoreClusterSettings(
167+
REPOSITORY_NAME,
168+
segmentRepoPath,
169+
repoType,
170+
REPOSITORY_2_NAME,
171+
translogRepoPath,
172+
repoType
173+
)
174+
)
175+
.build();
176+
} else {
177+
return Settings.builder()
178+
.put(super.nodeSettings(nodeOrdinal))
179+
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
180+
.build();
181+
}
145182
}
146183
}
147184

@@ -221,6 +258,8 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFiel
221258
@After
222259
public void teardown() {
223260
clusterSettingsSuppliedByTest = false;
261+
asyncUploadMockFsRepo = randomBoolean();
262+
metadataSupportedType = randomBoolean();
224263
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
225264
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
226265
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.gateway.remote.RemoteClusterStateService;
2525
import org.opensearch.test.InternalTestCluster;
2626
import org.opensearch.test.OpenSearchIntegTestCase;
27+
import org.junit.Before;
2728

2829
import java.io.IOException;
2930
import java.nio.file.Files;
@@ -47,6 +48,11 @@
4748
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
4849
public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {
4950

51+
@Before
52+
public void setup() {
53+
asyncUploadMockFsRepo = false;
54+
}
55+
5056
@Override
5157
protected Settings nodeSettings(int nodeOrdinal) {
5258
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919

2020
import java.io.IOException;
2121
import java.util.ArrayList;
22-
import java.util.Arrays;
2322
import java.util.Collection;
2423
import java.util.HashMap;
2524
import java.util.List;
2625
import java.util.Map;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.Stream;
2728

2829
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
2930
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -37,7 +38,7 @@ public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {
3738

3839
@Override
3940
protected Collection<Class<? extends Plugin>> nodePlugins() {
40-
return Arrays.asList(MockTransportService.TestPlugin.class);
41+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
4142
}
4243

4344
@Override

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

+4-22
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.opensearch.indices.recovery.RecoverySettings;
3939
import org.opensearch.indices.recovery.RecoveryState;
4040
import org.opensearch.plugins.Plugin;
41-
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
4241
import org.opensearch.test.InternalTestCluster;
4342
import org.opensearch.test.OpenSearchIntegTestCase;
4443
import org.opensearch.test.transport.MockTransportService;
@@ -48,14 +47,14 @@
4847
import java.io.IOException;
4948
import java.nio.file.Files;
5049
import java.nio.file.Path;
51-
import java.util.Arrays;
5250
import java.util.Collection;
5351
import java.util.List;
5452
import java.util.Map;
5553
import java.util.Optional;
5654
import java.util.concurrent.CountDownLatch;
5755
import java.util.concurrent.ExecutionException;
5856
import java.util.concurrent.TimeUnit;
57+
import java.util.stream.Collectors;
5958
import java.util.stream.Stream;
6059

6160
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
@@ -81,7 +80,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {
8180

8281
@Override
8382
protected Collection<Class<? extends Plugin>> nodePlugins() {
84-
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class);
83+
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
8584
}
8685

8786
@Override
@@ -797,25 +796,8 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
797796
// Test local only translog files which are not uploaded to remote store (no metadata present in remote)
798797
// Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE.
799798
public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
800-
clusterSettingsSuppliedByTest = true;
801-
802-
// Overriding settings to use AsyncMultiStreamBlobContainer
803-
Settings settings = Settings.builder()
804-
.put(super.nodeSettings(1))
805-
.put(
806-
remoteStoreClusterSettings(
807-
REPOSITORY_NAME,
808-
segmentRepoPath,
809-
MockFsRepositoryPlugin.TYPE,
810-
REPOSITORY_2_NAME,
811-
translogRepoPath,
812-
MockFsRepositoryPlugin.TYPE
813-
)
814-
)
815-
.build();
816-
817-
internalCluster().startClusterManagerOnlyNode(settings);
818-
String dataNode = internalCluster().startDataOnlyNode(settings);
799+
internalCluster().startClusterManagerOnlyNode();
800+
String dataNode = internalCluster().startDataOnlyNode();
819801

820802
// 1. Create index with 0 replica
821803
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));

0 commit comments

Comments
 (0)