|
28 | 28 | import reactor.core.publisher.Flux; |
29 | 29 | import reactor.core.publisher.Mono; |
30 | 30 | import reactor.core.publisher.Sinks; |
31 | | -import reactor.core.scheduler.Scheduler; |
32 | | -import reactor.core.scheduler.Schedulers; |
33 | 31 | import reactor.util.context.ContextView; |
34 | 32 |
|
35 | 33 | import org.springframework.core.log.LogMessage; |
|
54 | 52 | public class FluxMessageChannel extends AbstractMessageChannel |
55 | 53 | implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel { |
56 | 54 |
|
57 | | - private final Scheduler scheduler = Schedulers.boundedElastic(); |
58 | | - |
59 | 55 | private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false); |
60 | 56 |
|
61 | 57 | private final Sinks.Many<Boolean> subscribedSignal = Sinks.many().replay().limit(1); |
@@ -107,7 +103,8 @@ private boolean tryEmitMessage(Message<?> message) { |
107 | 103 | public void subscribe(Subscriber<? super Message<?>> subscriber) { |
108 | 104 | this.sink.asFlux() |
109 | 105 | .doFinally((s) -> this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0)) |
110 | | - .share() |
| 106 | + .publish(1) |
| 107 | + .refCount() |
111 | 108 | .subscribe(subscriber); |
112 | 109 |
|
113 | 110 | Mono<Boolean> subscribersBarrier = |
@@ -148,7 +145,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) { |
148 | 145 | Flux<Object> upstreamPublisher = |
149 | 146 | Flux.from(publisher) |
150 | 147 | .delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next()) |
151 | | - .publishOn(this.scheduler) |
| 148 | +// .publishOn(this.scheduler) |
152 | 149 | .flatMap((message) -> |
153 | 150 | Mono.just(message) |
154 | 151 | .handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle)) |
@@ -185,7 +182,6 @@ public void destroy() { |
185 | 182 | this.upstreamSubscriptions.dispose(); |
186 | 183 | this.subscribedSignal.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); |
187 | 184 | this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST); |
188 | | - this.scheduler.dispose(); |
189 | 185 | super.destroy(); |
190 | 186 | } |
191 | 187 |
|
|
0 commit comments