Skip to content

Commit 44994aa

Browse files
committed
Add support to run RemoteStoreIT with S3 integration
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 9bef705 commit 44994aa

File tree

18 files changed

+1671
-1180
lines changed

18 files changed

+1671
-1180
lines changed

plugins/repository-s3/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ test {
141141
// this is tested explicitly in separate test tasks
142142
exclude '**/RepositoryCredentialsTests.class'
143143
exclude '**/S3RepositoryThirdPartyTests.class'
144+
exclude '**/S3RemoteStoreIT.class'
144145
}
145146

146147
boolean useFixture = false
@@ -252,6 +253,7 @@ processYamlRestTestResources {
252253
internalClusterTest {
253254
// this is tested explicitly in a separate test task
254255
exclude '**/S3RepositoryThirdPartyTests.class'
256+
exclude '**/S3RemoteStoreIT.class'
255257
}
256258

257259
yamlRestTest {
@@ -408,6 +410,7 @@ TaskProvider s3ThirdPartyTest = tasks.register("s3ThirdPartyTest", Test) {
408410
setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs())
409411
setClasspath(internalTestSourceSet.getRuntimeClasspath())
410412
include '**/S3RepositoryThirdPartyTests.class'
413+
include '**/S3RemoteStoreIT.class'
411414
systemProperty 'test.s3.account', s3PermanentAccessKey
412415
systemProperty 'test.s3.key', s3PermanentSecretKey
413416
systemProperty 'test.s3.bucket', s3PermanentBucket
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
12+
13+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
14+
import org.opensearch.common.SuppressForbidden;
15+
import org.opensearch.common.blobstore.BlobPath;
16+
import org.opensearch.common.settings.MockSecureSettings;
17+
import org.opensearch.common.settings.SecureSettings;
18+
import org.opensearch.common.settings.Settings;
19+
import org.opensearch.common.unit.TimeValue;
20+
import org.opensearch.core.common.Strings;
21+
import org.opensearch.index.remote.RemoteStoreEnums;
22+
import org.opensearch.indices.RemoteStoreSettings;
23+
import org.opensearch.plugins.Plugin;
24+
import org.opensearch.remotestore.RemoteStoreBaseIT;
25+
import org.opensearch.repositories.RepositoriesService;
26+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
27+
import org.opensearch.threadpool.ThreadPool;
28+
import org.opensearch.threadpool.ThreadPoolStats;
29+
30+
import java.io.IOException;
31+
import java.nio.file.Path;
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
import java.util.Locale;
35+
import java.util.concurrent.ExecutionException;
36+
37+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
38+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
39+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
40+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
41+
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
42+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
43+
44+
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
45+
public class S3RemoteStoreIT extends RemoteStoreBaseIT {
46+
47+
@Override
48+
@SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")
49+
public void setUp() throws Exception {
50+
SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", "config"));
51+
super.setUp();
52+
}
53+
54+
@Override
55+
@SuppressForbidden(reason = "Need to reset system property here for AWS SDK v2")
56+
public void tearDown() throws Exception {
57+
SocketAccess.doPrivileged(() -> System.clearProperty("opensearch.path.conf"));
58+
clearIndices();
59+
waitForEmptyRemotePurgeQueue();
60+
super.tearDown();
61+
}
62+
63+
private void clearIndices() throws Exception {
64+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest("*")).get());
65+
}
66+
67+
@Override
68+
protected Collection<Class<? extends Plugin>> nodePlugins() {
69+
return Collections.singletonList(S3RepositoryPlugin.class);
70+
}
71+
72+
@Override
73+
protected Settings nodeSettings(int nodeOrdinal) {
74+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).setSecureSettings(credentials()).build();
75+
}
76+
77+
private SecureSettings credentials() {
78+
assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.account")));
79+
assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.key")));
80+
assertFalse(Strings.isNullOrEmpty(System.getProperty("test.s3.bucket")));
81+
82+
MockSecureSettings secureSettings = new MockSecureSettings();
83+
secureSettings.setString("s3.client.default.access_key", System.getProperty("test.s3.account"));
84+
secureSettings.setString("s3.client.default.secret_key", System.getProperty("test.s3.key"));
85+
return secureSettings;
86+
}
87+
88+
@Override
89+
protected Settings remoteStoreRepoSettings() {
90+
91+
String segmentRepoName = REPOSITORY_NAME;
92+
String translogRepoName = REPOSITORY_2_NAME;
93+
String stateRepoName = REPOSITORY_3_NAME;
94+
String segmentRepoTypeAttributeKey = String.format(
95+
Locale.getDefault(),
96+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
97+
segmentRepoName
98+
);
99+
String segmentRepoSettingsAttributeKeyPrefix = String.format(
100+
Locale.getDefault(),
101+
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
102+
segmentRepoName
103+
);
104+
String translogRepoTypeAttributeKey = String.format(
105+
Locale.getDefault(),
106+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
107+
translogRepoName
108+
);
109+
String translogRepoSettingsAttributeKeyPrefix = String.format(
110+
Locale.getDefault(),
111+
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
112+
translogRepoName
113+
);
114+
String stateRepoTypeAttributeKey = String.format(
115+
Locale.getDefault(),
116+
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
117+
stateRepoName
118+
);
119+
String stateRepoSettingsAttributeKeyPrefix = String.format(
120+
Locale.getDefault(),
121+
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
122+
stateRepoName
123+
);
124+
125+
String prefixModeVerificationSuffix = BlobStoreRepository.PREFIX_MODE_VERIFICATION_SETTING.getKey();
126+
127+
String bucket = System.getProperty("test.s3.bucket");
128+
String region = System.getProperty("test.s3.region", "us-west-2");
129+
String basePath = System.getProperty("test.s3.base", "testpath");
130+
String segmentBasePath = basePath + "-segments";
131+
String translogBasePath = basePath + "-translog";
132+
String stateBasePath = basePath + "-state";
133+
134+
Settings.Builder settings = Settings.builder()
135+
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
136+
.put(segmentRepoTypeAttributeKey, S3Repository.TYPE)
137+
.put(segmentRepoSettingsAttributeKeyPrefix + "bucket", bucket)
138+
.put(segmentRepoSettingsAttributeKeyPrefix + "region", region)
139+
.put(segmentRepoSettingsAttributeKeyPrefix + "base_path", segmentBasePath)
140+
.put(segmentRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
141+
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
142+
.put(translogRepoTypeAttributeKey, S3Repository.TYPE)
143+
.put(translogRepoSettingsAttributeKeyPrefix + "bucket", bucket)
144+
.put(translogRepoSettingsAttributeKeyPrefix + "region", region)
145+
.put(translogRepoSettingsAttributeKeyPrefix + "base_path", translogBasePath)
146+
.put(translogRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable)
147+
.put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, stateRepoName)
148+
.put(stateRepoTypeAttributeKey, S3Repository.TYPE)
149+
.put(stateRepoSettingsAttributeKeyPrefix + "bucket", bucket)
150+
.put(stateRepoSettingsAttributeKeyPrefix + "region", region)
151+
.put(stateRepoSettingsAttributeKeyPrefix + "base_path", stateBasePath)
152+
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, prefixModeVerificationEnable);
153+
154+
final String endpoint = System.getProperty("test.s3.endpoint");
155+
if (endpoint != null) {
156+
settings.put(segmentRepoSettingsAttributeKeyPrefix + "endpoint", endpoint);
157+
settings.put(translogRepoSettingsAttributeKeyPrefix + "endpoint", endpoint);
158+
settings.put(stateRepoSettingsAttributeKeyPrefix + "endpoint", endpoint);
159+
}
160+
161+
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values()));
162+
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean());
163+
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), randomBoolean());
164+
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.getKey(), segmentsPathFixedPrefix ? "a" : "");
165+
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.getKey(), translogPathFixedPrefix ? "b" : "");
166+
settings.put(BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING.getKey(), snapshotShardPathFixedPrefix ? "c" : "");
167+
168+
return settings.build();
169+
}
170+
171+
@Override
172+
@AwaitsFix(bugUrl = "assertion of cluster health timeout trips")
173+
public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException {
174+
super.testNoMultipleWriterDuringPrimaryRelocation();
175+
}
176+
177+
@Override
178+
@AwaitsFix(bugUrl = "assertion of cluster health timeout trips")
179+
public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException {
180+
super.testResumeUploadAfterFailedPrimaryRelocation();
181+
}
182+
183+
@Override
184+
@AwaitsFix(bugUrl = "Test times out due to too many translog upload")
185+
public void testFlushOnTooManyRemoteTranslogFiles() throws Exception {
186+
super.testFlushOnTooManyRemoteTranslogFiles();
187+
}
188+
189+
@Override
190+
protected boolean addMockIndexStorePlugin() {
191+
return false;
192+
}
193+
194+
protected BlobStoreRepository getRepository() {
195+
return (BlobStoreRepository) internalCluster().getDataNodeInstance(RepositoriesService.class).repository(REPOSITORY_2_NAME);
196+
}
197+
198+
@Override
199+
protected int getActualFileCount(Path ignoredSegmentRepoPath, String shardPath) throws IOException {
200+
BlobStoreRepository repository = getRepository();
201+
return repository.blobStore().blobContainer(BlobPath.cleanPath().add(shardPath)).listBlobs().size();
202+
}
203+
204+
@Override
205+
protected void delete(Path baseRepoPath, String shardPath) throws IOException {
206+
BlobStoreRepository repository = getRepository();
207+
repository.blobStore().blobContainer(repository.basePath().add(shardPath)).delete();
208+
}
209+
210+
private void waitForEmptyRemotePurgeQueue() throws Exception {
211+
if (internalCluster().getDataNodeNames().isEmpty()) {
212+
return;
213+
}
214+
assertBusyWithFixedSleepTime(() -> {
215+
ThreadPoolStats.Stats remotePurgeThreadPoolStats = getRemotePurgeThreadPoolStats();
216+
assertEquals(0, remotePurgeThreadPoolStats.getQueue());
217+
assertEquals(0, remotePurgeThreadPoolStats.getQueue());
218+
}, TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(500));
219+
}
220+
221+
ThreadPoolStats.Stats getRemotePurgeThreadPoolStats() {
222+
final ThreadPoolStats stats = internalCluster().getDataNodeInstance(ThreadPool.class).stats();
223+
for (ThreadPoolStats.Stats s : stats) {
224+
if (s.getName().equals(ThreadPool.Names.REMOTE_PURGE)) {
225+
return s;
226+
}
227+
}
228+
throw new AssertionError("refresh thread pool stats not found [" + stats + "]");
229+
}
230+
231+
@Override
232+
protected BlobPath getSegmentBasePath() {
233+
String basePath = System.getProperty("test.s3.base", "testpath");
234+
String segmentBasePath = basePath + "-segments";
235+
return BlobPath.cleanPath().add(segmentBasePath);
236+
}
237+
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
392392
return future.get();
393393
} catch (InterruptedException e) {
394394
Thread.currentThread().interrupt();
395-
throw new IllegalStateException("Future got interrupted", e);
395+
throw new IOException("Future got interrupted", e);
396396
} catch (ExecutionException e) {
397397
if (e.getCause() instanceof IOException) {
398398
throw (IOException) e.getCause();

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
/**
6464
* A container for settings used to create an S3 client.
6565
*/
66-
final class S3ClientSettings {
66+
public final class S3ClientSettings {
6767

6868
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(S3ClientSettings.class);
6969

0 commit comments

Comments
 (0)