Skip to content

Commit e9b6a8d

Browse files
authored
Add capability to disable the recovery_source for an index (opensearch-project#13590)
Signed-off-by: Navneet Verma <navneev@amazon.com>
1 parent b9ca5a8 commit e9b6a8d

File tree

4 files changed

+385
-13
lines changed

4 files changed

+385
-13
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1515
- Add getMetadataFields to MapperService ([#13819](https://github.com/opensearch-project/OpenSearch/pull/13819))
1616
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13131](https://github.com/opensearch-project/OpenSearch/pull/13131))
1717
- Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776))
18+
- Add capability to disable the recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590))
1819
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
1920
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
2021

server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java

+143
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
3838
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
39+
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
3940
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
4041
import org.opensearch.cluster.ClusterState;
4142
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -45,6 +46,8 @@
4546
import org.opensearch.common.collect.MapBuilder;
4647
import org.opensearch.common.settings.Settings;
4748
import org.opensearch.common.unit.TimeValue;
49+
import org.opensearch.common.xcontent.XContentFactory;
50+
import org.opensearch.core.xcontent.XContentBuilder;
4851
import org.opensearch.indices.recovery.RecoveryState;
4952
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
5053
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
@@ -253,4 +256,144 @@ public void testNoRebalanceOnRollingRestart() throws Exception {
253256
);
254257
}
255258
}
259+
260+
public void testFullRollingRestart_withNoRecoveryPayloadAndSource() throws Exception {
261+
internalCluster().startNode();
262+
XContentBuilder builder = XContentFactory.jsonBuilder()
263+
.startObject()
264+
.startObject("_source")
265+
.field("enabled")
266+
.value(false)
267+
.field("recovery_source_enabled")
268+
.value(false)
269+
.endObject()
270+
.endObject();
271+
CreateIndexResponse response = prepareCreate("test").setMapping(builder).get();
272+
logger.info("Create index response is : {}", response);
273+
274+
final String healthTimeout = "1m";
275+
276+
for (int i = 0; i < 1000; i++) {
277+
client().prepareIndex("test")
278+
.setId(Long.toString(i))
279+
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map())
280+
.execute()
281+
.actionGet();
282+
}
283+
284+
for (int i = 1000; i < 2000; i++) {
285+
client().prepareIndex("test")
286+
.setId(Long.toString(i))
287+
.setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + i).map())
288+
.execute()
289+
.actionGet();
290+
}
291+
// ensuring all docs are committed to file system
292+
flush();
293+
294+
logger.info("--> now start adding nodes");
295+
internalCluster().startNode();
296+
internalCluster().startNode();
297+
298+
// make sure the cluster state is green, and all has been recovered
299+
assertTimeout(
300+
client().admin()
301+
.cluster()
302+
.prepareHealth()
303+
.setWaitForEvents(Priority.LANGUID)
304+
.setTimeout(healthTimeout)
305+
.setWaitForGreenStatus()
306+
.setWaitForNoRelocatingShards(true)
307+
.setWaitForNodes("3")
308+
);
309+
310+
logger.info("--> add two more nodes");
311+
internalCluster().startNode();
312+
internalCluster().startNode();
313+
314+
// make sure the cluster state is green, and all has been recovered
315+
assertTimeout(
316+
client().admin()
317+
.cluster()
318+
.prepareHealth()
319+
.setWaitForEvents(Priority.LANGUID)
320+
.setTimeout(healthTimeout)
321+
.setWaitForGreenStatus()
322+
.setWaitForNoRelocatingShards(true)
323+
.setWaitForNodes("5")
324+
);
325+
326+
logger.info("--> refreshing and checking data");
327+
refreshAndWaitForReplication();
328+
for (int i = 0; i < 10; i++) {
329+
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
330+
}
331+
332+
// now start shutting nodes down
333+
internalCluster().stopRandomDataNode();
334+
// make sure the cluster state is green, and all has been recovered
335+
assertTimeout(
336+
client().admin()
337+
.cluster()
338+
.prepareHealth()
339+
.setWaitForEvents(Priority.LANGUID)
340+
.setTimeout(healthTimeout)
341+
.setWaitForGreenStatus()
342+
.setWaitForNoRelocatingShards(true)
343+
.setWaitForNodes("4")
344+
);
345+
346+
internalCluster().stopRandomDataNode();
347+
// make sure the cluster state is green, and all has been recovered
348+
assertTimeout(
349+
client().admin()
350+
.cluster()
351+
.prepareHealth()
352+
.setWaitForEvents(Priority.LANGUID)
353+
.setTimeout(healthTimeout)
354+
.setWaitForGreenStatus()
355+
.setWaitForNoRelocatingShards(true)
356+
.setWaitForNodes("3")
357+
);
358+
359+
logger.info("--> stopped two nodes, verifying data");
360+
refreshAndWaitForReplication();
361+
for (int i = 0; i < 10; i++) {
362+
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
363+
}
364+
365+
// closing the 3rd node
366+
internalCluster().stopRandomDataNode();
367+
// make sure the cluster state is green, and all has been recovered
368+
assertTimeout(
369+
client().admin()
370+
.cluster()
371+
.prepareHealth()
372+
.setWaitForEvents(Priority.LANGUID)
373+
.setTimeout(healthTimeout)
374+
.setWaitForGreenStatus()
375+
.setWaitForNoRelocatingShards(true)
376+
.setWaitForNodes("2")
377+
);
378+
379+
internalCluster().stopRandomDataNode();
380+
381+
// make sure the cluster state is yellow, and all has been recovered
382+
assertTimeout(
383+
client().admin()
384+
.cluster()
385+
.prepareHealth()
386+
.setWaitForEvents(Priority.LANGUID)
387+
.setTimeout(healthTimeout)
388+
.setWaitForYellowStatus()
389+
.setWaitForNoRelocatingShards(true)
390+
.setWaitForNodes("1")
391+
);
392+
393+
logger.info("--> one node left, verifying data");
394+
refreshAndWaitForReplication();
395+
for (int i = 0; i < 10; i++) {
396+
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2000L);
397+
}
398+
}
256399
}

