Skip to content

Commit e93314f

Browse files
himshikhaHimshikha Gupta
authored and
Himshikha Gupta
committed
Optimize checksum creation for remote cluster state (opensearch-project#16046)
* Support parallelisation in remote publication checksum computation

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
1 parent af5a3d1 commit e93314f

File tree

6 files changed

+164
-82
lines changed

6 files changed

+164
-82
lines changed

server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java

+99-51
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.cluster.ClusterState;
1414
import org.opensearch.cluster.metadata.DiffableStringMap;
15+
import org.opensearch.common.CheckedFunction;
1516
import org.opensearch.common.io.stream.BytesStreamOutput;
1617
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.unit.TimeValue;
1719
import org.opensearch.core.common.io.stream.StreamInput;
1820
import org.opensearch.core.common.io.stream.StreamOutput;
1921
import org.opensearch.core.common.io.stream.Writeable;
@@ -22,11 +24,15 @@
2224
import org.opensearch.core.xcontent.XContentParseException;
2325
import org.opensearch.core.xcontent.XContentParser;
2426
import org.opensearch.index.translog.BufferedChecksumStreamOutput;
27+
import org.opensearch.threadpool.ThreadPool;
2528

2629
import java.io.IOException;
2730
import java.util.ArrayList;
2831
import java.util.List;
2932
import java.util.Objects;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.function.Consumer;
3036

3137
import com.jcraft.jzlib.JZlib;
3238

@@ -37,6 +43,7 @@
3743
*/
3844
public class ClusterStateChecksum implements ToXContentFragment, Writeable {
3945

46+
public static final int COMPONENT_SIZE = 11;
4047
static final String ROUTING_TABLE_CS = "routing_table";
4148
static final String NODES_CS = "discovery_nodes";
4249
static final String BLOCKS_CS = "blocks";
@@ -65,62 +72,103 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable {
6572
long indicesChecksum;
6673
long clusterStateChecksum;
6774

68-
public ClusterStateChecksum(ClusterState clusterState) {
69-
try (
70-
BytesStreamOutput out = new BytesStreamOutput();
71-
BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out)
72-
) {
73-
clusterState.routingTable().writeVerifiableTo(checksumOut);
74-
routingTableChecksum = checksumOut.getChecksum();
75-
76-
checksumOut.reset();
77-
clusterState.nodes().writeVerifiableTo(checksumOut);
78-
nodesChecksum = checksumOut.getChecksum();
79-
80-
checksumOut.reset();
81-
clusterState.coordinationMetadata().writeVerifiableTo(checksumOut);
82-
coordinationMetadataChecksum = checksumOut.getChecksum();
83-
84-
// Settings create sortedMap by default, so no explicit sorting required here.
85-
checksumOut.reset();
86-
Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), checksumOut);
87-
settingMetadataChecksum = checksumOut.getChecksum();
88-
89-
checksumOut.reset();
90-
Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), checksumOut);
91-
transientSettingsMetadataChecksum = checksumOut.getChecksum();
92-
93-
checksumOut.reset();
94-
clusterState.metadata().templatesMetadata().writeVerifiableTo(checksumOut);
95-
templatesMetadataChecksum = checksumOut.getChecksum();
96-
97-
checksumOut.reset();
98-
checksumOut.writeStringCollection(clusterState.metadata().customs().keySet());
99-
customMetadataMapChecksum = checksumOut.getChecksum();
100-
101-
checksumOut.reset();
102-
((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(checksumOut);
103-
hashesOfConsistentSettingsChecksum = checksumOut.getChecksum();
104-
105-
checksumOut.reset();
106-
checksumOut.writeMapValues(
75+
public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) {
76+
long start = threadpool.relativeTimeInNanos();
77+
ExecutorService executorService = threadpool.executor(ThreadPool.Names.REMOTE_STATE_CHECKSUM);
78+
CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE);
79+
80+
executeChecksumTask((stream) -> {
81+
clusterState.routingTable().writeVerifiableTo(stream);
82+
return null;
83+
}, checksum -> routingTableChecksum = checksum, executorService, latch);
84+
85+
executeChecksumTask((stream) -> {
86+
clusterState.nodes().writeVerifiableTo(stream);
87+
return null;
88+
}, checksum -> nodesChecksum = checksum, executorService, latch);
89+
90+
executeChecksumTask((stream) -> {
91+
clusterState.coordinationMetadata().writeVerifiableTo(stream);
92+
return null;
93+
}, checksum -> coordinationMetadataChecksum = checksum, executorService, latch);
94+
95+
executeChecksumTask((stream) -> {
96+
Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream);
97+
return null;
98+
}, checksum -> settingMetadataChecksum = checksum, executorService, latch);
99+
100+
executeChecksumTask((stream) -> {
101+
Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream);
102+
return null;
103+
}, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch);
104+
105+
executeChecksumTask((stream) -> {
106+
clusterState.metadata().templatesMetadata().writeVerifiableTo(stream);
107+
return null;
108+
}, checksum -> templatesMetadataChecksum = checksum, executorService, latch);
109+
110+
executeChecksumTask((stream) -> {
111+
stream.writeStringCollection(clusterState.metadata().customs().keySet());
112+
return null;
113+
}, checksum -> customMetadataMapChecksum = checksum, executorService, latch);
114+
115+
executeChecksumTask((stream) -> {
116+
((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream);
117+
return null;
118+
}, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch);
119+
120+
executeChecksumTask((stream) -> {
121+
stream.writeMapValues(
107122
clusterState.metadata().indices(),
108-
(stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream)
123+
(checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream)
109124
);
110-
indicesChecksum = checksumOut.getChecksum();
111-
112-
checksumOut.reset();
113-
clusterState.blocks().writeVerifiableTo(checksumOut);
114-
blocksChecksum = checksumOut.getChecksum();
115-
116-
checksumOut.reset();
117-
checksumOut.writeStringCollection(clusterState.customs().keySet());
118-
clusterStateCustomsChecksum = checksumOut.getChecksum();
119-
} catch (IOException e) {
120-
logger.error("Failed to create checksum for cluster state.", e);
125+
return null;
126+
}, checksum -> indicesChecksum = checksum, executorService, latch);
127+
128+
executeChecksumTask((stream) -> {
129+
clusterState.blocks().writeVerifiableTo(stream);
130+
return null;
131+
}, checksum -> blocksChecksum = checksum, executorService, latch);
132+
133+
executeChecksumTask((stream) -> {
134+
stream.writeStringCollection(clusterState.customs().keySet());
135+
return null;
136+
}, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch);
137+
138+
try {
139+
latch.await();
140+
} catch (InterruptedException e) {
121141
throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e);
122142
}
123143
createClusterStateChecksum();
144+
logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(threadpool.relativeTimeInNanos() - start));
145+
}
146+
147+
private void executeChecksumTask(
148+
CheckedFunction<BufferedChecksumStreamOutput, Void, IOException> checksumTask,
149+
Consumer<Long> checksumConsumer,
150+
ExecutorService executorService,
151+
CountDownLatch latch
152+
) {
153+
executorService.execute(() -> {
154+
try {
155+
long checksum = createChecksum(checksumTask);
156+
checksumConsumer.accept(checksum);
157+
latch.countDown();
158+
} catch (IOException e) {
159+
throw new RemoteStateTransferException("Failed to execute checksum task", e);
160+
}
161+
});
162+
}
163+
164+
private long createChecksum(CheckedFunction<BufferedChecksumStreamOutput, Void, IOException> task) throws IOException {
165+
try (
166+
BytesStreamOutput out = new BytesStreamOutput();
167+
BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out)
168+
) {
169+
task.apply(checksumOut);
170+
return checksumOut.getChecksum();
171+
}
124172
}
125173

