Skip to content

Commit 10659e7

Browse files
committed
Move cluster state publication to remote
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent d499a74 commit 10659e7

File tree

69 files changed

+7078
-1104
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+7078
-1104
lines changed

libs/core/src/main/java/org/opensearch/core/common/transport/TransportAddress.java

+10
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,14 @@ public String toString() {
162162
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
163163
return builder.value(toString());
164164
}
165+
166+
public static TransportAddress fromString(String address) throws UnknownHostException {
167+
String[] addressSplit = address.split(":");
168+
if (addressSplit.length != 2) {
169+
throw new IllegalArgumentException("address must be of the form [hostname/ip]:[port]");
170+
}
171+
String hostname = addressSplit[0];
172+
int port = Integer.parseInt(addressSplit[1]);
173+
return new TransportAddress(InetAddress.getByName(hostname), port);
174+
}
165175
}

libs/core/src/main/java/org/opensearch/core/xcontent/XContentParserUtils.java

+12
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.opensearch.core.xcontent.XContentParser.Token;
3939

4040
import java.io.IOException;
41+
import java.util.ArrayList;
42+
import java.util.List;
4143
import java.util.Locale;
4244
import java.util.function.Consumer;
4345

@@ -178,4 +180,14 @@ public static <T> void parseTypedKeysObject(XContentParser parser, String delimi
178180
throw new ParsingException(parser.getTokenLocation(), "Failed to parse object: empty key");
179181
}
180182
}
183+
184+
public static List<String> parseStringList(XContentParser parser) throws IOException {
185+
List<String> valueList = new ArrayList<>();
186+
ensureExpectedToken(Token.START_ARRAY, parser.currentToken(), parser);
187+
while (parser.nextToken() != Token.END_ARRAY) {
188+
ensureExpectedToken(Token.VALUE_STRING, parser.currentToken(), parser);
189+
valueList.add(parser.text());
190+
}
191+
return valueList;
192+
}
181193
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.gateway.remote;
10+
11+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
12+
import org.opensearch.common.blobstore.BlobPath;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
15+
import org.opensearch.repositories.RepositoriesService;
16+
import org.opensearch.repositories.blobstore.BlobStoreRepository;
17+
import org.opensearch.test.OpenSearchIntegTestCase;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Base64;
21+
import java.util.Map;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
25+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
26+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
27+
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
28+
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
29+
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
30+
31+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
32+
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {
33+
34+
private static final String INDEX_NAME = "test-index";
35+
36+
@Override
37+
protected Settings nodeSettings(int nodeOrdinal) {
38+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
39+
}
40+
41+
private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
42+
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
43+
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
44+
assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
45+
ensureGreen(INDEX_NAME);
46+
return indexStats;
47+
}
48+
49+
public void testRemoteCleanupTaskUpdated() {
50+
int shardCount = randomIntBetween(1, 2);
51+
int replicaCount = 1;
52+
int dataNodeCount = shardCount * (replicaCount + 1);
53+
int clusterManagerNodeCount = 1;
54+
55+
initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
56+
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
57+
RemoteClusterStateCleanupManager.class
58+
);
59+
60+
assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
61+
assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
62+
63+
// now disable
64+
client().admin()
65+
.cluster()
66+
.prepareUpdateSettings()
67+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
68+
.get();
69+
70+
assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
71+
assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());
72+
73+
// now set Clean up interval to 1 min
74+
client().admin()
75+
.cluster()
76+
.prepareUpdateSettings()
77+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
78+
.get();
79+
assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
80+
}
81+
82+
public void testRemoteCleanupDeleteStale() throws Exception {
83+
int shardCount = randomIntBetween(1, 2);
84+
int replicaCount = 1;
85+
int dataNodeCount = shardCount * (replicaCount + 1);
86+
int clusterManagerNodeCount = 1;
87+
88+
initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
89+
90+
// set cleanup interval to 100 ms to make the test faster
91+
ClusterUpdateSettingsResponse response = client().admin()
92+
.cluster()
93+
.prepareUpdateSettings()
94+
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
95+
.get();
96+
97+
assertTrue(response.isAcknowledged());
98+
99+
// update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
100+
// to repository, if manifest files are less than that it means clean up has run
101+
updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);
102+
103+
RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
104+
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
105+
BlobPath baseMetadataPath = repository.basePath()
106+
.add(
107+
Base64.getUrlEncoder()
108+
.withoutPadding()
109+
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
110+
)
111+
.add("cluster-state")
112+
.add(getClusterState().metadata().clusterUUID());
113+
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
114+
115+
assertBusy(() -> {
116+
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
117+
logger.info("number of current manifest file: {}", manifestFiles);
118+
// we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
119+
// other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
120+
// Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
121+
assertTrue(
122+
"Current number of manifest files: " + manifestFiles,
123+
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
124+
);
125+
}, 500, TimeUnit.MILLISECONDS);
126+
}
127+
128+
private void updateClusterStateNTimes(int n) {
129+
int newReplicaCount = randomIntBetween(0, 3);
130+
for (int i = n; i > 0; i--) {
131+
ClusterUpdateSettingsResponse response = client().admin()
132+
.cluster()
133+
.prepareUpdateSettings()
134+
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
135+
.get();
136+
assertTrue(response.isAcknowledged());
137+
}
138+
}
139+
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

