From 37c4114b2b2f99014908db353bcfc8db0f0a94df Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Fri, 26 Jun 2026 01:04:22 +0300 Subject: [PATCH] chore: collapse duplicated adapter idioms in sdk-io-okio3 and sdk-async-reactor Three behavior-preserving internal refactors that remove copy-pasted blocks so each shared contract lives in one place: - SlicedOkioBufferedSource: extract a private drainRemaining() helper for the three full-slice reads (readByteArray/readUtf8/readString), which differed only in how they decode the drained bytes. checkOpen() still runs ahead of the drain in every override. - sdk-io-okio3 adapters: lift the identity-keyed "wrap once per okio.Buffer" cache out of ForeignSourceAdapter and ForeignSinkAdapter into a small per-adapter OkioBufferWrapperCache, so the wrapper-reuse rationale and lookup live once. Each adapter keeps its own instance; no shared state. - Reactor: extract deferMono()/logEvent() from executeMono/sendMono, which were the same Mono.defer bridge differing only in executeAsync vs sendAsync. MDC is still captured per subscription, coldness and cancellation are unchanged. Public surface is untouched, so apiCheck stays green with no apiDump. --- .../org/dexpace/sdk/async/reactor/Reactor.kt | 65 ++++++++----------- .../sdk/io/internal/ForeignSinkAdapter.kt | 14 +--- .../sdk/io/internal/ForeignSourceAdapter.kt | 16 +---- .../sdk/io/internal/OkioBufferWrapperCache.kt | 31 +++++++++ .../io/internal/SlicedOkioBufferedSource.kt | 24 ++++--- 5 files changed, 75 insertions(+), 75 deletions(-) create mode 100644 sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt diff --git a/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt b/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt index f86f6bdd..fe9db629 100644 --- a/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt +++ b/sdk-async-reactor/src/main/kotlin/org/dexpace/sdk/async/reactor/Reactor.kt @@ -20,6 +20,7 @@ import org.dexpace.sdk.core.instrumentation.MdcSnapshot import org.dexpace.sdk.core.io.BufferedSource import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.concurrent.CompletableFuture private val log = ClientLogger("org.dexpace.sdk.async.reactor.Reactor") @@ -43,27 +44,7 @@ private val log = ClientLogger("org.dexpace.sdk.async.reactor.Reactor") * To extend MDC propagation through user-supplied downstream operators, enable * `Hooks.enableAutomaticContextPropagation()` at the application level. */ -public fun AsyncHttpClient.executeMono(request: Request): Mono = - Mono.defer { - val mdc = MdcSnapshot.capture() - Mono.fromFuture { mdc.withMdc { executeAsync(request) } } - .doOnSubscribe { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.subscribed") - .field("adapter.type", "reactor") - .log() - } - } - .doOnCancel { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.cancel_propagated") - .field("adapter.type", "reactor") - .log() - } - } - } +public fun AsyncHttpClient.executeMono(request: Request): Mono = deferMono { executeAsync(request) } /** * Pipeline-level [Mono] facade — see [executeMono]. @@ -79,27 +60,33 @@ public fun AsyncHttpClient.executeMono(request: Request): Mono = * To extend MDC propagation through user-supplied downstream operators, enable * `Hooks.enableAutomaticContextPropagation()` at the application level. */ -public fun AsyncHttpPipeline.sendMono(request: Request): Mono = +public fun AsyncHttpPipeline.sendMono(request: Request): Mono = deferMono { sendAsync(request) } + +/** + * Bridges a [CompletableFuture]-returning [supplier] to a cold [Mono]. Each subscription runs + * the supplier afresh under [Mono.defer], captures the subscriber's MDC, and reinstates it for + * the future-producing call and for both lifecycle log hooks. Cancelling the subscription + * cancels the underlying future through [Mono.fromFuture]. + */ +private fun deferMono(supplier: () -> CompletableFuture): Mono = Mono.defer { val mdc = MdcSnapshot.capture() - Mono.fromFuture { mdc.withMdc { sendAsync(request) } } - .doOnSubscribe { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.subscribed") - .field("adapter.type", "reactor") - .log() - } - } - .doOnCancel { - mdc.withMdc { - log.atVerbose() - .event("async.adapter.cancel_propagated") - .field("adapter.type", "reactor") - .log() - } - } + Mono.fromFuture { mdc.withMdc { supplier() } } + .doOnSubscribe { logEvent(mdc, "async.adapter.subscribed") } + .doOnCancel { logEvent(mdc, "async.adapter.cancel_propagated") } + } + +private fun logEvent( + mdc: MdcSnapshot, + event: String, +) { + mdc.withMdc { + log.atVerbose() + .event(event) + .field("adapter.type", "reactor") + .log() } +} /** * Exposes the SSE event stream as a Reactor [Flux]. Backpressure is honored via diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt index e374d94b..2eb559f7 100644 --- a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSinkAdapter.kt @@ -22,23 +22,13 @@ import org.dexpace.sdk.core.io.Sink * wrapper field is read/written without synchronization. */ internal class ForeignSinkAdapter(private val delegate: Sink) : okio.Sink { - private var cachedBuffer: okio.Buffer? = null - private var cachedWrapper: OkioBuffer? = null + private val wrappers = OkioBufferWrapperCache() override fun write( source: okio.Buffer, byteCount: Long, ) { - // Cache the OkioBuffer wrapper keyed by reference identity of the okio.Buffer Okio - // passes us. Okio reuses the same source for a buffered producer's lifetime, so this - // amortizes wrapper allocation to once per producer. - val wrapper = - cachedWrapper.takeIf { source === cachedBuffer } - ?: OkioBuffer(source).also { - cachedBuffer = source - cachedWrapper = it - } - delegate.write(wrapper, byteCount) + delegate.write(wrappers.wrap(source), byteCount) } override fun flush() { diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt index 98eed474..423a9065 100644 --- a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/ForeignSourceAdapter.kt @@ -23,24 +23,12 @@ import org.dexpace.sdk.core.io.Source * wrapper field is read/written without synchronization. */ internal class ForeignSourceAdapter(private val delegate: Source) : okio.Source { - private var cachedBuffer: okio.Buffer? = null - private var cachedWrapper: OkioBuffer? = null + private val wrappers = OkioBufferWrapperCache() override fun read( sink: okio.Buffer, byteCount: Long, - ): Long { - // Cache the OkioBuffer wrapper keyed by reference identity of the okio.Buffer Okio - // passes us. Okio reuses the same sink for a buffered consumer's lifetime, so this - // amortizes wrapper allocation to once per consumer. - val wrapper = - cachedWrapper.takeIf { sink === cachedBuffer } - ?: OkioBuffer(sink).also { - cachedBuffer = sink - cachedWrapper = it - } - return delegate.read(wrapper, byteCount) - } + ): Long = delegate.read(wrappers.wrap(sink), byteCount) override fun timeout(): okio.Timeout = okio.Timeout.NONE diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt new file mode 100644 index 00000000..747ff3c9 --- /dev/null +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/OkioBufferWrapperCache.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.io.internal + +/** + * Caches a single [OkioBuffer] wrapper keyed by the reference identity of the [okio.Buffer] it + * adapts. Okio hands a buffered consumer (or producer) the same buffer instance for its whole + * lifetime, so wrapping it once and reusing the wrapper amortizes allocation to once per + * consumer instead of once per chunk. + * + * ## Thread-safety + * + * Not safe for concurrent use — the cached fields are read/written without synchronization, + * matching the single-threaded contract of the Okio buffered source/sink that owns the cache. + */ +internal class OkioBufferWrapperCache { + private var cachedBuffer: okio.Buffer? = null + private var cachedWrapper: OkioBuffer? = null + + fun wrap(buffer: okio.Buffer): OkioBuffer = + cachedWrapper.takeIf { buffer === cachedBuffer } + ?: OkioBuffer(buffer).also { + cachedBuffer = buffer + cachedWrapper = it + } +} diff --git a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt index f482f606..26648adb 100644 --- a/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt +++ b/sdk-io-okio3/src/main/kotlin/org/dexpace/sdk/io/internal/SlicedOkioBufferedSource.kt @@ -135,10 +135,7 @@ internal class SlicedOkioBufferedSource( @Throws(IOException::class) override fun readByteArray(): ByteArray { checkOpen() - if (!realizeOffset() || atEnd()) return EMPTY_BYTES - val bytes = readUpTo(remaining) - remaining -= bytes.size - return bytes + return drainRemaining() } @Throws(IOException::class) @@ -161,10 +158,7 @@ internal class SlicedOkioBufferedSource( @Throws(IOException::class) override fun readUtf8(): String { checkOpen() - if (!realizeOffset() || atEnd()) return "" - val bytes = readUpTo(remaining) - remaining -= bytes.size - return String(bytes, Charsets.UTF_8) + return String(drainRemaining(), Charsets.UTF_8) } @Throws(IOException::class) @@ -217,10 +211,20 @@ internal class SlicedOkioBufferedSource( @Throws(IOException::class) override fun readString(charset: Charset): String { checkOpen() - if (!realizeOffset() || atEnd()) return "" + return String(drainRemaining(), charset) + } + + /** + * Drains everything still inside the slice window as raw bytes, advancing [remaining]. + * Returns [EMPTY_BYTES] when the offset cannot be realized or the slice is already at its + * end; that empty array decodes to an empty string for the two text reads above. + */ + @Throws(IOException::class) + private fun drainRemaining(): ByteArray { + if (!realizeOffset() || atEnd()) return EMPTY_BYTES val bytes = readUpTo(remaining) remaining -= bytes.size - return String(bytes, charset) + return bytes } @Throws(IOException::class)