Skip to content

Commit e6eec36

Browse files
authored
Force merge API supports performing only on primary shards (opensearch-project#11269)
* Force merge API supports performing on primary shards only Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Modify change log Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Fix test failure Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Fix typo Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Modify skip version Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Add version check and more tests Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Format code Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Modify supported version and add more test Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Change the supported version to 3.0.0 Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Add test case in SegmentReplicationIT Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Optimize the test code Signed-off-by: Gao Binlong <gbinlong@amazon.com> --------- Signed-off-by: Gao Binlong <gbinlong@amazon.com>
1 parent a702f6a commit e6eec36

File tree

13 files changed

+260
-8
lines changed

13 files changed

+260
-8
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
104104
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
105105
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
106106
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
107+
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
107108
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
108109
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
109110
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))

rest-api-spec/src/main/resources/rest-api-spec/api/indices.forcemerge.json

+4
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@
6363
"wait_for_completion": {
6464
"type" : "boolean",
6565
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
66+
},
67+
"primary_only": {
68+
"type" : "boolean",
69+
"description" : "Specify whether the operation should only perform on primary shards. Defaults to false."
6670
}
6771
}
6872
}

rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/10_basic.yml

+20
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,23 @@
2727
index: test
2828
max_num_segments: 10
2929
only_expunge_deletes: true
30+
31+
---
32+
"Test primary_only parameter":
33+
- skip:
34+
version: " - 2.99.99"
35+
reason: "primary_only is available in 3.0+"
36+
37+
- do:
38+
indices.create:
39+
index: test
40+
body:
41+
settings:
42+
index.number_of_shards: 2
43+
index.number_of_replicas: 1
44+
45+
- do:
46+
indices.forcemerge:
47+
index: test
48+
primary_only: true
49+
- match: { _shards.total: 2 }

rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
wait_for_completion: true
2626
task_id: $taskId
2727
- match: { task.action: "indices:admin/forcemerge" }
28-
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }
28+
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }
2929

3030
# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
3131
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java

+18
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,24 @@ public void testForceMergeUUIDConsistent() throws IOException {
100100
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
101101
}
102102

103+
public void testForceMergeOnlyOnPrimaryShards() throws IOException {
104+
internalCluster().ensureAtLeastNumDataNodes(2);
105+
final String index = "test-index";
106+
createIndex(
107+
index,
108+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
109+
);
110+
ensureGreen(index);
111+
final ForceMergeResponse forceMergeResponse = client().admin()
112+
.indices()
113+
.prepareForceMerge(index)
114+
.setMaxNumSegments(1)
115+
.setPrimaryOnly(true)
116+
.get();
117+
assertThat(forceMergeResponse.getFailedShards(), is(0));
118+
assertThat(forceMergeResponse.getSuccessfulShards(), is(1));
119+
}
120+
103121
private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
104122
try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
105123
return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
2626
import org.opensearch.action.admin.indices.alias.Alias;
2727
import org.opensearch.action.admin.indices.flush.FlushRequest;
28+
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
2829
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
2930
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
3031
import org.opensearch.action.get.GetResponse;
@@ -400,6 +401,14 @@ public void testMultipleShards() throws Exception {
400401
}
401402

