Using Json for both key and value when sending to a topic results in erroneous record headers #610

@mark--

Description

I am using Spring Kafka with KafkaListeners. The listener does not work when I use Json for both the value and the key. See the minimal example project here: https://github.com/mark--/spring-kafka-error

When the test is run, the listener cannot be invoked because of the following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method [public void foo.bar.Listener.verify(foo.bar.Record,foo.bar.Key) throws java.text.ParseException,java.net.URISyntaxException] Bean [foo.bar.Listener$$EnhancerBySpringCGLIB$$90782327@f679798]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [foo.bar.Key] to [foo.bar.Record] for GenericMessage [payload=Key [value=null], headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4d54e4d4, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=Key [value=123], kafka_receivedPartitionId=1, kafka_receivedTopic=topic.foo, kafka_receivedTimestamp=1521588483791, __TypeId__=[B@3238664b}], failedMessage=GenericMessage [payload=Key [value=null], headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4d54e4d4, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=Key [value=123], kafka_receivedPartitionId=1, kafka_receivedTopic=topic.foo, kafka_receivedTimestamp=1521588483791, __TypeId__=[B@3238664b}] ...

After some debugging I think I found the following error in Spring Kafka:

In
org.apache.kafka.clients.producer.KafkaProducer.doSend(ProducerRecord<K, V> record, Callback callback) (line 775)
the serialized key is computed by
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
and, in line 783 (*), the serialised value is computed by
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

In both cases the same serialiser class
org.springframework.kafka.support.serializer.JsonSerializer
is used.

During both serialisation calls,
org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.fromJavaType(JavaType javaType, Headers headers) (line 176)
adds a __TypeId__ header field, so the header occurs twice afterwards, resulting in the following ProducerRecord after (*):

ProducerRecord(topic=topic.foo, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [102, 111, 111, 46, 98, 97, 114, 46, 75, 101, 121]), RecordHeader(key = __TypeId__, value = [102, 111, 111, 46, 98, 97, 114, 46, 82, 101, 99, 111, 114, 100])], isReadOnly = false), key=Key [value=123], value=Record [key=foofoo], timestamp=null)
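The two header values above are easier to read once decoded. The following is a minimal sketch, assuming the bytes are UTF-8 encoded class names; the class `TypeIdHeaderDecode` and its `decode` helper are hypothetical and only serve this illustration.

```java
import java.nio.charset.StandardCharsets;

public class TypeIdHeaderDecode {

    // Turn the printed header value (a list of byte values) back into text,
    // assuming the header holds a UTF-8 encoded class name.
    static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First __TypeId__ header, written while the key was serialised
        System.out.println(decode(102, 111, 111, 46, 98, 97, 114, 46, 75, 101, 121));
        // prints "foo.bar.Key"

        // Second __TypeId__ header, written while the value was serialised
        System.out.println(decode(102, 111, 111, 46, 98, 97, 114, 46, 82, 101, 99, 111, 114, 100));
        // prints "foo.bar.Record"
    }
}
```

So the first header names the key type and the second names the value type, in the order in which the two serialisers ran.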

This record is sent via Kafka, received and parsed in
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(TopicPartition partition, RecordBatch batch, Record record).
When the value is deserialised in line 923 via
V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
the value type is taken from the FIRST __TypeId__ header which stores the type of the key. This happens ultimately in
org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.retrieveHeaderAsString(Headers headers, String headerName) (line 161)
where the first fitting header is returned.

Therefore, the value is deserialised as an instance of Key which results in the exception from above.
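The sequence described above can be reproduced without Kafka. The following is a simplified model, not the actual Kafka or Spring Kafka code: the headers are assumed to behave like an ordered list that permits duplicate keys, and `DuplicateTypeIdDemo`, `addTypeId`, `firstHeader` and `lastHeader` are hypothetical names used only for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DuplicateTypeIdDemo {

    static final String TYPE_ID = "__TypeId__";

    // Models a serialiser that always appends a type header
    // without checking whether one is already present.
    static void addTypeId(List<Map.Entry<String, byte[]>> headers, String className) {
        headers.add(new SimpleEntry<>(TYPE_ID, className.getBytes(StandardCharsets.UTF_8)));
    }

    // Models a lookup that returns the first header with a matching name.
    static String firstHeader(List<Map.Entry<String, byte[]>> headers, String name) {
        for (Map.Entry<String, byte[]> header : headers) {
            if (header.getKey().equals(name)) {
                return new String(header.getValue(), StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    // For comparison only: a lookup that returns the last matching header.
    static String lastHeader(List<Map.Entry<String, byte[]>> headers, String name) {
        String result = null;
        for (Map.Entry<String, byte[]> header : headers) {
            if (header.getKey().equals(name)) {
                result = new String(header.getValue(), StandardCharsets.UTF_8);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = new ArrayList<>();
        addTypeId(headers, "foo.bar.Key");    // key serialiser runs first
        addTypeId(headers, "foo.bar.Record"); // value serialiser runs second

        // The value type is resolved from the first match, which is the key type.
        System.out.println(firstHeader(headers, TYPE_ID)); // prints "foo.bar.Key"
        System.out.println(lastHeader(headers, TYPE_ID));  // prints "foo.bar.Record"
    }
}
```

In this model the first-match lookup yields foo.bar.Key for the value, which matches the "Cannot convert from [foo.bar.Key] to [foo.bar.Record]" message in the exception. The last-match lookup is shown only to make the ordering visible; it is not meant as a proposed fix.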

When I use a String as the key, everything works as expected. It also works when I uncomment the lines marked (*) in KafkaConfig in the example project, which is probably the workaround for the time being.
