You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: src/main/java/io/reactivex/Completable.java
+122-5
Original file line number
Diff line number
Diff line change
@@ -1369,15 +1369,132 @@ public final Completable doFinally(Action onFinally) {
1369
1369
}
1370
1370
1371
1371
/**
1372
-
* <strong>Advanced use without safeguards:</strong> lifts a CompletableOperator
1373
-
* transformation into the chain of Completables.
1372
+
* <strong>This method requires advanced knowledge about building operators, please consider
1373
+
* other standard composition methods first;</strong>
1374
+
* Returns a {@code Completable} which, when subscribed to, invokes the {@link CompletableOperator#apply(CompletableObserver) apply(CompletableObserver)} method
1375
+
* of the provided {@link CompletableOperator} for each individual downstream {@link Completable} and allows the
1376
+
* insertion of a custom operator by accessing the downstream's {@link CompletableObserver} during this subscription phase
1377
+
* and providing a new {@code CompletableObserver}, containing the custom operator's intended business logic, that will be
1378
+
* used in the subscription process going further upstream.
1379
+
* <p>
1380
+
* Generally, such a new {@code CompletableObserver} will wrap the downstream's {@code CompletableObserver} and forwards the
1381
+
* {@code onError} and {@code onComplete} events from the upstream directly or according to the
1382
+
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
1383
+
* flow control calls of {@code dispose} and {@code isDisposed} that would have traveled upstream and perform
1384
+
* additional actions depending on the same business logic requirements.
1385
+
* <p>
1386
+
* Example:
1387
+
* <pre><code>
1388
+
* // Step 1: Create the consumer type that will be returned by the CompletableOperator.apply():
1389
+
*
1390
+
* public final class CustomCompletableObserver implements CompletableObserver, Disposable {
1391
+
*
1392
+
* // The donstream's CompletableObserver that will receive the onXXX events
1393
+
* final CompletableObserver downstream;
1394
+
*
1395
+
* // The connection to the upstream source that will call this class' onXXX methods
1396
+
* Disposable upstream;
1397
+
*
1398
+
* // The constructor takes the downstream subscriber and usually any other parameters
1399
+
* public CustomCompletableObserver(CompletableObserver downstream) {
1400
+
* this.downstream = downstream;
1401
+
* }
1402
+
*
1403
+
* // In the subscription phase, the upstream sends a Disposable to this class
1404
+
* // and subsequently this class has to send a Disposable to the downstream.
1405
+
* // Note that relaying the upstream's Disposable directly is not allowed in RxJava
1406
+
* @Override
1407
+
* public void onSubscribe(Disposable s) {
1408
+
* if (upstream != null) {
1409
+
* s.cancel();
1410
+
* } else {
1411
+
* upstream = s;
1412
+
* downstream.onSubscribe(this);
1413
+
* }
1414
+
* }
1415
+
*
1416
+
* // Some operators may handle the upstream's error while others
1417
+
* // could just forward it to the downstream.
1418
+
* @Override
1419
+
* public void onError(Throwable throwable) {
1420
+
* downstream.onError(throwable);
1421
+
* }
1422
+
*
1423
+
* // When the upstream completes, usually the downstream should complete as well.
1424
+
* // In completable, this could also mean doing some side-effects
1425
+
* @Override
1426
+
* public void onComplete() {
1427
+
* System.out.println("Sequence completed");
1428
+
* downstream.onComplete();
1429
+
* }
1430
+
*
1431
+
* // Some operators may use their own resources which should be cleaned up if
1432
+
* // the downstream disposes the flow before it completed. Operators without
1433
+
* // resources can simply forward the dispose to the upstream.
1434
+
* // In some cases, a disposed flag may be set by this method so that other parts
1435
+
* // of this class may detect the dispose and stop sending events
1436
+
* // to the downstream.
1437
+
* @Override
1438
+
* public void dispose() {
1439
+
* upstream.dispose();
1440
+
* }
1441
+
*
1442
+
* // Some operators may simply forward the call to the upstream while others
1443
+
* // can return the disposed flag set in dispose().
1444
+
* @Override
1445
+
* public boolean isDisposed() {
1446
+
* return upstream.isDisposed();
1447
+
* }
1448
+
* }
1449
+
*
1450
+
* // Step 2: Create a class that implements the CompletableOperator interface and
1451
+
* // returns the custom consumer type from above in its apply() method.
1452
+
* // Such class may define additional parameters to be submitted to
1453
+
* // the custom consumer type.
1454
+
*
1455
+
* final class CustomCompletableOperator implements CompletableOperator {
1456
+
* @Override
1457
+
* public CompletableObserver apply(CompletableObserver upstream) {
1458
+
* return new CustomCompletableObserver(upstream);
1459
+
* }
1460
+
* }
1461
+
*
1462
+
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
1463
+
* // or reusing an existing one.
1464
+
*
1465
+
* Completable.complete()
1466
+
* .lift(new CustomCompletableOperator())
1467
+
* .test()
1468
+
* .assertResult();
1469
+
* </code></pre>
1470
+
* <p>
1471
+
* Creating custom operators can be complicated and it is recommended one consults the
1472
+
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
1473
+
* the tools, requirements, rules, considerations and pitfalls of implementing them.
1474
+
* <p>
1475
+
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
1476
+
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Completable}
1477
+
* class and creating a {@link CompletableTransformer} with it is recommended.
1478
+
* <p>
1479
+
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
1480
+
* requires a non-null {@code CompletableObserver} instance to be returned, which is then unconditionally subscribed to
1481
+
* the upstream {@code Completable}. For example, if the operator decided there is no reason to subscribe to the
1482
+
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
1483
+
* return a {@code CompletableObserver} that should immediately dispose the upstream's {@code Disposable} in its
1484
+
* {@code onSubscribe} method. Again, using a {@code CompletableTransformer} and extending the {@code Completable} is
1485
+
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
1374
1486
* <dl>
1375
1487
* <dt><b>Scheduler:</b></dt>
1376
-
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
1488
+
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
1489
+
* {@link CompletableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
1377
1490
* </dl>
1378
-
* @param onLift the lifting function that transforms the child subscriber with a parent subscriber.
1491
+
*
1492
+
* @param onLift the {@link CompletableOperator} that receives the downstream's {@code CompletableObserver} and should return
1493
+
* a {@code CompletableObserver} with custom behavior to be used as the consumer for the current
0 commit comments