Headers propagation during message conversion #3573

@CyberpunkPerson

Hi Team,

I ran into an issue with message conversion. I have several data sources, each with its own adapter. If an exception occurs during message conversion, I would like to log some additional information, such as the source name or the event type, but there is currently no way to do that.
Right now, in KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener#onMessage, no data is passed along with the ConversionException: neither the headers nor the original record. If an exception is thrown during message conversion, all headers are lost.

@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
	Message<?> message = null;
	try {
		message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record);
		setAttributesIfNecessary(record, message);
	}
	catch (RuntimeException e) {
		RuntimeException exception = new ConversionException("Failed to convert to message for: " + record, e);
		sendErrorMessageIfNecessary(null, exception);
	}

	sendMessageIfAny(message, record);
}

It would be great if this could be fixed on your side.
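To illustrate the kind of behavior I have in mind, here is a minimal, dependency-free sketch. It is not the Spring Integration API: `SourceRecord` is a hypothetical stand-in for Kafka's `ConsumerRecord`, and `RecordConversionException` and `ConversionSketch` are made-up names used only for this example. The idea is simply that the exception raised on a conversion failure keeps a reference to the original record, so an error handler can still read its headers.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a consumed record (NOT Kafka's ConsumerRecord).
final class SourceRecord {
    final String topic;
    final Map<String, String> headers;
    final String value;

    SourceRecord(String topic, Map<String, String> headers, String value) {
        this.topic = topic;
        // Defensive copy so the headers seen by error handlers cannot change later.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        this.value = value;
    }
}

// Hypothetical exception type that keeps the original record (and thus its headers).
class RecordConversionException extends RuntimeException {
    private final SourceRecord record;

    RecordConversionException(String message, SourceRecord record, Throwable cause) {
        super(message, cause);
        this.record = record;
    }

    SourceRecord getRecord() {
        return record;
    }

    Map<String, String> getHeaders() {
        return record.headers;
    }
}

// Hypothetical helper showing where the record would be attached on failure.
final class ConversionSketch {
    private ConversionSketch() {
    }

    static <T> T convert(SourceRecord record, Function<SourceRecord, T> converter) {
        try {
            return converter.apply(record);
        }
        catch (RuntimeException e) {
            // Attach the record instead of only concatenating it into the message text.
            throw new RecordConversionException(
                    "Failed to convert to message for topic: " + record.topic, record, e);
        }
    }
}
```

With something along these lines, an error handler could call `getHeaders()` on the exception and log, for example, the source name or event type that was set as a header on the failed record.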

Thanks in advance 😄
