Skip to content

Commit f9c239d

Browse files
authored
Filter shards for sliced search at coordinator (opensearch-project#16771)
* Filter shards for sliced search at coordinator Prior to this commit, a sliced search would fan out to every shard, then apply a MatchNoDocsQuery filter on shards that don't correspond to the current slice. This still creates a (useless) search context on each shard for every slice, though. For a long-running sliced scroll, this can quickly exhaust the number of available scroll contexts. This change avoids fanning out to all the shards by checking at the coordinator if a shard is matched by the current slice. This should reduce the number of open scroll contexts to max(numShards, numSlices) instead of numShards * numSlices. --------- Signed-off-by: Michael Froh <froh@amazon.com>
1 parent a609e63 commit f9c239d

File tree

12 files changed

+219
-46
lines changed

12 files changed

+219
-46
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6868
### Changed
6969
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
7070
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
71+
- Sliced search only fans out to shards matched by the selected slice, reducing open search contexts ([#16771](https://github.com/opensearch-project/OpenSearch/pull/16771))
7172
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
7273
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
7374
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)

rest-api-spec/src/main/resources/rest-api-spec/api/search_shards.json

+3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
"default":"open",
6363
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
6464
}
65+
},
66+
"body":{
67+
"description":"The search source (in order to specify slice parameters)"
6568
}
6669
}
6770
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
---
2+
"Search shards with slice specified in body":
3+
- skip:
4+
version: " - 2.99.99"
5+
reason: "Added slice body to search_shards in 2.19"
6+
- do:
7+
indices.create:
8+
index: test_index
9+
body:
10+
settings:
11+
index:
12+
number_of_shards: 7
13+
number_of_replicas: 0
14+
15+
- do:
16+
search_shards:
17+
index: test_index
18+
body:
19+
slice:
20+
id: 0
21+
max: 3
22+
- length: { shards: 3 }
23+
- match: { shards.0.0.index: "test_index" }
24+
- match: { shards.0.0.shard: 0 }
25+
- match: { shards.1.0.shard: 3 }
26+
- match: { shards.2.0.shard: 6 }
27+
28+
- do:
29+
search_shards:
30+
index: test_index
31+
body:
32+
slice:
33+
id: 1
34+
max: 3
35+
- length: { shards: 2 }
36+
- match: { shards.0.0.index: "test_index" }
37+
- match: { shards.0.0.shard: 1 }
38+
- match: { shards.1.0.shard: 4 }
39+
40+
- do:
41+
search_shards:
42+
index: test_index
43+
body:
44+
slice:
45+
id: 2
46+
max: 3
47+
- length: { shards: 2 }
48+
- match: { shards.0.0.index: "test_index" }
49+
- match: { shards.0.0.shard: 2 }
50+
- match: { shards.1.0.shard: 5 }
51+
52+
53+
- do:
54+
search_shards:
55+
index: test_index
56+
preference: "_shards:0,2,4,6"
57+
body:
58+
slice:
59+
id: 0
60+
max: 3
61+
- length: { shards: 2 }
62+
- match: { shards.0.0.index: "test_index" }
63+
- match: { shards.0.0.shard: 0 }
64+
- match: { shards.1.0.shard: 6 }
65+
66+
- do:
67+
search_shards:
68+
index: test_index
69+
preference: "_shards:0,2,4,6"
70+
body:
71+
slice:
72+
id: 1
73+
max: 3
74+
- length: { shards: 1 }
75+
- match: { shards.0.0.index: "test_index" }
76+
- match: { shards.0.0.shard: 2 }
77+
78+
- do:
79+
search_shards:
80+
index: test_index
81+
preference: "_shards:0,2,4,6"
82+
body:
83+
slice:
84+
id: 2
85+
max: 3
86+
- length: { shards: 1 }
87+
- match: { shards.0.0.index: "test_index" }
88+
- match: { shards.0.0.shard: 4 }

server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.action.admin.cluster.shards;
3434

35+
import org.opensearch.Version;
3536
import org.opensearch.action.ActionRequestValidationException;
3637
import org.opensearch.action.IndicesRequest;
3738
import org.opensearch.action.support.IndicesOptions;
@@ -41,6 +42,7 @@
4142
import org.opensearch.core.common.Strings;
4243
import org.opensearch.core.common.io.stream.StreamInput;
4344
import org.opensearch.core.common.io.stream.StreamOutput;
45+
import org.opensearch.search.slice.SliceBuilder;
4446

4547
import java.io.IOException;
4648
import java.util.Objects;
@@ -61,6 +63,8 @@ public class ClusterSearchShardsRequest extends ClusterManagerNodeReadRequest<Cl
6163
@Nullable
6264
private String preference;
6365
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
66+
@Nullable
67+
private SliceBuilder sliceBuilder;
6468

6569
public ClusterSearchShardsRequest() {}
6670

@@ -76,6 +80,12 @@ public ClusterSearchShardsRequest(StreamInput in) throws IOException {
7680
preference = in.readOptionalString();
7781

7882
indicesOptions = IndicesOptions.readIndicesOptions(in);
83+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
84+
boolean hasSlice = in.readBoolean();
85+
if (hasSlice) {
86+
sliceBuilder = new SliceBuilder(in);
87+
}
88+
}
7989
}
8090

8191
@Override
@@ -84,8 +94,15 @@ public void writeTo(StreamOutput out) throws IOException {
8494
out.writeStringArray(indices);
8595
out.writeOptionalString(routing);
8696
out.writeOptionalString(preference);
87-
8897
indicesOptions.writeIndicesOptions(out);
98+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
99+
if (sliceBuilder != null) {
100+
out.writeBoolean(true);
101+
sliceBuilder.writeTo(out);
102+
} else {
103+
out.writeBoolean(false);
104+
}
105+
}
89106
}
90107

91108
@Override
@@ -166,4 +183,13 @@ public ClusterSearchShardsRequest preference(String preference) {
166183
public String preference() {
167184
return this.preference;
168185
}
186+
187+
public ClusterSearchShardsRequest slice(SliceBuilder sliceBuilder) {
188+
this.sliceBuilder = sliceBuilder;
189+
return this;
190+
}
191+
192+
public SliceBuilder slice() {
193+
return this.sliceBuilder;
194+
}
169195
}

server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected void clusterManagerOperation(
133133

134134
Set<String> nodeIds = new HashSet<>();
135135
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
136-
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
136+
.searchShards(clusterState, concreteIndices, routingMap, request.preference(), null, null, request.slice());
137137
ShardRouting shard;
138138
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
139139
int currentGroup = 0;

server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
247247
throw blockException;
248248
}
249249

250-
shardsIt = clusterService.operationRouting()
251-
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
250+
shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), new String[] { request.index() }, null, null);
252251
}
253252

254253
public void start() {

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.opensearch.search.pipeline.SearchPipelineService;
8686
import org.opensearch.search.profile.ProfileShardResult;
8787
import org.opensearch.search.profile.SearchProfileShardResults;
88+
import org.opensearch.search.slice.SliceBuilder;
8889
import org.opensearch.tasks.CancellableTask;
8990
import org.opensearch.tasks.Task;
9091
import org.opensearch.tasks.TaskResourceTrackingService;
@@ -551,6 +552,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
551552
);
552553
} else {
553554
AtomicInteger skippedClusters = new AtomicInteger(0);
555+
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
554556
collectSearchShards(
555557
searchRequest.indicesOptions(),
556558
searchRequest.preference(),
@@ -559,6 +561,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
559561
remoteClusterIndices,
560562
remoteClusterService,
561563
threadPool,
564+
slice,
562565
ActionListener.wrap(searchShardsResponses -> {
563566
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
564567
searchShardsResponses
@@ -787,6 +790,7 @@ static void collectSearchShards(
787790
Map<String, OriginalIndices> remoteIndicesByCluster,
788791
RemoteClusterService remoteClusterService,
789792
ThreadPool threadPool,
793+
SliceBuilder slice,
790794
ActionListener<Map<String, ClusterSearchShardsResponse>> listener
791795
) {
792796
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -800,7 +804,8 @@ static void collectSearchShards(
800804
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions)
801805
.local(true)
802806
.preference(preference)
803-
.routing(routing);
807+
.routing(routing)
808+
.slice(slice);
804809
clusterClient.admin()
805810
.cluster()
806811
.searchShards(
@@ -1042,14 +1047,16 @@ private void executeSearch(
10421047
concreteLocalIndices[i] = indices[i].getName();
10431048
}
10441049
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
1050+
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
10451051
GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting()
10461052
.searchShards(
10471053
clusterState,
10481054
concreteLocalIndices,
10491055
routingMap,
10501056
searchRequest.preference(),
10511057
searchService.getResponseCollectorService(),
1052-
nodeSearchCounts
1058+
nodeSearchCounts,
1059+
slice
10531060
);
10541061
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
10551062
.map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

+34-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.cluster.routing;
3434

35+
import org.apache.lucene.util.CollectionUtil;
3536
import org.opensearch.cluster.ClusterState;
3637
import org.opensearch.cluster.metadata.IndexMetadata;
3738
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
@@ -44,14 +45,17 @@
4445
import org.opensearch.common.settings.Settings;
4546
import org.opensearch.common.util.FeatureFlags;
4647
import org.opensearch.core.common.Strings;
48+
import org.opensearch.core.index.Index;
4749
import org.opensearch.core.index.shard.ShardId;
4850
import org.opensearch.index.IndexModule;
4951
import org.opensearch.index.IndexNotFoundException;
5052
import org.opensearch.node.ResponseCollectorService;
53+
import org.opensearch.search.slice.SliceBuilder;
5154

5255
import java.util.ArrayList;
5356
import java.util.Arrays;
5457
import java.util.Collections;
58+
import java.util.HashMap;
5559
import java.util.HashSet;
5660
import java.util.List;
5761
import java.util.Map;
@@ -230,7 +234,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
230234
@Nullable Map<String, Set<String>> routing,
231235
@Nullable String preference
232236
) {
233-
return searchShards(clusterState, concreteIndices, routing, preference, null, null);
237+
return searchShards(clusterState, concreteIndices, routing, preference, null, null, null);
234238
}
235239

236240
public GroupShardsIterator<ShardIterator> searchShards(
@@ -239,11 +243,14 @@ public GroupShardsIterator<ShardIterator> searchShards(
239243
@Nullable Map<String, Set<String>> routing,
240244
@Nullable String preference,
241245
@Nullable ResponseCollectorService collectorService,
242-
@Nullable Map<String, Long> nodeCounts
246+
@Nullable Map<String, Long> nodeCounts,
247+
@Nullable SliceBuilder slice
243248
) {
244249
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
245-
final Set<ShardIterator> set = new HashSet<>(shards.size());
250+
251+
Map<Index, List<ShardIterator>> shardIterators = new HashMap<>();
246252
for (IndexShardRoutingTable shard : shards) {
253+
247254
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
248255
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
249256
preference = Preference.PRIMARY.type();
@@ -274,10 +281,31 @@ public GroupShardsIterator<ShardIterator> searchShards(
274281
clusterState.metadata().weightedRoutingMetadata()
275282
);
276283
if (iterator != null) {
277-
set.add(iterator);
284+
shardIterators.computeIfAbsent(iterator.shardId().getIndex(), k -> new ArrayList<>()).add(iterator);
285+
}
286+
}
287+
List<ShardIterator> allShardIterators = new ArrayList<>();
288+
if (slice != null) {
289+
for (List<ShardIterator> indexIterators : shardIterators.values()) {
290+
// Filter the returned shards for the given slice
291+
CollectionUtil.timSort(indexIterators);
292+
// We use the ordinal of the iterator in the group (after sorting) rather than the shard id, because
293+
// computeTargetedShards may return a subset of shards for an index, if a routing parameter was
294+
// specified. In that case, the set of routable shards is considered the full universe of available
295+
// shards for each index, when mapping shards to slices. If no routing parameter was specified,
296+
// then ordinals and shard IDs are the same. This mimics the logic in
297+
// org.opensearch.search.slice.SliceBuilder.toFilter.
298+
for (int i = 0; i < indexIterators.size(); i++) {
299+
if (slice.shardMatches(i, indexIterators.size())) {
300+
allShardIterators.add(indexIterators.get(i));
301+
}
302+
}
278303
}
304+
} else {
305+
shardIterators.values().forEach(allShardIterators::addAll);
279306
}
280-
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
307+
308+
return GroupShardsIterator.sortAndCreate(allShardIterators);
281309
}
282310

283311
public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
@@ -311,6 +339,7 @@ private Set<IndexShardRoutingTable> computeTargetedShards(
311339
set.add(indexShard);
312340
}
313341
}
342+
314343
}
315344
return set;
316345
}

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java

+8
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.rest.BaseRestHandler;
4141
import org.opensearch.rest.RestRequest;
4242
import org.opensearch.rest.action.RestToXContentListener;
43+
import org.opensearch.search.builder.SearchSourceBuilder;
4344

4445
import java.io.IOException;
4546
import java.util.List;
@@ -81,6 +82,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
8182
clusterSearchShardsRequest.routing(request.param("routing"));
8283
clusterSearchShardsRequest.preference(request.param("preference"));
8384
clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
85+
if (request.hasContentOrSourceParam()) {
86+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
87+
sourceBuilder.parseXContent(request.contentOrSourceParamParser());
88+
if (sourceBuilder.slice() != null) {
89+
clusterSearchShardsRequest.slice(sourceBuilder.slice());
90+
}
91+
}
8492
return channel -> client.admin().cluster().searchShards(clusterSearchShardsRequest, new RestToXContentListener<>(channel));
8593
}
8694
}

0 commit comments

Comments
 (0)