
Commit 741cb76

dongjinleekr authored and hachikuji committed
KAFKA-4514; Add Codec for ZStandard Compression (#2267)
This patch adds support for zstandard compression to Kafka as documented in KIP-110: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression. Reviewers: Ivan Babrou <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
1 parent 578205c commit 741cb76

38 files changed: +610 -86 lines changed

LICENSE

Lines changed: 66 additions & 0 deletions
@@ -201,6 +201,7 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 
+------------------------------------------------------------------------------------
 This distribution has a binary dependency on jersey, which is available under the CDDL
 License as described below.
 
@@ -328,3 +329,68 @@ As between Initial Developer and the Contributors, each party is responsible for
 NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL)
 
 The code released under the CDDL shall be governed by the laws of the State of California (excluding conflict-of-law provisions). Any litigation relating to this License shall be subject to the jurisdiction of the Federal Courts of the Northern District of California and the state courts of the State of California, with venue lying in Santa Clara County, California.
+
+------------------------------------------------------------------------------------
+This distribution has a binary dependency on zstd, which is available under the BSD 3-Clause License as described below.
+
+BSD License
+
+For Zstandard software
+
+Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+ * Neither the name Facebook nor the names of its contributors may be used to
+   endorse or promote products derived from this software without specific
+   prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+------------------------------------------------------------------------------------
+This distribution has a binary dependency on zstd-jni, which is available under the BSD 2-Clause License
+as described below.
+
+Zstd-jni: JNI bindings to Zstd Library
+
+Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved.
+
+BSD License
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice, this
+   list of conditions and the following disclaimer in the documentation and/or
+   other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

build.gradle

Lines changed: 1 addition & 0 deletions
@@ -820,6 +820,7 @@ project(':clients') {
   conf2ScopeMappings.addMapping(1000, configurations.jacksonDatabindConfig, "provided")
 
   dependencies {
+    compile libs.zstd
     compile libs.lz4
     compile libs.snappy
     compile libs.slf4jApi

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 1 addition & 1 deletion
@@ -159,7 +159,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>compression.type</code> */
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
     private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid "
-            + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
+            + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>. "
             + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
 
     /** <code>metrics.sample.window.ms</code> */
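
For context beyond the diff itself: once this patch is applied, "zstd" becomes a valid value for the producer's compression.type. A minimal, hedged sketch of how a client might use it; the bootstrap address and topic name are placeholders, and the broker must also be new enough to accept zstd-compressed batches per KIP-110.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // New value enabled by this patch; record batches are zstd-compressed on the producer.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // placeholder topic
        }
    }
}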

clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ public class TopicConfig {
 
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
     public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
-        "This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally " +
+        "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " +
         "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
        "original compression codec set by the producer.";
 
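
The topic-level setting can likewise be switched to zstd. A hedged sketch using the AdminClient alterConfigs API that existed at the time of this commit; the broker address and topic name are placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class ZstdTopicConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic"); // placeholder topic
            Config config = new Config(Collections.singleton(
                    new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")));
            // alterConfigs replaces the topic's dynamic config with the supplied entries.
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}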

clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The requesting client does not support the compression type of given partition.
+ */
+public class UnsupportedCompressionTypeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public UnsupportedCompressionTypeException(String message) {
+        super(message);
+    }
+
+    public UnsupportedCompressionTypeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

Lines changed: 4 additions & 1 deletion
@@ -91,6 +91,7 @@
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -284,7 +285,9 @@ public enum Errors {
     FENCED_LEADER_EPOCH(74, "The leader epoch in the request is older than the epoch on the broker",
         FencedLeaderEpochException::new),
     UNKNOWN_LEADER_EPOCH(75, "The leader epoch in the request is newer than the epoch on the broker",
-        UnknownLeaderEpochException::new);
+        UnknownLeaderEpochException::new),
+    UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of given partition.",
+        UnsupportedCompressionTypeException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
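
A quick way to see the new mapping from the wire-protocol side: error code 76 now resolves to the exception added above. A small sketch, assuming a clients jar containing this change is on the classpath.

import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeSketch {
    public static void main(String[] args) {
        // Error code 76 was added by this patch.
        Errors error = Errors.forCode((short) 76);
        System.out.println(error);                                  // UNSUPPORTED_COMPRESSION_TYPE
        System.out.println(error.exception().getClass().getName()); // ...UnsupportedCompressionTypeException
    }
}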

clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java

Lines changed: 2 additions & 0 deletions
@@ -322,6 +322,8 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
             throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
 
         CompressionType compressionType = wrapperRecord.compressionType();
+        if (compressionType == CompressionType.ZSTD)
+            throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
         ByteBuffer wrapperValue = wrapperRecord.value();
         if (wrapperValue == null)
             throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +

clients/src/main/java/org/apache/kafka/common/record/CompressionType.java

Lines changed: 32 additions & 1 deletion
@@ -113,6 +113,26 @@ public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, Buf
                 throw new KafkaException(e);
             }
         }
+    },
+
+    ZSTD(4, "zstd", 1.0f) {
+        @Override
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+            try {
+                return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
+            } catch (Throwable e) {
+                throw new KafkaException(e);
+            }
+        }
+
+        @Override
+        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+            try {
+                return (InputStream) ZstdConstructors.INPUT.invoke(new ByteBufferInputStream(buffer));
+            } catch (Throwable e) {
+                throw new KafkaException(e);
+            }
+        }
     };
 
     public final int id;
@@ -156,6 +176,8 @@ public static CompressionType forId(int id) {
                 return SNAPPY;
             case 3:
                 return LZ4;
+            case 4:
+                return ZSTD;
             default:
                 throw new IllegalArgumentException("Unknown compression type id: " + id);
         }
@@ -170,14 +192,16 @@ else if (SNAPPY.name.equals(name))
             return SNAPPY;
         else if (LZ4.name.equals(name))
             return LZ4;
+        else if (ZSTD.name.equals(name))
+            return ZSTD;
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
 
     // We should only have a runtime dependency on compression algorithms in case the native libraries don't support
     // some platforms.
     //
-    // For Snappy, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
+    // For Snappy and Zstd, we dynamically load the classes and rely on the initialization-on-demand holder idiom to ensure
     // they're only loaded if used.
     //
     // For LZ4 we are using org.apache.kafka classes, which should always be in the classpath, and would not trigger
@@ -190,6 +214,13 @@ private static class SnappyConstructors {
                 MethodType.methodType(void.class, OutputStream.class));
     }
 
+    private static class ZstdConstructors {
+        static final MethodHandle INPUT = findConstructor("com.github.luben.zstd.ZstdInputStream",
+                MethodType.methodType(void.class, InputStream.class));
+        static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
+                MethodType.methodType(void.class, OutputStream.class));
+    }
+
     private static MethodHandle findConstructor(String className, MethodType methodType) {
         try {
             return MethodHandles.publicLookup().findConstructor(Class.forName(className), methodType);
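
The ZstdConstructors holder mirrors the existing SnappyConstructors approach: the zstd-jni stream classes are resolved reflectively inside a nested holder class, so they are loaded only on first use and the zstd jar is not required unless zstd is actually configured. A standalone sketch of the same idiom follows; this is not Kafka code, and it needs zstd-jni on the classpath to run.

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

public class LazyZstdLoadingSketch {

    // Initialization-on-demand holder: ZstdHolder (and the zstd class it references)
    // is only initialized the first time wrap() is called.
    private static class ZstdHolder {
        static final MethodHandle OUTPUT = findConstructor("com.github.luben.zstd.ZstdOutputStream",
                MethodType.methodType(void.class, OutputStream.class));
    }

    private static MethodHandle findConstructor(String className, MethodType type) {
        try {
            return MethodHandles.publicLookup().findConstructor(Class.forName(className), type);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static OutputStream wrap(OutputStream out) {
        try {
            return (OutputStream) ZstdHolder.OUTPUT.invoke(out);
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) throws Exception {
        try (OutputStream zstdOut = wrap(new ByteArrayOutputStream())) {
            zstdOut.write("hello, zstd".getBytes("UTF-8"));
        }
    }
}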

clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java

Lines changed: 5 additions & 1 deletion
@@ -44,6 +44,9 @@ public class LazyDownConversionRecords implements BaseRecords {
      * @param firstOffset The starting offset for down-converted records. This only impacts some cases. See
      *                    {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
      * @param time The time instance to use
+     *
+     * @throws org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first batch to down-convert
+     *         has a compression type which we do not support down-conversion for.
      */
     public LazyDownConversionRecords(TopicPartition topicPartition, Records records, byte toMagic, long firstOffset, Time time) {
         this.topicPartition = Objects.requireNonNull(topicPartition);
@@ -150,7 +153,7 @@ protected ConvertedRecords makeNext() {
             }
 
             while (batchIterator.hasNext()) {
-                List<RecordBatch> batches = new ArrayList<>();
+                final List<RecordBatch> batches = new ArrayList<>();
                 boolean isFirstBatch = true;
                 long sizeSoFar = 0;
 
@@ -162,6 +165,7 @@ protected ConvertedRecords makeNext() {
                     sizeSoFar += currentBatch.sizeInBytes();
                     isFirstBatch = false;
                 }
+
                 ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
                 // During conversion, it is possible that we drop certain batches because they do not have an equivalent
                 // representation in the message format we want to convert to. For example, V0 and V1 message formats

clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java

Lines changed: 39 additions & 23 deletions
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,35 +46,50 @@ public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecor
         convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
     }
 
+    private MemoryRecords buildOverflowBatch(int remaining) {
+        // We do not have any records left to down-convert. Construct an overflow message for the length remaining.
+        // This message will be ignored by the consumer because its length will be past the length of maximum
+        // possible response size.
+        // DefaultRecordBatch =>
+        //   BaseOffset => Int64
+        //   Length => Int32
+        //   ...
+        ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
+            Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
+        overflowMessageBatch.putLong(-1L);
+
+        // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
+        // overhead.
+        overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
+        log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
+        return MemoryRecords.readableRecords(overflowMessageBatch);
+    }
+
     @Override
     public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
         if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
             MemoryRecords convertedRecords;
-            // Check if we have more chunks left to down-convert
-            if (convertedRecordsIterator.hasNext()) {
-                // Get next chunk of down-converted messages
-                ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
-                convertedRecords = recordsAndStats.records();
-                recordConversionStats.add(recordsAndStats.recordConversionStats());
-                log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
-            } else {
-                // We do not have any records left to down-convert. Construct an overflow message for the length remaining.
-                // This message will be ignored by the consumer because its length will be past the length of maximum
-                // possible response size.
-                // DefaultRecordBatch =>
-                //   BaseOffset => Int64
-                //   Length => Int32
-                //   ...
-                ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
-                    Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
-                overflowMessageBatch.putLong(-1L);
 
-                // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
-                // overhead.
-                overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
-                convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch);
-                log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
+            try {
+                // Check if we have more chunks left to down-convert
+                if (convertedRecordsIterator.hasNext()) {
+                    // Get next chunk of down-converted messages
+                    ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
+                    convertedRecords = recordsAndStats.records();
+                    recordConversionStats.add(recordsAndStats.recordConversionStats());
+                    log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
+                } else {
+                    convertedRecords = buildOverflowBatch(remaining);
+                }
+            } catch (UnsupportedCompressionTypeException e) {
+                // We have encountered a compression type which does not support down-conversion (e.g. zstd).
+                // Since we have already sent at least one batch and we have committed to the fetch size, we
+                // send an overflow batch. The consumer will read the first few records and then fetch from the
+                // offset of the batch which has the unsupported compression type. At that time, we will
+                // send back the UNSUPPORTED_COMPRESSION_TYPE error which will allow the consumer to fail gracefully.
+                convertedRecords = buildOverflowBatch(remaining);
             }
+
             convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
         }
         return convertedRecordsWriter.writeTo(channel);
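
To make the overflow-batch trick above concrete, here is a small standalone sketch of the framing, not Kafka code: the 61-byte batch overhead, 12-byte minimum length, and 128 KiB read cap are assumptions matching Kafka's DefaultRecordBatch.RECORD_BATCH_OVERHEAD, Records.LOG_OVERHEAD, and the send's MAX_READ_SIZE.

import java.nio.ByteBuffer;

public class OverflowBatchSketch {
    static final int RECORD_BATCH_OVERHEAD = 61;       // assumption: Kafka's minimum v2 batch overhead
    static final int MIN_OVERFLOW_MESSAGE_LENGTH = 12; // assumption: BaseOffset (8 bytes) + Length (4 bytes)
    static final int MAX_READ_SIZE = 128 * 1024;       // assumption: per-chunk read size used by the send

    public static void main(String[] args) {
        int remaining = 100; // bytes still owed in the fetch response

        // Only the 12-byte header is actually populated; the zero-filled remainder pads out
        // the response, and the declared Length exceeds the bytes actually sent, so the
        // consumer sees a truncated batch and discards it.
        ByteBuffer overflow = ByteBuffer.allocate(
            Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
        overflow.putLong(-1L);                                           // BaseOffset placeholder
        overflow.putInt(Math.max(remaining + 1, RECORD_BATCH_OVERHEAD)); // Length > remaining

        System.out.println("declared batch length = " + overflow.getInt(8));
    }
}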