Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-817: Replace aircompressor ZStandard compression with zstd-jni #988

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/core/pom.xml
Original file line number Diff line number Diff line change
@@ -51,6 +51,10 @@
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
12 changes: 12 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
@@ -65,6 +65,18 @@ public enum OrcConf {
"Define the compression strategy to use while writing data.\n" +
"This changes the compression level of higher level compression\n" +
"codec (like ZLIB)."),
COMPRESSION_ZSTD_LEVEL("orc.compression.zstd.level",
"hive.exec.orc.compression.zstd.level", 3,
"Define the compression level to use with ZStandard codec "
+ "while writing data."),
COMPRESSION_ZSTD_WINDOWLOG("orc.compression.zstd.windowlog",
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
COMPRESSION_ZSTD_LONGMODE("orc.compression.zstd.longmode",
"hive.exec.orc.compression.zstd.longmode", false,
"If enabled, the Zstandard codec will employ long mode during "
+ "compression."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
24 changes: 24 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
@@ -441,6 +441,9 @@ public static class WriterOptions implements Cloneable {
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
private CompressionStrategy compressionStrategy;
private int compressionZstdLevel;
private int compressionZstdWindowLog;
private boolean compressionZstdLongMode;
private double paddingTolerance;
private String bloomFilterColumns;
private double bloomFilterFpp;
@@ -485,6 +488,15 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf);
compressionStrategy = CompressionStrategy.valueOf(compString);

compressionZstdLevel =
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf);

compressionZstdWindowLog =
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf);

compressionZstdLongMode =
OrcConf.COMPRESSION_ZSTD_LONGMODE.getBoolean(tableProperties, conf);

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);

@@ -903,6 +915,18 @@ public CompressionStrategy getCompressionStrategy() {
return compressionStrategy;
}

public int getCompressionZstdLevel() {
return compressionZstdLevel;
}

public int getCompressionZstdWindowLog() {
return compressionZstdWindowLog;
}

public boolean getCompressionZstdLongMode() {
return compressionZstdLongMode;
}

