Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-817, ORC-1088: Support ZStandard compression using zstd-jni #1743

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ public enum OrcConf {
"Define the compression strategy to use while writing data.\n" +
"This changes the compression level of higher level compression\n" +
"codec (like ZLIB)."),
COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also fix ORC-1088

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

source table: ORC zlib 4408374802439 4TB

zstd-jni

orc.compression.zstd.level=3 (default)
zstd compress size: 3119313447131 2905G

orc.compression.zstd.level=10
zstd compress size: 2621369844393 2441G

aircompressor

zstd compress size: 3138804372295 2923G

"hive.exec.orc.compression.zstd.level", 1,
"Define the compression level to use with ZStandard codec "
+ "while writing data. The valid range is 1~22"),
COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
Expand Down
32 changes: 32 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,27 @@ public static BloomFilterVersion fromString(String s) {
}
}

public static class ZstdCompressOptions {
private int compressionZstdLevel;
private int compressionZstdWindowLog;

public int getCompressionZstdLevel() {
return compressionZstdLevel;
}

public void setCompressionZstdLevel(int compressionZstdLevel) {
this.compressionZstdLevel = compressionZstdLevel;
}

public int getCompressionZstdWindowLog() {
return compressionZstdWindowLog;
}

public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
this.compressionZstdWindowLog = compressionZstdWindowLog;
}
}

/**
* Options for creating ORC file writers.
*/
Expand All @@ -447,6 +468,7 @@ public static class WriterOptions implements Cloneable {
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
private CompressionStrategy compressionStrategy;
private ZstdCompressOptions zstdCompressOptions;
private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
Expand Down Expand Up @@ -493,6 +515,12 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
compressionStrategy = CompressionStrategy.valueOf(compString);

zstdCompressOptions = new ZstdCompressOptions();
zstdCompressOptions.setCompressionZstdLevel(
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdWindowLog(
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

Expand Down Expand Up @@ -938,6 +966,10 @@ public EncodingStrategy getEncodingStrategy() {
return encodingStrategy;
}

public ZstdCompressOptions getZstdCompressOptions() {
return zstdCompressOptions;
}

public double getPaddingTolerance() {
return paddingTolerance;
}
Expand Down
12 changes: 11 additions & 1 deletion java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,18 @@ public PhysicalFsWriter(FSDataOutputStream outputStream,
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null){
compress.withCodec(codec, codec.getDefaultOptions());
CompressionCodec.Options tempOptions = codec.getDefaultOptions();
if (codec instanceof ZstdCodec &&
codec.getDefaultOptions() instanceof ZstdCodec.ZstdOptions options) {
OrcFile.ZstdCompressOptions zstdCompressOptions = opts.getZstdCompressOptions();
if (zstdCompressOptions != null) {
options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
}
}
compress.withCodec(codec, tempOptions);
}

this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
Expand Down
24 changes: 22 additions & 2 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.orc.impl;

import com.github.luben.zstd.util.Native;
import com.google.protobuf.ByteString;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
Expand Down Expand Up @@ -273,6 +274,17 @@ private static int getClosestBufferSize(int size) {
return Math.min(kb256, Math.max(kb4, pow2));
}

static {
try {
if (!"java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
Native.load();
}
} catch (UnsatisfiedLinkError | ExceptionInInitializerError e) {
LOG.warn("Unable to load zstd-jni library for your platform. " +
"Using builtin-java classes where applicable");
}
}

public static CompressionCodec createCodec(CompressionKind kind) {
switch (kind) {
case NONE:
Expand All @@ -288,8 +300,16 @@ public static CompressionCodec createCodec(CompressionKind kind) {
return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
}
if (Native.isLoaded()) {
return new ZstdCodec();
} else {
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
}
case BROTLI:
return new BrotliCodec();
default:
Expand Down
Loading