Skip to content

Commit 05f56a3

Browse files
committed
address comments
1 parent ffd7d94 commit 05f56a3

File tree

9 files changed

+161
-91
lines changed

9 files changed

+161
-91
lines changed

src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.opensearch.common.settings.Settings;
3434
import org.opensearch.core.xcontent.XContentBuilder;
3535
import org.opensearch.index.IndexingPressure;
36-
import org.opensearch.timeseries.NodeStateManager;
3736
import org.opensearch.timeseries.transport.ResultBulkTransportAction;
3837
import org.opensearch.timeseries.util.RestHandlerUtils;
3938
import org.opensearch.transport.TransportService;
@@ -51,8 +50,7 @@ public ADResultBulkTransportAction(
5150
IndexingPressure indexingPressure,
5251
Settings settings,
5352
ClusterService clusterService,
54-
Client client,
55-
NodeStateManager stateManager
53+
Client client
5654
) {
5755
super(
5856
ADResultBulkAction.NAME,
@@ -64,8 +62,7 @@ public ADResultBulkTransportAction(
6462
AD_INDEX_PRESSURE_SOFT_LIMIT.get(settings),
6563
AD_INDEX_PRESSURE_HARD_LIMIT.get(settings),
6664
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
67-
ADResultBulkRequest::new,
68-
stateManager
65+
ADResultBulkRequest::new
6966
);
7067
this.clusterService = clusterService;
7168
this.client = client;

src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.opensearch.forecast.model.ForecastResult;
2727
import org.opensearch.forecast.ratelimit.ForecastResultWriteRequest;
2828
import org.opensearch.index.IndexingPressure;
29-
import org.opensearch.timeseries.NodeStateManager;
3029
import org.opensearch.timeseries.transport.ResultBulkTransportAction;
3130
import org.opensearch.transport.TransportService;
3231

@@ -40,8 +39,7 @@ public ForecastResultBulkTransportAction(
4039
IndexingPressure indexingPressure,
4140
Settings settings,
4241
ClusterService clusterService,
43-
Client client,
44-
NodeStateManager stateManager
42+
Client client
4543
) {
4644
super(
4745
ForecastResultBulkAction.NAME,
@@ -53,8 +51,7 @@ public ForecastResultBulkTransportAction(
5351
FORECAST_INDEX_PRESSURE_SOFT_LIMIT.get(settings),
5452
FORECAST_INDEX_PRESSURE_HARD_LIMIT.get(settings),
5553
ForecastIndex.RESULT.getIndexName(),
56-
ForecastResultBulkRequest::new,
57-
stateManager
54+
ForecastResultBulkRequest::new
5855
);
5956
clusterService.getClusterSettings().addSettingsUpdateConsumer(FORECAST_INDEX_PRESSURE_SOFT_LIMIT, it -> softLimit = it);
6057
clusterService.getClusterSettings().addSettingsUpdateConsumer(FORECAST_INDEX_PRESSURE_HARD_LIMIT, it -> hardLimit = it);

src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteRequest.java

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public ResultWriteRequest(StreamInput in, Reader<ResultType> resultReader) throw
4848
public void writeTo(StreamOutput out) throws IOException {
4949
result.writeTo(out);
5050
out.writeOptionalString(resultIndex);
51+
out.writeBoolean(flattenResultIndex);
5152
}
5253

5354
public ResultType getResult() {

src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java

+48-47
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
2424
import org.opensearch.OpenSearchStatusException;
25-
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
26-
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
2725
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
2826
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
2927
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
@@ -474,23 +472,11 @@ private void handlePostRequest(boolean indexingDryRun, ActionListener<T> listene
474472
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
475473

476474
timeSeriesIndices
477-
.initFlattenedResultIndex(
478-
flattenedResultIndexAlias,
479-
ActionListener.wrap(
480-
initResponse -> setupIngestPipeline(configId, listener, createConfigResponse),
481-
listener::onFailure));
482-
483-
// timeSeriesIndices
484-
// .initFlattenedResultIndex(
485-
// flattenedResultIndexAlias,
486-
// ActionListener.wrap(initResponse -> setupIngestPipeline(configId, ActionListener.wrap(pipelineResponse -> {
487-
// updateResultIndexSetting(
488-
// pipelineId,
489-
// flattenedResultIndexAlias,
490-
// ActionListener.wrap(updateResponse -> listener.onResponse(createConfigResponse), listener::onFailure)
491-
// );
492-
// }, listener::onFailure)), listener::onFailure)
493-
// );
475+
.initFlattenedResultIndex(
476+
flattenedResultIndexAlias,
477+
ActionListener
478+
.wrap(initResponse -> setupIngestPipeline(configId, listener, createConfigResponse), listener::onFailure)
479+
);
494480
} else {
495481
listener.onResponse(createConfigResponse);
496482
}
@@ -512,10 +498,9 @@ protected void setupIngestPipeline(String configId, ActionListener<T> listener,
512498

513499
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, pipelineSource, XContentType.JSON);
514500

515-
client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(
516-
putPipelineResponse -> {
517-
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
518-
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);
501+
client.admin().cluster().putPipeline(putPipelineRequest, ActionListener.wrap(putPipelineResponse -> {
502+
logger.info("Ingest pipeline created successfully for pipelineId: {}", pipelineId);
503+
bindIngestPipelineWithFlattenedResultIndex(pipelineId, configId, flattenedResultIndexAlias, listener, createConfigResponse);
519504

520505
}, exception -> {
521506
logger.error("Error while creating ingest pipeline for pipelineId: {}", pipelineId, exception);
@@ -553,13 +538,31 @@ private BytesReference createPipelineDefinition(String indexName) throws IOExcep
553538
return BytesReference.bytes(pipelineBuilder);
554539
}
555540

556-
protected void bindIngestPipelineWithFlattenedResultIndex(String pipelineId, String configId, String flattenedResultIndexAlias, ActionListener<T> listener, T createConfigResponse) {
541+
private UpdateSettingsRequest buildUpdateSettingsRequest(String defaultPipelineName, String configId) {
542+
String flattenedResultIndex = timeSeriesIndices.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
543+
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
544+
updateSettingsRequest.indices(flattenedResultIndex);
545+
546+
Settings.Builder settingsBuilder = Settings.builder();
547+
settingsBuilder.put("index.default_pipeline", defaultPipelineName);
548+
549+
updateSettingsRequest.settings(settingsBuilder);
550+
551+
return updateSettingsRequest;
552+
}
553+
554+
protected void bindIngestPipelineWithFlattenedResultIndex(
555+
String pipelineId,
556+
String configId,
557+
String flattenedResultIndexAlias,
558+
ActionListener<T> listener,
559+
T createConfigResponse
560+
) {
557561
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest(pipelineId, configId);
558562

559-
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
560-
updateSettingsResponse -> {
561-
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
562-
listener.onResponse(createConfigResponse);
563+
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(updateSettingsResponse -> {
564+
logger.info("Successfully updated settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId);
565+
listener.onResponse(createConfigResponse);
563566
}, exception -> {
564567
logger.error("Failed to update settings for index: {} with pipeline: {}", flattenedResultIndexAlias, pipelineId, exception);
565568
listener.onFailure(exception);
@@ -654,27 +657,25 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
654657
}
655658
}
656659

657-
private UpdateSettingsRequest buildUpdateSettingsRequest(String defaultPipelineName, String configId) {
658-
String flattenedResultIndex = timeSeriesIndices
659-
.getFlattenedResultIndexAlias(config.getCustomResultIndexOrAlias(), configId);
660-
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest();
661-
updateSettingsRequest.indices(flattenedResultIndex);
662-
663-
Settings.Builder settingsBuilder = Settings.builder();
664-
settingsBuilder.put("index.default_pipeline", defaultPipelineName);
665-
666-
updateSettingsRequest.settings(settingsBuilder);
667-
668-
return updateSettingsRequest;
669-
}
670-
671-
private void unbindIngestPipelineWithFlattenedResultIndex(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {
660+
private void unbindIngestPipelineWithFlattenedResultIndex(
661+
Config existingConfig,
662+
ActionListener<T> listener,
663+
String id,
664+
boolean indexingDryRun
665+
) {
672666
// The pipeline name _none specifies that the index does not have an ingest pipeline.
673667
UpdateSettingsRequest updateSettingsRequest = buildUpdateSettingsRequest("_none", existingConfig.getId());
674-
client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(
675-
updateSettingsResponse -> deleteIngestPipeline(existingConfig, listener, id, indexingDryRun),
676-
exception -> listener.onFailure(exception)
677-
));
668+
client
669+
.admin()
670+
.indices()
671+
.updateSettings(
672+
updateSettingsRequest,
673+
ActionListener
674+
.wrap(
675+
updateSettingsResponse -> deleteIngestPipeline(existingConfig, listener, id, indexingDryRun),
676+
exception -> listener.onFailure(exception)
677+
)
678+
);
678679
}
679680

680681
private void deleteIngestPipeline(Config existingConfig, ActionListener<T> listener, String id, boolean indexingDryRun) {

src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public ResultBulkTransportAction(
6464
float softLimit,
6565
float hardLimit,
6666
String indexName,
67-
Writeable.Reader<ResultBulkRequestType> requestReader,
68-
NodeStateManager nodeStateManager
67+
Writeable.Reader<ResultBulkRequestType> requestReader
6968
) {
7069
super(actionName, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
7170
this.indexingPressure = indexingPressure;
@@ -75,7 +74,6 @@ public ResultBulkTransportAction(
7574
this.softLimit = softLimit;
7675
this.hardLimit = hardLimit;
7776
this.indexName = indexName;
78-
this.nodeStateManager = nodeStateManager;
7977

8078
// random seed is 42. Can be any number
8179
this.random = new Random(42);

src/test/java/org/opensearch/ad/ratelimit/ResultWriteWorkerTests.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ public void testRegular() {
120120
detectorId,
121121
RequestPriority.MEDIUM,
122122
detectResult,
123-
null
123+
null,
124+
false
124125
);
125126
request.add(resultWriteRequest);
126127

@@ -130,7 +131,7 @@ public void testRegular() {
130131
return null;
131132
}).when(resultHandler).flush(any(), any());
132133

133-
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null));
134+
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
134135

135136
// the request results one flush
136137
verify(resultHandler, times(1)).flush(any(), any());
@@ -152,7 +153,8 @@ public void testSingleRetryRequest() throws IOException {
152153
detectorId,
153154
RequestPriority.MEDIUM,
154155
detectResult,
155-
null
156+
null,
157+
false
156158
);
157159
request.add(resultWriteRequest);
158160

@@ -168,7 +170,7 @@ public void testSingleRetryRequest() throws IOException {
168170
return null;
169171
}).when(resultHandler).flush(any(), any());
170172

171-
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null));
173+
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
172174

173175
// one flush from the original request; and one due to retry
174176
verify(resultHandler, times(2)).flush(any(), any());
@@ -188,7 +190,7 @@ public void testRetryException() {
188190
return null;
189191
}).when(resultHandler).flush(any(), any());
190192

191-
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null));
193+
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
192194
// one flush from the original request; and one due to retry
193195
verify(resultHandler, times(2)).flush(any(), any());
194196
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchStatusException.class));
@@ -202,7 +204,7 @@ public void testOverloaded() {
202204
return null;
203205
}).when(resultHandler).flush(any(), any());
204206

205-
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null));
207+
resultWriteQueue.put(new ADResultWriteRequest(Long.MAX_VALUE, detectorId, RequestPriority.MEDIUM, detectResult, null, false));
206208
// one flush from the original request; and one due to retry
207209
verify(resultHandler, times(1)).flush(any(), any());
208210
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchRejectedExecutionException.class));

0 commit comments

Comments
 (0)