Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,31 @@ private static final class RateLimiterHolder {
}

public static <T> void startQueuingTimer(
ContextStore<T, State> taskContextStore, Class<?> schedulerClass, T task) {
ContextStore<T, State> taskContextStore,
Class<?> schedulerClass,
Class<?> queueClass,
int queueLength,
T task) {
State state = taskContextStore.get(task);
startQueuingTimer(state, schedulerClass, task);
startQueuingTimer(state, schedulerClass, queueClass, queueLength, task);
}

public static void startQueuingTimer(State state, Class<?> schedulerClass, Object task) {
public static void startQueuingTimer(
State state, Class<?> schedulerClass, Class<?> queueClass, int queueLength, Object task) {
if (Platform.isNativeImage()) {
// explicitly not supported for Graal native image
return;
}
// TODO consider queue length based sampling here to reduce overhead
// avoid calling this before JFR is initialised because it will lead to reading the wrong
// TSC frequency before JFR has set it up properly
if (task != null && state != null && InstrumentationBasedProfiling.isJFRReady()) {
QueueTiming timing =
(QueueTiming) AgentTracer.get().getProfilingContext().start(Timer.TimerType.QUEUEING);
timing.setTask(task);
timing.setScheduler(schedulerClass);
timing.setQueue(queueClass);
timing.setQueueLength(queueLength);
state.setTiming(timing);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class QueueTimeEvent extends Event implements QueueTiming {
@Label("Scheduler")
private Class<?> scheduler;

@Label("Queue")
private Class<?> queueType;

@Label("Queue Length on Entry")
private int queueLength;

public QueueTimeEvent() {
this.origin = Thread.currentThread();
AgentSpan activeSpan = AgentTracer.activeSpan();
Expand All @@ -55,6 +61,16 @@ public void setScheduler(Class<?> scheduler) {
this.scheduler = scheduler;
}

@Override
public void setQueue(Class<?> queueType) {
this.queueType = queueType;
}

@Override
public void setQueueLength(int queueLength) {
this.queueLength = queueLength;
}

@Override
public void report() {
commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,21 @@ boolean shouldRecordQueueTimeEvent(long startMillis) {
return System.currentTimeMillis() - startMillis >= queueTimeThresholdMillis;
}

void recordQueueTimeEvent(long startTicks, Object task, Class<?> scheduler, Thread origin) {
void recordQueueTimeEvent(
long startTicks,
Object task,
Class<?> scheduler,
Class<?> queueType,
int queueLength,
Thread origin) {
if (profiler != null) {
// note: because this type traversal can update secondary_super_cache (see JDK-8180450)
// we avoid doing this unless we are absolutely certain we will record the event
Class<?> taskType = TaskWrapper.getUnwrappedType(task);
if (taskType != null) {
long endTicks = profiler.getCurrentTicks();
profiler.recordQueueTime(startTicks, endTicks, taskType, scheduler, origin);
profiler.recordQueueTime(
startTicks, endTicks, taskType, scheduler, queueType, queueLength, origin);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public class QueueTimeTracker implements QueueTiming {
// FIXME this can be eliminated by altering the instrumentation
// since it is known when the item is polled from the queue
private Class<?> scheduler;
private Class<?> queue;
private int queueLength;

public QueueTimeTracker(DatadogProfiler profiler, long startTicks) {
this.profiler = profiler;
Expand All @@ -31,13 +33,23 @@ public void setScheduler(Class<?> scheduler) {
this.scheduler = scheduler;
}

@Override
public void setQueue(Class<?> queue) {
this.queue = queue;
}

@Override
public void setQueueLength(int queueLength) {
this.queueLength = queueLength;
}

@Override
public void report() {
assert weakTask != null && scheduler != null;
Object task = this.weakTask.get();
if (task != null) {
// indirection reduces shallow size of the tracker instance
profiler.recordQueueTimeEvent(startTicks, task, scheduler, origin);
profiler.recordQueueTimeEvent(startTicks, task, scheduler, queue, queueLength, origin);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.channels.Channel;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
Expand Down Expand Up @@ -66,7 +67,14 @@ public static final class Construct {
public static void after(@Advice.This Object command) {
ContextStore<Object, State> contextStore = InstrumentationContext.get(QUEUED_COMMAND, STATE);
capture(contextStore, command);
QueueTimerHelper.startQueuingTimer(contextStore, Channel.class, command);
// FIXME hard to handle both the lifecyle and get access to the queue instance in the same
// frame within the WriteQueue class.
// This means we can't get the queue length. A (bad) alternative would be to instrument
// ConcurrentLinkedQueue broadly,
// or we could write more brittle instrumentation targeting code patterns in different gRPC
// versions.
QueueTimerHelper.startQueuingTimer(
contextStore, Channel.class, ConcurrentLinkedQueue.class, 0, command);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -162,12 +163,20 @@ public static void capture(
// excluded as
// Runnables but it is not until now that they will be put on the executor's queue
if (!exclude(RUNNABLE, task)) {
Queue<?> queue = tpe.getQueue();
QueueTimerHelper.startQueuingTimer(
InstrumentationContext.get(Runnable.class, State.class), tpe.getClass(), task);
InstrumentationContext.get(Runnable.class, State.class),
tpe.getClass(),
queue.getClass(),
queue.size(),
task);
} else if (!exclude(RUNNABLE_FUTURE, task) && task instanceof RunnableFuture) {
Queue<?> queue = tpe.getQueue();
QueueTimerHelper.startQueuingTimer(
InstrumentationContext.get(RunnableFuture.class, State.class),
tpe.getClass(),
queue.getClass(),
queue.size(),
(RunnableFuture<?>) task);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import net.bytebuddy.asm.Advice;

Expand Down Expand Up @@ -53,13 +51,11 @@ public void methodAdvice(MethodTransformer transformer) {
public static final class ExternalPush {
@SuppressWarnings("rawtypes")
@Advice.OnMethodEnter
public static <T> void externalPush(
@Advice.This ForkJoinPool pool, @Advice.Argument(0) ForkJoinTask<T> task) {
public static <T> void externalPush(@Advice.Argument(0) ForkJoinTask<T> task) {
if (!exclude(FORK_JOIN_TASK, task)) {
ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
capture(contextStore, task);
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
}
}

Expand All @@ -74,13 +70,11 @@ public static <T> void cleanup(

public static final class PoolSubmit {
@Advice.OnMethodEnter
public static <T> void poolSubmit(
@Advice.This ForkJoinPool pool, @Advice.Argument(1) ForkJoinTask<T> task) {
public static <T> void poolSubmit(@Advice.Argument(1) ForkJoinTask<T> task) {
if (!exclude(FORK_JOIN_TASK, task)) {
ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
capture(contextStore, task);
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package datadog.trace.instrumentation.java.concurrent.forkjoin;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.FORK_JOIN_POOL_INSTRUMENTATION_NAME;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.fieldType;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.config.ProfilingConfig;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.config.provider.ConfigProvider;
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public class JavaForkJoinWorkQueueInstrumentation extends InstrumenterModule.Profiling
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public JavaForkJoinWorkQueueInstrumentation() {
super(
EXECUTOR_INSTRUMENTATION_NAME,
FORK_JOIN_POOL_INSTRUMENTATION_NAME,
FORK_JOIN_POOL_INSTRUMENTATION_NAME + "-workqueue");
}

@Override
public String instrumentedType() {
return "java.util.concurrent.ForkJoinPool$WorkQueue";
}

@Override
public boolean isEnabled() {
return super.isEnabled()
&& ConfigProvider.getInstance()
.getBoolean(
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED,
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED_DEFAULT);
}

@Override
public Map<String, String> contextStore() {
return singletonMap("java.util.concurrent.ForkJoinTask", State.class.getName());
}

@Override
public void methodAdvice(MethodTransformer transformer) {
String name = getClass().getName();
transformer.applyAdvice(
isMethod()
.and(named("push"))
.and(takesArgument(0, named("java.util.concurrent.ForkJoinTask")))
.and(
isDeclaredBy(
declaresField(fieldType(int.class).and(named("top")))
.and(declaresField(fieldType(int.class).and(named("base")))))),
name + "$PushTask");
}

public static final class PushTask {
@SuppressWarnings("rawtypes")
@Advice.OnMethodEnter(suppress = Throwable.class)
public static <T> void push(
@Advice.This Object workQueue,
@Advice.FieldValue("top") int top,
@Advice.FieldValue("base") int base,
@Advice.Argument(0) ForkJoinTask<T> task) {
if (!exclude(FORK_JOIN_TASK, task)) {
ContextStore<ForkJoinTask, State> contextStore =
InstrumentationContext.get(ForkJoinTask.class, State.class);
QueueTimerHelper.startQueuingTimer(
contextStore, ForkJoinPool.class, workQueue.getClass(), top - base, task);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper.startQueuingTimer;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.RUNNABLE_INSTRUMENTATION_NAME;
import static java.util.Collections.singletonMap;
Expand All @@ -21,7 +20,6 @@
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import net.bytebuddy.asm.Advice;

Expand Down Expand Up @@ -67,7 +65,6 @@ public static void before(@Advice.Argument(0) TimerTask task, @Advice.Argument(2
ContextStore<Runnable, State> contextStore =
InstrumentationContext.get(Runnable.class, State.class);
capture(contextStore, task);
startQueuingTimer(contextStore, Timer.class, task);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestProfilingContextIntegration
import datadog.trace.api.Platform
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling

import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.LinkedBlockingQueue

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

Expand All @@ -20,21 +23,34 @@ class QueueTimingForkedTest extends AgentTestRunner {
def "test queue timing with submit"() {
setup:
def executor = Executors.newSingleThreadExecutor()
def fjp = new ForkJoinPool(1)

when:
runUnderTrace("parent", {
executor.submit(new TestRunnable()).get()
})

then:
verify()
verify(LinkedBlockingQueue.name)

when:
runUnderTrace("parent", {
fjp.submit(new TestRunnable()).get()
})

then:
// flaky before JDK21
if (Platform.isJavaVersionAtLeast(21)) {
verify("java.util.concurrent.ForkJoinPool\$WorkQueue")
}

cleanup:
executor.shutdown()
fjp.shutdown()
TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.clear()
}

void verify() {
void verify(expectedQueueType) {
assert TEST_PROFILING_CONTEXT_INTEGRATION.isBalanced()
assert !TEST_PROFILING_CONTEXT_INTEGRATION.closedTimings.isEmpty()
int numAsserts = 0
Expand All @@ -45,6 +61,8 @@ class QueueTimingForkedTest extends AgentTestRunner {
assert timing.task == TestRunnable
assert timing.scheduler != null
assert timing.origin == Thread.currentThread()
assert timing.queueLength >= 0
assert timing.queue.name == expectedQueueType
numAsserts++
}
}
Expand Down
Loading
Loading