public EncodingStrategy getEncodingStrategy() {
return encodingStrategy;
}
4 changes: 3 additions & 1 deletion java/core/src/java/org/apache/orc/impl/OutStream.java
Original file line number Diff line number Diff line change
@@ -280,9 +280,11 @@ private void spill() throws java.io.IOException {
outputBuffer(current);
getNewInputBuffer();
} else {
// Make sure both compressed and overflow are not null before passing to compress
if (compressed == null) {
compressed = getNewOutputBuffer();
} else if (overflow == null) {
}
if (overflow == null) {
overflow = getNewOutputBuffer();
}
int sizePosn = compressed.position();
11 changes: 10 additions & 1 deletion java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
Original file line number Diff line number Diff line change
@@ -104,8 +104,17 @@ public PhysicalFsWriter(FileSystem fs,
}
CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
if (codec != null){
compress.withCodec(codec, codec.getDefaultOptions());
// HACK: Fetch Zstd-specific options and set them here -- need to find a
// better way before merging.
CompressionCodec.Options tempOptions = codec.getDefaultOptions();
if (codec instanceof ZstdCodec) {
((ZstdCodec.ZstdOptions) tempOptions).setLevel(opts.getCompressionZstdLevel());
((ZstdCodec.ZstdOptions) tempOptions).setWindowLog(opts.getCompressionZstdWindowLog());
((ZstdCodec.ZstdOptions) tempOptions).setLongMode(opts.getCompressionZstdLongMode());
}
compress.withCodec(codec, tempOptions);
}

this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
9 changes: 5 additions & 4 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Original file line number Diff line number Diff line change
@@ -23,8 +23,6 @@
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -126,6 +124,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private final boolean[] directEncodingColumns;
private final List<OrcProto.ColumnEncoding> unencryptedEncodings =
new ArrayList<>();
private final int compressionZstdLevel;
private final int compressionZstdWindowLog;

// the list of maskDescriptions, keys, and variants
private SortedMap<String, MaskDescriptionImpl> maskDescriptions = new TreeMap<>();
@@ -181,6 +181,8 @@ public WriterImpl(FileSystem fs,

this.encodingStrategy = opts.getEncodingStrategy();
this.compressionStrategy = opts.getCompressionStrategy();
this.compressionZstdLevel = opts.getCompressionZstdLevel();
this.compressionZstdWindowLog = opts.getCompressionZstdWindowLog();

this.rowIndexStride = opts.getRowIndexStride();
buildIndex = rowIndexStride > 0;
@@ -278,8 +280,7 @@ public static CompressionCodec createCodec(CompressionKind kind) {
return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
case ZSTD:
return new AircompressorCodec(kind, new ZstdCompressor(),
new ZstdDecompressor());
return new ZstdCodec();
default:
throw new IllegalArgumentException("Unknown compression codec: " +
kind);
343 changes: 343 additions & 0 deletions java/core/src/java/org/apache/orc/impl/ZstdCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDecompressCtx;

import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;

public class ZstdCodec implements CompressionCodec {
private ZstdOptions zstdOptions = null;
private Boolean direct = null;
private ZstdCompressCtx zstdCompressCtx = null;
private ZstdDecompressCtx zstdDecompressCtx = null;

public ZstdCodec(int level, int windowLog, boolean longMode, boolean fixed) {
this.zstdOptions = new ZstdOptions(level, windowLog, longMode, fixed);
}

public ZstdCodec(int level, int windowLog) {
this(level, windowLog, false, false);
}

public ZstdCodec() {
this(3, 0);
}

// Thread local buffer
private static final ThreadLocal<byte[]> threadBuffer =
new ThreadLocal<byte[]>() {
@Override
protected byte[] initialValue() {
return null;
}
};

protected static byte[] getBuffer(int size) {
byte[] result = threadBuffer.get();
if (result == null || result.length < size || result.length > size * 2) {
result = new byte[size];
threadBuffer.set(result);
}
return result;
}

static class ZstdOptions implements Options {
private int level;
private int windowLog;
private boolean longMode;
private final boolean FIXED;

ZstdOptions(int level, int windowLog, boolean longMode, boolean FIXED) {
this.level = level;
this.windowLog = windowLog;
this.longMode = longMode;
this.FIXED = FIXED;
}

@Override
public ZstdOptions copy() {
return new ZstdOptions(level, windowLog, longMode, FIXED);
}

/**
* Sets the Zstandard long mode maximum back-reference distance, expressed
* as a power of 2.
*
* The value must be between ZSTD_WINDOWLOG_MIN (10) and ZSTD_WINDOWLOG_MAX
* (30 and 31 on 32/64-bit architectures, respectively).
*
* A value of 0 is a special value indicating to use the default
* ZSTD_WINDOWLOG_LIMIT_DEFAULT of 27, which corresponds to back-reference
* window size of 128MiB.
*
* @param newValue The desired power-of-2 value back-reference distance.
* @return
*/
public ZstdOptions setWindowLog(int newValue) {
if ((newValue < Zstd.windowLogMin() || newValue > Zstd.windowLogMax())
&& newValue != 0) {
throw new IllegalArgumentException(
String.format(
"Zstd compression window size should be in the range %d to %d,"
+ " or set to the default value of 0.",
Zstd.windowLogMin(),
Zstd.windowLogMax()));
}
windowLog = newValue;
return this;
}

/**
* Sets explicitly whether long mode is used. Note that long mode will be
* enabled by default in the underlying library if windowLog >= 128 MB and
* compression level is 16+ (compression strategy >= ZSTD_btopt).
*
* @param newValue A boolean indicating whether to explicitly use long mode.
* @return
*/
public ZstdOptions setLongMode(boolean newValue) {
longMode = newValue;
return this;
}

/**
* Sets the Zstandard compression codec compression level directly using
* the integer setting. This value is typically between 0 and 22, with
* larger numbers indicating more aggressive compression and lower speed.
* <p>
* This method provides additional granularity beyond the setSpeed method
* so that users can select a specific level.
*
* @param newValue The level value of compression to set.
* @return
*/
public ZstdOptions setLevel(int newValue) {
if (newValue < Zstd.minCompressionLevel()
|| newValue > Zstd.maxCompressionLevel()) {
throw new IllegalArgumentException(
String.format(
"Zstd compression level should be in the range %d to %d",
Zstd.minCompressionLevel(),
Zstd.maxCompressionLevel()));
}
level = newValue;
return this;
}

/**
* Sets the Zstandard compression codec compression level via the Enum
* (FASTEST, FAST, DEFAULT). The default value of 3 is the
* ZSTD_CLEVEL_DEFAULT level.
* <p>
* Alternatively, the compression level can be set directly with setLevel.
*
* @param newValue An Enum specifying how aggressively to compress.
* @return
*/
@Override
public ZstdOptions setSpeed(SpeedModifier newValue) {
if (FIXED) {
throw new IllegalStateException(
"Attempt to modify the default options");
}
switch (newValue) {
case FAST:
setLevel(2);
break;
case DEFAULT:
// zstd level 3 achieves good ratio/speed tradeoffs, and is the
// ZSTD_CLEVEL_DEFAULT level.
setLevel(3);
break;
case FASTEST:
// zstd level 1 is the fastest level.
setLevel(1);
break;
default:
break;
}
return this;
}

@Override
public ZstdOptions setData(DataKind newValue) {
return this; // We don't support setting DataKind in ZstdCodec.
}

@Override
public boolean equals(Object other) {
if (other == null || getClass() != other.getClass()) {
return false;
} else if (this == other) {
return true;
} else {
ZstdOptions otherOpts = (ZstdOptions) other;
return (level == otherOpts.level) &&
(windowLog == otherOpts.windowLog);
}
}

@Override
public int hashCode() {
return Objects.hash(level, windowLog, FIXED);
}
}

/**
*
*/
private static final ZstdOptions DEFAULT_OPTIONS =
new ZstdOptions(3, 0, false, false);

@Override
public Options getDefaultOptions() {
return DEFAULT_OPTIONS;
}

/**
* Compresses an input ByteBuffer into an output ByteBuffer using Zstandard
* compression. If the maximum bound of the number of output bytes exceeds
* the output ByteBuffer size, the remaining bytes are written to the overflow
* ByteBuffer.
*
* @param in the bytes to compress
* @param out the compressed bytes
* @param overflow put any additional bytes here
* @param options the options to control compression
* @return
*/
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
ByteBuffer overflow,
Options options) throws IOException {
ZstdOptions zlo = (ZstdOptions) options;
// TODO(@dchristle): Add case for when ByteBuffers are direct.

zstdCompressCtx = new ZstdCompressCtx();
zstdCompressCtx.setLevel(zlo.level);
zstdCompressCtx.setLong(zlo.windowLog);
zstdCompressCtx.setChecksum(false);

int inBytes = in.remaining();
int srcOffset = in.arrayOffset() + in.position();
int dstOffset = out.arrayOffset() + out.position();
int compressBound = (int) Zstd.compressBound(inBytes);
int dstSize = out.limit() - out.position();
long compressOutput;

if (dstSize < compressBound) {
// The detected output ByteBuffer is too small, based on the maximum
// compression estimate. Allocate a temporary buffer of the appropriate
// size.
byte[] compressed = new byte[compressBound];
int remaining = out.remaining();

compressOutput =
zstdCompressCtx.compressByteArray(compressed, 0, compressBound,
in.array(), srcOffset, inBytes);
if (Zstd.isError(compressOutput)) {
throw new IOException(String.format("Error code %s!", compressOutput));
}

if ((int) compressOutput <= remaining) {
// Single copy ok, no need for overflow
System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
out.position(), (int) compressOutput);
out.position(out.position() + (int) compressOutput);
} else {
// Single copy not OK, need to copy to both out and overflow
System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
out.position(), remaining);
out.position(out.limit());

System.arraycopy(compressed, remaining, overflow.array(),
overflow.arrayOffset(), (int) compressOutput - remaining);
overflow.position((int) compressOutput - remaining);
}
} else {
// Copy directly to output buffer
compressOutput =
Zstd.compressByteArray(out.array(), dstOffset, dstSize, in.array(),
srcOffset, inBytes, zlo.level);
if (Zstd.isError(compressOutput)) {
throw new IOException(String.format("Error code %s!", compressOutput));
}
out.position(dstOffset + (int) compressOutput);
}
zstdCompressCtx.close();
return inBytes > (int) compressOutput;
}

// TODO(dchristle): Do we need to add loops similar to ZlibCodec, e.g.
// "while (!deflater.finished() && (length > outSize)) { ..."
@Override
public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {

if (zstdDecompressCtx == null) {
zstdDecompressCtx = new ZstdDecompressCtx();
}

int srcOffset = in.arrayOffset() + in.position();
int srcSize = in.remaining();
int dstOffset = out.arrayOffset() + out.position();
int dstSize = out.remaining() - dstOffset;

long decompressOut =
Zstd.decompressByteArray(out.array(), dstOffset, dstSize, in.array(),
srcOffset, srcSize);
if (Zstd.isError(decompressOut)) {
System.out.format("Error code %s!", decompressOut);
}
in.position(in.limit());
out.position(dstOffset + (int) decompressOut);
out.flip();
}

@Override
public void reset() {

}

@Override
public void destroy() {
if (zstdCompressCtx != null) {
zstdCompressCtx.close();
}
if (zstdDecompressCtx != null) {
zstdDecompressCtx.close();
}
}

@Override
public CompressionKind getKind() {
return CompressionKind.ZSTD;
}

@Override
public void close() {
OrcCodecPool.returnCodec(CompressionKind.ZSTD, this);
}
}
Original file line number Diff line number Diff line change
@@ -332,7 +332,7 @@ public void filterWithSeek() throws IOException {
}
FileSystem.Statistics stats = readEnd();
double readPercentage = readPercentage(stats, fs.getFileStatus(filePath).getLen());
assertTrue(readPercentage > 130);
assertTrue(readPercentage > 120);
}

