
Commit a50b2f8

Add support to run RemoteStoreIT with S3 integration
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 9bef705 commit a50b2f8

File tree

18 files changed: +1672 -1180 lines changed


plugins/repository-s3/build.gradle (+3)

@@ -141,6 +141,7 @@ test {
   // this is tested explicitly in separate test tasks
   exclude '**/RepositoryCredentialsTests.class'
   exclude '**/S3RepositoryThirdPartyTests.class'
+  exclude '**/S3RemoteStoreIT.class'
 }

 boolean useFixture = false
@@ -252,6 +253,7 @@ processYamlRestTestResources {
 internalClusterTest {
   // this is tested explicitly in a separate test task
   exclude '**/S3RepositoryThirdPartyTests.class'
+  exclude '**/S3RemoteStoreIT.class'
 }

 yamlRestTest {
@@ -408,6 +410,7 @@ TaskProvider s3ThirdPartyTest = tasks.register("s3ThirdPartyTest", Test) {
   setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs())
   setClasspath(internalTestSourceSet.getRuntimeClasspath())
   include '**/S3RepositoryThirdPartyTests.class'
+  include '**/S3RemoteStoreIT.class'
   systemProperty 'test.s3.account', s3PermanentAccessKey
   systemProperty 'test.s3.key', s3PermanentSecretKey
   systemProperty 'test.s3.bucket', s3PermanentBucket
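
These build changes keep S3RemoteStoreIT out of the regular test and internalClusterTest tasks and run it only from s3ThirdPartyTest, the task that supplies real S3 credentials through the test.s3.* system properties shown above. As a quick illustration of that contract from the test side, here is a minimal, hypothetical fail-fast guard; the property names come from this commit, while the S3TestPropertyGuard class and requireProperty helper are illustrative only and not part of it.

// Hypothetical guard a third-party S3 test could run before touching AWS.
// Property names (test.s3.account, test.s3.key, test.s3.bucket) are taken
// from the build.gradle and S3RemoteStoreIT changes in this commit.
final class S3TestPropertyGuard {
    private S3TestPropertyGuard() {}

    static void requireS3Properties() {
        requireProperty("test.s3.account");
        requireProperty("test.s3.key");
        requireProperty("test.s3.bucket");
    }

    private static void requireProperty(String name) {
        String value = System.getProperty(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required system property: " + name);
        }
    }
}

The new S3RemoteStoreIT below enforces the same requirement with assertFalse(Strings.isNullOrEmpty(...)) in its credentials() method.
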

S3RemoteStoreIT.java (new file, +238)

@@ -0,0 +1,238 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.repositories.s3;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.MockSecureSettings;
import org.opensearch.common.settings.SecureSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreBaseIT;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPoolStats;
import org.junit.Ignore;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.ExecutionException;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
public class S3RemoteStoreIT extends RemoteStoreBaseIT {

    @Override
    @SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")
    public void setUp() throws Exception {
        SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", "config"));
        super.setUp();
    }

    @Override
    @SuppressForbidden(reason = "Need to reset system property here for AWS SDK v2")
    public void tearDown() throws Exception {
        SocketAccess.doPrivileged(() -> System.clearProperty("opensearch.path.conf"));
        clearIndices();
        waitForEmptyRemotePurgeQueue();
        super.tearDown();
    }

    private void clearIndices() throws Exception {
        assertAcked(client().admin().indices().delete(new DeleteIndexRequest("*")).get());
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singletonList(S3RepositoryPlugin.class);
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).setSecureSettings(credentials()).build();
    }

    private SecureSettings credentials() {
        assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.account")));
        assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.key")));
        assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.bucket")));

        MockSecureSettings secureSettings = new MockSecureSettings();
        secureSettings.setString("s3.client.default.access_key", System.getProperty("test.s3.account"));
        secureSettings.setString("s3.client.default.secret_key", System.getProperty("test.s3.key"));
        return secureSettings;
    }

    @Override
    protected Settings remoteStoreRepoSettings() {

        String segmentRepoName = REPOSITORY_NAME;
        String translogRepoName = REPOSITORY_2_NAME;
        String stateRepoName = REPOSITORY_3_NAME;
        String segmentRepoTypeAttributeKey = String.format(
            Locale.getDefault(),
            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
            segmentRepoName
        );
        String segmentRepoSettingsAttributeKeyPrefix = String.format(
            Locale.getDefault(),
            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
            segmentRepoName
        );
        String translogRepoTypeAttributeKey = String.format(
            Locale.getDefault(),
            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
            translogRepoName
        );
        String translogRepoSettingsAttributeKeyPrefix = String.format(
            Locale.getDefault(),
            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
            translogRepoName
        );
        String stateRepoTypeAttributeKey = String.format(
            Locale.getDefault(),
            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
            stateRepoName
        );
        String stateRepoSettingsAttributeKeyPrefix = String.format(
            Locale.getDefault(),
            "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
            stateRepoName
        );

        String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();

        String bucket = System.getProperty("test.s3.bucket");
        String region = System.getProperty("test.s3.region", "us-west-2");
        String basePath = System.getProperty("test.s3.base", "testpath");
        String segmentBasePath = basePath + "-segments";
        String translogBasePath = basePath + "-translog";
        String stateBasePath = basePath + "-state";

        Settings.Builder settings = Settings.builder()
            .put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
            .put(segmentRepoTypeAttributeKey, S3Repository.TYPE)
            .put(segmentRepoSettingsAttributeKeyPrefix + "bucket", bucket)
            .put(segmentRepoSettingsAttributeKeyPrefix + "region", region)
            .put(segmentRepoSettingsAttributeKeyPrefix + "base_path", segmentBasePath)
            .put(segmentRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
            .put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
            .put(translogRepoTypeAttributeKey, S3Repository.TYPE)
            .put(translogRepoSettingsAttributeKeyPrefix + "bucket", bucket)
            .put(translogRepoSettingsAttributeKeyPrefix + "region", region)
            .put(translogRepoSettingsAttributeKeyPrefix + "base_path", translogBasePath)
            .put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
            .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName)
            .put(stateRepoTypeAttributeKey, S3Repository.TYPE)
            .put(stateRepoSettingsAttributeKeyPrefix + "bucket", bucket)
            .put(stateRepoSettingsAttributeKeyPrefix + "region", region)
            .put(stateRepoSettingsAttributeKeyPrefix + "base_path", stateBasePath)
            .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);

        final String endpoint = System.getProperty("test.s3.endpoint");
        if (endpoint != null) {
            settings.put(segmentRepoSettingsAttributeKeyPrefix + "endpoint", endpoint);
            settings.put(translogRepoSettingsAttributeKeyPrefix + "endpoint", endpoint);
            settings.put(stateRepoSettingsAttributeKeyPrefix + "endpoint", endpoint);
        }

        settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values()));
        settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean());
        settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), randomBoolean());
        settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.getKey(), segmentsPathFixedPrefix ? "a" : "");
        settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.getKey(), translogPathFixedPrefix ? "b" : "");
        settings.put(BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING.getKey(), snapshotShardPathFixedPrefix ? "c" : "");

        return settings.build();
    }

    @Override
    @Ignore("assertion of cluster health timeout trips")
    public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException {
        super.testNoMultipleWriterDuringPrimaryRelocation();
    }

    @Override
    @Ignore("assertion of cluster health timeout trips")
    public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException {
        super.testResumeUploadAfterFailedPrimaryRelocation();
    }

    @Override
    @Ignore("Test times out due to too many translog upload")
    public void testFlushOnTooManyRemoteTranslogFiles() throws Exception {
        super.testFlushOnTooManyRemoteTranslogFiles();
    }

    @Override
    protected boolean addMockIndexStorePlugin() {
        return false;
    }

    protected BlobStoreRepository getRepository() {
        return (BlobStoreRepository) internalCluster().getDataNodeInstance(RepositoriesService.class).repository(REPOSITORY_2_NAME);
    }

    @Override
    protected int getActualFileCount(Path ignoredSegmentRepoPath, String shardPath) throws IOException {
        BlobStoreRepository repository = getRepository();
        return repository.blobStore().blobContainer(BlobPath.cleanPath().add(shardPath)).listBlobs().size();
    }

    @Override
    protected void delete(Path baseRepoPath, String shardPath) throws IOException {
        BlobStoreRepository repository = getRepository();
        repository.blobStore().blobContainer(repository.basePath().add(shardPath)).delete();
    }

    private void waitForEmptyRemotePurgeQueue() throws Exception {
        if (internalCluster().getDataNodeNames().isEmpty()) {
            return;
        }
        assertBusyWithFixedSleepTime(() -> {
            ThreadPoolStats.Stats remotePurgeThreadPoolStats = getRemotePurgeThreadPoolStats();
            // Wait until the remote purge pool has nothing queued and nothing in flight.
            assertEquals(0, remotePurgeThreadPoolStats.getQueue());
            assertEquals(0, remotePurgeThreadPoolStats.getActive());
        }, TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(500));
    }

    ThreadPoolStats.Stats getRemotePurgeThreadPoolStats() {
        final ThreadPoolStats stats = internalCluster().getDataNodeInstance(ThreadPool.class).stats();
        for (ThreadPoolStats.Stats s : stats) {
            if (s.getName().equals(ThreadPool.Names.REMOTE_PURGE)) {
                return s;
            }
        }
        throw new AssertionError("remote purge thread pool stats not found [" + stats + "]");
    }

    @Override
    protected BlobPath getSegmentBasePath() {
        String basePath = System.getProperty("test.s3.base", "testpath");
        String segmentBasePath = basePath + "-segments";
        return BlobPath.cleanPath().add(segmentBasePath);
    }
}
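
One small design note on the class above: the "-segments", "-translog", and "-state" suffixes appended to test.s3.base are derived in remoteStoreRepoSettings() and then re-derived in getSegmentBasePath(). A hypothetical helper along the lines of the sketch below (not part of this commit; S3TestBasePaths and basePathFor are made-up names) would keep the two call sites from drifting apart:

// Hypothetical helper to keep the base-path derivation in one place; the
// suffixes and the "test.s3.base" default mirror the test class above.
final class S3TestBasePaths {
    private S3TestBasePaths() {}

    static String basePathFor(String suffix) {
        String basePath = System.getProperty("test.s3.base", "testpath");
        return basePath + "-" + suffix; // e.g. "testpath-segments"
    }
}

// Sketch of how the test could use it:
// String segmentBasePath = S3TestBasePaths.basePathFor("segments");
// String translogBasePath = S3TestBasePaths.basePathFor("translog");
// BlobPath segmentPath = BlobPath.cleanPath().add(S3TestBasePaths.basePathFor("segments"));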

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java (+1 -1)

@@ -392,7 +392,7 @@ private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
             return future.get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            throw new IllegalStateException("Future got interrupted", e);
+            throw new IOException("Future got interrupted", e);
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IOException) {
                 throw (IOException) e.getCause();
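
The one-line change above rethrows an interrupted wait as IOException instead of IllegalStateException, so callers of getFutureValue see the same checked exception type they already handle for other storage failures, while the interrupt flag is still restored. A minimal sketch of that pattern, assuming a plain java.util.concurrent.Future rather than OpenSearch's PlainActionFuture (the FutureIO wrapper below is illustrative, not part of the commit):

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class FutureIO {
    private FutureIO() {}

    // Wait for a future, translating failures into IOException while
    // preserving the calling thread's interrupt status.
    static <T> T getValue(Future<T> future) throws IOException {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // keep the interrupt flag set
            throw new IOException("Future got interrupted", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause();    // unwrap storage failures
            }
            throw new IOException("Failed to get future value", e);
        }
    }
}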

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java (+1 -1)

@@ -63,7 +63,7 @@
 /**
  * A container for settings used to create an S3 client.
  */
-final class S3ClientSettings {
+public final class S3ClientSettings {

     private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(S3ClientSettings.class);
