diff --git a/compiler/dag/op.go b/compiler/dag/op.go index 359347343..50af13bca 100644 --- a/compiler/dag/op.go +++ b/compiler/dag/op.go @@ -53,13 +53,12 @@ func (seq *Seq) Delete(from, to int) { type ( AggregateOp struct { - Kind string `json:"kind" unpack:""` - Limit int `json:"limit"` - Keys []Assignment `json:"keys"` - Aggs []Assignment `json:"aggs"` - InputSortDir int `json:"input_sort_dir,omitempty"` - PartialsIn bool `json:"partials_in,omitempty"` - PartialsOut bool `json:"partials_out,omitempty"` + Kind string `json:"kind" unpack:""` + Limit int `json:"limit"` + Keys []Assignment `json:"keys"` + Aggs []Assignment `json:"aggs"` + PartialsIn bool `json:"partials_in,omitempty"` + PartialsOut bool `json:"partials_out,omitempty"` } CombineOp struct { Kind string `json:"kind" unpack:""` diff --git a/compiler/optimizer/optimizer.go b/compiler/optimizer/optimizer.go index f464104d7..7e6ddc12e 100644 --- a/compiler/optimizer/optimizer.go +++ b/compiler/optimizer/optimizer.go @@ -332,29 +332,6 @@ func (o *Optimizer) propagateSortKeyOp(op dag.Op, parents []order.SortKeys) ([]o } switch op := op.(type) { case *dag.AggregateOp: - if parent.IsNil() { - return []order.SortKeys{nil}, nil - } - //XXX handle only primary sortKey for now - sortKey := parent.Primary() - for _, k := range op.Keys { - if groupingKey := fieldOf(k.LHS); groupingKey.Equal(sortKey.Key) { - rhsExpr := k.RHS - rhs := fieldOf(rhsExpr) - if rhs.Equal(sortKey.Key) || orderPreservingCall(rhsExpr, groupingKey) { - op.InputSortDir = int(sortKey.Order.Direction()) - // Currently, the aggregate operator will sort its - // output according to the primary key, but we - // should relax this and do an analysis here as - // to whether the sort is necessary for the - // downstream consumer. - return []order.SortKeys{parent}, nil - } - } - } - // We'll leave this as unknown for now in spite of the aggregate - // and not try to optimize downstream of the first aggregate - // unless there is an excplicit sort encountered. return []order.SortKeys{nil}, nil case *dag.ForkOp: var keys []order.SortKeys diff --git a/compiler/package.go b/compiler/package.go index cba8c9c37..0dfe777f8 100644 --- a/compiler/package.go +++ b/compiler/package.go @@ -12,7 +12,6 @@ import ( "github.com/brimdata/super/compiler/semantic" "github.com/brimdata/super/compiler/srcfiles" "github.com/brimdata/super/dbid" - "github.com/brimdata/super/order" "github.com/brimdata/super/runtime" "github.com/brimdata/super/runtime/exec" "github.com/brimdata/super/runtime/vam/op" @@ -140,25 +139,3 @@ func VectorFilterCompile(rctx *runtime.Context, query string, env *exec.Environm } return rungen.NewBuilder(rctx, env).BuildVamToSeqFilter(f.Expr, poolID, commitID) } - -// XXX currently used only by aggregate test, need to deprecate -func CompileWithSortKey(rctx *runtime.Context, ast *parser.AST, r sio.Reader, sortKey order.SortKey) (*exec.Query, error) { - env := exec.NewEnvironment(nil, nil) - main, err := Analyze(rctx, ast, env, true) - if err != nil { - return nil, err - } - scan, ok := main.Body[0].(*dag.DefaultScan) - if !ok { - return nil, errors.New("CompileWithSortKey: expected a reader") - } - scan.SortKeys = order.SortKeys{sortKey} - if err := Optimize(rctx, main, env, 0); err != nil { - return nil, err - } - outputs, debugs, meter, err := Build(rctx, main, env, []sio.Reader{r}) - if err != nil { - return nil, err - } - return exec.NewQuery(rctx, bundleOutputs(rctx, outputs, debugs), meter), nil -} diff --git a/compiler/rungen/aggregate.go b/compiler/rungen/aggregate.go index 12fcc43b1..70db59f97 100644 --- a/compiler/rungen/aggregate.go +++ b/compiler/rungen/aggregate.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/brimdata/super/compiler/dag" - "github.com/brimdata/super/order" "github.com/brimdata/super/pkg/field" "github.com/brimdata/super/runtime/sam/expr" "github.com/brimdata/super/runtime/sam/op/aggregate" @@ -21,11 +20,10 @@ func (b *Builder) compileAggregate(parent sbuf.Puller, a *dag.AggregateOp) (sbuf if err != nil { return nil, err } - dir := order.Direction(a.InputSortDir) if len(keys) == 0 { return aggregate.NewScalar(b.rctx, parent, names, reducers, a.PartialsIn, a.PartialsOut) } - return aggregate.New(b.rctx, parent, keys, names, reducers, a.Limit, dir, a.PartialsIn, a.PartialsOut) + return aggregate.New(b.rctx, parent, keys, names, reducers, a.Limit, a.PartialsIn, a.PartialsOut) } func (b *Builder) compileAggAssignments(assignments []dag.Assignment) (field.List, []*expr.Aggregator, error) { diff --git a/compiler/sfmt/dag.go b/compiler/sfmt/dag.go index cefc9dd30..79bee77a5 100644 --- a/compiler/sfmt/dag.go +++ b/compiler/sfmt/dag.go @@ -398,9 +398,6 @@ func (c *canonDAG) op(p dag.Op) { if p.PartialsOut { c.write(" partials-out") } - if p.InputSortDir != 0 { - c.write(" sort-dir %d", p.InputSortDir) - } c.ret() c.open() c.assignments(p.Aggs) diff --git a/compiler/ztests/par-ts.yaml b/compiler/ztests/par-ts.yaml index c3f027402..3af8dcc8e 100644 --- a/compiler/ztests/par-ts.yaml +++ b/compiler/ztests/par-ts.yaml @@ -80,16 +80,16 @@ outputs: | scatter ( seqscan ... - | aggregate partials-out sort-dir 1 + | aggregate partials-out count:=count() by y:=y,ts:=bucket(ts, 1h) ) ( seqscan ... - | aggregate partials-out sort-dir 1 + | aggregate partials-out count:=count() by y:=y,ts:=bucket(ts, 1h) ) | merge ts asc nulls last - | aggregate partials-in sort-dir 1 + | aggregate partials-in count:=count() by y:=y,ts:=ts | output main diff --git a/compiler/ztests/sem-aggregate-input-dir.yaml b/compiler/ztests/sem-aggregate-input-dir.yaml deleted file mode 100644 index 3069e571b..000000000 --- a/compiler/ztests/sem-aggregate-input-dir.yaml +++ /dev/null @@ -1,15 +0,0 @@ -script: | - export SUPER_DB=test - super db init -q - super db create -q -orderby ts pool-ts - super db compile -C -O "from 'pool-ts' | count() by ts:=bucket(ts, 1h)" | sed -e 's/pool .*/.../' - -outputs: - - name: stdout - data: | - lister ... - | slicer - | seqscan ... - | aggregate sort-dir 1 - count:=count() by ts:=bucket(ts, 1h) - | output main diff --git a/runtime/sam/op/aggregate/aggregate.go b/runtime/sam/op/aggregate/aggregate.go index 991e17a74..1328cd085 100644 --- a/runtime/sam/op/aggregate/aggregate.go +++ b/runtime/sam/op/aggregate/aggregate.go @@ -4,7 +4,6 @@ import ( "context" "encoding/binary" "errors" - "slices" "sync" "github.com/brimdata/super" @@ -54,12 +53,7 @@ type Aggregator struct { recordTypes map[int]*super.TypeRecord table map[string]*Row limit int - valueCompare expr.CompareFn // to compare primary group keys for early key output - keyCompare expr.CompareFn // compare the first key (used when input sorted) keysComparator *expr.Comparator // compare all keys - maxTableKey *super.Value - maxSpillKey *super.Value - inputDir order.Direction spiller *spill.MergeSort partialsIn bool partialsOut bool @@ -71,26 +65,17 @@ type Row struct { reducers valRow } -func NewAggregator(ctx context.Context, sctx *super.Context, keyRefs, keyExprs, aggRefs []expr.Evaluator, aggs []*expr.Aggregator, builder *super.RecordBuilder, limit int, inputDir order.Direction, partialsIn, partialsOut bool) (*Aggregator, error) { +func NewAggregator(ctx context.Context, sctx *super.Context, keyRefs, keyExprs, aggRefs []expr.Evaluator, aggs []*expr.Aggregator, builder *super.RecordBuilder, limit int, partialsIn, partialsOut bool) (*Aggregator, error) { if limit == 0 { limit = DefaultLimit } - var keyCompare, valueCompare expr.CompareFn - nkeys := len(keyExprs) - o, ok := inputDir.Which() - if ok && nkeys > 0 { - keySortExpr := expr.NewSortExpr(keyRefs[0], o, o.NullsMax(true)) - keyCompare = expr.NewComparator(keySortExpr).WithMissingAsNull().Compare - valueCompare = expr.NewValueCompareFn(o, o.NullsMax(true)) - } var sortExprs []expr.SortExpr for _, e := range keyRefs { - sortExprs = append(sortExprs, expr.NewSortExpr(e, o, o.NullsMax(true))) + sortExprs = append(sortExprs, expr.NewSortExpr(e, order.Asc, order.NullsLast)) } return &Aggregator{ ctx: ctx, sctx: sctx, - inputDir: inputDir, limit: limit, keyTypes: super.NewTypeVectorTable(), outTypes: super.NewTypeVectorTable(), @@ -99,19 +84,17 @@ func NewAggregator(ctx context.Context, sctx *super.Context, keyRefs, keyExprs, aggRefs: aggRefs, aggs: aggs, builder: builder, - typeCache: make([]super.Type, nkeys+len(aggs)), + typeCache: make([]super.Type, len(keyExprs)+len(aggs)), keyCache: make(scode.Bytes, 0, 128), table: make(map[string]*Row), recordTypes: make(map[int]*super.TypeRecord), - keyCompare: keyCompare, keysComparator: expr.NewComparator(sortExprs...).WithMissingAsNull(), - valueCompare: valueCompare, partialsIn: partialsIn, partialsOut: partialsOut, }, nil } -func New(rctx *runtime.Context, parent sbuf.Puller, keys []expr.Assignment, aggNames field.List, aggs []*expr.Aggregator, limit int, inputSortDir order.Direction, partialsIn, partialsOut bool) (sbuf.Puller, error) { +func New(rctx *runtime.Context, parent sbuf.Puller, keys []expr.Assignment, aggNames field.List, aggs []*expr.Aggregator, limit int, partialsIn, partialsOut bool) (*Op, error) { names := make(field.List, 0, len(keys)+len(aggNames)) for _, e := range keys { p, ok := e.LHS.Path() @@ -135,7 +118,7 @@ func New(rctx *runtime.Context, parent sbuf.Puller, keys []expr.Assignment, aggN keyRefs = append(keyRefs, expr.NewDottedExpr(rctx.Sctx, names[i])) keyExprs = append(keyExprs, keys[i].RHS) } - agg, err := NewAggregator(rctx.Context, rctx.Sctx, keyRefs, keyExprs, valRefs, aggs, builder, limit, inputSortDir, partialsIn, partialsOut) + agg, err := NewAggregator(rctx.Context, rctx.Sctx, keyRefs, keyExprs, valRefs, aggs, builder, limit, partialsIn, partialsOut) if err != nil { return nil, err } @@ -220,31 +203,6 @@ func (o *Op) run() { return } } - if o.agg.inputDir == 0 { - batch.Unref() - continue - } - // sorted input: see if we have any completed keys we can emit. - for { - res, err := o.agg.nextResult(false, batch) - if err != nil { - if _, ok := o.sendResult(nil, err); !ok { - return - } - break - } - if res == nil { - break - } - slices.SortStableFunc(res.Values(), o.agg.keyCompare) - done, ok := o.sendResult(res, nil) - if !ok { - return - } - if done { - break - } - } batch.Unref() } } @@ -318,14 +276,11 @@ func (a *Aggregator) Consume(batch sbuf.Batch, this super.Value) error { types := a.typeCache[:0] keyBytes := a.keyCache[:0] var prim super.Value - for i, keyExpr := range a.keyExprs { + for _, keyExpr := range a.keyExprs { key := keyExpr.Eval(this).SuperDeunion() if key.IsQuiet() { return nil } - if i == 0 && a.inputDir != 0 { - prim = a.updateMaxTableKey(key) - } types = append(types, key.Type()) // Append each value to the key as a flat value, independent // of whether this is a primitive or container. @@ -361,7 +316,7 @@ func (a *Aggregator) Consume(batch sbuf.Batch, this super.Value) error { } func (a *Aggregator) spillTable(eof bool, ref sbuf.Batch) error { - batch, err := a.readTable(true, true, ref) + batch, err := a.readTable(true, ref) if err != nil || batch == nil { return err } @@ -373,33 +328,7 @@ func (a *Aggregator) spillTable(eof bool, ref sbuf.Batch) error { } recs := batch.Values() // Note that this will sort recs according to g.keysComparator. - if err := a.spiller.Spill(a.ctx, recs); err != nil { - return err - } - if !eof && a.inputDir != 0 { - val := a.keyExprs[0].Eval(recs[len(recs)-1]) - if !val.IsError() { - // pass volatile super.Value since updateMaxSpillKey will make - // a copy if needed. - a.updateMaxSpillKey(val) - } - } - return nil -} - -// updateMaxTableKey is called with a volatile super.Value to update the -// max value seen in the table for the streaming logic when the input is sorted. -func (a *Aggregator) updateMaxTableKey(val super.Value) super.Value { - if a.maxTableKey == nil || a.valueCompare(val, *a.maxTableKey) > 0 { - a.maxTableKey = val.Copy().Ptr() - } - return *a.maxTableKey -} - -func (a *Aggregator) updateMaxSpillKey(v super.Value) { - if a.maxSpillKey == nil || a.valueCompare(v, *a.maxSpillKey) > 0 { - a.maxSpillKey = v.Copy().Ptr() - } + return a.spiller.Spill(a.ctx, recs) } // Results returns a batch of aggregation result records. Upon eof, @@ -408,7 +337,7 @@ func (a *Aggregator) updateMaxSpillKey(v super.Value) { // before eof, and keys that are completed will returned. func (a *Aggregator) nextResult(eof bool, batch sbuf.Batch) (sbuf.Batch, error) { if a.spiller == nil { - return a.readTable(eof, a.partialsOut, batch) + return a.readTable(a.partialsOut, batch) } if eof { // EOF: spill in-memory table before merging all files for output. @@ -416,28 +345,12 @@ func (a *Aggregator) nextResult(eof bool, batch sbuf.Batch) (sbuf.Batch, error) return nil, err } } - return a.readSpills(eof, batch) + return a.readSpills(batch) } -func (a *Aggregator) readSpills(eof bool, batch sbuf.Batch) (sbuf.Batch, error) { +func (a *Aggregator) readSpills(batch sbuf.Batch) (sbuf.Batch, error) { recs := make([]super.Value, 0, op.BatchLen) - if !eof && a.inputDir == 0 { - return nil, nil - } for len(recs) < op.BatchLen { - if !eof && a.inputDir != 0 { - rec, err := a.spiller.Peek() - if err != nil { - return nil, err - } - if rec == nil { - break - } - keyVal := a.keyExprs[0].Eval(*rec) - if !keyVal.IsError() && a.valueCompare(keyVal, *a.maxSpillKey) >= 0 { - break - } - } rec, err := a.nextResultFromSpills() if err != nil { return nil, err @@ -511,19 +424,11 @@ func (a *Aggregator) nextResultFromSpills() (*super.Value, error) { } // readTable returns a slice of records from the in-memory aggregate -// table. If flush is true, the entire table is returned. If flush is -// false and input is sorted only completed keys are returned. -// If partialsOut is true, it returns partial aggregation results as +// table. If partialsOut is true, it returns partial aggregation results as // defined by each agg.Function.ResultAsPartial() method. -func (a *Aggregator) readTable(flush, partialsOut bool, batch sbuf.Batch) (sbuf.Batch, error) { +func (a *Aggregator) readTable(partialsOut bool, batch sbuf.Batch) (sbuf.Batch, error) { var recs []super.Value for key, row := range a.table { - if !flush && a.valueCompare == nil { - panic("internal bug: tried to fetch completed tuples on non-sorted input") - } - if !flush && a.valueCompare(row.groupval, *a.maxTableKey) >= 0 { - continue - } // To build the output record, we spin over the key values // and append them with the buidler, then spin over the aggregations // and append each value. The builder is already set up with diff --git a/runtime/sam/op/aggregate/aggregate_test.go b/runtime/sam/op/aggregate/aggregate_test.go index 91a48c044..b28c12465 100644 --- a/runtime/sam/op/aggregate/aggregate_test.go +++ b/runtime/sam/op/aggregate/aggregate_test.go @@ -1,30 +1,10 @@ package aggregate_test import ( - "bytes" - "context" - "fmt" - "sort" - "strings" - "sync/atomic" "testing" - "github.com/brimdata/super" - "github.com/brimdata/super/compiler" - "github.com/brimdata/super/compiler/parser" - "github.com/brimdata/super/order" - "github.com/brimdata/super/pkg/field" - "github.com/brimdata/super/pkg/nano" - "github.com/brimdata/super/runtime" "github.com/brimdata/super/runtime/sam/op/aggregate" - "github.com/brimdata/super/sbuf" - "github.com/brimdata/super/sio" - "github.com/brimdata/super/sio/supio" - "github.com/brimdata/super/vector" - "github.com/brimdata/super/vector/vio" "github.com/brimdata/super/ztest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestAggregateZtestsSpill(t *testing.T) { @@ -33,128 +13,3 @@ func TestAggregateZtestsSpill(t *testing.T) { aggregate.DefaultLimit = 1 ztest.Run(t, "../../../ztests/op/aggregate") } - -type countReader struct { - r sio.Reader - n atomic.Int64 -} - -var _ sbuf.ScannerAble = (*countReader)(nil) - -func (c *countReader) NewScanner(context.Context, sbuf.Pushdown) (sbuf.Scanner, error) { - return c, nil -} - -func (*countReader) Progress() vio.Progress { - panic("unused") -} - -func (c *countReader) Pull(bool) (sbuf.Batch, error) { - val, err := c.r.Read() - if val == nil || err != nil { - return nil, err - } - // Feed values to the caller one at a time. - c.n.Add(1) - return sbuf.NewArray([]super.Value{val.Copy()}), nil -} - -func (*countReader) Read() (*super.Value, error) { - panic("unused") -} - -type testAggregateWriter struct { - n int - writer sio.Writer - cb func(n int) -} - -func (w *testAggregateWriter) Push(vec vector.Any) error { - return sbuf.WriteVec(w, vec) -} - -func (w *testAggregateWriter) Write(val super.Value) error { - if err := w.writer.Write(val); err != nil { - return err - } - w.n += 1 - w.cb(w.n) - return nil -} - -func TestAggregateStreamingSpill(t *testing.T) { - // This test verifies that with sorted input, spillable aggregate streams results as input arrives. - // - // The sorted input key is ts. The input and config parameters are carefully chosen such that: - // - spills are not aligned with ts changes (at least some - // transitions from ts=n to ts=n+1 happen mid-spill) - // - secondary keys repeat in a ts bin - // - // Together these conditions test that the read barrier (using - // Aggregator.maxSpillKey) does not read a key from a - // spill before that all records for that key have been - // written to the spill. - // - savedPullerBatchValues := sbuf.PullerBatchValues - sbuf.PullerBatchValues = 1 - savedDefaultLimit := aggregate.DefaultLimit - aggregate.DefaultLimit = 2 - defer func() { - sbuf.PullerBatchValues = savedPullerBatchValues - aggregate.DefaultLimit = savedDefaultLimit - }() - - const totRecs = 200 - const recsPerTs = 9 - const uniqueIpsPerTs = 3 - - var data []string - for i := range totRecs { - t := i / recsPerTs - data = append(data, fmt.Sprintf("{ts:%s,ip:1.1.1.%d}", nano.Unix(int64(t), 0), i%uniqueIpsPerTs)) - } - - runOne := func(inputSortKey string) []string { - ast, err := parser.ParseText("count() by ts:=bucket(ts, 1s), ip") - assert.NoError(t, err) - - sctx := super.NewContext() - zr := supio.NewReader(sctx, strings.NewReader(strings.Join(data, "\n"))) - cr := &countReader{r: zr} - var outbuf bytes.Buffer - zw := supio.NewWriter(sio.NopCloser(&outbuf), supio.WriterOpts{}) - checker := &testAggregateWriter{ - writer: zw, - cb: func(n int) { - if inputSortKey != "" { - if n == uniqueIpsPerTs { - require.Less(t, cr.n.Load(), int64(totRecs)) - } - } - }, - } - sortKey := order.NewSortKey(order.Asc, field.Path{inputSortKey}) - query, err := newQueryOnOrderedReader(t.Context(), sctx, ast, cr, sortKey) - require.NoError(t, err) - defer query.Pull(true) - err = vio.Copy(checker, query) - require.NoError(t, err) - outData := strings.Split(outbuf.String(), "\n") - sort.Strings(outData) - return outData - } - - res := runOne("") // run once in non-streaming mode to have reference results to compare with. - resStreaming := runOne("ts") - require.Equal(t, res, resStreaming) -} - -func newQueryOnOrderedReader(ctx context.Context, sctx *super.Context, ast *parser.AST, reader sio.Reader, sortKey order.SortKey) (runtime.Query, error) { - rctx := runtime.NewContext(ctx, sctx) - q, err := compiler.CompileWithSortKey(rctx, ast, reader, sortKey) - if err != nil { - rctx.Cancel() - return nil, err - } - return q, nil -} diff --git a/service/handlers_test.go b/service/handlers_test.go index 98335b418..a276c0519 100644 --- a/service/handlers_test.go +++ b/service/handlers_test.go @@ -38,22 +38,6 @@ func TestQueryEmptyPool(t *testing.T) { assert.Equal(t, "", conn.TestQuery("from test")) } -func TestQueryAggregateReverse(t *testing.T) { - src := ` -{ts:1970-01-01T00:00:01Z,uid:"A"} -{ts:1970-01-01T00:00:01Z,uid:"B"} -{ts:1970-01-01T00:00:02Z,uid:"B"} -` - counts := ` -{ts:1970-01-01T00:00:02Z,count:1} -{ts:1970-01-01T00:00:01Z,count:2} -` - _, conn := newCore(t) - poolID := conn.TestPoolPost(api.PoolPostRequest{Name: "test"}) - conn.TestLoad(poolID, "main", strings.NewReader(src)) - require.Equal(t, counts, "\n"+conn.TestQuery("from test | count() by ts:=bucket(ts, 1s)")) -} - func TestPoolStats(t *testing.T) { src := ` {_path:"conn",ts:1970-01-01T00:00:01Z,uid:"CBrzd94qfowOqJwCHa"}