Skip to content

Commit b6c80b1

Browse files
authored
Refactor remote writeable entity and store to make it more reusable (#15210)
* Refactor remote writeable entity and store to make it more reusable Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
1 parent d442f7c commit b6c80b1

27 files changed

+176
-128
lines changed

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
import org.opensearch.common.blobstore.BlobPath;
2121
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
2222
import org.opensearch.common.remote.RemoteWritableEntityStore;
23+
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
2324
import org.opensearch.common.settings.ClusterSettings;
2425
import org.opensearch.common.settings.Settings;
2526
import org.opensearch.common.util.io.IOUtils;
2627
import org.opensearch.core.action.ActionListener;
2728
import org.opensearch.core.compress.Compressor;
2829
import org.opensearch.gateway.remote.ClusterMetadataManifest;
30+
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
2931
import org.opensearch.gateway.remote.RemoteStateTransferException;
30-
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
3132
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
3233
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
3334
import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff;
@@ -262,12 +263,13 @@ protected void doStart() {
262263
clusterSettings
263264
);
264265

265-
this.remoteRoutingTableDiffStore = new RemoteClusterStateBlobStore<>(
266+
this.remoteRoutingTableDiffStore = new RemoteWriteableEntityBlobStore<>(
266267
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
267268
blobStoreRepository,
268269
clusterName,
269270
threadPool,
270-
ThreadPool.Names.REMOTE_STATE_READ
271+
ThreadPool.Names.REMOTE_STATE_READ,
272+
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
271273
);
272274
}
273275

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.remote;
10+
11+
import org.opensearch.core.compress.Compressor;
12+
import org.opensearch.core.xcontent.NamedXContentRegistry;
13+
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
14+
15+
/**
16+
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
17+
*
18+
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
19+
*/
20+
public abstract class AbstractClusterMetadataWriteableBlobEntity<T> extends RemoteWriteableBlobEntity<T> {
21+
22+
protected final NamedXContentRegistry namedXContentRegistry;
23+
24+
public AbstractClusterMetadataWriteableBlobEntity(
25+
final String clusterUUID,
26+
final Compressor compressor,
27+
final NamedXContentRegistry namedXContentRegistry
28+
) {
29+
super(clusterUUID, compressor);
30+
this.namedXContentRegistry = namedXContentRegistry;
31+
}
32+
33+
public AbstractClusterMetadataWriteableBlobEntity(final String clusterUUID, final Compressor compressor) {
34+
super(clusterUUID, compressor);
35+
this.namedXContentRegistry = null;
36+
}
37+
38+
public abstract UploadedMetadata getUploadedMetadata();
39+
40+
public NamedXContentRegistry getNamedXContentRegistry() {
41+
return namedXContentRegistry;
42+
}
43+
}

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public abstract class AbstractRemoteWritableEntityManager implements RemoteWrita
3131
* @return the remote writable entity store for the given entity
3232
* @throws IllegalArgumentException if the entity type is unknown
3333
*/
34-
protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
34+
protected RemoteWritableEntityStore getStore(AbstractClusterMetadataWriteableBlobEntity entity) {
3535
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
3636
if (remoteStore == null) {
3737
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
@@ -49,7 +49,7 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en
4949
*/
5050
protected abstract ActionListener<Void> getWrappedWriteListener(
5151
String component,
52-
AbstractRemoteWritableBlobEntity remoteEntity,
52+
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
5353
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
5454
);
5555

@@ -64,21 +64,21 @@ protected abstract ActionListener<Void> getWrappedWriteListener(
6464
*/
6565
protected abstract ActionListener<Object> getWrappedReadListener(
6666
String component,
67-
AbstractRemoteWritableBlobEntity remoteEntity,
67+
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
6868
ActionListener<RemoteReadResult> listener
6969
);
7070

7171
@Override
7272
public void writeAsync(
7373
String component,
74-
AbstractRemoteWritableBlobEntity entity,
74+
AbstractClusterMetadataWriteableBlobEntity entity,
7575
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
7676
) {
7777
getStore(entity).writeAsync(entity, getWrappedWriteListener(component, entity, listener));
7878
}
7979

8080
@Override
81-
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
81+
public void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
8282
getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener));
8383
}
8484
}

server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public interface RemoteWritableEntityManager {
2929
* {@link ActionListener#onFailure(Exception)} method is called with
3030
* an exception if the read operation fails.
3131
*/
32-
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);
32+
void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<RemoteReadResult> listener);
3333