+10
Original file line numberDiff line numberDiff line change
@@ -311,4 +311,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
311311
PlainActionFuture.newFuture()
312312
);
313313
}
314+
315+
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
316+
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
317+
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
318+
for (String index : indices.split(",")) {
319+
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
320+
ensureYellowAndNoInitializingShards(index);
321+
ensureGreen(index);
322+
}
323+
}
314324
}

server/src/main/java/org/opensearch/cluster/ClusterState.java

+2-28
Original file line numberDiff line numberDiff line change
@@ -496,38 +496,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
496496
}
497497

498498
if (metrics.contains(Metric.BLOCKS)) {
499-
builder.startObject("blocks");
500-
501-
if (blocks().global().isEmpty() == false) {
502-
builder.startObject("global");
503-
for (ClusterBlock block : blocks().global()) {
504-
block.toXContent(builder, params);
505-
}
506-
builder.endObject();
507-
}
508-
509-
if (blocks().indices().isEmpty() == false) {
510-
builder.startObject("indices");
511-
for (final Map.Entry<String, Set<ClusterBlock>> entry : blocks().indices().entrySet()) {
512-
builder.startObject(entry.getKey());
513-
for (ClusterBlock block : entry.getValue()) {
514-
block.toXContent(builder, params);
515-
}
516-
builder.endObject();
517-
}
518-
builder.endObject();
519-
}
520-
521-
builder.endObject();
499+
blocks().toXContent(builder, params);
522500
}
523501

524502
// nodes
525503
if (metrics.contains(Metric.NODES)) {
526-
builder.startObject("nodes");
527-
for (DiscoveryNode node : nodes) {
528-
node.toXContent(builder, params);
529-
}
530-
builder.endObject();
504+
nodes.toXContent(builder, params);
531505
}
532506

533507
// meta data

server/src/main/java/org/opensearch/cluster/RepositoryCleanupInProgress.java

+36
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,15 @@
3333

3434
import org.opensearch.LegacyESVersion;
3535
import org.opensearch.Version;
36+
import org.opensearch.cluster.metadata.Metadata;
3637
import org.opensearch.core.common.Strings;
3738
import org.opensearch.core.common.io.stream.StreamInput;
3839
import org.opensearch.core.common.io.stream.StreamOutput;
3940
import org.opensearch.core.common.io.stream.Writeable;
4041
import org.opensearch.core.xcontent.MediaTypeRegistry;
4142
import org.opensearch.core.xcontent.XContentBuilder;
43+
import org.opensearch.core.xcontent.XContentParser;
44+
import org.opensearch.core.xcontent.XContentParserUtils;
4245
import org.opensearch.repositories.RepositoryOperation;
4346

4447
import java.io.IOException;
@@ -101,13 +104,46 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
101104
builder.startObject();
102105
{
103106
builder.field("repository", entry.repository);
107+
if (params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API).equals(Metadata.CONTEXT_MODE_GATEWAY)) {
108+
builder.field("repository_state_id", entry.repositoryStateId);
109+
} // else we don't serialize it
104110
}
105111
builder.endObject();
106112
}
107113
builder.endArray();
108114
return builder;
109115
}
110116

117+
public static RepositoryCleanupInProgress fromXContent(XContentParser parser) throws IOException {
118+
if (parser.currentToken() == null) {
119+
parser.nextToken();
120+
}
121+
XContentParserUtils.ensureFieldName(parser, parser.currentToken(), TYPE);
122+
parser.nextToken();
123+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
124+
List<Entry> entries = new ArrayList<>();
125+
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
126+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
127+
String repository = null;
128+
long repositoryStateId = -1L;
129+
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
130+
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
131+
String currentFieldName = parser.currentName();
132+
parser.nextToken();
133+
if ("repository".equals(currentFieldName)) {
134+
repository = parser.text();
135+
} else if ("repository_state_id".equals(currentFieldName)) {
136+
// only XContent parsed with {@link Metadata.CONTEXT_MODE_GATEWAY} will have the repository state id and can be deserialized
137+
repositoryStateId = parser.longValue();
138+
} else {
139+
throw new IllegalArgumentException("unknown field [" + currentFieldName + "]");
140+
}
141+
}
142+
entries.add(new Entry(repository, repositoryStateId));
143+
}
144+
return new RepositoryCleanupInProgress(entries);
145+
}
146+
111147
@Override
112148
public String toString() {
113149
return Strings.toString(MediaTypeRegistry.JSON, this);

0 commit comments

Comments
 (0)