Skip to content

Commit b4c95bf

Browse files
committed
ReactiveAdapterRegistry uses Reactor's JdkFlowAdapter
Issue: SPR-16052
1 parent 16a08cb commit b4c95bf

File tree

1 file changed

+49
-2
lines changed

1 file changed

+49
-2
lines changed

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.core;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.Optional;
@@ -29,16 +30,22 @@
2930
import rx.RxReactiveStreams;
3031

3132
import org.springframework.lang.Nullable;
33+
import org.springframework.util.ClassUtils;
34+
import org.springframework.util.ReflectionUtils;
3235

33-
import static org.springframework.core.ReactiveTypeDescriptor.*;
36+
import static org.springframework.core.ReactiveTypeDescriptor.multiValue;
37+
import static org.springframework.core.ReactiveTypeDescriptor.noValue;
38+
import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue;
39+
import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue;
3440

3541
/**
3642
* A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from
3743
* various async/reactive types such as {@code CompletableFuture}, RxJava
3844
* {@code Observable}, and others.
3945
*
4046
* <p>By default, depending on classpath availability, adapters are registered
41-
* for Reactor, RxJava 1, RxJava 2 types, and {@link CompletableFuture}.
47+
* for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, and Java 9+
48+
* Flow.Publisher.
4249
*
4350
* @author Rossen Stoyanchev
4451
* @author Sebastien Deleuze
@@ -82,6 +89,18 @@ public ReactiveAdapterRegistry() {
8289
catch (Throwable ex) {
8390
// Ignore
8491
}
92+
93+
// Java 9+ Flow.Publisher
94+
try {
95+
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
96+
}
97+
catch (NoSuchMethodException ex) {
98+
throw new IllegalStateException("Failed to find JdkFlowAdapter methods", ex);
99+
}
100+
catch (Throwable ex) {
101+
// Ignore
102+
// We can fall back on "reactive-streams-flow-bridge" (once released)
103+
}
85104
}
86105

87106

@@ -232,6 +251,34 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
232251
}
233252

234253

254+
private static class ReactorJdkFlowAdapterRegistrar {
255+
256+
// TODO: remove reflection when build requires JDK 9+
257+
258+
void registerAdapter(ReactiveAdapterRegistry registry)
259+
throws NoSuchMethodException, ClassNotFoundException {
260+
261+
String name = "java.util.concurrent.Flow.Publisher";
262+
Class<?> type = ClassUtils.forName(name, getClass().getClassLoader());
263+
264+
Method toFlowMethod = getMethod("publisherToFlowPublisher", Publisher.class);
265+
Method toFluxMethod = getMethod("flowPublisherToFlux", type);
266+
267+
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty());
268+
269+
registry.registerReactiveType(
270+
multiValue(type, () -> emptyFlow),
271+
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source),
272+
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher)
273+
);
274+
}
275+
276+
private static Method getMethod(String name, Class<?> argumentType) throws NoSuchMethodException {
277+
return reactor.adapter.JdkFlowAdapter.class.getMethod(name, argumentType);
278+
}
279+
}
280+
281+
235282
/**
236283
* Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as
237284
* {@link Flux} or {@link Mono} depending on the underlying reactive type's

0 commit comments

Comments
 (0)