server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java

+105-12
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class SourceFieldMapper extends MetadataFieldMapper {
7474

7575
public static final String CONTENT_TYPE = "_source";
7676
private final Function<Map<String, ?>, Map<String, Object>> filter;
77+
private final Function<Map<String, ?>, Map<String, Object>> recoverySourceFilter;
7778

7879
/**
7980
* Default parameters for source fields
@@ -119,21 +120,75 @@ public static class Builder extends MetadataFieldMapper.Builder {
119120
Collections.emptyList()
120121
);
121122

123+
/**
124+
* A mapping parameter which defines whether the recovery_source should be added or not. Default value is true.
125+
* <p>
126+
* Recovery source gets added if source is disabled or there are filters that are applied on _source using
127+
* {@link #includes}/{@link #excludes}, which has the possibility to change the original document provided by
128+
customer. Recovery source is not a permanent field and gets removed during merges. Refer to this merge
129+
* policy: org.opensearch.index.engine.RecoverySourcePruneMergePolicy
130+
* <p>
131+
* The main reason for adding the _recovery_source was to ensure Peer to Peer recovery if segments
132+
* are not flushed to the disk. If you are disabling the recovery source, then ensure that you are calling
133+
flush operation of OpenSearch periodically to ensure that segments are flushed to the disk and if required
134+
* Peer to Peer recovery can happen using segment files rather than replaying traffic by querying Lucene
135+
* snapshot.
136+
*
137+
* <p>
138+
* This is an expert mapping parameter.
139+
*
140+
*/
141+
private final Parameter<Boolean> recoverySourceEnabled = Parameter.boolParam(
142+
"recovery_source_enabled",
143+
false,
144+
m -> toType(m).recoverySourceEnabled,
145+
Defaults.ENABLED
146+
);
147+
148+
/**
149+
* Provides capability to add specific fields in the recovery_source.
150+
* <p>
151+
* Refer to {@link #recoverySourceEnabled} for more details.
152+
* This is an expert parameter.
153+
*/
154+
private final Parameter<List<String>> recoverySourceIncludes = Parameter.stringArrayParam(
155+
"recovery_source_includes",
156+
false,
157+
m -> Arrays.asList(toType(m).recoverySourceIncludes),
158+
Collections.emptyList()
159+
);
160+
161+
/**
162+
* Provides capability to remove specific fields in the recovery_source.
163+
*
164+
* Refer to {@link #recoverySourceEnabled} for more details.
165+
* This is an expert parameter.
166+
*/
167+
private final Parameter<List<String>> recoverySourceExcludes = Parameter.stringArrayParam(
168+
"recovery_source_excludes",
169+
false,
170+
m -> Arrays.asList(toType(m).recoverySourceExcludes),
171+
Collections.emptyList()
172+
);
173+
122174
/** Creates a builder for the {@code _source} metadata field ({@link Defaults#NAME}). */
public Builder() {
    super(Defaults.NAME);
}
125177

126178
@Override
127179
protected List<Parameter<?>> getParameters() {
128-
return Arrays.asList(enabled, includes, excludes);
180+
return Arrays.asList(enabled, includes, excludes, recoverySourceEnabled, recoverySourceIncludes, recoverySourceExcludes);
129181
}
130182

131183
@Override
132184
public SourceFieldMapper build(BuilderContext context) {
133185
return new SourceFieldMapper(
134186
enabled.getValue(),
135187
includes.getValue().toArray(new String[0]),
136-
excludes.getValue().toArray(new String[0])
188+
excludes.getValue().toArray(new String[0]),
189+
recoverySourceEnabled.getValue(),
190+
recoverySourceIncludes.getValue().toArray(new String[0]),
191+
recoverySourceExcludes.getValue().toArray(new String[0])
137192
);
138193
}
139194
}
@@ -173,24 +228,44 @@ public Query termQuery(Object value, QueryShardContext context) {
173228
}
174229

