@@ -23,18 +23,15 @@ import kotlinx.coroutines.FlowPreview
2323import kotlinx.coroutines.GlobalScope
2424import kotlinx.coroutines.async
2525import kotlinx.coroutines.flow.Flow
26- import kotlinx.coroutines.flow.collect
27- import kotlinx.coroutines.flow.flow
2826import kotlinx.coroutines.reactive.awaitFirstOrNull
27+ import kotlinx.coroutines.reactive.flow.asPublisher
2928
3029import kotlinx.coroutines.reactor.mono
3130import reactor.core.publisher.Mono
3231import reactor.core.publisher.onErrorMap
3332import java.lang.reflect.InvocationTargetException
3433import java.lang.reflect.Method
3534import kotlin.reflect.full.callSuspend
36- import kotlin.reflect.full.isSubtypeOf
37- import kotlin.reflect.full.starProjectedType
3835import kotlin.reflect.jvm.kotlinFunction
3936
4037/* *
@@ -56,7 +53,8 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) =
5653 GlobalScope .async(Dispatchers .Unconfined ) { source.awaitFirstOrNull() }
5754
5855/* *
59- * Invoke an handler method converting suspending method to [Mono] or [Flow] if necessary.
56+ * Invoke an handler method converting suspending method to [Mono] or
57+ * [reactor.core.publisher.Flux] if necessary.
6058 *
6159 * @author Sebastien Deleuze
6260 * @since 5.2
@@ -66,18 +64,15 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) =
6664internal fun invokeHandlerMethod (method : Method , bean : Any , vararg args : Any? ): Any? {
6765 val function = method.kotlinFunction!!
6866 return if (function.isSuspend) {
69- if (function.returnType.isSubtypeOf( Flow :: class .starProjectedType) ) {
70- flow {
71- (function.callSuspend(bean, * args.sliceArray( 0 .. (args.size - 2 ))) as Flow < * >).collect {
72- emit(it)
73- }
74- }
67+ val mono = GlobalScope .mono( Dispatchers . Unconfined ) {
68+ function.callSuspend(bean, * args.sliceArray( 0 .. (args.size - 2 )))
69+ . let { if (it == Unit ) null else it }
70+ }.onErrorMap( InvocationTargetException :: class ) { it.targetException }
71+ if (function.returnType.classifier == Flow :: class ) {
72+ mono.flatMapMany { (it as Flow < Any >).asPublisher() }
7573 }
7674 else {
77- GlobalScope .mono(Dispatchers .Unconfined ) {
78- function.callSuspend(bean, * args.sliceArray(0 .. (args.size- 2 )))
79- .let { if (it == Unit ) null else it}
80- }.onErrorMap(InvocationTargetException ::class ) { it.targetException }
75+ mono
8176 }
8277 }
8378 else {
0 commit comments