Skip to content

Commit f09ba53

Browse files
committedMay 30, 2024
Add hardware-accelerated codecs for DEFLATE and LZ4 (opensearch-project#122)
* Add QAT accelerated compression. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Use own classes for QAT codec. Apply SpotlessJavaCheck. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Declare fields final, unless required not to. Throw a valid type of exception. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Use assumeThat in the Qat test classes. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Add more QAT availability check in QatCodecTests. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Make LZ4 the default algorithm for QAT. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Make 'auto' the default execution mode for QAT. Also, minor clean up work. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Revert compression level for ZSTD to 3. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Replace QatLz4/DeflateCompressionMode classes with QatCompressionMode. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Fix a MultiCodecMergeIT test fail. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Remove hard-coded values for default compression level. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> --------- Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> Signed-off-by: mulugetam <mulugeta.mammo@intel.com> Co-authored-by: Mulugeta Mammo <cppx86@gmail.com> (cherry picked from commit c8b0d80)
1 parent 7eb55e6 commit f09ba53

34 files changed

+1649
-76
lines changed
 

‎build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ opensearchplugin {
7272

7373
dependencies {
7474
api "com.github.luben:zstd-jni:1.5.5-5"
75+
api "com.intel.qat:qat-java:1.1.1"
7576
}
7677

7778
allprojects {

‎licenses/qat-java-1.1.1.jar.sha1

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3333601cdedf6a711d445118d5bc44ec6a9c65f9

‎licenses/qat-java-LICENSE.txt

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
-----------------------------------------------------------------------------
2+
** Beginning of "BSD License" text. **
3+
4+
Qat-Java: Qat-Java is a compression library that uses Intel® QAT to accelerate
5+
compression and decompression.
6+
7+
Copyright(c) 2007-2023 Intel Corporation. All rights reserved.
8+
All rights reserved.
9+
10+
BSD License
11+
12+
Redistribution and use in source and binary forms, with or without
13+
modification, are permitted provided that the following conditions
14+
are met:
15+
16+
* Redistributions of source code must retain the above copyright
17+
notice, this list of conditions and the following disclaimer.
18+
* Redistributions in binary form must reproduce the above copyright
19+
notice, this list of conditions and the following disclaimer in
20+
the documentation and/or other materials provided with the
21+
distribution.
22+
* Neither the name of Intel Corporation nor the names of its
23+
contributors may be used to endorse or promote products derived
24+
from this software without specific prior written permission.
25+
26+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
27+
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
28+
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
29+
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
30+
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
31+
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
32+
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
33+
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
34+
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
35+
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
36+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

‎licenses/qat-java-NOTICE.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Qat-Java is a compression library that uses Intel® QAT to accelerate compression and decompression.

‎src/integrationTest/java/org/opensearch/index/codec/rest/CreateIndexWithCodecIT.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,25 @@
2222
import org.opensearch.cluster.metadata.IndexMetadata;
2323
import org.opensearch.common.settings.Settings;
2424
import org.opensearch.core.common.Strings;
25+
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
2526
import org.opensearch.test.rest.OpenSearchRestTestCase;
2627

28+
import javax.net.ssl.SSLContext;
29+
2730
import java.io.IOException;
2831
import java.security.KeyManagementException;
2932
import java.security.KeyStoreException;
3033
import java.security.NoSuchAlgorithmException;
31-
3234
import java.util.Objects;
3335

34-
import javax.net.ssl.SSLContext;
35-
36+
import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE;
37+
import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL;
38+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_DEFLATE_CODEC;
39+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_LZ4_CODEC;
3640
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
3741
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.junit.Assume.assumeThat;
3844

3945
public class CreateIndexWithCodecIT extends OpenSearchRestTestCase {
4046
public void testCreateIndexWithZstdCodec() throws IOException {
@@ -58,6 +64,29 @@ public void testCreateIndexWithZstdCodec() throws IOException {
5864
}
5965
}
6066

67+
public void testCreateIndexWithQatCodec() throws IOException {
68+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
69+
70+
final String index = "custom-codecs-test-index";
71+
72+
// creating index
73+
createIndex(
74+
index,
75+
Settings.builder()
76+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
77+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
78+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
79+
.put("index.codec.compression_level", randomIntBetween(1, 6))
80+
.build()
81+
);
82+
83+
try {
84+
ensureGreen(index);
85+
} finally {
86+
deleteIndex(index);
87+
}
88+
}
89+
6190
@Override
6291
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
6392
RestClientBuilder builder = RestClient.builder(hosts);

‎src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java

+131
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,21 @@
1313
import org.opensearch.cluster.metadata.IndexMetadata;
1414
import org.opensearch.common.settings.Settings;
1515
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
16+
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
1617
import org.opensearch.plugins.Plugin;
1718
import org.opensearch.test.OpenSearchIntegTestCase;
1819

1920
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.concurrent.ExecutionException;
2223

24+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_DEFLATE_CODEC;
25+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_LZ4_CODEC;
2326
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
2427
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
2528
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
29+
import static org.hamcrest.Matchers.is;
30+
import static org.junit.Assume.assumeThat;
2631

2732
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
2833
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {
@@ -80,6 +85,26 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() {
8085
ensureGreen(index);
8186
}
8287

88+
public void testQatCodecsCreateIndexWithCompressionLevel() {
89+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
90+
91+
internalCluster().ensureAtLeastNumDataNodes(1);
92+
final String index = "test-index";
93+
94+
// creating index
95+
createIndex(
96+
index,
97+
Settings.builder()
98+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
99+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
100+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
101+
.put("index.codec.compression_level", randomIntBetween(1, 6))
102+
.build()
103+
);
104+
105+
ensureGreen(index);
106+
}
107+
83108
public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
84109

85110
internalCluster().ensureAtLeastNumDataNodes(1);
@@ -132,6 +157,59 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
132157
ensureGreen(index);
133158
}
134159

160+
public void testQatToLuceneCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
161+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
162+
163+
internalCluster().ensureAtLeastNumDataNodes(1);
164+
final String index = "test-index";
165+
166+
// creating index
167+
createIndex(
168+
index,
169+
Settings.builder()
170+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
171+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
172+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
173+
.put("index.codec.compression_level", randomIntBetween(1, 6))
174+
.build()
175+
);
176+
ensureGreen(index);
177+
178+
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));
179+
180+
Throwable executionException = expectThrows(
181+
ExecutionException.class,
182+
() -> client().admin()
183+
.indices()
184+
.updateSettings(
185+
new UpdateSettingsRequest(index).settings(
186+
Settings.builder().put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
187+
)
188+
)
189+
.get()
190+
);
191+
192+
Throwable rootCause = Throwables.getRootCause(executionException);
193+
assertEquals(IllegalArgumentException.class, rootCause.getClass());
194+
assertTrue(rootCause.getMessage().startsWith("Compression level cannot be set"));
195+
196+
assertAcked(
197+
client().admin()
198+
.indices()
199+
.updateSettings(
200+
new UpdateSettingsRequest(index).settings(
201+
Settings.builder()
202+
.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
203+
.put("index.codec.compression_level", (String) null)
204+
)
205+
)
206+
.get()
207+
);
208+
209+
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
210+
ensureGreen(index);
211+
}
212+
135213
public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
136214

137215
internalCluster().ensureAtLeastNumDataNodes(1);
@@ -185,4 +263,57 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
185263
ensureGreen(index);
186264
}
187265

266+
public void testLuceneToQatCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
267+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
268+
269+
internalCluster().ensureAtLeastNumDataNodes(1);
270+
final String index = "test-index";
271+
272+
// creating index
273+
createIndex(
274+
index,
275+
Settings.builder()
276+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
277+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
278+
.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
279+
.build()
280+
);
281+
ensureGreen(index);
282+
283+
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));
284+
285+
Throwable executionException = expectThrows(
286+
ExecutionException.class,
287+
() -> client().admin()
288+
.indices()
289+
.updateSettings(
290+
new UpdateSettingsRequest(index).settings(
291+
Settings.builder()
292+
.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
293+
.put("index.codec.compression_level", randomIntBetween(1, 6))
294+
)
295+
)
296+
.get()
297+
);
298+
299+
Throwable rootCause = Throwables.getRootCause(executionException);
300+
assertEquals(IllegalArgumentException.class, rootCause.getClass());
301+
assertTrue(rootCause.getMessage().startsWith("Compression level cannot be set"));
302+
303+
assertAcked(
304+
client().admin()
305+
.indices()
306+
.updateSettings(
307+
new UpdateSettingsRequest(index).settings(
308+
Settings.builder()
309+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
310+
.put("index.codec.compression_level", randomIntBetween(1, 6))
311+
)
312+
)
313+
.get()
314+
);
315+
316+
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
317+
ensureGreen(index);
318+
}
188319
}

‎src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java

+16-15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
1818
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
19+
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
1920
import org.opensearch.index.engine.Segment;
2021
import org.opensearch.plugins.Plugin;
2122
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -24,6 +25,7 @@
2425
import java.util.Arrays;
2526
import java.util.Collection;
2627
import java.util.Collections;
28+
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.UUID;
@@ -51,25 +53,24 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
5153

