Skip to content

Commit 32e87fc

Browse files
shiv0408wangdongyu.danny
authored and
wangdongyu.danny
committed
Add POJO classes required for cluster state publication from remote (opensearch-project#14006)
* Add POJO classes required for cluster state publication from remote * Use InputStreams rather than XContent for serialization for ehpemeral objects Signed-off-by: Shivansh Arora <hishiv@amazon.com> * Add remote routing table changes in diff Manifest Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent 0a8ab15 commit 32e87fc

26 files changed

+3399
-233
lines changed

server/src/main/java/org/opensearch/cluster/DiffableUtils.java

+16
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,18 @@ public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
494494
* @opensearch.internal
495495
*/
496496
public abstract static class NonDiffableValueSerializer<K, V> implements ValueSerializer<K, V> {
497+
private static final NonDiffableValueSerializer ABSTRACT_INSTANCE = new NonDiffableValueSerializer<>() {
498+
@Override
499+
public void write(Object value, StreamOutput out) {
500+
throw new UnsupportedOperationException();
501+
}
502+
503+
@Override
504+
public Object read(StreamInput in, Object key) {
505+
throw new UnsupportedOperationException();
506+
}
507+
};
508+
497509
@Override
498510
public boolean supportsDiffableValues() {
499511
return false;
@@ -513,6 +525,10 @@ public void writeDiff(Diff<V> value, StreamOutput out) throws IOException {
513525
public Diff<V> readDiff(StreamInput in, K key) throws IOException {
514526
throw new UnsupportedOperationException();
515527
}
528+
529+
public static <K, V> NonDiffableValueSerializer<K, V> getAbstractInstance() {
530+
return ABSTRACT_INSTANCE;
531+
}
516532
}
517533

518534
/**

server/src/main/java/org/opensearch/cluster/metadata/DiffableStringMap.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static DiffableStringMap readFrom(StreamInput in) throws IOException {
6666
return map.isEmpty() ? EMPTY : new DiffableStringMap(map);
6767
}
6868

69-
DiffableStringMap(final Map<String, String> map) {
69+
public DiffableStringMap(final Map<String, String> map) {
7070
this.innerMap = Collections.unmodifiableMap(map);
7171
}
7272

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

+4
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,10 @@ public static boolean isSettingsMetadataEqual(Metadata metadata1, Metadata metad
973973
return metadata1.persistentSettings.equals(metadata2.persistentSettings);
974974
}
975975

976+
public static boolean isTransientSettingsMetadataEqual(Metadata metadata1, Metadata metadata2) {
977+
return metadata1.transientSettings.equals(metadata2.transientSettings);
978+
}
979+
976980
public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata metadata2) {
977981
return metadata1.templates.equals(metadata2.templates);
978982
}

server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Interface for RemoteRoutingTableService. Exposes methods to orchestrate upload and download of routing table from remote store.
2929
*/
3030
public interface RemoteRoutingTableService extends LifecycleComponent {
31-
static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
31+
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
3232
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
3333
@Override
3434
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {

server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java

+334-39
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java

+656-3
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.action.LatchedActionListener;
12+
import org.opensearch.cluster.ClusterState;
13+
import org.opensearch.cluster.ClusterState.Custom;
14+
import org.opensearch.cluster.block.ClusterBlocks;
15+
import org.opensearch.cluster.node.DiscoveryNodes;
16+
import org.opensearch.common.CheckedRunnable;
17+
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
18+
import org.opensearch.core.action.ActionListener;
19+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
20+
import org.opensearch.core.compress.Compressor;
21+
import org.opensearch.core.xcontent.NamedXContentRegistry;
22+
import org.opensearch.core.xcontent.ToXContent;
23+
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
24+
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
25+
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
26+
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
27+
import org.opensearch.gateway.remote.model.RemoteReadResult;
28+
29+
import java.io.IOException;
30+
import java.util.HashMap;
31+
import java.util.HashSet;
32+
import java.util.Map;
33+
import java.util.Set;
34+
35+
/**
36+
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
37+
*
38+
* @opensearch.internal
39+
*/
40+
public class RemoteClusterStateAttributesManager {
41+
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
42+
public static final String DISCOVERY_NODES = "nodes";
43+
public static final String CLUSTER_BLOCKS = "blocks";
44+
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
45+
private final RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore;
46+
private final RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore;
47+
private final RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore;
48+
private final Compressor compressor;
49+
private final NamedXContentRegistry namedXContentRegistry;
50+
private final NamedWriteableRegistry namedWriteableRegistry;
51+
52+
RemoteClusterStateAttributesManager(
53+
RemoteClusterStateBlobStore<ClusterBlocks, RemoteClusterBlocks> clusterBlocksBlobStore,
54+
RemoteClusterStateBlobStore<DiscoveryNodes, RemoteDiscoveryNodes> discoveryNodesBlobStore,
55+
RemoteClusterStateBlobStore<Custom, RemoteClusterStateCustoms> customsBlobStore,
56+
Compressor compressor,
57+
NamedXContentRegistry namedXContentRegistry,
58+
NamedWriteableRegistry namedWriteableRegistry
59+
) {
60+
this.clusterBlocksBlobStore = clusterBlocksBlobStore;
61+
this.discoveryNodesBlobStore = discoveryNodesBlobStore;
62+
this.customsBlobStore = customsBlobStore;
63+
this.compressor = compressor;
64+
this.namedXContentRegistry = namedXContentRegistry;
65+
this.namedWriteableRegistry = namedWriteableRegistry;
66+
}
67+
68+
/**
69+
* Allows async upload of Cluster State Attribute components to remote
70+
*/
71+
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
72+
String component,
73+
AbstractRemoteWritableBlobEntity blobEntity,
74+
RemoteClusterStateBlobStore remoteEntityStore,
75+
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
76+
) {
77+
return () -> remoteEntityStore.writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
78+
}
79+
80+
private ActionListener<Void> getActionListener(
81+
String component,
82+
AbstractRemoteWritableBlobEntity remoteObject,
83+
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
84+
) {
85+
return ActionListener.wrap(
86+
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
87+
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
88+
);
89+
}
90+
91+
public CheckedRunnable<IOException> getAsyncMetadataReadAction(
92+
String component,
93+
AbstractRemoteWritableBlobEntity blobEntity,
94+
RemoteClusterStateBlobStore remoteEntityStore,
95+
LatchedActionListener<RemoteReadResult> listener
96+
) {
97+
final ActionListener actionListener = ActionListener.wrap(
98+
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
99+
listener::onFailure
100+
);
101+
return () -> remoteEntityStore.readAsync(blobEntity, actionListener);
102+
}
103+
104+
public Map<String, ClusterState.Custom> getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) {
105+
Map<String, ClusterState.Custom> updatedCustoms = new HashMap<>();
106+
Set<String> currentCustoms = new HashSet<>(clusterState.customs().keySet());
107+
for (Map.Entry<String, ClusterState.Custom> entry : previousClusterState.customs().entrySet()) {
108+
if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) {
109+
updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey()));
110+
}
111+
currentCustoms.remove(entry.getKey());
112+
}
113+
for (String custom : currentCustoms) {
114+
updatedCustoms.put(custom, clusterState.customs().get(custom));
115+
}
116+
return updatedCustoms;
117+
}
118+
}

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+20-29
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575

