Closed
Description
MulticastProcessor does not work as expected with respect to backpressure/request handling:
With a buffer size of 1, everything is OK. For larger buffer sizes, however, requests are not sent upstream unless the requested amount exactly matches the buffer size or is a whole multiple of it.
The examples below illustrate this behavior. The first example works only because backpressure towards a FlowableFromArray seems to be handled differently.
We assume that there might be an issue in the drain() method of MulticastProcessor.
What's your opinion on this? Could this be a bug or does it work as designed?
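To make the observed condition explicit, here is a minimal plain-Java sketch of it. It only restates what we see from the outside and is not taken from the RxJava source; the class and method names (`RequestAlignment`, `matchesObservedCondition`) are hypothetical and exist only for illustration.

```java
public final class RequestAlignment {
    private RequestAlignment() { }

    /**
     * Restates the condition observed in this report (NOT RxJava's actual logic):
     * a downstream request appeared to be served correctly only when the
     * requested amount was a whole multiple of the processor's buffer size.
     */
    public static boolean matchesObservedCondition(long requested, int bufferSize) {
        if (requested <= 0 || bufferSize <= 0) {
            throw new IllegalArgumentException("requested and bufferSize must be positive");
        }
        return requested % bufferSize == 0;
    }

    public static void main(String[] args) {
        // Buffer size 2, request 3: the combination that stalls in the second example.
        System.out.println(matchesObservedCondition(3, 2)); // prints "false"
        // Buffer size 2, request 2: the combination used in the third example.
        System.out.println(matchesObservedCondition(2, 2)); // prints "true"
        // Buffer size 1: every positive request amount is a multiple of 1.
        System.out.println(matchesObservedCondition(3, 1)); // prints "true"
    }
}
```

If this pattern holds, any request amount that is not a multiple of the buffer size should reproduce the stall against a non-fused upstream.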
Code that works as expected
MulticastProcessor<Integer> multicastProcessor = MulticastProcessor.create(2, true);
TestSubscriber<Integer> testSubscriber = TestSubscriber.create(0);
multicastProcessor.subscribe(testSubscriber);
Flowable.fromArray(1,2,3,4,5,6,7,8,9).subscribe(multicastProcessor);
testSubscriber.requestMore(3);
testSubscriber.requestMore(3);
assertEquals(Arrays.asList(1,2,3,4,5,6), testSubscriber.values()); // expected result
Code with unexpected behaviour
MulticastProcessor<Integer> multicastProcessor = MulticastProcessor.create(2, true);
TestSubscriber<Integer> testSubscriber = TestSubscriber.create(0);
multicastProcessor.subscribe(testSubscriber);
Flowable.fromArray(1,2,3,4,5,6,7,8,9)
    .doOnCancel(() -> System.out.println("log that should not change behaviour"))
    .subscribe(multicastProcessor);
testSubscriber.requestMore(3);
testSubscriber.requestMore(3);
assertEquals(Arrays.asList(1,2,3,4), testSubscriber.values()); // unexpected result
Code that works again, after adjusting the request amount
MulticastProcessor<Integer> multicastProcessor = MulticastProcessor.create(2, true);
TestSubscriber<Integer> testSubscriber = TestSubscriber.create(0);
multicastProcessor.subscribe(testSubscriber);
Flowable.fromArray(1,2,3,4,5,6,7,8,9)
    .doOnCancel(() -> System.out.println("log that should not change behaviour"))
    .subscribe(multicastProcessor);
testSubscriber.requestMore(2);
testSubscriber.requestMore(2);
assertEquals(Arrays.asList(1,2,3,4), testSubscriber.values()); // expected result: all 4 requested items arrive
version
2.2.14