3434
/**
3535
* Performs an asynchronous write operation for the specified component and entity.
@@ -43,5 +43,5 @@ public interface RemoteWritableEntityManager {
4343
* {@link ActionListener#onFailure(Exception)} method is called with
4444
* an exception if the write operation fails.
4545
*/
46-
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
46+
void writeAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener<UploadedMetadata> listener);
4747
}

server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java server/src/main/java/org/opensearch/common/remote/RemoteWriteableBlobEntity.java

+4-23
Original file line numberDiff line numberDiff line change
@@ -10,38 +10,25 @@
1010

1111
import org.opensearch.common.blobstore.BlobPath;
1212
import org.opensearch.core.compress.Compressor;
13-
import org.opensearch.core.xcontent.NamedXContentRegistry;
14-
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
1513

1614
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;
1715

1816
/**
19-
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
20-
*
21-
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
17+
* The abstract class which represents a {@link RemoteWriteableEntity} that can be written to a store
18+
* @param <T> the entity to be written
2219
*/
23-
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {
20+
public abstract class RemoteWriteableBlobEntity<T> implements RemoteWriteableEntity<T> {
2421

2522
protected String blobFileName;
2623

2724
protected String blobName;
2825
private final String clusterUUID;
2926
private final Compressor compressor;
30-
private final NamedXContentRegistry namedXContentRegistry;
3127
private String[] pathTokens;
3228

33-
public AbstractRemoteWritableBlobEntity(
34-
final String clusterUUID,
35-
final Compressor compressor,
36-
final NamedXContentRegistry namedXContentRegistry
37-
) {
29+
public RemoteWriteableBlobEntity(final String clusterUUID, final Compressor compressor) {
3830
this.clusterUUID = clusterUUID;
3931
this.compressor = compressor;
40-
this.namedXContentRegistry = namedXContentRegistry;
41-
}
42-
43-
public AbstractRemoteWritableBlobEntity(final String clusterUUID, final Compressor compressor) {
44-
this(clusterUUID, compressor, null);
4532
}
4633

4734
public abstract BlobPathParameters getBlobPathParameters();
@@ -80,16 +67,10 @@ public String clusterUUID() {
8067
return clusterUUID;
8168
}
8269

83-
public abstract UploadedMetadata getUploadedMetadata();
84-
8570
public void setFullBlobName(BlobPath blobPath) {
8671
this.blobName = blobPath.buildAsString() + blobFileName;
8772
}
8873

89-
public NamedXContentRegistry getNamedXContentRegistry() {
90-
return namedXContentRegistry;
91-
}
92-
9374
protected Compressor getCompressor() {
9475
return compressor;
9576
}

server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java

+16-16
Original file line numberDiff line numberDiff line change
@@ -6,49 +6,48 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.gateway.remote.model;
9+
package org.opensearch.common.remote;
1010

1111
import org.opensearch.common.blobstore.BlobPath;
1212
import org.opensearch.common.blobstore.stream.write.WritePriority;
13-
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
14-
import org.opensearch.common.remote.RemoteWritableEntityStore;
15-
import org.opensearch.common.remote.RemoteWriteableEntity;
1613
import org.opensearch.core.action.ActionListener;
17-
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
1814
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
1915
import org.opensearch.repositories.blobstore.BlobStoreRepository;
2016
import org.opensearch.threadpool.ThreadPool;
2117

2218
import java.io.IOException;
2319
import java.io.InputStream;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.Base64;
2422
import java.util.concurrent.ExecutorService;
2523

26-
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN;
27-
2824
/**
2925
* Abstract class for a blob type storage
3026
*
3127
* @param <T> The entity which can be uploaded to / downloaded from blob store
3228
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
3329
*/
34-
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {
30+
public class RemoteWriteableEntityBlobStore<T, U extends RemoteWriteableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {
3531

3632
private final BlobStoreTransferService transferService;
3733
private final BlobStoreRepository blobStoreRepository;
3834
private final String clusterName;
3935
private final ExecutorService executorService;
36+
private final String pathToken;
4037

41-
public RemoteClusterStateBlobStore(
38+
public RemoteWriteableEntityBlobStore(
4239
final BlobStoreTransferService blobStoreTransferService,
4340
final BlobStoreRepository blobStoreRepository,
4441
final String clusterName,
4542
final ThreadPool threadPool,
46-
final String executor
43+
final String executor,
44+
final String pathToken
4745
) {
4846
this.transferService = blobStoreTransferService;
4947
this.blobStoreRepository = blobStoreRepository;
5048
this.clusterName = clusterName;
5149
this.executorService = threadPool.executor(executor);
50+
this.pathToken = pathToken;
5251
}
5352

5453
@Override
@@ -95,21 +94,18 @@ public String getClusterName() {
9594
}
9695

9796
public BlobPath getBlobPathPrefix(String clusterUUID) {
98-
return blobStoreRepository.basePath()
99-
.add(RemoteClusterStateUtils.encodeString(getClusterName()))
100-
.add(CLUSTER_STATE_PATH_TOKEN)
101-
.add(clusterUUID);
97+
return blobStoreRepository.basePath().add(encodeString(getClusterName())).add(pathToken).add(clusterUUID);
10298
}
10399

104-
public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
100+
public BlobPath getBlobPathForUpload(final RemoteWriteableBlobEntity<T> obj) {
105101
BlobPath blobPath = getBlobPathPrefix(obj.clusterUUID());
106102
for (String token : obj.getBlobPathParameters().getPathTokens()) {
107103
blobPath = blobPath.add(token);
108104
}
109105
return blobPath;
110106
}
111107

112-
public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
108+
public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity<T> obj) {
113109
String[] pathTokens = obj.getBlobPathTokens();
114110
BlobPath blobPath = new BlobPath();
115111
if (pathTokens == null || pathTokens.length < 1) {
@@ -122,4 +118,8 @@ public BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T>
122118
return blobPath;
123119
}
124120

121+
private static String encodeString(String content) {
122+
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
123+
}
124+
125125
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
import org.opensearch.cluster.ClusterState;
1212
import org.opensearch.cluster.DiffableUtils;
1313
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
14-
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
14+
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
1515
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
16+
import org.opensearch.common.remote.RemoteWriteableEntityBlobStore;
1617
import org.opensearch.core.action.ActionListener;
1718
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
1819
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
19-
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
2020
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
2121
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
2222
import org.opensearch.gateway.remote.model.RemoteReadResult;
@@ -28,7 +28,7 @@
2828
import java.util.Map;
2929

3030
/**
31-
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
31+
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteWriteableEntityBlobStore}
3232
*
3333
* @opensearch.internal
3434
*/
@@ -47,40 +47,43 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE
4747
) {
4848
this.remoteWritableEntityStores.put(
4949
RemoteDiscoveryNodes.DISCOVERY_NODES,
50-
new RemoteClusterStateBlobStore<>(
50+
new RemoteWriteableEntityBlobStore<>(
5151
blobStoreTransferService,
5252
blobStoreRepository,
5353
clusterName,
5454
threadpool,
55-
ThreadPool.Names.REMOTE_STATE_READ
55+
ThreadPool.Names.REMOTE_STATE_READ,
56+
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
5657
)
5758
);
5859
this.remoteWritableEntityStores.put(
5960
RemoteClusterBlocks.CLUSTER_BLOCKS,
60-
new RemoteClusterStateBlobStore<>(
61+
new RemoteWriteableEntityBlobStore<>(
6162
blobStoreTransferService,
6263
blobStoreRepository,
6364
clusterName,
6465
threadpool,
65-
ThreadPool.Names.REMOTE_STATE_READ
66+
ThreadPool.Names.REMOTE_STATE_READ,
67+
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
6668
)
6769
);
6870
this.remoteWritableEntityStores.put(
6971
RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM,
70-
new RemoteClusterStateBlobStore<>(
72+
new RemoteWriteableEntityBlobStore<>(
7173
blobStoreTransferService,
7274
blobStoreRepository,
7375
clusterName,
7476
threadpool,
75-
ThreadPool.Names.REMOTE_STATE_READ
77+
ThreadPool.Names.REMOTE_STATE_READ,
78+
RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN
7679
)
7780
);
7881
}
7982

8083
@Override
8184
protected ActionListener<Void> getWrappedWriteListener(
8285
String component,
83-
AbstractRemoteWritableBlobEntity remoteEntity,
86+
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
8487
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
8588
) {
8689
return ActionListener.wrap(
@@ -92,7 +95,7 @@ protected ActionListener<Void> getWrappedWriteListener(
9295
@Override
9396
protected ActionListener<Object> getWrappedReadListener(
9497
String component,
95-
AbstractRemoteWritableBlobEntity remoteEntity,
98+
AbstractClusterMetadataWriteableBlobEntity remoteEntity,
9699
ActionListener<RemoteReadResult> listener
97100
) {
98101
return ActionListener.wrap(

0 commit comments

Comments
 (0)