@@ -156,6 +156,7 @@ public class InternalEngine extends Engine {
156
156
private volatile long lastDeleteVersionPruneTimeMSec ;
157
157
158
158
protected final TranslogManager translogManager ;
159
+ private final IndexWriterConfig indexWriterConfig ;
159
160
protected final IndexWriter indexWriter ;
160
161
protected final LocalCheckpointTracker localCheckpointTracker ;
161
162
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong (-1 );
@@ -166,6 +167,7 @@ public class InternalEngine extends Engine {
166
167
protected final String historyUUID ;
167
168
168
169
private final OpenSearchConcurrentMergeScheduler mergeScheduler ;
170
+ private MergePolicy unMergeOnFlushPolicy ;
169
171
private final ExternalReaderManager externalReaderManager ;
170
172
private final OpenSearchReaderManager internalReaderManager ;
171
173
@@ -292,7 +294,8 @@ public void onFailure(String reason, Exception ex) {
292
294
translogManager ::getLastSyncedGlobalCheckpoint
293
295
);
294
296
this .localCheckpointTracker = createLocalCheckpointTracker (localCheckpointTrackerSupplier );
295
- writer = createWriter ();
297
+ this .indexWriterConfig = getIndexWriterConfig ();
298
+ writer = createWriter (indexWriterConfig );
296
299
bootstrapAppendOnlyInfoFromWriter (writer );
297
300
final Map <String , String > commitData = commitDataAsMap (writer );
298
301
historyUUID = loadHistoryUUID (commitData );
@@ -2303,9 +2306,8 @@ protected final ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(
2303
2306
}
2304
2307
}
2305
2308
2306
- private IndexWriter createWriter () throws IOException {
2309
+ private IndexWriter createWriter (IndexWriterConfig iwc ) throws IOException {
2307
2310
try {
2308
- final IndexWriterConfig iwc = getIndexWriterConfig ();
2309
2311
return createWriter (store .directory (), iwc );
2310
2312
} catch (LockObtainFailedException ex ) {
2311
2313
logger .warn ("could not lock IndexWriter" , ex );
@@ -2336,26 +2338,27 @@ private IndexWriterConfig getIndexWriterConfig() {
2336
2338
iwc .setMergeScheduler (mergeScheduler );
2337
2339
// Give us the opportunity to upgrade old segments while performing
2338
2340
// background merges
2339
- MergePolicy mergePolicy = config ().getMergePolicy ();
2341
+ this . unMergeOnFlushPolicy = config ().getMergePolicy ();
2340
2342
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
2341
2343
iwc .setSoftDeletesField (Lucene .SOFT_DELETES_FIELD );
2342
- mergePolicy = new RecoverySourcePruneMergePolicy (
2344
+ unMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy (
2343
2345
SourceFieldMapper .RECOVERY_SOURCE_NAME ,
2344
2346
softDeletesPolicy ::getRetentionQuery ,
2345
2347
new SoftDeletesRetentionMergePolicy (
2346
2348
Lucene .SOFT_DELETES_FIELD ,
2347
2349
softDeletesPolicy ::getRetentionQuery ,
2348
- new PrunePostingsMergePolicy (mergePolicy , IdFieldMapper .NAME )
2350
+ new PrunePostingsMergePolicy (unMergeOnFlushPolicy , IdFieldMapper .NAME )
2349
2351
)
2350
2352
);
2351
2353
boolean shuffleForcedMerge = Booleans .parseBoolean (System .getProperty ("opensearch.shuffle_forced_merge" , Boolean .TRUE .toString ()));
2352
2354
if (shuffleForcedMerge ) {
2353
2355
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
2354
2356
// but there should be no overhead for other type of indices so it's simpler than adding a setting
2355
2357
// to enable it.
2356
- mergePolicy = new ShuffleForcedMergePolicy (mergePolicy );
2358
+ unMergeOnFlushPolicy = new ShuffleForcedMergePolicy (unMergeOnFlushPolicy );
2357
2359
}
2358
2360
2361
+ MergePolicy mergePolicy = unMergeOnFlushPolicy ;
2359
2362
if (config ().getIndexSettings ().isMergeOnFlushEnabled ()) {
2360
2363
final long maxFullFlushMergeWaitMillis = config ().getIndexSettings ().getMaxFullFlushMergeWaitTime ().millis ();
2361
2364
if (maxFullFlushMergeWaitMillis > 0 ) {
@@ -2604,6 +2607,29 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
2604
2607
// the setting will be re-interpreted if it's set to true
2605
2608
updateAutoIdTimestamp (Long .MAX_VALUE , true );
2606
2609
}
2610
+ IndexSettings indexSettings = engineConfig .getIndexSettings ();
2611
+ if (indexSettings .isCheckPendingFlushEnabled () != indexWriterConfig .isCheckPendingFlushOnUpdate ()) {
2612
+ indexWriterConfig .setCheckPendingFlushUpdate (indexSettings .isCheckPendingFlushEnabled ());
2613
+ }
2614
+ if (indexSettings .isMergeOnFlushEnabled ()) {
2615
+ final long maxFullFlushMergeWaitMillis = indexSettings .getMaxFullFlushMergeWaitTime ().millis ();
2616
+ if (maxFullFlushMergeWaitMillis > 0 ) {
2617
+ indexWriterConfig .setMaxFullFlushMergeWaitMillis (maxFullFlushMergeWaitMillis );
2618
+ final Optional <UnaryOperator <MergePolicy >> mergeOnFlushPolicy = indexSettings .getMergeOnFlushPolicy ();
2619
+ if (mergeOnFlushPolicy .isPresent ()) {
2620
+ indexWriterConfig .setMergePolicy (new OpenSearchMergePolicy (mergeOnFlushPolicy .get ().apply (unMergeOnFlushPolicy )));
2621
+ }
2622
+ } else {
2623
+ logger .warn (
2624
+ "The {} is enabled but {} is set to 0, merge on flush will not be activated" ,
2625
+ IndexSettings .INDEX_MERGE_ON_FLUSH_ENABLED .getKey (),
2626
+ IndexSettings .INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME .getKey ()
2627
+ );
2628
+ }
2629
+ } else {
2630
+ indexWriterConfig .setMaxFullFlushMergeWaitMillis (0 );
2631
+ indexWriterConfig .setMergePolicy (new OpenSearchMergePolicy (unMergeOnFlushPolicy ));
2632
+ }
2607
2633
final TranslogDeletionPolicy translogDeletionPolicy = translogManager .getDeletionPolicy ();
2608
2634
translogDeletionPolicy .setRetentionAgeInMillis (translogRetentionAge .millis ());
2609
2635
translogDeletionPolicy .setRetentionSizeInBytes (translogRetentionSize .getBytes ());
0 commit comments