Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion compiler/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func BuildWithBuilder(rctx *runtime.Context, main *dag.Main, env *exec.Environme
}

func CompileWithAST(rctx *runtime.Context, ast *parser.AST, env *exec.Environment, optimize bool, parallel int, readers []sio.Reader) (*exec.Query, error) {
if len(readers) > 0 {
env = new(*env)
env.Stdin = sio.ConcatReader(readers...)
}
main, err := Analyze(rctx, ast, env, len(readers) > 0)
if err != nil {
return nil, err
Expand All @@ -72,7 +76,7 @@ func CompileWithAST(rctx *runtime.Context, ast *parser.AST, env *exec.Environmen
return nil, err
}
}
outputs, debugs, meter, err := Build(rctx, main, env, readers)
outputs, debugs, meter, err := Build(rctx, main, env, nil)
if err != nil {
return nil, err
}
Expand Down
7 changes: 3 additions & 4 deletions compiler/semantic/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
// to DAG form, resolving syntax ambiguities, and performing constant propagation.
// After semantic analysis, the DAG is ready for either optimization or compilation.
func Analyze(ctx context.Context, p *parser.AST, env *exec.Environment, extInput bool) (*dag.Main, error) {
t := newTranslator(ctx, reporter{p.Files()}, env)
astseq := p.Parsed()
if extInput {
astseq.Prepend(&ast.DefaultScan{Kind: "DefaultScan"})
p.PrependFileScan([]string{"stdio:stdin"})
}
t := newTranslator(ctx, reporter{p.Files()}, env)
t.checker.pushErrs()
seq, _ := t.seq(astseq, super.TypeNull)
seq, _ := t.seq(p.Parsed(), super.TypeNull)
errs := t.checker.popErrs()
errs.flushErrs(t.reporter)
if err := t.Error(); err != nil {
Expand Down
7 changes: 2 additions & 5 deletions fuzz/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/brimdata/super/csup"
"github.com/brimdata/super/pkg/field"
"github.com/brimdata/super/pkg/nano"
"github.com/brimdata/super/pkg/storage/mock"
"github.com/brimdata/super/runtime"
"github.com/brimdata/super/runtime/exec"
"github.com/brimdata/super/sbuf"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/brimdata/super/sup"
"github.com/stretchr/testify/require"
"github.com/x448/float16"
"go.uber.org/mock/gomock"
)

func ReadBSUP(bs []byte) ([]super.Value, error) {
Expand Down Expand Up @@ -91,8 +89,7 @@ func RunQueryCSUP(t testing.TB, buf *bytes.Buffer, querySource string) []super.V

func RunQuery(t testing.TB, sctx *super.Context, readers []sio.Reader, querySource string, useDemand func(demandIn demand.Demand)) []super.Value {
// Compile query
engine := mock.NewMockEngine(gomock.NewController(t))
comp := compiler.NewCompiler(engine)
comp := compiler.NewCompiler(nil)
ast, err := parser.ParseText(querySource)
if err != nil {
t.Skipf("%v", err)
Expand All @@ -105,7 +102,7 @@ func RunQuery(t testing.TB, sctx *super.Context, readers []sio.Reader, querySour

// Infer demand
// TODO This is a hack and should be replaced by a cleaner interface in CompileQuery.
env := exec.NewEnvironment(engine, nil)
env := exec.NewEnvironment(nil, nil)
main, err := semantic.Analyze(t.Context(), ast, env, true)
if err != nil {
t.Skipf("%v", err)
Expand Down
5 changes: 5 additions & 0 deletions runtime/exec/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/brimdata/super/pkg/field"
"github.com/brimdata/super/pkg/storage"
"github.com/brimdata/super/sbuf"
"github.com/brimdata/super/sio"
"github.com/brimdata/super/sio/anyio"
"github.com/brimdata/super/sio/csupio"
"github.com/brimdata/super/sio/fjsonio"
Expand Down Expand Up @@ -56,6 +57,7 @@ type Environment struct {
ReaderOpts anyio.ReaderOpts
Runtime Runtime
SampleSize int
Stdin sio.Reader
}

func NewEnvironment(engine storage.Engine, d *db.Root) *Environment {
Expand Down Expand Up @@ -114,6 +116,9 @@ func (e *Environment) Open(ctx context.Context, sctx *super.Context, path, forma
fields = proj.Paths()
}
}
if path == "stdio:stdin" && e.Stdin != nil {
return sbuf.NewScanner(ctx, e.Stdin, pushdown)
}
file, err := anyio.Open(ctx, sctx, e.engine, path, e.readerOpts(fields, format))
if err != nil {
return nil, fmt.Errorf("%s: %w", path, err)
Expand Down
4 changes: 2 additions & 2 deletions runtime/sam/expr/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func runCasesHelper(t *testing.T, record string, cases []testcase, expectBufferF
require.NoError(t, err, "filter: %q", c.filter)
_, _, builder, err := compiler.BuildWithBuilder(rctx, main, env, nil)
require.NoError(t, err, "filter: %q", c.filter)
scan, ok := main.Body[0].(*dag.DefaultScan)
scan, ok := main.Body[0].(*dag.FileScan)
require.True(t, ok)
filterMaker := rungen.NewPushdown(builder, scan.Filter)
filterMaker := rungen.NewPushdown(builder, scan.Pushdown.DataFilter.Expr)
f, err := filterMaker.DataFilter()
assert.NoError(t, err, "filter: %q", c.filter)
if f != nil {
Expand Down
Loading