Skip to content

Commit ccbddc0

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add slice level operation listener methods
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent f03dde9 commit ccbddc0

File tree

10 files changed

+217
-12
lines changed

10 files changed

+217
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
1717
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
1818
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
19+
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
1920

2021
### Dependencies
2122
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java

+61
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import org.apache.logging.log4j.Logger;
3535
import org.apache.logging.log4j.message.ParameterizedMessage;
36+
import org.apache.lucene.index.LeafReaderContext;
3637
import org.opensearch.ExceptionsHelper;
3738
import org.opensearch.common.annotation.PublicApi;
3839
import org.opensearch.search.internal.ReaderContext;
@@ -71,6 +72,33 @@ default void onFailedQueryPhase(SearchContext searchContext) {}
7172
*/
7273
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}
7374

75+
/**
76+
* Executed before the slice execution in
77+
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)}.
78+
* This will be called once per segment slice in concurrent search and only once in non-concurrent search.
79+
* @param searchContext the current search context
80+
*/
81+
default void onPreSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {}
82+
83+
/**
84+
* Executed if the slice execution in
85+
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} failed.
86+
* This will be called once per segment slice in concurrent search and only once in non-concurrent search.
87+
* @param searchContext the current search context
88+
*/
89+
default void onFailedSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {}
90+
91+
/**
92+
* Executed after the slice execution in
93+
* {@link org.opensearch.search.internal.ContextIndexSearcher#search(List, org.apache.lucene.search.Weight, org.apache.lucene.search.Collector)} successfully finished.
94+
* This will be called once per segment slice in concurrent search and only once in non-concurrent search.
95+
* Note: this is not invoked if the slice execution failed.
96+
* @param searchContext the current search context
97+
*
98+
* @see #onFailedSliceExecution(SearchContext, List)
99+
*/
100+
default void onSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {}
101+
74102
/**
75103
* Executed before the fetch phase is executed
76104
* @param searchContext the current search context
@@ -195,6 +223,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
195223
}
196224
}
197225

226+
@Override
227+
public void onPreSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {
228+
for (SearchOperationListener listener : listeners) {
229+
try {
230+
listener.onPreSliceExecution(searchContext, leaves);
231+
} catch (Exception e) {
232+
logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e);
233+
}
234+
}
235+
}
236+
237+
@Override
238+
public void onFailedSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {
239+
for (SearchOperationListener listener : listeners) {
240+
try {
241+
listener.onFailedSliceExecution(searchContext, leaves);
242+
} catch (Exception e) {
243+
logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e);
244+
}
245+
}
246+
}
247+
248+
@Override
249+
public void onSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {
250+
for (SearchOperationListener listener : listeners) {
251+
try {
252+
listener.onSliceExecution(searchContext, leaves);
253+
} catch (Exception e) {
254+
logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e);
255+
}
256+
}
257+
}
258+
198259
@Override
199260
public void onPreFetchPhase(SearchContext searchContext) {
200261
for (SearchOperationListener listener : listeners) {

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

+19-12
Original file line numberDiff line numberDiff line change
@@ -270,20 +270,27 @@ public void search(
270270

271271
@Override
272272
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
273-
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
274-
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
275-
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
276-
// reader order here.
277-
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
278-
for (int i = leaves.size() - 1; i >= 0; i--) {
279-
searchLeaf(leaves.get(i), weight, collector);
280-
}
281-
} else {
282-
for (int i = 0; i < leaves.size(); i++) {
283-
searchLeaf(leaves.get(i), weight, collector);
273+
searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext, leaves);
274+
try {
275+
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
276+
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
277+
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
278+
// reader order here.
279+
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
280+
for (int i = leaves.size() - 1; i >= 0; i--) {
281+
searchLeaf(leaves.get(i), weight, collector);
282+
}
283+
} else {
284+
for (int i = 0; i < leaves.size(); i++) {
285+
searchLeaf(leaves.get(i), weight, collector);
286+
}
284287
}
288+
searchContext.bucketCollectorProcessor().processPostCollection(collector);
289+
} catch (Throwable t) {
290+
searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext, leaves);
291+
throw t;
285292
}
286-
searchContext.bucketCollectorProcessor().processPostCollection(collector);
293+
searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext, leaves);
287294
}
288295

289296
/**

server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java

+106
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package org.opensearch.index.shard;
3333

34+
import org.apache.lucene.index.LeafReaderContext;
3435
import org.opensearch.search.internal.ReaderContext;
3536
import org.opensearch.search.internal.SearchContext;
3637
import org.opensearch.test.OpenSearchTestCase;
@@ -56,6 +57,9 @@ public void testListenersAreExecuted() {
5657
AtomicInteger preQuery = new AtomicInteger();
5758
AtomicInteger failedQuery = new AtomicInteger();
5859
AtomicInteger onQuery = new AtomicInteger();
60+
AtomicInteger preSlice = new AtomicInteger();
61+
AtomicInteger failedSlice = new AtomicInteger();
62+
AtomicInteger onSlice = new AtomicInteger();
5963
AtomicInteger onFetch = new AtomicInteger();
6064
AtomicInteger preFetch = new AtomicInteger();
6165
AtomicInteger failedFetch = new AtomicInteger();
@@ -86,6 +90,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
8690
onQuery.incrementAndGet();
8791
}
8892

93+
@Override
94+
public void onPreSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {
95+
assertNotNull(searchContext);
96+
preSlice.incrementAndGet();
97+
}
98+
99+
@Override
100+
public void onFailedSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {
101+
assertNotNull(searchContext);
102+
failedSlice.incrementAndGet();
103+
}
104+
105+
@Override
106+
public void onSliceExecution(SearchContext searchContext, List<LeafReaderContext> leaves) {
107+
assertNotNull(searchContext);
108+
onSlice.incrementAndGet();
109+
}
110+
89111
@Override
90112
public void onPreFetchPhase(SearchContext searchContext) {
91113
assertNotNull(searchContext);
@@ -167,10 +189,30 @@ public void onSearchIdleReactivation() {
167189
compositeListener.onQueryPhase(ctx, timeInNanos.get());
168190
assertEquals(0, preFetch.get());
169191
assertEquals(0, preQuery.get());
192+
assertEquals(0, preSlice.get());
193+
assertEquals(0, failedFetch.get());
194+
assertEquals(0, failedQuery.get());
195+
assertEquals(0, failedSlice.get());
196+
assertEquals(2, onQuery.get());
197+
assertEquals(0, onFetch.get());
198+
assertEquals(0, onSlice.get());
199+
assertEquals(0, newContext.get());
200+
assertEquals(0, newScrollContext.get());
201+
assertEquals(0, freeContext.get());
202+
assertEquals(0, freeScrollContext.get());
203+
assertEquals(0, searchIdleReactivateCount.get());
204+
assertEquals(0, validateSearchContext.get());
205+
206+
compositeListener.onSliceExecution(ctx, Collections.emptyList());
207+
assertEquals(0, preFetch.get());
208+
assertEquals(0, preQuery.get());
209+
assertEquals(0, preSlice.get());
170210
assertEquals(0, failedFetch.get());
171211
assertEquals(0, failedQuery.get());
212+
assertEquals(0, failedSlice.get());
172213
assertEquals(2, onQuery.get());
173214
assertEquals(0, onFetch.get());
215+
assertEquals(2, onSlice.get());
174216
assertEquals(0, newContext.get());
175217
assertEquals(0, newScrollContext.get());
176218
assertEquals(0, freeContext.get());
@@ -181,10 +223,13 @@ public void onSearchIdleReactivation() {
181223
compositeListener.onFetchPhase(ctx, timeInNanos.get());
182224
assertEquals(0, preFetch.get());
183225
assertEquals(0, preQuery.get());
226+
assertEquals(0, preSlice.get());
184227
assertEquals(0, failedFetch.get());
185228
assertEquals(0, failedQuery.get());
229+
assertEquals(0, failedSlice.get());
186230
assertEquals(2, onQuery.get());
187231
assertEquals(2, onFetch.get());
232+
assertEquals(2, onSlice.get());
188233
assertEquals(0, newContext.get());
189234
assertEquals(0, newScrollContext.get());
190235
assertEquals(0, freeContext.get());
@@ -195,10 +240,30 @@ public void onSearchIdleReactivation() {
195240
compositeListener.onPreQueryPhase(ctx);
196241
assertEquals(0, preFetch.get());
197242
assertEquals(2, preQuery.get());
243+
assertEquals(0, preSlice.get());
198244
assertEquals(0, failedFetch.get());
199245
assertEquals(0, failedQuery.get());
246+
assertEquals(0, failedSlice.get());
200247
assertEquals(2, onQuery.get());
201248
assertEquals(2, onFetch.get());
249+
assertEquals(2, onSlice.get());
250+
assertEquals(0, newContext.get());
251+
assertEquals(0, newScrollContext.get());
252+
assertEquals(0, freeContext.get());
253+
assertEquals(0, freeScrollContext.get());
254+
assertEquals(0, searchIdleReactivateCount.get());
255+
assertEquals(0, validateSearchContext.get());
256+
257+
compositeListener.onPreSliceExecution(ctx, Collections.emptyList());
258+
assertEquals(0, preFetch.get());
259+
assertEquals(2, preQuery.get());
260+
assertEquals(2, preSlice.get());
261+
assertEquals(0, failedFetch.get());
262+
assertEquals(0, failedQuery.get());
263+
assertEquals(0, failedSlice.get());
264+
assertEquals(2, onQuery.get());
265+
assertEquals(2, onFetch.get());
266+
assertEquals(2, onSlice.get());
202267
assertEquals(0, newContext.get());
203268
assertEquals(0, newScrollContext.get());
204269
assertEquals(0, freeContext.get());
@@ -209,10 +274,13 @@ public void onSearchIdleReactivation() {
209274
compositeListener.onPreFetchPhase(ctx);
210275
assertEquals(2, preFetch.get());
211276
assertEquals(2, preQuery.get());
277+
assertEquals(2, preSlice.get());
212278
assertEquals(0, failedFetch.get());
213279
assertEquals(0, failedQuery.get());
280+
assertEquals(0, failedSlice.get());
214281
assertEquals(2, onQuery.get());
215282
assertEquals(2, onFetch.get());
283+
assertEquals(2, onSlice.get());
216284
assertEquals(0, newContext.get());
217285
assertEquals(0, newScrollContext.get());
218286
assertEquals(0, freeContext.get());
@@ -223,10 +291,13 @@ public void onSearchIdleReactivation() {
223291
compositeListener.onFailedFetchPhase(ctx);
224292
assertEquals(2, preFetch.get());
225293
assertEquals(2, preQuery.get());
294+
assertEquals(2, preSlice.get());
226295
assertEquals(2, failedFetch.get());
227296
assertEquals(0, failedQuery.get());
297+
assertEquals(0, failedSlice.get());
228298
assertEquals(2, onQuery.get());
229299
assertEquals(2, onFetch.get());
300+
assertEquals(2, onSlice.get());
230301
assertEquals(0, newContext.get());
231302
assertEquals(0, newScrollContext.get());
232303
assertEquals(0, freeContext.get());
@@ -237,10 +308,30 @@ public void onSearchIdleReactivation() {
237308
compositeListener.onFailedQueryPhase(ctx);
238309
assertEquals(2, preFetch.get());
239310
assertEquals(2, preQuery.get());
311+
assertEquals(2, preSlice.get());
312+
assertEquals(2, failedFetch.get());
313+
assertEquals(2, failedQuery.get());
314+
assertEquals(0, failedSlice.get());
315+
assertEquals(2, onQuery.get());
316+
assertEquals(2, onFetch.get());
317+
assertEquals(2, onSlice.get());
318+
assertEquals(0, newContext.get());
319+
assertEquals(0, newScrollContext.get());
320+
assertEquals(0, freeContext.get());
321+
assertEquals(0, freeScrollContext.get());
322+
assertEquals(0, searchIdleReactivateCount.get());
323+
assertEquals(0, validateSearchContext.get());
324+
325+
compositeListener.onFailedSliceExecution(ctx, Collections.emptyList());
326+
assertEquals(2, preFetch.get());
327+
assertEquals(2, preQuery.get());
328+
assertEquals(2, preSlice.get());
240329
assertEquals(2, failedFetch.get());
241330
assertEquals(2, failedQuery.get());
331+
assertEquals(2, failedSlice.get());
242332
assertEquals(2, onQuery.get());
243333
assertEquals(2, onFetch.get());
334+
assertEquals(2, onSlice.get());
244335
assertEquals(0, newContext.get());
245336
assertEquals(0, newScrollContext.get());
246337
assertEquals(0, freeContext.get());
@@ -251,10 +342,13 @@ public void onSearchIdleReactivation() {
251342
compositeListener.onNewReaderContext(mock(ReaderContext.class));
252343
assertEquals(2, preFetch.get());
253344
assertEquals(2, preQuery.get());
345+
assertEquals(2, preSlice.get());
254346
assertEquals(2, failedFetch.get());
255347
assertEquals(2, failedQuery.get());
348+
assertEquals(2, failedSlice.get());
256349
assertEquals(2, onQuery.get());
257350
assertEquals(2, onFetch.get());
351+
assertEquals(2, onSlice.get());
258352
assertEquals(2, newContext.get());
259353
assertEquals(0, newScrollContext.get());
260354
assertEquals(0, freeContext.get());
@@ -265,10 +359,13 @@ public void onSearchIdleReactivation() {
265359
compositeListener.onNewScrollContext(mock(ReaderContext.class));
266360
assertEquals(2, preFetch.get());
267361
assertEquals(2, preQuery.get());
362+
assertEquals(2, preSlice.get());
268363
assertEquals(2, failedFetch.get());
269364
assertEquals(2, failedQuery.get());
365+
assertEquals(2, failedSlice.get());
270366
assertEquals(2, onQuery.get());
271367
assertEquals(2, onFetch.get());
368+
assertEquals(2, onSlice.get());
272369
assertEquals(2, newContext.get());
273370
assertEquals(2, newScrollContext.get());
274371
assertEquals(0, freeContext.get());
@@ -279,10 +376,13 @@ public void onSearchIdleReactivation() {
279376
compositeListener.onFreeReaderContext(mock(ReaderContext.class));
280377
assertEquals(2, preFetch.get());
281378
assertEquals(2, preQuery.get());
379+
assertEquals(2, preSlice.get());
282380
assertEquals(2, failedFetch.get());
283381
assertEquals(2, failedQuery.get());
382+
assertEquals(2, failedSlice.get());
284383
assertEquals(2, onQuery.get());
285384
assertEquals(2, onFetch.get());
385+
assertEquals(2, onSlice.get());
286386
assertEquals(2, newContext.get());
287387
assertEquals(2, newScrollContext.get());
288388
assertEquals(2, freeContext.get());
@@ -293,10 +393,13 @@ public void onSearchIdleReactivation() {
293393
compositeListener.onFreeScrollContext(mock(ReaderContext.class));
294394
assertEquals(2, preFetch.get());
295395
assertEquals(2, preQuery.get());
396+
assertEquals(2, preSlice.get());
296397
assertEquals(2, failedFetch.get());
297398
assertEquals(2, failedQuery.get());
399+
assertEquals(2, failedSlice.get());
298400
assertEquals(2, onQuery.get());
299401
assertEquals(2, onFetch.get());
402+
assertEquals(2, onSlice.get());
300403
assertEquals(2, newContext.get());
301404
assertEquals(2, newScrollContext.get());
302405
assertEquals(2, freeContext.get());
@@ -307,10 +410,13 @@ public void onSearchIdleReactivation() {
307410
compositeListener.onSearchIdleReactivation();
308411
assertEquals(2, preFetch.get());
309412
assertEquals(2, preQuery.get());
413+
assertEquals(2, preSlice.get());
310414
assertEquals(2, failedFetch.get());
311415
assertEquals(2, failedQuery.get());
416+
assertEquals(2, failedSlice.get());
312417
assertEquals(2, onQuery.get());
313418
assertEquals(2, onFetch.get());
419+
assertEquals(2, onSlice.get());
314420
assertEquals(2, newContext.get());
315421
assertEquals(2, newScrollContext.get());
316422
assertEquals(2, freeContext.get());

server/src/test/java/org/opensearch/search/SearchCancellationTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.common.util.io.IOUtils;
5252
import org.opensearch.core.tasks.TaskCancelledException;
5353
import org.opensearch.index.shard.IndexShard;
54+
import org.opensearch.index.shard.SearchOperationListener;
5455
import org.opensearch.search.internal.ContextIndexSearcher;
5556
import org.opensearch.search.internal.SearchContext;
5657
import org.opensearch.test.OpenSearchTestCase;
@@ -138,6 +139,9 @@ public void testCancellableCollector() throws IOException {
138139
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
139140
IndexShard indexShard = mock(IndexShard.class);
140141
when(searchContext.indexShard()).thenReturn(indexShard);
142+
SearchOperationListener searchOperationListener = new SearchOperationListener() {
143+
};
144+
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
141145
ContextIndexSearcher searcher = new ContextIndexSearcher(
142146
reader,
143147
IndexSearcher.getDefaultSimilarity(),

0 commit comments

Comments
 (0)