Add CDC-bootstrap-aware dedup logic to SourceBasedDeduper#1012
Conversation
Add two time-window guards to prevent unsafe cross-type dedup: - newStreamGracePeriodMs: blocks CDC-only from deduping against a CDC+BST stream still catching up on lag (grace period active) - snapshotWorstRefreshDays: blocks CDC+BST from deduping against a CDC-only stream too recent to cover the GPFS snapshot window Both default to 0 (disabled), preserving existing behavior. Also pass deduperProperties through SourceBasedDeduperFactory.
91923a5 to
0f06d67
Compare
| .filter(d -> d.getSource().equals(newStream.getSource())) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| List<Datastream> cdcOnlyCandidates = sameSourceCandidates.stream() |
There was a problem hiding this comment.
To be precise - Since legacy datastreams do not have this metadata and we are defaulting to false now, this list could potentially have bootstrap enabled streams as well. But I think that is ok since those streams would have already caught up. Right?
There was a problem hiding this comment.
Akshay I don't think that should be the case bootstrap stream can't be candidate for cdc dedup. Here allStreams coming as input to deduper are only the streams with same connectorName only. So a BST stream (e.g. EspressoBootstrap) will never appear as a candidate when deduping a CDC stream (e.g. Espresso). Coordinator already filters allDatastreams by connectorName before passing them to the deduper.
There was a problem hiding this comment.
sorry, I meant to say bootstrap enabled cdc streams and not bootstrap stream.
There was a problem hiding this comment.
yes Akshay, those old created stream already caught up the lag and good to dedup.
| * @return {@code true} if the window has expired, {@code false} otherwise | ||
| */ | ||
| private static boolean hasWindowExpired(Datastream stream, long windowMs) { | ||
| if (!stream.hasMetadata() || stream.getMetadata() == null |
There was a problem hiding this comment.
nit: I think it will be safer to return true when windowMs == 0 regardless of creation_ms metadata present or not. Suppose some legacy datastream doesn't have this config.
There was a problem hiding this comment.
Akshay, I didn’t fully understand your thought process here. This flow executes only when the system.cdcBootstrapRequired flag is set. If that condition is true, then windowMs should definitely exist.
Otherwise, returning true would mean the stream can always be deduped, which shouldn’t be the case in every scenario.
- Use VerifiableProperties.getLong instead of Long.parseLong for config - Use Math.multiplyExact for snapshotWorstRefreshMs calculation - Return true from hasWindowExpired when windowMs == 0 regardless of metadata - Remove redundant getMetadata() == null checks (hasMetadata() covers this) - Fix hasWindowExpired Javadoc to match actual >= semantics
e00abaa to
78fc111
Compare
Move the system.cdcBootstrapRequired string literal to a shared constant in DatastreamMetadataConstants so internal consumers can reference it without duplicating the string. SourceBasedDeduper now uses it as the default value for its config key.
Define CDC_START_POSITION_EARLIEST and CDC_START_POSITION_LATEST as shared constants so internal consumers can reference them without duplicating the string literals.
| public static final String CDC_START_POSITION_EARLIEST = "earliest"; | ||
|
|
||
| /** | ||
| * Start position value indicating the consumer should begin from the latest offset. | ||
| */ | ||
| public static final String CDC_START_POSITION_LATEST = "latest"; |
There was a problem hiding this comment.
these 2 don't really fit in this class since they are not brooklin system properties. Is there any other place we can put them?
There was a problem hiding this comment.
Added in KafkaDatastreamMetadataConstants
| .filter(d -> d.getSource().equals(newStream.getSource())) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| List<Datastream> cdcOnlyCandidates = sameSourceCandidates.stream() |
There was a problem hiding this comment.
sorry, I meant to say bootstrap enabled cdc streams and not bootstrap stream.
earliest/latest are Kafka-specific values, not Brooklin system properties, so they belong in KafkaDatastreamMetadataConstants rather than DatastreamMetadataConstants.
akshayrai
left a comment
There was a problem hiding this comment.
Thanks for the quick turn-around.
| } | ||
| if (!stream.hasMetadata() | ||
| || !stream.getMetadata().containsKey(DatastreamMetadataConstants.CREATION_MS)) { | ||
| return false; |
There was a problem hiding this comment.
so for streams that dont have this metadata, we would not dedupe against it? Should this be other way around as they would mostly be older streams that probably would have enough overlap with the source topic?
There was a problem hiding this comment.
I have checked there are no such streams where creationtime.ms field doesn't exist. I have checked in all prod cluster for tidb & espresso.
| * @return {@code true} if the window has expired, {@code false} otherwise | ||
| */ | ||
| private static boolean hasWindowExpired(Datastream stream, long windowMs) { | ||
| if (windowMs == 0) { |
There was a problem hiding this comment.
is this condition ever true?
Summary
newStreamGracePeriodMs: blocks a new CDC-only stream from deduping against a CDC+BST stream still in its catch-up grace period. The CDC+BST stream's offsets are behind the live tail — deduping would cause the CDC-only stream to miss events.snapshotWorstRefreshDays: blocks a new CDC+BST stream from deduping against a CDC-only stream created within the worst-case GPFS snapshot staleness window (4 days). The CDC-only stream's offsethistory would not reach back far enough for the CDC+BST stream's required earlier start position.
deduperPropertiesthrough to theSourceBasedDeduperconstructor so domain-sliced connector config reaches the deduper correctly.Both windows default to
0(disabled), preserving existing behavior for all non-CDC connectors. Whensystem.cdcBootstrapRequiredis absent on the new stream the legacy source-equality path is takenunchanged.
Testing Done
testCdcBstNewVsExistingCdcBst— same type always dedupstestCdcOnlyNewVsExistingCdcBstGraceActive— CDC-only blocked during CDC+BST grace windowtestCdcOnlyNewVsExistingCdcBstGraceExpired— CDC-only allowed after grace expirestestCdcBstNewVsExistingCdcOnlySnapshotWindowActive— CDC+BST blocked during snapshot windowtestCdcBstNewVsExistingCdcOnlySnapshotWindowExpired— CDC+BST allowed after window expirestestCdcOnlyNewVsExistingCdcOnly— same type always dedupstestLegacyPathNoCdcBootstrapKey— legacy source-equality path unaffectedtestCdcBstNewPrefersExistingCdcBstOverExpiredCdcOnly— priority ordering verifiedtestCdcOnlyNewPrefersExpiredCdcBstOverCdcOnly— priority ordering verifiedtestCdcOnlyNewVsCdcBstMissingCreationMs— conservative block whensystem.creation.msabsenttestCdcBstNewVsCdcOnlyMissingCreationMs— conservative block whensystem.creation.msabsenttestDefaultDedupeConfigWindowsDisabled—windowMs=0always allows dedup (backward-compatible default)testCdcAwarePathDifferentSourceNoMatch— different source → no match