Commit 29cecd4

Davies Liu authored and committed

[SPARK-12388] change default compression to lz4

According to the benchmark [1], LZ4-java can be 80% (or 30%) faster than Snappy, depending on the workload. After switching the compressor to LZ4, I saw a 20% improvement in end-to-end time for a TPC-DS query (Q4).

[1] https://github.com/ning/jvm-compressor-benchmark/wiki

cc rxin

Author: Davies Liu <[email protected]>

Closes #10342 from davies/lz4.

1 parent d655d37 commit 29cecd4

File tree

6 files changed: +276 -14


.rat-excludes (+1)

@@ -84,3 +84,4 @@ gen-java.*
 org.apache.spark.sql.sources.DataSourceRegister
 org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet
+LZ4BlockInputStream.java

(The new Java file is listed here so the Apache RAT license check skips it: it carries the abbreviated header from lz4-java rather than Spark's standard ASF header.)

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala (+6 -6)

@@ -17,10 +17,10 @@
 
 package org.apache.spark.io
 
-import java.io.{IOException, InputStream, OutputStream}
+import java.io._
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.LZ4BlockOutputStream
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -49,7 +49,8 @@ private[spark] object CompressionCodec {
   private val configKey = "spark.io.compression.codec"
 
   private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
-    codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
+    (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
+      || codec.isInstanceOf[LZ4CompressionCodec])
   }
 
   private val shortCompressionCodecNames = Map(
@@ -92,12 +93,11 @@ private[spark] object CompressionCodec {
     }
   }
 
-  val FALLBACK_COMPRESSION_CODEC = "lzf"
-  val DEFAULT_COMPRESSION_CODEC = "snappy"
+  val FALLBACK_COMPRESSION_CODEC = "snappy"
+  val DEFAULT_COMPRESSION_CODEC = "lz4"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
 
-
 /**
  * :: DeveloperApi ::
  * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
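
As a sketch of what the swapped constants mean in practice (illustrative only, not part of the commit; CompressionCodec is private[spark], so this compiles only inside Spark's own source tree):

import org.apache.spark.SparkConf
import org.apache.spark.io._

val conf = new SparkConf()

// With no explicit setting, DEFAULT_COMPRESSION_CODEC ("lz4") is used.
val codec = CompressionCodec.createCodec(conf)
assert(codec.isInstanceOf[LZ4CompressionCodec])

// All three built-in codecs now report concatenation support.
assert(CompressionCodec.supportsConcatenationOfSerializedStreams(codec))

// An explicit setting still overrides the new default.
conf.set("spark.io.compression.codec", "lzf")
assert(CompressionCodec.createCodec(conf).isInstanceOf[LZFCompressionCodec])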
core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java (new file, +263)

package org.apache.spark.io;

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Checksum;

import net.jpountz.lz4.LZ4BlockOutputStream;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.util.SafeUtils;
import net.jpountz.xxhash.StreamingXXHash32;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;

/**
 * {@link InputStream} implementation to decode data written with
 * {@link LZ4BlockOutputStream}. This class is not thread-safe and does not
 * support {@link #mark(int)}/{@link #reset()}.
 * @see LZ4BlockOutputStream
 *
 * This is based on net.jpountz.lz4.LZ4BlockInputStream
 *
 * changes: https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411
 *
 * TODO: merge this into upstream
 */
public final class LZ4BlockInputStream extends FilterInputStream {

  // Copied from net.jpountz.lz4.LZ4BlockOutputStream
  static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
  static final int MAGIC_LENGTH = MAGIC.length;

  static final int HEADER_LENGTH =
    MAGIC_LENGTH // magic bytes
      + 1 // token
      + 4 // compressed length
      + 4 // decompressed length
      + 4; // checksum

  static final int COMPRESSION_LEVEL_BASE = 10;

  static final int COMPRESSION_METHOD_RAW = 0x10;
  static final int COMPRESSION_METHOD_LZ4 = 0x20;

  static final int DEFAULT_SEED = 0x9747b28c;

  private final LZ4FastDecompressor decompressor;
  private final Checksum checksum;
  private byte[] buffer;
  private byte[] compressedBuffer;
  private int originalLen;
  private int o;
  private boolean finished;

  /**
   * Create a new {@link InputStream}.
   *
   * @param in the {@link InputStream} to poll
   * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
   *        use
   * @param checksum the {@link Checksum} instance to use, must be
   *        equivalent to the instance which has been used to
   *        write the stream
   */
  public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) {
    super(in);
    this.decompressor = decompressor;
    this.checksum = checksum;
    this.buffer = new byte[0];
    this.compressedBuffer = new byte[HEADER_LENGTH];
    o = originalLen = 0;
    finished = false;
  }

  /**
   * Create a new instance using {@link XXHash32} for checksumming.
   * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum)
   * @see StreamingXXHash32#asChecksum()
   */
  public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
    this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
  }

  /**
   * Create a new instance which uses the fastest {@link LZ4FastDecompressor} available.
   * @see LZ4Factory#fastestInstance()
   * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor)
   */
  public LZ4BlockInputStream(InputStream in) {
    this(in, LZ4Factory.fastestInstance().fastDecompressor());
  }

  @Override
  public int available() throws IOException {
    refill();
    return originalLen - o;
  }

  @Override
  public int read() throws IOException {
    refill();
    if (finished) {
      return -1;
    }
    return buffer[o++] & 0xFF;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    SafeUtils.checkRange(b, off, len);
    refill();
    if (finished) {
      return -1;
    }
    len = Math.min(len, originalLen - o);
    System.arraycopy(buffer, o, b, off, len);
    o += len;
    return len;
  }

  @Override
  public int read(byte[] b) throws IOException {
    return read(b, 0, b.length);
  }

  @Override
  public long skip(long n) throws IOException {
    refill();
    if (finished) {
      return -1;
    }
    final int skipped = (int) Math.min(n, originalLen - o);
    o += skipped;
    return skipped;
  }

  private void refill() throws IOException {
    if (finished || o < originalLen) {
      return;
    }
    try {
      readFully(compressedBuffer, HEADER_LENGTH);
    } catch (EOFException e) {
      finished = true;
      return;
    }
    for (int i = 0; i < MAGIC_LENGTH; ++i) {
      if (compressedBuffer[i] != MAGIC[i]) {
        throw new IOException("Stream is corrupted");
      }
    }
    final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF;
    final int compressionMethod = token & 0xF0;
    final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F);
    if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4)
    {
      throw new IOException("Stream is corrupted");
    }
    final int compressedLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1);
    originalLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5);
    final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
    assert HEADER_LENGTH == MAGIC_LENGTH + 13;
    if (originalLen > 1 << compressionLevel
        || originalLen < 0
        || compressedLen < 0
        || (originalLen == 0 && compressedLen != 0)
        || (originalLen != 0 && compressedLen == 0)
        || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
      throw new IOException("Stream is corrupted");
    }
    if (originalLen == 0 && compressedLen == 0) {
      if (check != 0) {
        throw new IOException("Stream is corrupted");
      }
      // An empty block is the end-of-stream marker written by
      // LZ4BlockOutputStream.close(); skip it and keep reading so that
      // concatenated streams decode as one continuous stream.
      refill();
      return;
    }
    if (buffer.length < originalLen) {
      buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
    }
    switch (compressionMethod) {
      case COMPRESSION_METHOD_RAW:
        readFully(buffer, originalLen);
        break;
      case COMPRESSION_METHOD_LZ4:
        if (compressedBuffer.length < originalLen) {
          compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
        }
        readFully(compressedBuffer, compressedLen);
        try {
          final int compressedLen2 =
            decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
          if (compressedLen != compressedLen2) {
            throw new IOException("Stream is corrupted");
          }
        } catch (LZ4Exception e) {
          throw new IOException("Stream is corrupted", e);
        }
        break;
      default:
        throw new AssertionError();
    }
    checksum.reset();
    checksum.update(buffer, 0, originalLen);
    if ((int) checksum.getValue() != check) {
      throw new IOException("Stream is corrupted");
    }
    o = 0;
  }

  private void readFully(byte[] b, int len) throws IOException {
    int read = 0;
    while (read < len) {
      final int r = in.read(b, read, len - read);
      if (r < 0) {
        throw new EOFException("Stream ended prematurely");
      }
      read += r;
    }
    assert len == read;
  }

  @Override
  public boolean markSupported() {
    return false;
  }

  @SuppressWarnings("sync-override")
  @Override
  public void mark(int readlimit) {
    // unsupported
  }

  @SuppressWarnings("sync-override")
  @Override
  public void reset() throws IOException {
    throw new IOException("mark/reset not supported");
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + "(in=" + in
      + ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
  }

}
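
The key departure from upstream is in refill(): a block whose compressed and decompressed lengths are both zero (the end-of-stream marker that LZ4BlockOutputStream.close() emits) no longer terminates the stream; it is skipped and reading continues. That is what lets LZ4 join Snappy and LZF in supportsConcatenationOfSerializedStreams. A small round-trip sketch (illustrative, not from the commit):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream
import org.apache.spark.io.LZ4BlockInputStream

// Compress a payload as one self-contained LZ4 block stream;
// close() appends the zero-length end-of-stream marker block.
def compress(s: String): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  val out = new LZ4BlockOutputStream(baos)
  out.write(s.getBytes("UTF-8"))
  out.close()
  baos.toByteArray
}

// Two streams concatenated byte-for-byte decode as one continuous stream,
// because refill() skips the marker instead of treating it as EOF.
val in = new LZ4BlockInputStream(
  new ByteArrayInputStream(compress("hello, ") ++ compress("world")))
val sink = new ByteArrayOutputStream()
val buf = new Array[Byte](1024)
var n = in.read(buf)
while (n > 0) { sink.write(buf, 0, n); n = in.read(buf) }
assert(sink.toString("UTF-8") == "hello, world")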

core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala (+3 -5)

@@ -46,7 +46,7 @@ class CompressionCodecSuite extends SparkFunSuite {
 
   test("default compression codec") {
     val codec = CompressionCodec.createCodec(conf)
-    assert(codec.getClass === classOf[SnappyCompressionCodec])
+    assert(codec.getClass === classOf[LZ4CompressionCodec])
     testCodec(codec)
   }
 
@@ -62,12 +62,10 @@ class CompressionCodecSuite extends SparkFunSuite {
     testCodec(codec)
   }
 
-  test("lz4 does not support concatenation of serialized streams") {
+  test("lz4 supports concatenation of serialized streams") {
     val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
     assert(codec.getClass === classOf[LZ4CompressionCodec])
-    intercept[Exception] {
-      testConcatenationOfSerializedStreams(codec)
-    }
+    testConcatenationOfSerializedStreams(codec)
   }
 
   test("lzf compression codec") {

docs/configuration.md (+1 -1)

@@ -595,7 +595,7 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.io.compression.codec</code></td>
-  <td>snappy</td>
+  <td>lz4</td>
   <td>
     The codec used to compress internal data such as RDD partitions, broadcast variables and
     shuffle outputs. By default, Spark provides three codecs: <code>lz4</code>, <code>lzf</code>,
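
For users crossing this change, the property can be pinned explicitly; a hypothetical app config (everything except the config key and codec names is made up):

import org.apache.spark.SparkConf

// Rely on the new "lz4" default, or pin the old codec explicitly if anything
// downstream expects Snappy-compressed data.
// Command-line equivalent: --conf spark.io.compression.codec=snappy
val conf = new SparkConf()
  .setAppName("example-app")  // made-up name
  .set("spark.io.compression.codec", "snappy")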

sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala (+2 -2)

The expected target sizes below shift because LZ4 compresses the suite's shuffle output to different byte counts than Snappy did (presumably somewhat larger for these small test payloads).

@@ -319,7 +319,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     }
 
-    withSQLContext(test, 1536, minNumPostShufflePartitions)
+    withSQLContext(test, 2000, minNumPostShufflePartitions)
   }
 
   test(s"determining the number of reducers: join operator$testNameNote") {
@@ -422,7 +422,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     }
 
-    withSQLContext(test, 6144, minNumPostShufflePartitions)
+    withSQLContext(test, 6644, minNumPostShufflePartitions)
   }
 
   test(s"determining the number of reducers: complex query 2$testNameNote") {
