diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipeline.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipeline.kt index 2682a41..5b76289 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipeline.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipeline.kt @@ -56,7 +56,7 @@ public class AsyncHttpPipeline internal constructor( Futures.failed(e) } } - val state = AsyncPipelineCallState(this, request, httpClient) + val state = AsyncPipelineCallState(this, request) return AsyncPipelineNext(state).processAsync() } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipelineBuilder.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipelineBuilder.kt index faf2878..52e4a0d 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipelineBuilder.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncHttpPipelineBuilder.kt @@ -99,7 +99,7 @@ public class AsyncHttpPipelineBuilder(private val httpClient: AsyncHttpClient) { /** Builds an immutable [AsyncHttpPipeline]. */ public fun build(): AsyncHttpPipeline { val ordered = steps.flatten() - return AsyncHttpPipeline(httpClient, Array(ordered.size) { ordered[it] }) + return AsyncHttpPipeline(httpClient, ordered.toTypedArray()) } public companion object { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineBridges.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineBridges.kt index e567af6..fd22d54 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineBridges.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineBridges.kt @@ -9,12 +9,12 @@ package org.dexpace.sdk.core.http.pipeline +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.client.asBlocking import org.dexpace.sdk.core.http.request.Request import org.dexpace.sdk.core.http.response.Response -import org.dexpace.sdk.core.util.Futures import java.io.InterruptedIOException import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutionException import java.util.concurrent.Executor import java.util.concurrent.atomic.AtomicReference @@ -120,25 +120,5 @@ private fun sendInterruptibly( * The blocking wait honours `Thread.interrupt()`: interrupting the calling thread restores the * interrupt flag, cancels the in-flight future, and throws an [InterruptedIOException]. */ -public fun AsyncHttpPipeline.toBlocking(): HttpPipeline { - val async = this - return HttpPipeline.of { request -> - val future = async.sendAsync(request) - try { - future.get() - } catch (ie: InterruptedException) { - // `get()` parks interruptibly (unlike `join()`). Restore the interrupt flag, abort - // the in-flight send, and surface an InterruptedIOException so the caller's I/O - // error handling terminates cleanly. - Thread.currentThread().interrupt() - future.cancel(true) - val ioe = InterruptedIOException("Interrupted while waiting for response") - ioe.initCause(ie) - throw ioe - } catch (ee: ExecutionException) { - // `get()` wraps exceptional completion in ExecutionException; unwrap so callers' - // `catch (IOException)` sees the original failure rather than the JDK wrapper. - throw Futures.unwrap(ee) - } - } -} +public fun AsyncHttpPipeline.toBlocking(): HttpPipeline = + HttpPipeline.of(AsyncHttpClient { sendAsync(it) }.asBlocking()) diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineCallState.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineCallState.kt index f925b77..1cba85a 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineCallState.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineCallState.kt @@ -7,13 +7,11 @@ package org.dexpace.sdk.core.http.pipeline -import org.dexpace.sdk.core.client.AsyncHttpClient import org.dexpace.sdk.core.http.request.Request /** * Per-call mutable cursor over an [AsyncHttpPipeline]'s steps array. Async counterpart of - * [PipelineCallState]: holds the index of the next step to invoke, the in-flight [Request], - * and a reference to the [AsyncHttpClient] used when the cursor reaches the end. + * [PipelineCallState]: holds the index of the next step to invoke and the in-flight [Request]. * * Cloned via [copy] (exposed to user code through [AsyncPipelineNext.copy]) so async retry / * redirect steps can re-drive the downstream chain. Cloning copies the current index — the @@ -24,7 +22,6 @@ import org.dexpace.sdk.core.http.request.Request internal class AsyncPipelineCallState internal constructor( val pipeline: AsyncHttpPipeline, initialRequest: Request, - val httpClient: AsyncHttpClient, private var index: Int = 0, ) { /** @@ -42,5 +39,5 @@ internal class AsyncPipelineCallState internal constructor( } /** Returns an independent state cloned at the current cursor position. */ - fun copy(): AsyncPipelineCallState = AsyncPipelineCallState(pipeline, request, httpClient, index) + fun copy(): AsyncPipelineCallState = AsyncPipelineCallState(pipeline, request, index) } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineNext.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineNext.kt index 3913e3a..b109f67 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineNext.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/AsyncPipelineNext.kt @@ -34,7 +34,7 @@ public class AsyncPipelineNext internal constructor(private val state: AsyncPipe val nextStep = state.advance() return try { if (nextStep == null) { - state.httpClient.executeAsync(state.request) + state.pipeline.httpClient.executeAsync(state.request) } else { nextStep.processAsync(state.request, this) } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipeline.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipeline.kt index d830fdd..1253eb4 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipeline.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipeline.kt @@ -40,7 +40,7 @@ public class HttpPipeline internal constructor( @Throws(IOException::class) public fun send(request: Request): Response { if (stepArray.isEmpty()) return httpClient.execute(request) - val state = PipelineCallState(this, request, httpClient) + val state = PipelineCallState(this, request) return PipelineNext(state).process() } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineBuilder.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineBuilder.kt index de2344b..b91e9ff 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineBuilder.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineBuilder.kt @@ -113,7 +113,7 @@ public class HttpPipelineBuilder(private val httpClient: HttpClient) { */ public fun build(): HttpPipeline { val ordered = steps.flatten() - return HttpPipeline(httpClient, Array(ordered.size) { ordered[it] }) + return HttpPipeline(httpClient, ordered.toTypedArray()) } public companion object { diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineCallState.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineCallState.kt index 7632fb2..c748afd 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineCallState.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineCallState.kt @@ -12,8 +12,7 @@ import org.dexpace.sdk.core.http.request.Request /** * Per-call mutable cursor over a [HttpPipeline]'s steps array. Holds the index of the - * next step to invoke, the originating [Request], and a reference to the [HttpClient] used - * when the cursor reaches the end. + * next step to invoke and the originating [Request]. * * Cloned via [copy] (exposed to user code through [PipelineNext.copy]) so retry / redirect * steps can re-drive the downstream chain. Cloning copies the current index — the new @@ -27,7 +26,6 @@ import org.dexpace.sdk.core.http.request.Request internal class PipelineCallState internal constructor( val pipeline: HttpPipeline, initialRequest: Request, - val httpClient: HttpClient, private var index: Int = 0, ) { /** @@ -51,5 +49,5 @@ internal class PipelineCallState internal constructor( } /** Returns an independent state cloned at the current cursor position. */ - fun copy(): PipelineCallState = PipelineCallState(pipeline, request, httpClient, index) + fun copy(): PipelineCallState = PipelineCallState(pipeline, request, index) } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineNext.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineNext.kt index c584001..62c3d4b 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineNext.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/PipelineNext.kt @@ -27,7 +27,7 @@ public class PipelineNext internal constructor(private val state: PipelineCallSt public fun process(): Response { val nextStep = state.advance() return if (nextStep == null) { - state.httpClient.execute(state.request) + state.pipeline.httpClient.execute(state.request) } else { nextStep.process(state.request, this) } diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/Stage.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/Stage.kt index 1e30ecb..0c39dc7 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/Stage.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/Stage.kt @@ -17,8 +17,9 @@ package org.dexpace.sdk.core.http.pipeline * Sparse [order] values (100s apart) leave room to insert new stages later without * renumbering existing ones. * - * @property order Run-order key used by [HttpPipelineBuilder.build] to emit steps; lower - * values run first. + * @property order Stable numeric identity for the stage; ascends with declaration order. + * The builder emits steps in declaration order (`Stage.entries`), not by reading this + * value — it exists as a stable, sortable inspection key for callers. * @property isPillar True if the stage admits at most one step (singleton). False for * user-extensible stages backed by an ordered deque. */ diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/StagedSteps.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/StagedSteps.kt index 0dd71dc..6793d88 100644 --- a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/StagedSteps.kt +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/StagedSteps.kt @@ -84,14 +84,7 @@ internal class StagedSteps( fun reload(steps: List) { perStage.clear() pillars.clear() - for (s in steps) { - val stage = stageOf(s) - if (stage.isPillar) { - installPillar(s, stage) - } else { - perStage.getOrPut(stage) { ArrayDeque() }.addLast(s) - } - } + steps.forEach(::append) } /** diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineTest.kt index c3e5e87..2b0dc8d 100644 --- a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineTest.kt +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/http/pipeline/HttpPipelineTest.kt @@ -88,7 +88,7 @@ class HttpPipelineTest { val order = mutableListOf() val pipeline = HttpPipelineBuilder(client) - // Add in reverse-of-stage order to prove the builder sorts by Stage.order. + // Add in reverse declaration order to prove the builder emits in Stage declaration order. .append(TaggingStep(Stage.PRE_SEND, "pre-send", order)) .append(TaggingStep(Stage.POST_AUTH, "post-auth", order)) .append(TaggingStep(Stage.PRE_AUTH, "pre-auth", order))