Skip to content

Commit 73f7270

Browse files
committed
Create serde utility for Writable classes
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent fbe048f commit 73f7270

File tree

2 files changed

+160
-0
lines changed

2 files changed

+160
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.blobstore;
10+
11+
import org.apache.lucene.codecs.CodecUtil;
12+
import org.apache.lucene.index.CorruptIndexException;
13+
import org.apache.lucene.index.IndexFormatTooNewException;
14+
import org.apache.lucene.index.IndexFormatTooOldException;
15+
import org.apache.lucene.store.ByteBuffersDataInput;
16+
import org.apache.lucene.store.ByteBuffersIndexInput;
17+
import org.apache.lucene.store.IndexInput;
18+
import org.apache.lucene.store.OutputStreamIndexOutput;
19+
import org.apache.lucene.util.BytesRef;
20+
import org.opensearch.Version;
21+
import org.opensearch.common.CheckedFunction;
22+
import org.opensearch.common.io.stream.BytesStreamOutput;
23+
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
24+
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
25+
import org.opensearch.core.common.bytes.BytesReference;
26+
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
27+
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
28+
import org.opensearch.core.common.io.stream.StreamInput;
29+
import org.opensearch.core.common.io.stream.StreamOutput;
30+
import org.opensearch.core.common.io.stream.Writeable;
31+
import org.opensearch.core.compress.Compressor;
32+
import org.opensearch.core.compress.CompressorRegistry;
33+
import org.opensearch.core.compress.NotXContentException;
34+
import org.opensearch.gateway.CorruptStateException;
35+
36+
import java.io.IOException;
37+
import java.io.OutputStream;
38+
import java.util.Arrays;
39+
40+
public class ChecksumWritableBlobStoreFormat<T extends Writeable> {
41+
42+
public static final int VERSION = 1;
43+
44+
private static final int BUFFER_SIZE = 4096;
45+
46+
private final String codec;
47+
private final CheckedFunction<StreamInput, T, IOException> reader;
48+
49+
public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput, T, IOException> reader) {
50+
this.codec = codec;
51+
this.reader = reader;
52+
}
53+
54+
public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
55+
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
56+
try (
57+
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
58+
"ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
59+
blobName,
60+
outputStream,
61+
BUFFER_SIZE
62+
)
63+
) {
64+
CodecUtil.writeHeader(indexOutput, codec, VERSION);
65+
66+
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
67+
@Override
68+
public void close() throws IOException {
69+
// this is important since some of the XContentBuilders write bytes on close.
70+
// in order to write the footer we need to prevent closing the actual index input.
71+
}
72+
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
73+
// TODO The stream version should be configurable
74+
stream.setVersion(Version.CURRENT);
75+
obj.writeTo(stream);
76+
}
77+
CodecUtil.writeFooter(indexOutput);
78+
}
79+
return outputStream.bytes();
80+
}
81+
}
82+
83+
public T deserialize(String blobName, CheckedFunction<StreamInput, T, IOException> reader, BytesReference bytes) throws IOException {
84+
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
85+
try {
86+
final IndexInput indexInput = bytes.length() > 0
87+
? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
88+
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
89+
CodecUtil.checksumEntireFile(indexInput);
90+
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
91+
long filePointer = indexInput.getFilePointer();
92+
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
93+
BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize);
94+
Compressor compressor;
95+
try {
96+
compressor = CompressorRegistry.compressor(bytesReference);
97+
} catch (NotXContentException e) {
98+
compressor = CompressorRegistry.none();
99+
}
100+
try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) {
101+
return reader.apply(in);
102+
}
103+
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
104+
// we trick this into a dedicated exception with the original stacktrace
105+
throw new CorruptStateException(ex);
106+
}
107+
}
108+
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.blobstore;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.core.common.bytes.BytesReference;
15+
import org.opensearch.core.compress.CompressorRegistry;
16+
import org.opensearch.core.index.Index;
17+
import org.opensearch.test.OpenSearchTestCase;
18+
19+
import java.io.IOException;
20+
21+
import static org.hamcrest.Matchers.is;
22+
23+
public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {
24+
private static final String TEST_BLOB_FILE_NAME = "test-blob-name";
25+
private static final long VERSION = 5L;
26+
27+
private final ChecksumWritableBlobStoreFormat<IndexMetadata> clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>(
28+
"index-metadata",
29+
IndexMetadata::readFrom
30+
);
31+
32+
public void testSerDe() throws IOException {
33+
IndexMetadata indexMetadata = getIndexMetadata();
34+
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
35+
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference);
36+
assertThat(readIndexMetadata, is(indexMetadata));
37+
}
38+
39+
private IndexMetadata getIndexMetadata() {
40+
final Index index = new Index("test-index", "index-uuid");
41+
final Settings idxSettings = Settings.builder()
42+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
43+
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
44+
.build();
45+
return new IndexMetadata.Builder(index.getName()).settings(idxSettings)
46+
.version(VERSION)
47+
.numberOfShards(1)
48+
.numberOfReplicas(0)
49+
.build();
50+
}
51+
}

0 commit comments

Comments
 (0)