Skip to content

Commit c8b84a8

Browse files
committed
make index.merge_on_flush.enabled, index.merge_on_flush.max_full_flush_merge_wait_time, index.merge_on_flush.policy, index.check_pending_flush.enabled dynamic
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 968eafb commit c8b84a8

File tree

4 files changed

+96
-9
lines changed

4 files changed

+96
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
4141
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
4242
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
43+
- Make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic ([#17495](https://github.com/opensearch-project/OpenSearch/pull/17495))
4344

4445
### Security
4546

server/src/main/java/org/opensearch/index/IndexSettings.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,8 @@ public static IndexMergePolicy fromString(String text) {
648648
public static final Setting<Boolean> INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting(
649649
"index.check_pending_flush.enabled",
650650
true,
651-
Property.IndexScope
651+
Property.IndexScope,
652+
Property.Dynamic
652653
);
653654

654655
public static final Setting<String> TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString(
@@ -901,7 +902,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
901902
/**
902903
* Is flush check by write threads enabled or not
903904
*/
904-
private final boolean checkPendingFlushEnabled;
905+
private volatile boolean checkPendingFlushEnabled;
905906
/**
906907
* Is fuzzy set enabled for doc id
907908
*/
@@ -1200,6 +1201,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
12001201
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
12011202
this::setRemoteStoreTranslogRepository
12021203
);
1204+
scopedSettings.addSettingsUpdateConsumer(INDEX_CHECK_PENDING_FLUSH_ENABLED, this::setCheckPendingFlushEnabled);
1205+
}
1206+
1207+
public void setCheckPendingFlushEnabled(boolean checkPendingFlushEnabled) {
1208+
this.checkPendingFlushEnabled = checkPendingFlushEnabled;
12031209
}
12041210

12051211
private void setSearchIdleAfter(TimeValue searchIdleAfter) {

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

+33-7
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public class InternalEngine extends Engine {
156156
private volatile long lastDeleteVersionPruneTimeMSec;
157157

158158
protected final TranslogManager translogManager;
159+
private final IndexWriterConfig indexWriterConfig;
159160
protected final IndexWriter indexWriter;
160161
protected final LocalCheckpointTracker localCheckpointTracker;
161162
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
@@ -166,6 +167,7 @@ public class InternalEngine extends Engine {
166167
protected final String historyUUID;
167168

168169
private final OpenSearchConcurrentMergeScheduler mergeScheduler;
170+
private MergePolicy unMergeOnFlushPolicy;
169171
private final ExternalReaderManager externalReaderManager;
170172
private final OpenSearchReaderManager internalReaderManager;
171173

@@ -292,7 +294,8 @@ public void onFailure(String reason, Exception ex) {
292294
translogManager::getLastSyncedGlobalCheckpoint
293295
);
294296
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
295-
writer = createWriter();
297+
this.indexWriterConfig = getIndexWriterConfig();
298+
writer = createWriter(indexWriterConfig);
296299
bootstrapAppendOnlyInfoFromWriter(writer);
297300
final Map<String, String> commitData = commitDataAsMap(writer);
298301
historyUUID = loadHistoryUUID(commitData);
@@ -2303,9 +2306,8 @@ protected final ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(
23032306
}
23042307
}
23052308

2306-
private IndexWriter createWriter() throws IOException {
2309+
private IndexWriter createWriter(IndexWriterConfig iwc) throws IOException {
23072310
try {
2308-
final IndexWriterConfig iwc = getIndexWriterConfig();
23092311
return createWriter(store.directory(), iwc);
23102312
} catch (LockObtainFailedException ex) {
23112313
logger.warn("could not lock IndexWriter", ex);
@@ -2336,26 +2338,27 @@ private IndexWriterConfig getIndexWriterConfig() {
23362338
iwc.setMergeScheduler(mergeScheduler);
23372339
// Give us the opportunity to upgrade old segments while performing
23382340
// background merges
2339-
MergePolicy mergePolicy = config().getMergePolicy();
2341+
this.unMergeOnFlushPolicy = config().getMergePolicy();
23402342
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
23412343
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
2342-
mergePolicy = new RecoverySourcePruneMergePolicy(
2344+
unMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy(
23432345
SourceFieldMapper.RECOVERY_SOURCE_NAME,
23442346
softDeletesPolicy::getRetentionQuery,
23452347
new SoftDeletesRetentionMergePolicy(
23462348
Lucene.SOFT_DELETES_FIELD,
23472349
softDeletesPolicy::getRetentionQuery,
2348-
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
2350+
new PrunePostingsMergePolicy(unMergeOnFlushPolicy, IdFieldMapper.NAME)
23492351
)
23502352
);
23512353
boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
23522354
if (shuffleForcedMerge) {
23532355
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
23542356
// but there should be no overhead for other type of indices so it's simpler than adding a setting
23552357
// to enable it.
2356-
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
2358+
unMergeOnFlushPolicy = new ShuffleForcedMergePolicy(unMergeOnFlushPolicy);
23572359
}
23582360

2361+
MergePolicy mergePolicy = unMergeOnFlushPolicy;
23592362
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
23602363
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
23612364
if (maxFullFlushMergeWaitMillis > 0) {
@@ -2604,6 +2607,29 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
26042607
// the setting will be re-interpreted if it's set to true
26052608
updateAutoIdTimestamp(Long.MAX_VALUE, true);
26062609
}
2610+
IndexSettings indexSettings = engineConfig.getIndexSettings();
2611+
if (indexSettings.isCheckPendingFlushEnabled() != indexWriterConfig.isCheckPendingFlushOnUpdate()) {
2612+
indexWriterConfig.setCheckPendingFlushUpdate(indexSettings.isCheckPendingFlushEnabled());
2613+
}
2614+
if (indexSettings.isMergeOnFlushEnabled()) {
2615+
final long maxFullFlushMergeWaitMillis = indexSettings.getMaxFullFlushMergeWaitTime().millis();
2616+
if (maxFullFlushMergeWaitMillis > 0) {
2617+
indexWriterConfig.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
2618+
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = indexSettings.getMergeOnFlushPolicy();
2619+
if (mergeOnFlushPolicy.isPresent()) {
2620+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(mergeOnFlushPolicy.get().apply(unMergeOnFlushPolicy)));
2621+
}
2622+
} else {
2623+
logger.warn(
2624+
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
2625+
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
2626+
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
2627+
);
2628+
}
2629+
} else {
2630+
indexWriterConfig.setMaxFullFlushMergeWaitMillis(0);
2631+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(unMergeOnFlushPolicy));
2632+
}
26072633
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
26082634
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
26092635
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

+54
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.lucene.index.Terms;
6868
import org.apache.lucene.index.TermsEnum;
6969
import org.apache.lucene.index.TieredMergePolicy;
70+
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
7071
import org.apache.lucene.search.IndexSearcher;
7172
import org.apache.lucene.search.MatchAllDocsQuery;
7273
import org.apache.lucene.search.ReferenceManager;
@@ -142,6 +143,7 @@
142143
import org.opensearch.index.seqno.RetentionLeases;
143144
import org.opensearch.index.seqno.SeqNoStats;
144145
import org.opensearch.index.seqno.SequenceNumbers;
146+
import org.opensearch.index.shard.OpenSearchMergePolicy;
145147
import org.opensearch.index.shard.ShardUtils;
146148
import org.opensearch.index.store.Store;
147149
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
@@ -6604,6 +6606,58 @@ public void testStressShouldPeriodicallyFlush() throws Exception {
66046606
}
66056607
}
66066608

6609+
public void testMultiSettingsDynamicForMerge() {
6610+
boolean checkPendingFlushEnabled = true;
6611+
boolean mergeOnFlushEnabled = true;
6612+
TimeValue maxFullFlushMergeWaitTime = TimeValue.timeValueSeconds(1);
6613+
6614+
final IndexSettings indexSettings = engine.config().getIndexSettings();
6615+
IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6616+
.settings(
6617+
Settings.builder()
6618+
.put(indexSettings.getSettings())
6619+
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
6620+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
6621+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
6622+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
6623+
)
6624+
.build();
6625+
indexSettings.updateIndexMetadata(indexMetadata);
6626+
engine.onSettingsChanged(
6627+
indexSettings.getTranslogRetentionAge(),
6628+
indexSettings.getTranslogRetentionSize(),
6629+
indexSettings.getSoftDeleteRetentionOperations()
6630+
);
6631+
6632+
assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
6633+
assertEquals(maxFullFlushMergeWaitTime.millis(), engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
6634+
MergePolicy mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6635+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6636+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof MergeOnFlushMergePolicy);
6637+
6638+
mergeOnFlushEnabled = false;
6639+
indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6640+
.settings(
6641+
Settings.builder()
6642+
.put(indexSettings.getSettings())
6643+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
6644+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
6645+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
6646+
)
6647+
.build();
6648+
indexSettings.updateIndexMetadata(indexMetadata);
6649+
engine.onSettingsChanged(
6650+
indexSettings.getTranslogRetentionAge(),
6651+
indexSettings.getTranslogRetentionSize(),
6652+
indexSettings.getSoftDeleteRetentionOperations()
6653+
);
6654+
6655+
assertEquals(0, engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
6656+
mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6657+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6658+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
6659+
}
6660+
66076661
public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
66086662
final int iters = randomIntBetween(1, 15);
66096663
for (int i = 0; i < iters; i++) {

0 commit comments

Comments
 (0)