8
8
9
9
package org .opensearch .plugin .insights .core .exporter ;
10
10
11
- import static org .opensearch .plugin .insights .core .service . TopQueriesService . isTopQueriesIndex ;
11
+ import static org .opensearch .plugin .insights .core .utils . ExporterReaderUtils . generateLocalIndexDateHash ;
12
12
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_DELETE_AFTER_VALUE ;
13
13
14
- import java .time .Instant ;
14
+ import java .io .IOException ;
15
+ import java .nio .charset .Charset ;
15
16
import java .time .ZoneOffset ;
16
17
import java .time .ZonedDateTime ;
17
18
import java .time .format .DateTimeFormatter ;
18
19
import java .util .List ;
19
- import java .util .Locale ;
20
- import java .util .Map ;
21
- import java .util .concurrent .TimeUnit ;
20
+ import java .util .Objects ;
22
21
import org .apache .logging .log4j .LogManager ;
23
22
import org .apache .logging .log4j .Logger ;
23
+ import org .opensearch .ExceptionsHelper ;
24
+ import org .opensearch .ResourceAlreadyExistsException ;
25
+ import org .opensearch .action .admin .indices .create .CreateIndexRequest ;
26
+ import org .opensearch .action .admin .indices .create .CreateIndexResponse ;
27
+ import org .opensearch .action .admin .indices .delete .DeleteIndexRequest ;
24
28
import org .opensearch .action .bulk .BulkRequestBuilder ;
25
29
import org .opensearch .action .bulk .BulkResponse ;
26
30
import org .opensearch .action .index .IndexRequest ;
27
31
import org .opensearch .client .Client ;
28
- import org .opensearch .cluster .metadata .IndexMetadata ;
32
+ import org .opensearch .cluster .ClusterState ;
33
+ import org .opensearch .cluster .service .ClusterService ;
34
+ import org .opensearch .common .settings .Settings ;
29
35
import org .opensearch .common .unit .TimeValue ;
30
36
import org .opensearch .common .xcontent .XContentFactory ;
31
37
import org .opensearch .core .action .ActionListener ;
32
38
import org .opensearch .core .xcontent .ToXContent ;
39
+ import org .opensearch .index .IndexNotFoundException ;
33
40
import org .opensearch .plugin .insights .core .metrics .OperationalMetric ;
34
41
import org .opensearch .plugin .insights .core .metrics .OperationalMetricsCounter ;
35
- import org .opensearch .plugin .insights .core .service .TopQueriesService ;
36
42
import org .opensearch .plugin .insights .rules .model .SearchQueryRecord ;
37
43
38
44
/**
39
45
* Local index exporter for exporting query insights data to local OpenSearch indices.
40
46
*/
41
- public final class LocalIndexExporter implements QueryInsightsExporter {
47
+ public class LocalIndexExporter implements QueryInsightsExporter {
42
48
/**
43
49
* Logger of the local index exporter
44
50
*/
45
51
private final Logger logger = LogManager .getLogger ();
46
52
private final Client client ;
53
+ private final ClusterService clusterService ;
54
+ private final String indexMapping ;
47
55
private DateTimeFormatter indexPattern ;
48
56
private int deleteAfter ;
57
+ private final String id ;
58
+ private static final int DEFAULT_NUMBER_OF_REPLICA = 1 ;
59
+ private static final int DEFAULT_NUMBER_OF_SHARDS = 1 ;
60
+ private static final List <String > DEFAULT_SORTED_FIELDS = List .of (
61
+ "measurements.latency.number" ,
62
+ "measurements.cpu.number" ,
63
+ "measurements.memory.number"
64
+ );
65
+ private static final List <String > DEFAULT_SORTED_ORDERS = List .of ("desc" , "desc" , "desc" );
49
66
50
67
/**
51
68
* Constructor of LocalIndexExporter
52
69
*
53
70
* @param client OS client
71
+ * @param clusterService cluster service
54
72
* @param indexPattern the pattern of index to export to
73
+ * @param indexMapping the index mapping file
74
+ * @param id id of the exporter
55
75
*/
56
- public LocalIndexExporter (final Client client , final DateTimeFormatter indexPattern ) {
76
+ public LocalIndexExporter (
77
+ final Client client ,
78
+ final ClusterService clusterService ,
79
+ final DateTimeFormatter indexPattern ,
80
+ final String indexMapping ,
81
+ final String id
82
+ ) {
57
83
this .indexPattern = indexPattern ;
58
84
this .client = client ;
85
+ this .clusterService = clusterService ;
86
+ this .indexMapping = indexMapping ;
59
87
this .deleteAfter = DEFAULT_DELETE_AFTER_VALUE ;
88
+ this .id = id ;
89
+ }
90
+
91
+ @ Override
92
+ public String getId () {
93
+ return id ;
60
94
}
61
95
62
96
/**
@@ -73,7 +107,7 @@ public DateTimeFormatter getIndexPattern() {
73
107
*
74
108
* @param indexPattern index pattern
75
109
*/
76
- void setIndexPattern (DateTimeFormatter indexPattern ) {
110
+ public void setIndexPattern (DateTimeFormatter indexPattern ) {
77
111
this .indexPattern = indexPattern ;
78
112
}
79
113
@@ -89,28 +123,76 @@ public void export(final List<SearchQueryRecord> records) {
89
123
}
90
124
try {
91
125
final String indexName = buildLocalIndexName ();
92
- final BulkRequestBuilder bulkRequestBuilder = client .prepareBulk ().setTimeout (TimeValue .timeValueMinutes (1 ));
93
- for (SearchQueryRecord record : records ) {
94
- bulkRequestBuilder .add (
95
- new IndexRequest (indexName ).source (record .toXContent (XContentFactory .jsonBuilder (), ToXContent .EMPTY_PARAMS ))
126
+ if (!checkIndexExists (indexName )) {
127
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest (indexName );
128
+
129
+ createIndexRequest .settings (
130
+ Settings .builder ()
131
+ .putList ("index.sort.field" , DEFAULT_SORTED_FIELDS )
132
+ .putList ("index.sort.order" , DEFAULT_SORTED_ORDERS )
133
+ .put ("index.number_of_shards" , DEFAULT_NUMBER_OF_SHARDS )
134
+ .put ("index.number_of_replicas" , DEFAULT_NUMBER_OF_REPLICA )
96
135
);
136
+ createIndexRequest .mapping (readIndexMappings ());
137
+
138
+ client .admin ().indices ().create (createIndexRequest , new ActionListener <>() {
139
+ @ Override
140
+ public void onResponse (CreateIndexResponse createIndexResponse ) {
141
+ if (createIndexResponse .isAcknowledged ()) {
142
+ try {
143
+ bulk (indexName , records );
144
+ } catch (IOException e ) {
145
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
146
+ logger .error ("Unable to index query insights data: " , e );
147
+ }
148
+ }
149
+ }
150
+
151
+ @ Override
152
+ public void onFailure (Exception e ) {
153
+ Throwable cause = ExceptionsHelper .unwrapCause (e );
154
+ if (cause instanceof ResourceAlreadyExistsException ) {
155
+ try {
156
+ bulk (indexName , records );
157
+ } catch (IOException ex ) {
158
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
159
+ logger .error ("Unable to index query insights data: " , e );
160
+ }
161
+ } else {
162
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
163
+ logger .error ("Unable to create query insights index: " , e );
164
+ }
165
+ }
166
+ });
167
+ } else {
168
+ bulk (indexName , records );
97
169
}
98
- bulkRequestBuilder .execute (new ActionListener <BulkResponse >() {
99
- @ Override
100
- public void onResponse (BulkResponse bulkItemResponses ) {}
101
-
102
- @ Override
103
- public void onFailure (Exception e ) {
104
- OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_BULK_FAILURES );
105
- logger .error ("Failed to execute bulk operation for query insights data: " , e );
106
- }
107
- });
108
170
} catch (final Exception e ) {
109
171
OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
110
172
logger .error ("Unable to index query insights data: " , e );
111
173
}
112
174
}
113
175
176
+ private void bulk (final String indexName , final List <SearchQueryRecord > records ) throws IOException {
177
+ final BulkRequestBuilder bulkRequestBuilder = client .prepareBulk ().setTimeout (TimeValue .timeValueMinutes (1 ));
178
+ for (SearchQueryRecord record : records ) {
179
+ bulkRequestBuilder .add (
180
+ new IndexRequest (indexName ).id (record .getId ())
181
+ .source (record .toXContent (XContentFactory .jsonBuilder (), ToXContent .EMPTY_PARAMS ))
182
+ );
183
+ }
184
+ bulkRequestBuilder .execute (new ActionListener <BulkResponse >() {
185
+ @ Override
186
+ public void onResponse (BulkResponse bulkItemResponses ) {}
187
+
188
+ @ Override
189
+ public void onFailure (Exception e ) {
190
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_BULK_FAILURES );
191
+ logger .error ("Failed to execute bulk operation for query insights data: " , e );
192
+ }
193
+ });
194
+ }
195
+
114
196
/**
115
197
* Close the exporter sink
116
198
*/
@@ -125,7 +207,8 @@ public void close() {
125
207
* @return A string representing the index name in the format "top_queries-YYYY.MM.dd-01234".
126
208
*/
127
209
String buildLocalIndexName () {
128
- return indexPattern .format (ZonedDateTime .now (ZoneOffset .UTC )) + "-" + generateLocalIndexDateHash ();
210
+ ZonedDateTime currentTime = ZonedDateTime .now (ZoneOffset .UTC );
211
+ return indexPattern .format (currentTime ) + "-" + generateLocalIndexDateHash (currentTime .toLocalDate ());
129
212
}
130
213
131
214
/**
@@ -138,33 +221,58 @@ public void setDeleteAfter(final int deleteAfter) {
138
221
}
139
222
140
223
/**
141
- * Delete Top N local indices older than the configured data retention period
224
+ * Get local index exporter data retention period
142
225
*
143
- * @param indexMetadataMap Map of index name {@link String} to {@link IndexMetadata}
226
+ * @return the number of days after which Top N local indices should be deleted
144
227
*/
145
- public void deleteExpiredTopNIndices (final Map <String , IndexMetadata > indexMetadataMap ) {
146
- long expirationMillisLong = System .currentTimeMillis () - TimeUnit .DAYS .toMillis (deleteAfter );
147
- for (Map .Entry <String , IndexMetadata > entry : indexMetadataMap .entrySet ()) {
148
- String indexName = entry .getKey ();
149
- if (isTopQueriesIndex (indexName ) && entry .getValue ().getCreationDate () <= expirationMillisLong ) {
150
- // delete this index
151
- TopQueriesService .deleteSingleIndex (indexName , client );
152
- }
153
- }
228
+ public int getDeleteAfter () {
229
+ return deleteAfter ;
154
230
}
155
231
156
232
/**
157
- * Generates a consistent 5-digit numeric hash based on the current UTC date.
158
- * The generated hash is deterministic, meaning it will return the same result for the same date.
233
+ * Deletes the specified index and logs any failure that occurs during the operation.
159
234
*
160
- * @return A 5-digit numeric string representation of the current date's hash.
235
+ * @param indexName The name of the index to delete.
236
+ * @param client The OpenSearch client used to perform the deletion.
237
+ */
238
+ public void deleteSingleIndex (String indexName , Client client ) {
239
+ Logger logger = LogManager .getLogger ();
240
+ client .admin ().indices ().delete (new DeleteIndexRequest (indexName ), new ActionListener <>() {
241
+ @ Override
242
+ // CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
243
+ public void onResponse (org .opensearch .action .support .master .AcknowledgedResponse acknowledgedResponse ) {}
244
+
245
+ @ Override
246
+ public void onFailure (Exception e ) {
247
+ Throwable cause = ExceptionsHelper .unwrapCause (e );
248
+ if (cause instanceof IndexNotFoundException ) {
249
+ return ;
250
+ }
251
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_DELETE_FAILURES );
252
+ logger .error ("Failed to delete index '{}': " , indexName , e );
253
+ }
254
+ });
255
+ }
256
+
257
+ /**
258
+ * check if index exists
259
+ * @return boolean
161
260
*/
162
- public static String generateLocalIndexDateHash ( ) {
163
- // Get the current date in UTC (yyyy-MM-dd format)
164
- String currentDate = DateTimeFormatter . ofPattern ( "yyyy-MM-dd" , Locale . ROOT )
165
- . format ( Instant . now (). atOffset ( ZoneOffset . UTC ). toLocalDate ());
261
+ private boolean checkIndexExists ( String indexName ) {
262
+ ClusterState clusterState = clusterService . state ();
263
+ return clusterState . getRoutingTable (). hasIndex ( indexName );
264
+ }
166
265
167
- // Generate a 5-digit numeric hash from the date's hashCode
168
- return String .format (Locale .ROOT , "%05d" , (currentDate .hashCode () % 100000 + 100000 ) % 100000 );
266
+ /**
267
+ * get correlation rule index mappings
268
+ * @return mappings of correlation rule index
269
+ * @throws IOException IOException
270
+ */
271
+ private String readIndexMappings () throws IOException {
272
+ return new String (
273
+ Objects .requireNonNull (LocalIndexExporter .class .getClassLoader ().getResourceAsStream (indexMapping )).readAllBytes (),
274
+ Charset .defaultCharset ()
275
+ );
169
276
}
277
+
170
278
}
0 commit comments