From 98ba0ba3bedcb2a078921e90bbe7ecb979431a90 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Fri, 5 Jun 2026 22:36:52 -0400 Subject: [PATCH] use ast.FileScan of stdin instead of ast.DefaultScan When its extInput parameter is true, compiler/semantic.Analyze prepends an ast.DefaultScan to the AST. Change it to prepend an ast.FileScan for standard input instead. To allow that FileScan to access sio.Readers passed to compiler.CompileWithAST, add a new runtime/exec.Environment.Stdin field, set the field in CompileWithAST if the length of the readers paramter is nonzero, and return a scanner for it in Environment.Open if the field is set when opening standard input. --- compiler/package.go | 6 +++++- compiler/semantic/analyzer.go | 7 +++---- fuzz/fuzz.go | 7 ++----- runtime/exec/environment.go | 5 +++++ runtime/sam/expr/filter_test.go | 4 ++-- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/compiler/package.go b/compiler/package.go index 0dfe777f88..db877cb104 100644 --- a/compiler/package.go +++ b/compiler/package.go @@ -62,6 +62,10 @@ func BuildWithBuilder(rctx *runtime.Context, main *dag.Main, env *exec.Environme } func CompileWithAST(rctx *runtime.Context, ast *parser.AST, env *exec.Environment, optimize bool, parallel int, readers []sio.Reader) (*exec.Query, error) { + if len(readers) > 0 { + env = new(*env) + env.Stdin = sio.ConcatReader(readers...) + } main, err := Analyze(rctx, ast, env, len(readers) > 0) if err != nil { return nil, err @@ -72,7 +76,7 @@ func CompileWithAST(rctx *runtime.Context, ast *parser.AST, env *exec.Environmen return nil, err } } - outputs, debugs, meter, err := Build(rctx, main, env, readers) + outputs, debugs, meter, err := Build(rctx, main, env, nil) if err != nil { return nil, err } diff --git a/compiler/semantic/analyzer.go b/compiler/semantic/analyzer.go index e9852a7979..89d5c58543 100644 --- a/compiler/semantic/analyzer.go +++ b/compiler/semantic/analyzer.go @@ -19,13 +19,12 @@ import ( // to DAG form, resolving syntax ambiguities, and performing constant propagation. // After semantic analysis, the DAG is ready for either optimization or compilation. func Analyze(ctx context.Context, p *parser.AST, env *exec.Environment, extInput bool) (*dag.Main, error) { - t := newTranslator(ctx, reporter{p.Files()}, env) - astseq := p.Parsed() if extInput { - astseq.Prepend(&ast.DefaultScan{Kind: "DefaultScan"}) + p.PrependFileScan([]string{"stdio:stdin"}) } + t := newTranslator(ctx, reporter{p.Files()}, env) t.checker.pushErrs() - seq, _ := t.seq(astseq, super.TypeNull) + seq, _ := t.seq(p.Parsed(), super.TypeNull) errs := t.checker.popErrs() errs.flushErrs(t.reporter) if err := t.Error(); err != nil { diff --git a/fuzz/fuzz.go b/fuzz/fuzz.go index ce885f1e2e..63f255efab 100644 --- a/fuzz/fuzz.go +++ b/fuzz/fuzz.go @@ -19,7 +19,6 @@ import ( "github.com/brimdata/super/csup" "github.com/brimdata/super/pkg/field" "github.com/brimdata/super/pkg/nano" - "github.com/brimdata/super/pkg/storage/mock" "github.com/brimdata/super/runtime" "github.com/brimdata/super/runtime/exec" "github.com/brimdata/super/sbuf" @@ -30,7 +29,6 @@ import ( "github.com/brimdata/super/sup" "github.com/stretchr/testify/require" "github.com/x448/float16" - "go.uber.org/mock/gomock" ) func ReadBSUP(bs []byte) ([]super.Value, error) { @@ -91,8 +89,7 @@ func RunQueryCSUP(t testing.TB, buf *bytes.Buffer, querySource string) []super.V func RunQuery(t testing.TB, sctx *super.Context, readers []sio.Reader, querySource string, useDemand func(demandIn demand.Demand)) []super.Value { // Compile query - engine := mock.NewMockEngine(gomock.NewController(t)) - comp := compiler.NewCompiler(engine) + comp := compiler.NewCompiler(nil) ast, err := parser.ParseText(querySource) if err != nil { t.Skipf("%v", err) @@ -105,7 +102,7 @@ func RunQuery(t testing.TB, sctx *super.Context, readers []sio.Reader, querySour // Infer demand // TODO This is a hack and should be replaced by a cleaner interface in CompileQuery. - env := exec.NewEnvironment(engine, nil) + env := exec.NewEnvironment(nil, nil) main, err := semantic.Analyze(t.Context(), ast, env, true) if err != nil { t.Skipf("%v", err) diff --git a/runtime/exec/environment.go b/runtime/exec/environment.go index a3f495ed04..3da60bb54e 100644 --- a/runtime/exec/environment.go +++ b/runtime/exec/environment.go @@ -14,6 +14,7 @@ import ( "github.com/brimdata/super/pkg/field" "github.com/brimdata/super/pkg/storage" "github.com/brimdata/super/sbuf" + "github.com/brimdata/super/sio" "github.com/brimdata/super/sio/anyio" "github.com/brimdata/super/sio/csupio" "github.com/brimdata/super/sio/fjsonio" @@ -56,6 +57,7 @@ type Environment struct { ReaderOpts anyio.ReaderOpts Runtime Runtime SampleSize int + Stdin sio.Reader } func NewEnvironment(engine storage.Engine, d *db.Root) *Environment { @@ -114,6 +116,9 @@ func (e *Environment) Open(ctx context.Context, sctx *super.Context, path, forma fields = proj.Paths() } } + if path == "stdio:stdin" && e.Stdin != nil { + return sbuf.NewScanner(ctx, e.Stdin, pushdown) + } file, err := anyio.Open(ctx, sctx, e.engine, path, e.readerOpts(fields, format)) if err != nil { return nil, fmt.Errorf("%s: %w", path, err) diff --git a/runtime/sam/expr/filter_test.go b/runtime/sam/expr/filter_test.go index 7b478d281e..255c4e0dc5 100644 --- a/runtime/sam/expr/filter_test.go +++ b/runtime/sam/expr/filter_test.go @@ -65,9 +65,9 @@ func runCasesHelper(t *testing.T, record string, cases []testcase, expectBufferF require.NoError(t, err, "filter: %q", c.filter) _, _, builder, err := compiler.BuildWithBuilder(rctx, main, env, nil) require.NoError(t, err, "filter: %q", c.filter) - scan, ok := main.Body[0].(*dag.DefaultScan) + scan, ok := main.Body[0].(*dag.FileScan) require.True(t, ok) - filterMaker := rungen.NewPushdown(builder, scan.Filter) + filterMaker := rungen.NewPushdown(builder, scan.Pushdown.DataFilter.Expr) f, err := filterMaker.DataFilter() assert.NoError(t, err, "filter: %q", c.filter) if f != nil {