18
18
import org .opensearch .common .blobstore .BlobContainer ;
19
19
import org .opensearch .core .action .ActionListener ;
20
20
import org .opensearch .core .xcontent .ToXContent ;
21
+ import org .opensearch .gateway .remote .RemoteClusterStateUtils .RemoteStateTransferException ;
21
22
import org .opensearch .index .remote .RemoteStoreUtils ;
23
+ import org .opensearch .index .translog .transfer .BlobStoreTransferService ;
22
24
import org .opensearch .repositories .blobstore .BlobStoreRepository ;
23
25
import org .opensearch .repositories .blobstore .ChecksumBlobStoreFormat ;
24
26
import org .opensearch .threadpool .ThreadPool ;
@@ -35,23 +37,17 @@ public class RemoteClusterStateAttributesManager {
35
37
public static final String DISCOVERY_NODES = "nodes" ;
36
38
public static final String CLUSTER_BLOCKS = "blocks" ;
37
39
public static final String CUSTOM_PREFIX = "custom" ;
38
- public static final ChecksumBlobStoreFormat <DiscoveryNodes > DISCOVERY_NODES_FORMAT = new ChecksumBlobStoreFormat <>(
39
- "nodes" ,
40
- METADATA_NAME_FORMAT ,
41
- DiscoveryNodes ::fromXContent
42
- );
43
- public static final ChecksumBlobStoreFormat <ClusterBlocks > CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat <>(
44
- "blocks" ,
45
- METADATA_NAME_FORMAT ,
46
- ClusterBlocks ::fromXContent
47
- );
48
40
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1 ;
41
+ private final BlobStoreTransferService blobStoreTransferService ;
49
42
private final BlobStoreRepository blobStoreRepository ;
50
43
private final ThreadPool threadPool ;
44
+ private final String clusterName ;
51
45
52
- RemoteClusterStateAttributesManager (BlobStoreRepository repository , ThreadPool threadPool ) {
46
+ RemoteClusterStateAttributesManager (BlobStoreTransferService blobStoreTransferService , BlobStoreRepository repository , ThreadPool threadPool , String clusterName ) {
47
+ this .blobStoreTransferService = blobStoreTransferService ;
53
48
this .blobStoreRepository = repository ;
54
49
this .threadPool = threadPool ;
50
+ this .clusterName = clusterName ;
55
51
}
56
52
57
53
/**
@@ -60,82 +56,48 @@ public class RemoteClusterStateAttributesManager {
60
56
CheckedRunnable <IOException > getAsyncMetadataWriteAction (
61
57
ClusterState clusterState ,
62
58
String component ,
63
- ChecksumBlobStoreFormat componentMetadataBlobStore ,
64
59
ToXContent componentData ,
65
60
LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener
66
61
) {
67
- final BlobContainer remoteStateAttributeContainer = clusterStateAttributeContainer (clusterState .getClusterName ().value (), clusterState .metadata ().clusterUUID ());
68
- final String componentMetadataFilename = clusterStateAttributeFileName (component , clusterState .metadata ().version ());
62
+ AbstractRemoteBlobStoreObject remoteObject = getRemoteObject (componentData , clusterState .version (), clusterState .metadata ().clusterUUID ());
69
63
ActionListener <Void > completionListener = ActionListener .wrap (
70
64
resp -> latchedActionListener .onResponse (
71
- new ClusterMetadataManifest . UploadedMetadataAttribute ( component , componentMetadataFilename )
65
+ remoteObject . getUploadedMetadata ( )
72
66
),
73
67
ex -> latchedActionListener .onFailure (new RemoteClusterStateUtils .RemoteStateTransferException (component , ex ))
74
68
);
75
- return () -> componentMetadataBlobStore .writeAsyncWithUrgentPriority (
76
- componentData ,
77
- remoteStateAttributeContainer ,
78
- componentMetadataFilename ,
79
- blobStoreRepository .getCompressor (),
80
- completionListener ,
81
- FORMAT_PARAMS
82
- );
69
+ return remoteObject .writeAsync (completionListener );
70
+ }
71
+
72
+ private AbstractRemoteBlobStoreObject getRemoteObject (ToXContent componentData , long stateVersion , String clusterUUID ) {
73
+ if (componentData instanceof DiscoveryNodes ) {
74
+ return new RemoteDiscoveryNodes ((DiscoveryNodes )componentData , stateVersion , clusterUUID , blobStoreTransferService , blobStoreRepository , clusterName , threadPool );
75
+ } else if (componentData instanceof ClusterBlocks ) {
76
+ return new RemoteClusterBlocks ((ClusterBlocks ) componentData , stateVersion , clusterUUID , blobStoreTransferService , blobStoreRepository , clusterName , threadPool );
77
+ } else {
78
+ throw new RemoteStateTransferException ("Remote object not found for " + componentData .getClass ());
79
+ }
83
80
}
84
81
85
82
public CheckedRunnable <IOException > getAsyncMetadataReadAction (
86
- String clusterName ,
87
83
String clusterUUID ,
88
84
String component ,
89
85
String uploadedFilename ,
90
- ChecksumBlobStoreFormat componentBlobStore ,
91
86
LatchedActionListener <RemoteClusterStateUtils .RemoteReadResult > listener
92
87
) {
93
- String [] splitFilename = uploadedFilename .split ("/" );
94
- return () -> componentBlobStore .readAsync (
95
- clusterStateAttributeContainer (clusterName , clusterUUID ),
96
- splitFilename [splitFilename .length -1 ],
97
- blobStoreRepository .getNamedXContentRegistry (),
98
- threadPool .executor (ThreadPool .Names .GENERIC ),
99
- ActionListener .wrap (response -> listener .onResponse (new RemoteClusterStateUtils .RemoteReadResult ((ToXContent ) response , CLUSTER_STATE_ATTRIBUTE , component )), listener ::onFailure )
100
- );
88
+ AbstractRemoteBlobStoreObject remoteObject = getRemoteObject (component , uploadedFilename , clusterUUID );
89
+ ActionListener actionListener = ActionListener .wrap (response -> listener .onResponse (new RemoteClusterStateUtils .RemoteReadResult ((ToXContent ) response , CLUSTER_STATE_ATTRIBUTE , component )), listener ::onFailure );
90
+ return () -> remoteObject .readAsync (actionListener );
101
91
}
102
92
103
- public ToXContent readMetadata (ChecksumBlobStoreFormat componentMetadataBlobStore , String clusterName , String clusterUUID , String fileName ) {
104
- final BlobContainer remoteStateAttributeContainer = clusterStateAttributeContainer (clusterName , clusterUUID );
105
- try {
106
- // Fetch custom metadata
107
- if (fileName != null ) {
108
- String [] splitPath = fileName .split ("/" );
109
- return componentMetadataBlobStore .read (
110
- remoteStateAttributeContainer ,
111
- splitPath [splitPath .length - 1 ],
112
- blobStoreRepository .getNamedXContentRegistry ()
113
- );
114
- } else {
115
- return TemplatesMetadata .EMPTY_METADATA ;
116
- }
117
- } catch (IOException e ) {
118
- throw new IllegalStateException (
119
- String .format (Locale .ROOT , "Error while downloading Templates Metadata - %s" , fileName ),
120
- e
121
- );
93
+ private AbstractRemoteBlobStoreObject getRemoteObject (String component , String blobName , String clusterUUID ) {
94
+ if (component .equals (RemoteDiscoveryNodes .DISCOVERY_NODES )) {
95
+ return new RemoteDiscoveryNodes (blobName , clusterUUID , blobStoreTransferService , blobStoreRepository , clusterName , threadPool );
96
+ } else if (component .equals (RemoteClusterBlocks .CLUSTER_BLOCKS )) {
97
+ return new RemoteClusterBlocks (blobName , clusterUUID , blobStoreTransferService , blobStoreRepository , clusterName , threadPool );
98
+ } else {
99
+ throw new RemoteStateTransferException ("Remote object not found for " + component );
122
100
}
123
101
}
124
102
125
- private BlobContainer clusterStateAttributeContainer (String clusterName , String clusterUUID ) {
126
- // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/
127
- return blobStoreRepository .blobStore ()
128
- .blobContainer (getCusterMetadataBasePath (blobStoreRepository , clusterName , clusterUUID ));
129
- }
130
-
131
- private static String clusterStateAttributeFileName (String componentPrefix , Long metadataVersion ) {
132
- // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/<componentPrefix>__<inverted_metadata_version>__<inverted__timestamp>__<codec_version>
133
- return String .join (
134
- DELIMITER ,
135
- componentPrefix ,
136
- RemoteStoreUtils .invertLong (metadataVersion ),
137
- RemoteStoreUtils .invertLong (System .currentTimeMillis ()),
138
- String .valueOf (CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION )
139
- );
140
- }
141
103
}
0 commit comments