Skip to content

Commit fa5caec

Browse files
committed
Create serde utility for Writable classes
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
1 parent fbe048f commit fa5caec

File tree

3 files changed

+187
-0
lines changed

3 files changed

+187
-0
lines changed

libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java

+13
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,19 @@ public static Compressor compressor(final BytesReference bytes) {
7878
return null;
7979
}
8080

81+
/**
82+
* @param bytes The bytes to check the compression for
83+
* @return The detected compressor. If no compressor detected then return NoneCompressor.
84+
*/
85+
public static Compressor getCompressor(final BytesReference bytes) {
86+
for (Compressor compressor : registeredCompressors.values()) {
87+
if (compressor.isCompressed(bytes) == true) {
88+
return compressor;
89+
}
90+
}
91+
return CompressorRegistry.none();
92+
}
93+
8194
/** Decompress the provided {@link BytesReference}. */
8295
public static BytesReference uncompress(BytesReference bytes) throws IOException {
8396
Compressor compressor = compressor(bytes);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.blobstore;
10+
11+
import org.apache.lucene.codecs.CodecUtil;
12+
import org.apache.lucene.index.CorruptIndexException;
13+
import org.apache.lucene.index.IndexFormatTooNewException;
14+
import org.apache.lucene.index.IndexFormatTooOldException;
15+
import org.apache.lucene.store.ByteBuffersDataInput;
16+
import org.apache.lucene.store.ByteBuffersIndexInput;
17+
import org.apache.lucene.store.IndexInput;
18+
import org.apache.lucene.store.OutputStreamIndexOutput;
19+
import org.apache.lucene.util.BytesRef;
20+
import org.opensearch.Version;
21+
import org.opensearch.common.CheckedFunction;
22+
import org.opensearch.common.io.stream.BytesStreamOutput;
23+
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
24+
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
25+
import org.opensearch.core.common.bytes.BytesReference;
26+
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
27+
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
28+
import org.opensearch.core.common.io.stream.StreamInput;
29+
import org.opensearch.core.common.io.stream.StreamOutput;
30+
import org.opensearch.core.common.io.stream.Writeable;
31+
import org.opensearch.core.compress.Compressor;
32+
import org.opensearch.core.compress.CompressorRegistry;
33+
import org.opensearch.gateway.CorruptStateException;
34+
35+
import java.io.IOException;
36+
import java.io.OutputStream;
37+
import java.util.Arrays;
38+
39+
/**
40+
* Checksum File format used to serialize/deserialize {@link Writeable} objects
41+
*
42+
* @opensearch.internal
43+
*/
44+
public class ChecksumWritableBlobStoreFormat<T extends Writeable> {
45+
46+
public static final int VERSION = 1;
47+
48+
private static final int BUFFER_SIZE = 4096;
49+
50+
private final String codec;
51+
private final CheckedFunction<StreamInput, T, IOException> reader;
52+
53+
public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput, T, IOException> reader) {
54+
this.codec = codec;
55+
this.reader = reader;
56+
}
57+
58+
public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
59+
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
60+
try (
61+
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
62+
"ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
63+
blobName,
64+
outputStream,
65+
BUFFER_SIZE
66+
)
67+
) {
68+
CodecUtil.writeHeader(indexOutput, codec, VERSION);
69+
70+
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
71+
@Override
72+
public void close() throws IOException {
73+
// this is important since some of the XContentBuilders write bytes on close.
74+
// in order to write the footer we need to prevent closing the actual index input.
75+
}
76+
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
77+
// TODO The stream version should be configurable
78+
stream.setVersion(Version.CURRENT);
79+
obj.writeTo(stream);
80+
}
81+
CodecUtil.writeFooter(indexOutput);
82+
}
83+
return outputStream.bytes();
84+
}
85+
}
86+
87+
public T deserialize(String blobName, CheckedFunction<StreamInput, T, IOException> reader, BytesReference bytes) throws IOException {
88+
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
89+
try {
90+
final IndexInput indexInput = bytes.length() > 0
91+
? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
92+
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
93+
CodecUtil.checksumEntireFile(indexInput);
94+
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
95+
long filePointer = indexInput.getFilePointer();
96+
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
97+
BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize);
98+
Compressor compressor = CompressorRegistry.getCompressor(bytesReference);
99+
try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) {
100+
return reader.apply(in);
101+
}
102+
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
103+
// we trick this into a dedicated exception with the original stacktrace
104+
throw new CorruptStateException(ex);
105+
}
106+
}
107+
108+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.blobstore;
10+
11+
import org.opensearch.Version;
12+
import org.opensearch.cluster.metadata.IndexMetadata;
13+
import org.opensearch.common.compress.DeflateCompressor;
14+
import org.opensearch.common.settings.Settings;
15+
import org.opensearch.core.common.bytes.BytesReference;
16+
import org.opensearch.core.compress.CompressorRegistry;
17+
import org.opensearch.core.index.Index;
18+
import org.opensearch.test.OpenSearchTestCase;
19+
20+
import java.io.IOException;
21+
22+
import static org.hamcrest.Matchers.is;
23+
24+
/**
25+
* Tests for {@link ChecksumWritableBlobStoreFormat}
26+
*/
27+
public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {
28+
private static final String TEST_BLOB_FILE_NAME = "test-blob-name";
29+
private static final long VERSION = 5L;
30+
31+
private final ChecksumWritableBlobStoreFormat<IndexMetadata> clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>(
32+
"index-metadata",
33+
IndexMetadata::readFrom
34+
);
35+
36+
public void testSerDe() throws IOException {
37+
IndexMetadata indexMetadata = getIndexMetadata();
38+
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
39+
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference);
40+
assertThat(readIndexMetadata, is(indexMetadata));
41+
}
42+
43+
public void testSerDeForCompressed() throws IOException {
44+
IndexMetadata indexMetadata = getIndexMetadata();
45+
BytesReference bytesReference = clusterBlocksFormat.serialize(
46+
indexMetadata,
47+
TEST_BLOB_FILE_NAME,
48+
CompressorRegistry.getCompressor(DeflateCompressor.NAME)
49+
);
50+
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference);
51+
assertThat(readIndexMetadata, is(indexMetadata));
52+
}
53+
54+
private IndexMetadata getIndexMetadata() {
55+
final Index index = new Index("test-index", "index-uuid");
56+
final Settings idxSettings = Settings.builder()
57+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
58+
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
59+
.build();
60+
return new IndexMetadata.Builder(index.getName()).settings(idxSettings)
61+
.version(VERSION)
62+
.numberOfShards(1)
63+
.numberOfReplicas(0)
64+
.build();
65+
}
66+
}

0 commit comments

Comments
 (0)