T - the value type pushed@FunctionalInterface -public interface FlowableOnSubscribe<T>-
subscribe() method that receives
- an instance of a FlowableEmitter instance that allows pushing
- events in a backpressure-safe and cancellation-safe manner.| Modifier and Type | -Method and Description | -
|---|---|
void |
-subscribe(@NonNull FlowableEmitter<T> emitter)
-Called for each
-Subscriber that subscribes. |
-
void subscribe(@NonNull - @NonNull FlowableEmitter<T> emitter) - throws Throwable-
Subscriber that subscribes.emitter - the safe emitter instance, never nullThrowable - on errorT - the type of item the Observer expects to observepublic interface Observer<T>
-
- When an Observer is subscribed to an ObservableSource through the ObservableSource.subscribe(Observer) method,
- the ObservableSource calls onSubscribe(Disposable) with a Disposable that allows
- disposing the sequence at any time, then the
- ObservableSource may call the Observer's onNext(T) method any number of times
- to provide notifications. A well-behaved
- ObservableSource will call an Observer's onComplete() method exactly once or the Observer's
- onError(java.lang.Throwable) method exactly once.
-
- Calling the Observer's method must happen in a serialized fashion, that is, they must not
- be invoked concurrently by multiple threads in an overlapping fashion and the invocation pattern must
- adhere to the following protocol:
-
onSubscribe onNext* (onError | onComplete)?
-
- Subscribing an Observer to multiple ObservableSources is not recommended. If such reuse
- happens, it is the duty of the Observer implementation to be ready to receive multiple calls to
- its methods and ensure proper concurrent behavior of its business logic.
-
- Calling onSubscribe(Disposable), onNext(Object) or onError(Throwable) with a
- null argument is forbidden.
-
- The implementations of the onXXX methods should avoid throwing runtime exceptions other than the following cases
- (see Rule 2.13 of the Reactive Streams specification):
-
null, the methods can throw a NullPointerException.
- Note though that RxJava prevents nulls to enter into the flow and thus there is generally no
- need to check for nulls in flows assembled from standard sources and intermediate operators.
- VirtualMachineError).- Violating Rule 2.13 results in undefined flow behavior. Generally, the following can happen: -
onError(java.lang.Throwable) call.ObservableSource.subscribe(Observer) throws instead of returning normally.Scheduler or Executor)
- providing the asynchronous boundary the code is running and either routes the exception to the global
- RxJavaPlugins.onError(Throwable) handler or the current thread's
- Thread.UncaughtExceptionHandler.uncaughtException(Thread, Throwable) handler.Observable's perspective, an Observer is the end consumer thus it is the Observer's
- responsibility to handle the error case and signal it "further down". This means unreliable code in the onXXX
- methods should be wrapped into `try-catch`es, specifically in onError(Throwable) or onComplete(), and handled there
- (for example, by logging it or presenting the user with an error dialog). However, if the error would be thrown from
- onNext(Object), Rule 2.13 mandates
- the implementation calls Disposable.dispose() and signals the exception in a way that is adequate to the target context,
- for example, by calling onError(Throwable) on the same Observer instance.
-
- If, for some reason, the Observer won't follow Rule 2.13, the Observable.safeSubscribe(Observer) can wrap it
- with the necessary safeguards and route exceptions thrown from onNext into onError and route exceptions thrown
- from onError and onComplete into the global error handler via RxJavaPlugins.onError(Throwable).
| Modifier and Type | -Method and Description | -
|---|---|
void |
-onComplete()
-Notifies the
-Observer that the Observable has finished sending push-based notifications. |
-
void |
-onError(@NonNull Throwable e)
-Notifies the
-Observer that the Observable has experienced an error condition. |
-
void |
-onNext(T t)
-Provides the
-Observer with a new item to observe. |
-
void |
-onSubscribe(@NonNull Disposable d)
-Provides the
-Observer with the means of cancelling (disposing) the
- connection (channel) with the Observable in both
- synchronous (from within onNext(Object)) and asynchronous manner. |
-
void onSubscribe(@NonNull - @NonNull Disposable d)-
Observer with the means of cancelling (disposing) the
- connection (channel) with the Observable in both
- synchronous (from within onNext(Object)) and asynchronous manner.d - the Disposable instance whose Disposable.dispose() can
- be called anytime to cancel the connectionvoid onNext(@NonNull - T t)-
Observer with a new item to observe.
-
- The Observable may call this method 0 or more times.
-
- The Observable will not call this method again after it calls either onComplete() or
- onError(java.lang.Throwable).
t - the item emitted by the Observablevoid onError(@NonNull - @NonNull Throwable e)-
Observer that the Observable has experienced an error condition.
-
- If the Observable calls this method, it will not thereafter call onNext(T) or
- onComplete().
e - the exception encountered by the Observablevoid onComplete()-
Observer that the Observable has finished sending push-based notifications.
-
- The Observable will not call this method if it calls onError(java.lang.Throwable).
public abstract class Scheduler -extends Object-
Scheduler is an object that specifies an API for scheduling
- units of work provided in the form of Runnables to be
- executed without delay (effectively as soon as possible), after a specified time delay or periodically
- and represents an abstraction over an asynchronous boundary that ensures
- these units of work get executed by some underlying task-execution scheme
- (such as custom Threads, event loop, Executor or Actor system)
- with some uniform properties and guarantees regardless of the particular underlying
- scheme.
-
- You can get various standard, RxJava-specific instances of this class via
- the static methods of the Schedulers utility class.
-
- The so-called Scheduler.Workers of a Scheduler can be created via the createWorker() method which allow the scheduling
- of multiple Runnable tasks in an isolated manner. Runnable tasks scheduled on a Worker are guaranteed to be
- executed sequentially and in a non-overlapping fashion. Non-delayed Runnable tasks are guaranteed to execute in a
- First-In-First-Out order but their execution may be interleaved with delayed tasks.
- In addition, outstanding or running tasks can be cancelled together via
- Disposable.dispose() without affecting any other Worker instances of the same Scheduler.
-
- Implementations of the scheduleDirect(java.lang.Runnable) and Scheduler.Worker.schedule(java.lang.Runnable) methods are encouraged to call the RxJavaPlugins.onSchedule(Runnable)
- method to allow a scheduler hook to manipulate (wrap or replace) the original Runnable task before it is submitted to the
- underlying task-execution scheme.
-
- The default implementations of the scheduleDirect methods provided by this abstract class
- delegate to the respective schedule methods in the Scheduler.Worker instance created via createWorker()
- for each individual Runnable task submitted. Implementors of this class are encouraged to provide
- a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such Workers
- for every task.
- This delegation is done via special wrapper instances around the original Runnable before calling the respective
- Worker.schedule method. Note that this can lead to multiple RxJavaPlugins.onSchedule calls and potentially
- multiple hooks applied. Therefore, the default implementations of scheduleDirect (and the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit))
- wrap the incoming Runnable into a class that implements the SchedulerRunnableIntrospection
- interface which can grant access to the original or hooked Runnable, thus, a repeated RxJavaPlugins.onSchedule
- can detect the earlier hook and not apply a new one over again.
-
- The default implementation of now(TimeUnit) and Scheduler.Worker.now(TimeUnit) methods to return current
- System.currentTimeMillis() value in the desired time unit. Custom Scheduler implementations can override this
- to provide specialized time accounting (such as virtual time to be advanced programmatically).
- Note that operators requiring a Scheduler may rely on either of the now() calls provided by
- Scheduler or Worker respectively, therefore, it is recommended they represent a logically
- consistent source of the current time.
-
- The default implementation of the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit) method uses
- the Scheduler.Worker.schedule(Runnable, long, TimeUnit) for scheduling the Runnable task periodically.
- The algorithm calculates the next absolute time when the task should run again and schedules this execution
- based on the relative time between it and Scheduler.Worker.now(TimeUnit). However, drifts or changes in the
- system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart.
- Therefore, the default implementation uses the clockDriftTolerance() value (set via
- rx3.scheduler.drift-tolerance in minutes) to detect a drift in Scheduler.Worker.now(TimeUnit) and
- re-adjust the absolute/relative time calculation accordingly.
-
- The default implementations of start() and shutdown() do nothing and should be overridden if the
- underlying task-execution scheme supports stopping and restarting itself.
-
- If the Scheduler is shut down or a Worker is disposed, the schedule methods
- should return the Disposable.disposed() singleton instance indicating the shut down/disposed
- state to the caller. Since the shutdown or dispose can happen from any thread, the schedule implementations
- should make best effort to cancel tasks immediately after those tasks have been submitted to the
- underlying task-execution scheme if the shutdown/dispose was detected after this submission.
-
- All methods on the Scheduler and Worker classes should be thread safe.
| Modifier and Type | -Class and Description | -
|---|---|
static class |
-Scheduler.Worker
-Represents an isolated, sequential worker of a parent Scheduler for executing
-Runnable tasks on
- an underlying task-execution scheme (such as custom Threads, event loop, Executor or Actor system). |
-
| Modifier and Type | -Method and Description | -
|---|---|
static long |
-clockDriftTolerance()
-Returns the clock drift tolerance in nanoseconds.
- |
-
abstract @NonNull Scheduler.Worker |
-createWorker()
-Retrieves or creates a new
-Scheduler.Worker that represents sequential execution of actions. |
-
long |
-now(@NonNull TimeUnit unit)
-Returns the 'current time' of the Scheduler in the specified time unit.
- |
-
@NonNull Disposable |
-scheduleDirect(@NonNull Runnable run)
-Schedules the given task on this Scheduler without any time delay.
- |
-
@NonNull Disposable |
-scheduleDirect(@NonNull Runnable run,
- long delay,
- @NonNull TimeUnit unit)
-Schedules the execution of the given task with the given time delay.
- |
-
@NonNull Disposable |
-schedulePeriodicallyDirect(@NonNull Runnable run,
- long initialDelay,
- long period,
- @NonNull TimeUnit unit)
-Schedules a periodic execution of the given task with the given initial time delay and repeat period.
- |
-
void |
-shutdown()
-Instructs the Scheduler instance to stop threads,
- stop accepting tasks on any outstanding
-Scheduler.Worker instances
- and clean up any associated resources with this Scheduler. |
-
void |
-start()
-Allows the Scheduler instance to start threads
- and accept tasks on them.
- |
-
<S extends Scheduler & Disposable> |
-when(@NonNull Function<Flowable<Flowable<Completable>>,Completable> combine)
-Allows the use of operators for controlling the timing around when
- actions scheduled on workers are actually done.
- |
-
public static long clockDriftTolerance()-
Related system property: rx3.scheduler.drift-tolerance in minutes.
@NonNull -public abstract @NonNull Scheduler.Worker createWorker()-
Scheduler.Worker that represents sequential execution of actions.
-
- When work is completed, the Worker instance should be released
- by calling Disposable.dispose() to avoid potential resource leaks in the
- underlying task-execution scheme.
-
- Work on a Scheduler.Worker is guaranteed to be sequential and non-overlapping.
public long now(@NonNull - @NonNull TimeUnit unit)-
unit - the time unitNullPointerException - if unit is nullpublic void start()-
- Implementations should make sure the call is idempotent, thread-safe and
- should not throw any RuntimeException if it doesn't support this
- functionality.
public void shutdown()-
Scheduler.Worker instances
- and clean up any associated resources with this Scheduler.
-
- Implementations should make sure the call is idempotent, thread-safe and
- should not throw any RuntimeException if it doesn't support this
- functionality.
@NonNull -public @NonNull Disposable scheduleDirect(@NonNull - @NonNull Runnable run)-
- This method is safe to be called from multiple threads but there are no - ordering or non-overlapping guarantees between tasks.
run - the task to executeNullPointerException - if run is null@NonNull -public @NonNull Disposable scheduleDirect(@NonNull - @NonNull Runnable run, - long delay, - @NonNull - @NonNull TimeUnit unit)-
- This method is safe to be called from multiple threads but there are no - ordering guarantees between tasks.
run - the task to scheduledelay - the delay amount, non-positive values indicate non-delayed schedulingunit - the unit of measure of the delay amountNullPointerException - if run or unit is null@NonNull -public @NonNull Disposable schedulePeriodicallyDirect(@NonNull - @NonNull Runnable run, - long initialDelay, - long period, - @NonNull - @NonNull TimeUnit unit)-
- This method is safe to be called from multiple threads but there are no - ordering guarantees between tasks. - -
- The periodic execution is at a fixed rate, that is, the first execution will be after the
- initialDelay, the second after initialDelay + period, the third after
- initialDelay + 2 * period, and so on.
run - the task to scheduleinitialDelay - the initial delay amount, non-positive values indicate non-delayed schedulingperiod - the period at which the task should be re-executedunit - the unit of measure of the delay amountNullPointerException - if run or unit is null@NonNull -public <S extends Scheduler & Disposable> S when(@NonNull - @NonNull Function<Flowable<Flowable<Completable>>,Completable> combine)-
Scheduler. The only parameter
- is a function that flattens an Flowable of Flowable
- of Completables into just one Completable. There must be
- a chain of operators connecting the returned value to the source
- Flowable otherwise any work scheduled on the returned
- Scheduler will not be executed.
-
- When createWorker() is invoked a Flowable of
- Completables is onNext'd to the combinator to be flattened. If
- the inner Flowable is not immediately subscribed to an calls to
- Scheduler.Worker.schedule(java.lang.Runnable) are buffered. Once the Flowable is
- subscribed to actions are then onNext'd as Completables.
-
- Finally the actions scheduled on the parent Scheduler when the
- inner most Completables are subscribed to.
-
- When the Scheduler.Worker is unsubscribed the Completable emits an
- onComplete and triggers any behavior in the flattening operator. The
- Flowable and all Completables give to the flattening
- function never onError.
-
- Limit the amount concurrency two at a time without creating a new fix - size thread pool: - -
- Scheduler limitScheduler = Schedulers.computation().when(workers -> {
- // use merge max concurrent to limit the number of concurrent
- // callbacks two at a time
- return Completable.merge(Flowable.merge(workers), 2);
- });
-
-
- This is a slightly different way to limit the concurrency but it has some
- interesting benefits and drawbacks to the method above. It works by
- limited the number of concurrent Scheduler.Workers rather than individual
- actions. Generally each Flowable uses its own Scheduler.Worker.
- This means that this will essentially limit the number of concurrent
- subscribes. The danger comes from using operators like
- Flowable.zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.rxjava3.functions.BiFunction) where
- subscribing to the first Flowable could deadlock the
- subscription to the second.
-
-
- Scheduler limitScheduler = Schedulers.computation().when(workers -> {
- // use merge max concurrent to limit the number of concurrent
- // Flowables two at a time
- return Completable.merge(Flowable.merge(workers, 2));
- });
-
-
- Slowing down the rate to no more than than 1 a second. This suffers from
- the same problem as the one above I could find an Flowable
- operator that limits the rate without dropping the values (aka leaky
- bucket algorithm).
-
-
- Scheduler slowScheduler = Schedulers.computation().when(workers -> {
- // use concatenate to make each worker happen one at a time.
- return Completable.concat(workers.map(actions -> {
- // delay the starting of the next worker by 1 second.
- return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
- }));
- });
-
-
- History: 2.0.1 - experimental
S - a Scheduler and a Subscriptioncombine - the function that takes a two-level nested Flowable sequence of a Completable and returns
- the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.NullPointerException - if combine is nullT1 - the first value typeT2 - the second value typeT3 - the third value typeR - the result type@FunctionalInterface -public interface Function3<T1,T2,T3,R>-
| Modifier and Type | -Method and Description | -
|---|---|
R |
-apply(T1 t1,
- T2 t2,
- T3 t3)
-Calculate a value based on the input values.
- |
-
R apply(T1 t1, - T2 t2, - T3 t3) - throws Throwable-
t1 - the first valuet2 - the second valuet3 - the third valueThrowable - if the implementation wishes to throw any type of exception@FunctionalInterface -public interface LongConsumer-
| Modifier and Type | -Method and Description | -
|---|---|
void |
-accept(long t)
-Consume a primitive long input.
- |
-