Skip to content

Commit 75c1a2e

Browse files
authored
Add hardware-accelerated codecs for DEFLATE and LZ4 (#122) (#150)
* Add QAT accelerated compression. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Use own classes for QAT codec. Apply SpotlessJavaCheck. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Declare fields final, unless required not to. Throw a valid type of exception. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Use assumeThat in the Qat test classes. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Add more QAT availability check in QatCodecTests. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Make LZ4 the default algorithm for QAT. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Make 'auto' the default execution mode for QAT. Also, minor clean up work. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Revert compression level for ZSTD to 3. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Replace QatLz4/DeflateCompressionMode classes with QatCompressionMode. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Fix a MultiCodecMergeIT test fail. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> * Remove hard-coded values for default compression level. Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> --------- Signed-off-by: Mulugeta Mammo <mulugeta.mammo@intel.com> Signed-off-by: mulugetam <mulugeta.mammo@intel.com> Co-authored-by: Mulugeta Mammo <cppx86@gmail.com> (cherry picked from commit c8b0d80)
1 parent 7eb55e6 commit 75c1a2e

34 files changed

+1649
-76
lines changed

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ opensearchplugin {
7272

7373
dependencies {
7474
api "com.github.luben:zstd-jni:1.5.5-5"
75+
api "com.intel.qat:qat-java:1.1.1"
7576
}
7677

7778
allprojects {

licenses/qat-java-1.1.1.jar.sha1

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3333601cdedf6a711d445118d5bc44ec6a9c65f9

licenses/qat-java-LICENSE.txt

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
-----------------------------------------------------------------------------
2+
** Beginning of "BSD License" text. **
3+
4+
Qat-Java: Qat-Java is a compression library that uses Intel® QAT to accelerate
5+
compression and decompression.
6+
7+
Copyright(c) 2007-2023 Intel Corporation. All rights reserved.
8+
All rights reserved.
9+
10+
BSD License
11+
12+
Redistribution and use in source and binary forms, with or without
13+
modification, are permitted provided that the following conditions
14+
are met:
15+
16+
* Redistributions of source code must retain the above copyright
17+
notice, this list of conditions and the following disclaimer.
18+
* Redistributions in binary form must reproduce the above copyright
19+
notice, this list of conditions and the following disclaimer in
20+
the documentation and/or other materials provided with the
21+
distribution.
22+
* Neither the name of Intel Corporation nor the names of its
23+
contributors may be used to endorse or promote products derived
24+
from this software without specific prior written permission.
25+
26+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
27+
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
28+
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
29+
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
30+
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
31+
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
32+
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
33+
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
34+
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
35+
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
36+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

licenses/qat-java-NOTICE.txt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Qat-Java is a compression library that uses Intel® QAT to accelerate compression and decompression.

src/integrationTest/java/org/opensearch/index/codec/rest/CreateIndexWithCodecIT.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,25 @@
2222
import org.opensearch.cluster.metadata.IndexMetadata;
2323
import org.opensearch.common.settings.Settings;
2424
import org.opensearch.core.common.Strings;
25+
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
2526
import org.opensearch.test.rest.OpenSearchRestTestCase;
2627

28+
import javax.net.ssl.SSLContext;
29+
2730
import java.io.IOException;
2831
import java.security.KeyManagementException;
2932
import java.security.KeyStoreException;
3033
import java.security.NoSuchAlgorithmException;
31-
3234
import java.util.Objects;
3335

34-
import javax.net.ssl.SSLContext;
35-
36+
import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE;
37+
import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL;
38+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_DEFLATE_CODEC;
39+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_LZ4_CODEC;
3640
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
3741
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.junit.Assume.assumeThat;
3844

3945
public class CreateIndexWithCodecIT extends OpenSearchRestTestCase {
4046
public void testCreateIndexWithZstdCodec() throws IOException {
@@ -58,6 +64,29 @@ public void testCreateIndexWithZstdCodec() throws IOException {
5864
}
5965
}
6066

67+
public void testCreateIndexWithQatCodec() throws IOException {
68+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
69+
70+
final String index = "custom-codecs-test-index";
71+
72+
// creating index
73+
createIndex(
74+
index,
75+
Settings.builder()
76+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
77+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
78+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
79+
.put("index.codec.compression_level", randomIntBetween(1, 6))
80+
.build()
81+
);
82+
83+
try {
84+
ensureGreen(index);
85+
} finally {
86+
deleteIndex(index);
87+
}
88+
}
89+
6190
@Override
6291
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
6392
RestClientBuilder builder = RestClient.builder(hosts);

src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java

+131
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,21 @@
1313
import org.opensearch.cluster.metadata.IndexMetadata;
1414
import org.opensearch.common.settings.Settings;
1515
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
16+
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
1617
import org.opensearch.plugins.Plugin;
1718
import org.opensearch.test.OpenSearchIntegTestCase;
1819

1920
import java.util.Collection;
2021
import java.util.Collections;
2122
import java.util.concurrent.ExecutionException;
2223

24+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_DEFLATE_CODEC;
25+
import static org.opensearch.index.codec.customcodecs.CustomCodecService.QAT_LZ4_CODEC;
2326
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC;
2427
import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC;
2528
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
29+
import static org.hamcrest.Matchers.is;
30+
import static org.junit.Assume.assumeThat;
2631

2732
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
2833
public class CodecCompressionLevelIT extends OpenSearchIntegTestCase {
@@ -80,6 +85,26 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() {
8085
ensureGreen(index);
8186
}
8287

88+
public void testQatCodecsCreateIndexWithCompressionLevel() {
89+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
90+
91+
internalCluster().ensureAtLeastNumDataNodes(1);
92+
final String index = "test-index";
93+
94+
// creating index
95+
createIndex(
96+
index,
97+
Settings.builder()
98+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
99+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
100+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
101+
.put("index.codec.compression_level", randomIntBetween(1, 6))
102+
.build()
103+
);
104+
105+
ensureGreen(index);
106+
}
107+
83108
public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
84109

85110
internalCluster().ensureAtLeastNumDataNodes(1);
@@ -132,6 +157,59 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
132157
ensureGreen(index);
133158
}
134159

160+
public void testQatToLuceneCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
161+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
162+
163+
internalCluster().ensureAtLeastNumDataNodes(1);
164+
final String index = "test-index";
165+
166+
// creating index
167+
createIndex(
168+
index,
169+
Settings.builder()
170+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
171+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
172+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
173+
.put("index.codec.compression_level", randomIntBetween(1, 6))
174+
.build()
175+
);
176+
ensureGreen(index);
177+
178+
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));
179+
180+
Throwable executionException = expectThrows(
181+
ExecutionException.class,
182+
() -> client().admin()
183+
.indices()
184+
.updateSettings(
185+
new UpdateSettingsRequest(index).settings(
186+
Settings.builder().put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
187+
)
188+
)
189+
.get()
190+
);
191+
192+
Throwable rootCause = Throwables.getRootCause(executionException);
193+
assertEquals(IllegalArgumentException.class, rootCause.getClass());
194+
assertTrue(rootCause.getMessage().startsWith("Compression level cannot be set"));
195+
196+
assertAcked(
197+
client().admin()
198+
.indices()
199+
.updateSettings(
200+
new UpdateSettingsRequest(index).settings(
201+
Settings.builder()
202+
.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
203+
.put("index.codec.compression_level", (String) null)
204+
)
205+
)
206+
.get()
207+
);
208+
209+
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
210+
ensureGreen(index);
211+
}
212+
135213
public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
136214

137215
internalCluster().ensureAtLeastNumDataNodes(1);
@@ -185,4 +263,57 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
185263
ensureGreen(index);
186264
}
187265

