diff --git a/docs/Operator-Matrix.md b/docs/Operator-Matrix.md
index cd71558ddb..2ddd1cf21f 100644
--- a/docs/Operator-Matrix.md
+++ b/docs/Operator-Matrix.md
@@ -32,10 +32,10 @@ Operator | |||||
`concat`||||||
`concatArray`||||||
-`concatArrayDelayError`||||||
+`concatArrayDelayError`||||||
`concatArrayEager`||||| ([24](#notes-24))|
-`concatArrayEagerDelayError`||||| ([25](#notes-25))|
-`concatDelayError`||||||
+`concatArrayEagerDelayError`||||| ([25](#notes-25))|
+`concatDelayError`||||||
`concatEager`||||| ([26](#notes-26))|
`concatMap`||||| ([27](#notes-27))|
`concatMapCompletable`||||| ([27](#notes-27))|
@@ -237,7 +237,7 @@ Operator | |||| ([111](#notes-111))|
`zipArray`||||| ([112](#notes-112))|
`zipWith`||||| ([113](#notes-113))|
-**237 operators** | **215** | **209** | **115** | **100** | **78** |
+**237 operators** | **215** | **209** | **116** | **103** | **80** |
#### Notes
1 Use [`contains()`](#contains).
@@ -356,19 +356,13 @@ Operator | 
-2. Completable.concatArrayDelayError()
-3. Maybe.concatArrayEagerDelayError()
-4. Single.concatArrayEagerDelayError()
-5. Single.concatDelayError()
-6. Completable.concatDelayError()
-7. Single.mergeArray()
-8. Single.mergeArrayDelayError()
-9. Completable.onErrorReturn()
-10. Completable.onErrorReturnItem()
-11. Maybe.safeSubscribe()
-12. Single.safeSubscribe()
-13. Completable.safeSubscribe()
-14. Completable.sequenceEqual()
-15. Maybe.startWith()
-16. Single.startWith()
+1. Single.mergeArray()
+2. Single.mergeArrayDelayError()
+3. Completable.onErrorReturn()
+4. Completable.onErrorReturnItem()
+5. Maybe.safeSubscribe()
+6. Single.safeSubscribe()
+7. Completable.safeSubscribe()
+8. Completable.sequenceEqual()
+9. Maybe.startWith()
+10. Single.startWith()
diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index 9febbd1204..253610ef80 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -202,6 +202,27 @@ public static Completable concatArray(@NonNull CompletableSource... sources) {
return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources));
}
+ /**
+ * Returns a {@code Completable} which completes only when all sources complete, one after another.
+ *
+ *
+ *
+ * - Scheduler:
+ * - {@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate
+ * @return the new {@code Completable} instance
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @SafeVarargs
+ public static Completable concatArrayDelayError(@NonNull CompletableSource... sources) {
+ return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, 2);
+ }
+
/**
* Returns a {@code Completable} which completes only when all sources complete, one after another.
*
@@ -273,6 +294,76 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, prefetch));
}
+ /**
+ * Returns a {@code Completable} which completes only when all sources complete, one after another.
+ *
+ *
+ *
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate
+ * @return the new {@code Completable} instance
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Completable concatDelayError(@NonNull Iterable<@NonNull ? extends CompletableSource> sources) {
+ return Flowable.fromIterable(sources).concatMapCompletableDelayError(Functions.identity());
+ }
+
+ /**
+ * Returns a {@code Completable} which completes only when all sources complete, one after another.
+ *
+ *
+ *
+ * - Backpressure:
+ * - The returned {@code Completable} honors the backpressure of the downstream consumer
+ * and expects the other {@link Publisher} to honor it as well.
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate
+ * @return the new {@code Completable} instance
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.FULL)
+ @NonNull
+ public static Completable concatDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
+ return concatDelayError(sources, 2);
+ }
+
+ /**
+ * Returns a {@code Completable} which completes only when all sources complete, one after another.
+ *
+ *
+ *
+ * - Backpressure:
+ * - The returned {@code Completable} honors the backpressure of the downstream consumer
+ * and expects the other {@link Publisher} to honor it as well.
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param sources the sources to concatenate
+ * @param prefetch the number of sources to prefetch from the sources
+ * @return the new {@code Completable} instance
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code prefetch} is non-positive
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @BackpressureSupport(BackpressureKind.FULL)
+ public static Completable concatDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources, int prefetch) {
+ return Flowable.fromPublisher(sources).concatMapCompletableDelayError(Functions.identity(), true, prefetch);
+ }
+
/**
* Provides an API (via a cold {@code Completable}) that bridges the reactive world with the callback-style world.
*
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 6754f65e7b..ce02849d56 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -446,13 +446,42 @@ public static Flowable concatArrayDelayError(@NonNull MaybeSource exten
public static Flowable concatArrayEager(@NonNull MaybeSource extends T>... sources) {
return Flowable.fromArray(sources).concatMapEager((Function)MaybeToPublisher.instance());
}
+ /**
+ * Concatenates a sequence of {@link MaybeSource} eagerly into a {@link Flowable} sequence.
+ *
+ * Eager concatenation means that once an observer subscribes, this operator subscribes to all of the
+ * source {@code MaybeSource}s. The operator buffers the value emitted by these {@code MaybeSource}s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ *
+ *
+ * - Backpressure:
+ * - The operator honors backpressure from downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource}s that need to be eagerly concatenated
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ @SafeVarargs
+ public static Flowable concatArrayEagerDelayError(@NonNull MaybeSource extends T>... sources) {
+ return Flowable.fromArray(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true);
+ }
/**
* Concatenates the {@link Iterable} sequence of {@link MaybeSource}s into a single sequence by subscribing to each {@code MaybeSource},
* one after the other, one at a time and delays any errors till the all inner {@code MaybeSource}s terminate
* as a {@link Flowable} sequence.
*
- *
+ *
*
* - Backpressure:
* - The operator honors backpressure from downstream.
@@ -465,14 +494,12 @@ public static Flowable concatArrayEager(@NonNull MaybeSource extends T>
* @return the new {@code Flowable} with the concatenating behavior
* @throws NullPointerException if {@code sources} is {@code null}
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Flowable concatDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource extends T>> sources) {
- Objects.requireNonNull(sources, "sources is null");
- return Flowable.fromIterable(sources).concatMapDelayError((Function)MaybeToPublisher.instance());
+ return Flowable.fromIterable(sources).concatMapMaybeDelayError(Functions.identity());
}
/**
@@ -493,13 +520,42 @@ public static Flowable concatDelayError(@NonNull Iterable<@NonNull ? exte
* @return the new {@code Flowable} with the concatenating behavior
* @throws NullPointerException if {@code sources} is {@code null}
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource extends T>> sources) {
- return Flowable.fromPublisher(sources).concatMapDelayError((Function)MaybeToPublisher.instance());
+ return Flowable.fromPublisher(sources).concatMapMaybeDelayError(Functions.identity());
+ }
+ /**
+ * Concatenates the {@link Publisher} sequence of {@link MaybeSource}s into a single sequence by subscribing to each inner {@code MaybeSource},
+ * one after the other, one at a time and delays any errors till the all inner and the outer {@code Publisher} terminate
+ * as a {@link Flowable} sequence.
+ *
+ *
+ *
+ * - Backpressure:
+ * - {@code concatDelayError} fully supports backpressure.
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the common element base type
+ * @param sources the {@code Publisher} sequence of {@code MaybeSource}s
+ * @param prefetch The number of upstream items to prefetch so that fresh items are
+ * ready to be mapped when a previous {@code MaybeSource} terminates.
+ * The operator replenishes after half of the prefetch amount has been consumed
+ * and turned into {@code MaybeSource}s.
+ * @return the new {@code Flowable} with the concatenating behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code prefetch} is non-positive
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource extends T>> sources, int prefetch) {
+ return Flowable.fromPublisher(sources).concatMapMaybeDelayError(Functions.identity(), true, prefetch);
}
/**
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index 0a1d872b14..9f5c3446c5 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -406,10 +406,35 @@ public static Flowable concat(
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
- @SuppressWarnings({ "unchecked", "rawtypes" })
@SafeVarargs
public static Flowable concatArray(@NonNull SingleSource extends T>... sources) {
- return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY));
+ return Flowable.fromArray(sources).concatMap(SingleInternalHelper.toFlowable(), 2);
+ }
+
+ /**
+ * Concatenate the single values, in a non-overlapping fashion, of the {@link SingleSource}s provided in
+ * an array.
+ *
+ *
+ *
+ * - Backpressure:
+ * - The returned {@link Flowable} honors the backpressure of the downstream consumer.
+ * - Scheduler:
+ * - {@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources the array of {@code SingleSource} instances
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @SafeVarargs
+ public static Flowable concatArrayDelayError(@NonNull SingleSource extends T>... sources) {
+ return Flowable.fromArray(sources).concatMapDelayError(SingleInternalHelper.toFlowable(), true, 2);
}
/**
@@ -440,6 +465,35 @@ public static Flowable concatArrayEager(@NonNull SingleSource extends T
return Flowable.fromArray(sources).concatMapEager(SingleInternalHelper.toFlowable());
}
+ /**
+ * Concatenates a sequence of {@link SingleSource} eagerly into a single stream of values.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source {@code SingleSource}s. The operator buffers the value emitted by these {@code SingleSource}s and then drains them
+ * in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - The operator honors backpressure from downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @SafeVarargs
+ public static Flowable concatArrayEagerDelayError(@NonNull SingleSource extends T>... sources) {
+ return Flowable.fromArray(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true);
+ }
+
/**
* Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values.
*
@@ -469,6 +523,92 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends
return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable());
}
+ /**
+ * Concatenates the {@link Iterable} sequence of {@link SingleSource}s into a single sequence by subscribing to each {@code SingleSource},
+ * one after the other, one at a time and delays any errors till the all inner {@code SingleSource}s terminate
+ * as a {@link Flowable} sequence.
+ *
+ *
+ *
+ * - Backpressure:
+ * - The operator honors backpressure from downstream.
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the common element base type
+ * @param sources the {@code Iterable} sequence of {@code SingleSource}s
+ * @return the new {@code Flowable} with the concatenating behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatDelayError(@NonNull Iterable<@NonNull ? extends SingleSource extends T>> sources) {
+ return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity());
+ }
+
+ /**
+ * Concatenates the {@link Publisher} sequence of {@link SingleSource}s into a single sequence by subscribing to each inner {@code SingleSource},
+ * one after the other, one at a time and delays any errors till the all inner and the outer {@code Publisher} terminate
+ * as a {@link Flowable} sequence.
+ *
+ *
+ *
+ * - Backpressure:
+ * - {@code concatDelayError} fully supports backpressure.
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the common element base type
+ * @param sources the {@code Publisher} sequence of {@code SingleSource}s
+ * @return the new {@code Flowable} with the concatenating behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources) {
+ return Flowable.fromPublisher(sources).concatMapSingleDelayError(Functions.identity());
+ }
+
+ /**
+ * Concatenates the {@link Publisher} sequence of {@link SingleSource}s into a single sequence by subscribing to each inner {@code SingleSource},
+ * one after the other, one at a time and delays any errors till the all inner and the outer {@code Publisher} terminate
+ * as a {@link Flowable} sequence.
+ *
+ *
+ *
+ * - Backpressure:
+ * - {@code concatDelayError} fully supports backpressure.
+ * - Scheduler:
+ * - {@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the common element base type
+ * @param sources the {@code Publisher} sequence of {@code SingleSource}s
+ * @param prefetch The number of upstream items to prefetch so that fresh items are
+ * ready to be mapped when a previous {@code SingleSource} terminates.
+ * The operator replenishes after half of the prefetch amount has been consumed
+ * and turned into {@code SingleSource}s.
+ * @return the new {@code Flowable} with the concatenating behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code prefetch} is non-positive
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources, int prefetch) {
+ return Flowable.fromPublisher(sources).concatMapSingleDelayError(Functions.identity(), true, prefetch);
+ }
+
/**
* Concatenates an {@link Iterable} sequence of {@link SingleSource}s eagerly into a single stream of values.
*
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java
new file mode 100644
index 0000000000..229d00acc6
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import static org.mockito.Mockito.*;
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.Action;
+
+public class CompletableConcatArrayDelayErrorTest {
+
+ @Test
+ public void normal() throws Throwable {
+ Action action1 = mock(Action.class);
+ Action action2 = mock(Action.class);
+
+ Completable.concatArrayDelayError(
+ Completable.fromAction(action1),
+ Completable.error(new TestException()),
+ Completable.fromAction(action2)
+ )
+ .test()
+ .assertFailure(TestException.class);
+
+ verify(action1).run();
+
+ verify(action2).run();
+ }
+
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java
new file mode 100644
index 0000000000..fd3124a96c
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java
@@ -0,0 +1,82 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import static org.mockito.Mockito.*;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.Action;
+
+public class CompletableConcatDelayErrorTest {
+
+ @Test
+ public void normalIterable() throws Throwable {
+ Action action1 = mock(Action.class);
+ Action action2 = mock(Action.class);
+
+ Completable.concatDelayError(Arrays.asList(
+ Completable.fromAction(action1),
+ Completable.error(new TestException()),
+ Completable.fromAction(action2)
+ ))
+ .test()
+ .assertFailure(TestException.class);
+
+ verify(action1).run();
+
+ verify(action2).run();
+ }
+
+ @Test
+ public void normalPublisher() throws Throwable {
+ Action action1 = mock(Action.class);
+ Action action2 = mock(Action.class);
+
+ Completable.concatDelayError(Flowable.fromArray(
+ Completable.fromAction(action1),
+ Completable.error(new TestException()),
+ Completable.fromAction(action2)
+ ))
+ .test()
+ .assertFailure(TestException.class);
+
+ verify(action1).run();
+
+ verify(action2).run();
+ }
+
+ @Test
+ public void normalPublisherPrefetch() throws Throwable {
+ Action action1 = mock(Action.class);
+ Action action2 = mock(Action.class);
+
+ Completable.concatDelayError(Flowable.fromArray(
+ Completable.fromAction(action1),
+ Completable.error(new TestException()),
+ Completable.fromAction(action2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class);
+
+ verify(action1).run();
+
+ verify(action2).run();
+ }
+
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java
new file mode 100644
index 0000000000..6608fbc424
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayEagerDelayErrorTest.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.exceptions.TestException;
+
+public class MaybeConcatArrayEagerDelayErrorTest {
+
+ @Test
+ public void normal() {
+ Maybe.concatArrayEagerDelayError(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ )
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java
new file mode 100644
index 0000000000..10fec20410
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayDelayErrorTest.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.exceptions.TestException;
+
+public class SingleConcatArrayDelayErrorTest {
+
+ @Test
+ public void normal() {
+ Single.concatArrayDelayError(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ )
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java
new file mode 100644
index 0000000000..ce0df373bb
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatArrayEagerDelayErrorTest.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.exceptions.TestException;
+
+public class SingleConcatArrayEagerDelayErrorTest {
+
+ @Test
+ public void normal() {
+ Single.concatArrayEagerDelayError(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ )
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java
new file mode 100644
index 0000000000..83ae9cbf9e
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatDelayErrorTest.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
+
+public class SingleConcatDelayErrorTest {
+
+ @Test
+ public void normalIterable() {
+ Single.concatDelayError(Arrays.asList(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void normalPublisher() {
+ Single.concatDelayError(Flowable.fromArray(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void normalPublisherPrefetch() {
+ Single.concatDelayError(Flowable.fromArray(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+}
diff --git a/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java b/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java
index 7a0fe1ae7d..94cf62d641 100644
--- a/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java
+++ b/src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java
@@ -2388,6 +2388,17 @@ public void concatPublisherDelayError() {
.assertFailure(TestException.class, 1);
}
+ @Test
+ public void concatPublisherDelayErrorPrefetch() {
+ Maybe.concatDelayError(Flowable.just(Maybe.empty(), Maybe.just(1), Maybe.error(new TestException())), 1)
+ .test()
+ .assertFailure(TestException.class, 1);
+
+ Maybe.concatDelayError(Flowable.just(Maybe.error(new TestException()), Maybe.empty(), Maybe.just(1)), 1)
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
@Test
public void concatEagerArray() {
PublishProcessor pp1 = PublishProcessor.create();