refactor for bulk staging #5880
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
ping @ray-roestenburg-da @rautenrieth-da can I get a review please?
ray-roestenburg-da
left a comment
Some feedback on structure / design, thanks
/** The key in the key-value store where the timestamp of the latest processed snapshot is stored. This is
  * used to resume processing from the correct point in case of restarts.
  */
protected val kvStoreKey: String
Prefer passing these vals (kvStoreKey and processedTimestampMetric) as arguments to the constructor of this abstract class. That avoids initialization-order surprises and makes it natural that you have to specify a stable value when extending the abstract base class.
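To illustrate the initialization-order concern, here is a minimal, self-contained sketch. The class names (`WithAbstractVal`, `WithCtorArg`, `SnapshotsA`, `SnapshotsB`) and the `metricName` field are hypothetical and are not taken from this PR; only the `kvStoreKey` name is borrowed from the hunk above.

```scala
object InitOrderDemo {
  // Variant A: the key is an abstract val. The base-class body runs before the
  // subclass has initialized its field, so the base class sees null.
  abstract class WithAbstractVal {
    protected val kvStoreKey: String
    // Hypothetical derived value, computed during base-class construction.
    val metricName: String = s"processed.$kvStoreKey"
  }
  class SnapshotsA extends WithAbstractVal {
    protected val kvStoreKey: String = "latest_snapshot"
  }

  // Variant B: the key is a constructor argument, so it is already set when
  // the base-class body runs.
  abstract class WithCtorArg(protected val kvStoreKey: String) {
    val metricName: String = s"processed.$kvStoreKey"
  }
  class SnapshotsB extends WithCtorArg("latest_snapshot")

  def main(args: Array[String]): Unit = {
    println(new SnapshotsA().metricName) // processed.null
    println(new SnapshotsB().metricName) // processed.latest_snapshot
  }
}
```

With the constructor-argument variant, an implementer cannot extend the base class without supplying the value up front.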
protected val processedTimestampMetric: MetricHandle.Gauge[CantonTimestamp]

/** This method should return the timestamp of the next snapshot available, after `last`, if any.
  * The pipeline will poll this method until it returns a new snapshot, and then process that snapshot
  * before polling for the next one. It is ok for this method to return a snapshot that should not actually
nit: I don't know if this info changes anything for a reader who wants to implement this. It may be better to document this in one place, in the non-abstract method where the processing occurs and where both should.. and this method are called.
/** This method should return true if the snapshot at the given timestamp should be processed,
  * or false if it should be skipped. This is used to skip snapshots that are not relevant for bulk storage
  * (e.g. because they are in the DB, but are more frequent than the frequency we need for bulk storage).
I can't think of a situation at the moment where we would want to skip a snapshot (the snapshots are created every 3 hours and it seems very useful to have all of them). What is the use case for this?
We already skip them today; this just refactors that. We store only one snapshot per day in bulk storage, not one every 3 hours. This is just to save on storage, as ACSs can always be reconstructed from the updates.
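As a rough illustration of "one snapshot per day out of three-hourly snapshots", a skip predicate could look like the sketch below. The rule used here (keep only the snapshot taken at 00:00 UTC) and the names `DailySnapshotFilter` and `shouldProcessSnapshot` are assumptions made for the example; the actual selection rule in the codebase may differ.

```scala
import java.time.{Instant, LocalTime, ZoneOffset}

object DailySnapshotFilter {
  // Assumed rule: a snapshot is kept for bulk storage only if it was taken
  // exactly at midnight UTC; all other three-hourly snapshots are skipped.
  def shouldProcessSnapshot(timestamp: Instant): Boolean =
    timestamp.atOffset(ZoneOffset.UTC).toLocalTime == LocalTime.MIDNIGHT

  def main(args: Array[String]): Unit = {
    val start = Instant.parse("2024-01-01T00:00:00Z")
    // Nine snapshots, 3 hours apart: 00:00 on day one through 00:00 on day two.
    val snapshots = (0 to 8).map(i => start.plusSeconds(i * 3L * 3600L))
    val kept = snapshots.filter(shouldProcessSnapshot)
    kept.foreach(println) // prints the two midnight snapshots
  }
}
```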
  tc: TraceContext
): Flow[TimestampWithMigrationId, TimestampWithMigrationId, NotUsed]

protected[bulk] def readLatestProcessedSnapshotTimestamp(implicit
You could put these progress-related methods in a separate class that wraps the kvProvider store and does only that, and then use it where you need it. Pass that class in via the constructor and delegate to it.
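A minimal sketch of that shape, under stated assumptions: `KeyValueStore`, `InMemoryStore`, `ProgressTracker`, and `Pipeline` are hypothetical stand-ins invented for this example, the timestamp is simplified to a `Long`, and plain `Future[Option[_]]` is used instead of the `OptionT` seen in the PR.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ProgressTrackerSketch {
  // Hypothetical stand-in for the real key-value store.
  trait KeyValueStore {
    def read(key: String): Future[Option[String]]
    def write(key: String, value: String): Future[Unit]
  }

  final class InMemoryStore extends KeyValueStore {
    private val data = TrieMap.empty[String, String]
    def read(key: String): Future[Option[String]] = Future.successful(data.get(key))
    def write(key: String, value: String): Future[Unit] =
      Future.successful(data.update(key, value))
  }

  // Owns exactly one key and only knows how to read and record progress.
  final class ProgressTracker(store: KeyValueStore, key: String) {
    def readLatest()(implicit ec: ExecutionContext): Future[Option[Long]] =
      store.read(key).map(_.map(_.toLong))
    def recordProcessed(timestampMicros: Long): Future[Unit] =
      store.write(key, timestampMicros.toString)
  }

  // The pipeline receives the tracker via its constructor and delegates to it.
  final class Pipeline(progress: ProgressTracker) {
    def resumePoint()(implicit ec: ExecutionContext): Future[Option[Long]] =
      progress.readLatest()
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val tracker = new ProgressTracker(new InMemoryStore, "latest_snapshot")
    val pipeline = new Pipeline(tracker)
    val result = for {
      before <- pipeline.resumePoint()
      _ <- tracker.recordProcessed(42L)
      after <- pipeline.resumePoint()
    } yield (before, after)
    println(Await.result(result, 5.seconds)) // (None,Some(42))
  }
}
```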
}
}

protected[bulk] def readLatestProcessedSegment(implicit
This looks very similar to protected[bulk] def readLatestProcessedSnapshotTimestamp(implicit
in the other class. Maybe these kinds of helpers can go into their own class, where you simplify reading any type from the kv store.
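One way such a shared helper could look is a reader that is generic in the value type and picks the decoder implicitly. Everything in this sketch is an assumption made for illustration: the `Decoder` type class, the `SnapshotProgress` and `SegmentProgress` types, the string encodings, and the synchronous `Map`-backed store. The PR itself uses circe codecs and `readValueAndLogOnDecodingFailure`.

```scala
object TypedKvReaderSketch {
  // Hypothetical decoder type class standing in for a real JSON codec.
  trait Decoder[A] { def decode(raw: String): Either[String, A] }

  // Hypothetical progress types, not the real case classes from the PR.
  final case class SnapshotProgress(timestampMicros: Long)
  final case class SegmentProgress(fromMicros: Long, toMicros: Long)

  object Decoder {
    implicit val snapshotDecoder: Decoder[SnapshotProgress] = new Decoder[SnapshotProgress] {
      def decode(raw: String): Either[String, SnapshotProgress] =
        raw.toLongOption.map(SnapshotProgress(_)).toRight(s"not a number: $raw")
    }
    implicit val segmentDecoder: Decoder[SegmentProgress] = new Decoder[SegmentProgress] {
      def decode(raw: String): Either[String, SegmentProgress] =
        raw.split(':') match {
          case Array(from, to) =>
            (for { f <- from.toLongOption; t <- to.toLongOption } yield SegmentProgress(f, t))
              .toRight(s"not a segment: $raw")
          case _ => Left(s"not a segment: $raw")
        }
    }
  }

  // One generic read method replaces one near-identical helper per value type.
  final class TypedKvReader(raw: Map[String, String]) {
    def read[A](key: String)(implicit decoder: Decoder[A]): Option[A] =
      raw.get(key).flatMap { value =>
        decoder.decode(value) match {
          case Right(a) => Some(a)
          case Left(err) =>
            println(s"Failed to decode '$key': $err") // log and treat as absent
            None
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val reader = new TypedKvReader(Map("snapshot" -> "100", "segment" -> "1:2", "broken" -> "oops"))
    println(reader.read[SnapshotProgress]("snapshot")) // Some(SnapshotProgress(100))
    println(reader.read[SegmentProgress]("segment"))   // Some(SegmentProgress(1,2))
    println(reader.read[SnapshotProgress]("broken"))   // None, after logging the failure
  }
}
```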
protected val description: String

/** The key in the key-value store where the timestamp of the latest updates segment is stored. This is
Same here: prefer these as args to the constructor.
import org.apache.pekko.stream.scaladsl.Flow
import org.lfdecentralizedtrust.splice.scan.store.bulk.AcsSnapshotBulkStorage.AcsSnapshotObjects

class AcsSnapshotBulkStorageStaging(
It would be nice to have some docs here: what is staging for (or what is it intended to be for soon)?
I assume it is a "workplace" or scratchpad where files are stored while everything is being processed, and at the end the files are moved to committed (atomically, at best)? So from the outside API, this is purely a write-side process?
I don't fully get why that staging place has an "is a" relationship with AcsSnapshotBulkStorage (why it is designed that way). The comment on getAcsSnapshotAtOrBefore makes this confusing for me: staging returns data from committed.
  ec: ExecutionContext,
): OptionT[Future, UpdatesSegment] =
  store.readValueAndLogOnDecodingFailure(latestUpdatesSegmentInBulkStorageKey)
// private val latestUpdatesSegmentInBulkStorageKey = "latest_updates_segment_in_bulk_storage"
We probably want to remove the commented-out code.
)
implicit val acsSnapshotTimestampMigrationCodec: Codec[TimestampWithMigrationId] =
  deriveCodec[TimestampWithMigrationId]
// implicit val acsSnapshotTimestampMigrationEncoder: Encoder[TimestampWithMigrationId] =
Curious why these needed to be removed.
Well, note that this only adds them commented out, so they never actually existed :)
I added them in an attempt to fix some implicit mess, which didn't work, but then accidentally only commented them out instead of deleting them.
ray-roestenburg-da
left a comment
Please see my suggestions, happy to approve this to speed up things since it's a refactor.
@ray-roestenburg-da is something like this commit what you had in mind? (Of course, it's very partial and doesn't even compile, as I haven't yet adjusted the call site where we construct it. I'm just wondering if it's in the direction you had envisioned.)
@ray-roestenburg-da I restructured this as discussed, can you make another pass please?
Part of #5884.
The idea is that the workflow that copies from staging to committed follows the same pattern, thus extends the same abstract base classes, and only needs to implement the `process` stage. That PR became a bit too large, so I extracted only the refactoring to a separate PR. This one should be a pure refactor, to allow that reuse.
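The reuse described above can be pictured as a template-method pipeline: the base class owns the poll / filter / process loop, and each workflow supplies only its own `process` stage. The sketch below is illustrative only; `SnapshotPipeline`, `ThreeHourlyPipeline`, `DumpToStaging`, `CopyToCommitted`, the integer "hour" timestamps, and the synchronous loop are all assumptions, whereas the real code is built on Pekko `Flow`s.

```scala
object PipelineSketch {
  // Hypothetical base class: owns the loop, subclasses supply the stages.
  abstract class SnapshotPipeline(val description: String) {
    protected def nextAfter(last: Option[Int]): Option[Int]
    protected def shouldProcess(ts: Int): Boolean
    protected def process(ts: Int): String

    // Polls for the next snapshot until none is left, processing the relevant ones.
    final def drain(): List[String] = {
      @annotation.tailrec
      def loop(last: Option[Int], acc: List[String]): List[String] =
        nextAfter(last) match {
          case None => acc.reverse
          case Some(ts) =>
            val updated = if (shouldProcess(ts)) process(ts) :: acc else acc
            loop(Some(ts), updated)
        }
      loop(None, Nil)
    }
  }

  // Shared polling and skipping logic: snapshots at hours 0, 3, ..., 24,
  // of which only one per 24 hours is processed.
  abstract class ThreeHourlyPipeline(desc: String) extends SnapshotPipeline(desc) {
    protected def nextAfter(last: Option[Int]): Option[Int] = {
      val candidate = last.fold(0)(_ + 3)
      if (candidate <= 24) Some(candidate) else None
    }
    protected def shouldProcess(ts: Int): Boolean = ts % 24 == 0
  }

  // Each workflow implements only its own process stage.
  final class DumpToStaging extends ThreeHourlyPipeline("dump to staging") {
    protected def process(ts: Int): String = s"staged snapshot@$ts"
  }
  final class CopyToCommitted extends ThreeHourlyPipeline("copy to committed") {
    protected def process(ts: Int): String = s"committed snapshot@$ts"
  }

  def main(args: Array[String]): Unit = {
    println(new DumpToStaging().drain())   // List(staged snapshot@0, staged snapshot@24)
    println(new CopyToCommitted().drain()) // List(committed snapshot@0, committed snapshot@24)
  }
}
```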