private void seekToRow(RecordReader rr, VectorizedRowBatch b, long row) throws IOException {
103 changes: 65 additions & 38 deletions java/core/src/test/org/apache/orc/TestVectorOrcFile.java
Original file line number Diff line number Diff line change
@@ -2271,51 +2271,71 @@ public void testLz4(Version fileFormat) throws Exception {
}

/**
* Read and write a randomly generated zstd file.
* Write a randomly generated zstd-compressed file, read it back, and check
* that the output matches the input.
*
* Checks correctness across a variety of valid settings:
*
* * Negative, low, moderate, and high compression levels
* * Valid window sizes in [10-31], and default value of 0.
* * Long mode explicitly enabled and disabled.
*
* @throws Exception
*/
@ParameterizedTest
@MethodSource("data")
public void testZstd(Version fileFormat) throws Exception {
TypeDescription schema =
TypeDescription.fromString("struct<x:bigint,y:int,z:bigint>");
try (Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
.compress(CompressionKind.ZSTD)
.bufferSize(1000)
.version(fileFormat))) {
VectorizedRowBatch batch = schema.createRowBatch();
Random rand = new Random(3);
batch.size = 1000;
for (int b = 0; b < 10; ++b) {
for (int r = 0; r < 1000; ++r) {
((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
}
writer.addRowBatch(batch);
}
}
try (Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows()) {
assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
Random rand = new Random(3);
for (int b = 0; b < 10; ++b) {
rows.nextBatch(batch);
assertEquals(1000, batch.size);
for (int r = 0; r < batch.size; ++r) {
assertEquals(rand.nextInt(),
((LongColumnVector) batch.cols[0]).vector[r]);
assertEquals(b * 1000 + r,
((LongColumnVector) batch.cols[1]).vector[r]);
assertEquals(rand.nextLong(),
((LongColumnVector) batch.cols[2]).vector[r]);

for (Integer level : new ArrayList<>(Arrays.asList(-4, -1, 0, 1, 3, 8, 12, 17, 22))) {
for (Integer windowLog : new ArrayList<>(Arrays.asList(0, 10, 20, 31))) {
for (Boolean longMode : new ArrayList<>(Arrays.asList(false, true))) {
OrcConf.COMPRESSION_ZSTD_LEVEL.setInt(conf, level);
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.setInt(conf, windowLog);
OrcConf.COMPRESSION_ZSTD_LONGMODE.setBoolean(conf, longMode);
try (Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
.compress(CompressionKind.ZSTD)
.bufferSize(1000)
.version(fileFormat))) {
VectorizedRowBatch batch = schema.createRowBatch();
Random rand = new Random(27182);
batch.size = 1000;
for (int b = 0; b < 10; ++b) {
for (int r = 0; r < 1000; ++r) {
((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
((LongColumnVector) batch.cols[1]).vector[r] = b * 1000 + r;
((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
}
writer.addRowBatch(batch);
}
}
try (Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows()) {
assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
VectorizedRowBatch batch = reader.getSchema().createRowBatch(1000);
Random rand = new Random(27182);
for (int b = 0; b < 10; ++b) {
rows.nextBatch(batch);
assertEquals(1000, batch.size);
for (int r = 0; r < batch.size; ++r) {
assertEquals(rand.nextInt(),
((LongColumnVector) batch.cols[0]).vector[r]);
assertEquals(b * 1000 + r,
((LongColumnVector) batch.cols[1]).vector[r]);
assertEquals(rand.nextLong(),
((LongColumnVector) batch.cols[2]).vector[r]);
}
}
rows.nextBatch(batch);
assertEquals(0, batch.size);
}
fs.delete(testFilePath, false);
}
}
rows.nextBatch(batch);
assertEquals(0, batch.size);
}
}

@@ -2332,7 +2352,7 @@ public void testCodecPool(Version fileFormat) throws Exception {
WriterOptions opts = OrcFile.writerOptions(conf)
.setSchema(schema).stripeSize(1000).bufferSize(100).version(fileFormat);

CompressionCodec snappyCodec, zlibCodec;
CompressionCodec snappyCodec, zlibCodec, zstdCodec;
snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.SNAPPY));
Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
@@ -2354,6 +2374,13 @@ public void testCodecPool(Version fileFormat) throws Exception {
assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZLIB));
assertSame(zlibCodec, codec);

zstdCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZSTD), batch);
assertNotSame(zlibCodec, zstdCodec);
assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZSTD));
codec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.ZSTD), batch);
assertEquals(1, OrcCodecPool.getPoolSize(CompressionKind.ZSTD));
assertSame(zstdCodec, codec);

assertSame(snappyCodec, OrcCodecPool.getCodec(CompressionKind.SNAPPY));
CompressionCodec snappyCodec2 = writeBatchesAndGetCodec(
10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
124 changes: 113 additions & 11 deletions java/core/src/test/org/apache/orc/impl/TestZstd.java
Original file line number Diff line number Diff line change
@@ -18,28 +18,130 @@

package org.apache.orc.impl;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdException;
import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;

public class TestZstd {

/**
* Test that Zstandard compression does not overflow nor throw an
* exception when the allocated output array matches
* the estimated upper bound ZSTD_compressBound. Random byte inputs are
* used, which are a worst-case for output
* compression sizes.
*/
@Test
public void testNoOverflow() throws Exception {
ByteBuffer in = ByteBuffer.allocate(10);
ByteBuffer out = ByteBuffer.allocate(10);
in.put(new byte[]{1,2,3,4,5,6,7,10});
in.flip();
CompressionCodec codec = new AircompressorCodec(
CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
assertFalse(codec.compress(in, out, null,
codec.getDefaultOptions()));
public void testZstdCodecNoOverflow() {
Random rd = new Random();
ArrayList<Integer> testInputDataSizes =
new ArrayList<>(Arrays.asList(8, 27, 182, 818, 28459));

testInputDataSizes.forEach(inputSize -> {
ByteBuffer in = ByteBuffer.allocate(inputSize);
ByteBuffer out =
ByteBuffer.allocate((int) Zstd.compressBound((long) inputSize));

byte[] arr = new byte[inputSize];
rd.nextBytes(arr);

in.put(arr);
in.flip();
CompressionCodec codec = new ZstdCodec();
boolean overflow;
try {
overflow = codec.compress(in, out, null, codec.getDefaultOptions());
} catch (IOException e) {
overflow = true;
}
assertFalse(overflow);
}
);
}

@Test
public void testCorrupt() throws Exception {
ByteBuffer buf = ByteBuffer.allocate(1000);
buf.put(new byte[] {127, 125, 1, 99, 98, 1});
buf.flip();
CompressionCodec codec = new ZstdCodec();
ByteBuffer out = ByteBuffer.allocate(1000);
try {
codec.decompress(buf, out);
fail();
} catch (ZstdException ioe) {
// EXPECTED
}
}

/**
* Test compatibility of zstd-jni and aircompressor Zstd implementations
* by checking that bytes compressed with one can be decompressed by the
* other when using the default options.
*/
@Test
public void testZstdAircompressorJniCompressDecompress() throws Exception {
int inputSize = 27182;
Random rd = new Random();

CompressionCodec zstdAircompressorCodec = new AircompressorCodec(
CompressionKind.ZSTD, new ZstdCompressor(), new ZstdDecompressor());
CompressionCodec zstdJniCodec = new ZstdCodec();

ByteBuffer sourceCompressorIn = ByteBuffer.allocate(inputSize);
ByteBuffer sourceCompressorOut =
ByteBuffer.allocate((int) Zstd.compressBound(inputSize));
ByteBuffer destCompressorOut = ByteBuffer.allocate(inputSize);

// Use an array half filled with a constant value & half filled with
// random values.
byte[] constantBytes = new byte[inputSize / 2];
java.util.Arrays.fill(constantBytes, 0, inputSize / 2, (byte) 2);
sourceCompressorIn.put(constantBytes);
byte[] randomBytes = new byte[inputSize - inputSize / 2];
rd.nextBytes(randomBytes);
sourceCompressorIn.put(randomBytes);
sourceCompressorIn.flip();

// Verify that input -> aircompressor compresson -> zstd-jni
// decompression returns the input.
// Note: This function returns false if the bytes get larger. But why is
// that a problem? sourceCompressorOut has the
// capacity.
zstdAircompressorCodec.compress(sourceCompressorIn, sourceCompressorOut,
null, zstdAircompressorCodec.getDefaultOptions());
sourceCompressorOut.flip();

zstdJniCodec.decompress(sourceCompressorOut, destCompressorOut);
assertEquals(sourceCompressorIn, destCompressorOut,
"aircompressor compression with zstd-jni decompression did not return"
+ " the input!");

sourceCompressorIn.rewind();
sourceCompressorOut.clear();
destCompressorOut.clear();

// Verify that input -> zstd-jni compresson -> aircompressor
// decompression returns the input.
zstdJniCodec.compress(sourceCompressorIn, sourceCompressorOut, null,
zstdJniCodec.getDefaultOptions());
sourceCompressorOut.flip();
zstdAircompressorCodec.decompress(sourceCompressorOut, destCompressorOut);
assertEquals(sourceCompressorIn, destCompressorOut,
"zstd-jni compression with aircompressor decompression did not return"
+ " the input!");
}
}
25 changes: 15 additions & 10 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -27,12 +27,12 @@
<name>Apache ORC</name>
<url>https://orc.apache.org</url>
<description>
ORC is a self-describing type-aware columnar file format designed
for Hadoop workloads. It is optimized for large streaming reads,
but with integrated support for finding required rows
quickly. Storing data in a columnar format lets the reader read,
decompress, and process only the values that are required for the
current query.
ORC is a self-describing type-aware columnar file format designed
for Hadoop workloads. It is optimized for large streaming reads,
but with integrated support for finding required rows
quickly. Storing data in a columnar format lets the reader read,
decompress, and process only the values that are required for the
current query.
</description>
<inceptionYear>2013</inceptionYear>

@@ -343,8 +343,8 @@
<goal>run</goal>
</goals>
<configuration>
<protocArtifact>${protoc.artifact}</protocArtifact>
<protocVersion>2.5.0</protocVersion>
<protocArtifact>${protoc.artifact}</protocArtifact>
<protocVersion>2.5.0</protocVersion>
<addSources>none</addSources>
<includeDirectories>
<include>../../proto</include>
@@ -478,7 +478,7 @@

<dependencyManagement>
<dependencies>
<!-- intra-project depedencies -->
<!-- intra-project dependencies -->
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-shims</artifactId>
@@ -510,7 +510,7 @@
<version>1.8.0-SNAPSHOT</version>
</dependency>

<!-- inter-project depedencies -->
<!-- inter-project dependencies -->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
@@ -557,6 +557,11 @@
<artifactId>aircompressor</artifactId>
<version>0.21</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.1-1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>