Skip to content

Add CDC-bootstrap-aware dedup logic to SourceBasedDeduper#1012

Merged
mittalprince merged 5 commits into
masterfrom
pmittal/datapipes-33203-cdc-bootstrap-sla-dedup
May 19, 2026
Merged

Add CDC-bootstrap-aware dedup logic to SourceBasedDeduper#1012
mittalprince merged 5 commits into
masterfrom
pmittal/datapipes-33203-cdc-bootstrap-sla-dedup

Conversation

@mittalprince
Copy link
Copy Markdown
Collaborator

@mittalprince mittalprince commented May 16, 2026

Summary

  • SourceBasedDeduper: Added two time-window guards to prevent unsafe cross-type dedup between CDC-only and CDC+BST streams:
    • newStreamGracePeriodMs: blocks a new CDC-only stream from deduping against a CDC+BST stream still in its catch-up grace period. The CDC+BST stream's offsets are behind the live tail — deduping would cause the CDC-only stream to miss events.
    • snapshotWorstRefreshDays: blocks a new CDC+BST stream from deduping against a CDC-only stream created within the worst-case GPFS snapshot staleness window (4 days). The CDC-only stream's offset
      history would not reach back far enough for the CDC+BST stream's required earlier start position.
  • SourceBasedDeduperFactory: Pass deduperProperties through to the SourceBasedDeduper constructor so domain-sliced connector config reaches the deduper correctly.

Both windows default to 0 (disabled), preserving existing behavior for all non-CDC connectors. When system.cdcBootstrapRequired is absent on the new stream the legacy source-equality path is taken
unchanged.

Testing Done

  • testCdcBstNewVsExistingCdcBst — same type always dedups
  • testCdcOnlyNewVsExistingCdcBstGraceActive — CDC-only blocked during CDC+BST grace window
  • testCdcOnlyNewVsExistingCdcBstGraceExpired — CDC-only allowed after grace expires
  • testCdcBstNewVsExistingCdcOnlySnapshotWindowActive — CDC+BST blocked during snapshot window
  • testCdcBstNewVsExistingCdcOnlySnapshotWindowExpired — CDC+BST allowed after window expires
  • testCdcOnlyNewVsExistingCdcOnly — same type always dedups
  • testLegacyPathNoCdcBootstrapKey — legacy source-equality path unaffected
  • testCdcBstNewPrefersExistingCdcBstOverExpiredCdcOnly — priority ordering verified
  • testCdcOnlyNewPrefersExpiredCdcBstOverCdcOnly — priority ordering verified
  • testCdcOnlyNewVsCdcBstMissingCreationMs — conservative block when system.creation.ms absent
  • testCdcBstNewVsCdcOnlyMissingCreationMs — conservative block when system.creation.ms absent
  • testDefaultDedupeConfigWindowsDisabledwindowMs=0 always allows dedup (backward-compatible default)
  • testCdcAwarePathDifferentSourceNoMatch — different source → no match

@mittalprince mittalprince marked this pull request as ready for review May 16, 2026 19:24
Add two time-window guards to prevent unsafe cross-type dedup:
- newStreamGracePeriodMs: blocks CDC-only from deduping against a
CDC+BST stream still catching up on lag (grace period active)
- snapshotWorstRefreshDays: blocks CDC+BST from deduping against a
CDC-only stream too recent to cover the GPFS snapshot window
Both default to 0 (disabled), preserving existing behavior.
Also pass deduperProperties through SourceBasedDeduperFactory.
@mittalprince mittalprince force-pushed the pmittal/datapipes-33203-cdc-bootstrap-sla-dedup branch from 91923a5 to 0f06d67 Compare May 16, 2026 19:42
.filter(d -> d.getSource().equals(newStream.getSource()))
.collect(Collectors.toList());

List<Datastream> cdcOnlyCandidates = sameSourceCandidates.stream()
Copy link
Copy Markdown
Collaborator

