Description
Describe the bug
When collecting a ChannelFlow on which first map and then flowOn(span.asContextElement()) were called, Context.current() still contains the span used earlier after the collect {} call completes, while coroutineContext.getOpenTelemetryContext() returns an empty context.
Given that no direct calls to makeCurrent() or close() were made, this looks like a bug in the kotlin-extension.
Steps to reproduce
The following test fails:
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class CollectChannelFlowSpanContextTest {
    val tracer by lazy {
        GlobalOpenTelemetry.getTracer("CollectChannelFlowSpanContextTest")
    }

    @Test
    fun collectChannelFlowWithSpanDoesNotLeakContext() = runTest {
        val constructedFlow = channelFlow<Nothing?> { }
        val transformedFlow = constructedFlow.map { it } // .map prevents fusion of flowOn into channelFlow
        val flowExecutionSpan = tracer.spanBuilder("flowSpan").startSpan()
        transformedFlow.flowOn(flowExecutionSpan.asContextElement()).collect { }
        flowExecutionSpan.end()
        assertEquals(coroutineContext.getOpenTelemetryContext(), Context.current())
    }
}
Both .map { it } (or a similar operator that blocks flowOn fusion) and channelFlow are required for this to occur. Removing the call to map, or replacing the channelFlow with a regular flow, makes the test pass, i.e. the correct context is returned.
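For comparison, the two variants described above can be written as tests next to the failing one. This is only a sketch based on the description in this report: the class name and test names are made up for illustration, and the expectation that both tests pass comes from the observation above, not from an independent run.

```kotlin
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.extension.kotlin.getOpenTelemetryContext
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

// Hypothetical class name, used only for this comparison sketch.
class FlowVariantsSpanContextTest {
    private val tracer by lazy {
        GlobalOpenTelemetry.getTracer("FlowVariantsSpanContextTest")
    }

    // Variant 1: channelFlow without the intermediate map,
    // so flowOn can be fused into the channelFlow.
    @Test
    fun channelFlowWithoutMap() = runTest {
        val span = tracer.spanBuilder("flowSpan").startSpan()
        channelFlow<Nothing?> { }.flowOn(span.asContextElement()).collect { }
        span.end()
        assertEquals(coroutineContext.getOpenTelemetryContext(), Context.current())
    }

    // Variant 2: a regular flow instead of channelFlow, with map kept in place.
    @Test
    fun regularFlowWithMap() = runTest {
        val span = tracer.spanBuilder("flowSpan").startSpan()
        flow<Nothing?> { }.map { it }.flowOn(span.asContextElement()).collect { }
        span.end()
        assertEquals(coroutineContext.getOpenTelemetryContext(), Context.current())
    }
}
```

The only difference from the failing test is the shape of the flow pipeline; span creation, flowOn and the final assertion are unchanged.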
What did you expect to see?
I expected Context.current() to return the empty root context, just like coroutineContext.getOpenTelemetryContext().
What did you see instead?
The context returned by Context.current() was not empty:
org.opentest4j.AssertionFailedError: expected: <{}> but was: <{opentelemetry-trace-span-key=PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}}>
What version and what artifacts are you using?
Artifacts: opentelemetry-api, opentelemetry-context, opentelemetry-extension-kotlin
Version: 1.48.0
How did you reference these artifacts?
plugins {
    kotlin("jvm") version "2.1.10"
}
// [...]
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
    implementation(platform("io.opentelemetry:opentelemetry-bom:1.48.0"))
    implementation("io.opentelemetry:opentelemetry-api:1.48.0")
    implementation("io.opentelemetry:opentelemetry-context:1.48.0")
    implementation("io.opentelemetry:opentelemetry-extension-kotlin:1.48.0")
    testImplementation(kotlin("test"))
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.1")
}
// [...]
Environment
Java-Compiler: Oracle OpenJDK 17 (javac)
Kotlin-Compiler: 2.1.10, K2
OS: Windows 10
Gradle: 8.10
IDE: IntelliJ IDEA 2024.3.4.1 (Ultimate Edition)
Additional context
The original symptom of the bug was an incorrectly attached span.
After the Flow::collect call (which in my case was Flow::toSet() instead), the following span was attached to the span of the flow, not to the "parent" span spanning the whole function.
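The shape of the code that showed the original symptom can be sketched roughly as follows. This is not the actual library code: the function name, span names, tracer name and emitted value are all hypothetical, and wrapping the body in withContext(parentSpan.asContextElement()) is an assumption about how the parent span was made current. Whether this exact sketch reproduces the leak has not been verified.

```kotlin
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.extension.kotlin.asContextElement
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.withContext

// Hypothetical tracer name, for illustration only.
private val tracer by lazy { GlobalOpenTelemetry.getTracer("symptom-sketch") }

// Hypothetical function standing in for the original request code.
suspend fun collectThenStartFollowUpSpan(): Set<String> {
    val parentSpan = tracer.spanBuilder("parent").startSpan()
    try {
        // Assumption: the parent span is propagated through the coroutine context.
        return withContext(parentSpan.asContextElement()) {
            val flowSpan = tracer.spanBuilder("flowSpan").startSpan()
            val result = channelFlow<String> { send("a") }
                .map { it } // blocks fusion of flowOn into channelFlow
                .flowOn(flowSpan.asContextElement())
                .toSet()
            flowSpan.end()

            // spanBuilder uses Context.current() as the implicit parent.
            // Intended parent: "parent". Reported symptom: "flowSpan".
            val followUpSpan = tracer.spanBuilder("followUp").startSpan()
            followUpSpan.end()
            result
        }
    } finally {
        parentSpan.end()
    }
}
```

With a real SDK and exporter configured, the parent of "followUp" could be inspected in the exported spans to check which context was current at that point.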
The code that led to this was part of a custom Kotlin HTTP request library, to which I wanted to add first-party span data. I then simplified the code until I was left with the above test.
As part of a different project I am working on (a Kotlin Multiplatform OpenTelemetry facade), I also have an additional Kotlin implementation of the ContinuationInterceptor targeting JavaScript, which does not appear to have this problem. I will see if I can share it tomorrow, although I have not checked whether it passes your existing tests here.