Skip to content

Commit 42d8918

Browse files
author
Jay Deng
committed
rough POC for ordered QueryCollectorContexts
1 parent f30e0e0 commit 42d8918

File tree

3 files changed

+108
-1
lines changed

3 files changed

+108
-1
lines changed

server/src/main/java/org/opensearch/search/query/QueryCollectorContext.java

+26
Original file line numberDiff line numberDiff line change
@@ -308,4 +308,30 @@ CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
308308
}
309309
};
310310
}
311+
312+
static QueryCollectorContext createFakeContext() {
313+
return new QueryCollectorContext("fake_plugin") {
314+
@Override
315+
Collector create(Collector in) {
316+
return EMPTY_COLLECTOR;
317+
}
318+
319+
@Override
320+
CollectorManager<? extends Collector, ReduceableSearchResult> createManager(
321+
CollectorManager<? extends Collector, ReduceableSearchResult> in
322+
) throws IOException {
323+
return new CollectorManager<Collector, ReduceableSearchResult>() {
324+
@Override
325+
public Collector newCollector() throws IOException {
326+
return EMPTY_COLLECTOR;
327+
}
328+
329+
@Override
330+
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
331+
return result -> {};
332+
}
333+
};
334+
}
335+
};
336+
}
311337
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.query;
10+
11+
import org.apache.lucene.search.Collector;
12+
import org.apache.lucene.search.CollectorManager;
13+
14+
import java.io.IOException;
15+
import java.util.ArrayList;
16+
import java.util.Comparator;
17+
import java.util.PriorityQueue;
18+
19+
public class QueryContextProvider {
20+
private final PriorityQueue<OrderedQueryContext> contextQueue = new PriorityQueue<>(new Comparator<OrderedQueryContext>() {
21+
@Override
22+
public int compare(OrderedQueryContext o1, OrderedQueryContext o2) {
23+
return o1.getPriority() - o2.getPriority();
24+
}
25+
});
26+
27+
public void addQueryContext(QueryCollectorContext collectorContext, int priority) {
28+
contextQueue.offer(new OrderedQueryContext(collectorContext, priority));
29+
}
30+
31+
public QueryCollectorContext getComposedContext() {
32+
return new QueryCollectorContext("provider") {
33+
@Override
34+
Collector create(Collector in) throws IOException {
35+
OrderedQueryContext octx = contextQueue.poll();
36+
Collector collector = null;
37+
while (octx != null) {
38+
collector = octx.getContext().create(collector);
39+
octx = contextQueue.poll();
40+
}
41+
return collector;
42+
}
43+
44+
@Override
45+
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException {
46+
OrderedQueryContext octx = contextQueue.poll();
47+
CollectorManager<?, ReduceableSearchResult> manager = null;
48+
while (octx != null) {
49+
manager = octx.getContext().createManager(manager);
50+
octx = contextQueue.poll();
51+
}
52+
return manager;
53+
}
54+
};
55+
}
56+
}
57+
58+
class OrderedQueryContext {
59+
private final int priority;
60+
private final QueryCollectorContext context;
61+
62+
public OrderedQueryContext(QueryCollectorContext context, int priority) {
63+
this.priority = priority;
64+
this.context = context;
65+
}
66+
67+
public QueryCollectorContext getContext() {
68+
return this.context;
69+
}
70+
71+
public int getPriority() {
72+
return priority;
73+
}
74+
}

server/src/main/java/org/opensearch/search/query/QueryPhase.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import java.util.stream.Collectors;
8181

8282
import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
83+
import static org.opensearch.search.query.QueryCollectorContext.createFakeContext;
8384
import static org.opensearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
8485
import static org.opensearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
8586
import static org.opensearch.search.query.QueryCollectorContext.createMultiCollectorContext;
@@ -100,6 +101,7 @@ public class QueryPhase {
100101
private final QueryPhaseSearcher queryPhaseSearcher;
101102
private final SuggestProcessor suggestProcessor;
102103
private final RescoreProcessor rescoreProcessor;
104+
// private final QueryContextProvider contextProvider;
103105

104106
public QueryPhase() {
105107
this(DEFAULT_QUERY_PHASE_SEARCHER);
@@ -109,6 +111,7 @@ public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) {
109111
this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required");
110112
this.suggestProcessor = new SuggestProcessor();
111113
this.rescoreProcessor = new RescoreProcessor();
114+
// this.contextProvider = new QueryContextProvider();
112115
}
113116

114117
public void preProcess(SearchContext context) {
@@ -192,6 +195,7 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
192195
final ContextIndexSearcher searcher = searchContext.searcher();
193196
final IndexReader reader = searcher.getIndexReader();
194197
QuerySearchResult queryResult = searchContext.queryResult();
198+
QueryContextProvider contextProvider = new QueryContextProvider();
195199
queryResult.searchTimedOut(false);
196200
try {
197201
queryResult.from(searchContext.from());
@@ -248,7 +252,10 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
248252
.map(Map.Entry::getValue)
249253
.collect(Collectors.toList());
250254
if (managersExceptGlobalAgg.isEmpty() == false) {
251-
collectors.add(createMultiCollectorContext(managersExceptGlobalAgg));
255+
contextProvider.addQueryContext(createFakeContext(), 100);
256+
contextProvider.addQueryContext(createMultiCollectorContext(managersExceptGlobalAgg), 50);
257+
collectors.add(contextProvider.getComposedContext());
258+
// collectors.add(createMultiCollectorContext(managersExceptGlobalAgg));
252259
}
253260

254261
if (searchContext.minimumScore() != null) {

0 commit comments

Comments
 (0)