24
24
import java .util .Collection ;
25
25
import java .util .List ;
26
26
import java .util .Locale ;
27
- import java .util .PriorityQueue ;
27
+ import java .util .concurrent . PriorityBlockingQueue ;
28
28
import java .util .concurrent .atomic .AtomicReference ;
29
29
import java .util .stream .Collectors ;
30
30
import java .util .stream .Stream ;
35
35
import org .opensearch .plugin .insights .core .exporter .QueryInsightsExporter ;
36
36
import org .opensearch .plugin .insights .core .exporter .QueryInsightsExporterFactory ;
37
37
import org .opensearch .plugin .insights .core .exporter .SinkType ;
38
+ import org .opensearch .plugin .insights .core .service .grouper .MinMaxHeapQueryGrouper ;
39
+ import org .opensearch .plugin .insights .core .service .grouper .QueryGrouper ;
40
+ import org .opensearch .plugin .insights .rules .model .AggregationType ;
41
+ import org .opensearch .plugin .insights .rules .model .GroupingType ;
38
42
import org .opensearch .plugin .insights .rules .model .MetricType ;
39
43
import org .opensearch .plugin .insights .rules .model .SearchQueryRecord ;
40
44
import org .opensearch .plugin .insights .settings .QueryInsightsSettings ;
@@ -66,7 +70,7 @@ public class TopQueriesService {
66
70
/**
67
71
* The internal thread-safe store that holds the top n queries insight data
68
72
*/
69
- private final PriorityQueue <SearchQueryRecord > topQueriesStore ;
73
+ private final PriorityBlockingQueue <SearchQueryRecord > topQueriesStore ;
70
74
71
75
/**
72
76
* The AtomicReference of a snapshot of the current window top queries for getters to consume
@@ -93,6 +97,8 @@ public class TopQueriesService {
93
97
*/
94
98
private QueryInsightsExporter exporter ;
95
99
100
+ private QueryGrouper queryGrouper ;
101
+
96
102
TopQueriesService (
97
103
final MetricType metricType ,
98
104
final ThreadPool threadPool ,
@@ -106,9 +112,16 @@ public class TopQueriesService {
106
112
this .windowSize = QueryInsightsSettings .DEFAULT_WINDOW_SIZE ;
107
113
this .windowStart = -1L ;
108
114
this .exporter = null ;
109
- topQueriesStore = new PriorityQueue <>(topNSize , (a , b ) -> SearchQueryRecord .compare (a , b , metricType ));
115
+ topQueriesStore = new PriorityBlockingQueue <>(topNSize , (a , b ) -> SearchQueryRecord .compare (a , b , metricType ));
110
116
topQueriesCurrentSnapshot = new AtomicReference <>(new ArrayList <>());
111
117
topQueriesHistorySnapshot = new AtomicReference <>(new ArrayList <>());
118
+ queryGrouper = new MinMaxHeapQueryGrouper (
119
+ metricType ,
120
+ QueryInsightsSettings .DEFAULT_GROUPING_TYPE ,
121
+ AggregationType .AVERAGE ,
122
+ topQueriesStore ,
123
+ topNSize
124
+ );
112
125
}
113
126
114
127
/**
@@ -118,6 +131,7 @@ public class TopQueriesService {
118
131
*/
119
132
public void setTopNSize (final int topNSize ) {
120
133
this .topNSize = topNSize ;
134
+ this .queryGrouper .updateTopNSize (topNSize );
121
135
}
122
136
123
137
/**
@@ -169,6 +183,20 @@ public void setWindowSize(final TimeValue windowSize) {
169
183
this .windowStart = -1L ;
170
184
}
171
185
186
+ public void setGrouping (final GroupingType groupingType ) {
187
+ boolean changed = queryGrouper .setGroupingType (groupingType );
188
+ if (changed ) {
189
+ drain ();
190
+ }
191
+ }
192
+
193
+ public void setMaxGroups (final int maxGroups ) {
194
+ boolean changed = queryGrouper .setMaxGroups (maxGroups );
195
+ if (changed ) {
196
+ drain ();
197
+ }
198
+ }
199
+
172
200
/**
173
201
* Validate if the window size is valid, based on internal constrains.
174
202
*
@@ -306,10 +334,16 @@ void consumeRecords(final List<SearchQueryRecord> records) {
306
334
}
307
335
308
336
private void addToTopNStore (final List <SearchQueryRecord > records ) {
309
- topQueriesStore .addAll (records );
310
- // remove top elements for fix sizing priority queue
311
- while (topQueriesStore .size () > topNSize ) {
312
- topQueriesStore .poll ();
337
+ if (queryGrouper .getGroupingType () != GroupingType .NONE ) {
338
+ for (SearchQueryRecord record : records ) {
339
+ queryGrouper .add (record );
340
+ }
341
+ } else {
342
+ topQueriesStore .addAll (records );
343
+ // remove top elements for fix sizing priority queue
344
+ while (topQueriesStore .size () > topNSize ) {
345
+ topQueriesStore .poll ();
346
+ }
313
347
}
314
348
}
315
349
@@ -329,6 +363,9 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
329
363
}
330
364
topQueriesHistorySnapshot .set (history );
331
365
topQueriesStore .clear ();
366
+ if (queryGrouper .getGroupingType () != GroupingType .NONE ) {
367
+ queryGrouper .drain ();
368
+ }
332
369
topQueriesCurrentSnapshot .set (new ArrayList <>());
333
370
windowStart = newWindowStart ;
334
371
// export to the configured sink
@@ -368,4 +405,13 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {
368
405
public void close () throws IOException {
369
406
queryInsightsExporterFactory .closeExporter (this .exporter );
370
407
}
408
+
409
+ /**
410
+ * Drain internal stores.
411
+ */
412
+ private void drain () {
413
+ topQueriesStore .clear ();
414
+ topQueriesHistorySnapshot .set (new ArrayList <>());
415
+ topQueriesCurrentSnapshot .set (new ArrayList <>());
416
+ }
371
417
}
0 commit comments