266+
public void testLuceneToQatCodecsWithCompressionLevel() throws ExecutionException, InterruptedException {
267+
assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true));
268+
269+
internalCluster().ensureAtLeastNumDataNodes(1);
270+
final String index = "test-index";
271+
272+
// creating index
273+
createIndex(
274+
index,
275+
Settings.builder()
276+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
277+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
278+
.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
279+
.build()
280+
);
281+
ensureGreen(index);
282+
283+
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));
284+
285+
Throwable executionException = expectThrows(
286+
ExecutionException.class,
287+
() -> client().admin()
288+
.indices()
289+
.updateSettings(
290+
new UpdateSettingsRequest(index).settings(
291+
Settings.builder()
292+
.put("index.codec", randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC))
293+
.put("index.codec.compression_level", randomIntBetween(1, 6))
294+
)
295+
)
296+
.get()
297+
);
298+
299+
Throwable rootCause = Throwables.getRootCause(executionException);
300+
assertEquals(IllegalArgumentException.class, rootCause.getClass());
301+
assertTrue(rootCause.getMessage().startsWith("Compression level cannot be set"));
302+
303+
assertAcked(
304+
client().admin()
305+
.indices()
306+
.updateSettings(
307+
new UpdateSettingsRequest(index).settings(
308+
Settings.builder()
309+
.put("index.codec", randomFrom(QAT_DEFLATE_CODEC, QAT_LZ4_CODEC))
310+
.put("index.codec.compression_level", randomIntBetween(1, 6))
311+
)
312+
)
313+
.get()
314+
);
315+
316+
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
317+
ensureGreen(index);
318+
}
188319
}