402403
public void testReplicationAfterForceMerge() throws Exception {
404+
performReplicationAfterForceMerge(false, SHARD_COUNT * (1 + REPLICA_COUNT));
405+
}
406+
407+
public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
408+
performReplicationAfterForceMerge(true, SHARD_COUNT);
409+
}
410+
411+
private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception {
403412
final String nodeA = internalCluster().startDataOnlyNode();
404413
final String nodeB = internalCluster().startDataOnlyNode();
405414
createIndex(INDEX_NAME);
@@ -430,8 +439,16 @@ public void testReplicationAfterForceMerge() throws Exception {
430439
waitForDocs(expectedHitCount, indexer);
431440
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
432441

433-
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
434-
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
442+
// Perform force merge only on the primary shards.
443+
final ForceMergeResponse forceMergeResponse = client().admin()
444+
.indices()
445+
.prepareForceMerge(INDEX_NAME)
446+
.setPrimaryOnly(primaryOnly)
447+
.setMaxNumSegments(1)
448+
.setFlush(false)
449+
.get();
450+
assertThat(forceMergeResponse.getFailedShards(), is(0));
451+
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
435452
refresh(INDEX_NAME);
436453
verifyStoreContent();
437454
}

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java

+27
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,13 @@ public static final class Defaults {
6969
public static final int MAX_NUM_SEGMENTS = -1;
7070
public static final boolean ONLY_EXPUNGE_DELETES = false;
7171
public static final boolean FLUSH = true;
72+
public static final boolean PRIMARY_ONLY = false;
7273
}
7374

7475
private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
7576
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
7677
private boolean flush = Defaults.FLUSH;
78+
private boolean primaryOnly = Defaults.PRIMARY_ONLY;
7779

7880
private static final Version FORCE_MERGE_UUID_VERSION = Version.V_3_0_0;
7981

@@ -100,6 +102,9 @@ public ForceMergeRequest(StreamInput in) throws IOException {
100102
maxNumSegments = in.readInt();
101103
onlyExpungeDeletes = in.readBoolean();
102104
flush = in.readBoolean();
105+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
106+
primaryOnly = in.readBoolean();
107+
}
103108
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
104109
forceMergeUUID = in.readString();
105110
} else if ((forceMergeUUID = in.readOptionalString()) == null) {
@@ -166,6 +171,21 @@ public ForceMergeRequest flush(boolean flush) {
166171
return this;
167172
}
168173

174+
/**
175+
* Should force merge only performed on primary shards. Defaults to {@code false}.
176+
*/
177+
public boolean primaryOnly() {
178+
return primaryOnly;
179+
}
180+
181+
/**
182+
* Should force merge only performed on primary shards. Defaults to {@code false}.
183+
*/
184+
public ForceMergeRequest primaryOnly(boolean primaryOnly) {
185+
this.primaryOnly = primaryOnly;
186+
return this;
187+
}
188+
169189
/**
170190
* Should this task store its result after it has finished?
171191
*/
@@ -188,6 +208,8 @@ public String getDescription() {
188208
+ onlyExpungeDeletes
189209
+ "], flush["
190210
+ flush
211+
+ "], primaryOnly["
212+
+ primaryOnly
191213
+ "]";
192214
}
193215

@@ -197,6 +219,9 @@ public void writeTo(StreamOutput out) throws IOException {
197219
out.writeInt(maxNumSegments);
198220
out.writeBoolean(onlyExpungeDeletes);
199221
out.writeBoolean(flush);
222+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
223+
out.writeBoolean(primaryOnly);
224+
}
200225
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
201226
out.writeString(forceMergeUUID);
202227
} else {
@@ -213,6 +238,8 @@ public String toString() {
213238
+ onlyExpungeDeletes
214239
+ ", flush="
215240
+ flush
241+
+ ", primaryOnly="
242+
+ primaryOnly
216243
+ '}';
217244
}
218245
}

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java

+8
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,12 @@ public ForceMergeRequestBuilder setFlush(boolean flush) {
8181
request.flush(flush);
8282
return this;
8383
}
84+
85+
/**
86+
* Should force merge only performed on primary shards. Defaults to {@code false}.
87+
*/
88+
public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) {
89+
request.primaryOnly(primaryOnly);
90+
return this;
91+
}
8492
}

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,16 @@ protected EmptyResult shardOperation(ForceMergeRequest request, ShardRouting sha
115115
}
116116

117117
/**
118-
* The refresh request works against *all* shards.
118+
* The force merge request works against *all* shards by default, but it can work against all primary shards only
119+
* by setting primary_only to true.
119120
*/
120121
@Override
121122
protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) {
122-
return clusterState.routingTable().allShards(concreteIndices);
123+
if (request.primaryOnly()) {
124+
return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary);
125+
} else {
126+
return clusterState.routingTable().allShards(concreteIndices);
127+
}
123128
}
124129

125130
@Override

server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java

+10
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predi
307307
return allShardsSatisfyingPredicate(indices, predicate, false);
308308
}
309309