126174
private void createClusterStateChecksum() {

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,9 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
332332
uploadedMetadataResults,
333333
previousClusterUUID,
334334
clusterStateDiffManifest,
335-
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null,
335+
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
336+
? new ClusterStateChecksum(clusterState, threadpool)
337+
: null,
336338
false,
337339
codecVersion
338340
);
@@ -539,7 +541,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
539541
uploadedMetadataResults,
540542
previousManifest.getPreviousClusterUUID(),
541543
clusterStateDiffManifest,
542-
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null,
544+
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
545+
? new ClusterStateChecksum(clusterState, threadpool)
546+
: null,
543547
false,
544548
previousManifest.getCodecVersion()
545549
);
@@ -1010,7 +1014,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(
10101014
uploadedMetadataResults,
10111015
previousManifest.getPreviousClusterUUID(),
10121016
previousManifest.getDiffManifest(),
1013-
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null,
1017+
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
1018+
? new ClusterStateChecksum(clusterState, threadpool)
1019+
: null,
10141020
true,
10151021
previousManifest.getCodecVersion()
10161022
);
@@ -1631,7 +1637,7 @@ void validateClusterStateFromChecksum(
16311637
String localNodeId,
16321638
boolean isFullStateDownload
16331639
) {
1634-
ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState);
1640+
ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState, threadpool);
16351641
List<String> failedValidation = newClusterStateChecksum.getMismatchEntities(manifest.getClusterStateChecksum());
16361642
if (failedValidation.isEmpty()) {
16371643
return;

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

+7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.core.service.ReportingService;
5353
import org.opensearch.core.xcontent.ToXContentFragment;
5454
import org.opensearch.core.xcontent.XContentBuilder;
55+
import org.opensearch.gateway.remote.ClusterStateChecksum;
5556
import org.opensearch.node.Node;
5657

5758
import java.io.IOException;
@@ -117,6 +118,7 @@ public static class Names {
117118
public static final String REMOTE_RECOVERY = "remote_recovery";
118119
public static final String REMOTE_STATE_READ = "remote_state_read";
119120
public static final String INDEX_SEARCHER = "index_searcher";
121+
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
120122
}
121123

122124
/**
@@ -190,6 +192,7 @@ public static ThreadPoolType fromType(String type) {
190192
map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING);
191193
map.put(Names.INDEX_SEARCHER, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
192194
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING);
195+
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
193196
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
194197
}
195198

@@ -321,6 +324,10 @@ public ThreadPool(
321324
runnableTaskListener
322325
)
323326
);
327+
builders.put(
328+
Names.REMOTE_STATE_CHECKSUM,
329+
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000)
330+
);
324331

325332
for (final ExecutorBuilder<?> builder : customBuilders) {
326333
if (builders.containsKey(builder.name())) {

server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
3535
import org.opensearch.test.EqualsHashCodeTestUtils;
3636
import org.opensearch.test.OpenSearchTestCase;
37+
import org.opensearch.threadpool.TestThreadPool;
38+
import org.opensearch.threadpool.ThreadPool;
39+
import org.junit.After;
3740

3841
import java.io.IOException;
3942
import java.util.ArrayList;
@@ -64,6 +67,14 @@
6467

6568
public class ClusterMetadataManifestTests extends OpenSearchTestCase {
6669

70+
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
71+
72+
@After
73+
public void teardown() throws Exception {
74+
super.tearDown();
75+
threadPool.shutdown();
76+
}
77+
6778
public void testClusterMetadataManifestXContentV0() throws IOException {
6879
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path", CODEC_V0);
6980
ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder()
@@ -214,7 +225,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() {
214225
"indicesRoutingDiffPath"
215226
)
216227
)
217-
.checksum(new ClusterStateChecksum(createClusterState()))
228+
.checksum(new ClusterStateChecksum(createClusterState(), threadPool))
218229
.build();
219230
{ // Mutate Cluster Term
220231
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
@@ -647,7 +658,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException {
647658
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
648659
UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute");
649660
final StringKeyDiffProvider<IndexRoutingTable> routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class);
650-
ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState());
661+
ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(), threadPool);
651662
ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder()
652663
.clusterTerm(1L)
653664
.stateVersion(1L)

0 commit comments

Comments
 (0)