@@ -64,6 +64,9 @@ import {
64
64
getLatestDetectorTasksQuery ,
65
65
isRealTimeTask ,
66
66
getFiltersFromEntityList ,
67
+ convertStaticFieldsToCamelCase ,
68
+ getLatestTaskForDetectorQuery ,
69
+ convertTaskAndJobFieldsToCamelCase ,
67
70
} from './utils/adHelpers' ;
68
71
import { isNumber , set } from 'lodash' ;
69
72
import {
@@ -239,30 +242,71 @@ export default class AdService {
239
242
) : Promise < IOpenSearchDashboardsResponse < any > > => {
240
243
try {
241
244
const { detectorId } = request . params as { detectorId : string } ;
242
- const response = await this . client
245
+ const detectorResponse = await this . client
243
246
. asScoped ( request )
244
247
. callAsCurrentUser ( 'ad.getDetector' , {
245
248
detectorId,
246
249
} ) ;
247
250
248
- let resp = {
249
- ...response . anomaly_detector ,
250
- id : response . _id ,
251
- primaryTerm : response . _primary_term ,
252
- seqNo : response . _seq_no ,
253
- adJob : { ...response . anomaly_detector_job } ,
254
- historicalTask : { ...response . historical_analysis_task } ,
255
- curState : getTaskState ( response . realtime_detection_task ) ,
256
- stateError : processTaskError (
257
- get ( response , 'realtime_detection_task.error' , '' )
251
+ // Populating static detector fields
252
+ const staticFields = {
253
+ id : detectorResponse . _id ,
254
+ primaryTerm : detectorResponse . _primary_term ,
255
+ seqNo : detectorResponse . _seq_no ,
256
+ ...convertStaticFieldsToCamelCase ( detectorResponse . anomaly_detector ) ,
257
+ } ;
258
+
259
+ // Get real-time and historical task info to populate the
260
+ // task and job-related fields
261
+ const realtimeTaskResponse : any = await this . client
262
+ . asScoped ( request )
263
+ . callAsCurrentUser ( 'ad.searchTasks' , {
264
+ body : getLatestTaskForDetectorQuery ( detectorId , true ) ,
265
+ } ) ;
266
+ const historicalTaskResponse : any = await this . client
267
+ . asScoped ( request )
268
+ . callAsCurrentUser ( 'ad.searchTasks' , {
269
+ body : getLatestTaskForDetectorQuery ( detectorId , false ) ,
270
+ } ) ;
271
+
272
+ const realtimeTask = get (
273
+ get ( realtimeTaskResponse , 'hits.hits' , [ ] ) . map ( ( taskResponse : any ) => {
274
+ return {
275
+ id : get ( taskResponse , '_id' ) ,
276
+ ...get ( taskResponse , '_source' ) ,
277
+ } ;
278
+ } ) ,
279
+ 0
280
+ ) ;
281
+ const historicalTask = get (
282
+ get ( historicalTaskResponse , 'hits.hits' , [ ] ) . map (
283
+ ( taskResponse : any ) => {
284
+ return {
285
+ id : get ( taskResponse , '_id' ) ,
286
+ ...get ( taskResponse , '_source' ) ,
287
+ } ;
288
+ }
258
289
) ,
259
- initProgress : getTaskInitProgress ( response . realtime_detection_task ) ,
290
+ 0
291
+ ) ;
292
+
293
+ const taskAndJobFields = convertTaskAndJobFieldsToCamelCase (
294
+ realtimeTask ,
295
+ historicalTask ,
296
+ detectorResponse . anomaly_detector_job
297
+ ) ;
298
+
299
+ // Combine the static and task-and-job-related fields into
300
+ // a final response
301
+ const finalResponse = {
302
+ ...staticFields ,
303
+ ...taskAndJobFields ,
260
304
} ;
261
305
262
306
return opensearchDashboardsResponse . ok ( {
263
307
body : {
264
308
ok : true ,
265
- response : convertDetectorKeysToCamelCase ( resp ) as Detector ,
309
+ response : finalResponse ,
266
310
} ,
267
311
} ) ;
268
312
} catch ( err ) {
@@ -523,16 +567,16 @@ export default class AdService {
523
567
. callAsCurrentUser ( 'ad.searchDetector' , { body : requestBody } ) ;
524
568
525
569
const totalDetectors = get ( response , 'hits.total.value' , 0 ) ;
570
+
526
571
//Get all detectors from search detector API
527
572
const allDetectors = get ( response , 'hits.hits' , [ ] ) . reduce (
528
- ( acc : any , detector : any ) => ( {
573
+ ( acc : any , detectorResponse : any ) => ( {
529
574
...acc ,
530
- [ detector . _id ] : {
531
- id : detector . _id ,
532
- description : get ( detector , '_source.description' , '' ) ,
533
- indices : get ( detector , '_source.indices' , [ ] ) ,
534
- lastUpdateTime : get ( detector , '_source.last_update_time' , 0 ) ,
535
- ...convertDetectorKeysToCamelCase ( get ( detector , '_source' , { } ) ) ,
575
+ [ detectorResponse . _id ] : {
576
+ id : detectorResponse . _id ,
577
+ primaryTerm : detectorResponse . _primary_term ,
578
+ seqNo : detectorResponse . _seq_no ,
579
+ ...convertStaticFieldsToCamelCase ( detectorResponse . _source ) ,
536
580
} ,
537
581
} ) ,
538
582
{ }
@@ -602,46 +646,60 @@ export default class AdService {
602
646
) ;
603
647
}
604
648
605
- // Get real-time and historical task info by looping through each ID & retrieving
606
- // - curState by getting real-time task state
607
- // - enabledTime by getting real-time task's execution_start time
608
- // - taskId by getting historical task's _id
609
- const latestDetectorTasksQuery = getLatestDetectorTasksQuery ( ) ;
610
- const detectorTasksResponse : any = await this . client
649
+ // Fetch the latest realtime and historical tasks for all detectors
650
+ // using terms aggregations
651
+ const realtimeTasksResponse : any = await this . client
611
652
. asScoped ( request )
612
653
. callAsCurrentUser ( 'ad.searchTasks' , {
613
- body : latestDetectorTasksQuery ,
654
+ body : getLatestDetectorTasksQuery ( true ) ,
614
655
} ) ;
656
+ const historicalTasksResponse : any = await this . client
657
+ . asScoped ( request )
658
+ . callAsCurrentUser ( 'ad.searchTasks' , {
659
+ body : getLatestDetectorTasksQuery ( false ) ,
660
+ } ) ;
661
+
662
+ const realtimeTasks = get (
663
+ realtimeTasksResponse ,
664
+ 'aggregations.detectors.buckets' ,
665
+ [ ]
666
+ ) . reduce ( ( acc : any , bucket : any ) => {
667
+ return {
668
+ ...acc ,
669
+ [ bucket . key ] : {
670
+ realtimeTask : get ( bucket , 'latest_tasks.hits.hits.0' , undefined ) ,
671
+ } ,
672
+ } ;
673
+ } , { } ) ;
615
674
616
- // Convert response to a map of each detector ID => list of latest tasks (historical & real-time)
617
- const detectorTasks = get (
618
- detectorTasksResponse ,
675
+ const historicalTasks = get (
676
+ historicalTasksResponse ,
619
677
'aggregations.detectors.buckets' ,
620
678
[ ]
621
679
) . reduce ( ( acc : any , bucket : any ) => {
622
680
return {
623
681
...acc ,
624
682
[ bucket . key ] : {
625
- tasks : bucket . latest_tasks . hits . hits ,
683
+ historicalTask : get ( bucket , ' latest_tasks.hits.hits.0' , undefined ) ,
626
684
} ,
627
685
} ;
628
686
} , { } ) ;
629
687
688
+ // Get real-time and historical task info by looping through each detector & retrieving
689
+ // - curState by getting real-time task state
690
+ // - enabledTime by getting real-time task's execution_start time
691
+ // - taskId by getting historical task's _id
630
692
finalDetectors . forEach ( ( detector ) => {
631
- // Set default values for the task-related fields,
632
- // override if latest tasks were found
633
- detector . curState = DETECTOR_STATE . DISABLED ;
634
- detector . enabledTime = undefined ;
635
- detector . taskId = undefined ;
636
-
637
- get ( detectorTasks [ detector . id ] , 'tasks' , [ ] ) . forEach ( ( task : any ) => {
638
- if ( isRealTimeTask ( task . _source ) ) {
639
- detector . curState = getTaskState ( task . _source ) ;
640
- detector . enabledTime = task . _source . execution_start_time ;
641
- } else {
642
- detector . taskId = task . _id ;
643
- }
644
- } ) ;
693
+ const realtimeTask = get (
694
+ realtimeTasks [ detector . id ] ,
695
+ 'realtimeTask._source'
696
+ ) ;
697
+ detector . curState = getTaskState ( realtimeTask ) ;
698
+ detector . enabledTime = get ( realtimeTask , 'execution_start_time' ) ;
699
+ detector . taskId = get (
700
+ historicalTasks [ detector . id ] ,
701
+ 'historicalTask._id'
702
+ ) ;
645
703
} ) ;
646
704
647
705
return opensearchDashboardsResponse . ok ( {
0 commit comments