Skip to content

Commit 287a9da

Browse files
author
Sital Kedia
committed
[SPARK-19112][CORE] Support for ZStandard codec
1 parent 77cc0d6 commit 287a9da

File tree

4 files changed

+58
-2
lines changed

4 files changed

+58
-2
lines changed

core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@
193193
<groupId>net.jpountz.lz4</groupId>
194194
<artifactId>lz4</artifactId>
195195
</dependency>
196+
<dependency>
197+
<groupId>com.github.luben</groupId>
198+
<artifactId>zstd-jni</artifactId>
199+
</dependency>
196200
<dependency>
197201
<groupId>org.roaringbitmap</groupId>
198202
<artifactId>RoaringBitmap</artifactId>

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.io
2020
import java.io._
2121
import java.util.Locale
2222

23+
import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
2324
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
2425
import net.jpountz.lz4.LZ4BlockOutputStream
2526
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
@@ -50,13 +51,14 @@ private[spark] object CompressionCodec {
5051

5152
/**
 * Reports whether `codec` is one of the codec implementations listed below
 * (Snappy, LZF, LZ4 or ZStandard); any other codec yields `false`.
 */
private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
  codec match {
    case _: SnappyCompressionCodec | _: LZFCompressionCodec => true
    case _: LZ4CompressionCodec | _: ZStandardCompressionCodec => true
    case _ => false
  }
}
5556

5657
// Short codec aliases mapped to the fully qualified name of the implementing class.
// Each alias must point at its own codec class: "zstd" previously pointed at the
// Snappy codec, which made the alias resolve to the wrong implementation.
private val shortCompressionCodecNames = Map(
  "lz4" -> classOf[LZ4CompressionCodec].getName,
  "lzf" -> classOf[LZFCompressionCodec].getName,
  "snappy" -> classOf[SnappyCompressionCodec].getName,
  "zstd" -> classOf[ZStandardCompressionCodec].getName)
6062

6163
def getCodecName(conf: SparkConf): String = {
6264
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
@@ -216,3 +218,30 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou
216218
}
217219
}
218220
}
221+
222+
/**
 * :: DeveloperApi ::
 * ZStandard implementation of [[org.apache.spark.io.CompressionCodec]].
 *
 * Settings read from the supplied `SparkConf`:
 *  - `spark.io.compression.zstandard.level`: integer compression level, default `1`.
 *  - `spark.io.compression.zstandard.bufferSize`: size of the buffer placed in front of the
 *    zstd streams, default `32k`.
 *
 * @note The wire protocol for this codec is not guaranteed to be compatible across versions
 * of Spark. This is intended for use as an internal compression utility within a single Spark
 * application.
 */
@DeveloperApi
class ZStandardCompressionCodec(conf: SparkConf) extends CompressionCodec {

  // Size of the Java-side buffer wrapped around the zstd streams. This uses a zstd-specific
  // key so that tuning the LZ4 block size does not silently change zstd buffering.
  private def bufferSize: Int =
    conf.getSizeAsBytes("spark.io.compression.zstandard.bufferSize", "32k").toInt

  /**
   * Wraps `s` so that bytes written to the returned stream are zstd-compressed into `s`.
   *
   * @param s the underlying stream that receives the compressed bytes
   * @return a buffered stream in front of a `ZstdOutputStream`
   */
  override def compressedOutputStream(s: OutputStream): OutputStream = {
    // The level is a plain integer, not a byte size, so it is read with getInt; a value
    // such as "1k" is rejected instead of being interpreted as level 1024.
    val level = conf.getInt("spark.io.compression.zstandard.level", 1)
    // Buffer writes in front of the zstd stream to avoid excessive JNI call overhead
    // when compressing small amounts of data.
    new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
  }

  /**
   * Wraps `s` so that bytes read from the returned stream are the decompressed content of `s`.
   *
   * @param s the underlying stream that supplies zstd-compressed bytes
   * @return a buffered stream in front of a `ZstdInputStream`
   */
  override def compressedInputStream(s: InputStream): InputStream = {
    // Buffer reads in front of the zstd stream to avoid excessive JNI call overhead
    // when decompressing small amounts of data.
    new BufferedInputStream(new ZstdInputStream(s), bufferSize)
  }
}

core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite {
104104
testConcatenationOfSerializedStreams(codec)
105105
}
106106

107+
test("zstd compression codec") {
  // Create the codec from its fully qualified class name and run the shared codec check.
  val expectedClass = classOf[ZStandardCompressionCodec]
  val zstdCodec = CompressionCodec.createCodec(conf, expectedClass.getName)
  assert(zstdCodec.getClass === expectedClass)
  testCodec(zstdCodec)
}
112+
113+
test("zstd compression codec short form") {
  // Create the codec through its short alias; the alias must yield the ZStandard codec.
  val shortName = "zstd"
  val zstdCodec = CompressionCodec.createCodec(conf, shortName)
  assert(zstdCodec.getClass === classOf[ZStandardCompressionCodec])
  testCodec(zstdCodec)
}
118+
119+
test("zstd supports concatenation of serialized zstd") {
  // Create the codec from its class name and run the shared concatenation check on it.
  val expectedClass = classOf[ZStandardCompressionCodec]
  val zstdCodec = CompressionCodec.createCodec(conf, expectedClass.getName)
  assert(zstdCodec.getClass === expectedClass)
  testConcatenationOfSerializedStreams(zstdCodec)
}
124+
107125
test("bad compression codec") {
108126
intercept[IllegalArgumentException] {
109127
CompressionCodec.createCodec(conf, "foobar")

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,11 @@
534534
<artifactId>lz4</artifactId>
535535
<version>1.3.0</version>
536536
</dependency>
537+
<dependency>
538+
<groupId>com.github.luben</groupId>
539+
<artifactId>zstd-jni</artifactId>
540+
<version>1.3.0-1</version>
541+
</dependency>
537542
<dependency>
538543
<groupId>com.clearspring.analytics</groupId>
539544
<artifactId>stream</artifactId>

0 commit comments

Comments
 (0)