Skip to content

Commit 0b06937

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Add slice level operation listener methods
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 348c04e commit 0b06937

File tree

10 files changed

+194
-12
lines changed

10 files changed

+194
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
1717
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
1818
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
19+
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
1920

2021
### Dependencies
2122
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java

+39
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ default void onFailedQueryPhase(SearchContext searchContext) {}
7171
*/
7272
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {}
7373

74+
default void onPreSliceExecution(SearchContext searchContext) {}
75+
76+
default void onFailedSliceExecution(SearchContext searchContext) {}
77+
78+
default void onSliceExecution(SearchContext searchContext) {}
79+
7480
/**
7581
* Executed before the fetch phase is executed
7682
* @param searchContext the current search context
@@ -195,6 +201,39 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
195201
}
196202
}
197203

204+
@Override
205+
public void onPreSliceExecution(SearchContext searchContext) {
206+
for (SearchOperationListener listener : listeners) {
207+
try {
208+
listener.onPreSliceExecution(searchContext);
209+
} catch (Exception e) {
210+
logger.warn(() -> new ParameterizedMessage("onPreSliceExecution listener [{}] failed", listener), e);
211+
}
212+
}
213+
}
214+
215+
@Override
216+
public void onFailedSliceExecution(SearchContext searchContext) {
217+
for (SearchOperationListener listener : listeners) {
218+
try {
219+
listener.onFailedSliceExecution(searchContext);
220+
} catch (Exception e) {
221+
logger.warn(() -> new ParameterizedMessage("onFailedSliceExecution listener [{}] failed", listener), e);
222+
}
223+
}
224+
}
225+
226+
@Override
227+
public void onSliceExecution(SearchContext searchContext) {
228+
for (SearchOperationListener listener : listeners) {
229+
try {
230+
listener.onSliceExecution(searchContext);
231+
} catch (Exception e) {
232+
logger.warn(() -> new ParameterizedMessage("onSliceExecution listener [{}] failed", listener), e);
233+
}
234+
}
235+
}
236+
198237
@Override
199238
public void onPreFetchPhase(SearchContext searchContext) {
200239
for (SearchOperationListener listener : listeners) {

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

+19-12
Original file line numberDiff line numberDiff line change
@@ -270,20 +270,27 @@ public void search(
270270

271271
@Override
272272
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
273-
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
274-
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
275-
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
276-
// reader order here.
277-
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
278-
for (int i = leaves.size() - 1; i >= 0; i--) {
279-
searchLeaf(leaves.get(i), weight, collector);
280-
}
281-
} else {
282-
for (int i = 0; i < leaves.size(); i++) {
283-
searchLeaf(leaves.get(i), weight, collector);
273+
searchContext.indexShard().getSearchOperationListener().onPreSliceExecution(searchContext);
274+
try {
275+
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
276+
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
277+
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
278+
// reader order here.
279+
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
280+
for (int i = leaves.size() - 1; i >= 0; i--) {
281+
searchLeaf(leaves.get(i), weight, collector);
282+
}
283+
} else {
284+
for (int i = 0; i < leaves.size(); i++) {
285+
searchLeaf(leaves.get(i), weight, collector);
286+
}
284287
}
288+
searchContext.bucketCollectorProcessor().processPostCollection(collector);
289+
} catch (Throwable t) {
290+
searchContext.indexShard().getSearchOperationListener().onFailedSliceExecution(searchContext);
291+
throw t;
285292
}
286-
searchContext.bucketCollectorProcessor().processPostCollection(collector);
293+
searchContext.indexShard().getSearchOperationListener().onSliceExecution(searchContext);
287294
}
288295

289296
/**

server/src/test/java/org/opensearch/index/shard/SearchOperationListenerTests.java

+105
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public void testListenersAreExecuted() {
5656
AtomicInteger preQuery = new AtomicInteger();
5757
AtomicInteger failedQuery = new AtomicInteger();
5858
AtomicInteger onQuery = new AtomicInteger();
59+
AtomicInteger preSlice = new AtomicInteger();
60+
AtomicInteger failedSlice = new AtomicInteger();
61+
AtomicInteger onSlice = new AtomicInteger();
5962
AtomicInteger onFetch = new AtomicInteger();
6063
AtomicInteger preFetch = new AtomicInteger();
6164
AtomicInteger failedFetch = new AtomicInteger();
@@ -86,6 +89,24 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
8689
onQuery.incrementAndGet();
8790
}
8891

92+
@Override
93+
public void onPreSliceExecution(SearchContext searchContext) {
94+
assertNotNull(searchContext);
95+
preSlice.incrementAndGet();
96+
}
97+
98+
@Override
99+
public void onFailedSliceExecution(SearchContext searchContext) {
100+
assertNotNull(searchContext);
101+
failedSlice.incrementAndGet();
102+
}
103+
104+
@Override
105+
public void onSliceExecution(SearchContext searchContext) {
106+
assertNotNull(searchContext);
107+
onSlice.incrementAndGet();
108+
}
109+
89110
@Override
90111
public void onPreFetchPhase(SearchContext searchContext) {
91112
assertNotNull(searchContext);
@@ -167,10 +188,30 @@ public void onSearchIdleReactivation() {
167188
compositeListener.onQueryPhase(ctx, timeInNanos.get());
168189
assertEquals(0, preFetch.get());
169190
assertEquals(0, preQuery.get());
191+
assertEquals(0, preSlice.get());
192+
assertEquals(0, failedFetch.get());
193+
assertEquals(0, failedQuery.get());
194+
assertEquals(0, failedSlice.get());
195+
assertEquals(2, onQuery.get());
196+
assertEquals(0, onFetch.get());
197+
assertEquals(0, onSlice.get());
198+
assertEquals(0, newContext.get());
199+
assertEquals(0, newScrollContext.get());
200+
assertEquals(0, freeContext.get());
201+
assertEquals(0, freeScrollContext.get());
202+
assertEquals(0, searchIdleReactivateCount.get());
203+
assertEquals(0, validateSearchContext.get());
204+
205+
compositeListener.onSliceExecution(ctx);
206+
assertEquals(0, preFetch.get());
207+
assertEquals(0, preQuery.get());
208+
assertEquals(0, preSlice.get());
170209
assertEquals(0, failedFetch.get());
171210
assertEquals(0, failedQuery.get());
211+
assertEquals(0, failedSlice.get());
172212
assertEquals(2, onQuery.get());
173213
assertEquals(0, onFetch.get());
214+
assertEquals(2, onSlice.get());
174215
assertEquals(0, newContext.get());
175216
assertEquals(0, newScrollContext.get());
176217
assertEquals(0, freeContext.get());
@@ -181,10 +222,13 @@ public void onSearchIdleReactivation() {
181222
compositeListener.onFetchPhase(ctx, timeInNanos.get());
182223
assertEquals(0, preFetch.get());
183224
assertEquals(0, preQuery.get());
225+
assertEquals(0, preSlice.get());
184226
assertEquals(0, failedFetch.get());
185227
assertEquals(0, failedQuery.get());
228+
assertEquals(0, failedSlice.get());
186229
assertEquals(2, onQuery.get());
187230
assertEquals(2, onFetch.get());
231+
assertEquals(2, onSlice.get());
188232
assertEquals(0, newContext.get());
189233
assertEquals(0, newScrollContext.get());
190234
assertEquals(0, freeContext.get());
@@ -195,10 +239,30 @@ public void onSearchIdleReactivation() {
195239
compositeListener.onPreQueryPhase(ctx);
196240
assertEquals(0, preFetch.get());
197241
assertEquals(2, preQuery.get());
242+
assertEquals(0, preSlice.get());
198243
assertEquals(0, failedFetch.get());
199244
assertEquals(0, failedQuery.get());
245+
assertEquals(0, failedSlice.get());
200246
assertEquals(2, onQuery.get());
201247
assertEquals(2, onFetch.get());
248+
assertEquals(2, onSlice.get());
249+
assertEquals(0, newContext.get());
250+
assertEquals(0, newScrollContext.get());
251+
assertEquals(0, freeContext.get());
252+
assertEquals(0, freeScrollContext.get());
253+
assertEquals(0, searchIdleReactivateCount.get());
254+
assertEquals(0, validateSearchContext.get());
255+
256+
compositeListener.onPreSliceExecution(ctx);
257+
assertEquals(0, preFetch.get());
258+
assertEquals(2, preQuery.get());
259+
assertEquals(2, preSlice.get());
260+
assertEquals(0, failedFetch.get());
261+
assertEquals(0, failedQuery.get());
262+
assertEquals(0, failedSlice.get());
263+
assertEquals(2, onQuery.get());
264+
assertEquals(2, onFetch.get());
265+
assertEquals(2, onSlice.get());
202266
assertEquals(0, newContext.get());
203267
assertEquals(0, newScrollContext.get());
204268
assertEquals(0, freeContext.get());
@@ -209,10 +273,13 @@ public void onSearchIdleReactivation() {
209273
compositeListener.onPreFetchPhase(ctx);
210274
assertEquals(2, preFetch.get());
211275
assertEquals(2, preQuery.get());
276+
assertEquals(2, preSlice.get());
212277
assertEquals(0, failedFetch.get());
213278
assertEquals(0, failedQuery.get());
279+
assertEquals(0, failedSlice.get());
214280
assertEquals(2, onQuery.get());
215281
assertEquals(2, onFetch.get());
282+
assertEquals(2, onSlice.get());
216283
assertEquals(0, newContext.get());
217284
assertEquals(0, newScrollContext.get());
218285
assertEquals(0, freeContext.get());
@@ -223,10 +290,13 @@ public void onSearchIdleReactivation() {
223290
compositeListener.onFailedFetchPhase(ctx);
224291
assertEquals(2, preFetch.get());
225292
assertEquals(2, preQuery.get());
293+
assertEquals(2, preSlice.get());
226294
assertEquals(2, failedFetch.get());
227295
assertEquals(0, failedQuery.get());
296+
assertEquals(0, failedSlice.get());
228297
assertEquals(2, onQuery.get());
229298
assertEquals(2, onFetch.get());
299+
assertEquals(2, onSlice.get());
230300
assertEquals(0, newContext.get());
231301
assertEquals(0, newScrollContext.get());
232302
assertEquals(0, freeContext.get());
@@ -237,10 +307,30 @@ public void onSearchIdleReactivation() {
237307
compositeListener.onFailedQueryPhase(ctx);
238308
assertEquals(2, preFetch.get());
239309
assertEquals(2, preQuery.get());
310+
assertEquals(2, preSlice.get());
311+
assertEquals(2, failedFetch.get());
312+
assertEquals(2, failedQuery.get());
313+
assertEquals(0, failedSlice.get());
314+
assertEquals(2, onQuery.get());
315+
assertEquals(2, onFetch.get());
316+
assertEquals(2, onSlice.get());
317+
assertEquals(0, newContext.get());
318+
assertEquals(0, newScrollContext.get());
319+
assertEquals(0, freeContext.get());
320+
assertEquals(0, freeScrollContext.get());
321+
assertEquals(0, searchIdleReactivateCount.get());
322+
assertEquals(0, validateSearchContext.get());
323+
324+
compositeListener.onFailedSliceExecution(ctx);
325+
assertEquals(2, preFetch.get());
326+
assertEquals(2, preQuery.get());
327+
assertEquals(2, preSlice.get());
240328
assertEquals(2, failedFetch.get());
241329
assertEquals(2, failedQuery.get());
330+
assertEquals(2, failedSlice.get());
242331
assertEquals(2, onQuery.get());
243332
assertEquals(2, onFetch.get());
333+
assertEquals(2, onSlice.get());
244334
assertEquals(0, newContext.get());
245335
assertEquals(0, newScrollContext.get());
246336
assertEquals(0, freeContext.get());
@@ -251,10 +341,13 @@ public void onSearchIdleReactivation() {
251341
compositeListener.onNewReaderContext(mock(ReaderContext.class));
252342
assertEquals(2, preFetch.get());
253343
assertEquals(2, preQuery.get());
344+
assertEquals(2, preSlice.get());
254345
assertEquals(2, failedFetch.get());
255346
assertEquals(2, failedQuery.get());
347+
assertEquals(2, failedSlice.get());
256348
assertEquals(2, onQuery.get());
257349
assertEquals(2, onFetch.get());
350+
assertEquals(2, onSlice.get());
258351
assertEquals(2, newContext.get());
259352
assertEquals(0, newScrollContext.get());
260353
assertEquals(0, freeContext.get());
@@ -265,10 +358,13 @@ public void onSearchIdleReactivation() {
265358
compositeListener.onNewScrollContext(mock(ReaderContext.class));
266359
assertEquals(2, preFetch.get());
267360
assertEquals(2, preQuery.get());
361+
assertEquals(2, preSlice.get());
268362
assertEquals(2, failedFetch.get());
269363
assertEquals(2, failedQuery.get());
364+
assertEquals(2, failedSlice.get());
270365
assertEquals(2, onQuery.get());
271366
assertEquals(2, onFetch.get());
367+
assertEquals(2, onSlice.get());
272368
assertEquals(2, newContext.get());
273369
assertEquals(2, newScrollContext.get());
274370
assertEquals(0, freeContext.get());
@@ -279,10 +375,13 @@ public void onSearchIdleReactivation() {
279375
compositeListener.onFreeReaderContext(mock(ReaderContext.class));
280376
assertEquals(2, preFetch.get());
281377
assertEquals(2, preQuery.get());
378+
assertEquals(2, preSlice.get());
282379
assertEquals(2, failedFetch.get());
283380
assertEquals(2, failedQuery.get());
381+
assertEquals(2, failedSlice.get());
284382
assertEquals(2, onQuery.get());
285383
assertEquals(2, onFetch.get());
384+
assertEquals(2, onSlice.get());
286385
assertEquals(2, newContext.get());
287386
assertEquals(2, newScrollContext.get());
288387
assertEquals(2, freeContext.get());
@@ -293,10 +392,13 @@ public void onSearchIdleReactivation() {
293392
compositeListener.onFreeScrollContext(mock(ReaderContext.class));
294393
assertEquals(2, preFetch.get());
295394
assertEquals(2, preQuery.get());
395+
assertEquals(2, preSlice.get());
296396
assertEquals(2, failedFetch.get());
297397
assertEquals(2, failedQuery.get());
398+
assertEquals(2, failedSlice.get());
298399
assertEquals(2, onQuery.get());
299400
assertEquals(2, onFetch.get());
401+
assertEquals(2, onSlice.get());
300402
assertEquals(2, newContext.get());
301403
assertEquals(2, newScrollContext.get());
302404
assertEquals(2, freeContext.get());
@@ -307,10 +409,13 @@ public void onSearchIdleReactivation() {
307409
compositeListener.onSearchIdleReactivation();
308410
assertEquals(2, preFetch.get());
309411
assertEquals(2, preQuery.get());
412+
assertEquals(2, preSlice.get());
310413
assertEquals(2, failedFetch.get());
311414
assertEquals(2, failedQuery.get());
415+
assertEquals(2, failedSlice.get());
312416
assertEquals(2, onQuery.get());
313417
assertEquals(2, onFetch.get());
418+
assertEquals(2, onSlice.get());
314419
assertEquals(2, newContext.get());
315420
assertEquals(2, newScrollContext.get());
316421
assertEquals(2, freeContext.get());

server/src/test/java/org/opensearch/search/SearchCancellationTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.opensearch.common.util.io.IOUtils;
5252
import org.opensearch.core.tasks.TaskCancelledException;
5353
import org.opensearch.index.shard.IndexShard;
54+
import org.opensearch.index.shard.SearchOperationListener;
5455
import org.opensearch.search.internal.ContextIndexSearcher;
5556
import org.opensearch.search.internal.SearchContext;
5657
import org.opensearch.test.OpenSearchTestCase;
@@ -138,6 +139,9 @@ public void testCancellableCollector() throws IOException {
138139
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
139140
IndexShard indexShard = mock(IndexShard.class);
140141
when(searchContext.indexShard()).thenReturn(indexShard);
142+
SearchOperationListener searchOperationListener = new SearchOperationListener() {
143+
};
144+
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
141145
ContextIndexSearcher searcher = new ContextIndexSearcher(
142146
reader,
143147
IndexSearcher.getDefaultSimilarity(),

server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.index.IndexSettings;
8282
import org.opensearch.index.cache.bitset.BitsetFilterCache;
8383
import org.opensearch.index.shard.IndexShard;
84+
import org.opensearch.index.shard.SearchOperationListener;
8485
import org.opensearch.search.SearchService;
8586
import org.opensearch.search.aggregations.LeafBucketCollector;
8687
import org.opensearch.test.IndexSettingsModule;
@@ -262,6 +263,9 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
262263
SearchContext searchContext = mock(SearchContext.class);
263264
IndexShard indexShard = mock(IndexShard.class);
264265
when(searchContext.indexShard()).thenReturn(indexShard);
266+
SearchOperationListener searchOperationListener = new SearchOperationListener() {
267+
};
268+
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
265269
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
266270
ContextIndexSearcher searcher = new ContextIndexSearcher(
267271
filteredReader,

server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.lucene.tests.util.TestUtil;
6363
import org.opensearch.common.util.io.IOUtils;
6464
import org.opensearch.index.shard.IndexShard;
65+
import org.opensearch.index.shard.SearchOperationListener;
6566
import org.opensearch.search.internal.ContextIndexSearcher;
6667
import org.opensearch.search.internal.SearchContext;
6768
import org.opensearch.search.profile.ProfileResult;
@@ -128,6 +129,9 @@ public void setUp() throws Exception {
128129
SearchContext searchContext = mock(SearchContext.class);
129130
IndexShard indexShard = mock(IndexShard.class);
130131
when(searchContext.indexShard()).thenReturn(indexShard);
132+
SearchOperationListener searchOperationListener = new SearchOperationListener() {
133+
};
134+
when(indexShard.getSearchOperationListener()).thenReturn(searchOperationListener);
131135
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
132136
searcher = new ContextIndexSearcher(
133137
reader,

0 commit comments

Comments
 (0)