Skip to content

Conversation

chickenchickenlove
Copy link
Contributor

// HandlerAdapter.java
Object[] args = new Object[providedArgs.length + 1];
args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);

If defaultHandler is set, the above code is executed.
During this call stack, the args array contains a payload of KafkaMessage, which is then passed to the invoked method.

InvocableHandlerMethod.java (L212-L224)

protected @Nullable Object[] getMethodArgumentValues(Message<?> message, @Nullable Object... providedArgs) throws Exception {
		MethodParameter[] parameters = getMethodParameters();
                 ...
 
		@Nullable Object[] args = new Object[parameters.length];
		for (int i = 0; i < parameters.length; i++) {
                 ...
			args[i] = findProvidedArgument(parameter, providedArgs); // Resolved by ProvidedArgs
			if (args[i] != null) {
				continue;
			}
                 ...
			try {
				args[i] = this.resolvers.resolveArgument(parameter, message); // Resolved By @Header
			}

each paramater's value is resolved here.

protected static @Nullable Object findProvidedArgument(MethodParameter parameter, @Nullable Object... providedArgs) {
		if (!ObjectUtils.isEmpty(providedArgs)) {
			for (Object providedArg : providedArgs) {
				if (parameter.getParameterType().isInstance(providedArg)) {
					return providedArg;
				}
			}
		}
		return null;
	}

However, the method findProvidedArgument(...) resolves parameter by only checking parameterType.
So, if either the payload type or the parameter type annotated with @Header is String,
the payload and the annotated parameter will have the same value.

args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);
return this.delegatingHandler.invoke(message, providedArgs);
Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline comment;

I think previous code cause the bug. (#577)
However, I don't fully understand why the payload needs to be resolved and included in the args when a defaultHandler is set.

I did dig into the lower call stack of this.delegatingHandler.invoke(...), but the defaultHandler was not used in relation to providedArgs.
So, I think this code change is better in terms of bug fix.

However, there may be reasons I haven't realized, so I would appreciate any advice from the maintainers on this point.
For now, only one of the existing test cases has been modified.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See that comment:

// Needed to avoid returning raw Message which matches Object

If @KafkaHandler(isDefault = true) method has argument like Object payload, it would match the whole message instead of just its payload.

Usually, a @KafkaHandler(isDefault = true) method is used as a last resort when any other @KafkaHandler cannot match the provided payload.

The whole point of the @KafkaHandler is to handle different payload types with different methods.
So, I'm not sure if your fix is legit: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/class-level-kafkalistener.html#page-title

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan
I have a question!

@KafkaListener
class MyListener { 

   @KafkaHandler(isDefault = true)
   public void (@Header("receivedTopic") String topic, Object payload) { 
        boolean isSame = topic == payload;
   }
}

There are two parameters.
One is annotated with @Header, and the other one is the expected payload.

I means that the topic and payload parameters are same in the current implementation.
So, the variable isSame is True. 😓
May I ask if this behavior is intentional by design?
Thanks for your time 🙇‍♂️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to look into those details deeper, but that looks suspicious that payload argument is resolved to header 🤷

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to look into those details deeper, but that looks suspicious that payload argument is resolved to header 🤷

The code I mentioned PR description causes the problem.
It is not part of spring-kafka. 😢
InvocableHandlerMethod

I think the easiest way to address this issue is that InvocationHandler respect ArgumentResolver. (
However, since it would affect a larger number of users, I believe it might not be accepted.
That said, I personally think that explicitly used annotations should take precedence over resolving parameters by type.

If this change affects the design intention of the DefaultKafkaHandler, it might be a good idea to document that annotations cannot be explicitly used with the default handler.
What do you think?

args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);
return this.delegatingHandler.invoke(message, providedArgs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See that comment:

// Needed to avoid returning raw Message which matches Object

If @KafkaHandler(isDefault = true) method has argument like Object payload, it would match the whole message instead of just its payload.

Usually, a @KafkaHandler(isDefault = true) method is used as a last resort when any other @KafkaHandler cannot match the provided payload.

The whole point of the @KafkaHandler is to handle different payload types with different methods.
So, I'm not sure if your fix is legit: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/class-level-kafkalistener.html#page-title

@artembilan
Copy link
Member

Just wrote this test:

@SpringBootTest
@EmbeddedKafka
class SpringKafkaIssue3880ApplicationTests {

	static final String TEST_TOPIC = "TEST_TOPIC";

	@Autowired
	KafkaTemplate<String, String> kafkaTemplate;

	@Autowired
	KafkaListenerConfiguration kafkaListenerConfiguration;

	@Test
	void verifyKafkaHandlerDefault() throws InterruptedException {
		this.kafkaTemplate.send(TEST_TOPIC, "test data");

		assertThat(this.kafkaListenerConfiguration.dataReceivedLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.kafkaListenerConfiguration.payload).isEqualTo("test data");
		assertThat(this.kafkaListenerConfiguration.topic).isEqualTo(TEST_TOPIC);
	}

	@TestConfiguration
	@KafkaListener(topics = SpringKafkaIssue3880ApplicationTests.TEST_TOPIC, groupId = "test_group")
	static class KafkaListenerConfiguration {

		CountDownLatch dataReceivedLatch = new CountDownLatch(1);

		volatile String topic;

		volatile Object payload;

		@KafkaHandler(isDefault = true)
		public void defaultHandler(@Header("receivedTopic") String topic, Object payload) {
			this.topic = topic;
			this.payload = payload;
			this.dataReceivedLatch.countDown();
		}

	}

}

It fails on the assertThat(this.kafkaListenerConfiguration.topic).isEqualTo(TEST_TOPIC);.

So, apparently the header is resolved to the payload somehow...

@artembilan
Copy link
Member

According to the doc, the existing behavior is correct:

Due to some limitations in the way Spring resolves method arguments, a default @KafkaHandler cannot receive discrete headers; it must use the ConsumerRecordMetadata as discussed in Consumer Record Metadata.

So, that if (parameter.getParameterType().isInstance(providedArg)) { would apply for any first argument provided.
And if we have Object payload, but no real payload as a first provided arg, then this payload param would be resolved to something else, e.g. ConsumerRecord, or Acknowledgment.
Which really is not expected. It is less harmful to request a default method signature like @Headers Map<String, Object> headers, Object payload, instead of trying to mitigate all the possible use-cases when we fix the framework like you suggest.

Do I miss anything else?

@chickenchickenlove
Copy link
Contributor Author

Exactly! This is exactly what I'm saying.
Would it be okay to revert current commits and make a new commit to write description and limitation to adoc like @Headers Map<String, Object> headers instead of @Header("blahblah") String header.
WDYT?

@artembilan
Copy link
Member

a new commit to write description and limitation to adoc

sure! That could be a good plan.
Thanks

----

Also, this won't work as well.
The `payload` value will be injected as the `topic`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" will be injected into the topic parameter"?

Otherwise I ma getting confused as before.
Such a sentence makes me think that header value is injected into the payload parameter.

}
----

If you need discrete custom headers in a default method, use this:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we discussed before that no "you", "me", "we" or even "they" in the technical documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I will keep in mind 😢

@artembilan
Copy link
Member

@chickenchickenlove ,

you contribute a lot and we are grateful for that.

Tried to merge this your PR and noticed this:

Signed-off-by: chickenchickenlove [email protected]

from legal respective that doesn’t follow DCO requirements: https://docs.brigade.sh/topics/contributor-guide/signing/#:~:text=Verify%20the%20contribution%20in%20your,the%20terms%20of%20the%20DCO.&text=You%20MUST%20use%20your%20legal,Configure%20your%20git%20client%20appropriately.

Please , revise your Git client for your real name and real email.

@chickenchickenlove
Copy link
Contributor Author

@artembilan Thanks for your comment!
I have a question!
Should I revert to all commits I made in this PR and make new commits with legal names?
Or would it be okay to apply it starting from the next PR?

@artembilan
Copy link
Member

You can just squash all the commits to one, add real Sign off by, and force push to PR.
Of course, you would need to remove those wrong Sign offs from a new commit message.

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented May 7, 2025

I squashed commits to update legal name, and force pushed.!!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, re-push with the proper commit message.
Here are some tips how that supposed to be: https://cbea.ms/git-commit/

@chickenchickenlove
Copy link
Contributor Author

@artembilan
Sorry for my poor commit message and to bother you again.
I didn't catch your point clearly.

I read the article you shared, and I assumed it might be one of the following requests.

  • Should I include all commit messages in the rebased commit's description body?
  • Are you recommending that I add an appropriate description since the commit message currently doesn’t have one?

What is your point? Please let me know 🙇‍♂️

@artembilan
Copy link
Member

I mean mostly “what?” and “why?”.
Turned out we had to change docs, and that had to be mentioned in the respective commit and with a reasoning .

Very often there is a situation when we look into the past change and we cannot determine why.

I don’t insist on that, but if you’d like to avoid this kind of discussion in the future contributions I’d recommend to make commits meaningful. For example: b0db30c

…e for the parameter annotated with @Header.

Fixes: spring-projects#3880
Issue link: spring-projects#3880

Add an documentation on how to correctly inject headers in default
methods of class-level @KafkaListener.

Because currently, The DefaultHandler resolve an incorrect value for the
parameter annotated with @Header. also, the spring-kafka's intended
usage is incompatiable with the design intent of the InvocationHandler,
so spring-kafka cannot resolve this issue.

Therefore, we will add documentation to prevent users from becoming
confused.

Signed-off-by: Sanghyeok An <[email protected]>
@chickenchickenlove
Copy link
Contributor Author

Thanks for sharing proper examples 🙇‍♂️
I revised my commits to follow your guide and force push!
When you have time, please take another look. 🙇‍♂️

@artembilan artembilan merged commit 05f10a4 into spring-projects:main May 8, 2025
3 checks passed
@artembilan
Copy link
Member

@chickenchickenlove ,
thank you again.
I have changed your commit message to make it more neutral.
Otherwise it sounds like you blame Spring for Apache Kafka for those limitations as bug.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

The DefaultHandler resolves an incorrect value for the parameter annotated with @Header.
2 participants