Skip to content

Commit 00fb53c

Browse files
committed
Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility (#9262)
This commit refactors the CompressorFactory static singleton class and CompressorType enum to a formal CompressorRegistry and enables downstream implementations to register their own compression implementations for use in compressing Blob stores and MediaType data. This is different from Lucene's Codec compression extension points which expose different compression implementations for Lucene's Stored Fields. --------- Signed-off-by: Nicholas Walter Knize <nknize@apache.org> (cherry picked from commit c6b67e1)
1 parent 1fdc08a commit 00fb53c

File tree

49 files changed

+526
-244
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+526
-244
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8080
- Add support for wrapping CollectorManager with profiling during concurrent execution ([#9129](https://github.com/opensearch-project/OpenSearch/pull/9129))
8181
- Rethrow OpenSearch exception for non-concurrent path while using concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9177))
8282
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
83+
- Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262))
8384

8485
### Deprecated
8586

gradle/missing-javadoc.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ configure([
180180
configure([
181181
project(":libs:opensearch-common"),
182182
project(":libs:opensearch-core"),
183+
project(":libs:opensearch-compress"),
183184
project(":server")
184185
]) {
185186
project.tasks.withType(MissingJavadocTask) {

libs/compress/build.gradle

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
apply plugin: 'opensearch.build'
13+
apply plugin: 'opensearch.publish'
14+
15+
base {
16+
archivesName = 'opensearch-compress'
17+
}
18+
19+
dependencies {
20+
api project(':libs:opensearch-common')
21+
api project(':libs:opensearch-core')
22+
23+
//zstd
24+
api "com.github.luben:zstd-jni:${versions.zstd}"
25+
26+
testImplementation(project(":test:framework")) {
27+
// tests use the locally compiled version of server
28+
exclude group: 'org.opensearch', module: 'opensearch-compress'
29+
}
30+
}
31+
32+
tasks.named('forbiddenApisMain').configure {
33+
// :libs:opensearch-compress does not depend on server
34+
// TODO: Need to decide how we want to handle forbidden signatures with the changes to server
35+
replaceSignatureFiles 'jdk-signatures'
36+
}
37+
38+
jarHell.enabled = false
File renamed without changes.

server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.common.compress;
9+
package org.opensearch.compress;
1010

1111
import com.github.luben.zstd.RecyclingBufferPool;
1212
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
1313
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
1414

15+
import org.opensearch.common.annotation.PublicApi;
1516
import org.opensearch.core.common.bytes.BytesReference;
16-
import org.opensearch.core.common.compress.Compressor;
17+
import org.opensearch.core.compress.Compressor;
1718

1819
import java.io.BufferedInputStream;
1920
import java.io.BufferedOutputStream;
@@ -25,7 +26,8 @@
2526
/**
2627
* {@link Compressor} implementation based on the ZSTD compression algorithm.
2728
*
28-
* @opensearch.internal
29+
* @opensearch.api - registered name requires BWC support
30+
* @opensearch.experimental - class methods might change
2931
*/
3032
public class ZstdCompressor implements Compressor {
3133
// An arbitrary header that we use to identify compressed streams
@@ -34,6 +36,14 @@ public class ZstdCompressor implements Compressor {
3436
// a XContent
3537
private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' };
3638

39+
/**
40+
* The name to register the compressor by
41+
*
42+
* @opensearch.api - requires BWC support
43+
*/
44+
@PublicApi(since = "2.10.0")
45+
public static final String NAME = "ZSTD";
46+
3747
private static final int LEVEL = 3;
3848

3949
private static final int BUFFER_SIZE = 4096;

libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java libs/compress/src/main/java/org/opensearch/compress/package-info.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@
66
* compatible open source license.
77
*/
88

9-
/** Classes for core compress module */
10-
package org.opensearch.core.common.compress;
9+
/**
10+
* Concrete {@link org.opensearch.core.compress.Compressor} implementations
11+
*/
12+
package org.opensearch.compress;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.compress.spi;
10+
11+
import org.opensearch.compress.ZstdCompressor;
12+
import org.opensearch.core.compress.Compressor;
13+
import org.opensearch.core.compress.spi.CompressorProvider;
14+
15+
import java.util.AbstractMap.SimpleEntry;
16+
import java.util.List;
17+
import java.util.Map.Entry;
18+
19+
/**
20+
* Additional "optional" compressor implementations provided by the opensearch compress library
21+
*
22+
* @opensearch.internal
23+
*/
24+
public class CompressionProvider implements CompressorProvider {
25+
26+
/** Returns the concrete {@link Compressor}s provided by the compress library */
27+
@SuppressWarnings({ "unchecked", "rawtypes" })
28+
@Override
29+
public List<Entry<String, Compressor>> getCompressors() {
30+
return List.of(new SimpleEntry<>(ZstdCompressor.NAME, new ZstdCompressor()));
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Service Provider Interface for registering concrete {@link org.opensearch.core.compress.Compressor}
11+
* implementations.
12+
*
13+
* See {@link org.opensearch.compress.ZstdCompressor}
14+
*/
15+
package org.opensearch.compress.spi;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* This is the compress library for registering optional
11+
* {@link org.opensearch.core.compress.Compressor} implementations
12+
*/
13+
package org.opensearch;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# The OpenSearch Contributors require contributions made to
5+
# this file be licensed under the Apache-2.0 license or a
6+
# compatible open source license.
7+
#
8+
9+
org.opensearch.compress.spi.CompressionProvider

server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.common.compress;
9+
package org.opensearch.compress;
1010

11-
import org.opensearch.core.common.compress.Compressor;
11+
import org.opensearch.core.compress.Compressor;
12+
import org.opensearch.test.core.compress.AbstractCompressorTestCase;
1213

1314
/**
1415
* Test streaming compression
1516
*/
16-
public class ZstdCompressTests extends AbstractCompressorTests {
17+
public class ZstdCompressTests extends AbstractCompressorTestCase {
1718

1819
private final Compressor compressor = new ZstdCompressor();
1920

2021
@Override
21-
Compressor compressor() {
22+
protected Compressor compressor() {
2223
return compressor;
2324
}
2425
}

libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java libs/core/src/main/java/org/opensearch/core/compress/Compressor.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,29 @@
3030
* GitHub history for details.
3131
*/
3232

33-
package org.opensearch.core.common.compress;
33+
package org.opensearch.core.compress;
3434

35+
import org.opensearch.common.annotation.ExperimentalApi;
36+
import org.opensearch.common.annotation.PublicApi;
3537
import org.opensearch.core.common.bytes.BytesReference;
3638

3739
import java.io.IOException;
3840
import java.io.InputStream;
3941
import java.io.OutputStream;
4042

4143
/**
42-
* Compressor interface
44+
* Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and
45+
* {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations.
4346
*
44-
* @opensearch.internal
47+
* This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used
48+
* for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec}
49+
* for compressing {@link org.apache.lucene.document.StoredField}s
50+
*
51+
* @opensearch.api - intended to be extended
52+
* @opensearch.experimental - however, bwc is not guaranteed at this time
4553
*/
54+
@ExperimentalApi
55+
@PublicApi(since = "2.10.0")
4656
public interface Compressor {
4757

4858
boolean isCompressed(BytesReference bytes);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.core.compress;
10+
11+
import org.opensearch.common.Nullable;
12+
import org.opensearch.common.annotation.InternalApi;
13+
import org.opensearch.core.common.bytes.BytesReference;
14+
import org.opensearch.core.compress.spi.CompressorProvider;
15+
import org.opensearch.core.xcontent.MediaTypeRegistry;
16+
17+
import java.io.IOException;
18+
import java.util.Map;
19+
import java.util.Objects;
20+
import java.util.ServiceLoader;
21+
import java.util.stream.Collectors;
22+
23+
/**
24+
* A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the
25+
* compressor header as a string) to registered {@link Compressor} implementations.
26+
*
27+
* This enables plugins, modules, extensions to register their own compression implementations through SPI
28+
*
29+
* @opensearch.experimental
30+
* @opensearch.internal
31+
*/
32+
@InternalApi
33+
public final class CompressorRegistry {
34+
35+
// the backing registry map
36+
private static final Map<String, Compressor> registeredCompressors = ServiceLoader.load(
37+
CompressorProvider.class,
38+
CompressorProvider.class.getClassLoader()
39+
)
40+
.stream()
41+
.flatMap(p -> p.get().getCompressors().stream())
42+
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
43+
44+
// no instance:
45+
private CompressorRegistry() {}
46+
47+
/**
48+
* Returns the default compressor
49+
*/
50+
public static Compressor defaultCompressor() {
51+
return registeredCompressors.get("DEFLATE");
52+
}
53+
54+
public static Compressor none() {
55+
return registeredCompressors.get(NoneCompressor.NAME);
56+
}
57+
58+
public static boolean isCompressed(BytesReference bytes) {
59+
return compressor(bytes) != null;
60+
}
61+
62+
@Nullable
63+
public static Compressor compressor(final BytesReference bytes) {
64+
for (Compressor compressor : registeredCompressors.values()) {
65+
if (compressor.isCompressed(bytes) == true) {
66+
// bytes should be either detected as compressed or as xcontent,
67+
// if we have bytes that can be either detected as compressed or
68+
// as a xcontent, we have a problem
69+
assert MediaTypeRegistry.xContentType(bytes) == null;
70+
return compressor;
71+
}
72+
}
73+
74+
if (MediaTypeRegistry.xContentType(bytes) == null) {
75+
throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes");
76+
}
77+
78+
return null;
79+
}
80+
81+
/** Decompress the provided {@link BytesReference}. */
82+
public static BytesReference uncompress(BytesReference bytes) throws IOException {
83+
Compressor compressor = compressor(bytes);
84+
if (compressor == null) {
85+
throw new NotCompressedException();
86+
}
87+
return compressor.uncompress(bytes);
88+
}
89+
90+
/**
91+
* Uncompress the provided data; data can be detected as compressed using {@link #isCompressed(BytesReference)}.
92+
*/
93+
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
94+
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
95+
return compressor == null ? bytes : compressor.uncompress(bytes);
96+
}
97+
98+
/** Returns a registered compressor by its registered name */
99+
public static Compressor getCompressor(final String name) {
100+
if (registeredCompressors.containsKey(name)) {
101+
return registeredCompressors.get(name);
102+
}
103+
throw new IllegalArgumentException("No registered compressor found by name [" + name + "]");
104+
}
105+
106+
/**
107+
* Returns the registered compressors as an Immutable collection
108+
*
109+
* note: used for testing
110+
*/
111+
public static Map<String, Compressor> registeredCompressors() {
112+
// no destructive danger as backing map is immutable
113+
return registeredCompressors;
114+
}
115+
}

server/src/main/java/org/opensearch/common/compress/NoneCompressor.java libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.common.compress;
9+
package org.opensearch.core.compress;
1010

11+
import org.opensearch.common.annotation.PublicApi;
1112
import org.opensearch.core.common.bytes.BytesReference;
12-
import org.opensearch.core.common.compress.Compressor;
1313

1414
import java.io.IOException;
1515
import java.io.InputStream;
@@ -18,9 +18,18 @@
1818
/**
1919
* {@link Compressor} no compressor implementation.
2020
*
21-
* @opensearch.internal
21+
* @opensearch.api - registered name requires BWC support
22+
* @opensearch.experimental - class methods might change
2223
*/
2324
public class NoneCompressor implements Compressor {
25+
/**
26+
* The name to register the compressor by
27+
*
28+
* @opensearch.api - requires BWC support
29+
*/
30+
@PublicApi(since = "2.10.0")
31+
public static final String NAME = "NONE";
32+
2433
@Override
2534
public boolean isCompressed(BytesReference bytes) {
2635
return false;

server/src/main/java/org/opensearch/common/compress/NotCompressedException.java libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* GitHub history for details.
3131
*/
3232

33-
package org.opensearch.common.compress;
33+
package org.opensearch.core.compress;
3434

3535
/**
3636
* Exception indicating that we were expecting something compressed, which

0 commit comments

Comments
 (0)