20
20
import org .opensearch .plugin .insights .rules .model .GroupingType ;
21
21
import org .opensearch .plugin .insights .rules .model .MetricType ;
22
22
import org .opensearch .plugin .insights .rules .model .SearchQueryRecord ;
23
+ import org .opensearch .plugin .insights .rules .model .healthStats .QueryGrouperHealthStats ;
23
24
import org .opensearch .plugin .insights .settings .QueryInsightsSettings ;
24
25
25
26
/**
@@ -39,12 +40,12 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
39
40
/**
40
41
* Metric type for the current grouping service
41
42
*/
42
- private MetricType metricType ;
43
+ private final MetricType metricType ;
43
44
44
45
/**
45
46
* Aggregation type for the current grouping service
46
47
*/
47
- private AggregationType aggregationType ;
48
+ private final AggregationType aggregationType ;
48
49
/**
49
50
* Map storing groupingId to Tuple containing Aggregate search query record and boolean.
50
51
* SearchQueryRecord: Aggregate search query record to store the aggregate of a metric type based on the aggregation type..
@@ -53,18 +54,18 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
53
54
* boolean: True if the aggregate record is in the Top N queries priority query (min heap) and False if the aggregate
54
55
* record is in the Max Heap
55
56
*/
56
- private ConcurrentHashMap <String , Tuple <SearchQueryRecord , Boolean >> groupIdToAggSearchQueryRecord ;
57
+ private final ConcurrentHashMap <String , Tuple <SearchQueryRecord , Boolean >> groupIdToAggSearchQueryRecord ;
57
58
/**
58
59
* Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore
59
60
*/
60
- private PriorityBlockingQueue <SearchQueryRecord > minHeapTopQueriesStore ;
61
+ private final PriorityBlockingQueue <SearchQueryRecord > minHeapTopQueriesStore ;
61
62
/**
62
63
* The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap.
63
64
* It stores all records not included in the Top N query results. When the aggregate measurement for one of these
64
65
* records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap,
65
66
* and the records are rearranged accordingly.
66
67
*/
67
- private PriorityBlockingQueue <SearchQueryRecord > maxHeapQueryStore ;
68
+ private final PriorityBlockingQueue <SearchQueryRecord > maxHeapQueryStore ;
68
69
69
70
/**
70
71
* Top N size based on the configuration set
@@ -80,11 +81,11 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper {
80
81
private int maxGroups ;
81
82
82
83
public MinMaxHeapQueryGrouper (
83
- MetricType metricType ,
84
- GroupingType groupingType ,
85
- AggregationType aggregationType ,
86
- PriorityBlockingQueue <SearchQueryRecord > topQueriesStore ,
87
- int topNSize
84
+ final MetricType metricType ,
85
+ final GroupingType groupingType ,
86
+ final AggregationType aggregationType ,
87
+ final PriorityBlockingQueue <SearchQueryRecord > topQueriesStore ,
88
+ final int topNSize
88
89
) {
89
90
this .groupingType = groupingType ;
90
91
this .metricType = metricType ;
@@ -103,7 +104,7 @@ public MinMaxHeapQueryGrouper(
103
104
* @return return the search query record that represents the group
104
105
*/
105
106
@ Override
106
- public SearchQueryRecord add (SearchQueryRecord searchQueryRecord ) {
107
+ public SearchQueryRecord add (final SearchQueryRecord searchQueryRecord ) {
107
108
if (groupingType == GroupingType .NONE ) {
108
109
throw new IllegalArgumentException ("Do not use addQueryToGroup when GroupingType is None" );
109
110
}
@@ -120,8 +121,7 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
120
121
// Add to min PQ and promote to max
121
122
// If max PQ is empty return else try to promote record from max to min
122
123
if (!groupIdToAggSearchQueryRecord .containsKey (groupId )) {
123
- boolean maxGroupsLimitReached = checkMaxGroupsLimitReached (groupId );
124
- if (maxGroupsLimitReached ) {
124
+ if (checkMaxGroupsLimitReached (groupId )) {
125
125
return null ;
126
126
}
127
127
aggregateSearchQueryRecord = searchQueryRecord ;
@@ -158,7 +158,7 @@ public void drain() {
158
158
* @return grouping type changed
159
159
*/
160
160
@ Override
161
- public boolean setGroupingType (GroupingType newGroupingType ) {
161
+ public boolean setGroupingType (final GroupingType newGroupingType ) {
162
162
if (this .groupingType != newGroupingType ) {
163
163
this .groupingType = newGroupingType ;
164
164
drain ();
@@ -183,7 +183,7 @@ public GroupingType getGroupingType() {
183
183
* @return max groups changed
184
184
*/
185
185
@ Override
186
- public boolean setMaxGroups (int maxGroups ) {
186
+ public boolean setMaxGroups (final int maxGroups ) {
187
187
if (this .maxGroups != maxGroups ) {
188
188
this .maxGroups = maxGroups ;
189
189
drain ();
@@ -197,17 +197,21 @@ public boolean setMaxGroups(int maxGroups) {
197
197
* @param newSize new size
198
198
*/
199
199
@ Override
200
- public void updateTopNSize (int newSize ) {
200
+ public void updateTopNSize (final int newSize ) {
201
201
this .topNSize = newSize ;
202
202
}
203
203
204
- private void addToMinPQ (SearchQueryRecord searchQueryRecord , String groupId ) {
204
+ private void addToMinPQ (final SearchQueryRecord searchQueryRecord , final String groupId ) {
205
205
minHeapTopQueriesStore .add (searchQueryRecord );
206
206
groupIdToAggSearchQueryRecord .put (groupId , new Tuple <>(searchQueryRecord , true ));
207
207
overflow ();
208
208
}
209
209
210
- private void addAndPromote (SearchQueryRecord searchQueryRecord , SearchQueryRecord aggregateSearchQueryRecord , String groupId ) {
210
+ private void addAndPromote (
211
+ final SearchQueryRecord searchQueryRecord ,
212
+ final SearchQueryRecord aggregateSearchQueryRecord ,
213
+ final String groupId
214
+ ) {
211
215
Number measurementToAdd = searchQueryRecord .getMeasurement (metricType );
212
216
aggregateSearchQueryRecord .addMeasurement (metricType , measurementToAdd );
213
217
addToMinPQ (aggregateSearchQueryRecord , groupId );
@@ -228,7 +232,7 @@ private void overflow() {
228
232
}
229
233
}
230
234
231
- private boolean checkMaxGroupsLimitReached (String groupId ) {
235
+ private boolean checkMaxGroupsLimitReached (final String groupId ) {
232
236
if (maxGroups <= maxHeapQueryStore .size () && minHeapTopQueriesStore .size () >= topNSize ) {
233
237
log .warn (
234
238
"Exceeded [{}] setting threshold which is set at {}. Discarding new group with id {}." ,
@@ -259,11 +263,11 @@ int numberOfTopGroups() {
259
263
}
260
264
261
265
/**
262
- * Get groupingId. This should be query hashcode for SIMILARITY grouping and user_id for USER_ID grouping.
266
+ * Get groupingId. This should be the query hashcode for SIMILARITY grouping and user_id for USER_ID grouping.
263
267
* @param searchQueryRecord record
264
268
* @return Grouping Id
265
269
*/
266
- private String getGroupingId (SearchQueryRecord searchQueryRecord ) {
270
+ private String getGroupingId (final SearchQueryRecord searchQueryRecord ) {
267
271
switch (groupingType ) {
268
272
case SIMILARITY :
269
273
return searchQueryRecord .getAttributes ().get (Attribute .QUERY_HASHCODE ).toString ();
@@ -273,4 +277,13 @@ private String getGroupingId(SearchQueryRecord searchQueryRecord) {
273
277
throw new IllegalArgumentException ("The following grouping type is not supported : " + groupingType );
274
278
}
275
279
}
280
+
281
+ /**
282
+ * Get health stats of the MinMaxHeapQueryGrouperService
283
+ *
284
+ * @return QueryGrouperHealthStats
285
+ */
286
+ public QueryGrouperHealthStats getHealthStats () {
287
+ return new QueryGrouperHealthStats (this .groupIdToAggSearchQueryRecord .size (), this .maxHeapQueryStore .size ());
288
+ }
276
289
}
0 commit comments