diff --git a/runtime/sam/op/aggregate/aggregate.go b/runtime/sam/op/aggregate/aggregate.go index 1af3155b9a..991e17a746 100644 --- a/runtime/sam/op/aggregate/aggregate.go +++ b/runtime/sam/op/aggregate/aggregate.go @@ -319,7 +319,7 @@ func (a *Aggregator) Consume(batch sbuf.Batch, this super.Value) error { keyBytes := a.keyCache[:0] var prim super.Value for i, keyExpr := range a.keyExprs { - key := keyExpr.Eval(this) + key := keyExpr.Eval(this).SuperDeunion() if key.IsQuiet() { return nil } diff --git a/runtime/vam/op/aggregate/aggregate.go b/runtime/vam/op/aggregate/aggregate.go index 97a96f0f7a..13c8be6f2c 100644 --- a/runtime/vam/op/aggregate/aggregate.go +++ b/runtime/vam/op/aggregate/aggregate.go @@ -85,7 +85,7 @@ func (a *Aggregate) Pull(done bool) (vector.Any, error) { vals = append(vals, v) } } - vector.Apply(vector.ApplyRipUnions, func(args ...vector.Any) vector.Any { + vector.Apply(vector.ApplyRipUnions|vector.ApplyRipFusions, func(args ...vector.Any) vector.Any { a.consume(args[:len(keys)], args[len(keys):]) // XXX Perhaps there should be a "consume" version of Apply where // no return value is expected. diff --git a/runtime/ztests/op/aggregate/keys.yaml b/runtime/ztests/op/aggregate/keys.yaml new file mode 100644 index 0000000000..f815ce10c4 --- /dev/null +++ b/runtime/ztests/op/aggregate/keys.yaml @@ -0,0 +1,10 @@ +# Test that key values are deunioned and defused. +spq: count() by this + +input: | + 1 + 1::(int64|null) + fusion(1::(int64|null),) + +output: | + {that:1,count:3}