Skip to content

Commit 4fe2a1e

Browse files
authored
chore: Make timeoutCompletionStage accept java Duration. (#2063) (#2071)
(cherry picked from commit 442ecd7)
1 parent 2aa1e8b commit 4fe2a1e

File tree

3 files changed

+10
-7
lines changed

3 files changed

+10
-7
lines changed

actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ public void testCompletedWithTimeout() throws Exception {
533533
delayedStage.toCompletableFuture().get(3, SECONDS);
534534
} catch (ExecutionException e) {
535535
assertTrue(e.getCause() instanceof TimeoutException);
536-
assertEquals("Timeout of 200 milliseconds expired", e.getCause().getMessage());
536+
assertEquals("Timeout of PT0.2S expired", e.getCause().getMessage());
537537
}
538538
}
539539

actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutExcepti
1717
import scala.concurrent.{ ExecutionContext, Future, Promise }
1818
import scala.concurrent.duration.FiniteDuration
1919
import scala.util.control.NonFatal
20+
2021
import org.apache.pekko
2122
import pekko.actor._
2223
import pekko.dispatch.Futures
@@ -126,7 +127,7 @@ trait FutureTimeoutSupport {
126127
* @since 1.2.0
127128
*/
128129
@nowarn("msg=deprecated")
129-
def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])(
130+
def timeoutCompletionStage[T](duration: java.time.Duration, using: Scheduler)(value: => CompletionStage[T])(
130131
implicit ec: ExecutionContext): CompletionStage[T] = {
131132
val stage: CompletionStage[T] =
132133
try value
@@ -137,10 +138,12 @@ trait FutureTimeoutSupport {
137138
stage
138139
} else {
139140
val p = new CompletableFuture[T]
140-
val timeout = using.scheduleOnce(duration) {
141-
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
142-
stage.toCompletableFuture.cancel(true)
143-
}
141+
val timeout = using.scheduleOnce(duration,
142+
() => {
143+
p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired"))
144+
stage.toCompletableFuture.cancel(true)
145+
()
146+
})
144147
stage.handle[Unit]((v: T, ex: Throwable) => {
145148
timeout.cancel()
146149
if (v != null) p.complete(v)

actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,7 @@ object Patterns {
480480
scheduler: Scheduler,
481481
context: ExecutionContext,
482482
value: Callable[CompletionStage[T]]): CompletionStage[T] =
483-
timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context)
483+
timeoutCompletionStage(duration, scheduler)(value.call())(context)
484484

485485
/**
486486
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable

0 commit comments

Comments
 (0)