5254
public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {
5355

54-
Map<String, String> codecMap = Map.of(
55-
"best_compression",
56-
"BEST_COMPRESSION",
57-
"zlib",
58-
"BEST_COMPRESSION",
59-
"zstd_no_dict",
60-
"ZSTD_NO_DICT",
61-
"zstd",
62-
"ZSTD",
63-
"default",
64-
"BEST_SPEED",
65-
"lz4",
66-
"BEST_SPEED"
67-
);
56+
Map<String, String> codecMap = new HashMap<String, String>() {
57+
{
58+
put("best_compression", "BEST_COMPRESSION");
59+
put("zlib", "BEST_COMPRESSION");
60+
put("zstd_no_dict", "ZSTD_NO_DICT");
61+
put("zstd", "ZSTD");
62+
put("default", "BEST_SPEED");
63+
put("lz4", "BEST_SPEED");
64+
if (QatZipperFactory.isQatAvailable()) {
65+
put("qat_lz4", "QAT_LZ4");
66+
put("qat_deflate", "QAT_DEFLATE");
67+
}
68+
}
69+
};
6870

6971
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
7072
forceMergeMultipleCodecs(codec.getKey(), codec.getValue(), codecMap);
7173
}
72-
7374
}
7475

7576
private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, Map<String, String> codecMap) throws ExecutionException,

‎src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,32 @@
88

99
package org.opensearch.index.codec.customcodecs;
1010

11+
import org.opensearch.common.settings.Setting;
1112
import org.opensearch.index.IndexSettings;
1213
import org.opensearch.index.codec.CodecServiceFactory;
1314
import org.opensearch.index.engine.EngineConfig;
1415
import org.opensearch.plugins.EnginePlugin;
1516
import org.opensearch.plugins.Plugin;
1617

18+
import java.util.Arrays;
19+
import java.util.List;
1720
import java.util.Optional;
1821

1922
/**
2023
* A plugin that implements custom codecs. Supports these codecs:
24+
*
2125
* <ul>
22-
* <li>ZSTD
23-
* <li>ZSTDNODICT
26+
* <li>ZSTD_CODEC
27+
* <li>ZSTD_NO_DICT_CODEC
28+
* <li>QAT_LZ4
29+
* <li>QAT_DEFLATE
2430
* </ul>
2531
*
2632
* @opensearch.internal
2733
*/
2834
public final class CustomCodecPlugin extends Plugin implements EnginePlugin {
2935

30-
/**
31-
* Creates a new instance
32-
*/
36+
/** Creates a new instance */
3337
public CustomCodecPlugin() {}
3438

3539
/**
@@ -39,9 +43,17 @@ public CustomCodecPlugin() {}
3943
@Override
4044
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) {
4145
String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING);
42-
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) {
46+
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC)
47+
|| codecName.equals(CustomCodecService.ZSTD_CODEC)
48+
|| codecName.equals(CustomCodecService.QAT_LZ4_CODEC)
49+
|| codecName.equals(CustomCodecService.QAT_DEFLATE_CODEC)) {
4350
return Optional.of(new CustomCodecServiceFactory());
4451
}
4552
return Optional.empty();
4653
}
54+
55+
@Override
56+
public List<Setting<?>> getSettings() {
57+
return Arrays.asList(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING);
58+
}
4759
}

‎src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java

+24-10
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,28 @@
2121

2222
import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;
2323

24-
/**
25-
* CustomCodecService provides ZSTD and ZSTD_NO_DICT compression codecs.
26-
*/
24+
/** CustomCodecService provides ZSTD, ZSTD_NO_DICT, QAT_LZ4, and QAT_DEFLATE compression codecs. */
2725
public class CustomCodecService extends CodecService {
2826
private final Map<String, Codec> codecs;
29-
/**
30-
* ZStandard codec
31-
*/
27+
28+
/** ZStandard codec */
3229
public static final String ZSTD_CODEC = "zstd";
33-
/**
34-
* ZStandard without dictionary codec
35-
*/
30+
31+
/** ZStandard without dictionary codec */
3632
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";
3733

34+
/** Hardware accelerated (Intel QAT) compression codec for LZ4. */
35+
public static final String QAT_LZ4_CODEC = "qat_lz4";
36+
37+
/** Hardware accelerated (Intel QAT) compression codec for DEFLATE. */
38+
public static final String QAT_DEFLATE_CODEC = "qat_deflate";
39+
3840
/**
3941
* Creates a new CustomCodecService.
4042
*
4143
* @param mapperService The mapper service.
4244
* @param indexSettings The index settings.
43-
* @param logger The logger.
45+
* @param logger The logger.
4446
*/
4547
public CustomCodecService(MapperService mapperService, IndexSettings indexSettings, Logger logger) {
4648
super(mapperService, indexSettings, logger);
@@ -49,9 +51,21 @@ public CustomCodecService(MapperService mapperService, IndexSettings indexSettin
4951
if (mapperService == null) {
5052
codecs.put(ZSTD_CODEC, new Zstd99Codec(compressionLevel));
5153
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(compressionLevel));
54+
codecs.put(QAT_LZ4_CODEC, new QatLz499Codec(compressionLevel, () -> {
55+
return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING);
56+
}));
57+
codecs.put(QAT_DEFLATE_CODEC, new QatDeflate99Codec(compressionLevel, () -> {
58+
return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING);
59+
}));
5260
} else {
5361
codecs.put(ZSTD_CODEC, new Zstd99Codec(mapperService, logger, compressionLevel));
5462
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDict99Codec(mapperService, logger, compressionLevel));
63+
codecs.put(QAT_LZ4_CODEC, new QatLz499Codec(mapperService, logger, compressionLevel, () -> {
64+
return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING);
65+
}));
66+
codecs.put(QAT_DEFLATE_CODEC, new QatDeflate99Codec(mapperService, logger, compressionLevel, () -> {
67+
return indexSettings.getValue(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING);
68+
}));
5569
}
5670
this.codecs = codecs.immutableMap();
5771
}

‎src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
import org.opensearch.index.codec.CodecServiceConfig;
1313
import org.opensearch.index.codec.CodecServiceFactory;
1414

