[SPARK-56551][SQL][FOLLOW-UP] Fix setting numDeletedRows metric as -1 #55576
ZiyaZa wants to merge 5 commits into apache:master from
Conversation
dongjoon-hyun
left a comment
Could you add a test case for this bug fix, @ZiyaZa ?
Fix setting numDeletedRows metric as -1
```scala
// SQLMetric.set call is a no-op if value is -1. Override numDeletedRows value in summary.
metrics("numDeletedRows").set(numDeletedRows)
super.getWriteSummary(query)
  .map(_.asInstanceOf[DeleteSummaryImpl].copy(numDeletedRows = numDeletedRows))
```
This seems awkward to read.
Changed it to pattern-matching instead. I want to call super.getWriteSummary here because in the future we might have more fields inside DeleteSummaryImpl, and repeating the same code here is not ideal.
Note that even if we set the initial value of the metric as -1, accessing .value would still give us 0 as SQLMetric hides negative values.
If you have a better proposal here, I can change it.
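The value-hiding behavior mentioned above can be illustrated with a toy model. This is a hypothetical SimpleSQLMetric written for illustration only, not Spark's actual SQLMetric class; it assumes the two behaviors described in this thread (set(-1) is a no-op, and negative internal values are reported as 0):

```scala
// Simplified model of the SQLMetric behavior described in this thread.
// Illustrative sketch only; not Spark's actual implementation.
class SimpleSQLMetric(initValue: Long = 0L) {
  private var current: Long = initValue

  // set is a no-op when the new value is -1, per the comment above.
  def set(v: Long): Unit = if (v != -1L) current = v

  // Negative internal values are hidden from callers: value reports 0.
  def value: Long = if (current < 0) 0L else current
}

object ClampDemo extends App {
  val m = new SimpleSQLMetric(initValue = -1L)
  m.set(-1L)       // no-op
  println(m.value) // reports 0, not -1
}
```

This is why setting the initial value to -1 does not help here: any read through .value still comes back as 0.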
```scala
  -1L
}
```

```scala
// SQLMetric.set call is a no-op if value is -1. Override numDeletedRows value in summary.
```
Hm, does it mean the values in UI would differ from the commit info?
Take a closer look at SQLMetric. Shall we create them with -1 to indicate they are invalid? Today, we create them with a 0 initial value, making them valid metrics by default.
```scala
class SQLMetric(
    val metricType: String,
    initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
  // initValue defines the initial value of the metric. 0 is the lowest value considered valid.
  // If a SQLMetric is invalid, it is set to 0 upon receiving any updates, and it also reports
  // 0 as its value to avoid exposing it to the user programmatically.
  //
  // For many SQLMetrics, we use initValue = -1 to indicate that the metric is by default invalid.
  // At the end of a task, we will update the metric making it valid, and the invalid metrics will
  // be filtered out when calculating min, max, etc. as a workaround for SPARK-11013.
```
If my understanding is correct, setting the initial value as -1 will result in the final numDeletedRows metric value being equal to -1 x num_of_tasks, as no task calls .add() on it; each task will send -1 as its value, and these are summed up at the end. I'm not sure how it behaves with task failures etc.
There are also places where we legitimately rely on metric values being initialized to 0 unless updated. For instance, MERGE numTargetRowsCopied should stay as 0, unless we have a row that is being copied and then we increment it.
Since the meaning of -1 is only documented in the Summary classes (as "not found"), I don't think we have to make the metrics behave the same way. It looks like metrics were not built to support negative values properly.
This is not something that can be tested. Ideally, this should never happen as it would mean we have a bug in the code. If we could write a test that hits this, then we should fix another bug and make sure this code can't be hit. I only detected this during further development. I have tested this manually by changing the code and making it buggy, and saw that the metric in summary was set as -1.
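The accumulator-merging concern raised above can be sketched with a toy sum. This is a deliberately simplified model, not Spark's AccumulatorV2 merge path, and assumes each task simply ships its initial value to the driver, which sums them:

```scala
// Toy illustration of the merge concern: if no task ever calls .add(),
// each task reports the initial value -1, and a naive driver-side sum
// yields -1 * numTasks rather than a single -1 sentinel.
// Hypothetical sketch; the real AccumulatorV2 merge path is more involved.
object MergeSketch extends App {
  val numTasks = 4
  val perTaskValues = Seq.fill(numTasks)(-1L) // no task called .add()
  val driverTotal = perTaskValues.sum
  println(driverTotal) // -4, i.e. -1 * numTasks
}
```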
```scala
// SQLMetric.set call is a no-op if value is -1. Override numDeletedRows value in summary.
metrics("numDeletedRows").set(numDeletedRows)
super.getWriteSummary(query).map {
```
These calls to super and then adjusting output is a sign that we may need to split the parent method into smaller blocks and override those.
What about something like below?
```scala
private def buildMergeSummary(): Option[WriteSummary] = {
  collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
    ...
  }
}

private def buildUpdateSummary(): Option[WriteSummary] = {
  ...
}

protected def buildDeleteSummary(): Option[WriteSummary] = {
  ...
}

override protected def buildWriteSummary(): Option[WriteSummary] = {
  rowLevelCommand match {
    case MERGE => buildMergeSummary()
    case UPDATE => buildUpdateSummary()
    case DELETE => buildDeleteSummary()
  }
}
```
Then you only need to override buildDeleteSummary.
I applied what you suggested, except I made the new summary getters return the subclass type (e.g., buildDeleteSummary returns Option[DeleteSummaryImpl]). This way in the override we can do:
super.getDeleteSummary().map(_.copy(numDeletedRows = numDeletedRows)).
I believe calling super here and overwriting numDeletedRows is future-proof if we ever add more fields / calculations to the base function. But if you have strong opinions here, I can also construct DeleteSummaryImpl in place instead of calling super.
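The copy-based override described above might look roughly like the following. The class and field shapes here (BaseWriteExec, DeleteWriteExec, a single-field DeleteSummaryImpl) are simplified stand-ins for illustration, not Spark's actual classes:

```scala
// Sketch of the copy-based override pattern discussed above.
// All types are simplified stand-ins, not Spark's actual classes.
case class DeleteSummaryImpl(numDeletedRows: Long)

class BaseWriteExec {
  // Base implementation; may populate more fields in the future.
  protected def buildDeleteSummary(): Option[DeleteSummaryImpl] =
    Some(DeleteSummaryImpl(numDeletedRows = 0L))
}

class DeleteWriteExec(numDeletedRows: Long) extends BaseWriteExec {
  // Call super to keep whatever the base computes, then overwrite
  // only numDeletedRows via the case-class copy method.
  override protected def buildDeleteSummary(): Option[DeleteSummaryImpl] =
    super.buildDeleteSummary().map(_.copy(numDeletedRows = numDeletedRows))
}
```

The copy call is what makes this robust to new fields: any field added to DeleteSummaryImpl later is carried through unchanged.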
```scala
} else {
  super.getWriteSummary(query)
}
```

```scala
super.getWriteSummary(query)
```
Question: do we have variable shadowing in getWriteSummary? Does the method accept a query parameter that shadows the class field of the same name? Do we need to pass query at all?
Yes, we did shadow it. I removed the parameter.
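The shadowing issue discussed here is easy to reproduce in isolation. A toy example (hypothetical names, not the actual Spark code):

```scala
// Toy example of the shadowing concern: the method parameter `query`
// hides the constructor field of the same name inside the method body.
class SummaryBuilder(query: String) {
  // Here `query` refers to the parameter, shadowing the field.
  def getWriteSummary(query: String): String = s"summary of $query"

  // Without the parameter, `query` resolves to the constructor field,
  // which is what the fix in this PR relies on.
  def getWriteSummaryFromField(): String = s"summary of $query"
}
```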
What changes were proposed in this pull request?
We were previously calling SQLMetric.set(-1) when we couldn't compute the value of the numDeletedRows metric. However, this call was a no-op, and we reported this metric in the write summary as 0 instead. This PR fixes it to report -1 as intended.

Why are the changes needed?
Fix the bug above.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a new test.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7