Skip to content

feat(publisher): structured StorageACK declines instead of stream resets#559

Open
branarakic wants to merge 5 commits into
mainfrom
feat/publisher-storage-ack-typed-declines
Open

feat(publisher): structured StorageACK declines instead of stream resets#559
branarakic wants to merge 5 commits into
mainfrom
feat/publisher-storage-ack-typed-declines

Conversation

@branarakic
Copy link
Copy Markdown
Contributor

Summary

Today, when a core node legitimately can't ACK a publish — it doesn't host the CG, its SWM is missing or stale, or its operational signer was just rotated — the StorageACK handler throws. libp2p resets the stream, the publisher sees a generic IO error, and the ACK collector retries the same peer 3× with exponential backoff before giving up. The publisher's final error reads as a stream-reset / timeout with no per-peer reason — that's the surface shape behind GitHub issue #541.

This PR introduces a typed-decline path so the receive side can say "I can't ACK this, here's why" in band, and the publisher can deselect that peer without retries and surface the reason to the operator.

Companion to PR #556 (publisher-side hosting filter); independently useful even if #556 doesn't land.

What's in this PR

Wire — packages/core/src/proto/storage-ack.ts

  • Adds two optional string fields to the StorageACK proto: declineCode (field 6) and declineMessage (field 7).
  • Strictly additive: old encoders never set them, old decoders ignore them, so cross-version traffic is byte-identical to today.
  • New decline codes — NOT_HOSTED, NO_DATA_IN_SWM, MERKLE_MISMATCH_IN_SWM, SIGNER_NOT_REGISTERED, CG_ID_INVALID — live in STORAGE_ACK_DECLINE_CODES and are exported from @origintrail-official/dkg-core. Helper isStorageACKDecline(msg) codifies the empty-string-vs-undefined check protobufjs uses for unset string fields.

Receive side — packages/publisher/src/storage-ack-handler.ts

Converts four graceful-throw paths to return encodeDecline(...):

  • NO_DATA_IN_SWM — SWM CONSTRUCT returned no quads (the literal RepNet CG15 publish cannot collect identity 1 ACK while public graph can #541 case)
  • MERKLE_MISMATCH_IN_SWM — core has data, publisher's merkle root doesn't match (decline so the publisher tries another core; inline-staging mismatch keeps throw because it's a publisher bug, not a network mismatch)
  • CG_ID_INVALID — non-numeric or non-positive cgId
  • SIGNER_NOT_REGISTEREDisSignerRegistered=false

True protocol violations keep throw (oversize / unparseable staging, kaCount mismatch, leaf-count mismatch, edge-node-tried-to-ACK, identity > 2^64). Those still reset the stream so we don't keep the connection open on bad input.

Send side — packages/publisher/src/ack-collector.ts

  • After decodeStorageACK, checks isStorageACKDecline. If declining: log per-peer reason, no retry, return null.
  • Tracks per-peer decline reasons; appends a Declines: peerX→NO_DATA_IN_SWM (no swm data for repnet-v2-official); peerY→MERKLE_MISMATCH_IN_SWM (...) summary to the final storage_ack_insufficient error so operators see exactly why quorum failed from a single log line.

Backward compatibility

Every cross-version pair stays correct:

  • Old core ↔ new publisher — old core never emits decline fields → publisher sees a normal ACK or a thrown-and-reset stream (legacy retry path). No regression.
  • New core ↔ old publisher — new core only emits decline fields for the four converted paths; old publisher's decoder silently ignores them. The signature/merkleRoot fields are zero-length on declines, so the old publisher's recoverACKSigner returns null, the ACK is rejected as invalid (same as today's failure mode for these cases), and the collector moves on. No regression.
  • New ↔ new — full benefit: per-peer decline reasons in the final error, no retries against declining peers.

Test plan

  • pnpm --filter @origintrail-official/dkg-core build + test — green (new round-trip + forward-compat tests for the proto change)
  • pnpm --filter @origintrail-official/dkg-publisher build — green
  • pnpm --filter @origintrail-official/dkg-publisher test — 918 passed / 1 skipped (62 files), all green
  • pnpm --filter @origintrail-official/dkg-agent build — green
  • pnpm --filter "./packages/cli" build — green
  • Proto round-trip (packages/core/test/v10-proto.test.ts):
    • decline-only message round-trips through encode/decode with empty signature/merkleRoot
    • old-shape encoded bytes still decode correctly (no spurious decline fields)
    • new-encoder produces identical bytes for the legacy ACK path (deterministic forward compat)
  • Receive side (packages/publisher/test/v10-ack-edge-cases.test.ts + storage-ack-handler.test.ts + v10-protocol-operations.test.ts):
    • NO_DATA_IN_SWM decline returned (was throw)
    • MERKLE_MISMATCH_IN_SWM decline returned (was throw)
    • CG_ID_INVALID decline returned (two variants: non-numeric, "0")
    • SIGNER_NOT_REGISTERED decline returned (was throw); onSignerUnregistered callback still fires
    • inline-staging merkle mismatch still throws (publisher protocol bug — connection-level reset is appropriate)
  • Send side (packages/publisher/test/v10-ack-edge-cases.test.ts):
    • quorum still reachable when 2 of 5 peers decline (declining peers NOT retried, sendCounts pinned to 1)
    • final storage_ack_insufficient error names every per-peer reason with both code and message
    • unknown decline code is logged + skipped (forward compat with future codes)
    • empty declineMessage produces clean error formatting (no () artefact)

Related

Made with Cursor

Comment thread packages/publisher/src/ack-collector.ts Outdated
]);

if (collected.length < REQUIRED_ACKS) {
let detail = '';
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: This only adds decline detail after every peer request has settled. In the mixed case this PR is trying to improve (for example 2 peers decline and 1 remaining peer hangs), collect() still waits the full ACK_TIMEOUT_MS and returns storage_ack_timeout, so the new per-peer decline reasons never surface. Track how many peers are still capable of producing an ACK and fail fast with storage_ack_insufficient once quorum becomes impossible.

* Encode a structured decline response. Used in place of `throw` for
* the subset of failures that represent "I as a core legitimately
* cannot ACK this request right now" — most importantly the
* `<contextGraphsServed>` mismatch that GitHub issue #541 stalls on,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: this docstring now says <contextGraphsServed> mismatches are handled via typed declines, but the implementation below never checks whether the peer actually serves the requested context graph and never emits NOT_HOSTED. As written, an unserved graph still collapses into NO_DATA_IN_SWM, which loses the distinction this PR is trying to add. Either wire the host-check in this PR or defer documenting/exporting that decline code until it can be produced.

expect(new Uint8Array(decoded.coreNodeSignatureR).length).toBe(0);
});

