Commit 4e14cb3

cxzl25, dongjoon-hyun, dchristle, and guiyanakuang committed
ORC-817, ORC-1088: Support ZStandard compression using zstd-jni
### What changes were proposed in this pull request?

Original PR: #988
Original author: dchristle

This PR adds support for the [zstd-jni](https://github.com/luben/zstd-jni) library as the implementation of ORC zstd, which offers better performance than [aircompressor](https://github.com/airlift/aircompressor). (#988 (comment))

This PR also exposes the compression level and "long mode" settings to ORC users. These settings allow the user to select speed/compression trade-offs that the original aircompressor implementation does not support.

- Add the zstd-jni dependency, and add a new CompressionCodec, ZstdCodec, that uses it. Add an ORC conf to set the compression level.
- Add an ORC conf to use long mode, and add configuration setters for windowLog.
- Add tests that verify the correctness of writing and reading across compression levels, window sizes, and long-mode use.
- Add a test for compatibility between the Zstd aircompressor and zstd-jni implementations.

### Why are the changes needed?

These changes make sense for a few reasons:

- ORC users will gain all the improvements from the main zstd library. It is under active development and receives regular speed and compression improvements. In contrast, aircompressor's zstd implementation is older and stale.
- ORC users will be able to use the entire speed/compression trade-off space. Today, aircompressor's implementation supports only one of eight compression strategies ([link](https://github.com/airlift/aircompressor/blob/c5e6972bd37e1d3834514957447028060a268eea/src/main/java/io/airlift/compress/zstd/CompressionParameters.java#L143)). This means only a small range of faster but less compressive strategies can be exposed to ORC users. ORC storage with high compression (e.g. for large-but-infrequently-used data) is a clear use case that this PR unlocks.
- It will harmonize the Java ORC implementation with other projects in the Hadoop ecosystem. Parquet, Spark, and even the C++ ORC reader/writers all rely on the official zstd implementation, either via zstd-jni or directly. In this way, the Java reader/writer code is an outlier. Detecting and fixing any bugs or regressions will generally happen much faster, given the larger number of users and the active developer communities of zstd and zstd-jni.

The largest trade-off is that zstd-jni wraps compiled code. That said, many microprocessor architectures are already targeted and bundled into zstd-jni, so this should be a rare hurdle.

### How was this patch tested?

- Unit tests for reading and writing ORC files using a variety of compression levels and window logs all pass.
- A unit test to compress and decompress between aircompressor and zstd-jni passes. Note that the current aircompressor implementation uses a small subset of levels, so the test only compares data using the default compression settings.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #1743 from cxzl25/ORC-817.

Lead-authored-by: sychen <sychen@ctrip.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: David Christle <dchristle@squareup.com>
Co-authored-by: Yiqun Zhang <guiyanakuang@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 33be571)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
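As a quick orientation before the diffs: the two configuration keys this PR introduces, `orc.compression.zstd.level` and `orc.compression.zstd.windowlog`, are plain integer table properties. The sketch below is not ORC code; it only mimics how `WriterOptions` conceptually resolves a table property against a default, using standalone `java.util.Properties` (the helper name `readIntConf` and the values `9`/`23` are illustrative assumptions).

```java
import java.util.Properties;

public class ZstdConfSketch {
    // Resolves an integer conf the way WriterOptions conceptually does:
    // take the table property if present, else fall back to the default.
    // (Hypothetical helper; the real code goes through OrcConf.getInt.)
    public static int readIntConf(Properties props, String key, int defaultValue) {
        String v = props.getProperty(key);
        return v == null ? defaultValue : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        Properties tableProperties = new Properties();
        // Keys are the real ones added by this PR; the values are examples.
        tableProperties.setProperty("orc.compression.zstd.level", "9");
        tableProperties.setProperty("orc.compression.zstd.windowlog", "23");

        // Defaults match the OrcConf entries below: level 1, windowlog 0.
        System.out.println(readIntConf(tableProperties, "orc.compression.zstd.level", 1));     // 9
        System.out.println(readIntConf(tableProperties, "orc.compression.zstd.windowlog", 0)); // 23
    }
}
```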
1 parent fcdac8a commit 4e14cb3

File tree

9 files changed: +477 −45 lines


java/core/pom.xml (+4)

@@ -51,6 +51,10 @@
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client-api</artifactId>

java/core/src/java/org/apache/orc/OrcConf.java (+8)

@@ -72,6 +72,14 @@ public enum OrcConf {
       "Define the compression strategy to use while writing data.\n" +
       "This changes the compression level of higher level compression\n" +
       "codec (like ZLIB)."),
+  COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
+      "hive.exec.orc.compression.zstd.level", 1,
+      "Define the compression level to use with ZStandard codec "
+          + "while writing data. The valid range is 1~22"),
+  COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
+      "hive.exec.orc.compression.zstd.windowlog", 0,
+      "Set the maximum allowed back-reference distance for "
+          + "ZStandard codec, expressed as power of 2."),
   BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
       "hive.exec.orc.block.padding.tolerance", 0.05,
       "Define the tolerance for block padding as a decimal fraction of\n" +
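The windowlog conf is "expressed as power of 2": a value of `w` allows back-references of up to 2^w bytes. The snippet below is a standalone illustration of that arithmetic, not ORC code; treating the default `0` as "unset, use the codec's default" is an assumption based on the conf's default value.

```java
public class WindowLogSketch {
    // A windowLog of w allows back-references up to 2^w bytes;
    // 0 is treated here as "unset" (assumption: codec default applies).
    public static long windowSizeBytes(int windowLog) {
        return windowLog == 0 ? 0 : 1L << windowLog;
    }

    public static void main(String[] args) {
        System.out.println(windowSizeBytes(23)); // 8388608 (an 8 MiB window)
        System.out.println(windowSizeBytes(0));  // 0 (unset)
    }
}
```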

java/core/src/java/org/apache/orc/OrcFile.java (+32)

@@ -426,6 +426,27 @@ public static BloomFilterVersion fromString(String s) {
     }
   }

+  public static class ZstdCompressOptions {
+    private int compressionZstdLevel;
+    private int compressionZstdWindowLog;
+
+    public int getCompressionZstdLevel() {
+      return compressionZstdLevel;
+    }
+
+    public void setCompressionZstdLevel(int compressionZstdLevel) {
+      this.compressionZstdLevel = compressionZstdLevel;
+    }
+
+    public int getCompressionZstdWindowLog() {
+      return compressionZstdWindowLog;
+    }
+
+    public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
+      this.compressionZstdWindowLog = compressionZstdWindowLog;
+    }
+  }
+
   /**
    * Options for creating ORC file writers.
    */
@@ -447,6 +468,7 @@ public static class WriterOptions implements Cloneable {
     private WriterCallback callback;
     private EncodingStrategy encodingStrategy;
     private CompressionStrategy compressionStrategy;
+    private ZstdCompressOptions zstdCompressOptions;
     private double paddingTolerance;
     private String bloomFilterColumns;
     private double bloomFilterFpp;
@@ -493,6 +515,12 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
           OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
       compressionStrategy = CompressionStrategy.valueOf(compString);

+      zstdCompressOptions = new ZstdCompressOptions();
+      zstdCompressOptions.setCompressionZstdLevel(
+          OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
+      zstdCompressOptions.setCompressionZstdWindowLog(
+          OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));
+
       paddingTolerance =
           OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

@@ -938,6 +966,10 @@ public EncodingStrategy getEncodingStrategy() {
       return encodingStrategy;
     }

+    public ZstdCompressOptions getZstdCompressOptions() {
+      return zstdCompressOptions;
+    }
+
     public double getPaddingTolerance() {
       return paddingTolerance;
     }
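`ZstdCompressOptions` is a plain holder with getter/setter pairs and no validation of its own. The standalone demo below mirrors that class body (copied from the hunk above, wrapped in a hypothetical outer class so it compiles on its own) to show how the two knobs are carried:

```java
public class ZstdCompressOptionsDemo {
    // Mirrors OrcFile.ZstdCompressOptions from the diff: a plain holder
    // for the two zstd tuning knobs, with no range checks of its own.
    public static class ZstdCompressOptions {
        private int compressionZstdLevel;
        private int compressionZstdWindowLog;

        public int getCompressionZstdLevel() { return compressionZstdLevel; }
        public void setCompressionZstdLevel(int level) { this.compressionZstdLevel = level; }
        public int getCompressionZstdWindowLog() { return compressionZstdWindowLog; }
        public void setCompressionZstdWindowLog(int windowLog) { this.compressionZstdWindowLog = windowLog; }
    }

    public static void main(String[] args) {
        ZstdCompressOptions opts = new ZstdCompressOptions();
        opts.setCompressionZstdLevel(3);      // within the documented 1~22 range
        opts.setCompressionZstdWindowLog(27); // back-references up to 2^27 bytes
        System.out.println(opts.getCompressionZstdLevel());     // 3
        System.out.println(opts.getCompressionZstdWindowLog()); // 27
    }
}
```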

java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java (+11 −1)

@@ -115,8 +115,18 @@ public PhysicalFsWriter(FSDataOutputStream outputStream,
     }
     CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
     if (codec != null){
-      compress.withCodec(codec, codec.getDefaultOptions());
+      CompressionCodec.Options tempOptions = codec.getDefaultOptions();
+      if (codec instanceof ZstdCodec &&
+          codec.getDefaultOptions() instanceof ZstdCodec.ZstdOptions options) {
+        OrcFile.ZstdCompressOptions zstdCompressOptions = opts.getZstdCompressOptions();
+        if (zstdCompressOptions != null) {
+          options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
+          options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
+        }
+      }
+      compress.withCodec(codec, tempOptions);
     }
+
     this.compressionStrategy = opts.getCompressionStrategy();
     this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
     this.blockSize = opts.getBlockSize();
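This hunk uses pattern matching for `instanceof` (JEP 394, Java 16+): `instanceof ZstdCodec.ZstdOptions options` both tests the type and binds `options` in one step. A standalone sketch of the same shape, using hypothetical stand-in types rather than the real `CompressionCodec`/`ZstdCodec` classes:

```java
public class InstanceofPatternSketch {
    public interface Options {}

    // Hypothetical stand-in for ZstdCodec.ZstdOptions.
    public static class ZstdOptions implements Options {
        public int level = 3;
        public void setLevel(int l) { level = l; }
    }

    // If the codec's default options are zstd options, apply the
    // user-supplied level; otherwise leave the defaults untouched.
    public static int applyLevel(Options defaults, int userLevel) {
        if (defaults instanceof ZstdOptions zstd) { // binds "zstd" on match
            zstd.setLevel(userLevel);
            return zstd.level;
        }
        return -1; // not zstd: nothing applied
    }

    public static void main(String[] args) {
        System.out.println(applyLevel(new ZstdOptions(), 19)); // 19
        System.out.println(applyLevel(new Options() {}, 19));  // -1
    }
}
```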

java/core/src/java/org/apache/orc/impl/WriterImpl.java (+22 −2)

@@ -18,6 +18,7 @@

 package org.apache.orc.impl;

+import com.github.luben.zstd.util.Native;
 import com.google.protobuf.ByteString;
 import io.airlift.compress.lz4.Lz4Compressor;
 import io.airlift.compress.lz4.Lz4Decompressor;
@@ -273,6 +274,17 @@ private static int getClosestBufferSize(int size) {
     return Math.min(kb256, Math.max(kb4, pow2));
   }

+  static {
+    try {
+      if (!"java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
+        Native.load();
+      }
+    } catch (UnsatisfiedLinkError | ExceptionInInitializerError e) {
+      LOG.warn("Unable to load zstd-jni library for your platform. " +
+          "Using builtin-java classes where applicable");
+    }
+  }
+
   public static CompressionCodec createCodec(CompressionKind kind) {
     switch (kind) {
       case NONE:
@@ -288,8 +300,16 @@ public static CompressionCodec createCodec(CompressionKind kind) {
         return new AircompressorCodec(kind, new Lz4Compressor(),
             new Lz4Decompressor());
       case ZSTD:
-        return new AircompressorCodec(kind, new ZstdCompressor(),
-            new ZstdDecompressor());
+        if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
+          return new AircompressorCodec(kind, new ZstdCompressor(),
+              new ZstdDecompressor());
+        }
+        if (Native.isLoaded()) {
+          return new ZstdCodec();
+        } else {
+          return new AircompressorCodec(kind, new ZstdCompressor(),
+              new ZstdDecompressor());
+        }
       case BROTLI:
         return new BrotliCodec();
       default:
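The `ZSTD` branch above implements a three-way fallback: the system property `orc.compression.zstd.impl=java` forces the pure-Java aircompressor codec; otherwise zstd-jni is used when its native library loaded, with aircompressor as the safety net. A self-contained sketch of just that decision (the helper name and return strings are illustrative, not ORC API):

```java
public class ZstdImplSelectionSketch {
    // Mirrors the ZSTD case in WriterImpl.createCodec:
    //  1. "java" property value forces aircompressor,
    //  2. else prefer zstd-jni when the native library is loaded,
    //  3. else fall back to aircompressor.
    public static String chooseZstdImpl(String implProperty, boolean nativeLoaded) {
        if ("java".equalsIgnoreCase(implProperty)) {
            return "aircompressor";
        }
        return nativeLoaded ? "zstd-jni" : "aircompressor";
    }

    public static void main(String[] args) {
        System.out.println(chooseZstdImpl("java", true));  // aircompressor (forced)
        System.out.println(chooseZstdImpl(null, true));    // zstd-jni
        System.out.println(chooseZstdImpl(null, false));   // aircompressor (fallback)
    }
}
```

Note that because `Native.load()` is attempted in a static initializer with the errors caught and logged, a platform without a bundled native binary degrades gracefully instead of failing the writer.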
