Provide Kotlin friendly, i.e., coroutine, API in interceptors? #223

Open
kvn-esque opened this issue Jan 3, 2021 · 9 comments

@kvn-esque

Looking at the interceptor API, it seems the only way to perform async, non-blocking IO work is something along the lines of:
https://stackoverflow.com/questions/53651024/grpc-java-async-call-in-serverinterceptor

Is it possible to provide a more Kotlin friendly API for interceptors?

@kvn-esque
Author

Any thoughts? @jamesward @lowasser

@jamesward
Collaborator

This would be one for @lowasser

@lowasser
Collaborator

lowasser commented Jun 1, 2021

Does Java provide any async API for interceptors? I don't think it does.

@kvn-esque
Author

This seems like it's an async API for gRPC in Java land

https://grpc.github.io/grpc-java/javadoc/io/grpc/ServerCall.Listener.html

@lowasser
Collaborator

lowasser commented Aug 3, 2021

Yes, but that's for the RPC part, not interceptors.

Java interceptors appear strictly synchronous.
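
For context, the Java API under discussion is `ServerInterceptor.interceptCall`, an ordinary non-suspending method that has to hand back a `ServerCall.Listener` before it returns. A minimal Kotlin sketch of that shape follows; the class name `PassThroughInterceptor` is hypothetical and only illustrates the signature.

```kotlin
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor

// Hypothetical example class; not part of grpc-java or grpc-kotlin.
class PassThroughInterceptor : ServerInterceptor {
    override fun <ReqT, RespT> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        // Anything done here runs on the calling thread, and the listener
        // must be returned before this function exits. There is no point
        // at which the method itself could suspend.
        return next.startCall(call, headers)
    }
}
```

Because the return value is the listener itself rather than a future or callback, any asynchronous work has to be hidden behind a listener that buffers events until the real one is ready.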

@lns-ross

It's been a while and this is still an open question (and I hate to be 'that person') but ... was there any consensus or conclusion (good or bad) for a Kotlin-friendly approach? We'd rather do it by convention in the calls than use the solution offered up in the link in the OP. At least then we can do coroutine-based calls out to sibling gRPC services. TIA.

@be-hase

be-hase commented Jun 2, 2022

I think the proposal in this issue is a good one, so I implemented the following interceptor and confirmed that it works with the example code.

/**
 * https://stackoverflow.com/questions/53651024/grpc-java-async-call-in-serverinterceptor
 */
abstract class SuspendableServerInterceptor(
    private val context: CoroutineContext = EmptyCoroutineContext
) : ServerInterceptor {
    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val delayedListener = DelayedListener<ReqT>()
        delayedListener.job = CoroutineScope(
            GrpcContextElement.current()
                    + COROUTINE_CONTEXT_KEY.get()
                    + context
        ).launch {
            try {
                delayedListener.realListener = suspendableInterceptCall(call, headers, next)
                delayedListener.drainPendingCallbacks()
            } catch (e: CancellationException) {
                log.debug { "Caught CancellationException. $e" }
                call.close(Status.CANCELLED, Metadata())
            } catch (e: Exception) {
                log.error(e) { "Unhandled exception. $e" }
                call.close(Status.UNKNOWN, Metadata())
            }
        }
        return delayedListener
    }

    abstract suspend fun <ReqT : Any, RespT : Any> suspendableInterceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT>

    /**
     * ref: https://github.com/grpc/grpc-java/blob/84edc332397ed01fae2400c25196fc90d8c1a6dd/core/src/main/java/io/grpc/internal/DelayedClientCall.java#L415
     */
    private class DelayedListener<ReqT> : ServerCall.Listener<ReqT>() {
        var realListener: ServerCall.Listener<ReqT>? = null

        @Volatile
        private var passThrough = false

        @GuardedBy("this")
        private var pendingCallbacks: MutableList<Runnable> = mutableListOf()

        var job: Job? = null

        override fun onMessage(message: ReqT) {
            if (passThrough) {
                checkNotNull(realListener).onMessage(message)
            } else {
                delayOrExecute { checkNotNull(realListener).onMessage(message) }
            }
        }

        override fun onHalfClose() {
            if (passThrough) {
                checkNotNull(realListener).onHalfClose()
            } else {
                delayOrExecute { checkNotNull(realListener).onHalfClose() }
            }
        }

        override fun onCancel() {
            job?.cancel()
            if (passThrough) {
                checkNotNull(realListener).onCancel()
            } else {
                delayOrExecute { checkNotNull(realListener).onCancel() }
            }
        }

        override fun onComplete() {
            if (passThrough) {
                checkNotNull(realListener).onComplete()
            } else {
                delayOrExecute { checkNotNull(realListener).onComplete() }
            }
        }

        override fun onReady() {
            if (passThrough) {
                checkNotNull(realListener).onReady()
            } else {
                delayOrExecute { checkNotNull(realListener).onReady() }
            }
        }

        private fun delayOrExecute(runnable: Runnable) {
            synchronized(this) {
                if (!passThrough) {
                    pendingCallbacks.add(runnable)
                    return
                }
            }
            runnable.run()
        }

        fun drainPendingCallbacks() {
            check(!passThrough)
            var toRun: MutableList<Runnable> = mutableListOf()
            while (true) {
                synchronized(this) {
                    if (pendingCallbacks.isEmpty()) {
                        pendingCallbacks = mutableListOf()
                        passThrough = true
                        return
                    }
                    // Since there were pendingCallbacks, we need to process them. To maintain ordering we
                    // can't set passThrough=true until we run all pendingCallbacks, but new Runnables may be
                    // added after we drop the lock. So we will have to re-check pendingCallbacks.
                    val tmp: MutableList<Runnable> = toRun
                    toRun = pendingCallbacks
                    pendingCallbacks = tmp
                }
                for (runnable in toRun) {
                    // Avoid calling listener while lock is held to prevent deadlocks.
                    runnable.run()
                }
                toRun.clear()
            }
        }
    }

    companion object {
        private val log = KotlinLogging.logger {}

        @Suppress("UNCHECKED_CAST")
        // Get by using reflection
        internal val COROUTINE_CONTEXT_KEY: Context.Key<CoroutineContext> =
            CoroutineContextServerInterceptor::class.let { kclass ->
                val companionObject = kclass.companionObject!!
                val property = companionObject.memberProperties.single { it.name == "COROUTINE_CONTEXT_KEY" }
                checkNotNull(property.getter.call(kclass.companionObjectInstance!!)) as Context.Key<CoroutineContext>
            }
    }
}
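
As a rough usage sketch, a subclass of the `SuspendableServerInterceptor` above could do suspending work before deciding whether to continue the call. Everything specific here is an assumption made for illustration: the `AuthInterceptor` name, the `x-auth-token` header, and the `isTokenValid` check (a stand-in for a real non-blocking lookup). It also assumes `SuspendableServerInterceptor` is on the classpath exactly as posted.

```kotlin
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.Status
import kotlinx.coroutines.delay

// Hypothetical header key, chosen only for this example.
private val AUTH_TOKEN_KEY: Metadata.Key<String> =
    Metadata.Key.of("x-auth-token", Metadata.ASCII_STRING_MARSHALLER)

// Hypothetical subclass of the SuspendableServerInterceptor posted above.
class AuthInterceptor : SuspendableServerInterceptor() {
    override suspend fun <ReqT : Any, RespT : Any> suspendableInterceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val token: String? = headers.get(AUTH_TOKEN_KEY)
        if (!isTokenValid(token)) {
            // Reject the call and return a no-op listener so that any
            // buffered callbacks are simply ignored.
            call.close(Status.UNAUTHENTICATED.withDescription("invalid token"), Metadata())
            return object : ServerCall.Listener<ReqT>() {}
        }
        // Token accepted: hand the call to the next handler in the chain.
        return next.startCall(call, headers)
    }

    // Stand-in for a suspending call to a token store or sibling service.
    private suspend fun isTokenValid(token: String?): Boolean {
        delay(10) // simulates non-blocking IO
        return token == "secret"
    }
}
```

Such an interceptor would be registered like any other `ServerInterceptor`, for example with `ServerBuilder.intercept(AuthInterceptor())`. Note that `next.startCall` then runs on a coroutine thread rather than the thread that invoked `interceptCall`, which may matter for code that depends on thread-local state.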

@jetaggart

@be-hase can you provide your imports? I'm having a hard time seeing where a few things come from.
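
For reference, the snippet above appears to rely on imports along the following lines. This list is inferred from the identifiers used, not confirmed by its author. The `GuardedBy` and `KotlinLogging` imports in particular are guesses: the annotation could equally come from Error Prone, and newer kotlin-logging releases use a different package. The reflection helpers need the `kotlin-reflect` dependency.

```kotlin
import io.grpc.Context
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import io.grpc.kotlin.CoroutineContextServerInterceptor
import io.grpc.kotlin.GrpcContextElement
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.full.companionObject
import kotlin.reflect.full.companionObjectInstance
import kotlin.reflect.full.memberProperties

// Assumed sources; the original post does not say which libraries were used.
import javax.annotation.concurrent.GuardedBy // assumption: JSR-305 annotations
import mu.KotlinLogging                      // assumption: kotlin-logging, older "mu" package
```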

@gregkonush

I wonder if there is an easy approach to this. It would be nice if the library provided this natively.
