Distributed Tracing in Asynchronous Code

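Tracing a synchronous call chain is straightforward: a request arrives, a Span is created, the service calls downstream, the downstream service creates its own Span, and the Context is handed along at every hop. Asynchronous code is different. When work is submitted to a thread pool, consumed from a message queue, or fired by a scheduler, the thread that runs it is not the thread that received the request, and the Context does not follow the work across that thread switch on its own.

That is the core difficulty of asynchronous tracing: how do you make sure the Span of an asynchronous task is linked to the original request?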

Why asynchronous tracing is hard

Thread switches lose the Context

Context
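@Service
public class OrderService {

    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    private final Tracer tracer;

    public OrderService(OpenTelemetry openTelemetry) {
        // "order-service" is an illustrative instrumentation name
        this.tracer = openTelemetry.getTracer("order-service");
    }

    public void placeOrderAsync(Order order) {
        Span span = tracer.spanBuilder("placeOrder").startSpan();

        try (Scope scope = span.makeCurrent()) {
            // Calling thread: the Context (with "placeOrder" as the current Span) lives here
            executor.submit(() -> {
                // ⚠️ Worker thread: the Context is gone!
                // The Span was made current on the calling thread only,
                // so any Span started inside processPayment loses its link to it
                processPayment(order);
            });
        } finally {
            // Runs as soon as the task has been submitted, not when it finishes
            span.end();
        }
    }
}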

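The root cause

By default, OTel stores the current Context in a ThreadLocal. A ThreadLocal value belongs to the thread that set it, so after a thread switch the worker thread cannot see the Context of the thread that submitted the task.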
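
The effect can be reproduced without any tracing library. The sketch below is only an illustration: `CURRENT_TRACE` is a hypothetical plain `ThreadLocal<String>` standing in for OTel's thread-bound Context storage, and the class and method names are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ThreadLocalLossDemo {
    // Hypothetical stand-in for the thread-bound Context storage.
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    /** Sets a value on the calling thread and returns what a pool thread sees. */
    static String valueSeenByWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CURRENT_TRACE.set("trace-abc");
        try {
            // The task runs on the pool thread, whose ThreadLocal slot was never set.
            Callable<String> read = () -> String.valueOf(CURRENT_TRACE.get());
            return pool.submit(read).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TRACE.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + valueSeenByWorker()); // prints "worker sees: null"
    }
}
```

The calling thread set `trace-abc`, yet the worker reads `null`. Nothing was deleted; the worker simply has its own, empty slot.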

CompletableFuture scenario

Wrong approach

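// ❌ Wrong: without an explicit executor, supplyAsync typically runs the task on
// ForkJoinPool.commonPool(), and the caller's Context is lost
public CompletableFuture<Order> placeOrder(Order order) {
    return CompletableFuture.supplyAsync(() -> {
        // The original Context is not available here
        return orderService.process(order);
    });
}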

Correct approach: pass the Context explicitly

// ✅ Correct: capture the Context on the calling thread and hand it to the async task
public CompletableFuture<Order> placeOrder(Order order) {
    // Capture the current Context on the calling thread
    Context context = Context.current();

    return CompletableFuture.supplyAsync(() -> {
        // Restore the Context on the async thread
        try (Scope scope = context.makeCurrent()) {
            Span span = tracer.spanBuilder("placeOrder-async")
                .setParent(context)  // link to the original Span (also the default once context is current)
                .startSpan();

            try (Scope childScope = span.makeCurrent()) {
                return orderService.process(order);
            } finally {
                span.end();
            }
        }
    }, executor);
}
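
Closing the Scope matters as much as opening it, because pool threads are reused. The following sketch shows why, under stated assumptions: `CURRENT` is a plain `ThreadLocal<String>` standing in for the Context storage, and the `Scope` interface and `makeCurrent` helper here are simplified imitations written for this example, not the OTel classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ScopeRestoreDemo {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Simplified imitation of a scope: close() restores the previous value. */
    interface Scope extends AutoCloseable {
        @Override
        void close();
    }

    static Scope makeCurrent(String value) {
        String previous = CURRENT.get();
        CURRENT.set(value);
        return () -> CURRENT.set(previous);
    }

    /** Runs two tasks on the same worker thread and returns what the second one sees. */
    static String seenByNextTask(boolean closeScope) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable first = () -> {
                Scope scope = makeCurrent("request-1");
                if (closeScope) {
                    scope.close();
                }
            };
            pool.submit(first).get(5, TimeUnit.SECONDS);

            Callable<String> second = () -> String.valueOf(CURRENT.get());
            return pool.submit(second).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByNextTask(false)); // prints "request-1": leaked into the next task
        System.out.println(seenByNextTask(true));  // prints "null": worker thread left clean
    }
}
```

With the scope left open, the second task inherits `request-1` from an unrelated earlier task, which in a real system would attach Spans to the wrong trace. The try-with-resources form used above guarantees the restore even when the task throws.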

Wrapping OTel Context propagation in a helper

AsyncContextHelper.java
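import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class AsyncContextHelper {

    private final OpenTelemetry openTelemetry;
    private final Tracer tracer;

    public AsyncContextHelper(OpenTelemetry openTelemetry) {
        this.openTelemetry = openTelemetry;
        this.tracer = openTelemetry.getTracer("async-service");
    }

    /**
     * Runs the task in a CompletableFuture and propagates the caller's Context to it
     */
    public <T> CompletableFuture<T> traceAsync(
            Supplier<T> task,
            String spanName) {

        // Capture the current Context on the calling thread
        Context currentContext = Context.current();

        return CompletableFuture
            .supplyAsync(() -> {
                // Restore the Context on the async thread
                try (Scope scope = currentContext.makeCurrent()) {
                    Span span = tracer.spanBuilder(spanName)
                        .setParent(currentContext)
                        .startSpan();

                    // Make the new Span current so Spans started inside the task nest under it
                    try (Scope spanScope = span.makeCurrent()) {
                        T result = task.get();
                        span.setStatus(StatusCode.OK);
                        return result;
                    } catch (Exception e) {
                        span.setStatus(StatusCode.ERROR, e.getMessage());
                        span.recordException(e);
                        throw e;
                    } finally {
                        span.end();
                    }
                }
            });
    }

    /**
     * Wraps a thread pool so that submitted tasks carry the submitter's Context
     */
    public ExecutorService wrapExecutor(ExecutorService executor) {
        return new ContextPreservingExecutorService(executor);
    }

    // Extending AbstractExecutorService routes submit() and invokeAll() through
    // execute(), so wrapping execute() is enough to cover every submission path
    private static class ContextPreservingExecutorService
            extends AbstractExecutorService {

        private final ExecutorService delegate;

        ContextPreservingExecutorService(ExecutorService delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            // Capture the Context of the submitting thread
            Context context = Context.current();

            delegate.execute(() -> {
                // Restore the Context on the executing thread
                try (Scope scope = context.makeCurrent()) {
                    task.run();
                }
            });
        }

        // Lifecycle methods are forwarded to the wrapped pool unchanged
        @Override public void shutdown() { delegate.shutdown(); }
        @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
        @Override public boolean isShutdown() { return delegate.isShutdown(); }
        @Override public boolean isTerminated() { return delegate.isTerminated(); }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }
    }
}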
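
The executor-wrapping idea can be tried without any tracing dependency. The sketch below is an illustration under assumptions: `CURRENT` is a plain `ThreadLocal<String>` standing in for the Context, and `PropagatingExecutorService` is a hypothetical name. It extends `AbstractExecutorService`, whose `submit()` and `invokeAll()` implementations call `execute()`, so a single override covers them all. The OTel API also ships ready-made wrappers such as `Context.taskWrapping(ExecutorService)`, which may be preferable to a hand-written class.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PropagatingExecutorDemo {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static final class PropagatingExecutorService extends AbstractExecutorService {
        private final ExecutorService delegate;

        PropagatingExecutorService(ExecutorService delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            String captured = CURRENT.get();          // read on the submitting thread
            delegate.execute(() -> {
                String previous = CURRENT.get();      // read on the worker thread
                CURRENT.set(captured);
                try {
                    task.run();
                } finally {
                    CURRENT.set(previous);            // leave the worker as we found it
                }
            });
        }

        @Override public void shutdown() { delegate.shutdown(); }
        @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
        @Override public boolean isShutdown() { return delegate.isShutdown(); }
        @Override public boolean isTerminated() { return delegate.isTerminated(); }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return delegate.awaitTermination(timeout, unit);
        }
    }

    /** Sets a value on the calling thread and returns what a submit()-ed task sees. */
    static String submitWith(String value) {
        ExecutorService pool = new PropagatingExecutorService(Executors.newSingleThreadExecutor());
        CURRENT.set(value);
        try {
            Callable<String> read = () -> String.valueOf(CURRENT.get());
            return pool.submit(read).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitWith("trace-42")); // prints "trace-42"
    }
}
```

One trade-off of this design: `shutdownNow()` returns the wrapper lambdas rather than the Runnables the caller submitted.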

Thread pool scenario

Custom thread pool

TracingThreadPoolExecutor.java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TracingThreadPoolExecutor extends ThreadPoolExecutor {

    private final Tracer tracer;

    public TracingThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            OpenTelemetry openTelemetry) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.tracer = openTelemetry.getTracer("thread-pool");
    }

    @Override
    public void execute(Runnable task) {
        // Capture the Context of the submitting thread at submission time.
        // Capturing it in the constructor would pin every task to whatever
        // Context happened to be current when the pool was created.
        Context currentContext = Context.current();

        super.execute(() -> {
            // Restore the Context on the worker thread
            try (Scope scope = currentContext.makeCurrent()) {
                task.run();
            }
        });
    }

    // submit() needs no override: the inherited implementation wraps the task
    // in a FutureTask and passes it to execute(), so it is already covered.
}
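
For a pool subclass, the capture has to happen inside `execute()`, once per submitted task, because a pool usually outlives many requests. The sketch below illustrates that with a plain `ThreadLocal<String>` named `CURRENT` standing in for the Context; `CapturingPool` is a hypothetical class written for this example. It also shows that `submit()` reaches the overridden `execute()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SubmitTimeCaptureDemo {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static final class CapturingPool extends ThreadPoolExecutor {
        CapturingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        public void execute(Runnable task) {
            String captured = CURRENT.get();      // per task, on the submitting thread
            super.execute(() -> {
                String previous = CURRENT.get();
                CURRENT.set(captured);
                try {
                    task.run();
                } finally {
                    CURRENT.set(previous);
                }
            });
        }
    }

    /** Submits one task per simulated request and records what each task sees. */
    static List<String> valuesSeenByWorker() {
        CapturingPool pool = new CapturingPool(); // created before any request exists
        Callable<String> readCurrent = () -> CURRENT.get();
        List<String> seen = new ArrayList<>();
        try {
            for (String request : new String[] {"request-A", "request-B"}) {
                CURRENT.set(request);
                // submit() is inherited; it hands a FutureTask to our execute()
                seen.add(pool.submit(readCurrent).get(5, TimeUnit.SECONDS));
            }
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valuesSeenByWorker()); // prints "[request-A, request-B]"
    }
}
```

Each task sees the value of the request that submitted it, even though both ran on the same worker thread and the pool was built before either request existed.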

Spring thread pool configuration

AsyncConfig.java
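import io.opentelemetry.api.OpenTelemetry;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Autowired
    private OpenTelemetry openTelemetry;

    @Override
    public Executor getAsyncExecutor() {
        // SimpleAsyncTaskExecutor has no builder or pool settings;
        // ThreadPoolTaskExecutor is the pooled implementation
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(10);
        pool.setMaxPoolSize(50);
        pool.setQueueCapacity(100);
        pool.initialize();  // required because the pool is not registered as a bean

        return new ContextAwareThreadPoolTaskExecutor(pool, openTelemetry);
    }
}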

/**
 * Spring async executor that propagates the Context automatically
 */
public class ContextAwareThreadPoolTaskExecutor
        extends TaskExecutorAdapter {

    private final OpenTelemetry openTelemetry;

    public ContextAwareThreadPoolTaskExecutor(
            TaskExecutor delegate,
            OpenTelemetry openTelemetry) {
        super(delegate);
        this.openTelemetry = openTelemetry;

        // A TaskDecorator is applied on the submitting thread for execute() and
        // submit() alike. Overriding execute() alone would miss @Async methods
        // that return a Future, because those go through submit().
        setTaskDecorator(task -> {
            Context context = Context.current();
            return () -> {
                try (Scope scope = context.makeCurrent()) {
                    task.run();
                }
            };
        });
    }
}

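Message queue scenario

Asynchronous processing with Kafka was covered in the "Context Propagation" article. This section adds RocketMQ; RabbitMQ follows the same pattern of extracting the Context from the message headers on the consumer side.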

RocketMQ

RocketMQTracingInterceptor.java
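import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;

@Service
public class RocketMQTracingInterceptor {

    private final OpenTelemetry openTelemetry;
    private final Tracer tracer;

    public RocketMQTracingInterceptor(OpenTelemetry openTelemetry) {
        this.openTelemetry = openTelemetry;
        this.tracer = openTelemetry.getTracer("rocketmq");
    }

    // Call this from your RocketMQ listener callback; the listener wiring is not shown here
    public void handleOrderMessage(Message message, ConsumeOrderlyContext context) {
        // Extract the trace context from the message properties (headers)
        Context extracted = W3CTraceContextPropagator.getInstance()
            .extract(Context.current(), message.getProperties(), GETTER);

        Span span = tracer.spanBuilder("rocketmq.consume")
            .setParent(extracted)
            .setSpanKind(SpanKind.CONSUMER)
            .setAttribute("messaging.system", "rocketmq")
            .setAttribute("messaging.destination", message.getTopic())
            .setAttribute("messaging.operation", "receive")
            .startSpan();

        try (Scope scope = span.makeCurrent()) {
            processMessage(message);
            span.setStatus(StatusCode.OK);
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            throw e;
        } finally {
            span.end();
        }
    }

    // TextMapGetter declares two methods (keys and get), so it cannot be written as a lambda
    private static final TextMapGetter<Map<String, String>> GETTER =
        new TextMapGetter<Map<String, String>>() {
            @Override
            public Iterable<String> keys(Map<String, String> carrier) {
                return carrier.keySet();
            }

            @Override
            public String get(Map<String, String> carrier, String key) {
                return carrier == null ? null : carrier.get(key);
            }
        };
}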

Scheduled task scenario

Spring @Scheduled

ScheduledTaskTracing.java
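import io.opentelemetry.api.OpenTelemetry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@Configuration
public class ScheduledTaskTracing implements SchedulingConfigurer {

    @Autowired
    private OpenTelemetry openTelemetry;

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        // setScheduler accepts a ScheduledExecutorService and adapts it to a
        // TaskScheduler, so the traced executor below is used for @Scheduled methods
        registrar.setScheduler(scheduledExecutor());
    }

    @Bean
    public ScheduledExecutorService scheduledExecutor() {
        return new ContextAwareScheduledExecutorService(
            Executors.newScheduledThreadPool(5),
            openTelemetry
        );
    }
}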

public class ContextAwareScheduledExecutorService
        implements ScheduledExecutorService {

    private final ScheduledExecutorService delegate;
    private final Tracer tracer;

    public ContextAwareScheduledExecutorService(
            ScheduledExecutorService delegate,
            OpenTelemetry openTelemetry) {
        this.delegate = delegate;
        this.tracer = openTelemetry.getTracer("scheduled");
    }

    @Override
    public ScheduledFuture<?> schedule(
            Runnable command, long delay, TimeUnit unit) {

        // Captured at scheduling time. For @Scheduled jobs this is usually the
        // empty root Context, so each run starts its own trace.
        Context context = Context.current();

        Runnable traced = () -> {
            // Start the Span when the task actually runs, so its duration
            // does not include the scheduling delay
            Span span = tracer.spanBuilder("scheduled-task")
                .setParent(context)
                .startSpan();
            try (Scope scope = span.makeCurrent()) {
                command.run();
            } finally {
                span.end();
            }
        };

        return delegate.schedule(traced, delay, unit);
    }
    // ... other methods
}

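Common problems

Problem 1: Context propagation breaks partway through the chain

How to troubleshoot:

  1. Check that the thread submitting the task has an active Span.
  2. Check that Context.current() returns a valid Context at the point of submission.
  3. Check that the thread pool is wrapped in ContextPreservingExecutorService.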

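Problem 2: The asynchronous Span has no parent Span

This is expected behavior when the Context was not propagated: the worker thread has no current Span, so the Span started by the asynchronous task becomes a root Span in a new trace instead of a child of the original request Span. To link the two, capture the Context on the submitting thread and pass it to setParent(context) when building the Span.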

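Problem 3: The Kafka consumer Span is not linked to the producer Span

Check:

  1. Whether the producer injected the traceparent header correctly.
  2. Whether the consumer extracted the Context from the headers correctly.
  3. Whether the Getter passed to extract() and the Setter passed to inject() are configured correctly.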
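
When working through these checks, it helps to know what a valid `traceparent` value looks like. In the W3C Trace Context format it is `version-traceid-parentid-flags`: two hex digits, 32 hex digits, 16 hex digits and two hex digits, all lowercase. The sketch below is a simplified illustration, not the OTel propagator. It handles only version `00`, uses a plain `Map` as the header carrier, and `TraceparentCheck` with its methods is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TraceparentCheck {
    private static final Pattern TRACEPARENT =
        Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    /** Producer side: writes the header into the carrier. */
    static void inject(Map<String, String> headers, String traceId, String spanId, boolean sampled) {
        headers.put("traceparent", "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00"));
    }

    /** Consumer side: returns the trace ID, or null if the header is missing or malformed. */
    static String extractTraceId(Map<String, String> headers) {
        String value = headers.get("traceparent");
        if (value == null) {
            return null;
        }
        Matcher m = TRACEPARENT.matcher(value);
        if (!m.matches()) {
            return null;
        }
        // All-zero trace or span IDs are invalid
        if (isAllZeros(m.group(1)) || isAllZeros(m.group(2))) {
            return null;
        }
        return m.group(1);
    }

    private static boolean isAllZeros(String hex) {
        return hex.chars().allMatch(c -> c == '0');
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        inject(headers, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true);
        System.out.println(headers.get("traceparent"));
        // prints "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
        System.out.println(extractTraceId(headers));
        // prints "4bf92f3577b34da6a3ce929d0e0e4736"
    }
}
```

If the consumer logs a header that fails this shape check, or logs no header at all, the break is on the producer side. If the header is valid but the consumer Span still starts a new trace, look at the Getter.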

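Self-check

After reading this section, you should be able to answer:

  1. Why is asynchronous tracing considered hard? What is the root cause?
  2. In the CompletableFuture scenario, how do you pass the Context correctly? Which API calls are essential?
  3. What does OTel's Context.makeCurrent() do? Why must the returned Scope be closed, for example with try-with-resources?
  4. Which thread pool does Spring's @Async annotation use behind the scenes? How do you make Spring's asynchronous tasks propagate the Context automatically?
  5. What do tracing for scheduled tasks (@Scheduled) and tracing for message queue consumers in production (Kafka/RocketMQ) have in common in how they handle the Context?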