Skip to content

Commit 6b60f22

Browse files
Adding support for append only indices (#17039) (#17158)
(cherry picked from commit 5e12737) Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 46d327d commit 6b60f22

File tree

9 files changed

+428
-28
lines changed

9 files changed

+428
-28
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2121
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
2222
- Update script supports java.lang.String.sha1() and java.lang.String.sha256() methods ([#16923](https://github.com/opensearch-project/OpenSearch/pull/16923))
2323
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
24+
- Add support for append only indices([#17039](https://github.com/opensearch-project/OpenSearch/pull/17039))
2425
- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)).
2526
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
2627
- Introduce framework for auxiliary transports and an experimental gRPC transport plugin ([#16534](https://github.com/opensearch-project/OpenSearch/pull/16534))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.core.index;
34+
35+
import org.opensearch.OpenSearchException;
36+
37+
/**
38+
* This exception indicates that retry has been made during indexing for AppendOnly index. If the response of any
39+
* indexing request contains this Exception in the response, we do not need to add a translog entry for this request.
40+
*
41+
* @opensearch.internal
42+
*/
43+
public class AppendOnlyIndexOperationRetryException extends OpenSearchException {
44+
public AppendOnlyIndexOperationRetryException(String message) {
45+
super(message);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.bulk;
10+
11+
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
12+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
13+
import org.opensearch.action.search.SearchResponse;
14+
import org.opensearch.client.Client;
15+
import org.opensearch.cluster.metadata.IndexMetadata;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.core.xcontent.XContentBuilder;
18+
import org.opensearch.index.query.QueryBuilders;
19+
import org.opensearch.ingest.IngestTestPlugin;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.test.OpenSearchIntegTestCase;
22+
import org.opensearch.test.transport.MockTransportService;
23+
import org.opensearch.transport.ConnectTransportException;
24+
import org.opensearch.transport.TransportService;
25+
26+
import java.util.Arrays;
27+
import java.util.Collection;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.stream.Collectors;
31+
32+
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
33+
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
34+
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
35+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
36+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
37+
import static org.hamcrest.CoreMatchers.equalTo;
38+
import static org.hamcrest.Matchers.containsString;
39+
40+
public class AppendOnlyIndicesIT extends OpenSearchIntegTestCase {
41+
42+
@Override
43+
protected Collection<Class<? extends Plugin>> nodePlugins() {
44+
return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class);
45+
}
46+
47+
public void testIndexDocumentWithACustomDocIdForAppendOnlyIndices() throws Exception {
48+
Client client = internalCluster().coordOnlyNodeClient();
49+
assertAcked(
50+
client().admin()
51+
.indices()
52+
.prepareCreate("index")
53+
.setSettings(
54+
Settings.builder()
55+
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
56+
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
57+
.put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true)
58+
)
59+
);
60+
ensureGreen("index");
61+
62+
BulkRequestBuilder bulkBuilder = client.prepareBulk();
63+
64+
XContentBuilder doc = null;
65+
doc = jsonBuilder().startObject().field("foo", "bar").endObject();
66+
bulkBuilder.add(client.prepareIndex("index").setId(Integer.toString(0)).setSource(doc));
67+
68+
BulkResponse response = bulkBuilder.get();
69+
assertThat(
70+
response.getItems()[0].getFailureMessage(),
71+
containsString(
72+
"Operation [INDEX] is not allowed with a custom document id 0 as setting `"
73+
+ IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey()
74+
+ "` is enabled for this index: index;"
75+
)
76+
);
77+
}
78+
79+
public void testUpdateDeleteDocumentForAppendOnlyIndices() throws Exception {
80+
Client client = internalCluster().coordOnlyNodeClient();
81+
assertAcked(
82+
client().admin()
83+
.indices()
84+
.prepareCreate("index")
85+
.setSettings(
86+
Settings.builder()
87+
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
88+
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
89+
.put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true)
90+
)
91+
);
92+
ensureGreen("index");
93+
94+
BulkRequestBuilder bulkBuilder = client.prepareBulk();
95+
96+
XContentBuilder doc = null;
97+
doc = jsonBuilder().startObject().field("foo", "bar").endObject();
98+
bulkBuilder.add(client.prepareIndex("index").setSource(doc));
99+
100+
bulkBuilder.get();
101+
BulkResponse response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get();
102+
assertThat(
103+
response.getItems()[0].getFailureMessage(),
104+
containsString(
105+
"Operation [UPDATE] is not allowed as setting `"
106+
+ IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey()
107+
+ "` is enabled for this index"
108+
)
109+
);
110+
111+
response = client().prepareBulk().add(client().prepareDelete("index", "0")).get();
112+
assertThat(
113+
response.getItems()[0].getFailureMessage(),
114+
containsString(
115+
"Operation [DELETE] is not allowed as setting `"
116+
+ IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey()
117+
+ "` is enabled for this index"
118+
)
119+
);
120+
}
121+
122+
public void testRetryForAppendOnlyIndices() throws Exception {
123+
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
124+
int numDocs = scaledRandomIntBetween(100, 1000);
125+
Client client = internalCluster().coordOnlyNodeClient();
126+
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
127+
NodeStats unluckyNode = randomFrom(
128+
nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList())
129+
);
130+
assertAcked(
131+
client().admin()
132+
.indices()
133+
.prepareCreate("index")
134+
.setSettings(
135+
Settings.builder()
136+
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
137+
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
138+
.put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true)
139+
)
140+
);
141+
ensureGreen("index");
142+
logger.info("unlucky node: {}", unluckyNode.getNode());
143+
// create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry.
144+
for (NodeStats dataNode : nodeStats.getNodes()) {
145+
if (exceptionThrown.get()) {
146+
break;
147+
}
148+
149+
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
150+
TransportService.class,
151+
dataNode.getNode().getName()
152+
));
153+
mockTransportService.addSendBehavior(
154+
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
155+
(connection, requestId, action, request, options) -> {
156+
connection.sendRequest(requestId, action, request, options);
157+
if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) {
158+
logger.debug("Throw ConnectTransportException");
159+
throw new ConnectTransportException(connection.getNode(), action);
160+
}
161+
}
162+
);
163+
}
164+
165+
BulkRequestBuilder bulkBuilder = client.prepareBulk();
166+
167+
for (int i = 0; i < numDocs; i++) {
168+
XContentBuilder doc = null;
169+
doc = jsonBuilder().startObject().field("foo", "bar").endObject();
170+
bulkBuilder.add(client.prepareIndex("index").setSource(doc));
171+
}
172+
173+
BulkResponse response = bulkBuilder.get();
174+
for (BulkItemResponse singleIndexResponse : response.getItems()) {
175+
// Retry will not create a new version.
176+
assertThat(singleIndexResponse.getVersion(), equalTo(1L));
177+
}
178+
}
179+
180+
public void testNodeReboot() throws Exception {
181+
int numDocs = scaledRandomIntBetween(100, 1000);
182+
Client client = internalCluster().coordOnlyNodeClient();
183+
assertAcked(
184+
client().admin()
185+
.indices()
186+
.prepareCreate("index")
187+
.setSettings(
188+
Settings.builder()
189+
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
190+
.put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
191+
.put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true)
192+
)
193+
);
194+
195+
ensureGreen("index");
196+
197+
BulkRequestBuilder bulkBuilder = client.prepareBulk();
198+
199+
for (int i = 0; i < numDocs; i++) {
200+
XContentBuilder doc = null;
201+
doc = jsonBuilder().startObject().field("foo", "bar").endObject();
202+
bulkBuilder.add(client.prepareIndex("index").setSource(doc));
203+
}
204+
205+
BulkResponse response = bulkBuilder.get();
206+
assertFalse(response.hasFailures());
207+
internalCluster().restartRandomDataNode();
208+
ensureGreen("index");
209+
refresh();
210+
SearchResponse searchResponse = client().prepareSearch()
211+
.setQuery(QueryBuilders.matchAllQuery())
212+
.setIndices("index")
213+
.setSize(numDocs)
214+
.get();
215+
216+
assertBusy(() -> { assertHitCount(searchResponse, numDocs); }, 20L, TimeUnit.SECONDS);
217+
218+
}
219+
}

