From c8c8542ec0e29da51fea3866ef8e635707a3acaa Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 05:10:58 +0300 Subject: [PATCH 1/4] chore: drop the throwaway MDC capture in the async instrumentation step DefaultAsyncInstrumentationStep.processAsync initialised `mdc` with an MdcSnapshot.capture() that is overwritten on the first line inside the trace-context use{} block, so every async request paid for one MDC copy that was immediately discarded. Use `lateinit var` instead: the real capture still happens inside the scope (after trace.id / span.id are pushed), the post-scope read is only reachable once the block has run, and the wasted snapshot is gone. --- .../core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt index 99d02131..d83d7cb5 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt @@ -109,7 +109,7 @@ public class DefaultAsyncInstrumentationStep // .handle do NOT get MDC by default. We capture the MDC snapshot INSIDE the // use{} block (after makeCurrentWithLoggingContext has pushed trace.id / span.id) // and restore it in each branch of the .handle callback. - var mdc: MdcSnapshot = MdcSnapshot.capture() // will be overwritten inside the scope + lateinit var mdc: MdcSnapshot val downstream: CompletableFuture = span.makeCurrentWithLoggingContext().use { // Capture after the scope has pushed trace.id / span.id so the snapshot carries them. From 3e0e9b606cd2778df94e0332023985ff0d0a09f8 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 05:10:59 +0300 Subject: [PATCH 2/4] chore: fold the AsyncAuthStep challenge carrier into a single handle handleChallenge ran the challenge future through a handle that only packed its (value, error) into a one-field HookOutcome, then a thenCompose that unpacked it to do the real work. Do the close / fail / dispatch decision inside a single handle that returns the next CompletableFuture, and flatten it with thenCompose { it }. This removes the carrier type and a whole composition stage; the explicit null-check smart-casts the retry request for the dispatch. Behaviour is identical across all three completion paths (no retry, retry, hook error). --- .../sdk/core/http/pipeline/steps/AsyncAuthStep.kt | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt index 4534e4e5..ca6c1d29 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncAuthStep.kt @@ -101,23 +101,17 @@ public abstract class AsyncAuthStep : AsyncHttpStep { return Futures.failed(t) } - return challengeFuture.handle { retryRequest, hookError -> - HookOutcome(retryRequest, hookError) - }.thenCompose { outcome -> - val hookError = outcome.error + return challengeFuture.handle> { retryRequest, hookError -> if (hookError != null) { response.close() - return@thenCompose Futures.failed(Futures.unwrap(hookError)) + return@handle Futures.failed(Futures.unwrap(hookError)) } - val retryRequest = outcome.request ?: return@thenCompose CompletableFuture.completedFuture(response) + if (retryRequest == null) return@handle CompletableFuture.completedFuture(response) response.close() next.copy().processAsync(retryRequest) - } + }.thenCompose { it } } - /** Carrier so the challenge future's outcome (value or error) survives [CompletableFuture.handle]. */ - private class HookOutcome(val request: Request?, val error: Throwable?) - /** * Returns a future of [request] with the credential's auth header attached. Subclasses * implement the concrete async stamping (e.g. fetch-or-refresh a bearer token off-thread, From e18743a20cf8adb8d7076e822a0db7b902f5bb98 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 05:10:59 +0300 Subject: [PATCH 3/4] chore: hoist the shared bearer-challenge check to one top-level function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BearerTokenAuthStep and AsyncBearerTokenAuthStep each defined a private offersBearerChallenge(response) and a private "bearer" scheme constant with identical bodies. Hoist both to a single top-level internal function and constant in BearerTokenAuthStep.kt (mirroring how defaultShouldRetryResponse already lives top-level next to HttpRetryOptions) and call it from both steps. The async file drops its copy and its now-orphaned AuthChallengeParser import. evictRejectedToken stays per-step — it touches each step's own lock and cached token. --- .../steps/AsyncBearerTokenAuthStep.kt | 11 ------- .../pipeline/steps/BearerTokenAuthStep.kt | 31 ++++++++++--------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt index 369130ad..8102279d 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/AsyncBearerTokenAuthStep.kt @@ -7,7 +7,6 @@ package org.dexpace.sdk.core.http.pipeline.steps -import org.dexpace.sdk.core.http.auth.AuthChallengeParser import org.dexpace.sdk.core.http.auth.BearerToken import org.dexpace.sdk.core.http.auth.BearerTokenProvider import org.dexpace.sdk.core.http.common.HttpHeaderName @@ -255,15 +254,6 @@ public open class AsyncBearerTokenAuthStep return nonNull } - /** - * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` - * challenge. - */ - private fun offersBearerChallenge(response: Response): Boolean { - val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false - return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } - } - /** * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader]. * Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against @@ -287,6 +277,5 @@ public open class AsyncBearerTokenAuthStep private companion object { private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L - private const val BEARER_SCHEME = "bearer" } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt index ff1e8b81..2443fac7 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/BearerTokenAuthStep.kt @@ -139,17 +139,6 @@ public open class BearerTokenAuthStep return authorizeRequest(request) } - /** - * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` - * challenge. A header with only non-bearer challenges (or one that does not parse) returns - * `false`. [AuthStep] guarantees the header is present before this hook runs; the explicit - * null-guard keeps the method correct if called from elsewhere. - */ - private fun offersBearerChallenge(response: Response): Boolean { - val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false - return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } - } - /** * Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader]. * Guarded by the same [lock] as the refresh path so the read-compare-clear is atomic @@ -224,9 +213,21 @@ public open class BearerTokenAuthStep // Default refresh margin: refresh the bearer token 30 seconds before its expiry // so an in-flight request never carries a near-expired credential. private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L - - // Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower - // case, so the eviction gate compares against this constant. - private const val BEARER_SCHEME = "bearer" } } + +/** + * Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` challenge. A + * header with only non-bearer challenges (or one that does not parse) returns `false`. The + * [AuthStep] / [AsyncAuthStep] pillar guarantees the header is present before the challenge hook + * runs; the explicit null-guard keeps the function correct if called from elsewhere. Shared by + * [BearerTokenAuthStep] and [AsyncBearerTokenAuthStep]. + */ +internal fun offersBearerChallenge(response: Response): Boolean { + val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false + return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME } +} + +// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower case, so the +// eviction gate compares against this constant. +internal const val BEARER_SCHEME: String = "bearer" From 054d8802f9d68d464095bda6a89b026c1ebfeb0c Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Sun, 28 Jun 2026 05:11:11 +0300 Subject: [PATCH 4/4] chore: extract the shared retry policy into RetryPolicySupport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DefaultRetryStep and DefaultAsyncRetryStep carried byte-for-byte copies of the entire stateless retry policy: options clamping, the RetrySettings backoff view, re-sendability gating, interrupt normalisation, predicate invocation, the caller delay override, and fixed/exponential backoff. Only the per-call loop machinery and terminal-failure paths genuinely differ between the blocking and async stacks. Pull the stateless policy into one internal RetryPolicySupport that clamps the options and builds the backoff settings once and exposes the shared helpers; both steps now hold a `support` and delegate. The retry decision and backoff logic lives in one place, so a change to clamping, backoff, or predicate-exception handling can no longer apply to only one stack. Each step keeps what actually differs: the per-call should-retry checks (they read tryCount / suppressed), the distinct terminal paths, the protected-open delay hooks, and closeQuietly with its deliberately different catch width (sync swallows IOException, async swallows Exception). RetryPolicySupport takes only (rawOptions, logger): the clock is unused by any helper that moved — retryAfterFromHeaders, the only clock consumer, stays on each step. Also add an internal HttpRetryOptions.withMaxRetries copy helper so the negative-maxRetries clamp no longer hand-rebuilds all eight fields through the constructor; RetryPolicySupport.clampOptions collapses to a single line. All internal — no public signature changes, apiCheck unaffected. --- .../pipeline/steps/DefaultAsyncRetryStep.kt | 114 ++---------- .../http/pipeline/steps/DefaultRetryStep.kt | 176 ++---------------- .../http/pipeline/steps/HttpRetryOptions.kt | 17 ++ .../http/pipeline/steps/RetryPolicySupport.kt | 121 ++++++++++++ 4 files changed, 173 insertions(+), 255 deletions(-) create mode 100644 sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt index c903d2ef..a0a3ea1b 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncRetryStep.kt @@ -9,7 +9,6 @@ package org.dexpace.sdk.core.http.pipeline.steps import org.dexpace.sdk.core.http.common.HttpHeaderName import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext -import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.instrumentation.ClientLogger @@ -103,30 +102,20 @@ public open class DefaultAsyncRetryStep private val clock: Clock = Clock.SYSTEM, internal val logger: ClientLogger = ClientLogger(DefaultAsyncRetryStep::class), ) : AsyncRetryStep() { - /** Effective options. `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */ - private val options: HttpRetryOptions = clampOptions(options) - /** - * The [options]' exponential parameters as a [RetrySettings] view so the shared - * [BackoffCalculator] computes this stack's schedule — built once, exactly as - * [DefaultRetryStep.backoffSettings]. `totalTimeout = ZERO` disables the deadline cap. - * Building it eagerly validates the delay magnitudes at construction. + * The stateless retry policy shared with [DefaultRetryStep]: the clamped [HttpRetryOptions] + * (`maxRetries < 0` → [DefaultRetryStep.DEFAULT_MAX_RETRIES]), the [RetrySettings] backoff + * view, and the re-sendability / predicate / delay helpers both stacks share. Built once; + * immutable after construction, so it is safe to share across this step's concurrent calls. */ - private val backoffSettings: RetrySettings = - RetrySettings.builder() - .initialDelay(this.options.baseDelay) - .maxDelay(this.options.maxDelay) - .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) - .jitter(RetrySettings.DEFAULT_JITTER) - .totalTimeout(Duration.ZERO) - .build() + private val support = RetryPolicySupport(options, logger) override fun processAsync( request: Request, next: AsyncPipelineNext, ): CompletableFuture { val result = CompletableFuture() - val driver = RetryDriver(next, isRetrySafe(request), result) + val driver = RetryDriver(next, support.isRetrySafe(request), result) driver.drive() return result } @@ -234,7 +223,7 @@ public open class DefaultAsyncRetryStep try { val retry = retrySafe && - tryCount < options.maxRetries && + tryCount < support.options.maxRetries && shouldRetryResponse(response) if (!retry) { // Not retrying: hand the still-open response to the caller, who then @@ -288,14 +277,14 @@ public open class DefaultAsyncRetryStep // pre-classification interrupt carve-out. if (exception is InterruptedIOException || exception is InterruptedException) { Thread.currentThread().interrupt() - failTerminally(asInterruptedIo(exception)) + failTerminally(support.asInterruptedIo(exception)) return } val delay: Duration = try { val retry = retrySafe && - tryCount < options.maxRetries && + tryCount < support.options.maxRetries && shouldRetryException(exception) if (!retry) { failTerminally(exception) @@ -371,12 +360,12 @@ public open class DefaultAsyncRetryStep private fun shouldRetryResponse(response: Response): Boolean { val condition = HttpRetryCondition(response, null, tryCount, (suppressed ?: emptyList())) - return invokeShouldRetry(options.shouldRetryCondition, condition) + return support.invokeShouldRetry(support.options.shouldRetryCondition, condition) } private fun shouldRetryException(exception: Exception): Boolean { val condition = HttpRetryCondition(null, exception, tryCount, (suppressed ?: emptyList())) - return invokeShouldRetry(options.shouldRetryException, condition) + return support.invokeShouldRetry(support.options.shouldRetryException, condition) } // --------------- Logging --------------- @@ -405,53 +394,6 @@ public open class DefaultAsyncRetryStep } } - // --------------- Shared helpers (stateless across calls) --------------- - - private fun isRetrySafe(request: Request): Boolean { - val body = request.body ?: return request.method in IDEMPOTENT_METHODS - return body.isReplayable() - } - - /** - * Normalises an interrupt-signalling exception to [InterruptedIOException]: an - * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with - * the original attached as its cause. Mirrors [DefaultRetryStep]'s helper of the same name. - */ - private fun asInterruptedIo(exception: Exception): InterruptedIOException = - when (exception) { - is InterruptedIOException -> exception - else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } - } - - private fun invokeShouldRetry( - predicate: HttpRetryConditionPredicate, - condition: HttpRetryCondition, - ): Boolean = - try { - predicate.shouldRetry(condition) - } catch (t: Throwable) { - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - throw IllegalStateException("shouldRetry predicate threw", t) - } - - private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = - try { - options.delayFromCondition.delayFor(condition) - } catch (t: Throwable) { - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - logger.atWarning() - .event("http.retry.delay_override_failed") - .field("error.type", t::class.java.simpleName ?: "Throwable") - .cause(t) - .log() - null - } - - private fun backoffOrFixed(tryCount: Int): Duration = - options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) - // --------------- Delay computation (subclass extension points) --------------- /** @@ -467,9 +409,9 @@ public open class DefaultAsyncRetryStep * by the loop's close-on-throw guard). */ protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } + support.invokeDelayFromCondition(condition)?.let { return it } condition.response?.let { retryAfterFromHeaders(it) }?.let { return it } - return backoffOrFixed(condition.tryCount) + return support.backoffOrFixed(condition.tryCount) } /** @@ -478,8 +420,8 @@ public open class DefaultAsyncRetryStep * with the same invariants. Mirrors [DefaultRetryStep.computeExceptionDelay]. */ protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } - return backoffOrFixed(condition.tryCount) + support.invokeDelayFromCondition(condition)?.let { return it } + return support.backoffOrFixed(condition.tryCount) } /** @@ -490,7 +432,7 @@ public open class DefaultAsyncRetryStep */ protected open fun retryAfterFromHeaders(response: Response): Duration? { val now = clock.now() - for (name in options.retryAfterHeaders) { + for (name in support.options.retryAfterHeaders) { val raw = response.headers.get(name) ?: continue RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it } } @@ -514,32 +456,8 @@ public open class DefaultAsyncRetryStep } } - private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { - if (opts.maxRetries >= 0) return opts - logger.atVerbose() - .event("http.retry.maxRetries_clamped") - .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) - .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong()) - .log() - return HttpRetryOptions( - maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES, - baseDelay = opts.baseDelay, - maxDelay = opts.maxDelay, - fixedDelay = opts.fixedDelay, - retryAfterHeaders = opts.retryAfterHeaders, - shouldRetryCondition = opts.shouldRetryCondition, - shouldRetryException = opts.shouldRetryException, - delayFromCondition = opts.delayFromCondition, - ) - } - public companion object { // Nanoseconds in one millisecond — converts monotonic deltas to ms for log events. private const val NANOS_PER_MILLI = 1_000_000L - - // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). - // Mirrors DefaultRetryStep.IDEMPOTENT_METHODS / RetrySettings.DEFAULT_RETRYABLE_METHODS. - private val IDEMPOTENT_METHODS: Set = - setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt index 8df0f34a..4e814951 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultRetryStep.kt @@ -9,7 +9,6 @@ package org.dexpace.sdk.core.http.pipeline.steps import org.dexpace.sdk.core.http.common.HttpHeaderName import org.dexpace.sdk.core.http.pipeline.PipelineNext -import org.dexpace.sdk.core.http.request.Method import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response import org.dexpace.sdk.core.instrumentation.ClientLogger @@ -145,37 +144,15 @@ public open class DefaultRetryStep internal val logger: ClientLogger = ClientLogger(DefaultRetryStep::class), ) : RetryStep() { /** - * Effective options. `maxRetries < 0` is clamped to [DEFAULT_MAX_RETRIES] at - * construction; a verbose log records the clamp so callers can spot config bugs. + * The stateless retry policy shared with [DefaultAsyncRetryStep]: the clamped + * [HttpRetryOptions] (`maxRetries < 0` → [DEFAULT_MAX_RETRIES], logged at construction), the + * [RetrySettings] backoff view, and the re-sendability / predicate / delay helpers both + * stacks share. Built once; immutable after construction. Building the backoff view also + * validates the delay magnitudes eagerly — a pathological `baseDelay`/`maxDelay` surfaces as + * an [IllegalArgumentException] here, at step construction, rather than at delay-computation + * time. */ - private val options: HttpRetryOptions = clampOptions(options) - - /** - * The [options]' exponential parameters expressed as a [RetrySettings] view so the shared - * [BackoffCalculator] can compute this stack's schedule. Built once per step instance: - * - `initialDelay` / `maxDelay` come from the options. - * - `delayMultiplier` (2.0) and `jitter` (0.2) are the canonical shared constants — the - * options object does not expose its own multiplier/jitter, so the SDK defaults apply. - * If [HttpRetryOptions] ever gains configurable multiplier/jitter knobs, this view must - * read them from the options instead of the constants, or the new knobs are silently - * ignored on this stack. - * - `totalTimeout = ZERO` disables the deadline cap: the stage-based step has no budget. - * The `fixedDelay` path never consults this view; it short-circuits in [backoffOrFixed]. - * - * Building this view also validates the delay magnitudes eagerly: [RetrySettings.builder] - * rejects a negative `baseDelay`/`maxDelay` and one larger than the calculator's - * ~292-year nanosecond ceiling. [HttpRetryOptions] performs no such range check, so a - * pathological delay surfaces as an [IllegalArgumentException] here, at step construction, - * rather than later at delay-computation time. - */ - private val backoffSettings: RetrySettings = - RetrySettings.builder() - .initialDelay(this.options.baseDelay) - .maxDelay(this.options.maxDelay) - .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) - .jitter(RetrySettings.DEFAULT_JITTER) - .totalTimeout(Duration.ZERO) - .build() + private val support = RetryPolicySupport(options, logger) /** * Sends [request] through the downstream pipeline with automatic retry on retryable failures. @@ -209,7 +186,7 @@ public open class DefaultRetryStep // safely re-sent: the second writeTo would trip the body's consume-once guard. When // that holds, the loop runs exactly one attempt and never retries — mirroring the // pipeline-primitives RetryStep.canRetry() invariant. - val retrySafe = isRetrySafe(request) + val retrySafe = support.isRetrySafe(request) // The retry loop has distinct continue / return paths per attempt outcome // (success-retryable / success-final / exception-retryable / exception-final). @@ -222,7 +199,7 @@ public open class DefaultRetryStep val response = attemptResult.getOrThrow() val shouldRetry = retrySafe && - tryCount < options.maxRetries && + tryCount < support.options.maxRetries && decideRetryResponse(response, tryCount, suppressed, retrySequenceStartNanos) if (shouldRetry) { tryCount++ @@ -246,11 +223,11 @@ public open class DefaultRetryStep // the @Throws(IOException) contract holds, with prior failures attached. if (exception is InterruptedIOException || exception is InterruptedException) { Thread.currentThread().interrupt() - val ioe = asInterruptedIo(exception) + val ioe = support.asInterruptedIo(exception) suppressed?.forEach(ioe::addSuppressed) throw ioe } - if (retrySafe && tryCount < options.maxRetries) { + if (retrySafe && tryCount < support.options.maxRetries) { val accumulator = suppressed ?: ArrayList().also { suppressed = it } if (decideRetryException(exception, tryCount, accumulator, retrySequenceStartNanos)) { tryCount++ @@ -280,34 +257,6 @@ public open class DefaultRetryStep } } - /** - * Returns `true` when [request] may be re-sent. A body-less request is retry-safe only - * when its method is idempotent ([IDEMPOTENT_METHODS]) — the gate keys off method - * idempotency, not off the absence of a body, so a body-less non-idempotent request (a - * bare `POST`) is NOT retry-safe even though there is no payload to re-send. A body-bearing - * request is retry-safe only when its body is replayable — a non-replayable body cannot be - * re-sent (the second `RequestBody.writeTo` trips the body's consume-once guard and - * surfaces as a confusing wrapped [IllegalStateException]). Making a re-sent body-bearing - * request idempotent is the caller's responsibility. Mirrors - * `pipeline.step.retry.RetryStep.canRetry`. - */ - private fun isRetrySafe(request: Request): Boolean { - val body = request.body ?: return request.method in IDEMPOTENT_METHODS - return body.isReplayable() - } - - /** - * Normalises an interrupt-signalling exception to [InterruptedIOException]. An - * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped - * (with the original attached as cause) so the loop can satisfy its `@Throws(IOException)` - * contract while preserving the cancellation signal. - */ - private fun asInterruptedIo(exception: Exception): InterruptedIOException = - when (exception) { - is InterruptedIOException -> exception - else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } - } - /** * Executes a single pipeline attempt and returns the result (success or exception) * wrapped in a [Result]. Errors ([Error] subclasses) are NOT caught here — they @@ -343,7 +292,7 @@ public open class DefaultRetryStep // closing on the throw path is safe even though the happy path closes again. val delay: Duration = try { - if (!invokeShouldRetryResponse(condition)) return false + if (!support.invokeShouldRetry(support.options.shouldRetryCondition, condition)) return false computeResponseDelay(condition) } catch (t: Throwable) { closeQuietly(response) @@ -370,7 +319,7 @@ public open class DefaultRetryStep retrySequenceStartNanos: Long, ): Boolean { val condition = HttpRetryCondition(null, exception, tryCount, accumulator) - if (!invokeShouldRetryException(condition)) return false + if (!support.invokeShouldRetry(support.options.shouldRetryException, condition)) return false val delay = computeExceptionDelay(condition) logRetry(tryCount, delay, statusCode = -1, cause = exception, retrySequenceStartNanos) // Append the current exception BEFORE sleeping. If the sleep is @@ -398,33 +347,6 @@ public open class DefaultRetryStep } } - // --------------- Should-retry hooks --------------- - - /** - * Invokes [HttpRetryOptions.shouldRetryCondition] and wraps any thrown exception as - * [IllegalStateException] — predicates must not destabilise the retry loop. - */ - private fun invokeShouldRetryResponse(condition: HttpRetryCondition): Boolean = - invokeShouldRetry(options.shouldRetryCondition, condition) - - private fun invokeShouldRetryException(condition: HttpRetryCondition): Boolean = - invokeShouldRetry(options.shouldRetryException, condition) - - private fun invokeShouldRetry( - predicate: HttpRetryConditionPredicate, - condition: HttpRetryCondition, - ): Boolean = - try { - predicate.shouldRetry(condition) - } catch (t: Throwable) { - // Error subclasses still rethrown; an OOM in the predicate must not be wrapped. - // Splitting Error from RuntimeException via `is` is the canonical JVM idiom for - // retry classification — there is no other way to distinguish JVM Errors here. - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - throw IllegalStateException("shouldRetry predicate threw", t) - } - // --------------- Delay computation --------------- /** @@ -434,9 +356,9 @@ public open class DefaultRetryStep * 3. [HttpRetryOptions.fixedDelay] or exponential backoff. */ protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } + support.invokeDelayFromCondition(condition)?.let { return it } condition.response?.let { retryAfterFromHeaders(it) }?.let { return it } - return backoffOrFixed(condition.tryCount) + return support.backoffOrFixed(condition.tryCount) } /** @@ -444,41 +366,10 @@ public open class DefaultRetryStep * consult, so the resolution order skips step (2) of [computeResponseDelay]. */ protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration { - invokeDelayFromCondition(condition)?.let { return it } - return backoffOrFixed(condition.tryCount) + support.invokeDelayFromCondition(condition)?.let { return it } + return support.backoffOrFixed(condition.tryCount) } - private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = - try { - options.delayFromCondition.delayFor(condition) - } catch (t: Throwable) { - // `is Error` is the canonical JVM idiom for splitting catastrophic Errors from - // user-recoverable Exceptions; detekt's preference for class hierarchy doesn't - // apply when classifying for retry/rethrow. - @Suppress("InstanceOfCheckForException") - if (t is Error) throw t - // Don't fail the whole pipeline if the user override misbehaves — fall back to - // the default delay calculation. Log loud enough that the bug is observable. - logger.atWarning() - .event("http.retry.delay_override_failed") - .field("error.type", t::class.java.simpleName ?: "Throwable") - .cause(t) - .log() - null - } - - /** - * Returns [HttpRetryOptions.fixedDelay] if set, otherwise the exponential-backoff delay - * for [tryCount]. The backoff is computed by the shared [BackoffCalculator] from - * [backoffSettings] so this stack and the recovery-aware `RetryStep` share one formula. - * - * [tryCount] is 0-indexed here (`0` = the delay before the first retry), whereas - * [BackoffCalculator.computeDelay] is 1-indexed (`1` = first retry); the `+ 1` bridges - * the two so both produce `baseDelay`, `2·baseDelay`, `4·baseDelay`, … capped at `maxDelay`. - */ - private fun backoffOrFixed(tryCount: Int): Duration = - options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) - // --------------- Retry-After parsing --------------- /** @@ -495,7 +386,7 @@ public open class DefaultRetryStep */ protected open fun retryAfterFromHeaders(response: Response): Duration? { val now = clock.now() - for (name in options.retryAfterHeaders) { + for (name in support.options.retryAfterHeaders) { val raw = response.headers.get(name) ?: continue RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it } } @@ -552,29 +443,6 @@ public open class DefaultRetryStep event.log() } - // --------------- Options clamping --------------- - - private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { - if (opts.maxRetries >= 0) return opts - logger.atVerbose() - .event("http.retry.maxRetries_clamped") - .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) - .field("http.retry.max_retries.applied", DEFAULT_MAX_RETRIES.toLong()) - .log() - // HttpRetryOptions isn't a data class, so an explicit copy via the constructor is - // the cheapest correct fix — re-using every other field as-is. - return HttpRetryOptions( - maxRetries = DEFAULT_MAX_RETRIES, - baseDelay = opts.baseDelay, - maxDelay = opts.maxDelay, - fixedDelay = opts.fixedDelay, - retryAfterHeaders = opts.retryAfterHeaders, - shouldRetryCondition = opts.shouldRetryCondition, - shouldRetryException = opts.shouldRetryException, - delayFromCondition = opts.delayFromCondition, - ) - } - public companion object { /** * Default retry count applied when the caller passes a negative @@ -589,11 +457,5 @@ public open class DefaultRetryStep // Nanoseconds in one millisecond — used to convert monotonic-clock deltas to ms // for retry log events. private const val NANOS_PER_MILLI = 1_000_000L - - // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). - // A request using one of these may always be retried; others require a replayable - // body. Mirrors RetrySettings.DEFAULT_RETRYABLE_METHODS. - private val IDEMPOTENT_METHODS: Set = - setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt index 8248ac25..eaabe013 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/HttpRetryOptions.kt @@ -80,6 +80,23 @@ public class HttpRetryOptions HttpRetryConditionPredicate(::defaultShouldRetryException), public val delayFromCondition: HttpRetryDelayProvider = HttpRetryDelayProvider { null }, ) { + /** + * Returns a copy of these options with [maxRetries] replaced and every other field + * preserved. Used by the retry steps to apply the negative-`maxRetries` clamp without + * hand-rebuilding all eight fields at each call site. + */ + internal fun withMaxRetries(maxRetries: Int): HttpRetryOptions = + HttpRetryOptions( + maxRetries = maxRetries, + baseDelay = baseDelay, + maxDelay = maxDelay, + fixedDelay = fixedDelay, + retryAfterHeaders = retryAfterHeaders, + shouldRetryCondition = shouldRetryCondition, + shouldRetryException = shouldRetryException, + delayFromCondition = delayFromCondition, + ) + public companion object { // The default retry count is the canonical SDK budget, kept in one place on // DefaultRetryStep (initial send + DEFAULT_MAX_RETRIES == RetrySettings.DEFAULT_MAX_ATTEMPTS). diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt new file mode 100644 index 00000000..dd1e2b1e --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/RetryPolicySupport.kt @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.http.pipeline.steps + +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.instrumentation.ClientLogger +import org.dexpace.sdk.core.pipeline.step.retry.BackoffCalculator +import org.dexpace.sdk.core.pipeline.step.retry.RetrySettings +import java.io.InterruptedIOException +import java.time.Duration + +/** + * Stateless retry policy shared by [DefaultRetryStep] and [DefaultAsyncRetryStep]. Clamps the + * caller's [HttpRetryOptions] (a negative `maxRetries` becomes + * [DefaultRetryStep.DEFAULT_MAX_RETRIES]) and builds the [RetrySettings] backoff view once, then + * exposes the policy helpers both drivers share: re-sendability gating, interrupt normalisation, + * predicate invocation, the caller delay override, and fixed-or-exponential backoff. + * + * Per-call state (try count, suppressed trail, terminal completion), the `protected open` delay + * hooks, and the close-on-discard helper stay on each step — those differ between the blocking and + * async stacks. Holds no mutable state after construction, so it is safe to share across + * concurrent calls. + */ +internal class RetryPolicySupport( + rawOptions: HttpRetryOptions, + private val logger: ClientLogger, +) { + /** Effective options; `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */ + val options: HttpRetryOptions = clampOptions(rawOptions) + + /** + * The [options]' exponential parameters as a [RetrySettings] view so the shared + * [BackoffCalculator] computes this stack's schedule. Built once; `totalTimeout = ZERO` + * disables the deadline cap. Building it eagerly validates the delay magnitudes. + */ + val backoffSettings: RetrySettings = + RetrySettings.builder() + .initialDelay(options.baseDelay) + .maxDelay(options.maxDelay) + .delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER) + .jitter(RetrySettings.DEFAULT_JITTER) + .totalTimeout(Duration.ZERO) + .build() + + /** + * Returns `true` when [request] may be re-sent: a body-less request only when its method is + * idempotent ([IDEMPOTENT_METHODS]); a body-bearing request only when its body is replayable. + */ + fun isRetrySafe(request: Request): Boolean { + val body = request.body ?: return request.method in IDEMPOTENT_METHODS + return body.isReplayable() + } + + /** + * Normalises an interrupt-signalling exception to [InterruptedIOException]: an + * [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with the + * original attached as its cause. + */ + fun asInterruptedIo(exception: Exception): InterruptedIOException = + when (exception) { + is InterruptedIOException -> exception + else -> InterruptedIOException("retry interrupted").apply { initCause(exception) } + } + + fun invokeShouldRetry( + predicate: HttpRetryConditionPredicate, + condition: HttpRetryCondition, + ): Boolean = + try { + predicate.shouldRetry(condition) + } catch (t: Throwable) { + // Error subclasses still rethrown; an OOM in the predicate must not be wrapped. + // Splitting Error from RuntimeException via `is` is the canonical JVM idiom for + // retry classification — there is no other way to distinguish JVM Errors here. + @Suppress("InstanceOfCheckForException") + if (t is Error) throw t + throw IllegalStateException("shouldRetry predicate threw", t) + } + + fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? = + try { + options.delayFromCondition.delayFor(condition) + } catch (t: Throwable) { + @Suppress("InstanceOfCheckForException") + if (t is Error) throw t + // Don't fail the whole pipeline if the user override misbehaves — fall back to the + // default delay calculation. Log loud enough that the bug is observable. + logger.atWarning() + .event("http.retry.delay_override_failed") + .field("error.type", t::class.java.simpleName ?: "Throwable") + .cause(t) + .log() + null + } + + fun backoffOrFixed(tryCount: Int): Duration = + options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings) + + private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions { + if (opts.maxRetries >= 0) return opts + logger.atVerbose() + .event("http.retry.maxRetries_clamped") + .field("http.retry.max_retries.requested", opts.maxRetries.toLong()) + .field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong()) + .log() + return opts.withMaxRetries(DefaultRetryStep.DEFAULT_MAX_RETRIES) + } + + private companion object { + // Methods safe to re-send regardless of body replayability (idempotent per RFC 9110). + // Mirrors RetrySettings.DEFAULT_RETRYABLE_METHODS. + private val IDEMPOTENT_METHODS: Set = + setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE) + } +}