Skip to content

Commit 156eca3

Browse files
authored
[Remote Cluster State] Remote state interfaces (#13785)
* Remote Writable Entity interfaces Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent 619554d commit 156eca3

8 files changed

+340
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.common.blobstore.BlobPath;
12+
import org.opensearch.core.compress.Compressor;
13+
import org.opensearch.core.xcontent.NamedXContentRegistry;
14+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
15+
16+
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;
17+
18+
/**
19+
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
20+
*
21+
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
22+
*/
23+
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {
24+
25+
protected String blobFileName;
26+
27+
protected String blobName;
28+
private final String clusterUUID;
29+
private final Compressor compressor;
30+
private final NamedXContentRegistry namedXContentRegistry;
31+
private String[] pathTokens;
32+
33+
public AbstractRemoteWritableBlobEntity(
34+
final String clusterUUID,
35+
final Compressor compressor,
36+
final NamedXContentRegistry namedXContentRegistry
37+
) {
38+
this.clusterUUID = clusterUUID;
39+
this.compressor = compressor;
40+
this.namedXContentRegistry = namedXContentRegistry;
41+
}
42+
43+
public abstract BlobPathParameters getBlobPathParameters();
44+
45+
public String getFullBlobName() {
46+
return blobName;
47+
}
48+
49+
public String getBlobFileName() {
50+
if (blobFileName == null) {
51+
String[] pathTokens = getBlobPathTokens();
52+
if (pathTokens == null || pathTokens.length < 1) {
53+
return null;
54+
}
55+
blobFileName = pathTokens[pathTokens.length - 1];
56+
}
57+
return blobFileName;
58+
}
59+
60+
public String[] getBlobPathTokens() {
61+
if (pathTokens != null) {
62+
return pathTokens;
63+
}
64+
if (blobName == null) {
65+
return null;
66+
}
67+
pathTokens = blobName.split(PATH_DELIMITER);
68+
return pathTokens;
69+
}
70+
71+
public abstract String generateBlobFileName();
72+
73+
public String clusterUUID() {
74+
return clusterUUID;
75+
}
76+
77+
public abstract UploadedMetadata getUploadedMetadata();
78+
79+
public void setFullBlobName(BlobPath blobPath) {
80+
this.blobName = blobPath.buildAsString() + blobFileName;
81+
}
82+
83+
public NamedXContentRegistry getNamedXContentRegistry() {
84+
return namedXContentRegistry;
85+
}
86+
87+
protected Compressor getCompressor() {
88+
return compressor;
89+
}
90+
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import java.util.List;
12+
13+
/**
14+
* Parameters which can be used to construct a blob path
15+
*
16+
*/
17+
public class BlobPathParameters {
18+
19+
private final List<String> pathTokens;
20+
private final String filePrefix;
21+
22+
public BlobPathParameters(final List<String> pathTokens, final String filePrefix) {
23+
this.pathTokens = pathTokens;
24+
this.filePrefix = filePrefix;
25+
}
26+
27+
public List<String> getPathTokens() {
28+
return pathTokens;
29+
}
30+
31+
public String getFilePrefix() {
32+
return filePrefix;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
13+
import java.io.IOException;
14+
15+
/**
16+
* An interface to read/write an object from/to a remote storage. This interface is agnostic of the remote storage type.
17+
*
18+
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
19+
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
20+
*/
21+
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {
22+
23+
public void writeAsync(U entity, ActionListener<Void> listener);
24+
25+
public T read(U entity) throws IOException;
26+
27+
public void readAsync(U entity, ActionListener<T> listener);
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
14+
/**
15+
* An interface to which provides defines the serialization/deserialization methods for objects to be uploaded to or downloaded from remote store.
16+
* This interface is agnostic of the remote storage type.
17+
*
18+
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
19+
*/
20+
public interface RemoteWriteableEntity<T> {
21+
/**
22+
* @return An InputStream created by serializing the entity T
23+
* @throws IOException Exception encountered while serialization
24+
*/
25+
public InputStream serialize() throws IOException;
26+
27+
/**
28+
* @param inputStream The InputStream which is used to read the serialized entity
29+
* @return The entity T after deserialization
30+
* @throws IOException Exception encountered while deserialization
31+
*/
32+
public T deserialize(InputStream inputStream) throws IOException;
33+
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
/**
9+
* Common remote store package
10+
*/
11+
package org.opensearch.common.remote;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.Base64;
13+
14+
/**
15+
* Utility class for Remote Cluster State
16+
*/
17+
public class RemoteClusterStateUtils {
18+
public static final String PATH_DELIMITER = "/";
19+
20+
public static String encodeString(String content) {
21+
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote.model;
10+
11+
import org.opensearch.common.blobstore.BlobPath;
12+
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
13+
import org.opensearch.common.remote.RemoteWritableEntityStore;
14+
import org.opensearch.common.remote.RemoteWriteableEntity;
15+
import org.opensearch.core.action.ActionListener;
16+
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
17+
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
18+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
19+
import org.opensearch.threadpool.ThreadPool;
20+
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.util.concurrent.ExecutorService;
24+
25+
/**
26+
* Abstract class for a blob type storage
27+
*
28+
* @param <T> The entity which can be uploaded to / downloaded from blob store
29+
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
30+
*/
31+
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {
32+
33+
private final BlobStoreTransferService transferService;
34+
private final BlobStoreRepository blobStoreRepository;
35+
private final String clusterName;
36+
private final ExecutorService executorService;
37+
38+
public RemoteClusterStateBlobStore(
39+
final BlobStoreTransferService blobStoreTransferService,
40+
final BlobStoreRepository blobStoreRepository,
41+
final String clusterName,
42+
final ThreadPool threadPool,
43+
final String executor
44+
) {
45+
this.transferService = blobStoreTransferService;
46+
this.blobStoreRepository = blobStoreRepository;
47+
this.clusterName = clusterName;
48+
this.executorService = threadPool.executor(executor);
49+
}
50+
51+
@Override
52+
public void writeAsync(final U entity, final ActionListener<Void> listener) {
53+
try {
54+
try (InputStream inputStream = entity.serialize()) {
55+
BlobPath blobPath = getBlobPathForUpload(entity);
56+
entity.setFullBlobName(blobPath);
57+
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
58+
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
59+
// listener);
60+
}
61+
} catch (Exception e) {
62+
listener.onFailure(e);
63+
}
64+
}
65+
66+
public T read(final U entity) throws IOException {
67+
// TODO Add timing logs and tracing
68+
assert entity.getFullBlobName() != null;
69+
return entity.deserialize(transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName()));
70+
}
71+
72+
@Override
73+
public void readAsync(final U entity, final ActionListener<T> listener) {
74+
executorService.execute(() -> {
75+
try {
76+
listener.onResponse(read(entity));
77+
} catch (Exception e) {
78+
listener.onFailure(e);
79+
}
80+
});
81+
}
82+
83+
private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
84+
BlobPath blobPath = blobStoreRepository.basePath()
85+
.add(RemoteClusterStateUtils.encodeString(clusterName))
86+
.add("cluster-state")
87+
.add(obj.clusterUUID());
88+
for (String token : obj.getBlobPathParameters().getPathTokens()) {
89+
blobPath = blobPath.add(token);
90+
}
91+
return blobPath;
92+
}
93+
94+
private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
95+
String[] pathTokens = obj.getBlobPathTokens();
96+
BlobPath blobPath = new BlobPath();
97+
if (pathTokens == null || pathTokens.length < 1) {
98+
return blobPath;
99+
}
100+
// Iterate till second last path token to get the blob folder
101+
for (int i = 0; i < pathTokens.length - 1; i++) {
102+
blobPath = blobPath.add(pathTokens[i]);
103+
}
104+
return blobPath;
105+
}
106+
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Package containing models for remote cluster state
11+
*/
12+
package org.opensearch.gateway.remote.model;

0 commit comments

Comments
 (0)