server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java

+30-13
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.action.index.IndexResponse;
3939
import org.opensearch.action.support.replication.ReplicationResponse;
4040
import org.opensearch.action.support.replication.TransportWriteAction;
41+
import org.opensearch.core.index.AppendOnlyIndexOperationRetryException;
4142
import org.opensearch.index.engine.Engine;
4243
import org.opensearch.index.shard.IndexShard;
4344
import org.opensearch.index.translog.Translog;
@@ -297,20 +298,36 @@ public void markOperationAsExecuted(Engine.Result result) {
297298
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
298299
break;
299300
case FAILURE:
300-
executionResult = new BulkItemResponse(
301-
current.id(),
302-
docWriteRequest.opType(),
303-
// Make sure to use request.index() here, if you
304-
// use docWriteRequest.index() it will use the
305-
// concrete index instead of an alias if used!
306-
new BulkItemResponse.Failure(
307-
request.index(),
308-
docWriteRequest.id(),
309-
result.getFailure(),
301+
if (result.getFailure() instanceof AppendOnlyIndexOperationRetryException) {
302+
Engine.IndexResult indexResult = (Engine.IndexResult) result;
303+
DocWriteResponse indexResponse = new IndexResponse(
304+
primary.shardId(),
305+
requestToExecute.id(),
310306
result.getSeqNo(),
311-
result.getTerm()
312-
)
313-
);
307+
result.getTerm(),
308+
indexResult.getVersion(),
309+
indexResult.isCreated()
310+
);
311+
312+
executionResult = new BulkItemResponse(current.id(), current.request().opType(), indexResponse);
313+
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
314+
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
315+
} else {
316+
executionResult = new BulkItemResponse(
317+
current.id(),
318+
docWriteRequest.opType(),
319+
// Make sure to use request.index() here, if you
320+
// use docWriteRequest.index() it will use the
321+
// concrete index instead of an alias if used!
322+
new BulkItemResponse.Failure(
323+
request.index(),
324+
docWriteRequest.id(),
325+
result.getFailure(),
326+
result.getSeqNo(),
327+
result.getTerm()
328+
)
329+
);
330+
}
314331
break;
315332
default:
316333
throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType());

0 commit comments

Comments
 (0)