src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java

+16-15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
1818
import org.opensearch.index.codec.customcodecs.CustomCodecPlugin;
19+
import org.opensearch.index.codec.customcodecs.QatZipperFactory;
1920
import org.opensearch.index.engine.Segment;
2021
import org.opensearch.plugins.Plugin;
2122
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -24,6 +25,7 @@
2425
import java.util.Arrays;
2526
import java.util.Collection;
2627
import java.util.Collections;
28+
import java.util.HashMap;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.UUID;
@@ -51,25 +53,24 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
5153

5254
public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {
5355

54-
Map<String, String> codecMap = Map.of(
55-
"best_compression",
56-
"BEST_COMPRESSION",
57-
"zlib",
58-
"BEST_COMPRESSION",
59-
"zstd_no_dict",
60-
"ZSTD_NO_DICT",
61-
"zstd",
62-
"ZSTD",
63-
"default",
64-
"BEST_SPEED",
65-
"lz4",
66-
"BEST_SPEED"
67-
);
56+
Map<String, String> codecMap = new HashMap<String, String>() {
57+
{
58+
put("best_compression", "BEST_COMPRESSION");
59+
put("zlib", "BEST_COMPRESSION");
60+
put("zstd_no_dict", "ZSTD_NO_DICT");
61+
put("zstd", "ZSTD");
62+
put("default", "BEST_SPEED");
63+
put("lz4", "BEST_SPEED");
64+
if (QatZipperFactory.isQatAvailable()) {
65+
put("qat_lz4", "QAT_LZ4");
66+
put("qat_deflate", "QAT_DEFLATE");
67+
}
68+
}
69+
};
6870

6971
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
7072
forceMergeMultipleCodecs(codec.getKey(), codec.getValue(), codecMap);
7173
}
72-
7374
}
7475

7576
private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode, Map<String, String> codecMap) throws ExecutionException,

src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,32 @@
88

99
package org.opensearch.index.codec.customcodecs;
1010

11+
import org.opensearch.common.settings.Setting;
1112
import org.opensearch.index.IndexSettings;
1213
import org.opensearch.index.codec.CodecServiceFactory;
1314
import org.opensearch.index.engine.EngineConfig;
1415
import org.opensearch.plugins.EnginePlugin;
1516
import org.opensearch.plugins.Plugin;
1617

18+
import java.util.Arrays;
19+
import java.util.List;
1720
import java.util.Optional;
1821

1922
/**
2023
* A plugin that implements custom codecs. Supports these codecs:
24+
*
2125
* <ul>
22-
* <li>ZSTD
23-
* <li>ZSTDNODICT
26+
* <li>ZSTD_CODEC
27+
* <li>ZSTD_NO_DICT_CODEC
28+
* <li>QAT_LZ4
29+
* <li>QAT_DEFLATE
2430
* </ul>
2531
*
2632
* @opensearch.internal
2733
*/
2834
public final class CustomCodecPlugin extends Plugin implements EnginePlugin {
2935

30-
/**
31-
* Creates a new instance
32-
*/
36+
/** Creates a new instance */
3337
public CustomCodecPlugin() {}
3438

3539
/**
@@ -39,9 +43,17 @@ public CustomCodecPlugin() {}
3943
@Override
4044
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) {
4145
String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING);
42-
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) {
46+
if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC)
47+
|| codecName.equals(CustomCodecService.ZSTD_CODEC)
48+
|| codecName.equals(CustomCodecService.QAT_LZ4_CODEC)
49+
|| codecName.equals(CustomCodecService.QAT_DEFLATE_CODEC)) {
4350
return Optional.of(new CustomCodecServiceFactory());
4451
}
4552
return Optional.empty();
4653
}
54+
55+
@Override
56+
public List<Setting<?>> getSettings() {
57+
return Arrays.asList(Lucene99QatCodec.INDEX_CODEC_QAT_MODE_SETTING);
58+
}
4759
}

0 commit comments

Comments
 (0)