Skip to content

Commit ec5669f

Browse files
Adds share API to allow resource sharing
Signed-off-by: Darshit Chanpura <dchanp@amazon.com>
1 parent 687e6d9 commit ec5669f

10 files changed

+295
-2
lines changed

src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java

+5
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,11 @@ protected void adExecute(
178178
);
179179
indexAnomalyDetectorActionHandler.start(listener);
180180
}, listener);
181+
182+
// This call was added to ensure that existing functionality of sharing the resource via backend_role exists
183+
// TODO 3.0 and later the following must be removed and a new REST API where user must explicitly share the detector should be
184+
// exposed
185+
shareResourceWithBackendRoles(detectorId, user, listener);
181186
}
182187

183188
private void checkIndicesAndExecute(

src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@
293293
import org.opensearch.timeseries.function.ThrowingSupplierWrapper;
294294
import org.opensearch.timeseries.model.Job;
295295
import org.opensearch.timeseries.ratelimit.CheckPointMaintainRequestAdapter;
296+
import org.opensearch.timeseries.rest.RestShareConfigAction;
296297
import org.opensearch.timeseries.settings.TimeSeriesEnabledSetting;
297298
import org.opensearch.timeseries.settings.TimeSeriesSettings;
298299
import org.opensearch.timeseries.stats.StatNames;
@@ -302,6 +303,8 @@
302303
import org.opensearch.timeseries.stats.suppliers.SettableSupplier;
303304
import org.opensearch.timeseries.task.TaskCacheManager;
304305
import org.opensearch.timeseries.transport.CronTransportAction;
306+
import org.opensearch.timeseries.transport.ShareConfigAction;
307+
import org.opensearch.timeseries.transport.ShareConfigTransportAction;
305308
import org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler;
306309
import org.opensearch.timeseries.util.ClientUtil;
307310
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
@@ -439,6 +442,9 @@ public List<RestHandler> getRestHandlers(
439442
RestValidateForecasterAction validateForecasterAction = new RestValidateForecasterAction(settings, clusterService);
440443
RestForecasterSuggestAction suggestForecasterParamAction = new RestForecasterSuggestAction(settings, clusterService);
441444

445+
// Config sharing and access control
446+
RestShareConfigAction restShareConfigAction = new RestShareConfigAction();
447+
442448
ForecastJobProcessor forecastJobRunner = ForecastJobProcessor.getInstance();
443449
forecastJobRunner.setClient(client);
444450
forecastJobRunner.setThreadPool(threadPool);
@@ -478,7 +484,9 @@ public List<RestHandler> getRestHandlers(
478484
statsForecasterAction,
479485
runOnceForecasterAction,
480486
validateForecasterAction,
481-
suggestForecasterParamAction
487+
suggestForecasterParamAction,
488+
// Config sharing and access control
489+
restShareConfigAction
482490
);
483491
}
484492

@@ -1711,7 +1719,8 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
17111719
new ActionHandler<>(ForecastRunOnceAction.INSTANCE, ForecastRunOnceTransportAction.class),
17121720
new ActionHandler<>(ForecastRunOnceProfileAction.INSTANCE, ForecastRunOnceProfileTransportAction.class),
17131721
new ActionHandler<>(ValidateForecasterAction.INSTANCE, ValidateForecasterTransportAction.class),
1714-
new ActionHandler<>(SuggestForecasterParamAction.INSTANCE, SuggestForecasterParamTransportAction.class)
1722+
new ActionHandler<>(SuggestForecasterParamAction.INSTANCE, SuggestForecasterParamTransportAction.class),
1723+
new ActionHandler<>(ShareConfigAction.INSTANCE, ShareConfigTransportAction.class)
17151724
);
17161725
}
17171726

src/main/java/org/opensearch/timeseries/constant/CommonValue.java

