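#Asynchronous Tracing
Tracing synchronous calls is straightforward: a request arrives, a Span is created, the downstream service is called, the downstream service creates its own Span, and the Context is passed along hop by hop. Asynchronous code is different. With thread-pool tasks, message-queue consumers, and scheduled jobs, the thread that runs the work is not the thread that received the request, and the Context does not follow the work across threads automatically.
That is the core problem of asynchronous tracing: how do you make sure the Span of an async task is correctly linked to the original request?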
#Why asynchronous tracing is hard
#Thread switches lose the Context
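@Service
public class OrderService {
    private final ExecutorService executor = Executors.newFixedThreadPool(10);
    private final Tracer tracer;

    public OrderService(OpenTelemetry openTelemetry) {
        // The tracer name is illustrative
        this.tracer = openTelemetry.getTracer("order-service");
    }

    public void placeOrderAsync(Order order) {
        Span span = tracer.spanBuilder("placeOrder").startSpan();
        try (Scope scope = span.makeCurrent()) {
            // Calling thread: the Context that holds this span is current here
            executor.submit(() -> {
                // ⚠️ Pool thread: the Context is not carried over!
                // The span was made current on the calling thread, but this code runs on a worker thread
                processPayment(order); // Spans started here are not linked to the placeOrder span
            });
        } finally {
            // The span also ends here, possibly before the async task has started
            span.end();
        }
    }
}
#The root cause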
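By default, OTel keeps the current Context in a ThreadLocal. A ThreadLocal value belongs to one thread and does not move with the work: when a task is handed to another thread, that thread cannot see the submitting thread's Context.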
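The behavior can be reproduced without OTel at all. The sketch below is a minimal illustration using only the JDK: a plain ThreadLocal named TRACE_ID stands in for the thread-bound Context, and ThreadLocalLossDemo and callerAndWorkerView are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for a thread-bound Context: a plain ThreadLocal holding a trace ID.
// This is an assumption for illustration, not the OTel API.
final class ThreadLocalLossDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Sets the value on the calling thread, then returns
    // {value seen by the caller, value seen by a pool thread}.
    static String[] callerAndWorkerView() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TRACE_ID.set("trace-123");
        try {
            Callable<String> readOnWorker = TRACE_ID::get; // executes on the pool thread
            String workerView = pool.submit(readOnWorker).get();
            return new String[] { TRACE_ID.get(), workerView };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] view = callerAndWorkerView();
        System.out.println("caller sees: " + view[0]); // trace-123
        System.out.println("worker sees: " + view[1]); // null
    }
}
```

The pool thread reads null because it has its own, empty slot for the ThreadLocal. Nothing was overwritten; the value simply never left the calling thread.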
#The CompletableFuture scenario
#Wrong approach
// ❌ Wrong: supplyAsync runs the lambda on a pool thread, so the Context is lost
public CompletableFuture<Order> placeOrder(Order order) {
    return CompletableFuture.supplyAsync(() -> {
        // The caller's Context is not available here
        return orderService.process(order);
    });
}
#Correct approach: pass the Context explicitly
// ✅ Correct: capture the Context on the calling thread and hand it to the async task
public CompletableFuture<Order> placeOrder(Order order) {
    // Capture the current Context on the calling thread
    Context context = Context.current();
    return CompletableFuture.supplyAsync(() -> {
        // Restore the captured Context on the async thread
        try (Scope scope = context.makeCurrent()) {
            Span span = tracer.spanBuilder("placeOrder-async")
                .setParent(context) // explicit parent: the span that was current in the caller
                .startSpan();
            try (Scope childScope = span.makeCurrent()) {
                return orderService.process(order);
            } finally {
                span.end();
            }
        }
    }, executor);
}
#Using OTel's Context utilities
AsyncContextHelper.java
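import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class AsyncContextHelper {
    private final OpenTelemetry openTelemetry;
    private final Tracer tracer;

    public AsyncContextHelper(OpenTelemetry openTelemetry) {
        this.openTelemetry = openTelemetry;
        this.tracer = openTelemetry.getTracer("async-service");
    }

    /**
     * Runs a task in a CompletableFuture and propagates the caller's Context into it.
     */
    public <T> CompletableFuture<T> traceAsync(Supplier<T> task, String spanName) {
        // Capture the Context on the calling thread
        Context parentContext = Context.current();
        return CompletableFuture.supplyAsync(() -> {
            Span span = tracer.spanBuilder(spanName)
                .setParent(parentContext)
                .startSpan();
            // Make the new span current on top of the captured Context,
            // so spans started inside the task become its children
            try (Scope scope = parentContext.with(span).makeCurrent()) {
                T result = task.get();
                span.setStatus(StatusCode.OK);
                return result;
            } catch (RuntimeException e) {
                span.setStatus(StatusCode.ERROR, e.getMessage());
                span.recordException(e);
                throw e;
            } finally {
                span.end();
            }
        });
    }

    /**
     * Wraps a thread pool so that submitted tasks carry the submitter's Context.
     * The OTel API's Context.taskWrapping(executor) provides comparable behavior out of the box.
     */
    public ExecutorService wrapExecutor(ExecutorService executor) {
        return new ContextPreservingExecutorService(executor);
    }

    private static class ContextPreservingExecutorService implements ExecutorService {
        private final ExecutorService delegate;

        ContextPreservingExecutorService(ExecutorService delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            // Capture the submitting thread's Context
            Context context = Context.current();
            delegate.execute(() -> {
                // Restore the Context on the executing thread
                try (Scope scope = context.makeCurrent()) {
                    task.run();
                }
            });
        }
        // The other ExecutorService methods follow the same pattern...
    }
}
#Thread pool scenarios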
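With a thread pool, the Context has to be captured each time a task is submitted, not once when the pool is built, because one pool serves many requests. The sketch below is a minimal illustration using only the JDK. A plain ThreadLocal named CURRENT stands in for OTel's thread-bound Context, and CaptureAtSubmitDemo, wrap, and handleRequest are hypothetical names, not part of any library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CaptureAtSubmitDemo {
    // Stand-in for the thread-bound Context (an assumption for illustration, not the OTel API).
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Captures the submitter's value now and restores it around the task later.
    static <T> Callable<T> wrap(Callable<T> task) {
        String captured = CURRENT.get();             // runs on the submitting thread
        return () -> {
            String previous = CURRENT.get();         // runs on the worker thread
            CURRENT.set(captured);
            try {
                return task.call();
            } finally {
                // Put back what the worker had, so pooled threads do not keep stale state
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    // Simulates one request: bind an ID, run a wrapped task on the pool, return what the task saw.
    static String handleRequest(ExecutorService pool, String requestId) {
        CURRENT.set(requestId);
        try {
            Callable<String> task = wrap(() -> CURRENT.get());
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(handleRequest(pool, "req-A")); // req-A
            System.out.println(handleRequest(pool, "req-B")); // req-B
        } finally {
            pool.shutdown();
        }
    }
}
```

Each task sees the ID of the request that submitted it, even though both requests share the same pool. The OTel Java API offers this capture-and-restore step directly through Context.current().wrap(runnable) and Context.taskWrapping(executorService), which may remove the need for a hand-written wrapper.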
#Custom thread pool
TracingThreadPoolExecutor.java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TracingThreadPoolExecutor extends ThreadPoolExecutor {
    private final Tracer tracer;

    public TracingThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            OpenTelemetry openTelemetry) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        // Do not capture Context.current() here: that would freeze the Context of
        // whichever thread built the pool and reuse it for every task.
        this.tracer = openTelemetry.getTracer("thread-pool");
    }

    @Override
    public void execute(Runnable task) {
        // Capture the submitting thread's Context at submission time
        Context context = Context.current();
        super.execute(() -> {
            // Restore the Context on the worker thread
            try (Scope scope = context.makeCurrent()) {
                task.run();
            }
        });
    }
    // submit() needs no override: AbstractExecutorService.submit() wraps the task
    // in a FutureTask and passes it to execute(), so it is covered above.
}
#Spring thread pool configuration
AsyncConfig.java
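import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Autowired
    private OpenTelemetry openTelemetry;

    @Override
    public Executor getAsyncExecutor() {
        // Pool sizing lives on ThreadPoolTaskExecutor; SimpleAsyncTaskExecutor has no such builder
        ThreadPoolTaskExecutor delegate = new ThreadPoolTaskExecutor();
        delegate.setCorePoolSize(10);
        delegate.setMaxPoolSize(50);
        delegate.setQueueCapacity(100);
        delegate.initialize(); // not a Spring bean here, so initialize it manually
        return new ContextAwareThreadPoolTaskExecutor(delegate, openTelemetry);
    }
}

/**
 * Spring async executor that propagates the Context automatically.
 */
public class ContextAwareThreadPoolTaskExecutor extends TaskExecutorAdapter {
    private final OpenTelemetry openTelemetry;

    public ContextAwareThreadPoolTaskExecutor(
            TaskExecutor delegate,
            OpenTelemetry openTelemetry) {
        super(delegate);
        this.openTelemetry = openTelemetry;
        // The decorator runs on the submitting thread, so it sees the caller's Context.
        // Unlike overriding execute() alone, it covers the submit() variants as well.
        setTaskDecorator(task -> {
            Context context = Context.current();
            return () -> {
                try (Scope scope = context.makeCurrent()) {
                    task.run();
                }
            };
        });
    }
}
#Message queue scenarios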
Asynchronous handling with Kafka was covered in the "Context Propagation" article; this section adds RocketMQ.
#RocketMQ
RocketMQTracingInterceptor.java
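import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;

@Service
public class RocketMQTracingInterceptor {
    private final OpenTelemetry openTelemetry;
    private final Tracer tracer;

    // TextMapGetter declares two methods (keys and get), so it cannot be written as a lambda
    private static final TextMapGetter<Map<String, String>> GETTER =
        new TextMapGetter<Map<String, String>>() {
            @Override
            public Iterable<String> keys(Map<String, String> carrier) {
                return carrier.keySet();
            }

            @Override
            public String get(Map<String, String> carrier, String key) {
                return carrier == null ? null : carrier.get(key);
            }
        };

    public RocketMQTracingInterceptor(OpenTelemetry openTelemetry) {
        this.openTelemetry = openTelemetry;
        this.tracer = openTelemetry.getTracer("rocketmq");
    }

    // Call this from the registered message listener; the listener wiring is not shown here
    public void handleOrderMessage(Message message, ConsumeOrderlyContext context) {
        // Extract the trace context from the message properties
        Context extracted = W3CTraceContextPropagator.getInstance()
            .extract(Context.current(), message.getProperties(), GETTER);
        Span span = tracer.spanBuilder("rocketmq.consume")
            .setParent(extracted)
            .setSpanKind(SpanKind.CONSUMER)
            .setAttribute("messaging.system", "rocketmq")
            .setAttribute("messaging.destination", message.getTopic())
            .setAttribute("messaging.operation", "receive")
            .startSpan();
        try (Scope scope = span.makeCurrent()) {
            processMessage(message);
            span.setStatus(StatusCode.OK);
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            span.recordException(e);
            throw e;
        } finally {
            span.end();
        }
    }
}
#Scheduled task scenarios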
#Spring @Scheduled
ScheduledTaskTracing.java
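import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

@Configuration
public class ScheduledTaskTracing implements SchedulingConfigurer {
    @Autowired
    private OpenTelemetry openTelemetry;

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        // Route @Scheduled tasks through the Context-aware executor defined below
        registrar.setTaskScheduler(new ConcurrentTaskScheduler(scheduledExecutor()));
    }

    @Bean
    public ScheduledExecutorService scheduledExecutor() {
        return new ContextAwareScheduledExecutorService(
            Executors.newScheduledThreadPool(5),
            openTelemetry
        );
    }
}

public class ContextAwareScheduledExecutorService
        implements ScheduledExecutorService {
    private final ScheduledExecutorService delegate;
    private final Tracer tracer;

    public ContextAwareScheduledExecutorService(
            ScheduledExecutorService delegate,
            OpenTelemetry openTelemetry) {
        this.delegate = delegate;
        this.tracer = openTelemetry.getTracer("scheduled");
    }

    @Override
    public ScheduledFuture<?> schedule(
            Runnable command, long delay, TimeUnit unit) {
        // Capture the scheduling thread's Context (often the root Context for timer-driven jobs)
        Context context = Context.current();
        Runnable traced = () -> {
            // Start the span when the task actually runs, so its duration excludes the delay
            Span span = tracer.spanBuilder("scheduled-task")
                .setParent(context)
                .startSpan();
            try (Scope scope = context.with(span).makeCurrent()) {
                command.run();
            } finally {
                span.end();
            }
        };
        return delegate.schedule(traced, delay, unit);
    }
    // ... other methods
}
#Common problems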
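#Problem 1: Context propagation breaks partway
How to troubleshoot:
- Check whether the thread that submits the task has an active Span
- Check whether Context.current() returns a valid Context
- Check whether the thread pool is wrapped with ContextPreservingExecutorService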
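#Problem 2: The async Span has no parent Span
This is the expected result when the Context is not propagated: the async task's Span is not a child of the original request's Span but a root Span in a new trace. If the two need to be linked, capture the Context on the submitting thread and pass it to setParent(context) when building the async Span.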
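#Problem 3: The Kafka consumer Span is not linked to the producer Span
Check:
- Whether the producer correctly injected the traceparent header
- Whether the consumer correctly extracted the Context from the header
- Whether the Getter and Setter passed to extract() and inject() are configured correctly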
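When checking these points, it helps to know what a valid traceparent value looks like. Under the W3C Trace Context format it is version-traceid-parentid-flags: a 2-digit version, a 32-digit trace ID, a 16-digit parent span ID, and 2-digit flags, all lowercase hex. The sketch below is a minimal illustration using only the JDK. It handles version 00 only, uses a plain Map as a stand-in for message headers, and TraceparentDemo, inject, and extract are hypothetical names, not the OTel propagator API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TraceparentDemo {
    static final String HEADER = "traceparent";

    // version "00" - 32 hex trace-id - 16 hex parent-id - 2 hex flags
    private static final Pattern FORMAT =
        Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    // Producer side: write the header into the message's string headers.
    static void inject(Map<String, String> headers, String traceId, String spanId, boolean sampled) {
        headers.put(HEADER, "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00"));
    }

    // Consumer side: returns {traceId, parentSpanId, flags}, or null if absent or malformed.
    static String[] extract(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null) {
            return null; // producer never injected the header, or the key differs
        }
        Matcher m = FORMAT.matcher(value.trim());
        if (!m.matches()) {
            return null;
        }
        // All-zero IDs are invalid in the W3C format
        if (m.group(1).matches("0+") || m.group(2).matches("0+")) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3) };
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        inject(headers, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true);
        System.out.println(headers.get(HEADER));
        String[] parent = extract(headers);
        System.out.println("trace id on consumer side: " + parent[0]);
    }
}
```

If the consumer's extract step returns nothing for a message, the consumer Span starts a new trace, which is exactly the "not linked" symptom described above. Logging the raw header value on both sides is usually the quickest way to see which of the three checks fails.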
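#Self-check
After reading this section, you should be able to answer:
- Why is asynchronous tracing hard? What is the root cause?
- In the CompletableFuture scenario, how do you pass the Context correctly? Which APIs are essential?
- What does OTel's Context.makeCurrent() do? Why must the Scope be closed, using try-with-resources or try-finally?
- Which thread pool does Spring's @Async annotation use behind the scenes? How do you make Spring's async tasks propagate the Context automatically?
- What do tracing for scheduled tasks (@Scheduled) and tracing for production message-queue consumers (Kafka/RocketMQ) have in common in how they handle the Context?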
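For the question about closing the Scope, the risk is easiest to see on a pooled thread, which is reused by later tasks. The sketch below is a minimal illustration using only the JDK. A plain ThreadLocal named CURRENT stands in for the thread-bound Context, and ScopeLeakDemo and secondTaskSees are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ScopeLeakDemo {
    // Stand-in for the thread-bound Context (an assumption for illustration, not the OTel API).
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Runs two tasks back to back on one pooled thread and returns what the second task sees.
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // First task: binds a value, optionally cleans up afterwards
            pool.submit(() -> {
                CURRENT.set("request-1");
                try {
                    // ... work for request 1 ...
                } finally {
                    if (cleanUp) {
                        CURRENT.remove(); // plays the role of closing the Scope
                    }
                }
            }).get();

            // Second task: unrelated work that happens to land on the same thread
            Callable<String> second = CURRENT::get;
            return pool.submit(second).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + secondTaskSees(false)); // request-1
        System.out.println("with cleanup:    " + secondTaskSees(true));  // null
    }
}
```

Without cleanup, the second task inherits request-1 from a task it has nothing to do with. In tracing terms, spans from one request would be attached to another request's trace, which is why every makeCurrent() call is paired with closing its Scope.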