Skip to content

Commit d2bf2f5

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Support terminate_after force termination for concurrent segment search
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent c178d8e commit d2bf2f5

File tree

8 files changed

+87
-70
lines changed

8 files changed

+87
-70
lines changed

server/src/internalClusterTest/java/org/opensearch/search/simple/ParameterizedSimpleSearchIT.java

+34
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,40 @@ public void testSimpleTerminateAfterCountWithSizeAndTrackHits() throws Exception
308308
assertEquals(0, searchResponse.getFailedShards());
309309
}
310310

311+
public void testSimpleTerminateAfterCount() throws Exception {
312+
prepareCreate("test").setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)).get();
313+
ensureGreen();
314+
int max = randomIntBetween(3, 29);
315+
List<IndexRequestBuilder> docbuilders = new ArrayList<>(max);
316+
317+
for (int i = 1; i <= max; i++) {
318+
String id = String.valueOf(i);
319+
docbuilders.add(client().prepareIndex("test").setId(id).setSource("field", i));
320+
}
321+
322+
indexRandom(true, docbuilders);
323+
ensureGreen();
324+
refresh();
325+
326+
SearchResponse searchResponse;
327+
for (int i = 1; i < max; i++) {
328+
searchResponse = client().prepareSearch("test")
329+
.setQuery(QueryBuilders.rangeQuery("field").gte(1).lte(max))
330+
.setTerminateAfter(i)
331+
.get();
332+
assertHitCount(searchResponse, i);
333+
assertTrue(searchResponse.isTerminatedEarly());
334+
}
335+
336+
searchResponse = client().prepareSearch("test")
337+
.setQuery(QueryBuilders.rangeQuery("field").gte(1).lte(max))
338+
.setTerminateAfter(2 * max)
339+
.get();
340+
341+
assertHitCount(searchResponse, max);
342+
assertFalse(searchResponse.isTerminatedEarly());
343+
}
344+
311345
public void testSimpleIndexSortEarlyTerminate() throws Exception {
312346
prepareCreate("test").setSettings(
313347
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0).put("index.sort.field", "rank")

server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java

-60
This file was deleted.

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.opensearch.search.profile.query.ProfileWeight;
7575
import org.opensearch.search.profile.query.QueryProfiler;
7676
import org.opensearch.search.profile.query.QueryTimingType;
77+
import org.opensearch.search.query.EarlyTerminatingCollector;
7778
import org.opensearch.search.query.QueryPhase;
7879
import org.opensearch.search.query.QuerySearchResult;
7980
import org.opensearch.search.sort.FieldSortBuilder;
@@ -292,7 +293,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
292293
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
293294

294295
// Check if at all we need to call this leaf for collecting results.
295-
if (canMatch(ctx) == false) {
296+
if (canMatch(ctx) == false || searchContext.isTerminatedEarly()) {
296297
return;
297298
}
298299

@@ -310,6 +311,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
310311
// there is no doc of interest in this reader context
311312
// continue with the following leaf
312313
return;
314+
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
315+
searchContext.setTerminatedEarly(true);
316+
return;
313317
} catch (QueryPhase.TimeExceededException e) {
314318
searchContext.setSearchTimedOut(true);
315319
return;
@@ -325,6 +329,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
325329
} catch (CollectionTerminatedException e) {
326330
// collection was terminated prematurely
327331
// continue with the following leaf
332+
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
333+
searchContext.setTerminatedEarly(true);
334+
return;
328335
} catch (QueryPhase.TimeExceededException e) {
329336
searchContext.setSearchTimedOut(true);
330337
return;
@@ -344,6 +351,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
344351
} catch (CollectionTerminatedException e) {
345352
// collection was terminated prematurely
346353
// continue with the following leaf
354+
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
355+
searchContext.setTerminatedEarly(true);
356+
return;
347357
} catch (QueryPhase.TimeExceededException e) {
348358
searchContext.setSearchTimedOut(true);
349359
return;

server/src/main/java/org/opensearch/search/internal/SearchContext.java

+9
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
119119
private InnerHitsContext innerHitsContext;
120120

121121
private volatile boolean searchTimedOut;
122+
private volatile boolean terminatedEarly;
122123

123124
protected SearchContext() {}
124125

@@ -136,6 +137,14 @@ public void setSearchTimedOut(boolean searchTimedOut) {
136137
this.searchTimedOut = searchTimedOut;
137138
}
138139

140+
public boolean isTerminatedEarly() {
141+
return this.terminatedEarly;
142+
}
143+
144+
public void setTerminatedEarly(boolean terminatedEarly) {
145+
this.terminatedEarly = terminatedEarly;
146+
}
147+
139148
@Override
140149
public final void close() {
141150
if (closed.compareAndSet(false, true)) {

server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java

+3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ private static boolean searchWithCollectorManager(
9494
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
9595
queryResult.terminatedEarly(false);
9696
}
97+
if (searchContext.isTerminatedEarly()) {
98+
queryResult.terminatedEarly(true);
99+
}
97100

98101
return topDocsFactory.shouldRescore();
99102
}

server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollector.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,27 @@
4040
import org.apache.lucene.search.LeafCollector;
4141

4242
import java.io.IOException;
43+
import java.util.concurrent.atomic.AtomicLong;
4344

4445
/**
4546
* A {@link Collector} that early terminates collection after <code>maxCountHits</code> docs have been collected.
4647
*
4748
* @opensearch.internal
4849
*/
4950
public class EarlyTerminatingCollector extends FilterCollector {
50-
static final class EarlyTerminationException extends RuntimeException {
51+
52+
/**
53+
* Force termination exception
54+
*/
55+
public static final class EarlyTerminationException extends RuntimeException {
5156
EarlyTerminationException(String msg) {
5257
super(msg);
5358
}
5459
}
5560

5661
private final int maxCountHits;
57-
private int numCollected;
58-
private boolean forceTermination;
62+
private final AtomicLong numCollected;
63+
private final boolean forceTermination;
5964
private boolean earlyTerminated;
6065

6166
/**
@@ -69,11 +74,19 @@ static final class EarlyTerminationException extends RuntimeException {
6974
super(delegate);
7075
this.maxCountHits = maxCountHits;
7176
this.forceTermination = forceTermination;
77+
this.numCollected = new AtomicLong();
78+
}
79+
80+
EarlyTerminatingCollector(final Collector delegate, int maxCountHits, boolean forceTermination, AtomicLong numCollected) {
81+
super(delegate);
82+
this.maxCountHits = maxCountHits;
83+
this.forceTermination = forceTermination;
84+
this.numCollected = numCollected;
7285
}
7386

7487
@Override
7588
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
76-
if (numCollected >= maxCountHits) {
89+
if (numCollected.get() >= maxCountHits) {
7790
earlyTerminated = true;
7891
if (forceTermination) {
7992
throw new EarlyTerminationException("early termination [CountBased]");
@@ -84,7 +97,7 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
8497
return new FilterLeafCollector(super.getLeafCollector(context)) {
8598
@Override
8699
public void collect(int doc) throws IOException {
87-
if (++numCollected > maxCountHits) {
100+
if (numCollected.incrementAndGet() > maxCountHits) {
88101
earlyTerminated = true;
89102
if (forceTermination) {
90103
throw new EarlyTerminationException("early termination [CountBased]");

server/src/main/java/org/opensearch/search/query/EarlyTerminatingCollectorManager.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.ArrayList;
1616
import java.util.Collection;
1717
import java.util.List;
18+
import java.util.concurrent.atomic.AtomicLong;
1819

1920
/**
2021
* Manager for the EarlyTerminatingCollector
@@ -29,16 +30,23 @@ public class EarlyTerminatingCollectorManager<C extends Collector>
2930
private final CollectorManager<C, ReduceableSearchResult> manager;
3031
private final int maxCountHits;
3132
private boolean forceTermination;
33+
private final AtomicLong numCollected;
3234

3335
EarlyTerminatingCollectorManager(CollectorManager<C, ReduceableSearchResult> manager, int maxCountHits, boolean forceTermination) {
3436
this.manager = manager;
3537
this.maxCountHits = maxCountHits;
3638
this.forceTermination = forceTermination;
39+
this.numCollected = new AtomicLong();
3740
}
3841

3942
@Override
4043
public EarlyTerminatingCollector newCollector() throws IOException {
41-
return new EarlyTerminatingCollector(manager.newCollector(), maxCountHits, false /* forced termination is not supported */);
44+
return new EarlyTerminatingCollector(
45+
manager.newCollector(),
46+
maxCountHits,
47+
forceTermination /* forced termination is not supported */,
48+
numCollected
49+
);
4250
}
4351

4452
@SuppressWarnings("unchecked")

server/src/main/java/org/opensearch/search/query/QueryPhase.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
277277
}
278278

279279
try {
280+
// EarlyTerminationException gets swallowed here?
280281
boolean shouldRescore = queryPhaseSearcher.searchWith(
281282
searchContext,
282283
searcher,
@@ -350,9 +351,8 @@ private static boolean searchWithCollector(
350351
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
351352
}
352353
QuerySearchResult queryResult = searchContext.queryResult();
353-
try {
354-
searcher.search(query, queryCollector);
355-
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
354+
searcher.search(query, queryCollector);
355+
if (searchContext.isTerminatedEarly()) {
356356
queryResult.terminatedEarly(true);
357357
}
358358
if (searchContext.isSearchTimedOut()) {

0 commit comments

Comments
 (0)