15-
/**
16-
* A factory for creating new {@link CodecService} instance
17-
*/
15+
/** A factory for creating new {@link CodecService} instance */
1816
public class CustomCodecServiceFactory implements CodecServiceFactory {
1917

2018
/** Creates a new instance. */

‎src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomCodec.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
import org.apache.lucene.codecs.FilterCodec;
1313
import org.apache.lucene.codecs.StoredFieldsFormat;
1414
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
15+
import org.opensearch.common.settings.Settings;
1516
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
1617
import org.opensearch.index.mapper.MapperService;
1718

1819
import java.util.Set;
1920

21+
import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;
22+
2023
/**
2124
*
2225
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
@@ -28,7 +31,7 @@
2831
public abstract class Lucene99CustomCodec extends FilterCodec {
2932

3033
/** Default compression level used for compression */
31-
public static final int DEFAULT_COMPRESSION_LEVEL = 3;
34+
public static final int DEFAULT_COMPRESSION_LEVEL = INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getDefault(Settings.EMPTY);
3235

3336
/** Each mode represents a compression algorithm. */
3437
public enum Mode {

‎src/main/java/org/opensearch/index/codec/customcodecs/Lucene99CustomStoredFieldsFormat.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ StoredFieldsFormat impl(Lucene99CustomCodec.Mode mode) {
107107
case ZSTD_NO_DICT:
108108
return getCustomCompressingStoredFieldsFormat("CustomStoredFieldsZstdNoDict", this.zstdNoDictCompressionMode);
109109
default:
110-
throw new AssertionError();
110+
throw new IllegalStateException("Unsupported compression mode: " + mode);
111111
}
112112
}
113113

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.codecs.FilterCodec;
13+
import org.apache.lucene.codecs.StoredFieldsFormat;
14+
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
15+
import org.opensearch.common.settings.Setting;
16+
import org.opensearch.common.settings.Setting.Property;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
19+
import org.opensearch.index.mapper.MapperService;
20+
21+
import java.util.Set;
22+
import java.util.function.Supplier;
23+
24+
import com.intel.qat.QatZipper;
25+
26+
import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;
27+
28+
/**
29+
* Extends {@link FilterCodec} to reuse the functionality of Lucene Codec.
30+
*
31+
* @opensearch.internal
32+
*/
33+
public abstract class Lucene99QatCodec extends FilterCodec {
34+
35+
/** A setting to specifiy the QAT acceleration mode. */
36+
public static final Setting<QatZipper.Mode> INDEX_CODEC_QAT_MODE_SETTING = new Setting<>("index.codec.qatmode", "auto", s -> {
37+
switch (s) {
38+
case "auto":
39+
return QatZipper.Mode.AUTO;
40+
case "hardware":
41+
return QatZipper.Mode.HARDWARE;
42+
default:
43+
throw new IllegalArgumentException("Unknown value for [index.codec.qatmode] must be one of [auto, hardware] but was: " + s);
44+
}
45+
}, Property.IndexScope, Property.Dynamic);
46+
47+
/** A terse way to reference the default QAT execution mode. */
48+
public static final QatZipper.Mode DEFAULT_QAT_MODE = INDEX_CODEC_QAT_MODE_SETTING.getDefault(Settings.EMPTY);
49+
50+
/** Default compression level used for compression */
51+
public static final int DEFAULT_COMPRESSION_LEVEL = INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getDefault(Settings.EMPTY);
52+
53+
/** Each mode represents a compression algorithm. */
54+
public enum Mode {
55+
/** QAT lz4 mode. */
56+
QAT_LZ4("QATLZ499", Set.of("qat_lz4")),
57+
58+
/** QAT deflate mode. */
59+
QAT_DEFLATE("QATDEFLATE99", Set.of("qat_deflate"));
60+
61+
private final String codec;
62+
private final Set<String> aliases;
63+
64+
Mode(String codec, Set<String> aliases) {
65+
this.codec = codec;
66+
this.aliases = aliases;
67+
}
68+
69+
/** Returns the Codec that is registered with Lucene */
70+
public String getCodec() {
71+
return codec;
72+
}
73+
74+
/** Returns the aliases of the Codec */
75+
public Set<String> getAliases() {
76+
return aliases;
77+
}
78+
}
79+
80+
/** The default compression mode. */
81+
public static final Mode DEFAULT_COMPRESSION_MODE = Mode.QAT_LZ4;
82+
83+
private final StoredFieldsFormat storedFieldsFormat;
84+
85+
/**
86+
* Creates a new compression codec with the default compression level.
87+
*
88+
* @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE).
89+
*/
90+
public Lucene99QatCodec(Mode mode) {
91+
this(mode, DEFAULT_COMPRESSION_LEVEL);
92+
}
93+
94+
/**
95+
* Creates a new compression codec with the given compression level. We use lowercase letters when
96+
* registering the codec so that we remain consistent with the other compression codecs: default,
97+
* lucene_default, and best_compression.
98+
*
99+
* @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE).
100+
* @param compressionLevel The compression level.
101+
*/
102+
public Lucene99QatCodec(Mode mode, int compressionLevel) {
103+
super(mode.getCodec(), new Lucene99Codec());
104+
this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel);
105+
}
106+
107+
/**
108+
* Creates a new compression codec with the given compression level. We use lowercase letters when
109+
* registering the codec so that we remain consistent with the other compression codecs: default,
110+
* lucene_default, and best_compression.
111+
*
112+
* @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE).
113+
* @param compressionLevel The compression level.
114+
* @param supplier supplier for QAT mode.
115+
*/
116+
public Lucene99QatCodec(Mode mode, int compressionLevel, Supplier<QatZipper.Mode> supplier) {
117+
super(mode.getCodec(), new Lucene99Codec());
118+
this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel, supplier);
119+
}
120+
121+
/**
122+
* Creates a new compression codec with the given compression level. We use lowercase letters when
123+
* registering the codec so that we remain consistent with the other compression codecs: default,
124+
* lucene_default, and best_compression.
125+
*
126+
* @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE).
127+
* @param compressionLevel The compression level.
128+
* @param mapperService The mapper service.
129+
* @param logger The logger.
130+
*/
131+
public Lucene99QatCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) {
132+
super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene99Codec.Mode.BEST_SPEED, mapperService, logger));
133+
this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel);
134+
}
135+
136+
/**
137+
* Creates a new compression codec with the given compression level. We use lowercase letters when
138+
* registering the codec so that we remain consistent with the other compression codecs: default,
139+
* lucene_default, and best_compression.
140+
*
141+
* @param mode The compression codec (QAT_LZ4 or QAT_DEFLATE).
142+
* @param compressionLevel The compression level.
143+
* @param mapperService The mapper service.
144+
* @param logger The logger.
145+
* @param supplier supplier for QAT mode.
146+
*/
147+
public Lucene99QatCodec(
148+
Mode mode,
149+
int compressionLevel,
150+
MapperService mapperService,
151+
Logger logger,
152+
Supplier<QatZipper.Mode> supplier
153+
) {
154+
super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene99Codec.Mode.BEST_SPEED, mapperService, logger));
155+
this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel, supplier);
156+
}
157+
158+
@Override
159+
public StoredFieldsFormat storedFieldsFormat() {
160+
return storedFieldsFormat;
161+
}
162+
163+
@Override
164+
public String toString() {
165+
return getClass().getSimpleName();
166+
}
167+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import org.apache.lucene.codecs.StoredFieldsFormat;
12+
import org.apache.lucene.codecs.StoredFieldsReader;
13+
import org.apache.lucene.codecs.StoredFieldsWriter;
14+
import org.apache.lucene.codecs.compressing.CompressionMode;
15+
import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
16+
import org.apache.lucene.index.FieldInfos;
17+
import org.apache.lucene.index.SegmentInfo;
18+
import org.apache.lucene.store.Directory;
19+
import org.apache.lucene.store.IOContext;
20+
21+
import java.io.IOException;
22+
import java.util.Objects;
23+
import java.util.function.Supplier;
24+
25+
import com.intel.qat.QatZipper;
26+
27+
/** Stored field format used by pluggable codec */
28+
public class Lucene99QatStoredFieldsFormat extends StoredFieldsFormat {
29+
30+
/** A key that we use to map to a mode */
31+
public static final String MODE_KEY = Lucene99QatStoredFieldsFormat.class.getSimpleName() + ".mode";
32+
33+
private static final int QAT_DEFLATE_BLOCK_LENGTH = 10 * 48 * 1024;
34+
private static final int QAT_DEFLATE_MAX_DOCS_PER_BLOCK = 4096;
35+
private static final int QAT_DEFLATE_BLOCK_SHIFT = 10;
36+
37+
private static final int QAT_LZ4_BLOCK_LENGTH = 10 * 8 * 1024;
38+
private static final int QAT_LZ4_MAX_DOCS_PER_BLOCK = 4096;
39+
private static final int QAT_LZ4_BLOCK_SHIFT = 10;
40+
41+
private final QatCompressionMode qatCompressionMode;
42+
private final Lucene99QatCodec.Mode mode;
43+
44+
/** default constructor */
45+
public Lucene99QatStoredFieldsFormat() {
46+
this(Lucene99QatCodec.DEFAULT_COMPRESSION_MODE, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL);
47+
}
48+
49+
/**
50+
* Creates a new instance.
51+
*
52+
* @param mode The mode represents QAT_LZ4 or QAT_DEFLATE
53+
*/
54+
public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode) {
55+
this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL);
56+
}
57+
58+
/**
59+
* Creates a new instance with the specified mode and compression level.
60+
*
61+
* @param mode The mode represents QAT_LZ4 or QAT_DEFLATE
62+
* @param compressionLevel The compression level for the mode.
63+
*/
64+
public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, int compressionLevel) {
65+
this(mode, compressionLevel, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; });
66+
}
67+
68+
/**
69+
* Creates a new instance.
70+
*
71+
* @param mode The mode represents QAT_LZ4 or QAT_DEFLATE
72+
* @param supplier a supplier for QAT acceleration mode.
73+
*/
74+
public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, Supplier<QatZipper.Mode> supplier) {
75+
this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, supplier);
76+
}
77+
78+
/**
79+
* Creates a new instance with the specified mode and compression level.
80+
*
81+
* @param mode The mode represents QAT_LZ4 or QAT_DEFLATE
82+
* @param compressionLevel The compression level for the mode.
83+
* @param supplier a supplier for QAT acceleration mode.
84+
*/
85+
public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, int compressionLevel, Supplier<QatZipper.Mode> supplier) {
86+
this.mode = Objects.requireNonNull(mode);
87+
qatCompressionMode = new QatCompressionMode(mode, compressionLevel, supplier);
88+
}
89+
90+
/**
91+
* Returns a {@link StoredFieldsReader} to load stored fields.
92+
*
93+
* @param directory The index directory.
94+
* @param si The SegmentInfo that stores segment information.
95+
* @param fn The fieldInfos.
96+
* @param context The IOContext that holds additional details on the merge/search context.
97+
*/
98+
@Override
99+
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException {
100+
if (si.getAttribute(MODE_KEY) != null) {
101+
String value = si.getAttribute(MODE_KEY);
102+
Lucene99QatCodec.Mode mode = Lucene99QatCodec.Mode.valueOf(value);
103+
return impl(mode).fieldsReader(directory, si, fn, context);
104+
} else {
105+
throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name);
106+
}
107+
}
108+
109+
/**
110+
* Returns a {@link StoredFieldsReader} to write stored fields.
111+
*
112+
* @param directory The index directory.
113+
* @param si The SegmentInfo that stores segment information.
114+
* @param context The IOContext that holds additional details on the merge/search context.
115+
*/
116+
@Override
117+
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException {
118+
String previous = si.putAttribute(MODE_KEY, mode.name());
119+
if (previous != null && previous.equals(mode.name()) == false) {
120+
throw new IllegalStateException(
121+
"found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name()
122+
);
123+
}
124+
return impl(mode).fieldsWriter(directory, si, context);
125+
}
126+
127+
private StoredFieldsFormat impl(Lucene99QatCodec.Mode mode) {
128+
switch (mode) {
129+
case QAT_LZ4:
130+
return getQatCompressingStoredFieldsFormat(
131+
"QatStoredFieldsLz4",
132+
qatCompressionMode,
133+
QAT_LZ4_BLOCK_LENGTH,
134+
QAT_LZ4_MAX_DOCS_PER_BLOCK,
135+
QAT_LZ4_BLOCK_SHIFT
136+
);
137+
case QAT_DEFLATE:
138+
return getQatCompressingStoredFieldsFormat(
139+
"QatStoredFieldsDeflate",
140+
qatCompressionMode,
141+
QAT_DEFLATE_BLOCK_LENGTH,
142+
QAT_DEFLATE_MAX_DOCS_PER_BLOCK,
143+
QAT_DEFLATE_BLOCK_SHIFT
144+
);
145+
default:
146+
throw new IllegalStateException("Unsupported compression mode: " + mode);
147+
}
148+
}
149+
150+
private StoredFieldsFormat getQatCompressingStoredFieldsFormat(
151+
String formatName,
152+
CompressionMode compressionMode,
153+
int blockSize,
154+
int maxDocs,
155+
int blockShift
156+
) {
157+
return new Lucene90CompressingStoredFieldsFormat(formatName, compressionMode, blockSize, maxDocs, blockShift);
158+
}
159+
160+
/**
161+
* Gets the mode of compression.
162+
*
163+
* @return either QAT_LZ4 or QAT_DEFLATE
164+
*/
165+
public Lucene99QatCodec.Mode getMode() {
166+
return mode;
167+
}
168+
169+
/**
170+
*
171+
* @return the CompressionMode instance.
172+
*/
173+
public QatCompressionMode getCompressionMode() {
174+
return qatCompressionMode;
175+
}
176+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import org.apache.lucene.codecs.compressing.CompressionMode;
12+
import org.apache.lucene.codecs.compressing.Compressor;
13+
import org.apache.lucene.codecs.compressing.Decompressor;
14+
import org.apache.lucene.store.ByteBuffersDataInput;
15+
import org.apache.lucene.store.DataInput;
16+
import org.apache.lucene.store.DataOutput;
17+
import org.apache.lucene.util.ArrayUtil;
18+
import org.apache.lucene.util.BytesRef;
19+
20+
import java.io.IOException;
21+
import java.util.function.Supplier;
22+
23+
import com.intel.qat.QatZipper;
24+
25+
/** QatCompressionMode offers QAT_LZ4 and QAT_DEFLATE compressors. */
26+
public class QatCompressionMode extends CompressionMode {
27+
28+
private static final int NUM_SUB_BLOCKS = 10;
29+
30+
private final QatZipper.Algorithm algorithm;
31+
private final int compressionLevel;
32+
private final Supplier<QatZipper.Mode> supplier;
33+
34+
/** default constructor */
35+
protected QatCompressionMode() {
36+
this(Lucene99QatCodec.DEFAULT_COMPRESSION_MODE, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, () -> {
37+
return Lucene99QatCodec.DEFAULT_QAT_MODE;
38+
});
39+
}
40+
41+
/**
42+
* Creates a new instance.
43+
*
44+
* @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE)
45+
*/
46+
protected QatCompressionMode(Lucene99QatCodec.Mode mode) {
47+
this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; });
48+
}
49+
50+
/**
51+
* Creates a new instance.
52+
*
53+
* @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE)
54+
* @param compressionLevel The compression level to use.
55+
*/
56+
protected QatCompressionMode(Lucene99QatCodec.Mode mode, int compressionLevel) {
57+
this(mode, compressionLevel, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; });
58+
}
59+
60+
/**
61+
* Creates a new instance.
62+
*
63+
* @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE)
64+
* @param compressionLevel The compression level to use.
65+
* @param supplier a supplier for QAT acceleration mode.
66+
*/
67+
protected QatCompressionMode(Lucene99QatCodec.Mode mode, int compressionLevel, Supplier<QatZipper.Mode> supplier) {
68+
this.algorithm = mode == Lucene99QatCodec.Mode.QAT_LZ4 ? QatZipper.Algorithm.LZ4 : QatZipper.Algorithm.DEFLATE;
69+
this.compressionLevel = compressionLevel;
70+
this.supplier = supplier;
71+
}
72+
73+
@Override
74+
public Compressor newCompressor() {
75+
return new QatCompressor(algorithm, compressionLevel, supplier.get());
76+
}
77+
78+
@Override
79+
public Decompressor newDecompressor() {
80+
return new QatDecompressor(algorithm, supplier.get());
81+
}
82+
83+
public int getCompressionLevel() {
84+
return compressionLevel;
85+
}
86+
87+
/** The QatCompressor. */
88+
private static final class QatCompressor extends Compressor {
89+
90+
private byte[] compressedBuffer;
91+
private final QatZipper qatZipper;
92+
93+
/** compressor with a given compresion level */
94+
public QatCompressor(QatZipper.Algorithm algorithm, int compressionLevel, QatZipper.Mode qatMode) {
95+
compressedBuffer = BytesRef.EMPTY_BYTES;
96+
qatZipper = QatZipperFactory.createInstance(algorithm, compressionLevel, qatMode, QatZipper.PollingMode.PERIODICAL);
97+
}
98+
99+
private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException {
100+
assert offset >= 0 : "Offset value must be greater than 0.";
101+
102+
int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS;
103+
out.writeVInt(blockLength);
104+
105+
final int end = offset + length;
106+
assert end >= 0 : "Buffer read size must be greater than 0.";
107+
108+
for (int start = offset; start < end; start += blockLength) {
109+
int l = Math.min(blockLength, end - start);
110+
111+
if (l == 0) {
112+
out.writeVInt(0);
113+
return;
114+
}
115+
116+
final int maxCompressedLength = qatZipper.maxCompressedLength(l);
117+
compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength);
118+
119+
int compressedSize = qatZipper.compress(bytes, start, l, compressedBuffer, 0, compressedBuffer.length);
120+
out.writeVInt(compressedSize);
121+
out.writeBytes(compressedBuffer, compressedSize);
122+
}
123+
}
124+
125+
@Override
126+
public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
127+
final int length = (int) buffersInput.size();
128+
byte[] bytes = new byte[length];
129+
buffersInput.readBytes(bytes, 0, length);
130+
compress(bytes, 0, length, out);
131+
}
132+
133+
@Override
134+
public void close() throws IOException {}
135+
}
136+
137+
/** QAT_DEFLATE decompressor */
138+
private static final class QatDecompressor extends Decompressor {
139+
140+
private byte[] compressed;
141+
private final QatZipper qatZipper;
142+
private final QatZipper.Mode qatMode;
143+
private final QatZipper.Algorithm algorithm;
144+
145+
/** default decompressor */
146+
public QatDecompressor(QatZipper.Algorithm algorithm, QatZipper.Mode qatMode) {
147+
this.algorithm = algorithm;
148+
this.qatMode = qatMode;
149+
compressed = BytesRef.EMPTY_BYTES;
150+
qatZipper = QatZipperFactory.createInstance(algorithm, qatMode, QatZipper.PollingMode.PERIODICAL);
151+
}
152+
153+
/*resuable decompress function*/
154+
@Override
155+
public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
156+
assert offset + length <= originalLength : "Buffer read size must be within limit.";
157+
158+
if (length == 0) {
159+
bytes.length = 0;
160+
return;
161+
}
162+
163+
final int blockLength = in.readVInt();
164+
bytes.offset = bytes.length = 0;
165+
int offsetInBlock = 0;
166+
int offsetInBytesRef = offset;
167+
168+
// Skip unneeded blocks
169+
while (offsetInBlock + blockLength < offset) {
170+
final int compressedLength = in.readVInt();
171+
in.skipBytes(compressedLength);
172+
offsetInBlock += blockLength;
173+
offsetInBytesRef -= blockLength;
174+
}
175+
176+
// Read blocks that intersect with the interval we need
177+
while (offsetInBlock < offset + length) {
178+
final int compressedLength = in.readVInt();
179+
if (compressedLength == 0) {
180+
return;
181+
}
182+
compressed = ArrayUtil.grow(compressed, compressedLength);
183+
in.readBytes(compressed, 0, compressedLength);
184+
185+
int l = Math.min(blockLength, originalLength - offsetInBlock);
186+
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l);
187+
188+
final int uncompressed = qatZipper.decompress(compressed, 0, compressedLength, bytes.bytes, bytes.length, l);
189+
190+
bytes.length += uncompressed;
191+
offsetInBlock += blockLength;
192+
}
193+
194+
bytes.offset = offsetInBytesRef;
195+
bytes.length = length;
196+
197+
assert bytes.isValid() : "Decompression output is corrupted.";
198+
}
199+
200+
@Override
201+
public Decompressor clone() {
202+
return new QatDecompressor(algorithm, qatMode);
203+
}
204+
}
205+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.index.codec.CodecAliases;
14+
import org.opensearch.index.codec.CodecSettings;
15+
import org.opensearch.index.engine.EngineConfig;
16+
import org.opensearch.index.mapper.MapperService;
17+
18+
import java.util.Set;
19+
import java.util.function.Supplier;
20+
21+
import com.intel.qat.QatZipper;
22+
23+
/**
24+
* QatDeflate99Codec provides a DEFLATE compressor using the <a
25+
* href="https://github.com/intel/qat-java">qat-java</a> library.
26+
*/
27+
public class QatDeflate99Codec extends Lucene99QatCodec implements CodecSettings, CodecAliases {
28+
29+
/** Creates a new QatDeflate99Codec instance with the default compression level. */
30+
public QatDeflate99Codec() {
31+
this(DEFAULT_COMPRESSION_LEVEL);
32+
}
33+
34+
/**
35+
* Creates a new QatDeflate99Codec instance.
36+
*
37+
* @param compressionLevel The compression level.
38+
*/
39+
public QatDeflate99Codec(int compressionLevel) {
40+
super(Mode.QAT_DEFLATE, compressionLevel);
41+
}
42+
43+
/**
44+
* Creates a new QatDeflate99Codec instance with the default compression level.
45+
*
46+
* @param compressionLevel The compression level.
47+
* @param supplier supplier for QAT acceleration mode.
48+
*/
49+
public QatDeflate99Codec(int compressionLevel, Supplier<QatZipper.Mode> supplier) {
50+
super(Mode.QAT_DEFLATE, compressionLevel, supplier);
51+
}
52+
53+
/**
54+
* Creates a new QatDeflate99Codec instance.
55+
*
56+
* @param mapperService The mapper service.
57+
* @param logger The logger.
58+
* @param compressionLevel The compression level.
59+
*/
60+
public QatDeflate99Codec(MapperService mapperService, Logger logger, int compressionLevel) {
61+
super(Mode.QAT_DEFLATE, compressionLevel, mapperService, logger);
62+
}
63+
64+
/**
65+
* Creates a new QatDeflate99Codec instance.
66+
*
67+
* @param mapperService The mapper service.
68+
* @param logger The logger.
69+
* @param compressionLevel The compression level.
70+
* @param supplier supplier for QAT acceleration mode.
71+
*/
72+
public QatDeflate99Codec(MapperService mapperService, Logger logger, int compressionLevel, Supplier<QatZipper.Mode> supplier) {
73+
super(Mode.QAT_DEFLATE, compressionLevel, mapperService, logger, supplier);
74+
}
75+
76+
/** The name for this codec. */
77+
@Override
78+
public String toString() {
79+
return getClass().getSimpleName();
80+
}
81+
82+
@Override
83+
public boolean supports(Setting<?> setting) {
84+
return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
85+
}
86+
87+
@Override
88+
public Set<String> aliases() {
89+
return Mode.QAT_DEFLATE.getAliases();
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.opensearch.common.settings.Setting;
13+
import org.opensearch.index.codec.CodecAliases;
14+
import org.opensearch.index.codec.CodecSettings;
15+
import org.opensearch.index.engine.EngineConfig;
16+
import org.opensearch.index.mapper.MapperService;
17+
18+
import java.util.Set;
19+
import java.util.function.Supplier;
20+
21+
import com.intel.qat.QatZipper;
22+
23+
/**
24+
* QatLz499Codec provides an LZ4 compressor using the <a
25+
* href="https://github.com/intel/qat-java">qat-java</a> library.
26+
*/
27+
public class QatLz499Codec extends Lucene99QatCodec implements CodecSettings, CodecAliases {
28+
29+
/** Creates a new QatLz499Codec instance with the default compression level. */
30+
public QatLz499Codec() {
31+
this(DEFAULT_COMPRESSION_LEVEL);
32+
}
33+
34+
/**
35+
* Creates a new QatLz499Codec instance.
36+
*
37+
* @param compressionLevel The compression level.
38+
*/
39+
public QatLz499Codec(int compressionLevel) {
40+
super(Mode.QAT_LZ4, compressionLevel);
41+
}
42+
43+
/**
44+
* Creates a new QatLz499Codec instance with the default compression level.
45+
*
46+
* @param compressionLevel The compression level.
47+
* @param supplier supplier for QAT acceleration mode.
48+
*/
49+
public QatLz499Codec(int compressionLevel, Supplier<QatZipper.Mode> supplier) {
50+
super(Mode.QAT_LZ4, compressionLevel, supplier);
51+
}
52+
53+
/**
54+
* Creates a new QatLz499Codec instance.
55+
*
56+
* @param mapperService The mapper service.
57+
* @param logger The logger.
58+
* @param compressionLevel The compression level.
59+
*/
60+
public QatLz499Codec(MapperService mapperService, Logger logger, int compressionLevel) {
61+
super(Mode.QAT_LZ4, compressionLevel, mapperService, logger);
62+
}
63+
64+
/**
65+
* Creates a new QatLz499Codec instance.
66+
*
67+
* @param mapperService The mapper service.
68+
* @param logger The logger.
69+
* @param compressionLevel The compression level.
70+
* @param supplier supplier for QAT acceleration mode.
71+
*/
72+
public QatLz499Codec(MapperService mapperService, Logger logger, int compressionLevel, Supplier<QatZipper.Mode> supplier) {
73+
super(Mode.QAT_LZ4, compressionLevel, mapperService, logger, supplier);
74+
}
75+
76+
/** The name for this codec. */
77+
@Override
78+
public String toString() {
79+
return getClass().getSimpleName();
80+
}
81+
82+
@Override
83+
public boolean supports(Setting<?> setting) {
84+
return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
85+
}
86+
87+
@Override
88+
public Set<String> aliases() {
89+
return Mode.QAT_LZ4.getAliases();
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import com.intel.qat.QatZipper;
12+
13+
import static com.intel.qat.QatZipper.Algorithm;
14+
import static com.intel.qat.QatZipper.DEFAULT_COMPRESS_LEVEL;
15+
import static com.intel.qat.QatZipper.DEFAULT_MODE;
16+
import static com.intel.qat.QatZipper.DEFAULT_POLLING_MODE;
17+
import static com.intel.qat.QatZipper.DEFAULT_RETRY_COUNT;
18+
import static com.intel.qat.QatZipper.Mode;
19+
import static com.intel.qat.QatZipper.PollingMode;
20+
21+
/** A factory class to create instances of QatZipper */
22+
public class QatZipperFactory {
23+
24+
/**
25+
* Creates a new QatZipper with the specified parameters.
26+
*
27+
* @param algorithm the compression algorithm
28+
* @param level the compression level.
29+
* @param mode the mode of QAT execution
30+
* @param retryCount the number of attempts to acquire hardware resources
31+
* @param pmode polling mode.
32+
*/
33+
public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode, int retryCount, PollingMode pmode) {
34+
return new QatZipper(algorithm, level, mode, retryCount, pmode);
35+
}
36+
37+
/**
38+
* Creates a new QatZipper that uses the DEFLATE algorithm and the default compression level,
39+
* mode, retry count, and polling mode.
40+
*/
41+
public static QatZipper createInstance() {
42+
return createInstance(Algorithm.DEFLATE, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE);
43+
}
44+
45+
/**
46+
* Creates a new QatZipper with the specified compression algorithm. Uses the default compression
47+
* level, mode, retry count, and polling mode.
48+
*
49+
* @param algorithm the compression algorithm
50+
*/
51+
public static QatZipper createInstance(Algorithm algorithm) {
52+
return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE);
53+
}
54+
55+
/**
56+
* Creates a new QatZipper with the specified execution mode. Uses the DEFLATE algorithm with the
57+
* default compression level, retry count, and polling mode.
58+
*
59+
* @param mode the mode of QAT execution
60+
*/
61+
public static QatZipper createInstance(Mode mode) {
62+
return createInstance(Algorithm.DEFLATE, DEFAULT_COMPRESS_LEVEL, mode, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE);
63+
}
64+
65+
/**
66+
* Creates a new QatZipper with the specified polling polling mode. Uses the DEFLATE algorithm
67+
* with the default compression level, mode, and retry count.
68+
*
69+
* @param pmode the polling mode.
70+
*/
71+
public static QatZipper createInstance(PollingMode pmode) {
72+
return createInstance(Algorithm.DEFLATE, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, pmode);
73+
}
74+
75+
/**
76+
* Creates a new QatZipper with the specified algorithm and compression level. Uses the default
77+
* mode, retry count, and polling mode.
78+
*
79+
* @param algorithm the compression algorithm (deflate or LZ4).
80+
* @param level the compression level.
81+
*/
82+
public static QatZipper createInstance(Algorithm algorithm, int level) {
83+
return createInstance(algorithm, level, DEFAULT_MODE, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE);
84+
}
85+
86+
/**
87+
* Creates a new QatZipper with the specified algorithm and mode of execution. Uses the default
88+
* compression level, retry count, and polling mode.
89+
*
90+
* @param algorithm the compression algorithm
91+
* @param mode the mode of QAT execution
92+
*/
93+
public static QatZipper createInstance(Algorithm algorithm, Mode mode) {
94+
return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, mode, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE);
95+
}
96+
97+
/**
98+
* Creates a new QatZipper with the specified algorithm and polling mode of execution. Uses the
99+
* default compression level, mode, and retry count.
100+
*
101+
* @param algorithm the compression algorithm
102+
* @param pmode the polling mode.
103+
*/
104+
public static QatZipper createInstance(Algorithm algorithm, PollingMode pmode) {
105+
return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, DEFAULT_MODE, DEFAULT_RETRY_COUNT, pmode);
106+
}
107+
108+
/**
109+
* Creates a new QatZipper with the specified algorithm and mode of execution. Uses compression
110+
* level and retry count.
111+
*
112+
* @param algorithm the compression algorithm
113+
* @param mode the mode of QAT execution
114+
* @param pmode the polling mode.
115+
*/
116+
public static QatZipper createInstance(Algorithm algorithm, Mode mode, PollingMode pmode) {
117+
return createInstance(algorithm, DEFAULT_COMPRESS_LEVEL, mode, DEFAULT_RETRY_COUNT, pmode);
118+
}
119+
120+
/**
121+
* Creates a new QatZipper with the specified algorithm, compression level, and mode . Uses the
122+
* default retry count and polling mode.
123+
*
124+
* @param algorithm the compression algorithm (deflate or LZ4).
125+
* @param level the compression level.
126+
* @param mode the mode of operation (HARDWARE - only hardware, AUTO - hardware with a software
127+
* failover.)
128+
*/
129+
public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode) {
130+
return createInstance(algorithm, level, mode, DEFAULT_RETRY_COUNT, DEFAULT_POLLING_MODE);
131+
}
132+
133+
/**
134+
* Creates a new QatZipper with the specified algorithm, compression level, and polling mode .
135+
* Uses the default mode and retry count.
136+
*
137+
* @param algorithm the compression algorithm (deflate or LZ4).
138+
* @param level the compression level.
139+
* @param pmode the polling mode.
140+
*/
141+
public static QatZipper createInstance(Algorithm algorithm, int level, PollingMode pmode) {
142+
return createInstance(algorithm, level, DEFAULT_MODE, DEFAULT_RETRY_COUNT, pmode);
143+
}
144+
145+
/**
146+
* Creates a new QatZipper with the specified parameters and polling mode.
147+
*
148+
* @param algorithm the compression algorithm
149+
* @param level the compression level.
150+
* @param mode the mode of QAT execution
151+
* @param retryCount the number of attempts to acquire hardware resources
152+
*/
153+
public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode, int retryCount) {
154+
return createInstance(algorithm, level, mode, retryCount, DEFAULT_POLLING_MODE);
155+
}
156+
157+
/**
158+
* Creates a new QatZipper with the specified parameters and retry count.
159+
*
160+
* @param algorithm the compression algorithm
161+
* @param level the compression level.
162+
* @param mode the mode of QAT execution
163+
* @param pmode the polling mode.
164+
*/
165+
public static QatZipper createInstance(Algorithm algorithm, int level, Mode mode, PollingMode pmode) {
166+
return createInstance(algorithm, level, mode, DEFAULT_RETRY_COUNT, pmode);
167+
}
168+
169+
/**
170+
* Checks if QAT hardware is available.
171+
*
172+
* @return true if QAT hardware is available, false otherwise.
173+
*/
174+
public static boolean isQatAvailable() {
175+
try {
176+
QatZipper qzip = QatZipperFactory.createInstance();
177+
qzip.end();
178+
return true;
179+
} catch (UnsatisfiedLinkError | ExceptionInInitializerError | NoClassDefFoundError e) {
180+
return false;
181+
}
182+
}
183+
}

