Skip to content

Commit 279d934

Browse files
[Feature] Neural Sparse Query Two Phase Search pipeline (#747) (#771)
* Poc of pipeline Signed-off-by: conggguan <congguan@amazon.com> * Complete some settings for two phase pipeline. Signed-off-by: conggguan <congguan@amazon.com> * Change the implement of two-phase from QueryBuilderVistor to custom process funciton. Signed-off-by: conggguan <congguan@amazon.com> * Add It and fix some bug on the state of multy same neuralsparsequerybuilder. Signed-off-by: conggguan <congguan@amazon.com> * Simplify some logic, and correct some format. Signed-off-by: conggguan <congguan@amazon.com> * Optimize some format. Signed-off-by: conggguan <congguan@amazon.com> * Add some test case. Signed-off-by: conggguan <congguan@amazon.com> * Optimize some logic for zhichao-aws's comments. Signed-off-by: conggguan <congguan@amazon.com> * Optimize a line without application. Signed-off-by: conggguan <congguan@amazon.com> * Add some comments, remove some redundant lines, fix some format. Signed-off-by: conggguan <congguan@amazon.com> * Remove a redundant null check, fix a if format. Signed-off-by: conggguan <congguan@amazon.com> * Fix a typo for a comment, camelcase format for some variable. Signed-off-by: conggguan <congguan@amazon.com> * Add some comments to illustrate the influence of the modify on 2-phase search pipeline to neural sparse query builder. Signed-off-by: conggguan <congguan@amazon.com> --------- Signed-off-by: conggguan <congguan@amazon.com> Signed-off-by: conggguan <157357330+conggguan@users.noreply.github.com> (cherry picked from commit 2b21110) Co-authored-by: conggguan <157357330+conggguan@users.noreply.github.com>
1 parent f2cd208 commit 279d934

File tree

10 files changed

+1234
-41
lines changed

10 files changed

+1234
-41
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1414

1515
## [Unreleased 2.x](https://github.com/opensearch-project/neural-search/compare/2.14...2.x)
1616
### Features
17+
- Speed up NeuralSparseQuery by two-phase using a custom search pipeline.([#646](https://github.com/opensearch-project/neural-search/issues/646))
1718
- Support batchExecute in TextEmbeddingProcessor and SparseEncodingProcessor ([#743](https://github.com/opensearch-project/neural-search/issues/743))
1819
### Enhancements
1920
- Pass empty doc collector instead of top docs collector to improve hybrid query latencies by 20% ([#731](https://github.com/opensearch-project/neural-search/pull/731))

src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.ml.client.MachineLearningNodeClient;
2828
import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor;
2929
import org.opensearch.neuralsearch.processor.NeuralQueryEnricherProcessor;
30+
import org.opensearch.neuralsearch.processor.NeuralSparseTwoPhaseProcessor;
3031
import org.opensearch.neuralsearch.processor.NormalizationProcessor;
3132
import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow;
3233
import org.opensearch.neuralsearch.processor.SparseEncodingProcessor;
@@ -157,7 +158,12 @@ public List<Setting<?>> getSettings() {
157158
public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchRequestProcessor>> getRequestProcessors(
158159
Parameters parameters
159160
) {
160-
return Map.of(NeuralQueryEnricherProcessor.TYPE, new NeuralQueryEnricherProcessor.Factory());
161+
return Map.of(
162+
NeuralQueryEnricherProcessor.TYPE,
163+
new NeuralQueryEnricherProcessor.Factory(),
164+
NeuralSparseTwoPhaseProcessor.TYPE,
165+
new NeuralSparseTwoPhaseProcessor.Factory()
166+
);
161167
}
162168

163169
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package org.opensearch.neuralsearch.processor;
6+
7+
import com.google.common.collect.ArrayListMultimap;
8+
import com.google.common.collect.Multimap;
9+
import lombok.Getter;
10+
import lombok.Setter;
11+
import org.opensearch.action.search.SearchRequest;
12+
import org.opensearch.common.collect.Tuple;
13+
import org.opensearch.index.query.BoolQueryBuilder;
14+
import org.opensearch.index.query.QueryBuilder;
15+
import org.opensearch.ingest.ConfigurationUtils;
16+
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
17+
import org.opensearch.search.builder.SearchSourceBuilder;
18+
import org.opensearch.search.pipeline.AbstractProcessor;
19+
import org.opensearch.search.pipeline.Processor;
20+
import org.opensearch.search.pipeline.SearchRequestProcessor;
21+
import org.opensearch.search.rescore.QueryRescorerBuilder;
22+
import org.opensearch.search.rescore.RescorerBuilder;
23+
24+
import java.util.Collections;
25+
import java.util.Locale;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.stream.Collectors;
29+
30+
/**
31+
* A SearchRequestProcessor to generate two-phase NeuralSparseQueryBuilder,
32+
* and add it to the Rescore of a searchRequest.
33+
*/
34+
@Setter
35+
@Getter
36+
public class NeuralSparseTwoPhaseProcessor extends AbstractProcessor implements SearchRequestProcessor {
37+
38+
public static final String TYPE = "neural_sparse_two_phase_processor";
39+
private boolean enabled;
40+
private float ratio;
41+
private float windowExpansion;
42+
private int maxWindowSize;
43+
private static final String PARAMETER_KEY = "two_phase_parameter";
44+
private static final String RATIO_KEY = "prune_ratio";
45+
private static final String ENABLE_KEY = "enabled";
46+
private static final String EXPANSION_KEY = "expansion_rate";
47+
private static final String MAX_WINDOW_SIZE_KEY = "max_window_size";
48+
private static final boolean DEFAULT_ENABLED = true;
49+
private static final float DEFAULT_RATIO = 0.4f;
50+
private static final float DEFAULT_WINDOW_EXPANSION = 5.0f;
51+
private static final int DEFAULT_MAX_WINDOW_SIZE = 10000;
52+
private static final int DEFAULT_BASE_QUERY_SIZE = 10;
53+
private static final int MAX_WINDOWS_SIZE_LOWER_BOUND = 50;
54+
private static final float WINDOW_EXPANSION_LOWER_BOUND = 1.0f;
55+
private static final float RATIO_LOWER_BOUND = 0f;
56+
private static final float RATIO_UPPER_BOUND = 1f;
57+
58+
protected NeuralSparseTwoPhaseProcessor(
59+
String tag,
60+
String description,
61+
boolean ignoreFailure,
62+
boolean enabled,
63+
float ratio,
64+
float windowExpansion,
65+
int maxWindowSize
66+
) {
67+
super(tag, description, ignoreFailure);
68+
this.enabled = enabled;
69+
if (ratio < RATIO_LOWER_BOUND || ratio > RATIO_UPPER_BOUND) {
70+
throw new IllegalArgumentException(
71+
String.format(Locale.ROOT, "The two_phase_parameter.prune_ratio must be within [0, 1]. Received: %f", ratio)
72+
);
73+
}
74+
this.ratio = ratio;
75+
if (windowExpansion < WINDOW_EXPANSION_LOWER_BOUND) {
76+
throw new IllegalArgumentException(
77+
String.format(Locale.ROOT, "The two_phase_parameter.expansion_rate must >= 1.0. Received: %f", windowExpansion)
78+
);
79+
}
80+
this.windowExpansion = windowExpansion;
81+
if (maxWindowSize < MAX_WINDOWS_SIZE_LOWER_BOUND) {
82+
throw new IllegalArgumentException(
83+
String.format(Locale.ROOT, "The two_phase_parameter.max_window_size must >= 50. Received: %n" + maxWindowSize)
84+
);
85+
}
86+
this.maxWindowSize = maxWindowSize;
87+
}
88+
89+
/**
90+
* Process the search request of neural_sparse_two_phase_processor
91+
* @param request the search request (which may have been modified by an earlier processor)
92+
* @return request the search request that add the two-phase rescore query of neural sparse query.
93+
*/
94+
@Override
95+
public SearchRequest processRequest(final SearchRequest request) {
96+
if (!enabled || ratio == 0f) {
97+
return request;
98+
}
99+
QueryBuilder queryBuilder = request.source().query();
100+
// Collect the nested NeuralSparseQueryBuilder in the whole query.
101+
Multimap<NeuralSparseQueryBuilder, Float> queryBuilderMap;
102+
queryBuilderMap = collectNeuralSparseQueryBuilder(queryBuilder, 1.0f);
103+
if (queryBuilderMap.isEmpty()) {
104+
return request;
105+
}
106+
// Make a nestedQueryBuilder which includes all the two-phase QueryBuilder.
107+
QueryBuilder nestedTwoPhaseQueryBuilder = getNestedQueryBuilderFromNeuralSparseQueryBuilderMap(queryBuilderMap);
108+
nestedTwoPhaseQueryBuilder.boost(getOriginQueryWeightAfterRescore(request.source()));
109+
// Add it to the rescorer.
110+
RescorerBuilder<QueryRescorerBuilder> twoPhaseRescorer = buildRescoreQueryBuilderForTwoPhase(nestedTwoPhaseQueryBuilder, request);
111+
request.source().addRescorer(twoPhaseRescorer);
112+
return request;
113+
}
114+
115+
@Override
116+
public String getType() {
117+
return TYPE;
118+
}
119+
120+
/**
121+
* Based on ratio, split a Map into two map by the value.
122+
*
123+
* @param queryTokens the queryTokens map, key is the token String, value is the score.
124+
* @param thresholdRatio The ratio that control how tokens map be split.
125+
* @return A tuple has two element, { token map whose value above threshold, token map whose value below threshold }
126+
*/
127+
public static Tuple<Map<String, Float>, Map<String, Float>> splitQueryTokensByRatioedMaxScoreAsThreshold(
128+
final Map<String, Float> queryTokens,
129+
final float thresholdRatio
130+
) {
131+
if (Objects.isNull(queryTokens)) {
132+
throw new IllegalArgumentException("Query tokens cannot be null or empty.");
133+
}
134+
float max = 0f;
135+
for (Float value : queryTokens.values()) {
136+
max = Math.max(value, max);
137+
}
138+
float threshold = max * thresholdRatio;
139+
140+
Map<Boolean, Map<String, Float>> queryTokensByScore = queryTokens.entrySet()
141+
.stream()
142+
.collect(
143+
Collectors.partitioningBy(entry -> entry.getValue() >= threshold, Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
144+
);
145+
146+
Map<String, Float> highScoreTokens = queryTokensByScore.get(Boolean.TRUE);
147+
Map<String, Float> lowScoreTokens = queryTokensByScore.get(Boolean.FALSE);
148+
if (Objects.isNull(highScoreTokens)) {
149+
highScoreTokens = Collections.emptyMap();
150+
}
151+
if (Objects.isNull(lowScoreTokens)) {
152+
lowScoreTokens = Collections.emptyMap();
153+
}
154+
return Tuple.tuple(highScoreTokens, lowScoreTokens);
155+
}
156+
157+
private QueryBuilder getNestedQueryBuilderFromNeuralSparseQueryBuilderMap(
158+
final Multimap<NeuralSparseQueryBuilder, Float> queryBuilderFloatMap
159+
) {
160+
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
161+
queryBuilderFloatMap.asMap().forEach((neuralSparseQueryBuilder, boosts) -> {
162+
float reduceBoost = boosts.stream().reduce(0.0f, Float::sum);
163+
boolQueryBuilder.should(neuralSparseQueryBuilder.boost(reduceBoost));
164+
});
165+
return boolQueryBuilder;
166+
}
167+
168+
private float getOriginQueryWeightAfterRescore(final SearchSourceBuilder searchSourceBuilder) {
169+
if (Objects.isNull(searchSourceBuilder.rescores())) {
170+
return 1.0f;
171+
}
172+
return searchSourceBuilder.rescores()
173+
.stream()
174+
.map(rescorerBuilder -> ((QueryRescorerBuilder) rescorerBuilder).getQueryWeight())
175+
.reduce(1.0f, (a, b) -> a * b);
176+
}
177+
178+
private Multimap<NeuralSparseQueryBuilder, Float> collectNeuralSparseQueryBuilder(final QueryBuilder queryBuilder, float baseBoost) {
179+
Multimap<NeuralSparseQueryBuilder, Float> result = ArrayListMultimap.create();
180+
181+
if (queryBuilder instanceof BoolQueryBuilder) {
182+
BoolQueryBuilder boolQueryBuilder = (BoolQueryBuilder) queryBuilder;
183+
float updatedBoost = baseBoost * boolQueryBuilder.boost();
184+
for (QueryBuilder subQuery : boolQueryBuilder.should()) {
185+
Multimap<NeuralSparseQueryBuilder, Float> subResult = collectNeuralSparseQueryBuilder(subQuery, updatedBoost);
186+
result.putAll(subResult);
187+
}
188+
} else if (queryBuilder instanceof NeuralSparseQueryBuilder) {
189+
NeuralSparseQueryBuilder neuralSparseQueryBuilder = (NeuralSparseQueryBuilder) queryBuilder;
190+
float updatedBoost = baseBoost * neuralSparseQueryBuilder.boost();
191+
/*
192+
* We obtain a copied modifiedQueryBuilder from the valid origin NeuralSparseQueryBuilder. After this,
193+
* when the original NeuralSparseQueryBuilder starts to rewrite, it will only retain the tokens that
194+
* have higher scores (controlled by the maxScore * ratio). The tokens with lower scores will be
195+
* passed to the modifiedQueryBuilder's queryTokenSupplier.
196+
*
197+
* By doing this, we reduce the score computation time for the original NeuralSparseQueryBuilder,
198+
* and use the modifiedQueryBuilder to make a score increment on TopDocs.
199+
*
200+
* When 2-phase is enabled:
201+
* - Docs besides TopDocs: Score = HighScoreToken's score
202+
* - Final TopDocs: Score = HighScoreToken's score + LowScoreToken's score
203+
*/
204+
NeuralSparseQueryBuilder modifiedQueryBuilder = neuralSparseQueryBuilder.getCopyNeuralSparseQueryBuilderForTwoPhase(ratio);
205+
result.put(modifiedQueryBuilder, updatedBoost);
206+
}
207+
// We only support BoostQuery, BooleanQuery and NeuralSparseQuery now. For other compound query type which are not support now, will
208+
// do nothing and just quit.
209+
return result;
210+
}
211+
212+
private RescorerBuilder<QueryRescorerBuilder> buildRescoreQueryBuilderForTwoPhase(
213+
final QueryBuilder nestedTwoPhaseQueryBuilder,
214+
final SearchRequest searchRequest
215+
) {
216+
RescorerBuilder<QueryRescorerBuilder> twoPhaseRescorer = new QueryRescorerBuilder(nestedTwoPhaseQueryBuilder);
217+
int requestSize = searchRequest.source().size();
218+
int windowSize = (int) ((requestSize == -1 ? DEFAULT_BASE_QUERY_SIZE : requestSize) * windowExpansion);
219+
if (windowSize > maxWindowSize || windowSize < 0) {
220+
throw new IllegalArgumentException(
221+
String.format(
222+
Locale.ROOT,
223+
"The two-phase window size of neural_sparse_two_phase_processor should be [0,%d], but get the value of %d",
224+
maxWindowSize,
225+
windowSize
226+
)
227+
);
228+
}
229+
twoPhaseRescorer.windowSize(windowSize);
230+
return twoPhaseRescorer;
231+
}
232+
233+
/**
234+
* Factory to create NeuralSparseTwoPhaseProcessor, provide default parameter,
235+
*
236+
*/
237+
public static class Factory implements Processor.Factory<SearchRequestProcessor> {
238+
@Override
239+
public NeuralSparseTwoPhaseProcessor create(
240+
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
241+
String tag,
242+
String description,
243+
boolean ignoreFailure,
244+
Map<String, Object> config,
245+
PipelineContext pipelineContext
246+
) throws IllegalArgumentException {
247+
248+
boolean enabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, ENABLE_KEY, DEFAULT_ENABLED);
249+
Map<String, Object> twoPhaseConfigMap = ConfigurationUtils.readOptionalMap(TYPE, tag, config, PARAMETER_KEY);
250+
251+
float ratio = DEFAULT_RATIO;
252+
float windowExpansion = DEFAULT_WINDOW_EXPANSION;
253+
int maxWindowSize = DEFAULT_MAX_WINDOW_SIZE;
254+
if (Objects.nonNull(twoPhaseConfigMap)) {
255+
ratio = ((Number) twoPhaseConfigMap.getOrDefault(RATIO_KEY, ratio)).floatValue();
256+
windowExpansion = ((Number) twoPhaseConfigMap.getOrDefault(EXPANSION_KEY, windowExpansion)).floatValue();
257+
maxWindowSize = ((Number) twoPhaseConfigMap.getOrDefault(MAX_WINDOW_SIZE_KEY, maxWindowSize)).intValue();
258+
}
259+
260+
return new NeuralSparseTwoPhaseProcessor(tag, description, ignoreFailure, enabled, ratio, windowExpansion, maxWindowSize);
261+
}
262+
}
263+
264+
}

0 commit comments

Comments
 (0)