Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit efaa30a

Browse files
committed
Added SMono.retryWhen(Retry) and deprecate all other SMono.retry and SMono.retryWhen
1 parent 68f2656 commit efaa30a

File tree

2 files changed

+64
-1
lines changed

2 files changed

+64
-1
lines changed

src/main/scala/reactor/core/scala/publisher/SMono.scala

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import reactor.core.{Disposable, Scannable => JScannable}
1313
import reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE
1414
import reactor.util.context.Context
1515
import reactor.util.function.{Tuple2, Tuple3, Tuple4, Tuple5, Tuple6}
16+
import reactor.util.retry.Retry
1617

1718
import scala.annotation.unchecked.uncheckedVariance
1819
import scala.jdk.CollectionConverters._
1920
import scala.concurrent.duration.Duration
2021
import scala.concurrent.{ExecutionContext, Future, Promise}
2122
import scala.reflect.ClassTag
2223
import scala.util.{Failure, Success, Try}
23-
2424
import scala.language.implicitConversions
2525

2626

@@ -960,6 +960,7 @@ trait SMono[+T] extends SMonoLike[T] with MapablePublisher[T] with ScalaConverte
960960
* @return a re-subscribing [[SMono]] on onError up to the specified number of retries.
961961
*
962962
*/
963+
@deprecated("Use retryWhen(Retry)", since = "0.8.x")
963964
final def retry(numRetries: Long = Long.MaxValue, retryMatcher: Throwable => Boolean = (_: Throwable) => true): SMono[T] = coreMono.retry(numRetries, retryMatcher).asScala
964965

965966
/**
@@ -976,13 +977,68 @@ trait SMono[+T] extends SMonoLike[T] with MapablePublisher[T] with ScalaConverte
976977
* @return a re-subscribing [[SMono]] on onError when the companion [[Publisher]] produces an
977978
* onNext signal
978979
*/
980+
@deprecated("Use retryWhen(Retry)", since = "0.8.x")
979981
final def retryWhen(whenFactory: SFlux[Throwable] => Publisher[_]): SMono[T] = {
980982
val when = new Function[JFlux[Throwable], Publisher[_]] {
981983
override def apply(t: JFlux[Throwable]): Publisher[_] = whenFactory(new ReactiveSFlux[Throwable](t))
982984
}
983985
coreMono.retryWhen(when).asScala
984986
}
985987

988+
/**
989+
* Retries this [[SMono]] in response to signals emitted by a companion [[Publisher]].
990+
* The companion is generated by the provided [[Retry]] instance, see [[Retry.max(long)]], [[Retry.maxInARow(long)]]
991+
* and [[Retry.backoff(long, Duration)]] for readily available strategy builders.
992+
* <p>
993+
* The operator generates a base for the companion, a [[SFlux]] of [[reactor.util.retry.Retry.RetrySignal]]
994+
* which each give metadata about each retryable failure whenever this [[SMono]] signals an error. The final companion
995+
* should be derived from that base companion and emit data in response to incoming onNext (although it can emit less
996+
* elements, or delay the emissions).
997+
* <p>
998+
* Terminal signals in the companion terminate the sequence with the same signal, so emitting an [[Subscriber.onError(Throwable)]]
999+
* will fail the resulting [[SMono]] with that same error.
1000+
* <p>
1001+
* <img class="marble" src="doc-files/marbles/retryWhenSpecForMono.svg" alt="">
1002+
* <p>
1003+
* Note that the [[Retry.RetrySignal]] state can be transient and change between each source
1004+
* [[org.reactivestreams.Subscriber.onError(Throwable) onError]] or
1005+
* [[org.reactivestreams.Subscriber.onNext(Object) onNext]]. If processed with a delay,
1006+
* this could lead to the represented state being out of sync with the state at which the retry
1007+
* was evaluated. Map it to [[Retry.RetrySignal.copy]] right away to mediate this.
1008+
* <p>
1009+
* Note that if the companion [[Publisher]] created by the <code>whenFactory</code>
1010+
* emits [[Context]] as trigger objects, these [[Context]] will be merged with
1011+
* the previous Context:
1012+
* <blockquote>
1013+
* <pre>
1014+
* <code>
1015+
* Retry customStrategy = SRetry.from(companion => companion.handle((retrySignal, sink) => {
1016+
* val ctx: Context = sink.currentContext();
1017+
* val rl: Int = ctx.getOrDefault("retriesLeft", 0);
1018+
* if (rl > 0) {
1019+
* sink.next(Context.of(
1020+
* "retriesLeft", rl - 1,
1021+
* "lastError", retrySignal.failure()
1022+
* ));
1023+
* } else {
1024+
* sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
1025+
* }
1026+
* }));
1027+
* val retried: SMono[T] = originalMono.retryWhen(customStrategy);
1028+
* }
1029+
* </code>
1030+
* </pre>
1031+
* </blockquote>
1032+
*
1033+
* @param retrySpec the { @link Retry} strategy that will generate the companion [[Publisher]],
1034+
* given a [[SFlux]] that signals each onError as a { @link reactor.util.retry.Retry.RetrySignal}.
1035+
* @return a [[SMono]] that retries on onError when a companion [[Publisher]] produces an onNext signal
1036+
* @see Retry.max(long)
1037+
* @see Retry.maxInARow(long)
1038+
* @see Retry.backoff(long, Duration)
1039+
*/
1040+
final def retryWhen(retrySpec: Retry): SMono[T] = coreMono.retryWhen(retrySpec).asScala
1041+
9861042
/**
9871043
* Expect exactly one item from this [[SMono]] source or signal
9881044
* [[java.util.NoSuchElementException]] for an empty source.

src/test/scala/reactor/core/scala/publisher/SMonoTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import reactor.core.scheduler.{Scheduler, Schedulers}
2222
import reactor.test.scheduler.VirtualTimeScheduler
2323
import reactor.test.{StepVerifier, StepVerifierOptions}
2424
import reactor.util.context.Context
25+
import reactor.util.scala.retry.SRetry
2526

2627
import scala.concurrent.Future
2728
import scala.concurrent.duration._
@@ -1126,6 +1127,12 @@ class SMonoTest extends AnyFreeSpec with Matchers with TestSupport with Idiomati
11261127
counter.get() shouldBe 4
11271128
}
11281129

1130+
".retryWhen should retry according to the spec" in {
1131+
val sMono = SMono.error[Long](new RuntimeException("ex")).retryWhen(SRetry.from(_ => SMono.just(randomValue)))
1132+
StepVerifier.create(sMono)
1133+
.verifyComplete()
1134+
}
1135+
11291136
".single" - {
11301137
"should enforce the existence of element" in {
11311138
StepVerifier.create(just(randomValue).single())

0 commit comments

Comments
 (0)