‎src/main/java/org/opensearch/index/codec/customcodecs/Zstd99Codec.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
import java.util.Set;
1919

2020
/**
21-
* ZstdCodec provides ZSTD compressor using the <a href="https://github.com/luben/zstd-jni">zstd-jni</a> library.
21+
* ZstdCodec provides ZSTD compressor using the <a
22+
* href="https://github.com/luben/zstd-jni">zstd-jni</a> library.
2223
*/
2324
public class Zstd99Codec extends Lucene99CustomCodec implements CodecSettings, CodecAliases {
2425

25-
/**
26-
* Creates a new ZstdCodec instance with the default compression level.
27-
*/
26+
/** Creates a new ZstdCodec instance with the default compression level. */
2827
public Zstd99Codec() {
2928
this(DEFAULT_COMPRESSION_LEVEL);
3029
}

‎src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,12 @@ public class ZstdCompressionMode extends CompressionMode {
3030

3131
private static final int NUM_SUB_BLOCKS = 10;
3232
private static final int DICT_SIZE_FACTOR = 6;
33-
private static final int DEFAULT_COMPRESSION_LEVEL = 6;
3433

3534
private final int compressionLevel;
3635

3736
/** default constructor */
3837
protected ZstdCompressionMode() {
39-
this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
38+
this.compressionLevel = Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL;
4039
}
4140

4241
/**
@@ -48,7 +47,7 @@ protected ZstdCompressionMode(int compressionLevel) {
4847
this.compressionLevel = compressionLevel;
4948
}
5049

51-
/** Creates a new compressor instance.*/
50+
/** Creates a new compressor instance. */
5251
@Override
5352
public Compressor newCompressor() {
5453
return new ZstdCompressor(compressionLevel);

‎src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDict99Codec.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,10 @@
1717

1818
import java.util.Set;
1919

20-
/**
21-
* ZstdNoDictCodec provides ZSTD compressor without a dictionary support.
22-
*/
20+
/** ZstdNoDictCodec provides ZSTD compressor without a dictionary support. */
2321
public class ZstdNoDict99Codec extends Lucene99CustomCodec implements CodecSettings, CodecAliases {
2422

25-
/**
26-
* Creates a new ZstdNoDictCodec instance with the default compression level.
27-
*/
23+
/** Creates a new ZstdNoDictCodec instance with the default compression level. */
2824
public ZstdNoDict99Codec() {
2925
this(DEFAULT_COMPRESSION_LEVEL);
3026
}

‎src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,12 @@
2525
public class ZstdNoDictCompressionMode extends CompressionMode {
2626

2727
private static final int NUM_SUB_BLOCKS = 10;
28-
private static final int DEFAULT_COMPRESSION_LEVEL = 6;
2928

3029
private final int compressionLevel;
3130

3231
/** default constructor */
3332
protected ZstdNoDictCompressionMode() {
34-
this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
33+
this.compressionLevel = Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL;
3534
}
3635

3736
/**
@@ -43,7 +42,7 @@ protected ZstdNoDictCompressionMode(int compressionLevel) {
4342
this.compressionLevel = compressionLevel;
4443
}
4544

46-
/** Creates a new compressor instance.*/
45+
/** Creates a new compressor instance. */
4746
@Override
4847
public Compressor newCompressor() {
4948
return new ZstdCompressor(compressionLevel);
@@ -155,7 +154,7 @@ public void decompress(DataInput in, int originalLength, int offset, int length,
155154
compressed = ArrayUtil.growNoCopy(compressed, compressedLength);
156155
in.readBytes(compressed, 0, compressedLength);
157156

158-
int l = Math.min(blockLength, originalLength - offsetInBlock);
157+
final int l = Math.min(blockLength, originalLength - offsetInBlock);
159158
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l);
160159

161160
final int uncompressed = (int) Zstd.decompressByteArray(bytes.bytes, bytes.length, l, compressed, 0, compressedLength);

‎src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/Lucene95CustomStoredFieldsFormat.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compr
6868

6969
/**
7070
* Returns a {@link StoredFieldsReader} to load stored fields.
71+
*
7172
* @param directory The index directory.
7273
* @param si The SegmentInfo that stores segment information.
7374
* @param fn The fieldInfos.
@@ -85,6 +86,7 @@ public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, Fiel
8586

8687
/**
8788
* Returns a {@link StoredFieldsReader} to write stored fields.
89+
*
8890
* @param directory The index directory.
8991
* @param si The SegmentInfo that stores segment information.
9092
* @param context The IOContext that holds additional details on the merge/search context.
@@ -120,23 +122,20 @@ StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) {
120122
ZSTD_BLOCK_SHIFT
121123
);
122124
default:
123-
throw new AssertionError();
125+
throw new IllegalStateException("Unsupported compression mode: " + mode);
124126
}
125127
}
126128

127129
public Lucene95CustomCodec.Mode getMode() {
128130
return mode;
129131
}
130132

131-
/**
132-
* Returns the compression level.
133-
*/
133+
/** Returns the compression level. */
134134
public int getCompressionLevel() {
135135
return compressionLevel;
136136
}
137137

138138
public CompressionMode getCompressionMode() {
139139
return mode == Lucene95CustomCodec.Mode.ZSTD_NO_DICT ? zstdNoDictCompressionMode : zstdCompressionMode;
140140
}
141-
142-
}
141+
}

‎src/main/java/org/opensearch/index/codec/customcodecs/package-info.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,5 @@
66
* compatible open source license.
77
*/
88

9-
/**
10-
* A plugin that implements compression codecs with native implementation.
11-
*/
9+
/** A plugin that implements compression codecs with native implementation. */
1210
package org.opensearch.index.codec.customcodecs;

‎src/main/plugin-metadata/plugin-security.policy

+5
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,8 @@
99
grant codeBase "${codebase.zstd-jni}" {
1010
permission java.lang.RuntimePermission "loadLibrary.*";
1111
};
12+
13+
grant codeBase "${codebase.qat-java}" {
14+
permission java.lang.RuntimePermission "loadLibrary.*";
15+
permission org.opensearch.secure_sm.ThreadPermission "modifyArbitraryThread";
16+
};

‎src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@ org.opensearch.index.codec.customcodecs.backward_codecs.Zstd95Codec
22
org.opensearch.index.codec.customcodecs.backward_codecs.ZstdNoDict95Codec
33
org.opensearch.index.codec.customcodecs.backward_codecs.Zstd95DeprecatedCodec
44
org.opensearch.index.codec.customcodecs.Zstd99Codec
5-
org.opensearch.index.codec.customcodecs.ZstdNoDict99Codec
5+
org.opensearch.index.codec.customcodecs.ZstdNoDict99Codec
6+
org.opensearch.index.codec.customcodecs.QatDeflate99Codec
7+
org.opensearch.index.codec.customcodecs.QatLz499Codec

‎src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException {
193193
bos.write(bytes);
194194
}
195195

196-
private void doTest(byte[] bytes) throws IOException {
196+
protected void doTest(byte[] bytes) throws IOException {
197197
final int length = bytes.length;
198198

199199
ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes)));
@@ -215,5 +215,4 @@ private void doTest(byte[] bytes) throws IOException {
215215

216216
assertArrayEquals(bytes, restored);
217217
}
218-
219218
}

‎src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,4 @@ public void testZstdNoDictCompressionModes(){
5555
Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode.ZSTD_NO_DICT);
5656
assertTrue(lucene95CustomStoredFieldsFormat.getCompressionMode() instanceof ZstdNoDictCompressionMode);
5757
}
58-
59-
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.codec.customcodecs;
10+
11+
import org.opensearch.test.OpenSearchTestCase;
12+
13+
import static org.hamcrest.Matchers.is;
14+
import static org.junit.Assume.assumeThat;
15+
16+
public class Lucene99QatStoredFieldsFormatTests extends OpenSearchTestCase {
17+
18+
public void testLz4Lucene99QatCodecMode() {
19+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
20+
Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_LZ4);
21+
assertEquals(Lucene99QatCodec.Mode.QAT_LZ4, lucene99QatStoredFieldsFormat.getMode());
22+
}
23+
24+
public void testDeflateLucene99QatCodecMode() {
25+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
26+
Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_DEFLATE);
27+
assertEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, lucene99QatStoredFieldsFormat.getMode());
28+
}
29+
30+
public void testLz4Lucene99QatCodecModeWithCompressionLevel() {
31+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
32+
int randomCompressionLevel = randomIntBetween(1, 6);
33+
Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(
34+
Lucene99QatCodec.Mode.QAT_LZ4,
35+
randomCompressionLevel
36+
);
37+
assertEquals(Lucene99QatCodec.Mode.QAT_LZ4, lucene99QatStoredFieldsFormat.getMode());
38+
assertEquals(randomCompressionLevel, lucene99QatStoredFieldsFormat.getCompressionMode().getCompressionLevel());
39+
}
40+
41+
public void testDeflateLucene99QatCodecModeWithCompressionLevel() {
42+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
43+
int randomCompressionLevel = randomIntBetween(1, 6);
44+
Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(
45+
Lucene99QatCodec.Mode.QAT_DEFLATE,
46+
randomCompressionLevel
47+
);
48+
assertEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, lucene99QatStoredFieldsFormat.getMode());
49+
assertEquals(randomCompressionLevel, lucene99QatStoredFieldsFormat.getCompressionMode().getCompressionLevel());
50+
}
51+
52+
public void testLz4CompressionModes() {
53+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
54+
Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_LZ4);
55+
assertTrue(lucene99QatStoredFieldsFormat.getCompressionMode() instanceof QatCompressionMode);
56+
}
57+
58+
public void testDeflateCompressionModes() {
59+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
60+
Lucene99QatStoredFieldsFormat lucene99QatStoredFieldsFormat = new Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode.QAT_DEFLATE);
61+
assertTrue(lucene99QatStoredFieldsFormat.getCompressionMode() instanceof QatCompressionMode);
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Licensed to Elasticsearch under one or more contributor
11+
* license agreements. See the NOTICE file distributed with
12+
* this work for additional information regarding copyright
13+
* ownership. Elasticsearch licenses this file to you under
14+
* the Apache License, Version 2.0 (the "License"); you may
15+
* not use this file except in compliance with the License.
16+
* You may obtain a copy of the License at
17+
*
18+
* http://www.apache.org/licenses/LICENSE-2.0
19+
*
20+
* Unless required by applicable law or agreed to in writing,
21+
* software distributed under the License is distributed on an
22+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23+
* KIND, either express or implied. See the License for the
24+
* specific language governing permissions and limitations
25+
* under the License.
26+
*/
27+
28+
/*
29+
* Modifications Copyright OpenSearch Contributors. See
30+
* GitHub history for details.
31+
*/
32+
33+
package org.opensearch.index.codec.customcodecs;
34+
35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.lucene.codecs.Codec;
37+
import org.apache.lucene.document.Document;
38+
import org.apache.lucene.index.DirectoryReader;
39+
import org.apache.lucene.index.IndexWriter;
40+
import org.apache.lucene.index.IndexWriterConfig;
41+
import org.apache.lucene.index.SegmentReader;
42+
import org.apache.lucene.store.Directory;
43+
import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
44+
import org.opensearch.common.settings.Settings;
45+
import org.opensearch.env.Environment;
46+
import org.opensearch.index.IndexSettings;
47+
import org.opensearch.index.analysis.IndexAnalyzers;
48+
import org.opensearch.index.codec.CodecService;
49+
import org.opensearch.index.codec.CodecServiceConfig;
50+
import org.opensearch.index.codec.CodecServiceFactory;
51+
import org.opensearch.index.codec.CodecSettings;
52+
import org.opensearch.index.mapper.MapperService;
53+
import org.opensearch.index.similarity.SimilarityService;
54+
import org.opensearch.indices.mapper.MapperRegistry;
55+
import org.opensearch.plugins.MapperPlugin;
56+
import org.opensearch.test.IndexSettingsModule;
57+
import org.opensearch.test.OpenSearchTestCase;
58+
import org.junit.Before;
59+
60+
import java.io.IOException;
61+
import java.util.Collections;
62+
import java.util.Optional;
63+
64+
import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;
65+
import static org.hamcrest.Matchers.is;
66+
import static org.junit.Assume.assumeThat;
67+
68+
@SuppressCodecs("*") // we test against default codec so never get a random one here!
69+
public class QatCodecTests extends OpenSearchTestCase {
70+
71+
private CustomCodecPlugin plugin;
72+
73+
@Before
74+
public void setup() {
75+
plugin = new CustomCodecPlugin();
76+
}
77+
78+
public void testQatLz4() throws Exception {
79+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
80+
Codec codec = createCodecService(false).codec("qat_lz4");
81+
assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_LZ4, codec);
82+
Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat();
83+
assertEquals(Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel());
84+
}
85+
86+
public void testQatDeflate() throws Exception {
87+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
88+
Codec codec = createCodecService(false).codec("qat_deflate");
89+
assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, codec);
90+
Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat();
91+
assertEquals(Lucene99CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel());
92+
}
93+
94+
public void testQatLz4WithCompressionLevel() throws Exception {
95+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
96+
int randomCompressionLevel = randomIntBetween(1, 6);
97+
Codec codec = createCodecService(randomCompressionLevel, "qat_lz4").codec("qat_lz4");
98+
assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_LZ4, codec);
99+
Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat();
100+
assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionMode().getCompressionLevel());
101+
}
102+
103+
public void testQatDeflateWithCompressionLevel() throws Exception {
104+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
105+
int randomCompressionLevel = randomIntBetween(1, 6);
106+
Codec codec = createCodecService(randomCompressionLevel, "qat_deflate").codec("qat_deflate");
107+
assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, codec);
108+
Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat();
109+
assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionMode().getCompressionLevel());
110+
}
111+
112+
public void testQatCompressionLevelSupport() throws Exception {
113+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
114+
CodecService codecService = createCodecService(false);
115+
CodecSettings qatDeflateCodec = (CodecSettings) codecService.codec("qat_deflate");
116+
CodecSettings qatLz4Codec = (CodecSettings) codecService.codec("qat_lz4");
117+
assertTrue(qatDeflateCodec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING));
118+
assertTrue(qatLz4Codec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING));
119+
}
120+
121+
public void testQatLz4MapperServiceNull() throws Exception {
122+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
123+
Codec codec = createCodecService(true).codec("qat_lz4");
124+
assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_LZ4, codec);
125+
Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat();
126+
assertEquals(Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel());
127+
}
128+
129+
public void testQatDeflateMapperServiceNull() throws Exception {
130+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
131+
Codec codec = createCodecService(true).codec("qat_deflate");
132+
assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode.QAT_DEFLATE, codec);
133+
Lucene99QatStoredFieldsFormat storedFieldsFormat = (Lucene99QatStoredFieldsFormat) codec.storedFieldsFormat();
134+
assertEquals(Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionMode().getCompressionLevel());
135+
}
136+
137+
private void assertStoredFieldsCompressionEquals(Lucene99QatCodec.Mode expected, Codec actual) throws Exception {
138+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
139+
SegmentReader sr = getSegmentReader(actual);
140+
String v = sr.getSegmentInfo().info.getAttribute(Lucene99QatStoredFieldsFormat.MODE_KEY);
141+
assertNotNull(v);
142+
assertEquals(expected, Lucene99QatCodec.Mode.valueOf(v));
143+
}
144+
145+
private CodecService createCodecService(boolean isMapperServiceNull) throws IOException {
146+
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
147+
if (isMapperServiceNull) {
148+
return new CustomCodecService(
149+
null,
150+
IndexSettingsModule.newIndexSettings("_na", nodeSettings, Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING),
151+
LogManager.getLogger("test")
152+
);
153+
}
154+
return buildCodecService(nodeSettings);
155+
}
156+
157+
private CodecService createCodecService(int randomCompressionLevel, String codec) throws IOException {
158+
Settings nodeSettings = Settings.builder()
159+
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
160+
.put("index.codec", codec)
161+
.put("index.codec.compression_level", randomCompressionLevel)
162+
.build();
163+
return buildCodecService(nodeSettings);
164+
}
165+
166+
private CodecService buildCodecService(Settings nodeSettings) throws IOException {
167+
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
168+
"_na",
169+
nodeSettings,
170+
Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING
171+
);
172+
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
173+
IndexAnalyzers indexAnalyzers = createTestAnalysis(indexSettings, nodeSettings).indexAnalyzers;
174+
MapperRegistry mapperRegistry = new MapperRegistry(Collections.emptyMap(), Collections.emptyMap(), MapperPlugin.NOOP_FIELD_FILTER);
175+
MapperService service = new MapperService(
176+
indexSettings,
177+
indexAnalyzers,
178+
xContentRegistry(),
179+
similarityService,
180+
mapperRegistry,
181+
() -> null,
182+
() -> false,
183+
null
184+
);
185+
186+
Optional<CodecServiceFactory> customCodecServiceFactory = plugin.getCustomCodecServiceFactory(indexSettings);
187+
if (customCodecServiceFactory.isPresent()) {
188+
return customCodecServiceFactory.get().createCodecService(new CodecServiceConfig(indexSettings, service, logger));
189+
}
190+
return new CustomCodecService(service, indexSettings, LogManager.getLogger("test"));
191+
}
192+
193+
private SegmentReader getSegmentReader(Codec codec) throws IOException {
194+
Directory dir = newDirectory();
195+
IndexWriterConfig iwc = newIndexWriterConfig(null);
196+
iwc.setCodec(codec);
197+
IndexWriter iw = new IndexWriter(dir, iwc);
198+
iw.addDocument(new Document());
199+
iw.commit();
200+
iw.close();
201+
DirectoryReader ir = DirectoryReader.open(dir);
202+
SegmentReader sr = (SegmentReader) ir.leaves().get(0).reader();
203+
ir.close();
204+
dir.close();
205+
return sr;
206+
}
207+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.index.codec.customcodecs;
9+
10+
import org.apache.lucene.codecs.compressing.Compressor;
11+
import org.apache.lucene.codecs.compressing.Decompressor;
12+
13+
import java.io.IOException;
14+
15+
import static org.hamcrest.Matchers.is;
16+
import static org.junit.Assume.assumeThat;
17+
18+
/** Test QAT DEFLATE compression */
19+
public class QatDeflateCompressorTests extends AbstractCompressorTests {
20+
21+
@Override
22+
Compressor compressor() {
23+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
24+
return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_DEFLATE).newCompressor();
25+
}
26+
27+
@Override
28+
Decompressor decompressor() {
29+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
30+
return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_DEFLATE).newDecompressor();
31+
}
32+
33+
@Override
34+
public void testEmpty() throws IOException {
35+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
36+
super.testEmpty();
37+
}
38+
39+
@Override
40+
public void testShortLiterals() throws IOException {
41+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
42+
super.testShortLiterals();
43+
}
44+
45+
@Override
46+
public void testRandom() throws IOException {
47+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
48+
super.testRandom();
49+
}
50+
51+
@Override
52+
public void testLineDocs() throws IOException {
53+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
54+
super.testLineDocs();
55+
}
56+
57+
@Override
58+
public void testRepetitionsL() throws IOException {
59+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
60+
super.testRepetitionsL();
61+
}
62+
63+
@Override
64+
public void testRepetitionsI() throws IOException {
65+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
66+
super.testRepetitionsI();
67+
}
68+
69+
@Override
70+
public void testRepetitionsS() throws IOException {
71+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
72+
super.testRepetitionsS();
73+
}
74+
75+
@Override
76+
public void testMixed() throws IOException {
77+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
78+
super.testMixed();
79+
}
80+
81+
@Override
82+
protected void doTest(byte[] bytes) throws IOException {
83+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
84+
super.doTest(bytes);
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.index.codec.customcodecs;
9+
10+
import org.apache.lucene.codecs.compressing.Compressor;
11+
import org.apache.lucene.codecs.compressing.Decompressor;
12+
13+
import java.io.IOException;
14+
15+
import static org.hamcrest.Matchers.is;
16+
import static org.junit.Assume.assumeThat;
17+
18+
/** Test QAT LZ4 */
19+
public class QatLz4CompressorTests extends AbstractCompressorTests {
20+
21+
@Override
22+
Compressor compressor() {
23+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
24+
return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_LZ4).newCompressor();
25+
}
26+
27+
@Override
28+
Decompressor decompressor() {
29+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
30+
return new QatCompressionMode(Lucene99QatCodec.Mode.QAT_LZ4).newDecompressor();
31+
}
32+
33+
@Override
34+
public void testEmpty() throws IOException {
35+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
36+
super.testEmpty();
37+
}
38+
39+
@Override
40+
public void testShortLiterals() throws IOException {
41+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
42+
super.testShortLiterals();
43+
}
44+
45+
@Override
46+
public void testRandom() throws IOException {
47+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
48+
super.testRandom();
49+
}
50+
51+
@Override
52+
public void testLineDocs() throws IOException {
53+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
54+
super.testLineDocs();
55+
}
56+
57+
@Override
58+
public void testRepetitionsL() throws IOException {
59+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
60+
super.testRepetitionsL();
61+
}
62+
63+
@Override
64+
public void testRepetitionsI() throws IOException {
65+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
66+
super.testRepetitionsI();
67+
}
68+
69+
@Override
70+
public void testRepetitionsS() throws IOException {
71+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
72+
super.testRepetitionsS();
73+
}
74+
75+
@Override
76+
public void testMixed() throws IOException {
77+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
78+
super.testMixed();
79+
}
80+
81+
@Override
82+
protected void doTest(byte[] bytes) throws IOException {
83+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
84+
super.doTest(bytes);
85+
}
86+
}

‎src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
import org.apache.lucene.codecs.compressing.Compressor;
1111
import org.apache.lucene.codecs.compressing.Decompressor;
1212

13-
/**
14-
* Test ZSTD compression (with dictionary enabled)
15-
*/
13+
/** Test ZSTD compression (with dictionary enabled) */
1614
public class ZstdCompressorTests extends AbstractCompressorTests {
1715

1816
private final Compressor compressor = new ZstdCompressionMode().newCompressor();

‎src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
import org.apache.lucene.codecs.compressing.Compressor;
1111
import org.apache.lucene.codecs.compressing.Decompressor;
1212

13-
/**
14-
* Test ZSTD compression (with no dictionary).
15-
*/
13+
/** Test ZSTD compression (with no dictionary). */
1614
public class ZstdNoDictCompressorTests extends AbstractCompressorTests {
1715

1816
private final Compressor compressor = new ZstdNoDictCompressionMode().newCompressor();

0 commit comments

Comments
 (0)
Please sign in to comment.