Skip to content

Commit 7e5b243

Browse files
committed
Coordinator can return partial results after the timeout when allow_partial_search_results is true
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 42dc22e commit 7e5b243

15 files changed

+474
-106
lines changed

CHANGELOG.md

+1
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2121
- Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271))
2222
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
2323
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
24+
- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)).
2425

2526
### Dependencies
2627
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
12+
13+
import org.opensearch.action.search.MultiSearchResponse;
14+
import org.opensearch.action.search.SearchPhaseExecutionException;
15+
import org.opensearch.action.search.SearchResponse;
16+
import org.opensearch.action.search.ShardSearchFailure;
17+
import org.opensearch.common.action.ActionFuture;
18+
import org.opensearch.common.settings.Settings;
19+
import org.opensearch.common.unit.TimeValue;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.plugins.PluginsService;
22+
import org.opensearch.script.Script;
23+
import org.opensearch.script.ScriptType;
24+
import org.opensearch.test.OpenSearchIntegTestCase;
25+
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
26+
import org.opensearch.transport.ReceiveTimeoutTransportException;
27+
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
import java.util.HashSet;
33+
import java.util.List;
34+
import java.util.Set;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static org.opensearch.index.query.QueryBuilders.scriptQuery;
38+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
39+
40+
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0)
41+
public class CoordinatorTimeoutIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
42+
43+
/** Coordinator timeout, in milliseconds, that the tests in this class pass to their search requests. */
private final long coordinatorTimeoutMills = 500;

/**
 * Creates a test instance for one settings variant.
 *
 * @param nodeSettings node settings for this run (one entry produced by {@link #parameters()}),
 *                     handed unchanged to the parameterized base class
 */
public CoordinatorTimeoutIT(Settings nodeSettings) {
    super(nodeSettings);
}
48+
49+
@ParametersFactory
50+
public static Collection<Object[]> parameters() {
51+
return Arrays.asList(
52+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
53+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
54+
);
55+
}
56+
57+
@Override
58+
protected Collection<Class<? extends Plugin>> nodePlugins() {
59+
return Collections.singleton(ScriptedBlockPlugin.class);
60+
}
61+
62+
public void testTimeoutDuringQueryPhase() throws Exception {
63+
int dataNumber = internalCluster().numDataNodes();
64+
createIndex("test", Settings.builder().put("index.number_of_shards", dataNumber).put("index.number_of_replicas", 0).build());
65+
66+
List<ScriptedBlockPlugin> plugins = initBlockFactory();
67+
indexTestData(client());
68+
TimeValue coordinatorTimeout = new TimeValue(coordinatorTimeoutMills, TimeUnit.MILLISECONDS);
69+
ActionFuture<SearchResponse> searchResponseFuture = client().prepareSearch("test")
70+
.setCoordinatorTimeout(coordinatorTimeout)
71+
.setAllowPartialSearchResults(true)
72+
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
73+
.execute();
74+
awaitForBlock(plugins);
75+
logger.info("begin to sleep for " + coordinatorTimeout.getMillis() + " ms");
76+
Thread.sleep(coordinatorTimeout.getMillis() + 100);
77+
logger.info("wake up");
78+
disableBlocks(plugins);
79+
SearchResponse searchResponse = searchResponseFuture.get();
80+
assertEquals(1, searchResponse.getSuccessfulShards());
81+
verifyFailedException(searchResponse.getShardFailures());
82+
// wait in-flight contexts to finish
83+
Thread.sleep(100);
84+
}
85+
86+
public void testMSearchChildRequestTimeout() throws Exception {
87+
int dataNumber = internalCluster().numDataNodes();
88+
createIndex("test", Settings.builder().put("index.number_of_shards", dataNumber).put("index.number_of_replicas", 0).build());
89+
90+
List<ScriptedBlockPlugin> plugins = initBlockFactory();
91+
indexTestData(client());
92+
93+
TimeValue coordinatorTimeout = new TimeValue(coordinatorTimeoutMills, TimeUnit.MILLISECONDS);
94+
ActionFuture<MultiSearchResponse> mSearchResponse = client().prepareMultiSearch()
95+
.add(
96+
client().prepareSearch("test")
97+
.setAllowPartialSearchResults(true)
98+
.setRequestCache(false)
99+
.setCoordinatorTimeout(coordinatorTimeout)
100+
.setQuery(
101+
scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))
102+
)
103+
)
104+
.add(
105+
client().prepareSearch("test")
106+
.setAllowPartialSearchResults(true)
107+
.setRequestCache(false)
108+
.setQuery(
109+
scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))
110+
)
111+
)
112+
.execute();
113+
awaitForBlock(plugins);
114+
Thread.sleep(coordinatorTimeout.getMillis() + 100);
115+
// unblock the search thread
116+
disableBlocks(plugins);
117+
// one child request is expected to fail
118+
final Set<Integer> expectedFailedRequests = new HashSet<>();
119+
expectedFailedRequests.add(0);
120+
ensureMSearchThrowException(mSearchResponse, expectedFailedRequests);
121+
// wait in-flight contexts to finish
122+
Thread.sleep(100);
123+
}
124+
125+
private void verifyFailedException(ShardSearchFailure[] shardFailures) {
126+
for (ShardSearchFailure shardFailure : shardFailures) {
127+
final Throwable topFailureCause = shardFailure.getCause();
128+
assertTrue(shardFailure.toString(), topFailureCause instanceof ReceiveTimeoutTransportException);
129+
}
130+
}
131+
132+
private void ensureMSearchThrowException(ActionFuture<MultiSearchResponse> mSearchResponse, Set<Integer> expectedFailedChildRequests) {
133+
MultiSearchResponse response = mSearchResponse.actionGet();
134+
Set<Integer> actualFailedChildRequests = new HashSet<>();
135+
for (int i = 0; i < response.getResponses().length; ++i) {
136+
SearchResponse sResponse = response.getResponses()[i].getResponse();
137+
// check if response is null means all the shard failed for this search request
138+
if (sResponse == null) {
139+
Exception ex = response.getResponses()[i].getFailure();
140+
assertTrue(ex instanceof SearchPhaseExecutionException);
141+
verifyFailedException(((SearchPhaseExecutionException) ex).shardFailures());
142+
actualFailedChildRequests.add(i);
143+
144+
} else if (sResponse.getShardFailures().length > 0) {
145+
verifyFailedException(sResponse.getShardFailures());
146+
actualFailedChildRequests.add(i);
147+
}
148+
}
149+
assertEquals(
150+
"Actual child request with timeout failure is different that expected",
151+
expectedFailedChildRequests,
152+
actualFailedChildRequests
153+
);
154+
}
155+
156+
private List<ScriptedBlockPlugin> initBlockFactory() {
157+
List<ScriptedBlockPlugin> plugins = new ArrayList<>();
158+
boolean notBlockFirst = true;
159+
for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) {
160+
List<ScriptedBlockPlugin> scriptedBlockPlugins = pluginsService.filterPlugins(ScriptedBlockPlugin.class);
161+
for (ScriptedBlockPlugin plugin : scriptedBlockPlugins) {
162+
plugin.reset();
163+
// just block the first node
164+
if (notBlockFirst) {
165+
notBlockFirst = false;
166+
// default is enable block
167+
plugin.disableBlock();
168+
} else {
169+
plugin.enableBlock();
170+
}
171+
}
172+
plugins.addAll(scriptedBlockPlugins);
173+
}
174+
return plugins;
175+
}
176+
177+
}

0 commit comments

Comments
 (0)