7676
import static java.util.Objects.requireNonNull;
7777
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
78+
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
7879
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;
7980

8081
/**
@@ -171,12 +172,6 @@ public class RemoteClusterStateService implements Closeable {
171172
/**
172173
* Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files
173174
*/
174-
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V2 =
175-
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2);
176-
177-
/**
178-
* Manifest format compatible with codec v3, where global metadata file is replaced with multiple metadata attribute files
179-
*/
180175
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
181176
"cluster-metadata-manifest",
182177
METADATA_MANIFEST_NAME_FORMAT,
@@ -226,7 +221,6 @@ public class RemoteClusterStateService implements Closeable {
226221
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
227222
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
228223
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
229-
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
230224
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;
231225

232226
// ToXContent Params with gateway mode.
@@ -836,26 +830,25 @@ private RemoteClusterStateManifestInfo uploadManifest(
836830
committed,
837831
MANIFEST_CURRENT_CODEC_VERSION
838832
);
839-
final ClusterMetadataManifest manifest = new ClusterMetadataManifest(
840-
clusterState.term(),
841-
clusterState.getVersion(),
842-
clusterState.metadata().clusterUUID(),
843-
clusterState.stateUUID(),
844-
Version.CURRENT,
845-
nodeId,
846-
committed,
847-
MANIFEST_CURRENT_CODEC_VERSION,
848-
null,
849-
uploadedIndexMetadata,
850-
previousClusterUUID,
851-
clusterState.metadata().clusterUUIDCommitted(),
852-
uploadedCoordinationMetadata,
853-
uploadedSettingsMetadata,
854-
uploadedTemplatesMetadata,
855-
uploadedCustomMetadataMap,
856-
clusterState.routingTable().version(),
857-
uploadedIndicesRouting
858-
);
833+
final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
834+
.clusterTerm(clusterState.term())
835+
.stateVersion(clusterState.getVersion())
836+
.clusterUUID(clusterState.metadata().clusterUUID())
837+
.stateUUID(clusterState.stateUUID())
838+
.opensearchVersion(Version.CURRENT)
839+
.nodeId(nodeId)
840+
.committed(committed)
841+
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
842+
.indices(uploadedIndexMetadata)
843+
.previousClusterUUID(previousClusterUUID)
844+
.clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted())
845+
.coordinationMetadata(uploadedCoordinationMetadata)
846+
.settingMetadata(uploadedSettingsMetadata)
847+
.templatesMetadata(uploadedTemplatesMetadata)
848+
.customMetadataMap(uploadedCustomMetadataMap)
849+
.routingTableVersion(clusterState.routingTable().version())
850+
.indicesRouting(uploadedIndicesRouting)
851+
.build();
859852
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
860853
return new RemoteClusterStateManifestInfo(manifest, manifestFileName);
861854
}
@@ -1540,8 +1533,6 @@ private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManif
15401533
long codecVersion = getManifestCodecVersion(fileName);
15411534
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
15421535
return CLUSTER_METADATA_MANIFEST_FORMAT;
1543-
} else if (codecVersion == ClusterMetadataManifest.CODEC_V2) {
1544-
return CLUSTER_METADATA_MANIFEST_FORMAT_V2;
15451536
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
15461537
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
15471538
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java

+90-3
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,17 @@
99
package org.opensearch.gateway.remote;
1010

1111
import org.opensearch.cluster.metadata.Metadata;
12+
import org.opensearch.common.blobstore.BlobContainer;
13+
import org.opensearch.common.blobstore.BlobPath;
1214
import org.opensearch.core.xcontent.ToXContent;
15+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
1316

1417
import java.nio.charset.StandardCharsets;
18+
import java.util.ArrayList;
1519
import java.util.Base64;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Locale;
1623
import java.util.Map;
1724

1825
/**
@@ -21,19 +28,99 @@
2128
public class RemoteClusterStateUtils {
2229

2330
public static final String DELIMITER = "__";
24-
public static final String PATH_DELIMITER = "/";
25-
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
2631
public static final String METADATA_NAME_FORMAT = "%s.dat";
27-
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
32+
public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state";
33+
public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata";
34+
public static final String CLUSTER_STATE_EPHEMERAL_PATH_TOKEN = "ephemeral";
2835
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;
36+
public static final String CUSTOM_DELIMITER = "--";
37+
public static final String PATH_DELIMITER = "/";
38+
public static final String METADATA_NAME_PLAIN_FORMAT = "%s";
2939

3040
// ToXContent Params with gateway mode.
3141
// We are using gateway context mode to persist all custom metadata.
3242
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(
3343
Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY)
3444
);
3545

46+
public static BlobPath getCusterMetadataBasePath(BlobStoreRepository blobStoreRepository, String clusterName, String clusterUUID) {
47+
return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID);
48+
}
49+
3650
public static String encodeString(String content) {
3751
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
3852
}
53+
54+
public static String getFormattedFileName(String fileName, int codecVersion) {
55+
if (codecVersion < ClusterMetadataManifest.CODEC_V2) {
56+
return String.format(Locale.ROOT, METADATA_NAME_FORMAT, fileName);
57+
}
58+
return fileName;
59+
}
60+
61+
static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepository, String clusterName) {
62+
return blobStoreRepository.blobStore()
63+
.blobContainer(
64+
blobStoreRepository.basePath()
65+
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
66+
.add(CLUSTER_STATE_PATH_TOKEN)
67+
);
68+
}
69+
70+
/**
71+
* Container class to keep metadata of all uploaded attributes
72+
*/
73+
public static class UploadedMetadataResults {
74+
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata;
75+
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap;
76+
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedClusterStateCustomMetadataMap;
77+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata;
78+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata;
79+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata;
80+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata;
81+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes;
82+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks;
83+
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata;
84+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings;
85+
86+
public UploadedMetadataResults(
87+
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadata,
88+
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedCustomMetadataMap,
89+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata,
90+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata,
91+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata,
92+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata,
93+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes,
94+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks,
95+
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndicesRoutingMetadata,
96+
ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings,
97+
Map<String, ClusterMetadataManifest.UploadedMetadataAttribute> uploadedClusterStateCustomMap
98+
) {
99+
this.uploadedIndexMetadata = uploadedIndexMetadata;
100+
this.uploadedCustomMetadataMap = uploadedCustomMetadataMap;
101+
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
102+
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
103+
this.uploadedTransientSettingsMetadata = uploadedTransientSettingsMetadata;
104+
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
105+
this.uploadedDiscoveryNodes = uploadedDiscoveryNodes;
106+
this.uploadedClusterBlocks = uploadedClusterBlocks;
107+
this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata;
108+
this.uploadedHashesOfConsistentSettings = uploadedHashesOfConsistentSettings;
109+
this.uploadedClusterStateCustomMetadataMap = uploadedClusterStateCustomMap;
110+
}
111+
112+
public UploadedMetadataResults() {
113+
this.uploadedIndexMetadata = new ArrayList<>();
114+
this.uploadedCustomMetadataMap = new HashMap<>();
115+
this.uploadedCoordinationMetadata = null;
116+
this.uploadedSettingsMetadata = null;
117+
this.uploadedTransientSettingsMetadata = null;
118+
this.uploadedTemplatesMetadata = null;
119+
this.uploadedDiscoveryNodes = null;
120+
this.uploadedClusterBlocks = null;
121+
this.uploadedIndicesRoutingMetadata = new ArrayList<>();
122+
this.uploadedHashesOfConsistentSettings = null;
123+
this.uploadedClusterStateCustomMetadataMap = new HashMap<>();
124+
}
125+
}
39126
}

0 commit comments

Comments
 (0)