310+
/**
311+
* All the shards for the provided indices on the node which match the predicate
312+
* @param indices indices to return all the shards.
313+
* @param predicate condition to match
314+
* @return iterator over shards matching the predicate for the specific indices
315+
*/
316+
public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate<ShardRouting> predicate) {
317+
return allShardsSatisfyingPredicate(indices, predicate, false);
318+
}
319+
310320
private ShardsIterator allShardsSatisfyingPredicate(
311321
String[] indices,
312322
Predicate<ShardRouting> predicate,

server/src/main/java/org/opensearch/rest/action/admin/indices/RestForceMergeAction.java

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
7676
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
7777
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
7878
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
79+
mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly()));
7980
if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
8081
deprecationLogger.deprecate(
8182
"force_merge_expunge_deletes_and_max_num_segments_deprecation",

server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java

+119-3
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,137 @@
3131

3232
package org.opensearch.action.admin.indices.forcemerge;
3333

34+
import org.opensearch.Version;
35+
import org.opensearch.common.io.stream.BytesStreamOutput;
36+
import org.opensearch.core.common.io.stream.StreamInput;
3437
import org.opensearch.test.OpenSearchTestCase;
38+
import org.opensearch.test.VersionUtils;
3539

3640
public class ForceMergeRequestTests extends OpenSearchTestCase {
3741

3842
public void testDescription() {
3943
ForceMergeRequest request = new ForceMergeRequest();
40-
assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
44+
assertEquals(
45+
"Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
46+
request.getDescription()
47+
);
4148

4249
request = new ForceMergeRequest("shop", "blog");
43-
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
50+
assertEquals(
51+
"Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
52+
request.getDescription()
53+
);
4454

4555
request = new ForceMergeRequest();
4656
request.maxNumSegments(12);
4757
request.onlyExpungeDeletes(true);
4858
request.flush(false);
49-
assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
59+
request.primaryOnly(true);
60+
assertEquals(
61+
"Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false], primaryOnly[true]",
62+
request.getDescription()
63+
);
64+
}
65+
66+
public void testToString() {
67+
ForceMergeRequest request = new ForceMergeRequest();
68+
assertEquals("ForceMergeRequest{maxNumSegments=-1, onlyExpungeDeletes=false, flush=true, primaryOnly=false}", request.toString());
69+
70+
request = new ForceMergeRequest();
71+
request.maxNumSegments(12);
72+
request.onlyExpungeDeletes(true);
73+
request.flush(false);
74+
request.primaryOnly(true);
75+
assertEquals("ForceMergeRequest{maxNumSegments=12, onlyExpungeDeletes=true, flush=false, primaryOnly=true}", request.toString());
76+
}
77+
78+
public void testSerialization() throws Exception {
79+
final ForceMergeRequest request = randomRequest();
80+
try (BytesStreamOutput out = new BytesStreamOutput()) {
81+
request.writeTo(out);
82+
83+
final ForceMergeRequest deserializedRequest;
84+
try (StreamInput in = out.bytes().streamInput()) {
85+
deserializedRequest = new ForceMergeRequest(in);
86+
}
87+
assertEquals(request.maxNumSegments(), deserializedRequest.maxNumSegments());
88+
assertEquals(request.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
89+
assertEquals(request.flush(), deserializedRequest.flush());
90+
assertEquals(request.primaryOnly(), deserializedRequest.primaryOnly());
91+
assertEquals(request.forceMergeUUID(), deserializedRequest.forceMergeUUID());
92+
}
93+
}
94+
95+
public void testBwcSerialization() throws Exception {
96+
{
97+
final ForceMergeRequest sample = randomRequest();
98+
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
99+
try (BytesStreamOutput out = new BytesStreamOutput()) {
100+
out.setVersion(compatibleVersion);
101+
sample.writeTo(out);
102+
103+
final ForceMergeRequest deserializedRequest;
104+
try (StreamInput in = out.bytes().streamInput()) {
105+
in.setVersion(Version.CURRENT);
106+
deserializedRequest = new ForceMergeRequest(in);
107+
}
108+
109+
assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
110+
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
111+
assertEquals(sample.flush(), deserializedRequest.flush());
112+
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
113+
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
114+
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());
115+
}
116+
}
117+
}
118+
119+
{
120+
final ForceMergeRequest sample = randomRequest();
121+
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
122+
try (BytesStreamOutput out = new BytesStreamOutput()) {
123+
out.setVersion(Version.CURRENT);
124+
sample.getParentTask().writeTo(out);
125+
out.writeStringArray(sample.indices());
126+
sample.indicesOptions().writeIndicesOptions(out);
127+
out.writeInt(sample.maxNumSegments());
128+
out.writeBoolean(sample.onlyExpungeDeletes());
129+
out.writeBoolean(sample.flush());
130+
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
131+
out.writeBoolean(sample.primaryOnly());
132+
}
133+
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
134+
out.writeString(sample.forceMergeUUID());
135+
} else {
136+
out.writeOptionalString(sample.forceMergeUUID());
137+
}
138+
139+
final ForceMergeRequest deserializedRequest;
140+
try (StreamInput in = out.bytes().streamInput()) {
141+
in.setVersion(compatibleVersion);
142+
deserializedRequest = new ForceMergeRequest(in);
143+
}
144+
145+
assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
146+
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
147+
assertEquals(sample.flush(), deserializedRequest.flush());
148+
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
149+
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
150+
}
151+
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());
152+
153+
}
154+
}
155+
}
156+
157+
private ForceMergeRequest randomRequest() {
158+
ForceMergeRequest request = new ForceMergeRequest();
159+
if (randomBoolean()) {
160+
request.maxNumSegments(randomIntBetween(1, 10));
161+
}
162+
request.onlyExpungeDeletes(true);
163+
request.flush(randomBoolean());
164+
request.primaryOnly(randomBoolean());
165+
return request;
50166
}
51167
}

0 commit comments

Comments
 (0)