Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion stub/src/main/java/io/grpc/kotlin/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package io.grpc.kotlin

import io.grpc.Metadata
import io.grpc.Status
import io.grpc.StatusException
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -84,3 +85,14 @@ internal suspend fun <T> Flow<T>.singleOrStatus(
expected: String,
descriptor: Any
): T = singleOrStatusFlow(expected, descriptor).single()

/**
* Returns gRPC Metadata.
*/
suspend fun grpcMetadata(): Metadata {
val metadataElement = coroutineContext[MetadataElement]
?: throw Status.INTERNAL
.withDescription("gRPC Metadata not found in coroutineContext. Ensure that MetadataCoroutineContextInterceptor is used in gRPC server.")
.asException()
return metadataElement.value
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.grpc.kotlin

import io.grpc.Metadata
import io.grpc.ServerCall
import kotlin.coroutines.CoroutineContext

/**
* Propagates gRPC Metadata (HTTP Headers) to coroutineContext.
* Attach the interceptor to gRPC Server and then access the Metadata using grpcMetadata() function.
*
* Example usage:
*
* ServerBuilder.forPort(8060)
* .addService(GreeterImpl())
* .intercept(MetadataCoroutineContextInterceptor())
*
* grpcMetadata()
*/
class MetadataCoroutineContextInterceptor : CoroutineContextServerInterceptor() {
final override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
return MetadataElement(value = headers)
}
}

/**
* Used for accessing the gRPC Metadata from coroutineContext.
* Example usage:
* coroutineContext[MetadataElement]?.value
*/
internal data class MetadataElement(val value: Metadata) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<MetadataElement>

override val key: CoroutineContext.Key<MetadataElement> get() = Key
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.grpc.kotlin

import io.grpc.BindableService
import io.grpc.Channel
import io.grpc.Metadata
import io.grpc.Status
import io.grpc.StatusException
import io.grpc.examples.helloworld.GreeterGrpcKt
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.testing.GrpcCleanupRule
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import org.junit.Test
import org.junit.jupiter.api.Assertions
import org.junit.runner.RunWith
import org.junit.runners.JUnit4

@RunWith(JUnit4::class)
class MetadataCoroutineContextInterceptorTest {
@Rule
@JvmField
val grpcCleanup = GrpcCleanupRule()

@Test
fun `interceptor provides gRPC Metadata to coroutineContext`() {
val key = Metadata.Key.of("test-header", Metadata.ASCII_STRING_MARSHALLER)
val clientStub =
GreeterGrpcKt.GreeterCoroutineStub(testChannel(object : GreeterGrpcKt.GreeterCoroutineImplBase() {
override suspend fun sayHello(request: HelloRequest): HelloReply {
val metadata = grpcMetadata()
return HelloReply.newBuilder()
.setMessage(metadata.get(key).toString())
.build()
}
}))
val metadata = Metadata()
metadata.put(key, "Test message")

val response = runBlocking { clientStub.sayHello(HelloRequest.getDefaultInstance(), metadata) }

Assertions.assertEquals("Test message", response.message)
}

@Test
fun `fails to extract gRPC Metadata if interceptor is not injected`() {
val key = Metadata.Key.of("test-header", Metadata.ASCII_STRING_MARSHALLER)
val clientStub =
GreeterGrpcKt.GreeterCoroutineStub(testChannel(object : GreeterGrpcKt.GreeterCoroutineImplBase() {
override suspend fun sayHello(request: HelloRequest): HelloReply {
val metadata = grpcMetadata()
return HelloReply.newBuilder()
.setMessage(metadata.get(key).toString())
.build()
}
}, false))
val metadata = Metadata()
metadata.put(key, "Test message")

val exception = Assertions.assertThrows(StatusException::class.java) {
runBlocking { clientStub.sayHello(HelloRequest.getDefaultInstance(), metadata) }
}
Assertions.assertEquals(Status.INTERNAL.code, exception.status.code)
Assertions.assertEquals(
"gRPC Metadata not found in coroutineContext. Ensure that MetadataCoroutineContextInterceptor is used in gRPC server.",
exception.status.description
)
}

private fun testChannel(service: BindableService, attachInterceptor: Boolean = true): Channel {
val serverName = InProcessServerBuilder.generateName()
var builder = InProcessServerBuilder.forName(serverName).directExecutor()
if (attachInterceptor) {
builder = builder.intercept(MetadataCoroutineContextInterceptor())
}
grpcCleanup.register(builder.addService(service).build().start())
return grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build())
}
}
Loading