Using CompletableFuture in Kotlinx.Coroutine
kotlin/coroutine/completable-future
Await the result of CompletableFuture in Kotlinx.Coroutine
1private class ContinuationConsumer<T>(
2    @Volatile @JvmField var cont: Continuation<T>?
3) : BiConsumer<T?, Throwable?> {
4    @Suppress("UNCHECKED_CAST")
5    override fun accept(result: T?, exception: Throwable?) {
6        val cont = this.cont ?: return // atomically read current value unless null
7        if (exception == null) {
8            // the future has completed normally
9            cont.resume(result as T)
10        } else {
11            // the future has completed with an exception, unwrap it to provide consistent view of .await() result and to propagate only original exception
12            cont.resumeWithException((exception as? CompletionException)?.cause ?: exception)
13        }
14    }
15}
16
17/**
18 * Awaits for completion of [CompletionStage] without blocking a thread.
19 *
20 * This suspending function is cancellable.
21 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
22 * stops waiting for the completion stage and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
23 *
24 * This method is intended to be used with one-shot futures, so on coroutine cancellation the [CompletableFuture] that
25 * corresponds to this [CompletionStage] (see [CompletionStage.toCompletableFuture])
26 * is cancelled. If cancelling the given stage is undesired, `stage.asDeferred().await()` should be used instead.
27 */
28public suspend fun <T> CompletionStage<T>.await(): T {
29    val future = toCompletableFuture() // retrieve the future
30    // fast path when CompletableFuture is already done (does not suspend)
31    if (future.isDone) {
32        try {
33            @Suppress("BlockingMethodInNonBlockingContext")
34            return future.get() as T
35        } catch (e: ExecutionException) {
36            throw e.cause ?: e // unwrap original cause from ExecutionException
37        }
38    }
39    // slow path -- suspend
40    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
41        val consumer = ContinuationConsumer(cont)
42        whenComplete(consumer)
43        cont.invokeOnCancellation {
44            future.cancel(false)
45            consumer.cont = null // shall clear reference to continuation to aid GC
46        }
47    }
48}
Unauthorized reproduction of original content on this website is prohibited.