11
11
import static org .opensearch .plugin .insights .core .service .TopQueriesService .isTopQueriesIndex ;
12
12
import static org .opensearch .plugin .insights .settings .QueryInsightsSettings .DEFAULT_DELETE_AFTER_VALUE ;
13
13
14
+ import java .io .IOException ;
15
+ import java .nio .charset .Charset ;
14
16
import java .time .Instant ;
15
17
import java .time .ZoneOffset ;
16
18
import java .time .ZonedDateTime ;
17
19
import java .time .format .DateTimeFormatter ;
18
20
import java .util .List ;
19
21
import java .util .Locale ;
20
22
import java .util .Map ;
23
+ import java .util .Objects ;
21
24
import java .util .concurrent .TimeUnit ;
22
25
import org .apache .logging .log4j .LogManager ;
23
26
import org .apache .logging .log4j .Logger ;
27
+ import org .opensearch .ResourceAlreadyExistsException ;
28
+ import org .opensearch .action .admin .indices .create .CreateIndexRequest ;
29
+ import org .opensearch .action .admin .indices .create .CreateIndexResponse ;
24
30
import org .opensearch .action .bulk .BulkRequestBuilder ;
25
31
import org .opensearch .action .bulk .BulkResponse ;
26
32
import org .opensearch .action .index .IndexRequest ;
27
33
import org .opensearch .client .Client ;
34
+ import org .opensearch .cluster .ClusterState ;
28
35
import org .opensearch .cluster .metadata .IndexMetadata ;
36
+ import org .opensearch .cluster .service .ClusterService ;
37
+ import org .opensearch .common .settings .Settings ;
29
38
import org .opensearch .common .unit .TimeValue ;
30
39
import org .opensearch .common .xcontent .XContentFactory ;
31
40
import org .opensearch .core .action .ActionListener ;
@@ -44,19 +53,38 @@ public final class LocalIndexExporter implements QueryInsightsExporter {
44
53
*/
45
54
private final Logger logger = LogManager .getLogger ();
46
55
private final Client client ;
56
+ private final ClusterService clusterService ;
57
+ private final String indexMapping ;
47
58
private DateTimeFormatter indexPattern ;
48
59
private int deleteAfter ;
49
60
private final String id ;
61
+ private static final int DEFAULT_NUMBER_OF_REPLICA = 1 ;
62
+ private static final int DEFAULT_NUMBER_OF_SHARDS = 1 ;
63
+ private static final List <String > DEFAULT_SORTED_FIELDS = List .of (
64
+ "measurements.latency.number" ,
65
+ "measurements.cpu.number" ,
66
+ "measurements.memory.number"
67
+ );
68
+ private static final List <String > DEFAULT_SORTED_ORDERS = List .of (
69
+ "desc" ,
70
+ "desc" ,
71
+ "desc"
72
+ );
50
73
51
74
/**
52
75
* Constructor of LocalIndexExporter
53
76
*
54
77
* @param client OS client
78
+ * @param clusterService cluster service
55
79
* @param indexPattern the pattern of index to export to
80
+ * @param indexMapping the index mapping file
81
+ * @param id id of the exporter
56
82
*/
57
- public LocalIndexExporter (final Client client , final DateTimeFormatter indexPattern , final String id ) {
83
+ public LocalIndexExporter (final Client client , final ClusterService clusterService , final DateTimeFormatter indexPattern , final String indexMapping , final String id ) {
58
84
this .indexPattern = indexPattern ;
59
85
this .client = client ;
86
+ this .clusterService = clusterService ;
87
+ this .indexMapping = indexMapping ;
60
88
this .deleteAfter = DEFAULT_DELETE_AFTER_VALUE ;
61
89
this .id = id ;
62
90
}
@@ -96,28 +124,72 @@ public void export(final List<SearchQueryRecord> records) {
96
124
}
97
125
try {
98
126
final String indexName = buildLocalIndexName ();
99
- final BulkRequestBuilder bulkRequestBuilder = client .prepareBulk ().setTimeout (TimeValue .timeValueMinutes (1 ));
100
- for (SearchQueryRecord record : records ) {
101
- bulkRequestBuilder .add (
102
- new IndexRequest (indexName ).source (record .toXContent (XContentFactory .jsonBuilder (), ToXContent .EMPTY_PARAMS ))
127
+ if (!checkIndexExists (indexName )) {
128
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest (indexName );
129
+
130
+ createIndexRequest .settings (Settings .builder ()
131
+ .putList ("index.sort.field" , DEFAULT_SORTED_FIELDS )
132
+ .putList ("index.sort.order" , DEFAULT_SORTED_ORDERS )
133
+ .put ("index.number_of_shards" , DEFAULT_NUMBER_OF_SHARDS )
134
+ .put ("index.number_of_replicas" , DEFAULT_NUMBER_OF_REPLICA )
103
135
);
136
+ createIndexRequest .mapping (readIndexMappings ());
137
+
138
+ client .admin ().indices ().create (createIndexRequest , new ActionListener <>() {
139
+ @ Override
140
+ public void onResponse (CreateIndexResponse createIndexResponse ) {
141
+ if (createIndexResponse .isAcknowledged ()) {
142
+ try {
143
+ bulk (indexName , records );
144
+ } catch (IOException e ) {
145
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
146
+ logger .error ("Unable to index query insights data: " , e );
147
+ }
148
+ }
149
+ }
150
+ @ Override
151
+ public void onFailure (Exception e ) {
152
+ if (e instanceof ResourceAlreadyExistsException ) {
153
+ try {
154
+ bulk (indexName , records );
155
+ } catch (IOException ex ) {
156
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
157
+ logger .error ("Unable to index query insights data: " , e );
158
+ }
159
+ } else {
160
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
161
+ logger .error ("Unable to create query insights index: " , e );
162
+ }
163
+ }
164
+ });
165
+ } else {
166
+ bulk (indexName , records );
104
167
}
105
- bulkRequestBuilder .execute (new ActionListener <BulkResponse >() {
106
- @ Override
107
- public void onResponse (BulkResponse bulkItemResponses ) {}
108
-
109
- @ Override
110
- public void onFailure (Exception e ) {
111
- OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_BULK_FAILURES );
112
- logger .error ("Failed to execute bulk operation for query insights data: " , e );
113
- }
114
- });
115
168
} catch (final Exception e ) {
116
169
OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_EXCEPTIONS );
117
170
logger .error ("Unable to index query insights data: " , e );
118
171
}
119
172
}
120
173
174
+ private void bulk (final String indexName , final List <SearchQueryRecord > records ) throws IOException {
175
+ final BulkRequestBuilder bulkRequestBuilder = client .prepareBulk ().setTimeout (TimeValue .timeValueMinutes (1 ));
176
+ for (SearchQueryRecord record : records ) {
177
+ bulkRequestBuilder .add (
178
+ new IndexRequest (indexName ).id (record .getId ()).source (record .toXContent (XContentFactory .jsonBuilder (), ToXContent .EMPTY_PARAMS ))
179
+ );
180
+ }
181
+ bulkRequestBuilder .execute (new ActionListener <BulkResponse >() {
182
+ @ Override
183
+ public void onResponse (BulkResponse bulkItemResponses ) {}
184
+
185
+ @ Override
186
+ public void onFailure (Exception e ) {
187
+ OperationalMetricsCounter .getInstance ().incrementCounter (OperationalMetric .LOCAL_INDEX_EXPORTER_BULK_FAILURES );
188
+ logger .error ("Failed to execute bulk operation for query insights data: " , e );
189
+ }
190
+ });
191
+ }
192
+
121
193
/**
122
194
* Close the exporter sink
123
195
*/
@@ -174,4 +246,27 @@ public static String generateLocalIndexDateHash() {
174
246
// Generate a 5-digit numeric hash from the date's hashCode
175
247
return String .format (Locale .ROOT , "%05d" , (currentDate .hashCode () % 100000 + 100000 ) % 100000 );
176
248
}
249
+
250
+ /**
251
+ * check if index exists
252
+ * @return boolean
253
+ */
254
+ private boolean checkIndexExists (String indexName ) {
255
+ ClusterState clusterState = clusterService .state ();
256
+ return clusterState .getRoutingTable ().hasIndex (indexName );
257
+ }
258
+
259
+ /**
260
+ * get correlation rule index mappings
261
+ * @return mappings of correlation rule index
262
+ * @throws IOException IOException
263
+ */
264
+ private String readIndexMappings () throws IOException {
265
+ return new String (
266
+ Objects .requireNonNull (LocalIndexExporter .class .getClassLoader ().getResourceAsStream (indexMapping ))
267
+ .readAllBytes (),
268
+ Charset .defaultCharset ()
269
+ );
270
+ }
271
+
177
272
}
0 commit comments