Skip to content

Commit 3ea0a31

Browse files
authored
Upload remote paths during index creation or full cluster upload (opensearch-project#13150)
* Upload remote paths during index creation or full cluster upload Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 97e3191 commit 3ea0a31

17 files changed

+1409
-128
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.remotestore;
10+
11+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.core.util.FileSystemUtils;
15+
import org.opensearch.index.remote.RemoteIndexPath;
16+
import org.opensearch.index.remote.RemoteStoreEnums;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
19+
import java.io.IOException;
20+
import java.util.Locale;
21+
import java.util.concurrent.ExecutionException;
22+
23+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
24+
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
25+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
26+
27+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
28+
public class RemoteStoreUploadIndexPathIT extends RemoteStoreBaseIntegTestCase {
29+
30+
private final String INDEX_NAME = "remote-store-test-idx-1";
31+
32+
@Override
33+
protected Settings nodeSettings(int nodeOrdinal) {
34+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
35+
}
36+
37+
/**
38+
* Checks that the remote index path file gets created for the intended remote store path type and does not get created
39+
* wherever not required.
40+
*/
41+
public void testRemoteIndexPathFileCreation() throws ExecutionException, InterruptedException, IOException {
42+
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
43+
internalCluster().startDataOnlyNodes(2);
44+
45+
// Case 1 - Hashed_prefix, we would need the remote index path file to be created.
46+
client(clusterManagerNode).admin()
47+
.cluster()
48+
.prepareUpdateSettings()
49+
.setTransientSettings(
50+
Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX)
51+
)
52+
.get();
53+
54+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
55+
validateRemoteIndexPathFile(true);
56+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
57+
FileSystemUtils.deleteSubDirectories(translogRepoPath);
58+
FileSystemUtils.deleteSubDirectories(segmentRepoPath);
59+
60+
// Case 2 - Hashed_infix, we would not have the remote index path file created here.
61+
client(clusterManagerNode).admin()
62+
.cluster()
63+
.prepareUpdateSettings()
64+
.setTransientSettings(
65+
Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_INFIX)
66+
)
67+
.get();
68+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
69+
validateRemoteIndexPathFile(false);
70+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
71+
72+
// Case 3 - fixed, we would not have the remote index path file created here either.
73+
client(clusterManagerNode).admin()
74+
.cluster()
75+
.prepareUpdateSettings()
76+
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED))
77+
.get();
78+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
79+
validateRemoteIndexPathFile(false);
80+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
81+
82+
}
83+
84+
private void validateRemoteIndexPathFile(boolean exists) {
85+
String indexUUID = client().admin()
86+
.indices()
87+
.prepareGetSettings(INDEX_NAME)
88+
.get()
89+
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
90+
91+
assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
92+
assertEquals(
93+
exists,
94+
FileSystemUtils.exists(
95+
translogRepoPath.resolve(RemoteIndexPath.DIR)
96+
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
97+
)
98+
);
99+
assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
100+
assertEquals(
101+
exists,
102+
FileSystemUtils.exists(
103+
segmentRepoPath.resolve(RemoteIndexPath.DIR)
104+
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
105+
)
106+
);
107+
}
108+
}

server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class RemoteTransferContainer implements Closeable {
5252
private final String remoteFileName;
5353
private final boolean failTransferIfFileExists;
5454
private final WritePriority writePriority;
55-
private final long expectedChecksum;
55+
private final Long expectedChecksum;
5656
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
5757
private final boolean isRemoteDataIntegritySupported;
5858
private final AtomicBoolean readBlock = new AtomicBoolean();
@@ -79,7 +79,7 @@ public RemoteTransferContainer(
7979
boolean failTransferIfFileExists,
8080
WritePriority writePriority,
8181
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
82-
long expectedChecksum,
82+
Long expectedChecksum,
8383
boolean isRemoteDataIntegritySupported
8484
) {
8585
this(
@@ -115,7 +115,7 @@ public RemoteTransferContainer(
115115
boolean failTransferIfFileExists,
116116
WritePriority writePriority,
117117
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
118-
long expectedChecksum,
118+
Long expectedChecksum,
119119
boolean isRemoteDataIntegritySupported,
120120
Map<String, String> metadata
121121
) {
@@ -230,15 +230,15 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
230230
}
231231

232232
private boolean isRemoteDataIntegrityCheckPossible() {
233-
return isRemoteDataIntegritySupported;
233+
return isRemoteDataIntegritySupported && Objects.nonNull(expectedChecksum);
234234
}
235235

236236
private void finalizeUpload(boolean uploadSuccessful) throws IOException {
237237
if (isRemoteDataIntegrityCheckPossible()) {
238238
return;
239239
}
240240

241-
if (uploadSuccessful) {
241+
if (uploadSuccessful && Objects.nonNull(expectedChecksum)) {
242242
long actualChecksum = getActualChecksum();
243243
if (actualChecksum != expectedChecksum) {
244244
throw new CorruptIndexException(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.cluster.metadata.IndexMetadata;
12+
import org.opensearch.core.action.ActionListener;
13+
import org.opensearch.threadpool.ThreadPool;
14+
15+
import java.util.List;
16+
import java.util.Objects;
17+
import java.util.concurrent.ExecutorService;
18+
19+
/**
20+
* Hook for running code that needs to be executed before the upload of index metadata. Here we have introduced a hook
21+
* for index creation (also triggerred after enabling the remote cluster statement for the first time). The listener
22+
* is intended to be run in parallel and async with the index metadata upload.
23+
*
24+
* @opensearch.internal
25+
*/
26+
public abstract class IndexMetadataUploadListener {
27+
28+
private final ExecutorService executorService;
29+
30+
public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName) {
31+
Objects.requireNonNull(threadPool);
32+
Objects.requireNonNull(threadPoolName);
33+
assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false;
34+
this.executorService = threadPool.executor(threadPoolName);
35+
}
36+
37+
/**
38+
* Runs before the new index upload of index metadata (or first time upload). The caller is expected to trigger
39+
* onSuccess or onFailure of the {@code ActionListener}.
40+
*
41+
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
42+
* @param actionListener listener to be invoked on success or failure.
43+
*/
44+
public final void onNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
45+
executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener));
46+
}
47+
48+
protected abstract void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
49+
}

0 commit comments

Comments
 (0)