it('a new decoder reading bytes from an old encoder still yields a valid ACK (forward compat)', () => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: this test title claims forward compatibility with an old encoder, but it still generates the wire bytes with the current schema and simply leaves the new fields unset. That will pass even if a future protobufjs/schema change breaks decoding of historical payloads. Consider pinning a literal pre-change byte sequence (or generating one with the old schema) so the compatibility guarantee is actually exercised.

return this.encodeDecline(
cgId,
STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM,
`No data found in SWM graph ${swmGraphUri} for entities: ${intent.rootEntities.join(', ')}`,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: this decline message includes every requested root entity. On large publishFromSharedMemory batches that can make the /storage-ack reply itself very large, which risks turning the new in-band decline back into a transport-level failure and bringing back the retry/timeout behavior this PR is trying to avoid. Please cap/summarize the entity list here (for example count + first few IDs).

Comment thread packages/publisher/src/ack-collector.ts Outdated

if (isStorageACKDecline(ack)) {
const code = ack.declineCode ?? 'UNKNOWN';
const declineMessage = ack.declineMessage ?? '';
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: declineMessage is new peer-controlled protocol input, but we keep it verbatim and later splice it into logs and thrown errors. A buggy or malicious peer can inject newlines/control characters or force very large exception strings here. Please sanitize and truncate the message before storing/logging it.

const response = await this.deps.sendP2P(peerId, PROTOCOL_STORAGE_ACK, intentBytes);
const ack: StorageACKMsg = decodeStorageACK(response);

if (isStorageACKDecline(ack)) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: This treats every typed decline as permanent, but the new SWM-state codes (NO_DATA_IN_SWM / MERKLE_MISMATCH_IN_SWM) can be transient while gossip replication catches up. Combined with the fast-fail path, a peer that would ACK on the next retry now permanently reduces the quorum pool, so publishes can fail even though enough cores become ready seconds later. Keep retries for replication-state declines, or split decline codes into retryable vs permanent.

contextGraphIdBigInt = BigInt(cgId);
} catch {
throw new Error(
return this.encodeDecline(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: CG_ID_INVALID is a malformed publish request, not a peer-local refusal. Returning it as a normal decline makes the publisher fan out to every core and eventually report storage_ack_insufficient, which masks the actual caller error and can still wait on unrelated peers. Either keep throwing here, or teach the collector to treat this code as an immediate fatal publish error.

branarakic and others added 4 commits May 20, 2026 10:34
Today, when a core node legitimately can't ACK a publish (it doesn't
host the CG, its SWM is missing or stale, or its operational signer was
just rotated), the StorageACK handler `throw`s. The libp2p stream is
reset and the publisher only sees a generic IO error — so the ACK
collector retries the same peer 3× with exponential backoff before
giving up, and the publisher's final error is opaque (`stream reset`)
with no per-peer reason. This is the surface shape of GitHub issue
#541, where one core failed to mirror a `replicationPolicy=full` CG and
the publisher kept dialling it.

Add a typed-decline path:

* Extend the StorageACK protobuf with two optional fields —
  `declineCode: string` (field 6) and `declineMessage: string`
  (field 7). The change is strictly additive: old encoders never
  populate them, old decoders silently ignore them, so cross-version
  traffic is byte-identical to today. New decoders inspect
  `declineCode` first; non-empty means decline (signature/merkleRoot
  are unset). The decline codes — `NOT_HOSTED`, `NO_DATA_IN_SWM`,
  `MERKLE_MISMATCH_IN_SWM`, `SIGNER_NOT_REGISTERED`, `CG_ID_INVALID`
  — live in `STORAGE_ACK_DECLINE_CODES` and are part of the wire
  contract.

* Convert four graceful-throw paths in
  `packages/publisher/src/storage-ack-handler.ts` to decline returns:
  no-data-in-SWM, merkle mismatch *from SWM* (the inline-staging
  mismatch stays a throw — that's a publisher bug, not a network state
  mismatch), non-numeric / non-positive cgId, and the
  `isSignerRegistered=false` path. True protocol violations
  (oversize / unparseable staging, kaCount mismatch, leaf-count
  mismatch, edge-node-tried-to-ACK, identity > 2^64) keep `throw` so
  the connection still resets on actually-bad input.

* Update `packages/publisher/src/ack-collector.ts` to recognise
  declines: per-peer log, no retry against a declining peer (decline
  is permanent for the request), and a per-peer decline summary
  appended to the final `storage_ack_insufficient` error so operators
  see *why* each core declined from a single log line.

Tests:

* `packages/core/test/v10-proto.test.ts` — three new round-trip cases
  (decline-only message, old-shape forward compat, old-encoder bytes
  decode without spurious decline fields).
* `packages/publisher/test/v10-ack-edge-cases.test.ts` — four new
  decline-shape assertions on the handler (NO_DATA_IN_SWM,
  MERKLE_MISMATCH_IN_SWM, two CG_ID_INVALID variants,
  SIGNER_NOT_REGISTERED), one assertion that inline-staging mismatch
  *still* throws, and four collector-side cases covering quorum-with-
  declines, decline reasons surfaced in the final error, unknown
  decline codes (forward compat), and empty-message declines.
* `packages/publisher/test/storage-ack-handler.test.ts` and
  `v10-protocol-operations.test.ts` — three pre-existing tests
  updated to assert the new decline-vs-throw boundary.

Co-authored-by: Cursor <cursoragent@cursor.com>
ack-collector now fast-fails with `storage_ack_insufficient` (and the
full per-peer decline detail) the moment the still-pending peer pool
can no longer satisfy `REQUIRED_ACKS`, instead of waiting out the
full ACK_TIMEOUT_MS for a hung peer that — by that point — couldn't
change the outcome.

The check is conservative: it only fires once
`collected.length + still_pending < REQUIRED_ACKS`, so a publish that
could still reach quorum is never aborted early. Every requestACK
settles via a `try/finally`, so transport-error retries also
contribute to the impossibility detection (covered by the existing
"transport errors exhaust enough peers" test, which previously
expected a slow `storage_ack_timeout`).

Also factor the decline-formatting into `formatDeclineDetail()` so
the timeout / fast-fail / final-throw paths all surface the same
diagnostic. Drop the stale `PR#557` reference in the comment block.

Co-authored-by: Cursor <cursoragent@cursor.com>
@branarakic branarakic force-pushed the feat/publisher-storage-ack-typed-declines branch from 49757d9 to 8f6a83e Compare May 20, 2026 08:37
@branarakic
Copy link
Copy Markdown
Contributor Author

Rebased onto current main (264a5fe3, post-rc.9 merge). Clean rebase, no conflicts — main's proto/index.ts and storage-ack-handler.ts activity since the merge-base touched different lines.

Local verification:

  • pnpm --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-publisher build — both green.
  • dkg-core test suite: 905 passed.
  • dkg-publisher test suite: 949 passed, 1 skipped.

The four commits on the branch (feat + Codex fixup + 2 follow-up tightenings) were preserved as-is. Happy to squash to a single commit on request — the PR uses a merge-commit strategy on main, so I left history intact.

Rebased HEAD: 8f6a83e5 (was 49757d95).

throw new Error(
return this.encodeDecline(
cgId,
STORAGE_ACK_DECLINE_CODES.CG_ID_INVALID,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: CG_ID_INVALID is a malformed publish request, not a peer-specific decline. Returning it as a typed decline makes the publisher report storage_ack_insufficient after querying multiple peers instead of surfacing the real contract error immediately, which hides the root cause and changes the failure mode for callers. Keep invalid/non-positive contextGraphId on the stream-error path (throw) and reserve typed declines for cases where a well-formed request cannot be ACKed because of this peer's local state.

Two last-round Codex bugs from the May 17 and May 20 review passes.

1. CG_ID_INVALID is a malformed-publish error, not a peer-local decline.
   Returning it as a typed decline made the publisher fan out to every
   other core looking for a different answer and report
   `storage_ack_insufficient` only after the full retry budget — masking
   the real caller error. Reverted both code-paths in the handler to
   `throw new Error(...)` (libp2p stream reset) and dropped the
   `CG_ID_INVALID` member from `STORAGE_ACK_DECLINE_CODES`. The two
   `declines (CG_ID_INVALID)` tests are rewritten as
   `expect(...).rejects.toThrow(...)`.

2. The collector treated every typed decline as permanent, but
   `NO_DATA_IN_SWM` / `MERKLE_MISMATCH_IN_SWM` can be transient while
   gossip replication catches up. Combined with the fast-fail path on
   impossible quorum, a core that would have ACKed seconds later was
   being permanently removed from the quorum pool the moment its SWM
   trailed the publish by even one gossip cycle.

   Added `TRANSIENT_STORAGE_ACK_DECLINE_CODES` + an
   `isTransientStorageACKDeclineCode()` helper next to the existing
   `STORAGE_ACK_DECLINE_CODES` enum, exported through `proto/index.ts`.
   The collector now distinguishes transient declines (retry through
   the normal transport backoff) from permanent ones (return null
   immediately) and clears any stored decline reason on success so
   stale decline reasons do not leak into `storage_ack_insufficient`
   error messages if quorum later fails for unrelated reasons.

   Updated test #1 to exercise SIGNER_NOT_REGISTERED (permanent
   semantics, sendCount=1 preserved) and added two new tests covering
   the transient path:
     - peer dialled MAX_RETRIES times on persistent transient decline
     - peer that resolves to a valid ACK on retry counts toward quorum

   Existing decline tests that hit transient codes legitimately take
   ~3s now (1s + 2s of backoff) — bumped their per-test timeouts to
   15s; the fast-fail-on-hang test's elapsed bound moved from <5s to
   <15s (still two orders of magnitude under ACK_TIMEOUT_MS).

Local verification: dkg-core 905/905, dkg-publisher 951/951 (+1 skip).

Co-authored-by: Cursor <cursoragent@cursor.com>
@branarakic
Copy link
Copy Markdown
Contributor Author

Addressed the two open 🔴 items from Codex rounds 4 + 5 in e3452dc4. Audit of the full review thread is below — the other 6 items had already been resolved in earlier fixup commits on this branch.

What landed in e3452dc4

1. CG_ID_INVALID reverted to throw (Codex round 4 + round 5 reaffirmation)

Both encodeDecline(...) calls in the handler's CG-id-validation block were reverted to throw new Error(...). A malformed contextGraphId is a publisher-side mistake the contract will never accept, not peer-local state — fanning out to every other core and reporting storage_ack_insufficient after the full retry budget hides the real caller error.

Also dropped CG_ID_INVALID from STORAGE_ACK_DECLINE_CODES so the enum can no longer be misused for this case, and rewrote the two declines (CG_ID_INVALID) tests as expect(...).rejects.toThrow(...).

2. Transient vs permanent decline classification (Codex round 4)

Added TRANSIENT_STORAGE_ACK_DECLINE_CODES + isTransientStorageACKDeclineCode() in packages/core/src/proto/storage-ack.ts:

  • Transient: NO_DATA_IN_SWM, MERKLE_MISMATCH_IN_SWM — replication is expected to catch up via gossip; collector retries through the existing transport backoff.
  • Permanent: SIGNER_NOT_REGISTERED (and any future code) — return null immediately; the peer is deselected for this request.

The collector's requestACK() loop:

  • Records the latest decline reason per peer on every attempt.
  • On a transient decline (and not the final attempt), logs "Transient decline from <peer>: <code> (retry N/MAX_RETRIES)", sleeps the same (attempt + 1) * 1000ms backoff used for transport errors, and continues.
  • On a permanent decline or the final attempt of a transient decline, logs "Decline from <peer>: <code>" and returns null.
  • On a peer that ultimately ACKs successfully (e.g. transient decline that resolves on retry), clears the prior declines entry so a stale decline reason cannot leak into a later storage_ack_insufficient error if quorum fails for unrelated reasons.

Test changes

  • Updated test Game coordinator gossip hardening (PR #29 follow-up) #1 to use SIGNER_NOT_REGISTERED (the canonical permanent code) so its sendCount === 1 assertion still pins the no-retry contract.
  • Added transient declines (NO_DATA_IN_SWM) are retried against the same peer up to MAX_RETRIES — sets up a 3-peer / requiredACKs=3 scenario where peer-0 always declines transiently and the other two ACK, so quorum cannot complete via shortcut and the full retry budget is observably spent.
  • Added a transient decline that resolves to a valid ACK on retry contributes to quorum — first dial declines, second dial returns a real ACK; the peer ends up in result.acks with sendCount === 2.
  • Existing decline tests that legitimately take ~3s now (1s + 2s of backoff): bumped per-test timeouts to 15s. The fast-fail-on-hang test's elapsed bound moved from <5s to <15s (still ≈ 1/10th of ACK_TIMEOUT_MS = 120s).

Audit of all 8 Codex review items

# Item File Severity Status
1 Fast-fail when quorum impossible ack-collector.ts 🔴 979c232d
2 Docstring references NOT_HOSTED that's never produced storage-ack-handler.ts 🟡 93ba4b6e
3 Forward-compat test must pin literal pre-change bytes v10-proto.test.ts 🟡 93ba4b6e
4 Decline message must cap entity list storage-ack-handler.ts 🟡 8f6a83e5
5 declineMessage is peer-controlled — sanitize + truncate ack-collector.ts 🟡 8f6a83e5
6 Transient SWM-state declines must retry ack-collector.ts 🔴 e3452dc4 (this push)
7 CG_ID_INVALID should throw, not decline storage-ack-handler.ts 🔴 e3452dc4 (this push)
8 Round-5 reaffirmation of #7 after rebase storage-ack-handler.ts 🔴 e3452dc4 (this push)

Local verification

  • pnpm --filter @origintrail-official/dkg-core --filter @origintrail-official/dkg-publisher build — green.
  • dkg-core tests: 905/905 passed.
  • dkg-publisher tests: 951 passed, 1 skipped (+2 vs prior runs: the two new transient-retry tests).

HEAD: e3452dc4.


if (swmQuads.length === 0) {
throw new Error(`No data found in SWM graph ${swmGraphUri} for entities: ${intent.rootEntities.join(', ')}`);
return this.encodeDecline(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug: returning a typed decline on the existing /dkg/10.0.0/storage-ack contract is not backward-compatible with older publishers. They ignore fields 6/7, see the empty signature fields in this payload, and treat it as an invalid ACK instead of following the old retry-on-stream-reset path. That means mixed-version rollouts can still lose quorum on transient SWM lag. Please gate declines behind a new protocol/capability or keep the legacy throw behavior until every publisher in the cluster understands decline messages. The same issue applies to the other encodeDecline(...) returns below.

// final `storage_ack_insufficient` error. Overwriting any
// prior entry is intentional — operators care most about
// why the peer ultimately could not ACK.
declines.set(peerId, { code, message: declineMessage });
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: this per-peer decline record is only cleared on a later valid ACK. If the next retry fails with a transport error, the final storage_ack_insufficient message will still report the stale decline even though the terminal reason was connection/reset/timeout. Clear or overwrite the map entry in the catch/final-failure path so the aggregated diagnostics reflect the last observed outcome for that peer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant