diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java index ce46e495a..e7850d771 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java @@ -6,8 +6,6 @@ import java.util.function.BooleanSupplier; import java.util.function.Consumer; -import javax.annotation.Nullable; - import io.grpc.Metadata; import tech.ydb.core.impl.call.GrpcFlows; @@ -87,7 +85,6 @@ public GrpcFlowControl getFlowControl() { return flowControl; } - @Nullable public Span getSpan() { return span; } @@ -103,7 +100,7 @@ public static final class Builder { private Consumer trailersHandler = null; private BooleanSupplier pessimizationHook = null; private GrpcFlowControl flowControl = GrpcFlows.SIMPLE_FLOW; - private Span span = null; + private Span span = Span.NOOP; /** * Returns a new {@code Builder} with a deadline, based on the running Java Virtual Machine's @@ -185,7 +182,7 @@ public Builder withPessimizationHook(BooleanSupplier pessimizationHook) { } public Builder withSpan(Span span) { - this.span = span; + this.span = span == null ? Span.NOOP : span; return this; } diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index 787d05df5..96816601f 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -43,7 +43,7 @@ GrpcReadWriteStream readWriteStreamCall( ScheduledExecutorService getScheduler(); default Tracer getTracer() { - return NoopTracer.INSTANCE; + return NoopTracer.getInstance(); } @Override diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index 9273d99a0..534cf08be 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -30,7 +30,6 @@ import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.Version; - /** * * @author Aleksandr Gorshenin diff --git a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java index b6a879b99..4dea38d01 100644 --- a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java @@ -126,13 +126,14 @@ public CompletableFuture> unaryCall( ClientCall call = channel.getReadyChannel().newCall(method, options); ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); - + Span contextSpan = settings.getSpan(); if (logger.isTraceEnabled()) { logger.trace("UnaryCall[{}] with method {} and endpoint {} created", traceId, method.getFullMethodName(), endpoint.getHostAndPort()); } Metadata metadata = makeMetadataFromSettings(settings, endpoint); - return new UnaryCall<>(traceId, endpoint.getHostAndPort(), call, handler).startCall(request, metadata); + return new UnaryCall<>(traceId, endpoint.getHostAndPort(), call, handler, contextSpan) + .startCall(request, metadata); } catch (UnexpectedResultException ex) { logger.warn("UnaryCall[{}] got unexpected status {}", traceId, ex.getStatus()); return CompletableFuture.completedFuture(Result.fail(ex)); @@ -164,6 +165,7 @@ public GrpcReadStream readStreamCall( ClientCall call = channel.getReadyChannel().newCall(method, options); ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); + Span contextSpan = settings.getSpan(); if (logger.isTraceEnabled()) { logger.trace("ReadStreamCall[{}] with method {} and endpoint {} created", @@ -173,7 +175,9 @@ public GrpcReadStream readStreamCall( Metadata metadata = makeMetadataFromSettings(settings, endpoint); GrpcFlowControl flowCtrl = settings.getFlowControl(); - return new ReadStreamCall<>(traceId, endpoint.getHostAndPort(), call, flowCtrl, request, metadata, handler); + return new ReadStreamCall<>( + endpoint.getHostAndPort(), call, flowCtrl, request, metadata, handler, contextSpan + ); } catch (UnexpectedResultException ex) { logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus()); return new EmptyStream<>(ex.getStatus()); @@ -254,7 +258,7 @@ private Metadata makeMetadataFromSettings(GrpcRequestSettings settings, Endpoint } Span span = settings.getSpan(); - if (span != null) { + if (span.isValid()) { span.setAttribute("db.system.name", "ydb"); span.setAttribute("db.namespace", getDatabase()); span.setAttribute("server.address", serverEndpoint.getHost()); diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 1d1ee65b8..7ed1f6801 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -19,6 +19,8 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcStatuses; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanScope; /** * @@ -29,11 +31,12 @@ public class ReadStreamCall extends ClientCall.Listener implements GrpcReadStream { private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); - private final String traceId; private final String endpoint; private final ClientCall call; private final Lock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; + private final Span parentSpan; + private final Span span; private final ReqT request; private final Metadata headers; private final GrpcFlowControl.Call flow; @@ -42,14 +45,15 @@ public class ReadStreamCall extends ClientCall.Listener impl private Observer consumer; - public ReadStreamCall(String traceId, String endpoint, ClientCall call, GrpcFlowControl flowCtrl, - ReqT req, Metadata headers, GrpcStatusHandler statusHandler) { - this.traceId = traceId; + public ReadStreamCall(String endpoint, ClientCall call, GrpcFlowControl flowCtrl, ReqT req, + Metadata headers, GrpcStatusHandler statusHandler, Span span) { this.endpoint = endpoint; this.call = call; this.request = req; this.headers = headers; this.statusConsumer = statusHandler; + this.parentSpan = span.getParentSpan(); + this.span = span; this.flow = flowCtrl.newCall(this::nextRequest); } @@ -67,7 +71,7 @@ public CompletableFuture start(Observer observer) { consumer = observer; call.start(this, headers); if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); + logger.trace("ReadStreamCall[{}] --> {}", endpoint, TextFormat.shortDebugString((Message) request)); } call.sendMessage(request); // close stream by client side @@ -80,7 +84,7 @@ public CompletableFuture start(Observer observer) { try { call.cancel(null, th); } catch (Throwable ex) { - logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex); + logger.error("ReadStreamCall[{}] got exception while canceling", endpoint, ex); } } finally { callLock.unlock(); @@ -111,42 +115,45 @@ private void nextRequest(int count) { @Override public void onMessage(RespT message) { - try { - if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message)); - } - consumer.onNext(message); - flow.onMessageRead(); - } catch (Exception ex) { - statusFuture.completeExceptionally(ex); - + try (SpanScope ignored = span.makeCurrent()) { try { - callLock.lock(); + if (logger.isTraceEnabled()) { + logger.trace("ReadStreamCall[{}] <-- {}", endpoint, TextFormat.shortDebugString((Message) message)); + } + consumer.onNext(message); + flow.onMessageRead(); + } catch (Exception ex) { + statusFuture.completeExceptionally(ex); + try { - call.cancel("Canceled by exception from observer", ex); - } finally { - callLock.unlock(); + callLock.lock(); + try { + call.cancel("Canceled by exception from observer", ex); + } finally { + callLock.unlock(); + } + } catch (Throwable th) { + logger.error("ReadStreamCall[{}] got exception while canceling", endpoint, th); } - } catch (Throwable th) { - logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th); } } } @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { - if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] closed with status {}", traceId, status); - } + try (SpanScope ignored = parentSpan.makeCurrent()) { + if (logger.isTraceEnabled()) { + logger.trace("ReadStreamCall[{}] closed with status {}", endpoint, status); + } - statusConsumer.accept(status, trailers); + statusConsumer.accept(status, trailers); - if (status.isOk()) { - statusFuture.complete(Status.SUCCESS); - } else { - statusFuture.complete(GrpcStatuses.toStatus(status, endpoint)); + if (status.isOk()) { + statusFuture.complete(Status.SUCCESS); + } else { + statusFuture.complete(GrpcStatuses.toStatus(status, endpoint)); + } + statusConsumer.postComplete(); } - - statusConsumer.postComplete(); } } diff --git a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java index 7f74196f6..0ac6075f3 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java @@ -18,6 +18,8 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcStatuses; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.proto.auth.YdbAuth; /** @@ -39,15 +41,18 @@ public class UnaryCall extends ClientCall.Listener { private final String endpoint; private final ClientCall call; private final GrpcStatusHandler statusConsumer; + private final Span parentSpan; private final CompletableFuture> future = new CompletableFuture<>(); private final AtomicReference value = new AtomicReference<>(); - public UnaryCall(String traceId, String endpoint, ClientCall call, GrpcStatusHandler statusConsumer) { + public UnaryCall(String traceId, String endpoint, ClientCall call, + GrpcStatusHandler statusConsumer, Span contextSpan) { this.traceId = traceId; this.endpoint = endpoint; this.call = call; this.statusConsumer = statusConsumer; + this.parentSpan = contextSpan.getParentSpan(); } public CompletableFuture> startCall(ReqT request, Metadata headers) { @@ -91,23 +96,25 @@ public void onMessage(RespT value) { @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { - statusConsumer.accept(status, trailers); - if (logger.isTraceEnabled()) { - logger.trace("UnaryCall[{}] closed with status {}", traceId, status); - } + try (SpanScope ignored = parentSpan.makeCurrent()) { + statusConsumer.accept(status, trailers); + if (logger.isTraceEnabled()) { + logger.trace("UnaryCall[{}] closed with status {}", traceId, status); + } - if (status.isOk()) { - RespT snapshotValue = value.get(); + if (status.isOk()) { + RespT snapshotValue = value.get(); - if (snapshotValue == null) { - future.complete(Result.fail(NO_VALUE)); + if (snapshotValue == null) { + future.complete(Result.fail(NO_VALUE)); + } else { + future.complete(Result.success(snapshotValue)); + } } else { - future.complete(Result.success(snapshotValue)); + future.complete(GrpcStatuses.toResult(status, endpoint)); } - } else { - future.complete(GrpcStatuses.toResult(status, endpoint)); - } - statusConsumer.postComplete(); + statusConsumer.postComplete(); + } } } diff --git a/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java b/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java index 37ee3668f..7187448be 100644 --- a/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java +++ b/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java @@ -7,7 +7,7 @@ * singletons, no allocations per call (except caller code). */ public final class NoopTracer implements Tracer { - public static final NoopTracer INSTANCE = new NoopTracer(); + private static final NoopTracer INSTANCE = new NoopTracer(); private NoopTracer() { } @@ -18,6 +18,6 @@ public static NoopTracer getInstance() { @Override public Span startSpan(String spanName, SpanKind spanKind) { - return null; + return Span.NOOP; } } diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index b4c5ec933..b2534064d 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -12,24 +12,56 @@ * A span represents a timed operation. */ public interface Span { + Span NOOP = new Span() { + }; - String getId(); + /** + * Returns W3C traceparent value for request propagation. + * + *

For {@link #NOOP} this returns an empty string. Check {@link #isValid()} to decide whether + * trace headers should be sent to server. + * + * @return traceparent value + */ + default String getId() { + return ""; + } + + /** + * Indicates whether this span carries a real tracing context. + * + * @return true for real spans, false for noop span + */ + default boolean isValid() { + return false; + } + + /** + * Returns parent span captured at the moment when this span was created. + * + * @return parent span or {@link #NOOP} if parent is unavailable + */ + default Span getParentSpan() { + return NOOP; + } /** - * Sets a string attribute on the span (ignored by Noop implementation). + * Sets a string attribute on the span. * * @param key attribute key * @param value attribute value, may be null */ - void setAttribute(String key, @Nullable String value); + default void setAttribute(String key, @Nullable String value) { + } /** - * Sets a long attribute on the span (ignored by Noop implementation). + * Sets a long attribute on the span. * * @param key attribute key * @param value attribute value */ - void setAttribute(String key, long value); + default void setAttribute(String key, long value) { + } /** * Sets span status (success or error) with human-readable message. @@ -37,27 +69,53 @@ public interface Span { * @param status operation status used to map span attributes * @param error operation exception used to map span attributes */ - void setStatus(Status status, Throwable error); + default void setStatus(@Nullable Status status, @Nullable Throwable error) { + } + + /** + * Makes this span current in the active execution context. + * + * @return closeable scope handle + */ + default SpanScope makeCurrent() { + return () -> { + }; + } - void end(); + /** + * Ends (finishes) this span. + */ + default void end() { + } + /** + * Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span. + * For non-valid spans returns the future as-is without subscribing. + * + * @param span the span to finalize + * @param future the future to observe + * @return the same future (for chaining) + */ static CompletableFuture endOnStatus(Span span, CompletableFuture future) { - if (span != null) { - future.whenComplete((status, th) -> { - span.setStatus(status, FutureTools.unwrapCompletionException(th)); - span.end(); - }); - } - return future; + return span.isValid() ? future.whenComplete((status, th) -> { + span.setStatus(status, FutureTools.unwrapCompletionException(th)); + span.end(); + }) : future; } + /** + * Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span. + * For non-valid spans returns the future as-is without subscribing. + * + * @param result value type + * @param span the span to finalize + * @param future the future to observe + * @return the same future (for chaining) + */ static CompletableFuture> endOnResult(Span span, CompletableFuture> future) { - if (span != null) { - future.whenComplete((result, th) -> { - span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); - span.end(); - }); - } - return future; + return span.isValid() ? future.whenComplete((result, th) -> { + span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); + span.end(); + }) : future; } } diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanScope.java b/core/src/main/java/tech/ydb/core/tracing/SpanScope.java new file mode 100644 index 000000000..5281b0702 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/tracing/SpanScope.java @@ -0,0 +1,9 @@ +package tech.ydb.core.tracing; + +/** + * Closeable scope returned by {@link Span#makeCurrent()}. + */ +public interface SpanScope extends AutoCloseable { + @Override + void close(); +} diff --git a/core/src/main/java/tech/ydb/core/tracing/Tracer.java b/core/src/main/java/tech/ydb/core/tracing/Tracer.java index 74b698663..7bf37a7a3 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Tracer.java +++ b/core/src/main/java/tech/ydb/core/tracing/Tracer.java @@ -1,7 +1,5 @@ package tech.ydb.core.tracing; -import javax.annotation.Nullable; - /** * Tracer is an entry point to create spans. * @@ -17,8 +15,16 @@ public interface Tracer { * * @param spanName logical span name (for example, ydb.ExecuteQuery) * @param spanKind span kind that defines operation role - * @return created span instance, or null if implementation does not create spans + * @return created span instance */ - @Nullable Span startSpan(String spanName, SpanKind spanKind); + + /** + * Returns the currently active span. + * + * @return active span or {@link Span#NOOP} when unavailable + */ + default Span currentSpan() { + return Span.NOOP; + } } diff --git a/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java b/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java index c173aa78f..1ec3cd56f 100644 --- a/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java +++ b/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java @@ -34,6 +34,7 @@ import tech.ydb.core.operation.OperationBinder; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.discovery.DiscoveryProtos; import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc; @@ -410,6 +411,11 @@ public String getId() { return id; } + @Override + public boolean isValid() { + return true; + } + @Override public void setAttribute(String key, String value) { stringAttrs.put(key, value); @@ -420,11 +426,6 @@ public void setAttribute(String key, long value) { longAttrs.put(key, value); } - @Override - public void setStatus(tech.ydb.core.Status status, Throwable error) { - // not needed in this test - } - @Override public void end() { // not needed in this test diff --git a/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java b/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java index 35ace261d..3718efa51 100644 --- a/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java +++ b/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java @@ -20,8 +20,9 @@ public void singletonTest() { public void spanTest() { Tracer tracer = NoopTracer.getInstance(); - Assert.assertNull(tracer.startSpan("test", SpanKind.CLIENT)); - Assert.assertNull(tracer.startSpan("test", SpanKind.INTERNAL)); - Assert.assertNull(tracer.startSpan(null, null)); + Assert.assertSame(Span.NOOP, tracer.startSpan("test", SpanKind.CLIENT)); + Assert.assertSame(Span.NOOP, tracer.startSpan("test", SpanKind.INTERNAL)); + Assert.assertSame(Span.NOOP, tracer.startSpan(null, null)); + Assert.assertSame(Span.NOOP, tracer.currentSpan()); } } diff --git a/core/src/test/java/tech/ydb/core/tracing/SpanTest.java b/core/src/test/java/tech/ydb/core/tracing/SpanTest.java index d03fc6432..186f9de14 100644 --- a/core/src/test/java/tech/ydb/core/tracing/SpanTest.java +++ b/core/src/test/java/tech/ydb/core/tracing/SpanTest.java @@ -19,8 +19,8 @@ public void finishByStatusExceptionTest() { CompletableFuture future = new CompletableFuture<>(); - Assert.assertSame(future, Span.endOnStatus(null, future)); - Assert.assertSame(future, Span.endOnStatus(span, future)); + Assert.assertSame(future, Span.endOnStatus(Span.NOOP, future)); + Assert.assertNotSame(future, Span.endOnStatus(span, future)); Assert.assertFalse(span.ended); Assert.assertNull(span.statusError); @@ -40,7 +40,7 @@ public void finishByResultTest() { CompletableFuture> future = new CompletableFuture<>(); - Assert.assertSame(future, Span.endOnResult(span, future)); + Assert.assertNotSame(future, Span.endOnResult(span, future)); Assert.assertFalse(span.ended); Assert.assertNull(span.statusError); @@ -60,8 +60,8 @@ public void finishByResultExceptionTest() { CompletableFuture> future = new CompletableFuture<>(); - Assert.assertSame(future, Span.endOnResult(null, future)); - Assert.assertSame(future, Span.endOnResult(span, future)); + Assert.assertSame(future, Span.endOnResult(Span.NOOP, future)); + Assert.assertNotSame(future, Span.endOnResult(span, future)); Assert.assertFalse(span.ended); Assert.assertNull(span.statusError); @@ -80,18 +80,8 @@ private static final class RecordingSpan implements Span { private boolean ended; @Override - public String getId() { - return "test-span"; - } - - @Override - public void setAttribute(String key, String value) { - // not needed in this test - } - - @Override - public void setAttribute(String key, long value) { - // not needed in this test + public boolean isValid() { + return true; } @Override diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java index 40e358dcb..30cb77f23 100644 --- a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -2,13 +2,18 @@ import java.util.Objects; +import javax.annotation.Nullable; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import tech.ydb.core.Status; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; public final class OpenTelemetryTracer implements Tracer { @@ -36,10 +41,20 @@ public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry, @Override public Span startSpan(String spanName, SpanKind spanKind) { + Span parentSpan = currentSpan(); io.opentelemetry.api.trace.Span span = tracer.spanBuilder(spanName) .setSpanKind(mapSpanKind(spanKind)) .startSpan(); - return new OtelSpan(span); + return new OtelSpan(span, parentSpan); + } + + @Override + public Span currentSpan() { + io.opentelemetry.api.trace.Span current = io.opentelemetry.api.trace.Span.current(); + if (!current.getSpanContext().isValid()) { + return Span.NOOP; + } + return new OtelSpan(current, Span.NOOP); } private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { @@ -51,9 +66,11 @@ private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { private static final class OtelSpan implements Span { private final io.opentelemetry.api.trace.Span span; + private final Span parentSpan; - private OtelSpan(io.opentelemetry.api.trace.Span span) { + private OtelSpan(io.opentelemetry.api.trace.Span span, Span parentSpan) { this.span = span; + this.parentSpan = parentSpan; } @Override @@ -62,7 +79,17 @@ public String getId() { } @Override - public void setAttribute(String key, String value) { + public boolean isValid() { + return span.getSpanContext().isValid(); + } + + @Override + public Span getParentSpan() { + return parentSpan; + } + + @Override + public void setAttribute(String key, @Nullable String value) { span.setAttribute(key, value); } @@ -72,7 +99,7 @@ public void setAttribute(String key, long value) { } @Override - public void setStatus(Status status, Throwable error) { + public void setStatus(@Nullable Status status, @Nullable Throwable error) { if (status != null) { if (status.isSuccess()) { span.setStatus(StatusCode.OK); @@ -84,11 +111,23 @@ public void setStatus(Status status, Throwable error) { } } if (error != null) { - span.setAttribute("error.type", error.getClass().getName()); + if (error instanceof UnexpectedResultException) { + tech.ydb.core.StatusCode code = ((UnexpectedResultException) error).getStatus().getCode(); + span.setAttribute("db.response.status_code", code.toString()); + span.setAttribute("error.type", code.isTransportError() ? "transport_error" : "ydb_error"); + } else { + span.setAttribute("error.type", error.getClass().getName()); + } span.setStatus(StatusCode.ERROR, error.getMessage()); } } + @Override + public SpanScope makeCurrent() { + Scope scope = span.makeCurrent(); + return scope::close; + } + @Override public void end() { span.end(); diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java index a7cb6c6f7..732b0ef17 100644 --- a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java @@ -1,7 +1,11 @@ package tech.ydb.opentelemetry; import java.time.Duration; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; @@ -25,11 +29,13 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.query.DataQueryResult; @@ -196,6 +202,231 @@ public void sdkSpanIsChildOfApplicationSpan() { } } + @Test + public void retrySpanIsParentForRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(1) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.retry").startSpan(); + try (Scope scope = appSpan.makeCurrent()) { + Assert.assertNotNull(scope); + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + } finally { + appSpan.end(); + } + + List spans = spanExporter.getFinishedSpanItems(); + SpanData querySpan = null; + Set retrySpanIds = new HashSet<>(); + int retrySpansCount = 0; + + for (SpanData span : spans) { + if ("ydb.ExecuteWithRetry".equals(span.getName())) { + retrySpansCount++; + retrySpanIds.add(span.getSpanId()); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), span.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), span.getParentSpanId()); + } + if ("ydb.ExecuteQuery".equals(span.getName())) { + querySpan = span; + } + } + + Assert.assertEquals(2, retrySpansCount); + Assert.assertNotNull("Retry should produce query rpc span", querySpan); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), querySpan.getTraceId()); + Assert.assertTrue("RPC span parent must be ExecuteWithRetry span", + retrySpanIds.contains(querySpan.getParentSpanId())); + } + + @Test + public void tableProxyRetrySpanIsParentForRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.table.retry").startSpan(); + try (Scope scope = appSpan.makeCurrent()) { + Assert.assertNotNull(scope); + try (TableClient tableClient = QueryClient.newTableClient(transport).build()) { + tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient) + .maxRetries(1) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); + queryResult.getStatus().expectSuccess(); + return tx.commit(); + }).join(); + status.expectSuccess(); + } + } finally { + appSpan.end(); + } + + assertSpanOK("ydb.CreateSession", 1); + assertSpanOK("ydb.ExecuteQuery", 1); + assertSpanOK("ydb.Commit", 1); + + List spans = spanExporter.getFinishedSpanItems(); + Set retrySpanIds = new HashSet<>(); + + for (SpanData span : spans) { + if ("ydb.ExecuteWithRetry".equals(span.getName())) { + retrySpanIds.add(span.getSpanId()); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), span.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), span.getParentSpanId()); + } + } + Assert.assertEquals(2, retrySpanIds.size()); + + int createSessionChildren = 0; + int executeQueryChildren = 0; + int commitChildren = 0; + + for (SpanData span : spans) { + if ("ydb.CreateSession".equals(span.getName()) && retrySpanIds.contains(span.getParentSpanId())) { + createSessionChildren++; + } + if ("ydb.ExecuteQuery".equals(span.getName()) && retrySpanIds.contains(span.getParentSpanId())) { + executeQueryChildren++; + } + if ("ydb.Commit".equals(span.getName()) && retrySpanIds.contains(span.getParentSpanId())) { + commitChildren++; + } + } + + Assert.assertTrue("CreateSession span must be child of ExecuteWithRetry", createSessionChildren >= 1); + Assert.assertTrue("ExecuteQuery span must be child of ExecuteWithRetry", executeQueryChildren >= 1); + Assert.assertTrue("Commit span must be child of ExecuteWithRetry", commitChildren >= 1); + } + + @Test + public void nonRetryableExceptionProducesErrorRetrySpan() { + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + RuntimeException thrown; + try { + retryContext.supplyStatus(session -> { + throw new IllegalStateException("boom"); + }).join(); + throw new AssertionError("Exception expected"); + } catch (RuntimeException ex) { + thrown = ex; + } + + Assert.assertNotNull(thrown.getCause()); + Assert.assertTrue(thrown.getCause() instanceof IllegalStateException); + assertSpanOK("ydb.CreateSession", 1); + assertSpanOK("ydb.ExecuteQuery", 0); + + List spans = spanExporter.getFinishedSpanItems(); + int retrySpans = 0; + int errorRetrySpans = 0; + for (SpanData span : spans) { + if (!"ydb.ExecuteWithRetry".equals(span.getName())) { + continue; + } + retrySpans++; + if (io.opentelemetry.api.trace.StatusCode.ERROR.equals(span.getStatus().getStatusCode())) { + errorRetrySpans++; + Assert.assertEquals(IllegalStateException.class.getName(), + span.getAttributes().get(ERROR_TYPE)); + } + } + Assert.assertEquals(1, retrySpans); + Assert.assertEquals(1, errorRetrySpans); + } + + @Test + public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + + assertSpanOK("ydb.ExecuteQuery", 1); + + List spans = spanExporter.getFinishedSpanItems(); + int retrySpans = 0; + int errorRetrySpans = 0; + int okRetrySpans = 0; + for (SpanData span : spans) { + if (!"ydb.ExecuteWithRetry".equals(span.getName())) { + continue; + } + retrySpans++; + if (io.opentelemetry.api.trace.StatusCode.ERROR.equals(span.getStatus().getStatusCode())) { + errorRetrySpans++; + Assert.assertEquals("ydb_error", span.getAttributes().get(ERROR_TYPE)); + Assert.assertEquals(StatusCode.OVERLOADED.toString(), span.getAttributes().get(STATUS_CODE)); + } else if (io.opentelemetry.api.trace.StatusCode.OK.equals(span.getStatus().getStatusCode())) { + okRetrySpans++; + } + } + Assert.assertEquals(2, retrySpans); + Assert.assertEquals(1, errorRetrySpans); + Assert.assertEquals(1, okRetrySpans); + } + + @Test + public void tableProxySpansAreChildrenOfApplicationSpan() { + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.table").startSpan(); + try (Scope scope = appSpan.makeCurrent()) { + Assert.assertNotNull(scope); + try (TableClient tableClient = QueryClient.newTableClient(transport).build()) { + try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); + queryResult.getStatus().expectSuccess(); + Status commitStatus = tx.commit().join(); + commitStatus.expectSuccess(); + } + } + } finally { + appSpan.end(); + } + + List spans = spanExporter.getFinishedSpanItems(); + int matched = 0; + for (SpanData span : spans) { + if (!span.getName().startsWith("ydb.")) { + continue; + } + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), span.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), span.getParentSpanId()); + matched++; + } + Assert.assertEquals(3, matched); + } + private void assertBaseAttributes(SpanData span) { Assert.assertEquals(io.opentelemetry.api.trace.SpanKind.CLIENT, span.getKind()); Assert.assertEquals("ydb", span.getAttributes().get(DB_SYSTEM_NAME)); @@ -222,7 +453,7 @@ private void assertSpanError(String spanName, int expectedCount, Status status) assertBaseAttributes(span); return; } - Assert.assertEquals("Unexpected count of span " + spanName, expectedCount, count); + Assert.assertEquals("Unexpected count of span " + spanName, count, expectedCount); } private void assertSpanOK(String spanName, int expectedCount) { diff --git a/query/src/main/java/tech/ydb/query/QueryClient.java b/query/src/main/java/tech/ydb/query/QueryClient.java index a71d8c12f..343ccb01f 100644 --- a/query/src/main/java/tech/ydb/query/QueryClient.java +++ b/query/src/main/java/tech/ydb/query/QueryClient.java @@ -8,6 +8,8 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.core.tracing.Tracer; import tech.ydb.query.impl.QueryClientImpl; import tech.ydb.query.impl.TableClientImpl; import tech.ydb.table.SessionPoolStats; @@ -37,6 +39,10 @@ static TableClient.Builder newTableClient(@WillNotClose GrpcTransport transport) ScheduledExecutorService getScheduler(); + default Tracer getTracer() { + return NoopTracer.getInstance(); + } + SessionPoolStats getSessionPoolStats(); @Override @@ -44,6 +50,7 @@ static TableClient.Builder newTableClient(@WillNotClose GrpcTransport transport) interface Builder { Builder sessionPoolMinSize(int minSize); + Builder sessionPoolMaxSize(int maxSize); Builder sessionMaxIdleTime(Duration duration); diff --git a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java index 88cd83b7f..4716abc85 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java @@ -9,6 +9,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.table.SessionPoolStats; @@ -20,6 +21,7 @@ public class QueryClientImpl implements QueryClient { private final SessionPool pool; private final ScheduledExecutorService scheduler; + private final Tracer tracer; public QueryClientImpl(Builder builder) { this.pool = new SessionPool( @@ -31,6 +33,7 @@ public QueryClientImpl(Builder builder) { builder.sessionPoolIdleDuration ); this.scheduler = builder.transport.getScheduler(); + this.tracer = builder.transport.getTracer(); } @Override @@ -43,6 +46,11 @@ public ScheduledExecutorService getScheduler() { return scheduler; } + @Override + public Tracer getTracer() { + return tracer; + } + public void updatePoolMaxSize(int maxSize) { pool.updateMaxSize(maxSize); } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index c22dff0d2..763d15c39 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -312,7 +312,7 @@ GrpcReadStream createGrpcStream( @Override public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); - Span span = rpc.startSpan("ydb.ExecuteQuery"); + Span span = startSpan("ydb.ExecuteQuery"); return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { @Override void handleTxMeta(String txID) { @@ -377,51 +377,52 @@ public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional operationStatus = new UpdatableOptional<>(); final UpdatableOptional stats = new UpdatableOptional<>(); return Span.endOnResult(span, grpcStream.start(msg -> { - if (isTraceEnabled) { - logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); - } - Issue[] issues = Issue.fromPb(msg.getIssuesList()); - Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); + if (isTraceEnabled) { + logger.trace("{} got stream message {}", + SessionImpl.this, TextFormat.shortDebugString(msg)); + } + Issue[] issues = Issue.fromPb(msg.getIssuesList()); + Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); - updateSessionState(status); + updateSessionState(status); - if (!status.isSuccess()) { - handleTxMeta(null); - operationStatus.update(status); - return; - } + if (!status.isSuccess()) { + handleTxMeta(null); + operationStatus.update(status); + return; + } - if (msg.hasTxMeta()) { - handleTxMeta(msg.getTxMeta().getId()); - } - if (issues.length > 0) { - if (handler != null) { - handler.onIssues(issues); - } else { - logger.trace("{} lost issues message", SessionImpl.this); - } - } - if (msg.hasExecStats()) { - stats.update(new QueryStats(msg.getExecStats())); - } + if (msg.hasTxMeta()) { + handleTxMeta(msg.getTxMeta().getId()); + } + if (issues.length > 0) { + if (handler != null) { + handler.onIssues(issues); + } else { + logger.trace("{} lost issues message", SessionImpl.this); + } + } + if (msg.hasExecStats()) { + stats.update(new QueryStats(msg.getExecStats())); + } - if (msg.hasResultSet()) { - long index = msg.getResultSetIndex(); - if (handler != null) { - handler.onNextRawPart(index, msg.getResultSet()); - } else { - logger.trace("{} lost result set part with index {}", SessionImpl.this, index); - } - } - }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { - updateSessionState(streamStatus); - Status status = operationStatus.orElse(streamStatus); - if (status.isSuccess()) { - return Result.success(new QueryInfo(stats.get()), streamStatus); - } else { - return Result.fail(status); - } - }) + if (msg.hasResultSet()) { + long index = msg.getResultSetIndex(); + if (handler != null) { + handler.onNextRawPart(index, msg.getResultSet()); + } else { + logger.trace("{} lost result set part with index {}", SessionImpl.this, index); + } + } + }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { + updateSessionState(streamStatus); + Status status = operationStatus.orElse(streamStatus); + if (status.isSuccess()) { + return Result.success(new QueryInfo(stats.get()), streamStatus); + } else { + return Result.fail(status); + } + }) ); } @@ -459,7 +460,7 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E ? TxControl.txIdCtrl(currentId, commitAtEnd) : TxControl.txModeCtrl(txMode, commitAtEnd); - Span span = rpc.startSpan("ydb.ExecuteQuery"); + Span span = startSpan("ydb.ExecuteQuery"); return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { @Override void handleTxMeta(String txID) { @@ -499,7 +500,7 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { - Span span = rpc.startSpan("ydb.Commit"); + Span span = startSpan("ydb.Commit"); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { @@ -533,7 +534,7 @@ public CompletableFuture> commit(CommitTransactionSettings set @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { - Span span = rpc.startSpan("ydb.Rollback"); + Span span = startSpan("ydb.Rollback"); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index f5ebadeb6..9baadd0a2 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -16,6 +16,7 @@ import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.table.YdbTable; @@ -60,6 +61,11 @@ public ScheduledExecutorService getScheduler() { return proxy.getScheduler(); } + @Override + public Tracer getTracer() { + return proxy.getTracer(); + } + @Override public SessionPoolStats sessionPoolStats() { return proxy.getSessionPoolStats(); diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index cbecca71a..b416b07ec 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -19,6 +19,10 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; +import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.FutureTools; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -29,6 +33,9 @@ */ @ParametersAreNonnullByDefault public class SessionRetryContext { + private static final String EXECUTE_WITH_RETRY_SPAN_NAME = "ydb.ExecuteWithRetry"; + private static final String RETRY_ATTEMPT_ATTR = "ydb.retry.attempt"; + private static final String RETRY_SLEEP_MS_ATTR = "ydb.retry.sleep_ms"; private final QueryClient queryClient; private final Executor executor; @@ -136,29 +143,39 @@ private abstract class BaseRetryableTask implements Runnable { private final CompletableFuture promise = new CompletableFuture<>(); private final AtomicInteger retryNumber = new AtomicInteger(); private final Function> fn; + private final Tracer tracer; + private final Span parentSpan; + private Span retrySpan = Span.NOOP; BaseRetryableTask(Function> fn) { this.fn = fn; + this.tracer = queryClient.getTracer(); + this.parentSpan = tracer.currentSpan(); } CompletableFuture getFuture() { return promise; } - abstract StatusCode toStatusCode(R result); + abstract Status toStatus(R result); + abstract R toFailedResult(Result sessionResult); // called on timer expiration @Override public void run() { if (promise.isCancelled()) { + finishRetrySpan(null, null); return; } executor.execute(this::requestSession); } public void requestSession() { - CompletableFuture> sessionFuture = queryClient.createSession(sessionCreationTimeout); + try (SpanScope ignored = parentSpan.makeCurrent()) { + retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); + } + CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) { // faster than subscribing on future acceptSession(sessionFuture.join()); @@ -176,31 +193,37 @@ public void requestSession() { private void acceptSession(@Nonnull Result sessionResult) { if (!sessionResult.isSuccess()) { - handleError(sessionResult.getStatus().getCode(), toFailedResult(sessionResult)); + handleError(sessionResult.getStatus(), toFailedResult(sessionResult)); return; } final QuerySession session = sessionResult.getValue(); try { - fn.apply(session).whenComplete((fnResult, fnException) -> { - try { - session.close(); - - if (fnException != null) { - handleException(fnException); - return; + try (SpanScope ignored = retrySpan.makeCurrent()) { + fn.apply(session).whenComplete((fnResult, fnException) -> { + try { + try (SpanScope ignored1 = retrySpan.makeCurrent()) { + session.close(); + + if (fnException != null) { + handleException(fnException); + return; + } + + Status status = toStatus(fnResult); + if (status.isSuccess()) { + finishRetrySpan(status, null); + promise.complete(fnResult); + } else { + handleError(status, fnResult); + } + } + } catch (Throwable unexpected) { + finishRetrySpan(null, unexpected); + promise.completeExceptionally(unexpected); } - - StatusCode statusCode = toStatusCode(fnResult); - if (statusCode == StatusCode.SUCCESS) { - promise.complete(fnResult); - } else { - handleError(statusCode, fnResult); - } - } catch (Throwable unexpected) { - promise.completeExceptionally(unexpected); - } - }); + }); + } } catch (RuntimeException ex) { session.close(); handleException(ex); @@ -214,18 +237,23 @@ private void scheduleNext(long delayMillis) { queryClient.getScheduler().schedule(this, delayMillis, TimeUnit.MILLISECONDS); } - private void handleError(@Nonnull StatusCode code, R result) { + private void handleError(@Nonnull Status status, R result) { // Check retrayable status - if (!canRetry(code)) { + if (!canRetry(status.getCode())) { + finishRetrySpan(status, null); promise.complete(result); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { - long next = backoffTimeMillis(code, retry); + long next = backoffTimeMillis(status.getCode(), retry); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(status, null); scheduleNext(next); } else { + finishRetrySpan(status, null); promise.complete(result); } } @@ -233,18 +261,40 @@ private void handleError(@Nonnull StatusCode code, R result) { private void handleException(@Nonnull Throwable ex) { // Check retrayable execption if (!canRetry(ex)) { + finishRetrySpan(null, ex); promise.completeExceptionally(ex); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { long next = backoffTimeMillis(ex, retry); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(null, ex); scheduleNext(next); } else { + finishRetrySpan(null, ex); promise.completeExceptionally(ex); } } + + private CompletableFuture> createSessionWithRetrySpanParent() { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return queryClient.createSession(sessionCreationTimeout); + } + } + + private void recordRetrySchedule(int failedAttempt, long nextDelayMillis) { + retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, failedAttempt); + retrySpan.setAttribute(RETRY_SLEEP_MS_ATTR, nextDelayMillis); + } + + private void finishRetrySpan(Status status, Throwable throwable) { + retrySpan.setStatus(status, throwable); + retrySpan.end(); + retrySpan = Span.NOOP; + } } /** @@ -256,8 +306,8 @@ private final class RetryableResultTask extends BaseRetryableTask> } @Override - StatusCode toStatusCode(Result result) { - return result.getStatus().getCode(); + Status toStatus(Result result) { + return result.getStatus(); } @Override @@ -275,8 +325,8 @@ private final class RetryableStatusTask extends BaseRetryableTask { } @Override - StatusCode toStatusCode(Status status) { - return status.getCode(); + Status toStatus(Status status) { + return status; } @Override diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java index 4e148d663..3904bc786 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java @@ -4,6 +4,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nullable; import org.junit.After; import org.junit.AfterClass; @@ -17,14 +21,18 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.query.DataQueryResult; @@ -158,12 +166,136 @@ public void executeQuerySpansAreRecordedInTableProxyTransaction() { Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); } + @Test + public void querySpanIsChildOfApplicationSpan() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + Span appParent = tracer.startSpan("app.parent", SpanKind.INTERNAL); + try (SpanScope ignored = appParent.makeCurrent()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } finally { + appParent.end(); + } + } + + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "app.parent")); + } + + @Test + public void retrySpanIsParentOfRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Span appParent = tracer.startSpan("app.parent.retry", SpanKind.INTERNAL); + try (SpanScope ignored = appParent.makeCurrent()) { + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + } finally { + appParent.end(); + } + + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + } + + @Test + public void tableProxyRetrySpanIsParentOfRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + tableClient = QueryClient.newTableClient(transport).build(); + tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Span appParent = tracer.startSpan("app.parent.table.retry", SpanKind.INTERNAL); + try (SpanScope ignored = appParent.makeCurrent()) { + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); + queryResult.getStatus().expectSuccess(); + return tx.commit(); + }).join(); + status.expectSuccess(); + } finally { + appParent.end(); + } + + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.ExecuteWithRetry")); + } + + @Test + public void nonRetryableExceptionClosesRetrySpan() { + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + RuntimeException thrown; + try { + retryContext.supplyStatus(session -> { + throw new IllegalStateException("boom"); + }).join(); + throw new AssertionError("Exception expected"); + } catch (RuntimeException ex) { + thrown = ex; + } + + Assert.assertNotNull(thrown.getCause()); + Assert.assertTrue(thrown.getCause() instanceof IllegalStateException); + Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, + tracer.countClosedSpanWithErrorType("ydb.ExecuteWithRetry", IllegalStateException.class.getName())); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.ExecuteWithRetry")); + Assert.assertEquals(0, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + } + + @Test + public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, + tracer.countClosedSpanWithErrorType("ydb.ExecuteWithRetry", UnexpectedResultException.class.getName())); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + } + private static final class RecordingTracer implements Tracer { private final List spans = Collections.synchronizedList(new ArrayList<>()); + private final ThreadLocal currentSpan = new ThreadLocal<>(); @Override public Span startSpan(String spanName, SpanKind spanKind) { - RecordingSpan span = new RecordingSpan(spanName, spanKind); + RecordingSpan span = new RecordingSpan(this, spanName, spanKind, currentSpan.get()); spans.add(span); return span; } @@ -183,16 +315,63 @@ int countClosedSpan(String spanName) { } return count; } + + int countClosedSpanWithParent(String spanName, String parentSpanName) { + int count = 0; + synchronized (spans) { + for (RecordingSpan span : spans) { + if (span.closed + && span.name.equals(spanName) + && span.parent != null + && span.parent.name.equals(parentSpanName)) { + count++; + } + } + } + return count; + } + + int countClosedSpanWithErrorType(String spanName, String errorType) { + int count = 0; + synchronized (spans) { + for (RecordingSpan span : spans) { + if (span.closed + && span.name.equals(spanName) + && span.throwableError != null + && span.throwableError.getClass().getName().equals(errorType)) { + count++; + } + } + } + return count; + } + + SpanScope makeSpanCurrent(RecordingSpan span) { + RecordingSpan previous = currentSpan.get(); + currentSpan.set(span); + return () -> { + if (previous == null) { + currentSpan.remove(); + } else { + currentSpan.set(previous); + } + }; + } } private static final class RecordingSpan implements Span { + private final RecordingTracer tracer; private final String name; private final SpanKind kind; + private final RecordingSpan parent; + private Throwable throwableError; private volatile boolean closed = false; - RecordingSpan(String name, SpanKind kind) { + RecordingSpan(RecordingTracer tracer, String name, SpanKind kind, RecordingSpan parent) { + this.tracer = tracer; this.name = name; this.kind = kind; + this.parent = parent; } @Override @@ -201,18 +380,18 @@ public String getId() { } @Override - public void setAttribute(String key, String value) { - // not needed for this test + public boolean isValid() { + return true; } @Override - public void setAttribute(String key, long value) { - // not needed for this test + public SpanScope makeCurrent() { + return tracer.makeSpanCurrent(this); } @Override - public void setStatus(Status status, Throwable error) { - // not needed for this test + public void setStatus(@Nullable Status status, @Nullable Throwable error) { + this.throwableError = error; } @Override diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java index b828ae182..128df3e16 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java @@ -20,6 +20,10 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; +import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.FutureTools; @@ -28,6 +32,9 @@ */ @ParametersAreNonnullByDefault public class SessionRetryContext { + private static final String EXECUTE_WITH_RETRY_SPAN_NAME = "ydb.ExecuteWithRetry"; + private static final String RETRY_ATTEMPT_ATTR = "ydb.retry.attempt"; + private static final String RETRY_SLEEP_MS_ATTR = "ydb.retry.sleep_ms"; private final SessionSupplier sessionSupplier; private final Executor executor; @@ -66,14 +73,14 @@ public CompletableFuture supplyStatus(Function CompletableFuture> supplyResult(SessionRetryHandler h, - Function>> fn) { + Function>> fn) { RetryableResultTask task = new RetryableResultTask<>(h, fn); task.requestSession(); return task.getFuture(); } public CompletableFuture supplyStatus(SessionRetryHandler h, - Function> fn) { + Function> fn) { RetryableStatusTask task = new RetryableStatusTask(h, fn); task.requestSession(); return task.getFuture(); @@ -147,17 +154,23 @@ private abstract class BaseRetryableTask implements Runnable { private final Function> fn; private final long createTimestamp = Instant.now().toEpochMilli(); private final SessionRetryHandler handler; + private final Tracer tracer; + private final Span parentSpan; + private Span retrySpan = Span.NOOP; BaseRetryableTask(SessionRetryHandler h, Function> fn) { this.fn = fn; this.handler = h; + this.tracer = sessionSupplier.getTracer(); + this.parentSpan = tracer.currentSpan(); } CompletableFuture getFuture() { return promise; } - abstract StatusCode toStatusCode(R result); + abstract Status toStatus(R result); + abstract R toFailedResult(Result sessionResult); private long ms() { @@ -169,13 +182,17 @@ private long ms() { public void run() { if (promise.isCancelled()) { handler.onCancel(SessionRetryContext.this, retryNumber.get(), ms()); + finishRetrySpan(null, null); return; } executor.execute(this::requestSession); } public void requestSession() { - CompletableFuture> sessionFuture = sessionSupplier.createSession(sessionCreationTimeout); + try (SpanScope ignored = parentSpan.makeCurrent()) { + retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); + } + CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) { // faster than subscribing on future acceptSession(sessionFuture.join()); @@ -193,33 +210,39 @@ public void requestSession() { private void acceptSession(@Nonnull Result sessionResult) { if (!sessionResult.isSuccess()) { - handleError(sessionResult.getStatus().getCode(), toFailedResult(sessionResult)); + handleError(sessionResult.getStatus(), toFailedResult(sessionResult)); return; } final Session session = sessionResult.getValue(); try { - fn.apply(session).whenComplete((fnResult, fnException) -> { - try { - session.close(); - - if (fnException != null) { - handleException(fnException); - return; + try (SpanScope ignored = retrySpan.makeCurrent()) { + fn.apply(session).whenComplete((fnResult, fnException) -> { + try { + try (SpanScope ignored1 = retrySpan.makeCurrent()) { + session.close(); + + if (fnException != null) { + handleException(fnException); + return; + } + + Status status = toStatus(fnResult); + if (status.isSuccess()) { + handler.onSuccess(SessionRetryContext.this, retryNumber.get(), ms()); + finishRetrySpan(status, null); + promise.complete(fnResult); + } else { + handleError(status, fnResult); + } + } + } catch (Throwable unexpected) { + handler.onError(SessionRetryContext.this, unexpected, retryNumber.get(), ms()); + finishRetrySpan(null, unexpected); + promise.completeExceptionally(unexpected); } - - StatusCode statusCode = toStatusCode(fnResult); - if (statusCode == StatusCode.SUCCESS) { - handler.onSuccess(SessionRetryContext.this, retryNumber.get(), ms()); - promise.complete(fnResult); - } else { - handleError(statusCode, fnResult); - } - } catch (Throwable unexpected) { - handler.onError(SessionRetryContext.this, unexpected, retryNumber.get(), ms()); - promise.completeExceptionally(unexpected); - } - }); + }); + } } catch (RuntimeException ex) { session.close(); handleException(ex); @@ -233,21 +256,26 @@ private void scheduleNext(long delayMillis) { sessionSupplier.getScheduler().schedule(this, delayMillis, TimeUnit.MILLISECONDS); } - private void handleError(@Nonnull StatusCode code, R result) { - // Check retrayable status + private void handleError(@Nonnull Status status, R result) { + StatusCode code = status.getCode(); if (!canRetry(code)) { handler.onError(SessionRetryContext.this, code, retryNumber.get(), ms()); + finishRetrySpan(status, null); promise.complete(result); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { long next = backoffTimeMillis(code, retry); handler.onRetry(SessionRetryContext.this, code, retry, next, ms()); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(status, null); scheduleNext(next); } else { handler.onLimit(SessionRetryContext.this, code, maxRetries, ms()); + finishRetrySpan(status, null); promise.complete(result); } } @@ -256,20 +284,42 @@ private void handleException(@Nonnull Throwable ex) { // Check retrayable execption if (!canRetry(ex)) { handler.onError(SessionRetryContext.this, ex, retryNumber.get(), ms()); + finishRetrySpan(null, ex); promise.completeExceptionally(ex); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { long next = backoffTimeMillis(ex, retry); handler.onRetry(SessionRetryContext.this, ex, retry, next, ms()); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(null, ex); scheduleNext(next); } else { handler.onLimit(SessionRetryContext.this, ex, maxRetries, ms()); + finishRetrySpan(null, ex); promise.completeExceptionally(ex); } } + + private CompletableFuture> createSessionWithRetrySpanParent() { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return sessionSupplier.createSession(sessionCreationTimeout); + } + } + + private void recordRetrySchedule(int failedAttempt, long nextDelayMillis) { + retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, failedAttempt); + retrySpan.setAttribute(RETRY_SLEEP_MS_ATTR, nextDelayMillis); + } + + private void finishRetrySpan(Status status, Throwable throwable) { + retrySpan.setStatus(status, throwable); + retrySpan.end(); + retrySpan = Span.NOOP; + } } /** @@ -281,8 +331,8 @@ private final class RetryableResultTask extends BaseRetryableTask> } @Override - StatusCode toStatusCode(Result result) { - return result.getStatus().getCode(); + Status toStatus(Result result) { + return result.getStatus(); } @Override @@ -300,8 +350,8 @@ private final class RetryableStatusTask extends BaseRetryableTask { } @Override - StatusCode toStatusCode(Status status) { - return status.getCode(); + Status toStatus(Status status) { + return status; } @Override diff --git a/table/src/main/java/tech/ydb/table/SessionSupplier.java b/table/src/main/java/tech/ydb/table/SessionSupplier.java index 35ecd007f..dbec11281 100644 --- a/table/src/main/java/tech/ydb/table/SessionSupplier.java +++ b/table/src/main/java/tech/ydb/table/SessionSupplier.java @@ -5,6 +5,8 @@ import java.util.concurrent.ScheduledExecutorService; import tech.ydb.core.Result; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.core.tracing.Tracer; /** @@ -26,4 +28,7 @@ public interface SessionSupplier { */ ScheduledExecutorService getScheduler(); + default Tracer getTracer() { + return NoopTracer.getInstance(); + } } diff --git a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java index 199d3ae73..596466acb 100644 --- a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java +++ b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java @@ -6,6 +6,7 @@ import tech.ydb.core.Result; import tech.ydb.core.StatusCode; +import tech.ydb.core.tracing.Tracer; import tech.ydb.table.Session; import tech.ydb.table.SessionSupplier; import tech.ydb.table.rpc.TableRpc; @@ -38,6 +39,11 @@ public ScheduledExecutorService getScheduler() { return tableRpc.getScheduler(); } + @Override + public Tracer getTracer() { + return tableRpc.getTracer(); + } + public static Builder newClient(TableRpc rpc) { return new Builder(rpc); } diff --git a/table/src/main/java/tech/ydb/table/rpc/TableRpc.java b/table/src/main/java/tech/ydb/table/rpc/TableRpc.java index 714e242ba..c60684d59 100644 --- a/table/src/main/java/tech/ydb/table/rpc/TableRpc.java +++ b/table/src/main/java/tech/ydb/table/rpc/TableRpc.java @@ -7,6 +7,8 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.table.YdbTable; import tech.ydb.proto.table.YdbTable.AlterTableRequest; import tech.ydb.proto.table.YdbTable.BeginTransactionRequest; @@ -50,6 +52,10 @@ public interface TableRpc extends AutoCloseable { ScheduledExecutorService getScheduler(); + default Tracer getTracer() { + return NoopTracer.getInstance(); + } + @Override void close(); diff --git a/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java b/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java index eaad07cc6..f1c597646 100644 --- a/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java +++ b/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java @@ -14,6 +14,7 @@ import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.operation.OperationBinder; import tech.ydb.core.operation.StatusExtractor; +import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.table.YdbTable; import tech.ydb.proto.table.v1.TableServiceGrpc; import tech.ydb.table.rpc.TableRpc; @@ -246,6 +247,11 @@ public ScheduledExecutorService getScheduler() { return transport.getScheduler(); } + @Override + public Tracer getTracer() { + return transport.getTracer(); + } + @Override public void close() { if (transportOwned) {