+3
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ public class CommonValue {
99
// unknown or no schema version
1010
public static Integer NO_SCHEMA_VERSION = 0;
1111

12+
// config access control
13+
public static String CONFIG_ACCESS_CONTROL_BASE_ACTION = "cluster:admin/timeseries/config/access";
14+
public static String CONFIG_ACCESS_CONTROL_BASE_URI = "/_plugins/_timeseries/config/access";
1215
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package org.opensearch.timeseries.rest;
2+
3+
import static java.util.Collections.singletonList;
4+
import static org.opensearch.rest.RestRequest.Method.POST;
5+
import static org.opensearch.timeseries.constant.CommonValue.CONFIG_ACCESS_CONTROL_BASE_URI;
6+
7+
import java.io.IOException;
8+
import java.util.List;
9+
import java.util.Map;
10+
11+
import org.opensearch.accesscontrol.resources.ShareWith;
12+
import org.opensearch.client.node.NodeClient;
13+
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
14+
import org.opensearch.common.xcontent.XContentFactory;
15+
import org.opensearch.common.xcontent.XContentType;
16+
import org.opensearch.core.xcontent.NamedXContentRegistry;
17+
import org.opensearch.core.xcontent.XContentParser;
18+
import org.opensearch.rest.BaseRestHandler;
19+
import org.opensearch.rest.RestRequest;
20+
import org.opensearch.rest.action.RestToXContentListener;
21+
import org.opensearch.timeseries.transport.ShareConfigAction;
22+
import org.opensearch.timeseries.transport.ShareConfigRequest;
23+
24+
public class RestShareConfigAction extends BaseRestHandler {
25+
public RestShareConfigAction() {}
26+
27+
@Override
28+
public List<Route> routes() {
29+
return singletonList(new Route(POST, CONFIG_ACCESS_CONTROL_BASE_URI + "/share"));
30+
}
31+
32+
@Override
33+
public String getName() {
34+
return "share_timeseries_config";
35+
}
36+
37+
@Override
38+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
39+
Map<String, Object> source;
40+
try (XContentParser parser = request.contentParser()) {
41+
source = parser.map();
42+
}
43+
44+
String resourceId = (String) source.get("config_id");
45+
46+
ShareWith shareWith = parseShareWith(source);
47+
final ShareConfigRequest shareResourceRequest = new ShareConfigRequest(resourceId, shareWith);
48+
return channel -> client.executeLocally(ShareConfigAction.INSTANCE, shareResourceRequest, new RestToXContentListener<>(channel));
49+
}
50+
51+
private ShareWith parseShareWith(Map<String, Object> source) throws IOException {
52+
@SuppressWarnings("unchecked")
53+
Map<String, Object> shareWithMap = (Map<String, Object>) source.get("share_with");
54+
if (shareWithMap == null || shareWithMap.isEmpty()) {
55+
throw new IllegalArgumentException("share_with is required and cannot be empty");
56+
}
57+
58+
String jsonString = XContentFactory.jsonBuilder().map(shareWithMap).toString();
59+
60+
try (
61+
XContentParser parser = XContentType.JSON
62+
.xContent()
63+
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, jsonString)
64+
) {
65+
return ShareWith.fromXContent(parser);
66+
} catch (IllegalArgumentException e) {
67+
throw new IllegalArgumentException("Invalid share_with structure: " + e.getMessage(), e);
68+
}
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.opensearch.timeseries.transport;
2+
3+
import static org.opensearch.timeseries.constant.CommonValue.CONFIG_ACCESS_CONTROL_BASE_ACTION;
4+
5+
import org.opensearch.action.ActionType;
6+
7+
public class ShareConfigAction extends ActionType<ShareConfigResponse> {
8+
9+
/**
10+
* Share config action instance.
11+
*/
12+
public static final ShareConfigAction INSTANCE = new ShareConfigAction();
13+
/**
14+
* Share config action name
15+
*/
16+
public static final String NAME = CONFIG_ACCESS_CONTROL_BASE_ACTION + "/share";
17+
18+
private ShareConfigAction() {
19+
super(NAME, ShareConfigResponse::new);
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package org.opensearch.timeseries.transport;
2+
3+
import java.io.IOException;
4+
import java.util.stream.Collectors;
5+
6+
import org.opensearch.accesscontrol.resources.ShareWith;
7+
import org.opensearch.accesscontrol.resources.SharedWithScope;
8+
import org.opensearch.action.ActionRequest;
9+
import org.opensearch.action.ActionRequestValidationException;
10+
import org.opensearch.core.common.io.stream.StreamInput;
11+
import org.opensearch.core.common.io.stream.StreamOutput;
12+
import org.opensearch.timeseries.util.ValidationUtil;
13+
14+
public class ShareConfigRequest extends ActionRequest {
15+
private final String configId;
16+
private final ShareWith shareWith;
17+
18+
public ShareConfigRequest(String configId, ShareWith shareWith) {
19+
this.configId = configId;
20+
this.shareWith = shareWith;
21+
}
22+
23+
public ShareConfigRequest(StreamInput in) throws IOException {
24+
this.configId = in.readString();
25+
this.shareWith = in.readNamedWriteable(ShareWith.class);
26+
}
27+
28+
@Override
29+
public void writeTo(final StreamOutput out) throws IOException {
30+
out.writeString(configId);
31+
out.writeNamedWriteable(shareWith);
32+
}
33+
34+
@Override
35+
public ActionRequestValidationException validate() {
36+
37+
return ValidationUtil
38+
.validateScopes(shareWith.getSharedWithScopes().stream().map(SharedWithScope::getScope).collect(Collectors.toSet()));
39+
}
40+
41+
public String getConfigId() {
42+
return configId;
43+
}
44+
45+
public ShareWith getShareWith() {
46+
return shareWith;
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.opensearch.timeseries.transport;
2+
3+
import java.io.IOException;
4+
5+
import org.opensearch.core.action.ActionResponse;
6+
import org.opensearch.core.common.io.stream.StreamInput;
7+
import org.opensearch.core.common.io.stream.StreamOutput;
8+
import org.opensearch.core.xcontent.ToXContent;
9+
import org.opensearch.core.xcontent.ToXContentObject;
10+
import org.opensearch.core.xcontent.XContentBuilder;
11+
12+
public class ShareConfigResponse extends ActionResponse implements ToXContentObject {
13+
private final String message;
14+
15+
public ShareConfigResponse(String message) {
16+
this.message = message;
17+
}
18+
19+
@Override
20+
public void writeTo(StreamOutput out) throws IOException {
21+
out.writeString(message);
22+
}
23+
24+
public ShareConfigResponse(final StreamInput in) throws IOException {
25+
message = in.readString();
26+
}
27+
28+
@Override
29+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
30+
builder.startObject();
31+
builder.field("message", message);
32+
builder.endObject();
33+
return builder;
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.opensearch.timeseries.transport;
2+
3+
import org.apache.logging.log4j.LogManager;
4+
import org.apache.logging.log4j.Logger;
5+
import org.opensearch.accesscontrol.resources.ResourceService;
6+
import org.opensearch.accesscontrol.resources.ResourceSharing;
7+
import org.opensearch.action.support.ActionFilters;
8+
import org.opensearch.action.support.HandledTransportAction;
9+
import org.opensearch.common.inject.Inject;
10+
import org.opensearch.core.action.ActionListener;
11+
import org.opensearch.tasks.Task;
12+
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
13+
import org.opensearch.timeseries.constant.CommonName;
14+
import org.opensearch.transport.TransportService;
15+
16+
public class ShareConfigTransportAction extends HandledTransportAction<ShareConfigRequest, ShareConfigResponse> {
17+
18+
private static final Logger log = LogManager.getLogger(ShareConfigTransportAction.class);
19+
20+
@Inject
21+
public ShareConfigTransportAction(TransportService transportService, ActionFilters actionFilters) {
22+
super(ShareConfigAction.NAME, transportService, actionFilters, ShareConfigRequest::new);
23+
}
24+
25+
@Override
26+
protected void doExecute(Task task, ShareConfigRequest request, ActionListener<ShareConfigResponse> listener) {
27+
ResourceSharing sharing;
28+
try {
29+
sharing = shareConfig(request);
30+
log.info("Shared config : {} with {}", request.getConfigId(), sharing.toString());
31+
listener.onResponse(new ShareConfigResponse("Resource " + request.getConfigId() + " shared successfully with " + sharing));
32+
} catch (Exception e) {
33+
log.error("Something went wrong trying to share config {} with {}", request.getConfigId(), request.getShareWith().toString());
34+
listener.onFailure(e);
35+
}
36+
}
37+
38+
private ResourceSharing shareConfig(ShareConfigRequest request) {
39+
ResourceService rs = TimeSeriesAnalyticsPlugin.GuiceHolder.getResourceService();
40+
return rs.getResourceAccessControlPlugin().shareWith(request.getConfigId(), CommonName.CONFIG_INDEX, request.getShareWith());
41+
}
42+
}

src/main/java/org/opensearch/timeseries/util/ParseUtils.java

+24
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,14 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.lucene.search.join.ScoreMode;
3838
import org.opensearch.OpenSearchStatusException;
39+
import org.opensearch.accesscontrol.resources.RecipientType;
40+
import org.opensearch.accesscontrol.resources.ShareWith;
41+
import org.opensearch.accesscontrol.resources.SharedWithScope;
3942
import org.opensearch.action.get.GetRequest;
4043
import org.opensearch.action.get.GetResponse;
4144
import org.opensearch.action.search.SearchResponse;
4245
import org.opensearch.ad.constant.ADResourceScope;
46+
import org.opensearch.ad.transport.IndexAnomalyDetectorResponse;
4347
import org.opensearch.client.Client;
4448
import org.opensearch.cluster.service.ClusterService;
4549
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -841,4 +845,24 @@ public static void validatePermissions(String detectorId, ActionListener<? exten
841845
}
842846
}
843847

848+
public static void shareResourceWithBackendRoles(String detectorId, User user, ActionListener<IndexAnomalyDetectorResponse> listener) {
849+
SharedWithScope.ScopeRecipients recipients = new SharedWithScope.ScopeRecipients(
850+
Map.of(new RecipientType("backend_roles"), Set.copyOf(user.getBackendRoles()))
851+
);
852+
ShareWith shareWith = new ShareWith(Set.of(new SharedWithScope(ADResourceScope.AD_FULL_ACCESS.getScopeName(), recipients)));
853+
854+
TimeSeriesAnalyticsPlugin.GuiceHolder
855+
.getResourceService()
856+
.getResourceAccessControlPlugin()
857+
.shareWith(detectorId, CommonName.CONFIG_INDEX, shareWith);
858+
859+
logger
860+
.info(
861+
"Detector {} shared with backend roles of user {} for scope {}",
862+
detectorId,
863+
user.getName(),
864+
ADResourceScope.AD_FULL_ACCESS.getScopeName()
865+
);
866+
}
867+
844868
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.timeseries.util;
10+
11+
import java.util.HashSet;
12+
import java.util.Set;
13+
14+
import org.opensearch.accesscontrol.resources.ResourceAccessScope;
15+
import org.opensearch.action.ActionRequestValidationException;
16+
import org.opensearch.ad.constant.ADResourceScope;
17+
18+
public class ValidationUtil {
19+
public static ActionRequestValidationException validateScopes(Set<String> scopes) {
20+
Set<String> validScopes = new HashSet<>();
21+
for (ADResourceScope scope : ADResourceScope.values()) {
22+
validScopes.add(scope.name());
23+
}
24+
validScopes.add(ResourceAccessScope.READ_ONLY);
25+
validScopes.add(ResourceAccessScope.READ_WRITE);
26+
27+
for (String s : scopes) {
28+
if (!validScopes.contains(s)) {
29+
ActionRequestValidationException exception = new ActionRequestValidationException();
30+
exception.addValidationError("Invalid scope: " + s + ". Scope must be one of: " + validScopes);
31+
return exception;
32+
}
33+
}
34+
return null;
35+
}
36+
}

0 commit comments

Comments
 (0)