8
8
import static java .util .concurrent .TimeUnit .SECONDS ;
9
9
import static org .opensearch .ml .common .CommonValue .MASTER_KEY ;
10
10
import static org .opensearch .ml .common .CommonValue .ML_CONFIG_INDEX ;
11
+ import static org .opensearch .ml .common .MLConfig .CREATE_TIME_FIELD ;
11
12
12
13
import java .nio .charset .StandardCharsets ;
13
14
import java .security .SecureRandom ;
15
+ import java .time .Instant ;
14
16
import java .util .Base64 ;
15
17
import java .util .concurrent .CountDownLatch ;
16
18
import java .util .concurrent .atomic .AtomicReference ;
17
19
18
20
import javax .crypto .spec .SecretKeySpec ;
19
21
22
+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
20
23
import org .opensearch .ResourceNotFoundException ;
21
- import org .opensearch .action .LatchedActionListener ;
24
+ import org .opensearch .action .DocWriteRequest ;
22
25
import org .opensearch .action .get .GetRequest ;
23
- import org .opensearch .action .get .GetResponse ;
26
+ import org .opensearch .action .index .IndexRequest ;
27
+ import org .opensearch .action .support .WriteRequest ;
24
28
import org .opensearch .client .Client ;
25
29
import org .opensearch .cluster .service .ClusterService ;
26
30
import org .opensearch .common .util .concurrent .ThreadContext ;
27
31
import org .opensearch .core .action .ActionListener ;
32
+ import org .opensearch .index .engine .VersionConflictEngineException ;
28
33
import org .opensearch .ml .common .exception .MLException ;
34
+ import org .opensearch .ml .engine .indices .MLIndicesHandler ;
29
35
30
36
import com .amazonaws .encryptionsdk .AwsCrypto ;
31
37
import com .amazonaws .encryptionsdk .CommitmentPolicy ;
32
38
import com .amazonaws .encryptionsdk .CryptoResult ;
33
39
import com .amazonaws .encryptionsdk .jce .JceMasterKey ;
40
+ import com .google .common .collect .ImmutableMap ;
34
41
35
42
import lombok .extern .log4j .Log4j2 ;
36
43
@@ -42,11 +49,13 @@ public class EncryptorImpl implements Encryptor {
42
49
private ClusterService clusterService ;
43
50
private Client client ;
44
51
private volatile String masterKey ;
52
+ private MLIndicesHandler mlIndicesHandler ;
45
53
46
- public EncryptorImpl (ClusterService clusterService , Client client ) {
54
+ public EncryptorImpl (ClusterService clusterService , Client client , MLIndicesHandler mlIndicesHandler ) {
47
55
this .masterKey = null ;
48
56
this .clusterService = clusterService ;
49
57
this .client = client ;
58
+ this .mlIndicesHandler = mlIndicesHandler ;
50
59
}
51
60
52
61
public EncryptorImpl (String masterKey ) {
@@ -104,28 +113,68 @@ private void initMasterKey() {
104
113
AtomicReference <Exception > exceptionRef = new AtomicReference <>();
105
114
106
115
CountDownLatch latch = new CountDownLatch (1 );
107
- if (clusterService .state ().metadata ().hasIndex (ML_CONFIG_INDEX )) {
116
+ mlIndicesHandler .initMLConfigIndex (ActionListener .wrap (r -> {
117
+ GetRequest getRequest = new GetRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
108
118
try (ThreadContext .StoredContext context = client .threadPool ().getThreadContext ().stashContext ()) {
109
- GetRequest getRequest = new GetRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
110
- client .get (getRequest , ActionListener .runBefore (new LatchedActionListener (ActionListener .<GetResponse >wrap (r -> {
111
- if (r .isExists ()) {
112
- String masterKey = (String ) r .getSourceAsMap ().get (MASTER_KEY );
113
- this .masterKey = masterKey ;
119
+ client .get (getRequest , ActionListener .wrap (getResponse -> {
120
+ if (getResponse == null || !getResponse .isExists ()) {
121
+ IndexRequest indexRequest = new IndexRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
122
+ final String generatedMasterKey = generateMasterKey ();
123
+ indexRequest
124
+ .source (ImmutableMap .of (MASTER_KEY , generatedMasterKey , CREATE_TIME_FIELD , Instant .now ().toEpochMilli ()));
125
+ indexRequest .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
126
+ indexRequest .opType (DocWriteRequest .OpType .CREATE );
127
+ client .index (indexRequest , ActionListener .wrap (indexResponse -> {
128
+ this .masterKey = generatedMasterKey ;
129
+ log .info ("ML encryption master key initialized successfully" );
130
+ latch .countDown ();
131
+ }, e -> {
132
+
133
+ if (ExceptionUtils .getRootCause (e ) instanceof VersionConflictEngineException ) {
134
+ GetRequest getMasterKeyRequest = new GetRequest (ML_CONFIG_INDEX ).id (MASTER_KEY );
135
+ try (ThreadContext .StoredContext threadContext = client .threadPool ().getThreadContext ().stashContext ()) {
136
+ client .get (getMasterKeyRequest , ActionListener .wrap (getMasterKeyResponse -> {
137
+ if (getMasterKeyResponse != null && getMasterKeyResponse .isExists ()) {
138
+ final String masterKey = (String ) getMasterKeyResponse .getSourceAsMap ().get (MASTER_KEY );
139
+ this .masterKey = masterKey ;
140
+ log .info ("ML encryption master key already initialized, no action needed" );
141
+ latch .countDown ();
142
+ } else {
143
+ exceptionRef .set (new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR ));
144
+ latch .countDown ();
145
+ }
146
+ }, error -> {
147
+ log .debug ("Failed to get ML encryption master key" , e );
148
+ exceptionRef .set (error );
149
+ latch .countDown ();
150
+ }));
151
+ }
152
+ } else {
153
+ log .debug ("Failed to index ML encryption master key" , e );
154
+ exceptionRef .set (e );
155
+ latch .countDown ();
156
+ }
157
+ }));
114
158
} else {
115
- exceptionRef .set (new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR ));
159
+ final String masterKey = (String ) getResponse .getSourceAsMap ().get (MASTER_KEY );
160
+ this .masterKey = masterKey ;
161
+ log .info ("ML encryption master key already initialized, no action needed" );
162
+ latch .countDown ();
116
163
}
117
164
}, e -> {
118
- log .error ("Failed to get ML encryption master key" , e );
165
+ log .debug ("Failed to get ML encryption master key from config index " , e );
119
166
exceptionRef .set (e );
120
- }), latch ), () -> context .restore ()));
167
+ latch .countDown ();
168
+ }));
121
169
}
122
- } else {
123
- exceptionRef .set (new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR ));
170
+ }, e -> {
171
+ log .debug ("Failed to init ML config index" , e );
172
+ exceptionRef .set (e );
124
173
latch .countDown ();
125
- }
174
+ }));
126
175
127
176
try {
128
- latch .await (5 , SECONDS );
177
+ latch .await (1 , SECONDS );
129
178
} catch (InterruptedException e ) {
130
179
throw new IllegalStateException (e );
131
180
}
@@ -142,4 +191,5 @@ private void initMasterKey() {
142
191
throw new ResourceNotFoundException (MASTER_KEY_NOT_READY_ERROR );
143
192
}
144
193
}
194
+
145
195
}
0 commit comments