Skip to content

Commit ca070bc

Browse files
Initial commit for scale to zero
Signed-off-by: Prudhvi Godithi <pgodithi@amazon.com>
1 parent 9f790ee commit ca070bc

27 files changed

+1639
-134
lines changed

gradle/run.gradle

+11
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ testClusters {
4545
plugin('plugins:'.concat(p))
4646
}
4747
}
48+
setting 'opensearch.experimental.feature.read.write.split.enabled', 'true'
49+
setting 'path.repo', '["/tmp/my-repo"]'
50+
setting 'node.attr.remote_store', 'true'
51+
setting 'cluster.remote_store.state.enabled', 'true'
52+
setting 'node.attr.remote_store.segment.repository', 'my-repository'
53+
setting 'node.attr.remote_store.translog.repository', 'my-repository'
54+
setting 'node.attr.remote_store.repository.my-repository.type', 'fs'
55+
setting 'node.attr.remote_store.state.repository', 'my-repository'
56+
setting 'node.attr.remote_store.repository.my-repository.settings.location', '/tmp/my-repo'
57+
58+
4859
}
4960
}
5061

server/src/main/java/org/opensearch/action/ActionModule.java

+5
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@
184184
import org.opensearch.action.admin.indices.resolve.ResolveIndexAction;
185185
import org.opensearch.action.admin.indices.rollover.RolloverAction;
186186
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
187+
import org.opensearch.action.admin.indices.scaleToZero.PreScaleSyncAction;
188+
import org.opensearch.action.admin.indices.scaleToZero.TransportPreScaleSyncAction;
187189
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
188190
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
189191
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
@@ -685,6 +687,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
685687
actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class);
686688
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
687689
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
690+
691+
actions.register(PreScaleSyncAction.INSTANCE, TransportPreScaleSyncAction.class);
692+
688693
actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
689694
actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
690695
actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.scaleToZero;
10+
11+
import org.opensearch.core.common.io.stream.StreamInput;
12+
import org.opensearch.core.common.io.stream.StreamOutput;
13+
import org.opensearch.core.index.shard.ShardId;
14+
import org.opensearch.transport.TransportRequest;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
19+
public class NodePreScaleSyncRequest extends TransportRequest {
20+
private final String index;
21+
private final List<ShardId> shardIds;
22+
23+
public NodePreScaleSyncRequest(String index, List<ShardId> shardIds) {
24+
this.index = index;
25+
this.shardIds = shardIds;
26+
}
27+
28+
public NodePreScaleSyncRequest(StreamInput in) throws IOException {
29+
super(in);
30+
this.index = in.readString();
31+
this.shardIds = in.readList(ShardId::new);
32+
}
33+
34+
@Override
35+
public void writeTo(StreamOutput out) throws IOException {
36+
super.writeTo(out);
37+
out.writeString(index);
38+
out.writeList(shardIds);
39+
}
40+
41+
public String getIndex() {
42+
return index;
43+
}
44+
45+
public List<ShardId> getShardIds() {
46+
return shardIds;
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.scaleToZero;
10+
11+
import org.opensearch.cluster.node.DiscoveryNode;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
import org.opensearch.core.transport.TransportResponse;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
19+
public class NodePreScaleSyncResponse extends TransportResponse {
20+
private final DiscoveryNode node;
21+
private final List<ShardPreScaleSyncResponse> shardResponses;
22+
23+
public NodePreScaleSyncResponse(DiscoveryNode node, List<ShardPreScaleSyncResponse> shardResponses) {
24+
this.node = node;
25+
this.shardResponses = shardResponses;
26+
}
27+
28+
public NodePreScaleSyncResponse(StreamInput in) throws IOException {
29+
node = new DiscoveryNode(in);
30+
shardResponses = in.readList(ShardPreScaleSyncResponse::new);
31+
}
32+
33+
@Override
34+
public void writeTo(StreamOutput out) throws IOException {
35+
node.writeTo(out);
36+
out.writeList(shardResponses);
37+
}
38+
39+
public DiscoveryNode getNode() {
40+
return node;
41+
}
42+
43+
public List<ShardPreScaleSyncResponse> getShardResponses() {
44+
return shardResponses;
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.scaleToZero;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
public class PreScaleSyncAction extends ActionType<PreScaleSyncResponse> {
14+
public static final PreScaleSyncAction INSTANCE = new PreScaleSyncAction();
15+
public static final String NAME = "indices:admin/settings/pre_scale_sync";
16+
17+
private PreScaleSyncAction() {
18+
super(NAME, PreScaleSyncResponse::new);
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.scaleToZero;
10+
11+
import org.opensearch.action.ActionRequest;
12+
import org.opensearch.action.ActionRequestValidationException;
13+
import org.opensearch.core.common.io.stream.StreamInput;
14+
import org.opensearch.core.common.io.stream.StreamOutput;
15+
16+
import java.io.IOException;
17+
18+
public class PreScaleSyncRequest extends ActionRequest {
19+
private final String index;
20+
21+
public PreScaleSyncRequest(String index) {
22+
this.index = index;
23+
}
24+
25+
public PreScaleSyncRequest(StreamInput in) throws IOException {
26+
super(in);
27+
this.index = in.readString();
28+
}
29+
30+
@Override
31+
public void writeTo(StreamOutput out) throws IOException {
32+
super.writeTo(out);
33+
out.writeString(index);
34+
}
35+
36+
public String getIndex() {
37+
return index;
38+
}
39+
40+
@Override
41+
public ActionRequestValidationException validate() {
42+
return null;
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.scaleToZero;
10+
11+
import org.opensearch.core.action.ActionResponse;
12+
import org.opensearch.core.common.io.stream.StreamInput;
13+
import org.opensearch.core.common.io.stream.StreamOutput;
14+
import org.opensearch.core.xcontent.ToXContent;
15+
import org.opensearch.core.xcontent.XContentBuilder;
16+
17+
import java.io.IOException;
18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
21+
public class PreScaleSyncResponse extends ActionResponse implements ToXContent {
22+
private final Collection<NodePreScaleSyncResponse> nodeResponses;
23+
private final String failureReason;
24+
private final boolean hasFailures;
25+
26+
public PreScaleSyncResponse(Collection<NodePreScaleSyncResponse> responses) {
27+
this.nodeResponses = responses;
28+
this.hasFailures = responses.stream()
29+
.anyMatch(r -> r.getShardResponses().stream().anyMatch(s -> s.hasUncommittedOperations() || s.needsSync()));
30+
this.failureReason = buildFailureReason();
31+
}
32+
33+
public PreScaleSyncResponse(StreamInput in) throws IOException {
34+
this.nodeResponses = in.readList(NodePreScaleSyncResponse::new);
35+
this.hasFailures = in.readBoolean();
36+
this.failureReason = in.readOptionalString();
37+
}
38+
39+
@Override
40+
public void writeTo(StreamOutput out) throws IOException {
41+
out.writeList(new ArrayList<>(nodeResponses)); // Convert Collection to List
42+
out.writeBoolean(hasFailures);
43+
out.writeOptionalString(failureReason);
44+
}
45+
46+
public boolean hasFailures() {
47+
return hasFailures;
48+
}
49+
50+
public String getFailureReason() {
51+
return failureReason;
52+
}
53+
54+
private String buildFailureReason() {
55+
if (!hasFailures) {
56+
return null;
57+
}
58+
StringBuilder reason = new StringBuilder();
59+
for (NodePreScaleSyncResponse nodeResponse : nodeResponses) {
60+
for (ShardPreScaleSyncResponse shardResponse : nodeResponse.getShardResponses()) {
61+
if (shardResponse.hasUncommittedOperations() || shardResponse.needsSync()) {
62+
reason.append("Shard ")
63+
.append(shardResponse.getShardId())
64+
.append(" on node ")
65+
.append(nodeResponse.getNode())
66+
.append(": ");
67+
if (shardResponse.hasUncommittedOperations()) {
68+
reason.append("has uncommitted operations ");
69+
}
70+
if (shardResponse.needsSync()) {
71+
reason.append("needs sync ");
72+
}
73+
reason.append("; ");
74+
}
75+
}
76+
}
77+
return reason.toString();
78+
}
79+
80+
@Override
81+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
82+
builder.startObject();
83+
builder.field("has_failures", hasFailures);
84+
if (failureReason != null) {
85+
builder.field("failure_reason", failureReason);
86+
}
87+
builder.endObject();
88+
return builder;
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.scaleToZero;
10+
11+
import org.opensearch.core.common.io.stream.StreamInput;
12+
import org.opensearch.core.common.io.stream.StreamOutput;
13+
import org.opensearch.core.common.io.stream.Writeable;
14+
import org.opensearch.core.index.shard.ShardId;
15+
16+
import java.io.IOException;
17+
18+
public class ShardPreScaleSyncResponse implements Writeable {
19+
private final ShardId shardId;
20+
private final boolean needsSync;
21+
private final int uncommittedOperations;
22+
23+
public ShardPreScaleSyncResponse(ShardId shardId, boolean needsSync, int uncommittedOperations) {
24+
this.shardId = shardId;
25+
this.needsSync = needsSync;
26+
this.uncommittedOperations = uncommittedOperations;
27+
}
28+
29+
public ShardPreScaleSyncResponse(StreamInput in) throws IOException {
30+
this.shardId = new ShardId(in);
31+
this.needsSync = in.readBoolean();
32+
this.uncommittedOperations = in.readVInt();
33+
}
34+
35+
@Override
36+
public void writeTo(StreamOutput out) throws IOException {
37+
shardId.writeTo(out);
38+
out.writeBoolean(needsSync);
39+
out.writeVInt(uncommittedOperations);
40+
}
41+
42+
public ShardId getShardId() {
43+
return shardId;
44+
}
45+
46+
public boolean needsSync() {
47+
return needsSync;
48+
}
49+
50+
public boolean hasUncommittedOperations() {
51+
return uncommittedOperations > 0;
52+
}
53+
}

0 commit comments

Comments
 (0)