175230
private final boolean enabled;
231+
private final boolean recoverySourceEnabled;
176232
/** indicates whether the source will always exist and be complete, for use by features like the update API */
177233
private final boolean complete;
178234

179235
private final String[] includes;
180236
private final String[] excludes;
237+
private final String[] recoverySourceIncludes;
238+
private final String[] recoverySourceExcludes;
181239

182240
/** Default instance: _source enabled and unfiltered, recovery_source enabled and unfiltered. */
private SourceFieldMapper() {
    this(Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY, Defaults.ENABLED, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY);
}
185243

186-
private SourceFieldMapper(
    boolean enabled,
    String[] includes,
    String[] excludes,
    boolean recoverySourceEnabled,
    String[] recoverySourceIncludes,
    String[] recoverySourceExcludes
) {
    super(new SourceFieldType(enabled));
    // _source settings: a filter is only built when the field is enabled AND an
    // include/exclude list is present; "complete" means the stored source is the
    // unmodified original (needed by features like the update API).
    this.enabled = enabled;
    this.includes = includes;
    this.excludes = excludes;
    final boolean sourceFiltered = CollectionUtils.isEmpty(includes) == false || CollectionUtils.isEmpty(excludes) == false;
    this.filter = enabled && sourceFiltered ? XContentMapValues.filter(includes, excludes) : null;
    this.complete = enabled && CollectionUtils.isEmpty(includes) && CollectionUtils.isEmpty(excludes);

    // recovery_source settings, mirroring the _source handling above.
    this.recoverySourceEnabled = recoverySourceEnabled;
    this.recoverySourceIncludes = recoverySourceIncludes;
    this.recoverySourceExcludes = recoverySourceExcludes;
    final boolean recoverySourceFiltered = CollectionUtils.isEmpty(recoverySourceIncludes) == false
        || CollectionUtils.isEmpty(recoverySourceExcludes) == false;
    this.recoverySourceFilter = this.recoverySourceEnabled && recoverySourceFiltered
        ? XContentMapValues.filter(recoverySourceIncludes, recoverySourceExcludes)
        : null;
}
195270

196271
public boolean enabled() {
@@ -212,22 +287,40 @@ public void preParse(ParseContext context) throws IOException {
212287
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
213288
}
214289

215-
if (originalSource != null && adaptedSource != originalSource) {
216-
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
217-
BytesRef ref = originalSource.toBytesRef();
218-
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
219-
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
290+
if (recoverySourceEnabled) {
291+
if (originalSource != null && adaptedSource != originalSource) {
292+
final BytesReference adaptedRecoverySource = applyFilters(
293+
originalSource,
294+
contentType,
295+
recoverySourceEnabled,
296+
recoverySourceFilter
297+
);
298+
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
299+
BytesRef ref = adaptedRecoverySource.toBytesRef();
300+
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
301+
context.doc().add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
302+
}
220303
}
221304
}
222305

223306
@Nullable
224307
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable MediaType contentType) throws IOException {
225-
if (enabled && originalSource != null) {
308+
return applyFilters(originalSource, contentType, enabled, filter);
309+
}
310+
311+
@Nullable
312+
private BytesReference applyFilters(
313+
@Nullable BytesReference originalSource,
314+
@Nullable MediaType contentType,
315+
boolean isProvidedSourceEnabled,
316+
@Nullable final Function<Map<String, ?>, Map<String, Object>> filters
317+
) throws IOException {
318+
if (isProvidedSourceEnabled && originalSource != null) {
226319
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
227-
if (filter != null) {
320+
if (filters != null) {
228321
// we don't update the context source if we filter, we want to keep it as is...
229322
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(originalSource, true, contentType);
230-
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
323+
Map<String, Object> filteredSource = filters.apply(mapTuple.v2());
231324
BytesStreamOutput bStream = new BytesStreamOutput();
232325
MediaType actualContentType = mapTuple.v1();
233326
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);

0 commit comments

Comments
 (0)