From f0cb2fd57f9b3d163c510e257f304a904f7a4e25 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 11 Mar 2026 14:27:48 +0300 Subject: [PATCH 1/4] feat: support RetryCtx span --- .../ydb/core/grpc/GrpcRequestSettings.java | 7 +- .../tech/ydb/core/grpc/GrpcTransport.java | 2 +- .../ydb/core/grpc/GrpcTransportBuilder.java | 1 - .../tech/ydb/core/impl/BaseGrpcTransport.java | 3 +- .../tech/ydb/core/tracing/NoopTracer.java | 4 +- .../main/java/tech/ydb/core/tracing/Span.java | 91 +++- .../java/tech/ydb/core/tracing/SpanScope.java | 9 + .../java/tech/ydb/core/tracing/Tracer.java | 14 +- .../ydb/core/impl/YdbTransportImplTest.java | 11 +- .../tech/ydb/core/tracing/NoopTracerTest.java | 7 +- .../java/tech/ydb/core/tracing/SpanTest.java | 24 +- .../opentelemetry/OpenTelemetryTracer.java | 22 + ...nTelemetryQueryTracingIntegrationTest.java | 150 ++++++- .../main/java/tech/ydb/query/QueryClient.java | 7 + .../tech/ydb/query/impl/QueryClientImpl.java | 8 + .../java/tech/ydb/query/impl/SessionImpl.java | 91 ++-- .../tech/ydb/query/impl/TableClientImpl.java | 6 + .../ydb/query/tools/SessionRetryContext.java | 212 ++++++++- .../tech/ydb/query/impl/QueryTracingTest.java | 127 +++++- .../tech/ydb/table/SessionRetryContext.java | 403 +++++++++++++++++- .../java/tech/ydb/table/SessionSupplier.java | 5 + .../ydb/table/impl/SimpleTableClient.java | 6 + .../java/tech/ydb/table/rpc/TableRpc.java | 6 + .../tech/ydb/table/rpc/grpc/GrpcTableRpc.java | 6 + 24 files changed, 1078 insertions(+), 144 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/tracing/SpanScope.java diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java index ce46e495a..e7850d771 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java @@ -6,8 +6,6 @@ import java.util.function.BooleanSupplier; import java.util.function.Consumer; -import javax.annotation.Nullable; - import io.grpc.Metadata; import tech.ydb.core.impl.call.GrpcFlows; @@ -87,7 +85,6 @@ public GrpcFlowControl getFlowControl() { return flowControl; } - @Nullable public Span getSpan() { return span; } @@ -103,7 +100,7 @@ public static final class Builder { private Consumer trailersHandler = null; private BooleanSupplier pessimizationHook = null; private GrpcFlowControl flowControl = GrpcFlows.SIMPLE_FLOW; - private Span span = null; + private Span span = Span.NOOP; /** * Returns a new {@code Builder} with a deadline, based on the running Java Virtual Machine's @@ -185,7 +182,7 @@ public Builder withPessimizationHook(BooleanSupplier pessimizationHook) { } public Builder withSpan(Span span) { - this.span = span; + this.span = span == null ? Span.NOOP : span; return this; } diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java index 787d05df5..96816601f 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransport.java @@ -43,7 +43,7 @@ GrpcReadWriteStream readWriteStreamCall( ScheduledExecutorService getScheduler(); default Tracer getTracer() { - return NoopTracer.INSTANCE; + return NoopTracer.getInstance(); } @Override diff --git a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java index 9273d99a0..534cf08be 100644 --- a/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java +++ b/core/src/main/java/tech/ydb/core/grpc/GrpcTransportBuilder.java @@ -30,7 +30,6 @@ import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.Version; - /** * * @author Aleksandr Gorshenin diff --git a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java index b6a879b99..0738de347 100644 --- a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java @@ -254,7 +254,8 @@ private Metadata makeMetadataFromSettings(GrpcRequestSettings settings, Endpoint } Span span = settings.getSpan(); - if (span != null) { + + if (span.isValid()) { span.setAttribute("db.system.name", "ydb"); span.setAttribute("db.namespace", getDatabase()); span.setAttribute("server.address", serverEndpoint.getHost()); diff --git a/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java b/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java index 37ee3668f..7187448be 100644 --- a/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java +++ b/core/src/main/java/tech/ydb/core/tracing/NoopTracer.java @@ -7,7 +7,7 @@ * singletons, no allocations per call (except caller code). */ public final class NoopTracer implements Tracer { - public static final NoopTracer INSTANCE = new NoopTracer(); + private static final NoopTracer INSTANCE = new NoopTracer(); private NoopTracer() { } @@ -18,6 +18,6 @@ public static NoopTracer getInstance() { @Override public Span startSpan(String spanName, SpanKind spanKind) { - return null; + return Span.NOOP; } } diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index b4c5ec933..ff2402e52 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -12,24 +12,47 @@ * A span represents a timed operation. */ public interface Span { + Span NOOP = new Span() { + }; - String getId(); + /** + * Returns W3C traceparent value for request propagation. + * + *

For {@link #NOOP} this returns an empty string. Check {@link #isValid()} to decide whether + * trace headers should be sent to server. + * + * @return traceparent value + */ + default String getId() { + return ""; + } + + /** + * Indicates whether this span carries a real tracing context. + * + * @return true for real spans, false for noop span + */ + default boolean isValid() { + return false; + } /** - * Sets a string attribute on the span (ignored by Noop implementation). + * Sets a string attribute on the span. * * @param key attribute key * @param value attribute value, may be null */ - void setAttribute(String key, @Nullable String value); + default void setAttribute(String key, @Nullable String value) { + } /** - * Sets a long attribute on the span (ignored by Noop implementation). + * Sets a long attribute on the span. * * @param key attribute key * @param value attribute value */ - void setAttribute(String key, long value); + default void setAttribute(String key, long value) { + } /** * Sets span status (success or error) with human-readable message. @@ -37,27 +60,53 @@ public interface Span { * @param status operation status used to map span attributes * @param error operation exception used to map span attributes */ - void setStatus(Status status, Throwable error); + default void setStatus(@Nullable Status status, @Nullable Throwable error) { + } - void end(); + /** + * Makes this span current in the active execution context. + * + * @return closeable scope handle + */ + default SpanScope makeCurrent() { + return () -> { + }; + } + + /** + * Ends (finishes) this span. + */ + default void end() { + } + /** + * Subscribes to a {@link Status} future: when it completes, sets status/error and ends the span. + * For non-valid spans returns the future as-is without subscribing. + * + * @param span the span to finalize + * @param future the future to observe + * @return the same future (for chaining) + */ static CompletableFuture endOnStatus(Span span, CompletableFuture future) { - if (span != null) { - future.whenComplete((status, th) -> { - span.setStatus(status, FutureTools.unwrapCompletionException(th)); - span.end(); - }); - } - return future; + return span.isValid() ? future.whenComplete((status, th) -> { + span.setStatus(status, FutureTools.unwrapCompletionException(th)); + span.end(); + }) : future; } + /** + * Subscribes to a {@link Result} future: when it completes, sets status/error and ends the span. + * For non-valid spans returns the future as-is without subscribing. + * + * @param result value type + * @param span the span to finalize + * @param future the future to observe + * @return the same future (for chaining) + */ static CompletableFuture> endOnResult(Span span, CompletableFuture> future) { - if (span != null) { - future.whenComplete((result, th) -> { - span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); - span.end(); - }); - } - return future; + return span.isValid() ? future.whenComplete((result, th) -> { + span.setStatus(result != null ? result.getStatus() : null, FutureTools.unwrapCompletionException(th)); + span.end(); + }) : future; } } diff --git a/core/src/main/java/tech/ydb/core/tracing/SpanScope.java b/core/src/main/java/tech/ydb/core/tracing/SpanScope.java new file mode 100644 index 000000000..5281b0702 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/tracing/SpanScope.java @@ -0,0 +1,9 @@ +package tech.ydb.core.tracing; + +/** + * Closeable scope returned by {@link Span#makeCurrent()}. + */ +public interface SpanScope extends AutoCloseable { + @Override + void close(); +} diff --git a/core/src/main/java/tech/ydb/core/tracing/Tracer.java b/core/src/main/java/tech/ydb/core/tracing/Tracer.java index 74b698663..7bf37a7a3 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Tracer.java +++ b/core/src/main/java/tech/ydb/core/tracing/Tracer.java @@ -1,7 +1,5 @@ package tech.ydb.core.tracing; -import javax.annotation.Nullable; - /** * Tracer is an entry point to create spans. * @@ -17,8 +15,16 @@ public interface Tracer { * * @param spanName logical span name (for example, ydb.ExecuteQuery) * @param spanKind span kind that defines operation role - * @return created span instance, or null if implementation does not create spans + * @return created span instance */ - @Nullable Span startSpan(String spanName, SpanKind spanKind); + + /** + * Returns the currently active span. + * + * @return active span or {@link Span#NOOP} when unavailable + */ + default Span currentSpan() { + return Span.NOOP; + } } diff --git a/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java b/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java index c173aa78f..1ec3cd56f 100644 --- a/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java +++ b/core/src/test/java/tech/ydb/core/impl/YdbTransportImplTest.java @@ -34,6 +34,7 @@ import tech.ydb.core.operation.OperationBinder; import tech.ydb.core.tracing.NoopTracer; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.discovery.DiscoveryProtos; import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc; @@ -410,6 +411,11 @@ public String getId() { return id; } + @Override + public boolean isValid() { + return true; + } + @Override public void setAttribute(String key, String value) { stringAttrs.put(key, value); @@ -420,11 +426,6 @@ public void setAttribute(String key, long value) { longAttrs.put(key, value); } - @Override - public void setStatus(tech.ydb.core.Status status, Throwable error) { - // not needed in this test - } - @Override public void end() { // not needed in this test diff --git a/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java b/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java index 35ace261d..3718efa51 100644 --- a/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java +++ b/core/src/test/java/tech/ydb/core/tracing/NoopTracerTest.java @@ -20,8 +20,9 @@ public void singletonTest() { public void spanTest() { Tracer tracer = NoopTracer.getInstance(); - Assert.assertNull(tracer.startSpan("test", SpanKind.CLIENT)); - Assert.assertNull(tracer.startSpan("test", SpanKind.INTERNAL)); - Assert.assertNull(tracer.startSpan(null, null)); + Assert.assertSame(Span.NOOP, tracer.startSpan("test", SpanKind.CLIENT)); + Assert.assertSame(Span.NOOP, tracer.startSpan("test", SpanKind.INTERNAL)); + Assert.assertSame(Span.NOOP, tracer.startSpan(null, null)); + Assert.assertSame(Span.NOOP, tracer.currentSpan()); } } diff --git a/core/src/test/java/tech/ydb/core/tracing/SpanTest.java b/core/src/test/java/tech/ydb/core/tracing/SpanTest.java index d03fc6432..186f9de14 100644 --- a/core/src/test/java/tech/ydb/core/tracing/SpanTest.java +++ b/core/src/test/java/tech/ydb/core/tracing/SpanTest.java @@ -19,8 +19,8 @@ public void finishByStatusExceptionTest() { CompletableFuture future = new CompletableFuture<>(); - Assert.assertSame(future, Span.endOnStatus(null, future)); - Assert.assertSame(future, Span.endOnStatus(span, future)); + Assert.assertSame(future, Span.endOnStatus(Span.NOOP, future)); + Assert.assertNotSame(future, Span.endOnStatus(span, future)); Assert.assertFalse(span.ended); Assert.assertNull(span.statusError); @@ -40,7 +40,7 @@ public void finishByResultTest() { CompletableFuture> future = new CompletableFuture<>(); - Assert.assertSame(future, Span.endOnResult(span, future)); + Assert.assertNotSame(future, Span.endOnResult(span, future)); Assert.assertFalse(span.ended); Assert.assertNull(span.statusError); @@ -60,8 +60,8 @@ public void finishByResultExceptionTest() { CompletableFuture> future = new CompletableFuture<>(); - Assert.assertSame(future, Span.endOnResult(null, future)); - Assert.assertSame(future, Span.endOnResult(span, future)); + Assert.assertSame(future, Span.endOnResult(Span.NOOP, future)); + Assert.assertNotSame(future, Span.endOnResult(span, future)); Assert.assertFalse(span.ended); Assert.assertNull(span.statusError); @@ -80,18 +80,8 @@ private static final class RecordingSpan implements Span { private boolean ended; @Override - public String getId() { - return "test-span"; - } - - @Override - public void setAttribute(String key, String value) { - // not needed in this test - } - - @Override - public void setAttribute(String key, long value) { - // not needed in this test + public boolean isValid() { + return true; } @Override diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java index 40e358dcb..14fc0621c 100644 --- a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -5,10 +5,12 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import tech.ydb.core.Status; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; public final class OpenTelemetryTracer implements Tracer { @@ -42,6 +44,15 @@ public Span startSpan(String spanName, SpanKind spanKind) { return new OtelSpan(span); } + @Override + public Span currentSpan() { + io.opentelemetry.api.trace.Span current = io.opentelemetry.api.trace.Span.current(); + if (!current.getSpanContext().isValid()) { + return Span.NOOP; + } + return new OtelSpan(current); + } + private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { if (kind == SpanKind.CLIENT) { return io.opentelemetry.api.trace.SpanKind.CLIENT; @@ -61,6 +72,11 @@ public String getId() { return "00-" + span.getSpanContext().getTraceId() + "-" + span.getSpanContext().getSpanId() + "-01"; } + @Override + public boolean isValid() { + return span.getSpanContext().isValid(); + } + @Override public void setAttribute(String key, String value) { span.setAttribute(key, value); @@ -89,6 +105,12 @@ public void setStatus(Status status, Throwable error) { } } + @Override + public SpanScope makeCurrent() { + Scope scope = span.makeCurrent(); + return scope::close; + } + @Override public void end() { span.end(); diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java index a7cb6c6f7..ff2d91f6f 100644 --- a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java @@ -1,7 +1,11 @@ package tech.ydb.opentelemetry; import java.time.Duration; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; @@ -30,6 +34,7 @@ import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.query.DataQueryResult; @@ -196,6 +201,149 @@ public void sdkSpanIsChildOfApplicationSpan() { } } + @Test + public void retrySpanIsParentForRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(1) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.retry").startSpan(); + try (Scope scope = appSpan.makeCurrent()) { + Assert.assertNotNull(scope); + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + } finally { + appSpan.end(); + } + + List spans = spanExporter.getFinishedSpanItems(); + SpanData querySpan = null; + Set retrySpanIds = new HashSet<>(); + int retrySpansCount = 0; + + for (SpanData span : spans) { + if ("ydb.ExecuteWithRetry".equals(span.getName())) { + retrySpansCount++; + retrySpanIds.add(span.getSpanId()); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), span.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), span.getParentSpanId()); + } + if ("ydb.ExecuteQuery".equals(span.getName())) { + querySpan = span; + } + } + + Assert.assertEquals(2, retrySpansCount); + Assert.assertNotNull("Retry should produce query rpc span", querySpan); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), querySpan.getTraceId()); + Assert.assertTrue("RPC span parent must be ExecuteWithRetry span", + retrySpanIds.contains(querySpan.getParentSpanId())); + } + + @Test + public void tableProxyRetrySpanIsParentForRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.table.retry").startSpan(); + try (Scope scope = appSpan.makeCurrent()) { + Assert.assertNotNull(scope); + try (TableClient tableClient = QueryClient.newTableClient(transport).build()) { + tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient) + .maxRetries(1) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); + queryResult.getStatus().expectSuccess(); + return tx.commit(); + }).join(); + status.expectSuccess(); + } + } finally { + appSpan.end(); + } + + assertSpanOK("ydb.CreateSession", 1); + assertSpanOK("ydb.ExecuteQuery", 1); + assertSpanOK("ydb.Commit", 1); + + List spans = spanExporter.getFinishedSpanItems(); + Set retrySpanIds = new HashSet<>(); + + for (SpanData span : spans) { + if ("ydb.ExecuteWithRetry".equals(span.getName())) { + retrySpanIds.add(span.getSpanId()); + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), span.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), span.getParentSpanId()); + } + } + Assert.assertEquals(2, retrySpanIds.size()); + + int createSessionChildren = 0; + int executeQueryChildren = 0; + int commitChildren = 0; + + for (SpanData span : spans) { + if ("ydb.CreateSession".equals(span.getName()) && retrySpanIds.contains(span.getParentSpanId())) { + createSessionChildren++; + } + if ("ydb.ExecuteQuery".equals(span.getName()) && retrySpanIds.contains(span.getParentSpanId())) { + executeQueryChildren++; + } + if ("ydb.Commit".equals(span.getName()) && retrySpanIds.contains(span.getParentSpanId())) { + commitChildren++; + } + } + + Assert.assertTrue("CreateSession span must be child of ExecuteWithRetry", createSessionChildren >= 1); + Assert.assertTrue("ExecuteQuery span must be child of ExecuteWithRetry", executeQueryChildren >= 1); + Assert.assertTrue("Commit span must be child of ExecuteWithRetry", commitChildren >= 1); + } + + @Test + public void tableProxySpansAreChildrenOfApplicationSpan() { + io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.table").startSpan(); + try (Scope scope = appSpan.makeCurrent()) { + Assert.assertNotNull(scope); + try (TableClient tableClient = QueryClient.newTableClient(transport).build()) { + try (Session session = tableClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); + queryResult.getStatus().expectSuccess(); + Status commitStatus = tx.commit().join(); + commitStatus.expectSuccess(); + } + } + } finally { + appSpan.end(); + } + + List spans = spanExporter.getFinishedSpanItems(); + int matched = 0; + for (SpanData span : spans) { + if (!span.getName().startsWith("ydb.")) { + continue; + } + Assert.assertEquals(appSpan.getSpanContext().getTraceId(), span.getTraceId()); + Assert.assertEquals(appSpan.getSpanContext().getSpanId(), span.getParentSpanId()); + matched++; + } + Assert.assertEquals(3, matched); + } + private void assertBaseAttributes(SpanData span) { Assert.assertEquals(io.opentelemetry.api.trace.SpanKind.CLIENT, span.getKind()); Assert.assertEquals("ydb", span.getAttributes().get(DB_SYSTEM_NAME)); @@ -222,7 +370,7 @@ private void assertSpanError(String spanName, int expectedCount, Status status) assertBaseAttributes(span); return; } - Assert.assertEquals("Unexpected count of span " + spanName, expectedCount, count); + Assert.assertEquals("Unexpected count of span " + spanName, count, expectedCount); } private void assertSpanOK(String spanName, int expectedCount) { diff --git a/query/src/main/java/tech/ydb/query/QueryClient.java b/query/src/main/java/tech/ydb/query/QueryClient.java index a71d8c12f..343ccb01f 100644 --- a/query/src/main/java/tech/ydb/query/QueryClient.java +++ b/query/src/main/java/tech/ydb/query/QueryClient.java @@ -8,6 +8,8 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.core.tracing.Tracer; import tech.ydb.query.impl.QueryClientImpl; import tech.ydb.query.impl.TableClientImpl; import tech.ydb.table.SessionPoolStats; @@ -37,6 +39,10 @@ static TableClient.Builder newTableClient(@WillNotClose GrpcTransport transport) ScheduledExecutorService getScheduler(); + default Tracer getTracer() { + return NoopTracer.getInstance(); + } + SessionPoolStats getSessionPoolStats(); @Override @@ -44,6 +50,7 @@ static TableClient.Builder newTableClient(@WillNotClose GrpcTransport transport) interface Builder { Builder sessionPoolMinSize(int minSize); + Builder sessionPoolMaxSize(int maxSize); Builder sessionMaxIdleTime(Duration duration); diff --git a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java index 88cd83b7f..4716abc85 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryClientImpl.java @@ -9,6 +9,7 @@ import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.table.SessionPoolStats; @@ -20,6 +21,7 @@ public class QueryClientImpl implements QueryClient { private final SessionPool pool; private final ScheduledExecutorService scheduler; + private final Tracer tracer; public QueryClientImpl(Builder builder) { this.pool = new SessionPool( @@ -31,6 +33,7 @@ public QueryClientImpl(Builder builder) { builder.sessionPoolIdleDuration ); this.scheduler = builder.transport.getScheduler(); + this.tracer = builder.transport.getTracer(); } @Override @@ -43,6 +46,11 @@ public ScheduledExecutorService getScheduler() { return scheduler; } + @Override + public Tracer getTracer() { + return tracer; + } + public void updatePoolMaxSize(int maxSize) { pool.updateMaxSize(maxSize); } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index c22dff0d2..763d15c39 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -312,7 +312,7 @@ GrpcReadStream createGrpcStream( @Override public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuerySettings settings) { YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true); - Span span = rpc.startSpan("ydb.ExecuteQuery"); + Span span = startSpan("ydb.ExecuteQuery"); return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { @Override void handleTxMeta(String txID) { @@ -377,51 +377,52 @@ public CompletableFuture> execute(PartsHandler handler) { final UpdatableOptional operationStatus = new UpdatableOptional<>(); final UpdatableOptional stats = new UpdatableOptional<>(); return Span.endOnResult(span, grpcStream.start(msg -> { - if (isTraceEnabled) { - logger.trace("{} got stream message {}", SessionImpl.this, TextFormat.shortDebugString(msg)); - } - Issue[] issues = Issue.fromPb(msg.getIssuesList()); - Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); + if (isTraceEnabled) { + logger.trace("{} got stream message {}", + SessionImpl.this, TextFormat.shortDebugString(msg)); + } + Issue[] issues = Issue.fromPb(msg.getIssuesList()); + Status status = Status.of(StatusCode.fromProto(msg.getStatus()), issues); - updateSessionState(status); + updateSessionState(status); - if (!status.isSuccess()) { - handleTxMeta(null); - operationStatus.update(status); - return; - } + if (!status.isSuccess()) { + handleTxMeta(null); + operationStatus.update(status); + return; + } - if (msg.hasTxMeta()) { - handleTxMeta(msg.getTxMeta().getId()); - } - if (issues.length > 0) { - if (handler != null) { - handler.onIssues(issues); - } else { - logger.trace("{} lost issues message", SessionImpl.this); - } - } - if (msg.hasExecStats()) { - stats.update(new QueryStats(msg.getExecStats())); - } + if (msg.hasTxMeta()) { + handleTxMeta(msg.getTxMeta().getId()); + } + if (issues.length > 0) { + if (handler != null) { + handler.onIssues(issues); + } else { + logger.trace("{} lost issues message", SessionImpl.this); + } + } + if (msg.hasExecStats()) { + stats.update(new QueryStats(msg.getExecStats())); + } - if (msg.hasResultSet()) { - long index = msg.getResultSetIndex(); - if (handler != null) { - handler.onNextRawPart(index, msg.getResultSet()); - } else { - logger.trace("{} lost result set part with index {}", SessionImpl.this, index); - } - } - }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { - updateSessionState(streamStatus); - Status status = operationStatus.orElse(streamStatus); - if (status.isSuccess()) { - return Result.success(new QueryInfo(stats.get()), streamStatus); - } else { - return Result.fail(status); - } - }) + if (msg.hasResultSet()) { + long index = msg.getResultSetIndex(); + if (handler != null) { + handler.onNextRawPart(index, msg.getResultSet()); + } else { + logger.trace("{} lost result set part with index {}", SessionImpl.this, index); + } + } + }).whenComplete(this::handleCompletion).thenApply(streamStatus -> { + updateSessionState(streamStatus); + Status status = operationStatus.orElse(streamStatus); + if (status.isSuccess()) { + return Result.success(new QueryInfo(stats.get()), streamStatus); + } else { + return Result.fail(status); + } + }) ); } @@ -459,7 +460,7 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E ? TxControl.txIdCtrl(currentId, commitAtEnd) : TxControl.txModeCtrl(txMode, commitAtEnd); - Span span = rpc.startSpan("ydb.ExecuteQuery"); + Span span = startSpan("ydb.ExecuteQuery"); return new StreamImpl(createGrpcStream(query, tc, prms, settings, span), span) { @Override void handleTxMeta(String txID) { @@ -499,7 +500,7 @@ public void cancel() { @Override public CompletableFuture> commit(CommitTransactionSettings settings) { - Span span = rpc.startSpan("ydb.Commit"); + Span span = startSpan("ydb.Commit"); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); if (transactionId == null) { @@ -533,7 +534,7 @@ public CompletableFuture> commit(CommitTransactionSettings set @Override public CompletableFuture rollback(RollbackTransactionSettings settings) { - Span span = rpc.startSpan("ydb.Rollback"); + Span span = startSpan("ydb.Rollback"); CompletableFuture currentStatusFuture = statusFuture.getAndSet(new CompletableFuture<>()); String transactionId = txId.get(); diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index f5ebadeb6..9baadd0a2 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -16,6 +16,7 @@ import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.table.YdbTable; @@ -60,6 +61,11 @@ public ScheduledExecutorService getScheduler() { return proxy.getScheduler(); } + @Override + public Tracer getTracer() { + return proxy.getTracer(); + } + @Override public SessionPoolStats sessionPoolStats() { return proxy.getSessionPoolStats(); diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index cbecca71a..33b60b197 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -6,6 +6,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -15,13 +16,26 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; +import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.FutureTools; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; +import tech.ydb.query.QueryStream; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.settings.BeginTransactionSettings; +import tech.ydb.query.settings.CommitTransactionSettings; +import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.settings.RollbackTransactionSettings; +import tech.ydb.table.query.Params; /** @@ -29,6 +43,9 @@ */ @ParametersAreNonnullByDefault public class SessionRetryContext { + private static final String EXECUTE_WITH_RETRY_SPAN_NAME = "ydb.ExecuteWithRetry"; + private static final String RETRY_ATTEMPT_ATTR = "ydb.retry.attempt"; + private static final String RETRY_SLEEP_MS_ATTR = "ydb.retry.sleep_ms"; private final QueryClient queryClient; private final Executor executor; @@ -134,31 +151,40 @@ private long backoffTimeMillis(Throwable t, int retryNumber) { */ private abstract class BaseRetryableTask implements Runnable { private final CompletableFuture promise = new CompletableFuture<>(); + private final AtomicBoolean spanFinished = new AtomicBoolean(true); private final AtomicInteger retryNumber = new AtomicInteger(); private final Function> fn; + private final Tracer tracer; + private final Span parentSpan; + private Span retrySpan = Span.NOOP; BaseRetryableTask(Function> fn) { this.fn = fn; + this.tracer = queryClient.getTracer(); + this.parentSpan = tracer.currentSpan(); } CompletableFuture getFuture() { return promise; } - abstract StatusCode toStatusCode(R result); + abstract Status toStatus(R result); + abstract R toFailedResult(Result sessionResult); // called on timer expiration @Override public void run() { if (promise.isCancelled()) { + finishRetrySpan(null, null); return; } executor.execute(this::requestSession); } public void requestSession() { - CompletableFuture> sessionFuture = queryClient.createSession(sessionCreationTimeout); + startRetrySpan(); + CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) { // faster than subscribing on future acceptSession(sessionFuture.join()); @@ -176,13 +202,15 @@ public void requestSession() { private void acceptSession(@Nonnull Result sessionResult) { if (!sessionResult.isSuccess()) { - handleError(sessionResult.getStatus().getCode(), toFailedResult(sessionResult)); + handleError(sessionResult.getStatus(), toFailedResult(sessionResult)); return; } final QuerySession session = sessionResult.getValue(); + final QuerySession tracedSession = retrySpan.isValid() + ? new TracedQuerySession(session, retrySpan) : session; try { - fn.apply(session).whenComplete((fnResult, fnException) -> { + fn.apply(tracedSession).whenComplete((fnResult, fnException) -> { try { session.close(); @@ -191,13 +219,15 @@ private void acceptSession(@Nonnull Result sessionResult) { return; } - StatusCode statusCode = toStatusCode(fnResult); - if (statusCode == StatusCode.SUCCESS) { + Status status = toStatus(fnResult); + if (status.isSuccess()) { promise.complete(fnResult); + finishRetrySpan(status, null); } else { - handleError(statusCode, fnResult); + handleError(status, fnResult); } } catch (Throwable unexpected) { + finishRetrySpan(null, unexpected); promise.completeExceptionally(unexpected); } }); @@ -214,18 +244,23 @@ private void scheduleNext(long delayMillis) { queryClient.getScheduler().schedule(this, delayMillis, TimeUnit.MILLISECONDS); } - private void handleError(@Nonnull StatusCode code, R result) { + private void handleError(@Nonnull Status status, R result) { // Check retrayable status - if (!canRetry(code)) { + if (!canRetry(status.getCode())) { + finishRetrySpan(status, null); promise.complete(result); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { - long next = backoffTimeMillis(code, retry); + long next = backoffTimeMillis(status.getCode(), retry); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(status, null); scheduleNext(next); } else { + finishRetrySpan(status, null); promise.complete(result); } } @@ -233,18 +268,55 @@ private void handleError(@Nonnull StatusCode code, R result) { private void handleException(@Nonnull Throwable ex) { // Check retrayable execption if (!canRetry(ex)) { + finishRetrySpan(null, ex); promise.completeExceptionally(ex); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { long next = backoffTimeMillis(ex, retry); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(null, ex); scheduleNext(next); } else { + finishRetrySpan(null, ex); promise.completeExceptionally(ex); } } + + private CompletableFuture> createSessionWithRetrySpanParent() { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return queryClient.createSession(sessionCreationTimeout); + } + } + + private void startRetrySpan() { + if (!spanFinished.get()) { + return; + } + try (SpanScope ignored = parentSpan.makeCurrent()) { + retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); + } + spanFinished.set(false); + retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, retryNumber.get()); + } + + private void recordRetrySchedule(int failedAttempt, long nextDelayMillis) { + retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, failedAttempt); + retrySpan.setAttribute(RETRY_SLEEP_MS_ATTR, nextDelayMillis); + } + + private void finishRetrySpan(Status status, Throwable throwable) { + if (!spanFinished.compareAndSet(false, true)) { + return; + } + + retrySpan.setStatus(status, throwable); + retrySpan.end(); + retrySpan = Span.NOOP; + } } /** @@ -256,8 +328,8 @@ private final class RetryableResultTask extends BaseRetryableTask> } @Override - StatusCode toStatusCode(Result result) { - return result.getStatus().getCode(); + Status toStatus(Result result) { + return result.getStatus(); } @Override @@ -275,8 +347,8 @@ private final class RetryableStatusTask extends BaseRetryableTask { } @Override - StatusCode toStatusCode(Status status) { - return status.getCode(); + Status toStatus(Status status) { + return status; } @Override @@ -356,4 +428,116 @@ public SessionRetryContext build() { return new SessionRetryContext(this); } } + + /** + * Wraps QuerySession to propagate retry span as parent for all RPC spans within a retry attempt. + */ + private static final class TracedQuerySession implements QuerySession { + private final QuerySession delegate; + private final Span retrySpan; + + TracedQuerySession(QuerySession delegate, Span retrySpan) { + this.delegate = delegate; + this.retrySpan = retrySpan; + } + + @Override + public String getId() { + return delegate.getId(); + } + + @Override + public QueryTransaction currentTransaction() { + QueryTransaction tx = delegate.currentTransaction(); + return tx != null ? new TracedQueryTransaction(tx, retrySpan) : null; + } + + @Override + public QueryTransaction createNewTransaction(TxMode txMode) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return new TracedQueryTransaction(delegate.createNewTransaction(txMode), retrySpan); + } + } + + @Override + public CompletableFuture> beginTransaction( + TxMode txMode, BeginTransactionSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.beginTransaction(txMode, settings) + .thenApply(r -> r.map(tx -> new TracedQueryTransaction(tx, retrySpan))); + } + } + + @Override + public QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.createQuery(query, tx, params, settings); + } + } + + @Override + public void close() { + delegate.close(); + } + } + + /** + * Wraps QueryTransaction to propagate retry span as parent for commit/rollback/query spans. + */ + private static final class TracedQueryTransaction implements QueryTransaction { + private final QueryTransaction delegate; + private final Span retrySpan; + + TracedQueryTransaction(QueryTransaction delegate, Span retrySpan) { + this.delegate = delegate; + this.retrySpan = retrySpan; + } + + @Override + public QuerySession getSession() { + return new TracedQuerySession(delegate.getSession(), retrySpan); + } + + @Override + public String getId() { + return delegate.getId(); + } + + @Override + public TxMode getTxMode() { + return delegate.getTxMode(); + } + + @Override + public String getSessionId() { + return delegate.getSessionId(); + } + + @Override + public CompletableFuture getStatusFuture() { + return delegate.getStatusFuture(); + } + + @Override + public CompletableFuture> commit(CommitTransactionSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.commit(settings); + } + } + + @Override + public CompletableFuture rollback(RollbackTransactionSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.rollback(settings); + } + } + + @Override + public QueryStream createQuery(String query, boolean commitAtEnd, Params params, + ExecuteQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.createQuery(query, commitAtEnd, params, settings); + } + } + } } diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java index 4e148d663..aa78ae79b 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java @@ -4,6 +4,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.AfterClass; @@ -17,14 +19,17 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryTransaction; import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.Session; import tech.ydb.table.TableClient; import tech.ydb.table.query.DataQueryResult; @@ -158,12 +163,86 @@ public void executeQuerySpansAreRecordedInTableProxyTransaction() { Assert.assertEquals(1, tracer.countClosedSpan("ydb.Rollback")); } + @Test + public void querySpanIsChildOfApplicationSpan() { + try (QuerySession session = queryClient.createSession(Duration.ofSeconds(5)).join().getValue()) { + Span appParent = tracer.startSpan("app.parent", SpanKind.INTERNAL); + try (SpanScope ignored = appParent.makeCurrent()) { + session.createQuery("SELECT 1", TxMode.NONE).execute().join().getStatus().expectSuccess(); + } finally { + appParent.end(); + } + } + + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "app.parent")); + } + + @Test + public void retrySpanIsParentOfRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Span appParent = tracer.startSpan("app.parent.retry", SpanKind.INTERNAL); + try (SpanScope ignored = appParent.makeCurrent()) { + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + } finally { + appParent.end(); + } + + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + } + + @Test + public void tableProxyRetrySpanIsParentOfRpcSpans() { + AtomicInteger attempt = new AtomicInteger(); + tableClient = QueryClient.newTableClient(transport).build(); + tech.ydb.table.SessionRetryContext retryContext = tech.ydb.table.SessionRetryContext.create(tableClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Span appParent = tracer.startSpan("app.parent.table.retry", SpanKind.INTERNAL); + try (SpanScope ignored = appParent.makeCurrent()) { + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + return CompletableFuture.completedFuture(Status.of(StatusCode.OVERLOADED)); + } + TableTransaction tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW); + Result queryResult = tx.executeDataQuery("SELECT 1", Params.empty()).join(); + queryResult.getStatus().expectSuccess(); + return tx.commit(); + }).join(); + status.expectSuccess(); + } finally { + appParent.end(); + } + + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.ExecuteWithRetry")); + } + private static final class RecordingTracer implements Tracer { private final List spans = Collections.synchronizedList(new ArrayList<>()); + private final ThreadLocal currentSpan = new ThreadLocal<>(); @Override public Span startSpan(String spanName, SpanKind spanKind) { - RecordingSpan span = new RecordingSpan(spanName, spanKind); + RecordingSpan span = new RecordingSpan(this, spanName, spanKind, currentSpan.get()); spans.add(span); return span; } @@ -183,16 +262,47 @@ int countClosedSpan(String spanName) { } return count; } + + int countClosedSpanWithParent(String spanName, String parentSpanName) { + int count = 0; + synchronized (spans) { + for (RecordingSpan span : spans) { + if (span.closed + && span.name.equals(spanName) + && span.parent != null + && span.parent.name.equals(parentSpanName)) { + count++; + } + } + } + return count; + } + + SpanScope makeSpanCurrent(RecordingSpan span) { + RecordingSpan previous = currentSpan.get(); + currentSpan.set(span); + return () -> { + if (previous == null) { + currentSpan.remove(); + } else { + currentSpan.set(previous); + } + }; + } } private static final class RecordingSpan implements Span { + private final RecordingTracer tracer; private final String name; private final SpanKind kind; + private final RecordingSpan parent; private volatile boolean closed = false; - RecordingSpan(String name, SpanKind kind) { + RecordingSpan(RecordingTracer tracer, String name, SpanKind kind, RecordingSpan parent) { + this.tracer = tracer; this.name = name; this.kind = kind; + this.parent = parent; } @Override @@ -201,18 +311,13 @@ public String getId() { } @Override - public void setAttribute(String key, String value) { - // not needed for this test - } - - @Override - public void setAttribute(String key, long value) { - // not needed for this test + public boolean isValid() { + return true; } @Override - public void setStatus(Status status, Throwable error) { - // not needed for this test + public SpanScope makeCurrent() { + return tracer.makeSpanCurrent(this); } @Override diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java index b828ae182..857b5606e 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java @@ -7,6 +7,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -16,11 +17,50 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; +import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanKind; +import tech.ydb.core.tracing.SpanScope; +import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.FutureTools; +import tech.ydb.proto.ValueProtos; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.description.TableOptionDescription; +import tech.ydb.table.query.BulkUpsertData; +import tech.ydb.table.query.DataQuery; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.ExplainDataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.query.ReadRowsResult; +import tech.ydb.table.query.ReadTablePart; +import tech.ydb.table.settings.AlterTableSettings; +import tech.ydb.table.settings.BeginTxSettings; +import tech.ydb.table.settings.BulkUpsertSettings; +import tech.ydb.table.settings.CommitTxSettings; +import tech.ydb.table.settings.CopyTableSettings; +import tech.ydb.table.settings.CopyTablesSettings; +import tech.ydb.table.settings.CreateTableSettings; +import tech.ydb.table.settings.DescribeTableOptionsSettings; +import tech.ydb.table.settings.DescribeTableSettings; +import tech.ydb.table.settings.DropTableSettings; +import tech.ydb.table.settings.ExecuteDataQuerySettings; +import tech.ydb.table.settings.ExecuteScanQuerySettings; +import tech.ydb.table.settings.ExecuteSchemeQuerySettings; +import tech.ydb.table.settings.ExplainDataQuerySettings; +import tech.ydb.table.settings.KeepAliveSessionSettings; +import tech.ydb.table.settings.PrepareDataQuerySettings; +import tech.ydb.table.settings.ReadRowsSettings; +import tech.ydb.table.settings.ReadTableSettings; +import tech.ydb.table.settings.RenameTablesSettings; +import tech.ydb.table.settings.RollbackTxSettings; +import tech.ydb.table.transaction.TableTransaction; +import tech.ydb.table.transaction.Transaction; +import tech.ydb.table.transaction.TxControl; /** @@ -28,6 +68,9 @@ */ @ParametersAreNonnullByDefault public class SessionRetryContext { + private static final String EXECUTE_WITH_RETRY_SPAN_NAME = "ydb.ExecuteWithRetry"; + private static final String RETRY_ATTEMPT_ATTR = "ydb.retry.attempt"; + private static final String RETRY_SLEEP_MS_ATTR = "ydb.retry.sleep_ms"; private final SessionSupplier sessionSupplier; private final Executor executor; @@ -143,21 +186,27 @@ private long backoffTimeMillis(Throwable t, int retryNumber) { */ private abstract class BaseRetryableTask implements Runnable { private final CompletableFuture promise = new CompletableFuture<>(); + private final AtomicBoolean spanFinished = new AtomicBoolean(true); private final AtomicInteger retryNumber = new AtomicInteger(); private final Function> fn; private final long createTimestamp = Instant.now().toEpochMilli(); private final SessionRetryHandler handler; + private final Tracer tracer; + private final Span parentSpan; + private Span retrySpan = Span.NOOP; BaseRetryableTask(SessionRetryHandler h, Function> fn) { this.fn = fn; this.handler = h; + this.tracer = sessionSupplier.getTracer(); + this.parentSpan = tracer.currentSpan(); } CompletableFuture getFuture() { return promise; } - abstract StatusCode toStatusCode(R result); + abstract Status toStatus(R result); abstract R toFailedResult(Result sessionResult); private long ms() { @@ -169,13 +218,15 @@ private long ms() { public void run() { if (promise.isCancelled()) { handler.onCancel(SessionRetryContext.this, retryNumber.get(), ms()); + finishRetrySpan(null, null); return; } executor.execute(this::requestSession); } public void requestSession() { - CompletableFuture> sessionFuture = sessionSupplier.createSession(sessionCreationTimeout); + startRetrySpan(); + CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) { // faster than subscribing on future acceptSession(sessionFuture.join()); @@ -193,13 +244,15 @@ public void requestSession() { private void acceptSession(@Nonnull Result sessionResult) { if (!sessionResult.isSuccess()) { - handleError(sessionResult.getStatus().getCode(), toFailedResult(sessionResult)); + handleError(sessionResult.getStatus(), toFailedResult(sessionResult)); return; } final Session session = sessionResult.getValue(); + final Session tracedSession = retrySpan.isValid() + ? new TracedSession(session, retrySpan) : session; try { - fn.apply(session).whenComplete((fnResult, fnException) -> { + fn.apply(tracedSession).whenComplete((fnResult, fnException) -> { try { session.close(); @@ -208,15 +261,17 @@ private void acceptSession(@Nonnull Result sessionResult) { return; } - StatusCode statusCode = toStatusCode(fnResult); - if (statusCode == StatusCode.SUCCESS) { + Status status = toStatus(fnResult); + if (status.isSuccess()) { handler.onSuccess(SessionRetryContext.this, retryNumber.get(), ms()); promise.complete(fnResult); + finishRetrySpan(status, null); } else { - handleError(statusCode, fnResult); + handleError(status, fnResult); } } catch (Throwable unexpected) { handler.onError(SessionRetryContext.this, unexpected, retryNumber.get(), ms()); + finishRetrySpan(null, unexpected); promise.completeExceptionally(unexpected); } }); @@ -233,21 +288,26 @@ private void scheduleNext(long delayMillis) { sessionSupplier.getScheduler().schedule(this, delayMillis, TimeUnit.MILLISECONDS); } - private void handleError(@Nonnull StatusCode code, R result) { - // Check retrayable status + private void handleError(@Nonnull Status status, R result) { + StatusCode code = status.getCode(); if (!canRetry(code)) { handler.onError(SessionRetryContext.this, code, retryNumber.get(), ms()); + finishRetrySpan(status, null); promise.complete(result); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { long next = backoffTimeMillis(code, retry); handler.onRetry(SessionRetryContext.this, code, retry, next, ms()); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(status, null); scheduleNext(next); } else { handler.onLimit(SessionRetryContext.this, code, maxRetries, ms()); + finishRetrySpan(status, null); promise.complete(result); } } @@ -256,20 +316,56 @@ private void handleException(@Nonnull Throwable ex) { // Check retrayable execption if (!canRetry(ex)) { handler.onError(SessionRetryContext.this, ex, retryNumber.get(), ms()); + finishRetrySpan(null, ex); promise.completeExceptionally(ex); return; } + int failedAttempt = retryNumber.get(); int retry = retryNumber.incrementAndGet(); if (retry <= maxRetries) { long next = backoffTimeMillis(ex, retry); handler.onRetry(SessionRetryContext.this, ex, retry, next, ms()); + recordRetrySchedule(failedAttempt, next); + finishRetrySpan(null, ex); scheduleNext(next); } else { handler.onLimit(SessionRetryContext.this, ex, maxRetries, ms()); + finishRetrySpan(null, ex); promise.completeExceptionally(ex); } } + + private CompletableFuture> createSessionWithRetrySpanParent() { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return sessionSupplier.createSession(sessionCreationTimeout); + } + } + + private void startRetrySpan() { + if (!spanFinished.get()) { + return; + } + try (SpanScope ignored = parentSpan.makeCurrent()) { + retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); + } + spanFinished.set(false); + retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, retryNumber.get()); + } + + private void recordRetrySchedule(int failedAttempt, long nextDelayMillis) { + retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, failedAttempt); + retrySpan.setAttribute(RETRY_SLEEP_MS_ATTR, nextDelayMillis); + } + + private void finishRetrySpan(Status status, Throwable throwable) { + if (!spanFinished.compareAndSet(false, true)) { + return; + } + retrySpan.setStatus(status, throwable); + retrySpan.end(); + retrySpan = Span.NOOP; + } } /** @@ -281,8 +377,8 @@ private final class RetryableResultTask extends BaseRetryableTask> } @Override - StatusCode toStatusCode(Result result) { - return result.getStatus().getCode(); + Status toStatus(Result result) { + return result.getStatus(); } @Override @@ -300,8 +396,8 @@ private final class RetryableStatusTask extends BaseRetryableTask { } @Override - StatusCode toStatusCode(Status status) { - return status.getCode(); + Status toStatus(Status status) { + return status; } @Override @@ -381,4 +477,285 @@ public SessionRetryContext build() { return new SessionRetryContext(this); } } + + /** + * Wraps Session to propagate retry span as parent for all RPC spans within a retry attempt. + */ + private static final class TracedSession implements Session { + private final Session delegate; + private final Span retrySpan; + + TracedSession(Session delegate, Span retrySpan) { + this.delegate = delegate; + this.retrySpan = retrySpan; + } + + @Override + public String getId() { + return delegate.getId(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public CompletableFuture createTable(String path, TableDescription tableDescriptions, + CreateTableSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.createTable(path, tableDescriptions, settings); + } + } + + @Override + public CompletableFuture dropTable(String path, DropTableSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.dropTable(path, settings); + } + } + + @Override + public CompletableFuture alterTable(String path, AlterTableSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.alterTable(path, settings); + } + } + + @Override + public CompletableFuture copyTable(String src, String dst, CopyTableSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.copyTable(src, dst, settings); + } + } + + @Override + public CompletableFuture copyTables(CopyTablesSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.copyTables(settings); + } + } + + @Override + public CompletableFuture renameTables(RenameTablesSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.renameTables(settings); + } + } + + @Override + public CompletableFuture> describeTable(String path, + DescribeTableSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.describeTable(path, settings); + } + } + + @Override + public CompletableFuture> prepareDataQuery(String query, + PrepareDataQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.prepareDataQuery(query, settings); + } + } + + @Override + public CompletableFuture> executeDataQuery(String query, TxControl txControl, + Params params, ExecuteDataQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.executeDataQuery(query, txControl, params, settings); + } + } + + @Override + public CompletableFuture> readRows(String pathToTable, ReadRowsSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.readRows(pathToTable, settings); + } + } + + @Override + public CompletableFuture executeSchemeQuery(String query, ExecuteSchemeQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.executeSchemeQuery(query, settings); + } + } + + @Override + public CompletableFuture> explainDataQuery(String query, + ExplainDataQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.explainDataQuery(query, settings); + } + } + + @Override + public CompletableFuture> describeTableOptions( + DescribeTableOptionsSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.describeTableOptions(settings); + } + } + + @Override + public CompletableFuture> beginTransaction(Transaction.Mode transactionMode, + BeginTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.beginTransaction(transactionMode, settings) + .thenApply(r -> r.map(tx -> new TracedTransaction(tx, retrySpan))); + } + } + + @Override + public TableTransaction createNewTransaction(TxMode txMode) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return new TracedTableTransaction(delegate.createNewTransaction(txMode), retrySpan); + } + } + + @Override + public CompletableFuture> beginTransaction(TxMode txMode, + BeginTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.beginTransaction(txMode, settings) + .thenApply(r -> r.map(tx -> new TracedTableTransaction(tx, retrySpan))); + } + } + + @Override + public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.commitTransaction(txId, settings); + } + } + + @Override + public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.rollbackTransaction(txId, settings); + } + } + + @Override + public GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.executeReadTable(tablePath, settings); + } + } + + @Override + public GrpcReadStream executeScanQueryRaw(String query, Params params, + ExecuteScanQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.executeScanQueryRaw(query, params, settings); + } + } + + @Override + public CompletableFuture> keepAlive(KeepAliveSessionSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.keepAlive(settings); + } + } + + @Override + public CompletableFuture executeBulkUpsert(String tablePath, BulkUpsertData data, + BulkUpsertSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.executeBulkUpsert(tablePath, data, settings); + } + } + } + + /** + * Wraps TableTransaction to propagate retry span as parent for commit/rollback/query spans. + */ + private static final class TracedTableTransaction implements TableTransaction { + private final TableTransaction delegate; + private final Span retrySpan; + + TracedTableTransaction(TableTransaction delegate, Span retrySpan) { + this.delegate = delegate; + this.retrySpan = retrySpan; + } + + @Override + public Session getSession() { + return delegate.getSession(); + } + + @Override + public String getId() { + return delegate.getId(); + } + + @Override + public TxMode getTxMode() { + return delegate.getTxMode(); + } + + @Override + public String getSessionId() { + return delegate.getSessionId(); + } + + @Override + public CompletableFuture getStatusFuture() { + return delegate.getStatusFuture(); + } + + @Override + public CompletableFuture> executeDataQuery(String query, boolean commitAtEnd, + Params params, ExecuteDataQuerySettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.executeDataQuery(query, commitAtEnd, params, settings); + } + } + + @Override + public CompletableFuture commit(CommitTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.commit(settings); + } + } + + @Override + public CompletableFuture rollback(RollbackTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.rollback(settings); + } + } + } + + /** + * Wraps legacy Transaction to propagate retry span as parent for commit/rollback spans. + */ + private static final class TracedTransaction implements Transaction { + private final Transaction delegate; + private final Span retrySpan; + + TracedTransaction(Transaction delegate, Span retrySpan) { + this.delegate = delegate; + this.retrySpan = retrySpan; + } + + @Override + public String getId() { + return delegate.getId(); + } + + @Override + public CompletableFuture commit(CommitTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.commit(settings); + } + } + + @Override + public CompletableFuture rollback(RollbackTxSettings settings) { + try (SpanScope ignored = retrySpan.makeCurrent()) { + return delegate.rollback(settings); + } + } + } } diff --git a/table/src/main/java/tech/ydb/table/SessionSupplier.java b/table/src/main/java/tech/ydb/table/SessionSupplier.java index 35ecd007f..dbec11281 100644 --- a/table/src/main/java/tech/ydb/table/SessionSupplier.java +++ b/table/src/main/java/tech/ydb/table/SessionSupplier.java @@ -5,6 +5,8 @@ import java.util.concurrent.ScheduledExecutorService; import tech.ydb.core.Result; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.core.tracing.Tracer; /** @@ -26,4 +28,7 @@ public interface SessionSupplier { */ ScheduledExecutorService getScheduler(); + default Tracer getTracer() { + return NoopTracer.getInstance(); + } } diff --git a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java index 199d3ae73..596466acb 100644 --- a/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java +++ b/table/src/main/java/tech/ydb/table/impl/SimpleTableClient.java @@ -6,6 +6,7 @@ import tech.ydb.core.Result; import tech.ydb.core.StatusCode; +import tech.ydb.core.tracing.Tracer; import tech.ydb.table.Session; import tech.ydb.table.SessionSupplier; import tech.ydb.table.rpc.TableRpc; @@ -38,6 +39,11 @@ public ScheduledExecutorService getScheduler() { return tableRpc.getScheduler(); } + @Override + public Tracer getTracer() { + return tableRpc.getTracer(); + } + public static Builder newClient(TableRpc rpc) { return new Builder(rpc); } diff --git a/table/src/main/java/tech/ydb/table/rpc/TableRpc.java b/table/src/main/java/tech/ydb/table/rpc/TableRpc.java index 714e242ba..c60684d59 100644 --- a/table/src/main/java/tech/ydb/table/rpc/TableRpc.java +++ b/table/src/main/java/tech/ydb/table/rpc/TableRpc.java @@ -7,6 +7,8 @@ import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.tracing.NoopTracer; +import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.table.YdbTable; import tech.ydb.proto.table.YdbTable.AlterTableRequest; import tech.ydb.proto.table.YdbTable.BeginTransactionRequest; @@ -50,6 +52,10 @@ public interface TableRpc extends AutoCloseable { ScheduledExecutorService getScheduler(); + default Tracer getTracer() { + return NoopTracer.getInstance(); + } + @Override void close(); diff --git a/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java b/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java index eaad07cc6..f1c597646 100644 --- a/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java +++ b/table/src/main/java/tech/ydb/table/rpc/grpc/GrpcTableRpc.java @@ -14,6 +14,7 @@ import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.operation.OperationBinder; import tech.ydb.core.operation.StatusExtractor; +import tech.ydb.core.tracing.Tracer; import tech.ydb.proto.table.YdbTable; import tech.ydb.proto.table.v1.TableServiceGrpc; import tech.ydb.table.rpc.TableRpc; @@ -246,6 +247,11 @@ public ScheduledExecutorService getScheduler() { return transport.getScheduler(); } + @Override + public Tracer getTracer() { + return transport.getTracer(); + } + @Override public void close() { if (transportOwned) { From 2df0573a2894997d01fcad54d5b52c9213ff1e13 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 12 Mar 2026 13:39:22 +0300 Subject: [PATCH 2/4] fix flap test --- .../src/main/java/tech/ydb/query/tools/SessionRetryContext.java | 2 +- table/src/main/java/tech/ydb/table/SessionRetryContext.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index 33b60b197..baacfaae8 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -221,8 +221,8 @@ private void acceptSession(@Nonnull Result sessionResult) { Status status = toStatus(fnResult); if (status.isSuccess()) { - promise.complete(fnResult); finishRetrySpan(status, null); + promise.complete(fnResult); } else { handleError(status, fnResult); } diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java index 857b5606e..915f34dc7 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java @@ -264,8 +264,8 @@ private void acceptSession(@Nonnull Result sessionResult) { Status status = toStatus(fnResult); if (status.isSuccess()) { handler.onSuccess(SessionRetryContext.this, retryNumber.get(), ms()); - promise.complete(fnResult); finishRetrySpan(status, null); + promise.complete(fnResult); } else { handleError(status, fnResult); } From a6dd3f3ea9d4bef280ee6912656a542b5328caca Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 12 Mar 2026 18:13:28 +0300 Subject: [PATCH 3/4] added more tests --- .../opentelemetry/OpenTelemetryTracer.java | 9 +- ...nTelemetryQueryTracingIntegrationTest.java | 83 +++++++++++++++++++ .../tech/ydb/query/impl/QueryTracingTest.java | 74 +++++++++++++++++ 3 files changed, 165 insertions(+), 1 deletion(-) diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java index 14fc0621c..70381da7c 100644 --- a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -8,6 +8,7 @@ import io.opentelemetry.context.Scope; import tech.ydb.core.Status; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; import tech.ydb.core.tracing.SpanScope; @@ -100,7 +101,13 @@ public void setStatus(Status status, Throwable error) { } } if (error != null) { - span.setAttribute("error.type", error.getClass().getName()); + if (error instanceof UnexpectedResultException) { + tech.ydb.core.StatusCode code = ((UnexpectedResultException) error).getStatus().getCode(); + span.setAttribute("db.response.status_code", code.toString()); + span.setAttribute("error.type", code.isTransportError() ? "transport_error" : "ydb_error"); + } else { + span.setAttribute("error.type", error.getClass().getName()); + } span.setStatus(StatusCode.ERROR, error.getMessage()); } } diff --git a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java index ff2d91f6f..732b0ef17 100644 --- a/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java +++ b/opentelemetry/src/test/java/tech/ydb/opentelemetry/OpenTelemetryQueryTracingIntegrationTest.java @@ -29,6 +29,7 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -313,6 +314,88 @@ public void tableProxyRetrySpanIsParentForRpcSpans() { Assert.assertTrue("Commit span must be child of ExecuteWithRetry", commitChildren >= 1); } + @Test + public void nonRetryableExceptionProducesErrorRetrySpan() { + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + RuntimeException thrown; + try { + retryContext.supplyStatus(session -> { + throw new IllegalStateException("boom"); + }).join(); + throw new AssertionError("Exception expected"); + } catch (RuntimeException ex) { + thrown = ex; + } + + Assert.assertNotNull(thrown.getCause()); + Assert.assertTrue(thrown.getCause() instanceof IllegalStateException); + assertSpanOK("ydb.CreateSession", 1); + assertSpanOK("ydb.ExecuteQuery", 0); + + List spans = spanExporter.getFinishedSpanItems(); + int retrySpans = 0; + int errorRetrySpans = 0; + for (SpanData span : spans) { + if (!"ydb.ExecuteWithRetry".equals(span.getName())) { + continue; + } + retrySpans++; + if (io.opentelemetry.api.trace.StatusCode.ERROR.equals(span.getStatus().getStatusCode())) { + errorRetrySpans++; + Assert.assertEquals(IllegalStateException.class.getName(), + span.getAttributes().get(ERROR_TYPE)); + } + } + Assert.assertEquals(1, retrySpans); + Assert.assertEquals(1, errorRetrySpans); + } + + @Test + public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + + assertSpanOK("ydb.ExecuteQuery", 1); + + List spans = spanExporter.getFinishedSpanItems(); + int retrySpans = 0; + int errorRetrySpans = 0; + int okRetrySpans = 0; + for (SpanData span : spans) { + if (!"ydb.ExecuteWithRetry".equals(span.getName())) { + continue; + } + retrySpans++; + if (io.opentelemetry.api.trace.StatusCode.ERROR.equals(span.getStatus().getStatusCode())) { + errorRetrySpans++; + Assert.assertEquals("ydb_error", span.getAttributes().get(ERROR_TYPE)); + Assert.assertEquals(StatusCode.OVERLOADED.toString(), span.getAttributes().get(STATUS_CODE)); + } else if (io.opentelemetry.api.trace.StatusCode.OK.equals(span.getStatus().getStatusCode())) { + okRetrySpans++; + } + } + Assert.assertEquals(2, retrySpans); + Assert.assertEquals(1, errorRetrySpans); + Assert.assertEquals(1, okRetrySpans); + } + @Test public void tableProxySpansAreChildrenOfApplicationSpan() { io.opentelemetry.api.trace.Span appSpan = appTracer.spanBuilder("app.parent.table").startSpan(); diff --git a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java index aa78ae79b..3904bc786 100644 --- a/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java +++ b/query/src/test/java/tech/ydb/query/impl/QueryTracingTest.java @@ -7,6 +7,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -20,6 +22,7 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; @@ -236,6 +239,56 @@ public void tableProxyRetrySpanIsParentOfRpcSpans() { Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.Commit", "ydb.ExecuteWithRetry")); } + @Test + public void nonRetryableExceptionClosesRetrySpan() { + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + RuntimeException thrown; + try { + retryContext.supplyStatus(session -> { + throw new IllegalStateException("boom"); + }).join(); + throw new AssertionError("Exception expected"); + } catch (RuntimeException ex) { + thrown = ex; + } + + Assert.assertNotNull(thrown.getCause()); + Assert.assertTrue(thrown.getCause() instanceof IllegalStateException); + Assert.assertEquals(1, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, + tracer.countClosedSpanWithErrorType("ydb.ExecuteWithRetry", IllegalStateException.class.getName())); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.CreateSession", "ydb.ExecuteWithRetry")); + Assert.assertEquals(0, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + } + + @Test + public void retryableUnexpectedResultExceptionRetriesAndSetsErrorType() { + AtomicInteger attempt = new AtomicInteger(); + SessionRetryContext retryContext = SessionRetryContext.create(queryClient) + .maxRetries(5) + .backoffSlot(Duration.ofMillis(1)) + .fastBackoffSlot(Duration.ofMillis(1)) + .build(); + + Status status = retryContext.supplyStatus(session -> { + if (attempt.getAndIncrement() == 0) { + throw new UnexpectedResultException("retryable", Status.of(StatusCode.OVERLOADED)); + } + return session.createQuery("SELECT 1", TxMode.NONE).execute().thenApply(Result::getStatus); + }).join(); + status.expectSuccess(); + + Assert.assertEquals(2, tracer.countClosedSpan("ydb.ExecuteWithRetry")); + Assert.assertEquals(1, + tracer.countClosedSpanWithErrorType("ydb.ExecuteWithRetry", UnexpectedResultException.class.getName())); + Assert.assertEquals(1, tracer.countClosedSpanWithParent("ydb.ExecuteQuery", "ydb.ExecuteWithRetry")); + } + private static final class RecordingTracer implements Tracer { private final List spans = Collections.synchronizedList(new ArrayList<>()); private final ThreadLocal currentSpan = new ThreadLocal<>(); @@ -278,6 +331,21 @@ int countClosedSpanWithParent(String spanName, String parentSpanName) { return count; } + int countClosedSpanWithErrorType(String spanName, String errorType) { + int count = 0; + synchronized (spans) { + for (RecordingSpan span : spans) { + if (span.closed + && span.name.equals(spanName) + && span.throwableError != null + && span.throwableError.getClass().getName().equals(errorType)) { + count++; + } + } + } + return count; + } + SpanScope makeSpanCurrent(RecordingSpan span) { RecordingSpan previous = currentSpan.get(); currentSpan.set(span); @@ -296,6 +364,7 @@ private static final class RecordingSpan implements Span { private final String name; private final SpanKind kind; private final RecordingSpan parent; + private Throwable throwableError; private volatile boolean closed = false; RecordingSpan(RecordingTracer tracer, String name, SpanKind kind, RecordingSpan parent) { @@ -320,6 +389,11 @@ public SpanScope makeCurrent() { return tracer.makeSpanCurrent(this); } + @Override + public void setStatus(@Nullable Status status, @Nullable Throwable error) { + this.throwableError = error; + } + @Override public void end() { this.closed = true; From 167cdc1d84464642c3c8b5e3c7effcc6148182b1 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Mon, 16 Mar 2026 11:33:23 +0300 Subject: [PATCH 4/4] added makeCurrent parent span on close --- .../tech/ydb/core/impl/BaseGrpcTransport.java | 11 +- .../ydb/core/impl/call/ReadStreamCall.java | 69 ++-- .../tech/ydb/core/impl/call/UnaryCall.java | 35 +- .../main/java/tech/ydb/core/tracing/Span.java | 9 + .../opentelemetry/OpenTelemetryTracer.java | 20 +- .../ydb/query/tools/SessionRetryContext.java | 188 ++------- .../tech/ydb/table/SessionRetryContext.java | 391 ++---------------- 7 files changed, 149 insertions(+), 574 deletions(-) diff --git a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java index 0738de347..4dea38d01 100644 --- a/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java +++ b/core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java @@ -126,13 +126,14 @@ public CompletableFuture> unaryCall( ClientCall call = channel.getReadyChannel().newCall(method, options); ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); - + Span contextSpan = settings.getSpan(); if (logger.isTraceEnabled()) { logger.trace("UnaryCall[{}] with method {} and endpoint {} created", traceId, method.getFullMethodName(), endpoint.getHostAndPort()); } Metadata metadata = makeMetadataFromSettings(settings, endpoint); - return new UnaryCall<>(traceId, endpoint.getHostAndPort(), call, handler).startCall(request, metadata); + return new UnaryCall<>(traceId, endpoint.getHostAndPort(), call, handler, contextSpan) + .startCall(request, metadata); } catch (UnexpectedResultException ex) { logger.warn("UnaryCall[{}] got unexpected status {}", traceId, ex.getStatus()); return CompletableFuture.completedFuture(Result.fail(ex)); @@ -164,6 +165,7 @@ public GrpcReadStream readStreamCall( ClientCall call = channel.getReadyChannel().newCall(method, options); ChannelStatusHandler handler = new ChannelStatusHandler(channel, settings); + Span contextSpan = settings.getSpan(); if (logger.isTraceEnabled()) { logger.trace("ReadStreamCall[{}] with method {} and endpoint {} created", @@ -173,7 +175,9 @@ public GrpcReadStream readStreamCall( Metadata metadata = makeMetadataFromSettings(settings, endpoint); GrpcFlowControl flowCtrl = settings.getFlowControl(); - return new ReadStreamCall<>(traceId, endpoint.getHostAndPort(), call, flowCtrl, request, metadata, handler); + return new ReadStreamCall<>( + endpoint.getHostAndPort(), call, flowCtrl, request, metadata, handler, contextSpan + ); } catch (UnexpectedResultException ex) { logger.warn("ReadStreamCall[{}] got unexpected status {}", traceId, ex.getStatus()); return new EmptyStream<>(ex.getStatus()); @@ -254,7 +258,6 @@ private Metadata makeMetadataFromSettings(GrpcRequestSettings settings, Endpoint } Span span = settings.getSpan(); - if (span.isValid()) { span.setAttribute("db.system.name", "ydb"); span.setAttribute("db.namespace", getDatabase()); diff --git a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java index 1d1ee65b8..7ed1f6801 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java @@ -19,6 +19,8 @@ import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcStatuses; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanScope; /** * @@ -29,11 +31,12 @@ public class ReadStreamCall extends ClientCall.Listener implements GrpcReadStream { private static final Logger logger = LoggerFactory.getLogger(GrpcTransport.class); - private final String traceId; private final String endpoint; private final ClientCall call; private final Lock callLock = new ReentrantLock(); private final GrpcStatusHandler statusConsumer; + private final Span parentSpan; + private final Span span; private final ReqT request; private final Metadata headers; private final GrpcFlowControl.Call flow; @@ -42,14 +45,15 @@ public class ReadStreamCall extends ClientCall.Listener impl private Observer consumer; - public ReadStreamCall(String traceId, String endpoint, ClientCall call, GrpcFlowControl flowCtrl, - ReqT req, Metadata headers, GrpcStatusHandler statusHandler) { - this.traceId = traceId; + public ReadStreamCall(String endpoint, ClientCall call, GrpcFlowControl flowCtrl, ReqT req, + Metadata headers, GrpcStatusHandler statusHandler, Span span) { this.endpoint = endpoint; this.call = call; this.request = req; this.headers = headers; this.statusConsumer = statusHandler; + this.parentSpan = span.getParentSpan(); + this.span = span; this.flow = flowCtrl.newCall(this::nextRequest); } @@ -67,7 +71,7 @@ public CompletableFuture start(Observer observer) { consumer = observer; call.start(this, headers); if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) request)); + logger.trace("ReadStreamCall[{}] --> {}", endpoint, TextFormat.shortDebugString((Message) request)); } call.sendMessage(request); // close stream by client side @@ -80,7 +84,7 @@ public CompletableFuture start(Observer observer) { try { call.cancel(null, th); } catch (Throwable ex) { - logger.error("ReadStreamCall[{}] got exception while canceling", traceId, ex); + logger.error("ReadStreamCall[{}] got exception while canceling", endpoint, ex); } } finally { callLock.unlock(); @@ -111,42 +115,45 @@ private void nextRequest(int count) { @Override public void onMessage(RespT message) { - try { - if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] <-- {}", traceId, TextFormat.shortDebugString((Message) message)); - } - consumer.onNext(message); - flow.onMessageRead(); - } catch (Exception ex) { - statusFuture.completeExceptionally(ex); - + try (SpanScope ignored = span.makeCurrent()) { try { - callLock.lock(); + if (logger.isTraceEnabled()) { + logger.trace("ReadStreamCall[{}] <-- {}", endpoint, TextFormat.shortDebugString((Message) message)); + } + consumer.onNext(message); + flow.onMessageRead(); + } catch (Exception ex) { + statusFuture.completeExceptionally(ex); + try { - call.cancel("Canceled by exception from observer", ex); - } finally { - callLock.unlock(); + callLock.lock(); + try { + call.cancel("Canceled by exception from observer", ex); + } finally { + callLock.unlock(); + } + } catch (Throwable th) { + logger.error("ReadStreamCall[{}] got exception while canceling", endpoint, th); } - } catch (Throwable th) { - logger.error("ReadStreamCall[{}] got exception while canceling", traceId, th); } } } @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { - if (logger.isTraceEnabled()) { - logger.trace("ReadStreamCall[{}] closed with status {}", traceId, status); - } + try (SpanScope ignored = parentSpan.makeCurrent()) { + if (logger.isTraceEnabled()) { + logger.trace("ReadStreamCall[{}] closed with status {}", endpoint, status); + } - statusConsumer.accept(status, trailers); + statusConsumer.accept(status, trailers); - if (status.isOk()) { - statusFuture.complete(Status.SUCCESS); - } else { - statusFuture.complete(GrpcStatuses.toStatus(status, endpoint)); + if (status.isOk()) { + statusFuture.complete(Status.SUCCESS); + } else { + statusFuture.complete(GrpcStatuses.toStatus(status, endpoint)); + } + statusConsumer.postComplete(); } - - statusConsumer.postComplete(); } } diff --git a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java index 7f74196f6..0ac6075f3 100644 --- a/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java +++ b/core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java @@ -18,6 +18,8 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcStatuses; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.tracing.Span; +import tech.ydb.core.tracing.SpanScope; import tech.ydb.proto.auth.YdbAuth; /** @@ -39,15 +41,18 @@ public class UnaryCall extends ClientCall.Listener { private final String endpoint; private final ClientCall call; private final GrpcStatusHandler statusConsumer; + private final Span parentSpan; private final CompletableFuture> future = new CompletableFuture<>(); private final AtomicReference value = new AtomicReference<>(); - public UnaryCall(String traceId, String endpoint, ClientCall call, GrpcStatusHandler statusConsumer) { + public UnaryCall(String traceId, String endpoint, ClientCall call, + GrpcStatusHandler statusConsumer, Span contextSpan) { this.traceId = traceId; this.endpoint = endpoint; this.call = call; this.statusConsumer = statusConsumer; + this.parentSpan = contextSpan.getParentSpan(); } public CompletableFuture> startCall(ReqT request, Metadata headers) { @@ -91,23 +96,25 @@ public void onMessage(RespT value) { @Override public void onClose(io.grpc.Status status, @Nullable Metadata trailers) { - statusConsumer.accept(status, trailers); - if (logger.isTraceEnabled()) { - logger.trace("UnaryCall[{}] closed with status {}", traceId, status); - } + try (SpanScope ignored = parentSpan.makeCurrent()) { + statusConsumer.accept(status, trailers); + if (logger.isTraceEnabled()) { + logger.trace("UnaryCall[{}] closed with status {}", traceId, status); + } - if (status.isOk()) { - RespT snapshotValue = value.get(); + if (status.isOk()) { + RespT snapshotValue = value.get(); - if (snapshotValue == null) { - future.complete(Result.fail(NO_VALUE)); + if (snapshotValue == null) { + future.complete(Result.fail(NO_VALUE)); + } else { + future.complete(Result.success(snapshotValue)); + } } else { - future.complete(Result.success(snapshotValue)); + future.complete(GrpcStatuses.toResult(status, endpoint)); } - } else { - future.complete(GrpcStatuses.toResult(status, endpoint)); - } - statusConsumer.postComplete(); + statusConsumer.postComplete(); + } } } diff --git a/core/src/main/java/tech/ydb/core/tracing/Span.java b/core/src/main/java/tech/ydb/core/tracing/Span.java index ff2402e52..b2534064d 100644 --- a/core/src/main/java/tech/ydb/core/tracing/Span.java +++ b/core/src/main/java/tech/ydb/core/tracing/Span.java @@ -36,6 +36,15 @@ default boolean isValid() { return false; } + /** + * Returns parent span captured at the moment when this span was created. + * + * @return parent span or {@link #NOOP} if parent is unavailable + */ + default Span getParentSpan() { + return NOOP; + } + /** * Sets a string attribute on the span. * diff --git a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java index 70381da7c..30cb77f23 100644 --- a/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java +++ b/opentelemetry/src/main/java/tech/ydb/opentelemetry/OpenTelemetryTracer.java @@ -2,6 +2,8 @@ import java.util.Objects; +import javax.annotation.Nullable; + import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.StatusCode; @@ -39,10 +41,11 @@ public static OpenTelemetryTracer fromOpenTelemetry(OpenTelemetry openTelemetry, @Override public Span startSpan(String spanName, SpanKind spanKind) { + Span parentSpan = currentSpan(); io.opentelemetry.api.trace.Span span = tracer.spanBuilder(spanName) .setSpanKind(mapSpanKind(spanKind)) .startSpan(); - return new OtelSpan(span); + return new OtelSpan(span, parentSpan); } @Override @@ -51,7 +54,7 @@ public Span currentSpan() { if (!current.getSpanContext().isValid()) { return Span.NOOP; } - return new OtelSpan(current); + return new OtelSpan(current, Span.NOOP); } private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { @@ -63,9 +66,11 @@ private static io.opentelemetry.api.trace.SpanKind mapSpanKind(SpanKind kind) { private static final class OtelSpan implements Span { private final io.opentelemetry.api.trace.Span span; + private final Span parentSpan; - private OtelSpan(io.opentelemetry.api.trace.Span span) { + private OtelSpan(io.opentelemetry.api.trace.Span span, Span parentSpan) { this.span = span; + this.parentSpan = parentSpan; } @Override @@ -79,7 +84,12 @@ public boolean isValid() { } @Override - public void setAttribute(String key, String value) { + public Span getParentSpan() { + return parentSpan; + } + + @Override + public void setAttribute(String key, @Nullable String value) { span.setAttribute(key, value); } @@ -89,7 +99,7 @@ public void setAttribute(String key, long value) { } @Override - public void setStatus(Status status, Throwable error) { + public void setStatus(@Nullable Status status, @Nullable Throwable error) { if (status != null) { if (status.isSuccess()) { span.setStatus(StatusCode.OK); diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index baacfaae8..b416b07ec 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -6,7 +6,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -16,7 +15,6 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; @@ -28,14 +26,6 @@ import tech.ydb.core.utils.FutureTools; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; -import tech.ydb.query.QueryStream; -import tech.ydb.query.QueryTransaction; -import tech.ydb.query.result.QueryInfo; -import tech.ydb.query.settings.BeginTransactionSettings; -import tech.ydb.query.settings.CommitTransactionSettings; -import tech.ydb.query.settings.ExecuteQuerySettings; -import tech.ydb.query.settings.RollbackTransactionSettings; -import tech.ydb.table.query.Params; /** @@ -151,7 +141,6 @@ private long backoffTimeMillis(Throwable t, int retryNumber) { */ private abstract class BaseRetryableTask implements Runnable { private final CompletableFuture promise = new CompletableFuture<>(); - private final AtomicBoolean spanFinished = new AtomicBoolean(true); private final AtomicInteger retryNumber = new AtomicInteger(); private final Function> fn; private final Tracer tracer; @@ -183,7 +172,9 @@ public void run() { } public void requestSession() { - startRetrySpan(); + try (SpanScope ignored = parentSpan.makeCurrent()) { + retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); + } CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) { // faster than subscribing on future @@ -207,30 +198,32 @@ private void acceptSession(@Nonnull Result sessionResult) { } final QuerySession session = sessionResult.getValue(); - final QuerySession tracedSession = retrySpan.isValid() - ? new TracedQuerySession(session, retrySpan) : session; try { - fn.apply(tracedSession).whenComplete((fnResult, fnException) -> { - try { - session.close(); - - if (fnException != null) { - handleException(fnException); - return; - } - - Status status = toStatus(fnResult); - if (status.isSuccess()) { - finishRetrySpan(status, null); - promise.complete(fnResult); - } else { - handleError(status, fnResult); + try (SpanScope ignored = retrySpan.makeCurrent()) { + fn.apply(session).whenComplete((fnResult, fnException) -> { + try { + try (SpanScope ignored1 = retrySpan.makeCurrent()) { + session.close(); + + if (fnException != null) { + handleException(fnException); + return; + } + + Status status = toStatus(fnResult); + if (status.isSuccess()) { + finishRetrySpan(status, null); + promise.complete(fnResult); + } else { + handleError(status, fnResult); + } + } + } catch (Throwable unexpected) { + finishRetrySpan(null, unexpected); + promise.completeExceptionally(unexpected); } - } catch (Throwable unexpected) { - finishRetrySpan(null, unexpected); - promise.completeExceptionally(unexpected); - } - }); + }); + } } catch (RuntimeException ex) { session.close(); handleException(ex); @@ -292,27 +285,12 @@ private CompletableFuture> createSessionWithRetrySpanParent } } - private void startRetrySpan() { - if (!spanFinished.get()) { - return; - } - try (SpanScope ignored = parentSpan.makeCurrent()) { - retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); - } - spanFinished.set(false); - retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, retryNumber.get()); - } - private void recordRetrySchedule(int failedAttempt, long nextDelayMillis) { retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, failedAttempt); retrySpan.setAttribute(RETRY_SLEEP_MS_ATTR, nextDelayMillis); } private void finishRetrySpan(Status status, Throwable throwable) { - if (!spanFinished.compareAndSet(false, true)) { - return; - } - retrySpan.setStatus(status, throwable); retrySpan.end(); retrySpan = Span.NOOP; @@ -428,116 +406,4 @@ public SessionRetryContext build() { return new SessionRetryContext(this); } } - - /** - * Wraps QuerySession to propagate retry span as parent for all RPC spans within a retry attempt. - */ - private static final class TracedQuerySession implements QuerySession { - private final QuerySession delegate; - private final Span retrySpan; - - TracedQuerySession(QuerySession delegate, Span retrySpan) { - this.delegate = delegate; - this.retrySpan = retrySpan; - } - - @Override - public String getId() { - return delegate.getId(); - } - - @Override - public QueryTransaction currentTransaction() { - QueryTransaction tx = delegate.currentTransaction(); - return tx != null ? new TracedQueryTransaction(tx, retrySpan) : null; - } - - @Override - public QueryTransaction createNewTransaction(TxMode txMode) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return new TracedQueryTransaction(delegate.createNewTransaction(txMode), retrySpan); - } - } - - @Override - public CompletableFuture> beginTransaction( - TxMode txMode, BeginTransactionSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.beginTransaction(txMode, settings) - .thenApply(r -> r.map(tx -> new TracedQueryTransaction(tx, retrySpan))); - } - } - - @Override - public QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.createQuery(query, tx, params, settings); - } - } - - @Override - public void close() { - delegate.close(); - } - } - - /** - * Wraps QueryTransaction to propagate retry span as parent for commit/rollback/query spans. - */ - private static final class TracedQueryTransaction implements QueryTransaction { - private final QueryTransaction delegate; - private final Span retrySpan; - - TracedQueryTransaction(QueryTransaction delegate, Span retrySpan) { - this.delegate = delegate; - this.retrySpan = retrySpan; - } - - @Override - public QuerySession getSession() { - return new TracedQuerySession(delegate.getSession(), retrySpan); - } - - @Override - public String getId() { - return delegate.getId(); - } - - @Override - public TxMode getTxMode() { - return delegate.getTxMode(); - } - - @Override - public String getSessionId() { - return delegate.getSessionId(); - } - - @Override - public CompletableFuture getStatusFuture() { - return delegate.getStatusFuture(); - } - - @Override - public CompletableFuture> commit(CommitTransactionSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.commit(settings); - } - } - - @Override - public CompletableFuture rollback(RollbackTransactionSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.rollback(settings); - } - } - - @Override - public QueryStream createQuery(String query, boolean commitAtEnd, Params params, - ExecuteQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.createQuery(query, commitAtEnd, params, settings); - } - } - } } diff --git a/table/src/main/java/tech/ydb/table/SessionRetryContext.java b/table/src/main/java/tech/ydb/table/SessionRetryContext.java index 915f34dc7..128df3e16 100644 --- a/table/src/main/java/tech/ydb/table/SessionRetryContext.java +++ b/table/src/main/java/tech/ydb/table/SessionRetryContext.java @@ -7,7 +7,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -17,50 +16,15 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; -import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.tracing.Span; import tech.ydb.core.tracing.SpanKind; import tech.ydb.core.tracing.SpanScope; import tech.ydb.core.tracing.Tracer; import tech.ydb.core.utils.FutureTools; -import tech.ydb.proto.ValueProtos; -import tech.ydb.table.description.TableDescription; -import tech.ydb.table.description.TableOptionDescription; -import tech.ydb.table.query.BulkUpsertData; -import tech.ydb.table.query.DataQuery; -import tech.ydb.table.query.DataQueryResult; -import tech.ydb.table.query.ExplainDataQueryResult; -import tech.ydb.table.query.Params; -import tech.ydb.table.query.ReadRowsResult; -import tech.ydb.table.query.ReadTablePart; -import tech.ydb.table.settings.AlterTableSettings; -import tech.ydb.table.settings.BeginTxSettings; -import tech.ydb.table.settings.BulkUpsertSettings; -import tech.ydb.table.settings.CommitTxSettings; -import tech.ydb.table.settings.CopyTableSettings; -import tech.ydb.table.settings.CopyTablesSettings; -import tech.ydb.table.settings.CreateTableSettings; -import tech.ydb.table.settings.DescribeTableOptionsSettings; -import tech.ydb.table.settings.DescribeTableSettings; -import tech.ydb.table.settings.DropTableSettings; -import tech.ydb.table.settings.ExecuteDataQuerySettings; -import tech.ydb.table.settings.ExecuteScanQuerySettings; -import tech.ydb.table.settings.ExecuteSchemeQuerySettings; -import tech.ydb.table.settings.ExplainDataQuerySettings; -import tech.ydb.table.settings.KeepAliveSessionSettings; -import tech.ydb.table.settings.PrepareDataQuerySettings; -import tech.ydb.table.settings.ReadRowsSettings; -import tech.ydb.table.settings.ReadTableSettings; -import tech.ydb.table.settings.RenameTablesSettings; -import tech.ydb.table.settings.RollbackTxSettings; -import tech.ydb.table.transaction.TableTransaction; -import tech.ydb.table.transaction.Transaction; -import tech.ydb.table.transaction.TxControl; /** @@ -109,14 +73,14 @@ public CompletableFuture supplyStatus(Function CompletableFuture> supplyResult(SessionRetryHandler h, - Function>> fn) { + Function>> fn) { RetryableResultTask task = new RetryableResultTask<>(h, fn); task.requestSession(); return task.getFuture(); } public CompletableFuture supplyStatus(SessionRetryHandler h, - Function> fn) { + Function> fn) { RetryableStatusTask task = new RetryableStatusTask(h, fn); task.requestSession(); return task.getFuture(); @@ -186,7 +150,6 @@ private long backoffTimeMillis(Throwable t, int retryNumber) { */ private abstract class BaseRetryableTask implements Runnable { private final CompletableFuture promise = new CompletableFuture<>(); - private final AtomicBoolean spanFinished = new AtomicBoolean(true); private final AtomicInteger retryNumber = new AtomicInteger(); private final Function> fn; private final long createTimestamp = Instant.now().toEpochMilli(); @@ -207,6 +170,7 @@ CompletableFuture getFuture() { } abstract Status toStatus(R result); + abstract R toFailedResult(Result sessionResult); private long ms() { @@ -225,7 +189,9 @@ public void run() { } public void requestSession() { - startRetrySpan(); + try (SpanScope ignored = parentSpan.makeCurrent()) { + retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); + } CompletableFuture> sessionFuture = createSessionWithRetrySpanParent(); if (sessionFuture.isDone() && !sessionFuture.isCompletedExceptionally()) { // faster than subscribing on future @@ -249,32 +215,34 @@ private void acceptSession(@Nonnull Result sessionResult) { } final Session session = sessionResult.getValue(); - final Session tracedSession = retrySpan.isValid() - ? new TracedSession(session, retrySpan) : session; try { - fn.apply(tracedSession).whenComplete((fnResult, fnException) -> { - try { - session.close(); - - if (fnException != null) { - handleException(fnException); - return; - } - - Status status = toStatus(fnResult); - if (status.isSuccess()) { - handler.onSuccess(SessionRetryContext.this, retryNumber.get(), ms()); - finishRetrySpan(status, null); - promise.complete(fnResult); - } else { - handleError(status, fnResult); + try (SpanScope ignored = retrySpan.makeCurrent()) { + fn.apply(session).whenComplete((fnResult, fnException) -> { + try { + try (SpanScope ignored1 = retrySpan.makeCurrent()) { + session.close(); + + if (fnException != null) { + handleException(fnException); + return; + } + + Status status = toStatus(fnResult); + if (status.isSuccess()) { + handler.onSuccess(SessionRetryContext.this, retryNumber.get(), ms()); + finishRetrySpan(status, null); + promise.complete(fnResult); + } else { + handleError(status, fnResult); + } + } + } catch (Throwable unexpected) { + handler.onError(SessionRetryContext.this, unexpected, retryNumber.get(), ms()); + finishRetrySpan(null, unexpected); + promise.completeExceptionally(unexpected); } - } catch (Throwable unexpected) { - handler.onError(SessionRetryContext.this, unexpected, retryNumber.get(), ms()); - finishRetrySpan(null, unexpected); - promise.completeExceptionally(unexpected); - } - }); + }); + } } catch (RuntimeException ex) { session.close(); handleException(ex); @@ -342,26 +310,12 @@ private CompletableFuture> createSessionWithRetrySpanParent() { } } - private void startRetrySpan() { - if (!spanFinished.get()) { - return; - } - try (SpanScope ignored = parentSpan.makeCurrent()) { - retrySpan = tracer.startSpan(EXECUTE_WITH_RETRY_SPAN_NAME, SpanKind.INTERNAL); - } - spanFinished.set(false); - retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, retryNumber.get()); - } - private void recordRetrySchedule(int failedAttempt, long nextDelayMillis) { retrySpan.setAttribute(RETRY_ATTEMPT_ATTR, failedAttempt); retrySpan.setAttribute(RETRY_SLEEP_MS_ATTR, nextDelayMillis); } private void finishRetrySpan(Status status, Throwable throwable) { - if (!spanFinished.compareAndSet(false, true)) { - return; - } retrySpan.setStatus(status, throwable); retrySpan.end(); retrySpan = Span.NOOP; @@ -477,285 +431,4 @@ public SessionRetryContext build() { return new SessionRetryContext(this); } } - - /** - * Wraps Session to propagate retry span as parent for all RPC spans within a retry attempt. - */ - private static final class TracedSession implements Session { - private final Session delegate; - private final Span retrySpan; - - TracedSession(Session delegate, Span retrySpan) { - this.delegate = delegate; - this.retrySpan = retrySpan; - } - - @Override - public String getId() { - return delegate.getId(); - } - - @Override - public void close() { - delegate.close(); - } - - @Override - public CompletableFuture createTable(String path, TableDescription tableDescriptions, - CreateTableSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.createTable(path, tableDescriptions, settings); - } - } - - @Override - public CompletableFuture dropTable(String path, DropTableSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.dropTable(path, settings); - } - } - - @Override - public CompletableFuture alterTable(String path, AlterTableSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.alterTable(path, settings); - } - } - - @Override - public CompletableFuture copyTable(String src, String dst, CopyTableSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.copyTable(src, dst, settings); - } - } - - @Override - public CompletableFuture copyTables(CopyTablesSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.copyTables(settings); - } - } - - @Override - public CompletableFuture renameTables(RenameTablesSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.renameTables(settings); - } - } - - @Override - public CompletableFuture> describeTable(String path, - DescribeTableSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.describeTable(path, settings); - } - } - - @Override - public CompletableFuture> prepareDataQuery(String query, - PrepareDataQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.prepareDataQuery(query, settings); - } - } - - @Override - public CompletableFuture> executeDataQuery(String query, TxControl txControl, - Params params, ExecuteDataQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.executeDataQuery(query, txControl, params, settings); - } - } - - @Override - public CompletableFuture> readRows(String pathToTable, ReadRowsSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.readRows(pathToTable, settings); - } - } - - @Override - public CompletableFuture executeSchemeQuery(String query, ExecuteSchemeQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.executeSchemeQuery(query, settings); - } - } - - @Override - public CompletableFuture> explainDataQuery(String query, - ExplainDataQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.explainDataQuery(query, settings); - } - } - - @Override - public CompletableFuture> describeTableOptions( - DescribeTableOptionsSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.describeTableOptions(settings); - } - } - - @Override - public CompletableFuture> beginTransaction(Transaction.Mode transactionMode, - BeginTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.beginTransaction(transactionMode, settings) - .thenApply(r -> r.map(tx -> new TracedTransaction(tx, retrySpan))); - } - } - - @Override - public TableTransaction createNewTransaction(TxMode txMode) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return new TracedTableTransaction(delegate.createNewTransaction(txMode), retrySpan); - } - } - - @Override - public CompletableFuture> beginTransaction(TxMode txMode, - BeginTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.beginTransaction(txMode, settings) - .thenApply(r -> r.map(tx -> new TracedTableTransaction(tx, retrySpan))); - } - } - - @Override - public CompletableFuture commitTransaction(String txId, CommitTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.commitTransaction(txId, settings); - } - } - - @Override - public CompletableFuture rollbackTransaction(String txId, RollbackTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.rollbackTransaction(txId, settings); - } - } - - @Override - public GrpcReadStream executeReadTable(String tablePath, ReadTableSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.executeReadTable(tablePath, settings); - } - } - - @Override - public GrpcReadStream executeScanQueryRaw(String query, Params params, - ExecuteScanQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.executeScanQueryRaw(query, params, settings); - } - } - - @Override - public CompletableFuture> keepAlive(KeepAliveSessionSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.keepAlive(settings); - } - } - - @Override - public CompletableFuture executeBulkUpsert(String tablePath, BulkUpsertData data, - BulkUpsertSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.executeBulkUpsert(tablePath, data, settings); - } - } - } - - /** - * Wraps TableTransaction to propagate retry span as parent for commit/rollback/query spans. - */ - private static final class TracedTableTransaction implements TableTransaction { - private final TableTransaction delegate; - private final Span retrySpan; - - TracedTableTransaction(TableTransaction delegate, Span retrySpan) { - this.delegate = delegate; - this.retrySpan = retrySpan; - } - - @Override - public Session getSession() { - return delegate.getSession(); - } - - @Override - public String getId() { - return delegate.getId(); - } - - @Override - public TxMode getTxMode() { - return delegate.getTxMode(); - } - - @Override - public String getSessionId() { - return delegate.getSessionId(); - } - - @Override - public CompletableFuture getStatusFuture() { - return delegate.getStatusFuture(); - } - - @Override - public CompletableFuture> executeDataQuery(String query, boolean commitAtEnd, - Params params, ExecuteDataQuerySettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.executeDataQuery(query, commitAtEnd, params, settings); - } - } - - @Override - public CompletableFuture commit(CommitTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.commit(settings); - } - } - - @Override - public CompletableFuture rollback(RollbackTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.rollback(settings); - } - } - } - - /** - * Wraps legacy Transaction to propagate retry span as parent for commit/rollback spans. - */ - private static final class TracedTransaction implements Transaction { - private final Transaction delegate; - private final Span retrySpan; - - TracedTransaction(Transaction delegate, Span retrySpan) { - this.delegate = delegate; - this.retrySpan = retrySpan; - } - - @Override - public String getId() { - return delegate.getId(); - } - - @Override - public CompletableFuture commit(CommitTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.commit(settings); - } - } - - @Override - public CompletableFuture rollback(RollbackTxSettings settings) { - try (SpanScope ignored = retrySpan.makeCurrent()) { - return delegate.rollback(settings); - } - } - } }