Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,17 @@ public abstract class AsyncAuthStep : AsyncHttpStep {
return Futures.failed(t)
}

return challengeFuture.handle { retryRequest, hookError ->
HookOutcome(retryRequest, hookError)
}.thenCompose { outcome ->
val hookError = outcome.error
return challengeFuture.handle<CompletableFuture<Response>> { retryRequest, hookError ->
if (hookError != null) {
response.close()
return@thenCompose Futures.failed<Response>(Futures.unwrap(hookError))
return@handle Futures.failed<Response>(Futures.unwrap(hookError))
}
val retryRequest = outcome.request ?: return@thenCompose CompletableFuture.completedFuture(response)
if (retryRequest == null) return@handle CompletableFuture.completedFuture(response)
response.close()
next.copy().processAsync(retryRequest)
}
}.thenCompose { it }
}

/** Carrier so the challenge future's outcome (value or error) survives [CompletableFuture.handle]. */
private class HookOutcome(val request: Request?, val error: Throwable?)

/**
* Returns a future of [request] with the credential's auth header attached. Subclasses
* implement the concrete async stamping (e.g. fetch-or-refresh a bearer token off-thread,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.dexpace.sdk.core.http.pipeline.steps

import org.dexpace.sdk.core.http.auth.AuthChallengeParser
import org.dexpace.sdk.core.http.auth.BearerToken
import org.dexpace.sdk.core.http.auth.BearerTokenProvider
import org.dexpace.sdk.core.http.common.HttpHeaderName
Expand Down Expand Up @@ -255,15 +254,6 @@ public open class AsyncBearerTokenAuthStep
return nonNull
}

/**
* Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer`
* challenge.
*/
private fun offersBearerChallenge(response: Response): Boolean {
val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}

/**
* Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader].
* Guarded by the same [lock] as the fetch path so the read-compare-clear is atomic against
Expand All @@ -287,6 +277,5 @@ public open class AsyncBearerTokenAuthStep

private companion object {
private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L
private const val BEARER_SCHEME = "bearer"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,6 @@ public open class BearerTokenAuthStep
return authorizeRequest(request)
}

/**
* Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer`
* challenge. A header with only non-bearer challenges (or one that does not parse) returns
* `false`. [AuthStep] guarantees the header is present before this hook runs; the explicit
* null-guard keeps the method correct if called from elsewhere.
*/
private fun offersBearerChallenge(response: Response): Boolean {
val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}

/**
* Clears [cachedToken] iff it is still the token whose stamped header is [rejectedHeader].
* Guarded by the same [lock] as the refresh path so the read-compare-clear is atomic
Expand Down Expand Up @@ -224,9 +213,21 @@ public open class BearerTokenAuthStep
// Default refresh margin: refresh the bearer token 30 seconds before its expiry
// so an in-flight request never carries a near-expired credential.
private const val DEFAULT_REFRESH_MARGIN_SECONDS = 30L

// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower
// case, so the eviction gate compares against this constant.
private const val BEARER_SCHEME = "bearer"
}
}

/**
* Returns `true` when [response]'s `WWW-Authenticate` header advertises a `Bearer` challenge. A
* header with only non-bearer challenges (or one that does not parse) returns `false`. The
* [AuthStep] / [AsyncAuthStep] pillar guarantees the header is present before the challenge hook
* runs; the explicit null-guard keeps the function correct if called from elsewhere. Shared by
* [BearerTokenAuthStep] and [AsyncBearerTokenAuthStep].
*/
internal fun offersBearerChallenge(response: Response): Boolean {
val header = response.headers.get(HttpHeaderName.WWW_AUTHENTICATE) ?: return false
return AuthChallengeParser.parse(header).any { it.scheme == BEARER_SCHEME }
}

// Lower-cased `Bearer` scheme name; AuthChallengeParser normalises schemes to lower case, so the
// eviction gate compares against this constant.
internal const val BEARER_SCHEME: String = "bearer"
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class DefaultAsyncInstrumentationStep
// .handle do NOT get MDC by default. We capture the MDC snapshot INSIDE the
// use{} block (after makeCurrentWithLoggingContext has pushed trace.id / span.id)
// and restore it in each branch of the .handle callback.
var mdc: MdcSnapshot = MdcSnapshot.capture() // will be overwritten inside the scope
lateinit var mdc: MdcSnapshot
val downstream: CompletableFuture<Response> =
span.makeCurrentWithLoggingContext().use {
// Capture after the scope has pushed trace.id / span.id so the snapshot carries them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package org.dexpace.sdk.core.http.pipeline.steps

import org.dexpace.sdk.core.http.common.HttpHeaderName
import org.dexpace.sdk.core.http.pipeline.AsyncPipelineNext
import org.dexpace.sdk.core.http.request.Method
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.http.response.Response
import org.dexpace.sdk.core.instrumentation.ClientLogger
Expand Down Expand Up @@ -103,30 +102,20 @@ public open class DefaultAsyncRetryStep
private val clock: Clock = Clock.SYSTEM,
internal val logger: ClientLogger = ClientLogger(DefaultAsyncRetryStep::class),
) : AsyncRetryStep() {
/** Effective options. `maxRetries < 0` is clamped to [DefaultRetryStep.DEFAULT_MAX_RETRIES]. */
private val options: HttpRetryOptions = clampOptions(options)

/**
* The [options]' exponential parameters as a [RetrySettings] view so the shared
* [BackoffCalculator] computes this stack's schedule — built once, exactly as
* [DefaultRetryStep.backoffSettings]. `totalTimeout = ZERO` disables the deadline cap.
* Building it eagerly validates the delay magnitudes at construction.
* The stateless retry policy shared with [DefaultRetryStep]: the clamped [HttpRetryOptions]
* (`maxRetries < 0` → [DefaultRetryStep.DEFAULT_MAX_RETRIES]), the [RetrySettings] backoff
* view, and the re-sendability / predicate / delay helpers both stacks share. Built once;
* immutable after construction, so it is safe to share across this step's concurrent calls.
*/
private val backoffSettings: RetrySettings =
RetrySettings.builder()
.initialDelay(this.options.baseDelay)
.maxDelay(this.options.maxDelay)
.delayMultiplier(RetrySettings.DEFAULT_DELAY_MULTIPLIER)
.jitter(RetrySettings.DEFAULT_JITTER)
.totalTimeout(Duration.ZERO)
.build()
private val support = RetryPolicySupport(options, logger)

override fun processAsync(
request: Request,
next: AsyncPipelineNext,
): CompletableFuture<Response> {
val result = CompletableFuture<Response>()
val driver = RetryDriver(next, isRetrySafe(request), result)
val driver = RetryDriver(next, support.isRetrySafe(request), result)
driver.drive()
return result
}
Expand Down Expand Up @@ -234,7 +223,7 @@ public open class DefaultAsyncRetryStep
try {
val retry =
retrySafe &&
tryCount < options.maxRetries &&
tryCount < support.options.maxRetries &&
shouldRetryResponse(response)
if (!retry) {
// Not retrying: hand the still-open response to the caller, who then
Expand Down Expand Up @@ -288,14 +277,14 @@ public open class DefaultAsyncRetryStep
// pre-classification interrupt carve-out.
if (exception is InterruptedIOException || exception is InterruptedException) {
Thread.currentThread().interrupt()
failTerminally(asInterruptedIo(exception))
failTerminally(support.asInterruptedIo(exception))
return
}
val delay: Duration =
try {
val retry =
retrySafe &&
tryCount < options.maxRetries &&
tryCount < support.options.maxRetries &&
shouldRetryException(exception)
if (!retry) {
failTerminally(exception)
Expand Down Expand Up @@ -371,12 +360,12 @@ public open class DefaultAsyncRetryStep

private fun shouldRetryResponse(response: Response): Boolean {
val condition = HttpRetryCondition(response, null, tryCount, (suppressed ?: emptyList()))
return invokeShouldRetry(options.shouldRetryCondition, condition)
return support.invokeShouldRetry(support.options.shouldRetryCondition, condition)
}

private fun shouldRetryException(exception: Exception): Boolean {
val condition = HttpRetryCondition(null, exception, tryCount, (suppressed ?: emptyList()))
return invokeShouldRetry(options.shouldRetryException, condition)
return support.invokeShouldRetry(support.options.shouldRetryException, condition)
}

// --------------- Logging ---------------
Expand Down Expand Up @@ -405,53 +394,6 @@ public open class DefaultAsyncRetryStep
}
}

// --------------- Shared helpers (stateless across calls) ---------------

private fun isRetrySafe(request: Request): Boolean {
val body = request.body ?: return request.method in IDEMPOTENT_METHODS
return body.isReplayable()
}

/**
* Normalises an interrupt-signalling exception to [InterruptedIOException]: an
* [InterruptedIOException] is returned as-is; a bare [InterruptedException] is wrapped with
* the original attached as its cause. Mirrors [DefaultRetryStep]'s helper of the same name.
*/
private fun asInterruptedIo(exception: Exception): InterruptedIOException =
when (exception) {
is InterruptedIOException -> exception
else -> InterruptedIOException("retry interrupted").apply { initCause(exception) }
}

private fun invokeShouldRetry(
predicate: HttpRetryConditionPredicate,
condition: HttpRetryCondition,
): Boolean =
try {
predicate.shouldRetry(condition)
} catch (t: Throwable) {
@Suppress("InstanceOfCheckForException")
if (t is Error) throw t
throw IllegalStateException("shouldRetry predicate threw", t)
}

private fun invokeDelayFromCondition(condition: HttpRetryCondition): Duration? =
try {
options.delayFromCondition.delayFor(condition)
} catch (t: Throwable) {
@Suppress("InstanceOfCheckForException")
if (t is Error) throw t
logger.atWarning()
.event("http.retry.delay_override_failed")
.field("error.type", t::class.java.simpleName ?: "Throwable")
.cause(t)
.log()
null
}

private fun backoffOrFixed(tryCount: Int): Duration =
options.fixedDelay ?: BackoffCalculator.computeDelay(tryCount + 1, backoffSettings)

// --------------- Delay computation (subclass extension points) ---------------

/**
Expand All @@ -467,9 +409,9 @@ public open class DefaultAsyncRetryStep
* by the loop's close-on-throw guard).
*/
protected open fun computeResponseDelay(condition: HttpRetryCondition): Duration {
invokeDelayFromCondition(condition)?.let { return it }
support.invokeDelayFromCondition(condition)?.let { return it }
condition.response?.let { retryAfterFromHeaders(it) }?.let { return it }
return backoffOrFixed(condition.tryCount)
return support.backoffOrFixed(condition.tryCount)
}

/**
Expand All @@ -478,8 +420,8 @@ public open class DefaultAsyncRetryStep
* with the same invariants. Mirrors [DefaultRetryStep.computeExceptionDelay].
*/
protected open fun computeExceptionDelay(condition: HttpRetryCondition): Duration {
invokeDelayFromCondition(condition)?.let { return it }
return backoffOrFixed(condition.tryCount)
support.invokeDelayFromCondition(condition)?.let { return it }
return support.backoffOrFixed(condition.tryCount)
}

/**
Expand All @@ -490,7 +432,7 @@ public open class DefaultAsyncRetryStep
*/
protected open fun retryAfterFromHeaders(response: Response): Duration? {
val now = clock.now()
for (name in options.retryAfterHeaders) {
for (name in support.options.retryAfterHeaders) {
val raw = response.headers.get(name) ?: continue
RetryAfterParser.parseHeaderValue(name, raw, now)?.let { return it }
}
Expand All @@ -514,32 +456,8 @@ public open class DefaultAsyncRetryStep
}
}

private fun clampOptions(opts: HttpRetryOptions): HttpRetryOptions {
if (opts.maxRetries >= 0) return opts
logger.atVerbose()
.event("http.retry.maxRetries_clamped")
.field("http.retry.max_retries.requested", opts.maxRetries.toLong())
.field("http.retry.max_retries.applied", DefaultRetryStep.DEFAULT_MAX_RETRIES.toLong())
.log()
return HttpRetryOptions(
maxRetries = DefaultRetryStep.DEFAULT_MAX_RETRIES,
baseDelay = opts.baseDelay,
maxDelay = opts.maxDelay,
fixedDelay = opts.fixedDelay,
retryAfterHeaders = opts.retryAfterHeaders,
shouldRetryCondition = opts.shouldRetryCondition,
shouldRetryException = opts.shouldRetryException,
delayFromCondition = opts.delayFromCondition,
)
}

public companion object {
// Nanoseconds in one millisecond — converts monotonic deltas to ms for log events.
private const val NANOS_PER_MILLI = 1_000_000L

// Methods safe to re-send regardless of body replayability (idempotent per RFC 9110).
// Mirrors DefaultRetryStep.IDEMPOTENT_METHODS / RetrySettings.DEFAULT_RETRYABLE_METHODS.
private val IDEMPOTENT_METHODS: Set<Method> =
setOf(Method.GET, Method.HEAD, Method.OPTIONS, Method.PUT, Method.DELETE)
}
}
Loading
Loading