74
74
import org .opensearch .core .xcontent .NamedXContentRegistry ;
75
75
import org .opensearch .core .xcontent .XContentParser ;
76
76
import org .opensearch .core .xcontent .XContentParser .Token ;
77
+ import org .opensearch .forecast .constant .ForecastCommonName ;
77
78
import org .opensearch .forecast .indices .ForecastIndex ;
78
79
import org .opensearch .index .IndexNotFoundException ;
79
80
import org .opensearch .index .query .BoolQueryBuilder ;
@@ -289,7 +290,7 @@ protected void choosePrimaryShards(CreateIndexRequest request, boolean hiddenInd
289
290
);
290
291
}
291
292
292
- protected void deleteOldHistoryIndices (String indexPattern , TimeValue historyRetentionPeriod , Integer customResultIndexTtl ) {
293
+ protected void deleteOldHistoryIndices (String indexPattern , TimeValue historyRetentionPeriod ) {
293
294
Set <String > candidates = new HashSet <String >();
294
295
295
296
ClusterStateRequest clusterStateRequest = new ClusterStateRequest ()
@@ -302,12 +303,12 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
302
303
adminClient .cluster ().state (clusterStateRequest , ActionListener .wrap (clusterStateResponse -> {
303
304
String latestToDelete = null ;
304
305
long latest = Long .MIN_VALUE ;
305
- long customTtlMillis = (customResultIndexTtl != null ) ? customResultIndexTtl * 24 * 60 * 60 * 1000L : Long .MAX_VALUE ;
306
306
for (IndexMetadata indexMetaData : clusterStateResponse .getState ().metadata ().indices ().values ()) {
307
307
long creationTime = indexMetaData .getCreationDate ();
308
308
long indexAgeMillis = Instant .now ().toEpochMilli () - creationTime ;
309
- if (indexAgeMillis > historyRetentionPeriod .millis () || indexAgeMillis > customTtlMillis ) {
309
+ if (indexAgeMillis > historyRetentionPeriod .millis ()) {
310
310
String indexName = indexMetaData .getIndex ().getName ();
311
+ System .out .println ("indexName: " + indexName );
311
312
candidates .add (indexName );
312
313
if (latest < creationTime ) {
313
314
latest = creationTime ;
@@ -317,7 +318,7 @@ protected void deleteOldHistoryIndices(String indexPattern, TimeValue historyRet
317
318
}
318
319
if (candidates .size () > 1 ) {
319
320
// delete all indices except the last one because the last one may contain docs newer than the retention period
320
- candidates .remove (latestToDelete );
321
+ // candidates.remove(latestToDelete);
321
322
String [] toDelete = candidates .toArray (Strings .EMPTY_ARRAY );
322
323
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest (toDelete );
323
324
adminClient .indices ().delete (deleteIndexRequest , ActionListener .wrap (deleteIndexResponse -> {
@@ -1085,7 +1086,7 @@ public void onClusterManager() {
1085
1086
1086
1087
// schedule the next rollover for approx MAX_AGE later
1087
1088
scheduledRollover = threadPool
1088
- .scheduleWithFixedDelay (() -> rolloverAndDeleteHistoryIndex (), TimeValue .timeValueMinutes (5 ), executorName ());
1089
+ .scheduleWithFixedDelay (() -> rolloverAndDeleteHistoryIndex (), TimeValue .timeValueMinutes (1 ), executorName ());
1089
1090
} catch (Exception e ) {
1090
1091
// This should be run on cluster startup
1091
1092
logger .error ("Error rollover result indices. " + "Can't rollover result until clusterManager node is restarted." , e );
@@ -1108,7 +1109,6 @@ protected void rescheduleRollover() {
1108
1109
if (scheduledRollover != null ) {
1109
1110
scheduledRollover .cancel ();
1110
1111
}
1111
- System .out .println (5 );
1112
1112
1113
1113
scheduledRollover = threadPool
1114
1114
.scheduleWithFixedDelay (() -> rolloverAndDeleteHistoryIndex (), historyRolloverPeriod , executorName ());
@@ -1244,13 +1244,9 @@ protected void rolloverAndDeleteHistoryIndex(
1244
1244
String rolloverIndexPattern ,
1245
1245
IndexType resultIndex
1246
1246
) {
1247
- System .out .println ("resultIndexAlias: " + resultIndexAlias );
1248
- System .out .println ("allResultIndicesPattern: " + allResultIndicesPattern );
1249
- System .out .println ("rolloverIndexPattern: " + rolloverIndexPattern );
1250
- System .out .println ("resultIndex: " + resultIndex .getIndexName ());
1251
-
1252
1247
// build rollover request for default result index
1253
1248
RolloverRequest defaultResultIndexRolloverRequest = buildRolloverRequest (resultIndexAlias , rolloverIndexPattern );
1249
+ defaultResultIndexRolloverRequest .addMaxIndexDocsCondition (historyMaxDocs * getNumberOfPrimaryShards ());
1254
1250
1255
1251
// get config files that have custom result index alias to perform rollover on
1256
1252
getConfigsWithCustomResultIndexAlias (ActionListener .wrap (candidateResultAliases -> {
@@ -1271,8 +1267,6 @@ protected void rolloverAndDeleteHistoryIndex(
1271
1267
return ;
1272
1268
}
1273
1269
1274
- System .out .println ("size: " + candidateResultAliases .size ());
1275
-
1276
1270
// perform rollover and delete on found custom result index alias
1277
1271
candidateResultAliases .forEach (config -> handleCustomResultIndex (config , resultIndex ));
1278
1272
@@ -1284,16 +1278,14 @@ protected void rolloverAndDeleteHistoryIndex(
1284
1278
}
1285
1279
1286
1280
private void handleCustomResultIndex (Config config , IndexType resultIndex ) {
1287
- System .out .println ("detector name: " + config .getName ());
1288
- System .out .println ("custom index name: " + config .getCustomResultIndexOrAlias ());
1289
1281
RolloverRequest rolloverRequest = buildRolloverRequest (
1290
1282
config .getCustomResultIndexOrAlias (),
1291
1283
getCustomResultIndexPattern (config .getCustomResultIndexOrAlias ())
1292
1284
);
1293
1285
1294
1286
// add rollover conditions if found in config
1295
1287
if (config .getCustomResultIndexMinAge () != null ) {
1296
- rolloverRequest .addMaxIndexAgeCondition (TimeValue .timeValueMinutes (10 ));
1288
+ rolloverRequest .addMaxIndexAgeCondition (TimeValue .timeValueMinutes (1 ));
1297
1289
1298
1290
// rolloverRequest.addMaxIndexAgeCondition(TimeValue.timeValueDays(config.getCustomResultIndexMinAge()));
1299
1291
}
@@ -1326,7 +1318,6 @@ private RolloverRequest buildRolloverRequest(String resultIndexAlias, String rol
1326
1318
1327
1319
createRequest .index (rolloverIndexPattern ).mapping (resultMapping , XContentType .JSON );
1328
1320
choosePrimaryShards (createRequest , true );
1329
- rollOverRequest .addMaxIndexDocsCondition (historyMaxDocs * getNumberOfPrimaryShards ());
1330
1321
1331
1322
return rollOverRequest ;
1332
1323
}
@@ -1345,7 +1336,17 @@ private void proceedWithRolloverAndDelete(
1345
1336
IndexState indexState = indexStates .computeIfAbsent (resultIndex , k -> new IndexState (k .getMapping ()));
1346
1337
indexState .mappingUpToDate = true ;
1347
1338
logger .info ("{} rolled over. Conditions were: {}" , resultIndexAlias , response .getConditionStatus ());
1348
- deleteOldHistoryIndices (allResultIndicesPattern , historyRetentionPeriod , customResultIndexTtl );
1339
+ if (resultIndexAlias .startsWith (ADCommonName .CUSTOM_RESULT_INDEX_PREFIX ) || resultIndexAlias .startsWith (CUSTOM_RESULT_INDEX_PREFIX )) {
1340
+ // handle custom result index deletion
1341
+ if (customResultIndexTtl != null ) {
1342
+ // deleteOldHistoryIndices(allResultIndicesPattern, TimeValue.timeValueHours(customResultIndexTtl * 24));
1343
+ deleteOldHistoryIndices (allResultIndicesPattern , TimeValue .timeValueMinutes (1 ));
1344
+
1345
+ }
1346
+ } else {
1347
+ // handle default result index deletion
1348
+ deleteOldHistoryIndices (allResultIndicesPattern , historyRetentionPeriod );
1349
+ }
1349
1350
}
1350
1351
}, exception -> { logger .error ("Fail to roll over result index" , exception ); }));
1351
1352
}
0 commit comments