diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt index 482786c7..91bea356 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ExecutionPipeline.kt @@ -109,15 +109,4 @@ public class ExecutionPipeline failureOf(t) } } - - /** - * Wraps [t] in a [ResponseOutcome.Failure]. [InterruptedException] preserves the interrupt - * flag on the current thread per the SDK's cancellation contract. - */ - private fun failureOf(t: Throwable): ResponseOutcome.Failure { - if (t is InterruptedException) { - Thread.currentThread().interrupt() - } - return ResponseOutcome.Failure(t) - } } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt index db9829f3..2a140ebd 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponseOutcome.kt @@ -88,3 +88,17 @@ public sealed class ResponseOutcome { is Failure -> onFailure(error) } } + +/** + * Wraps [t] in a [ResponseOutcome.Failure]. When [t] is an [InterruptedException] the interrupt + * flag is restored on the current thread before wrapping, honouring the SDK's cancellation + * contract so a thread blocked on the surfaced outcome still observes the cancellation. Shared by + * [ExecutionPipeline], [ResponsePipeline], and [org.dexpace.sdk.core.pipeline.step.retry.RetryStep] + * so the interrupt-aware wrapper has exactly one definition. + */ +internal fun failureOf(t: Throwable): ResponseOutcome.Failure { + if (t is InterruptedException) { + Thread.currentThread().interrupt() + } + return ResponseOutcome.Failure(t) +} diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt index 6ebafd0e..c3b6b00b 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/ResponsePipeline.kt @@ -112,7 +112,7 @@ public class ResponsePipeline ResponseOutcome.Success(step.execute(inResponse, context)) } catch (t: Throwable) { closeQuietly(inResponse, t) - handleStepThrowable(t) + failureOf(t) } } is ResponseOutcome.Failure -> return inbound @@ -137,20 +137,9 @@ public class ResponsePipeline if (outcome is ResponseOutcome.Success) { closeQuietly(outcome.response, t) } - handleStepThrowable(t) + failureOf(t) } - /** - * Converts a step-raised throwable into a [ResponseOutcome.Failure]. [InterruptedException] - * preserves the interrupt flag on the current thread per the SDK's cancellation contract. - */ - private fun handleStepThrowable(t: Throwable): ResponseOutcome.Failure { - if (t is InterruptedException) { - Thread.currentThread().interrupt() - } - return ResponseOutcome.Failure(t) - } - /** * Closes [response], swallowing any close error so it never masks [primary]. A failure to * close is attached to [primary] as a suppressed throwable for diagnostics. diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt index 6f94ec95..1148c4b3 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryAfterParser.kt @@ -152,11 +152,10 @@ public object RetryAfterParser { parseNumericSeconds(retryAfter)?.let { return it } parseHttpDate(retryAfter, now)?.let { return it } } - headers.get(HEADER_RETRY_AFTER_MS)?.trim()?.let { value -> - if (value.isNotEmpty()) parseMillis(value)?.let { return it } - } - headers.get(HEADER_X_MS_RETRY_AFTER_MS)?.trim()?.let { value -> - if (value.isNotEmpty()) parseMillis(value)?.let { return it } + for (header in arrayOf(HEADER_RETRY_AFTER_MS, HEADER_X_MS_RETRY_AFTER_MS)) { + headers.get(header)?.trim()?.let { value -> + if (value.isNotEmpty()) parseMillis(value)?.let { return it } + } } val rateLimitReset = headers.get(HEADER_X_RATELIMIT_RESET)?.trim() if (!rateLimitReset.isNullOrEmpty()) { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt index 9d29be75..f936b0bb 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetrySettings.kt @@ -128,23 +128,33 @@ public class RetrySettings this.attemptHeaderName = settings.attemptHeaderName } + /** + * Validates a [Duration] setting: it must be non-negative and small enough that the + * backoff math can convert it to nanoseconds without overflowing. Shared by the + * [totalTimeout], [initialDelay], and [maxDelay] setters; [name] names the offending + * field in the rejection message. + */ + private fun requireRepresentable( + name: String, + value: Duration, + ) { + require(!value.isNegative) { "$name must be non-negative" } + require(value <= MAX_NANO_REPRESENTABLE_DELAY) { + "$name must be representable in nanoseconds (≤ ~292 years); got $value" + } + } + /** Sets [RetrySettings.totalTimeout]. Must be non-negative. */ public fun totalTimeout(totalTimeout: Duration): RetrySettingsBuilder = apply { - require(!totalTimeout.isNegative) { "totalTimeout must be non-negative" } - require(totalTimeout <= MAX_NANO_REPRESENTABLE_DELAY) { - "totalTimeout must be representable in nanoseconds (≤ ~292 years); got $totalTimeout" - } + requireRepresentable("totalTimeout", totalTimeout) this.totalTimeout = totalTimeout } /** Sets [RetrySettings.initialDelay]. Must be non-negative. */ public fun initialDelay(initialDelay: Duration): RetrySettingsBuilder = apply { - require(!initialDelay.isNegative) { "initialDelay must be non-negative" } - require(initialDelay <= MAX_NANO_REPRESENTABLE_DELAY) { - "initialDelay must be representable in nanoseconds (≤ ~292 years); got $initialDelay" - } + requireRepresentable("initialDelay", initialDelay) this.initialDelay = initialDelay } @@ -158,10 +168,7 @@ public class RetrySettings /** Sets [RetrySettings.maxDelay]. Must be non-negative. */ public fun maxDelay(maxDelay: Duration): RetrySettingsBuilder = apply { - require(!maxDelay.isNegative) { "maxDelay must be non-negative" } - require(maxDelay <= MAX_NANO_REPRESENTABLE_DELAY) { - "maxDelay must be representable in nanoseconds (≤ ~292 years); got $maxDelay" - } + requireRepresentable("maxDelay", maxDelay) this.maxDelay = maxDelay } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt index dd262950..4ccb3ba6 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pipeline/step/retry/RetryStep.kt @@ -14,6 +14,7 @@ import org.dexpace.sdk.core.http.response.exception.HttpException import org.dexpace.sdk.core.http.response.exception.NetworkException import org.dexpace.sdk.core.http.response.exception.Retryable import org.dexpace.sdk.core.pipeline.ResponseOutcome +import org.dexpace.sdk.core.pipeline.failureOf import org.dexpace.sdk.core.pipeline.step.ResponseRecoveryStep import java.io.InterruptedIOException import java.time.Clock @@ -289,11 +290,8 @@ public class RetryStep private fun executeOnce(attemptOrdinal: Int): ResponseOutcome = try { ResponseOutcome.Success(httpClient.execute(stampAttempt(request, attemptOrdinal))) - } catch (e: InterruptedException) { - Thread.currentThread().interrupt() - ResponseOutcome.Failure(e) } catch (t: Throwable) { - ResponseOutcome.Failure(t) + failureOf(t) } /** @@ -362,10 +360,7 @@ public class RetryStep * process-wide scheduler is a companion `by lazy` (SYNCHRONIZED), so it is initialised * at most once across the whole VM — no per-instance guard is involved. */ - private fun resolveScheduler(): ScheduledExecutorService { - settings.scheduler?.let { return it } - return DEFAULT_SCHEDULER - } + private fun resolveScheduler(): ScheduledExecutorService = settings.scheduler ?: DEFAULT_SCHEDULER /** * Returns true when [error] is an SDK-classified retryable condition. Classification