refactor for bulk staging #5880

Open
isegall-da wants to merge 12 commits into main from isegall/refactor-for-bulk-staging

@isegall-da isegall-da commented Jun 9, 2026

Contributor

Part of #5884.
The idea is that the workflow that copies from staging to committed follows the same pattern, so it extends the same abstract base classes and only needs to implement the process stage. That PR became a bit too large, so I extracted just the refactoring into a separate PR. This one should be a pure refactor that enables that reuse.
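
A minimal sketch of the reuse pattern described above, not the code in this PR: a shared base class owns the pipeline skeleton, and each workflow supplies only its process stage. `SnapshotPipeline`, `WriteToStaging`, `CopyToCommitted`, and the `Long` timestamps are hypothetical names and types chosen for illustration.

```scala
object PipelineSketch {
  // Hypothetical base class: the filter-then-process skeleton is shared,
  // and subclasses supply only the processing stage.
  abstract class SnapshotPipeline[A](val description: String) {

    /** The only stage a concrete workflow has to implement. */
    protected def process(item: A): String

    /** Shared skeleton: drop items that should be skipped, then process the rest in order. */
    final def run(items: Seq[A], shouldProcess: A => Boolean): Seq[String] =
      items.filter(shouldProcess).map(process)
  }

  // Two workflows that differ only in their process stage.
  final class WriteToStaging extends SnapshotPipeline[Long]("write to staging") {
    protected def process(ts: Long): String = s"staged:$ts"
  }

  final class CopyToCommitted extends SnapshotPipeline[Long]("staging to committed") {
    protected def process(ts: Long): String = s"committed:$ts"
  }
}
```

Both workflows go through the same `run` method, so polling, skipping, and progress handling would only need to be written once in the base class.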

Pull Request Checklist

Cluster Testing

  • If a cluster test is required, comment /cluster_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If an upgrade test is required, comment /upgrade_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If a hard-migration test is required (from the latest release), comment /hdm_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If a logical synchronizer upgrade test is required (from canton-3.5), comment /lsu_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.

PR Guidelines

  • Include any change that might be observable by our partners or affect their deployment in the release notes.
  • Specify fixed issues with Fixes #n, and mention issues worked on using #n
  • Include a screenshot for frontend-related PRs - see README or use your favorite screenshot tool

Merge Guidelines

  • Make the git commit message look sensible when squash-merging on GitHub (most likely: just copy your PR description).

Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
@isegall-da isegall-da marked this pull request as ready for review June 9, 2026 21:30
@isegall-da isegall-da changed the title from "WIP: refactor for bulk staging" to "refactor for bulk staging" Jun 9, 2026
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
@isegall-da

Contributor Author

ping @ray-roestenburg-da @rautenrieth-da can I get a review please?

@ray-roestenburg-da ray-roestenburg-da left a comment

Contributor

Some feedback on structure / design, thanks

/** The key in the key-value store where the timestamp of the latest processed snapshot is stored. This is
* used to resume processing from the correct point in case of restarts.
*/
protected val kvStoreKey: String

Contributor

Prefer passing these vals (kvStoreKey and processedTimestampMetric) as arguments to the constructor of this abstract class. That avoids initialization-order surprises and makes it natural that you have to specify a stable value when extending the abstract base class.
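
To illustrate the initialization-order concern, here is a small self-contained sketch. It does not use the classes from this PR; `WithAbstractVal`, `WithCtorArg`, `banner`, and the key value are hypothetical. The first base class reads an abstract `val` during its own construction and sees `null`; the second receives the value as a constructor argument and sees it as expected.

```scala
object InitOrderSketch {
  // Abstract val: the base-class body runs before the subclass has
  // initialised `key`, so `banner` captures null.
  abstract class WithAbstractVal {
    protected val key: String
    val banner: String = s"resuming from key=$key"
  }

  final class A extends WithAbstractVal {
    protected val key: String = "latest_snapshot"
  }

  // Constructor argument: the value is already set when the base-class body runs.
  abstract class WithCtorArg(protected val key: String) {
    val banner: String = s"resuming from key=$key"
  }

  final class B extends WithCtorArg("latest_snapshot")
}
```

Here `new InitOrderSketch.A().banner` evaluates to `"resuming from key=null"`, while `new InitOrderSketch.B().banner` evaluates to `"resuming from key=latest_snapshot"`.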

protected val processedTimestampMetric: MetricHandle.Gauge[CantonTimestamp]

/** This method should return the timestamp of the next snapshot available, after `last`, if any.
* The pipeline will poll this method until it returns a new snapshot, and then process that snapshot

Contributor

Suggested change
* The pipeline will poll this method until it returns a new snapshot, and then process that snapshot
* The pipeline will poll this method until it returns a new snapshot, and then process that snapshot


/** This method should return the timestamp of the next snapshot available, after `last`, if any.
* The pipeline will poll this method until it returns a new snapshot, and then process that snapshot
* before polling for the next one. It is ok for this method to return a snapshot that should not actually

Contributor

nit: I don't know if this info changes anything for a reader who wants to implement this. It may be better to document this in one place, in the non-abstract method where the processing occurs and where both should.. and this method are called.


/** This method should return true if the snapshot at the given timestamp should be processed,
* or false if it should be skipped. This is used to skip snapshots that are not relevant for bulk storage
* (e.g. because they are in the DB, but are more frequent than the frequency we need for bulk storage).

Contributor

I can't think of a situation at this moment where we would want to skip a snapshot (the snapshots are created every 3 hours and it seems very useful to have all of them), what is the use case for this?

Contributor Author

We already skip them today; this just refactors that. We store only one snapshot per day in bulk storage, not one every 3 hours, to save on storage, since ACSs can always be reconstructed from the updates.

Contributor

ok thanks
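
For readers following this thread, a rough sketch of what a "one snapshot per day" predicate could look like. The actual selection rule is not shown in this conversation; the sketch assumes, purely for illustration, that the snapshot taken at 00:00 UTC is the one kept, and it uses `java.time.Instant` instead of the project's timestamp types.

```scala
import java.time.{Instant, ZoneOffset}

object SnapshotFilterSketch {
  // Assumption for illustration only: "one per day" means the snapshot taken at 00:00 UTC.
  def shouldProcess(ts: Instant): Boolean = {
    val t = ts.atOffset(ZoneOffset.UTC).toLocalTime
    t.getHour == 0 && t.getMinute == 0 && t.getSecond == 0
  }

  // Eight snapshots, three hours apart, starting at midnight UTC.
  val day: Seq[Instant] =
    (0 until 8).map(i => Instant.parse("2026-06-09T00:00:00Z").plusSeconds(i * 3L * 3600L))

  // Only the midnight snapshot passes the predicate.
  val kept: Seq[Instant] = day.filter(shouldProcess)
}
```

Of the eight snapshots in `day`, only the midnight one ends up in `kept`; the other seven are skipped.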

tc: TraceContext
): Flow[TimestampWithMigrationId, TimestampWithMigrationId, NotUsed]

protected[bulk] def readLatestProcessedSnapshotTimestamp(implicit

Contributor

you could put these methods about progress in a separate class that wraps the kvProvider store and just does that, and then you use it where you need it. Pass that class in via constructor and delegate to it.
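
One possible shape for that suggestion, as a sketch under stated assumptions: `KvStore`, `InMemoryKvStore`, and `ProgressTracker` are hypothetical stand-ins, not the project's key-value store API, and timestamps are plain `Long` values for brevity.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

object ProgressSketch {
  // Hypothetical minimal key-value interface standing in for the real store.
  trait KvStore {
    def get(key: String): Future[Option[String]]
    def put(key: String, value: String): Future[Unit]
  }

  // In-memory implementation, only here to make the sketch runnable.
  final class InMemoryKvStore extends KvStore {
    private val data = mutable.Map.empty[String, String]
    def get(key: String): Future[Option[String]] =
      Future.successful(synchronized(data.get(key)))
    def put(key: String, value: String): Future[Unit] =
      Future.successful(synchronized { data.update(key, value) })
  }

  // Owns exactly one key; a pipeline would receive this via its constructor
  // and delegate all progress reads and writes to it.
  final class ProgressTracker(store: KvStore, key: String)(implicit ec: ExecutionContext) {
    def readLatest(): Future[Option[Long]] = store.get(key).map(_.map(_.toLong))
    def recordProcessed(ts: Long): Future[Unit] = store.put(key, ts.toString)
  }
}
```

With this split, the abstract pipeline class no longer needs to know the key or the store, only the tracker it was given.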

}
}

protected[bulk] def readLatestProcessedSegment(implicit

Contributor

this looks very similar to protected[bulk] def readLatestProcessedSnapshotTimestamp(implicit
in the other class; maybe these kinds of helpers can go into their own class, where you simplify reading any type from the kv store.
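
A sketch of the "read any type from the kv store" idea, assuming a simple decoder typeclass. `Decoder`, `TypedKvReader`, and the string encodings below are hypothetical; the real code presumably relies on its existing JSON codecs and on `readValueAndLogOnDecodingFailure` instead.

```scala
object TypedKvSketch {
  // Hypothetical decoder typeclass used only for this illustration.
  trait Decoder[A] { def decode(raw: String): Either[String, A] }

  object Decoder {
    implicit val longDecoder: Decoder[Long] = new Decoder[Long] {
      def decode(raw: String): Either[String, Long] =
        raw.toLongOption.toRight(s"not a long: $raw")
    }

    // A pair encoded as "from,to", standing in for a segment-like value.
    implicit val pairDecoder: Decoder[(Long, Long)] = new Decoder[(Long, Long)] {
      def decode(raw: String): Either[String, (Long, Long)] = raw.split(',') match {
        case Array(a, b) =>
          for {
            x <- longDecoder.decode(a)
            y <- longDecoder.decode(b)
          } yield (x, y)
        case _ => Left(s"not a pair: $raw")
      }
    }
  }

  final class TypedKvReader(underlying: Map[String, String]) {
    // One generic read replaces the per-type helpers. Decoding failures become None here;
    // a real implementation would also log them.
    def read[A](key: String)(implicit d: Decoder[A]): Option[A] =
      underlying.get(key).flatMap(raw => d.decode(raw).toOption)
  }
}
```

Both the snapshot-timestamp read and the updates-segment read would then be calls to the same `read[A]` with different keys and types.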


protected val description: String

/** The key in the key-value store where the timestamp of the latest updates segment is stored. This is

Contributor

Same here: prefer these as args to the constructor.

import org.apache.pekko.stream.scaladsl.Flow
import org.lfdecentralizedtrust.splice.scan.store.bulk.AcsSnapshotBulkStorage.AcsSnapshotObjects

class AcsSnapshotBulkStorageStaging(

Contributor

would be nice to have some docs here, what is staging for (or intended to be for soon).
I assume it is for a "workplace" or scratchpad where files are stored while everything is getting processed, and then at the end (atomically at best) files are moved to committed? so from outside API, this is purely a write side process?

I don't fully get why that staging place has an "is a" relationship with AcsSnapshotBulkStorage (why it is designed that way). In the comment on getAcsSnapshotAtOrBefore, that becomes confusing for me, that the staging returns data from committed.

ec: ExecutionContext,
): OptionT[Future, UpdatesSegment] =
store.readValueAndLogOnDecodingFailure(latestUpdatesSegmentInBulkStorageKey)
// private val latestUpdatesSegmentInBulkStorageKey = "latest_updates_segment_in_bulk_storage"

Contributor

Probably want to remove commented code.

)
implicit val acsSnapshotTimestampMigrationCodec: Codec[TimestampWithMigrationId] =
deriveCodec[TimestampWithMigrationId]
// implicit val acsSnapshotTimestampMigrationEncoder: Encoder[TimestampWithMigrationId] =

Contributor

curious why it was needed to remove these

Contributor Author

Well, note that this only adds them commented out, so they never actually existed :)
I added them in an attempt to fix some implicit mess, which didn't work, but then accidentally only commented them out instead of deleting them.

@ray-roestenburg-da ray-roestenburg-da self-requested a review June 11, 2026 13:04

@ray-roestenburg-da ray-roestenburg-da left a comment

Contributor

Please see my suggestions, happy to approve this to speed up things since it's a refactor.

Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
@isegall-da

Contributor Author

@ray-roestenburg-da is something like this commit what you had in mind? (of course, it's very partial, doesn't even compile as I haven't changed the call site where we construct it to adjust, just wondering if it's in the direction you had envisioned)

Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
@isegall-da

Contributor Author

@ray-roestenburg-da I restructured this as discussed, can you make another pass please?

@ray-roestenburg-da ray-roestenburg-da left a comment

Contributor

Thanks!


2 participants