Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions java/core/src/java/org/apache/orc/OrcConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public enum OrcConf {
"hive.exec.orc.compression.zstd.windowlog", 0,
"Set the maximum allowed back-reference distance for "
+ "ZStandard codec, expressed as power of 2."),
COMPRESSION_ZSTD_STRATEGY("orc.compression.zstd.strategy",
"hive.exec.orc.compression.zstd.strategy", 0,
"Define the compression strategy to use with ZStandard codec "
+ "while writing data. The valid range is 0~9."),
BLOCK_PADDING_TOLERANCE("orc.block.padding.tolerance",
"hive.exec.orc.block.padding.tolerance", 0.05,
"Define the tolerance for block padding as a decimal fraction of\n" +
Expand Down
11 changes: 11 additions & 0 deletions java/core/src/java/org/apache/orc/OrcFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ public static BloomFilterVersion fromString(String s) {
public static class ZstdCompressOptions {
private int compressionZstdLevel;
private int compressionZstdWindowLog;
private int compressionZstdStrategy;

public int getCompressionZstdLevel() {
return compressionZstdLevel;
Expand All @@ -445,6 +446,14 @@ public int getCompressionZstdWindowLog() {
public void setCompressionZstdWindowLog(int compressionZstdWindowLog) {
this.compressionZstdWindowLog = compressionZstdWindowLog;
}

public int getCompressionZstdStrategy() {
return compressionZstdStrategy;
}

public void setCompressionZstdStrategy(int compressionZstdStrategy) {
this.compressionZstdStrategy = compressionZstdStrategy;
}
}

/**
Expand Down Expand Up @@ -520,6 +529,8 @@ protected WriterOptions(Properties tableProperties, Configuration conf) {
OrcConf.COMPRESSION_ZSTD_LEVEL.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdWindowLog(
OrcConf.COMPRESSION_ZSTD_WINDOWLOG.getInt(tableProperties, conf));
zstdCompressOptions.setCompressionZstdStrategy(
OrcConf.COMPRESSION_ZSTD_STRATEGY.getInt(tableProperties, conf));

paddingTolerance =
OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public PhysicalFsWriter(FSDataOutputStream outputStream,
if (zstdCompressOptions != null) {
options.setLevel(zstdCompressOptions.getCompressionZstdLevel());
options.setWindowLog(zstdCompressOptions.getCompressionZstdWindowLog());
options.setStrategy(zstdCompressOptions.getCompressionZstdStrategy());
}
}
compress.withCodec(codec, tempOptions);
Expand Down
24 changes: 18 additions & 6 deletions java/core/src/java/org/apache/orc/impl/ZstdCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public class ZstdCodec implements CompressionCodec, DirectDecompressionCodec {
private ZstdOptions zstdOptions = null;
private ZstdCompressCtx zstdCompressCtx = null;

public ZstdCodec(int level, int windowLog) {
this.zstdOptions = new ZstdOptions(level, windowLog);
public ZstdCodec(int level, int windowLog, int strategy) {
this.zstdOptions = new ZstdOptions(level, windowLog, strategy);
}

public ZstdCodec() {
this(3, 0);
this(3, 0, 0);
}

public ZstdOptions getZstdOptions() {
Expand All @@ -57,15 +57,17 @@ protected static byte[] getBuffer(int size) {
static class ZstdOptions implements Options {
private int level;
private int windowLog;
private int strategy;

ZstdOptions(int level, int windowLog) {
ZstdOptions(int level, int windowLog, int strategy) {
this.level = level;
this.windowLog = windowLog;
this.strategy = strategy;
}

@Override
public ZstdOptions copy() {
return new ZstdOptions(level, windowLog);
return new ZstdOptions(level, windowLog, strategy);
}

@Override
Expand Down Expand Up @@ -123,6 +125,13 @@ public ZstdOptions setLevel(int newValue) {
return this;
}

public ZstdOptions setStrategy(int newValue) {
// https://facebook.github.io/zstd/zstd_manual.html#Chapter5
// Although the value is between 1 and 9 and 0 means `use default`, ZStd can change it.
strategy = newValue;
return this;
}

@Override
public ZstdOptions setData(DataKind newValue) {
return this; // We don't support setting DataKind in ZstdCodec.
Expand All @@ -136,19 +145,21 @@ public boolean equals(Object o) {
ZstdOptions that = (ZstdOptions) o;

if (level != that.level) return false;
if (strategy != that.strategy) return false;
return windowLog == that.windowLog;
}

@Override
public int hashCode() {
int result = level;
result = 31 * result + windowLog;
result = 31 * result + strategy;
return result;
}
}

private static final ZstdOptions DEFAULT_OPTIONS =
new ZstdOptions(3, 0);
new ZstdOptions(3, 0, 0);

@Override
public Options getDefaultOptions() {
Expand Down Expand Up @@ -183,6 +194,7 @@ public boolean compress(ByteBuffer in, ByteBuffer out,
zstdCompressCtx.setLevel(zso.level);
zstdCompressCtx.setLong(zso.windowLog);
zstdCompressCtx.setChecksum(false);
zstdCompressCtx.setStrategy(zso.strategy);

try {
byte[] compressed = getBuffer((int) Zstd.compressBound(inBytes));
Expand Down
Loading