@akshayrai akshayrai May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be precise - Since legacy datastreams do not have this metadata and we are defaulting to false now, this list could potentially have bootstrap enabled streams as well. But I think that is ok since those streams would have already caught up. Right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Akshay I don't think that should be the case bootstrap stream can't be candidate for cdc dedup. Here allStreams coming as input to deduper are only the streams with same connectorName only. So a BST stream (e.g. EspressoBootstrap) will never appear as a candidate when deduping a CDC stream (e.g. Espresso). Coordinator already filters allDatastreams by connectorName before passing them to the deduper.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I meant to say bootstrap enabled cdc streams and not bootstrap stream.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes Akshay, those old created stream already caught up the lag and good to dedup.

* @return {@code true} if the window has expired, {@code false} otherwise
*/
private static boolean hasWindowExpired(Datastream stream, long windowMs) {
if (!stream.hasMetadata() || stream.getMetadata() == null
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it will be safer to return true when windowMs == 0 regardless of creation_ms metadata present or not. Suppose some legacy datastream doesn't have this config.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Akshay, I didn’t fully understand your thought process here. This flow executes only when the system.cdcBootstrapRequired flag is set. If that condition is true, then windowMs should definitely exist.
Otherwise, returning true would mean the stream can always be deduped, which shouldn’t be the case in every scenario.

- Use VerifiableProperties.getLong instead of Long.parseLong for config
- Use Math.multiplyExact for snapshotWorstRefreshMs calculation
- Return true from hasWindowExpired when windowMs == 0 regardless of metadata
- Remove redundant getMetadata() == null checks (hasMetadata() covers this)
- Fix hasWindowExpired Javadoc to match actual >= semantics
@mittalprince mittalprince force-pushed the pmittal/datapipes-33203-cdc-bootstrap-sla-dedup branch from e00abaa to 78fc111 Compare May 18, 2026 08:04
pmittal added 2 commits May 18, 2026 13:39
Move the system.cdcBootstrapRequired string literal to a shared constant
in DatastreamMetadataConstants so internal consumers can reference it
without duplicating the string. SourceBasedDeduper now uses it as the
default value for its config key.
Define CDC_START_POSITION_EARLIEST and CDC_START_POSITION_LATEST as
shared constants so internal consumers can reference them without
duplicating the string literals.
Comment on lines +149 to +154
public static final String CDC_START_POSITION_EARLIEST = "earliest";

/**
* Start position value indicating the consumer should begin from the latest offset.
*/
public static final String CDC_START_POSITION_LATEST = "latest";
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these 2 don't really fit in this class since they are not brooklin system properties. Is there any other place we can put them?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in KafkaDatastreamMetadataConstants

.filter(d -> d.getSource().equals(newStream.getSource()))
.collect(Collectors.toList());

List<Datastream> cdcOnlyCandidates = sameSourceCandidates.stream()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I meant to say bootstrap enabled cdc streams and not bootstrap stream.

earliest/latest are Kafka-specific values, not Brooklin system properties,
so they belong in KafkaDatastreamMetadataConstants rather than
DatastreamMetadataConstants.
Copy link
Copy Markdown
Collaborator

@akshayrai akshayrai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick turn-around.

}
if (!stream.hasMetadata()
|| !stream.getMetadata().containsKey(DatastreamMetadataConstants.CREATION_MS)) {
return false;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so for streams that dont have this metadata, we would not dedupe against it? Should this be other way around as they would mostly be older streams that probably would have enough overlap with the source topic?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have checked there are no such streams where creationtime.ms field doesn't exist. I have checked in all prod cluster for tidb & espresso.

* @return {@code true} if the window has expired, {@code false} otherwise
*/
private static boolean hasWindowExpired(Datastream stream, long windowMs) {
if (windowMs == 0) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this condition ever true?

@mittalprince mittalprince merged commit 86b6c98 into master May 19, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants