From 8ce6a3d04f4de96349681a8156e88d253b37f0af Mon Sep 17 00:00:00 2001 From: Sean George Date: Wed, 18 Feb 2026 21:43:32 +0000 Subject: [PATCH] Fix audit findings and add root package test suite Remediate all HIGH/MEDIUM/LOW findings from the security audit: - Add sync.Mutex to Queue for concurrent map access safety (#1) - Remove hardcoded 10s activity timeout from WithImplementation (#2) - Remove debug fmt.Println leaking workflow IDs (#3) - Add validateName to all constructors, panic on empty names (#4) - Add nil client check in NewFromSDK (#5) - Fix %s to %w for proper error wrapping in replaytest (#6) - Fix "activtity" typo and wrong noun in error messages (#7) - Fix goroutine leak in Worker.Run with done channel pattern (#8) - Change mock stubs from panic to error return (#19) Add 39 tests across 8 files bringing root package from 0% to 49% coverage. Remaining uncovered code requires a live Temporal server. --- activity.go | 27 +++++- activity_test.go | 215 ++++++++++++++++++++++++++++++++++++++++++ client.go | 13 ++- client_test.go | 40 ++++++++ example/main.go | 7 ++ go.mod | 3 +- query_update.go | 1 + query_update_test.go | 86 +++++++++++++++++ queue.go | 31 +++++- queue_test.go | 70 ++++++++++++++ replaytest.go | 3 +- replaytest_test.go | 47 ++++++++++ signal.go | 1 + signal_test.go | 155 ++++++++++++++++++++++++++++++ worker.go | 15 +-- worker_test.go | 172 ++++++++++++++++++++++++++++++++++ workflow.go | 33 +++---- workflow_test.go | 218 +++++++++++++++++++++++++++++++++++++++++++ 18 files changed, 1098 insertions(+), 39 deletions(-) create mode 100644 activity_test.go create mode 100644 client_test.go create mode 100644 query_update_test.go create mode 100644 queue_test.go create mode 100644 replaytest_test.go create mode 100644 signal_test.go create mode 100644 worker_test.go create mode 100644 workflow_test.go diff --git a/activity.go b/activity.go index 34f76ce..67016a9 100644 --- a/activity.go +++ b/activity.go @@ -27,7 +27,7 @@ func (a ActivityWithImpl) validate(q *Queue, v *validationState) error { } _, ok := v.activitiesValidated[a.activityName] if ok { - return fmt.Errorf("duplicate activtity name %s for queue %s", a.activityName, q.name) + return fmt.Errorf("duplicate activity name %s for queue %s", a.activityName, q.name) } v.activitiesValidated[a.activityName] = struct{}{} return nil @@ -43,9 +43,11 @@ type Activity[Param, Return any] struct { // NewActivity declares the existence of an activity on a given queue with a given name. func NewActivity[Param, Return any](q *Queue, name string) Activity[Param, Return] { + validateName(name) panicIfNotStruct[Param]("NewActivity") - q.registerActivity(name, func(ctx context.Context, param Param) (Return, error) { - panic(fmt.Sprintf("Activity %s execution not mocked", name)) + q.registerActivity(name, func(_ context.Context, _ Param) (Return, error) { + var zero Return + return zero, fmt.Errorf("Activity %s execution not mocked", name) }) return Activity[Param, Return]{Name: name, queue: q} } @@ -54,6 +56,7 @@ func NewActivity[Param, Return any](q *Queue, name string) Activity[Param, Retur // Instead of passing the Param struct directly to the activity, it passes each field of the struct // as a separate positional argument in the order they are defined. func NewActivityPositional[Param, Return any](q *Queue, name string) Activity[Param, Return] { + validateName(name) panicIfNotStruct[Param]("NewActivityPositional") // Get the type information for the Param struct @@ -74,9 +77,11 @@ func NewActivityPositional[Param, Return any](q *Queue, name string) Activity[Pa errorType := reflect.TypeOf((*error)(nil)).Elem() fnType := reflect.FuncOf(paramTypes, []reflect.Type{returnType, errorType}, false) - // Create a function that panics with the message "Function execution not mocked" + // Create a function that returns an error instead of panicking mockFn := reflect.MakeFunc(fnType, func(args []reflect.Value) []reflect.Value { - panic(fmt.Sprintf("Activity %s execution not mocked", name)) + zero := reflect.New(returnType).Elem() + errVal := reflect.ValueOf(fmt.Errorf("Activity %s execution not mocked", name)) + return []reflect.Value{zero, errVal} }) // Register the mock function @@ -85,6 +90,12 @@ func NewActivityPositional[Param, Return any](q *Queue, name string) Activity[Pa return Activity[Param, Return]{Name: name, queue: q, positional: true} } +// panicIfNotStruct enforces that the Param type parameter is a struct (or *struct). +// +// Why panic: Go's generics cannot express a "must be struct" constraint, so this +// is a runtime enforcement of a compile-time invariant. It fires during package +// init (these constructors are called in var declarations), never at request time. +// If Go adds structural type constraints, this function becomes unnecessary. func panicIfNotStruct[Param any](funcName string) { paramType := reflect.TypeOf((*Param)(nil)).Elem() if paramType.Kind() == reflect.Ptr { @@ -95,6 +106,12 @@ func panicIfNotStruct[Param any](funcName string) { } } +// extractFieldTypes returns the types of all exported fields in a struct type. +// +// Why panic: This is a defensive assertion in an internal helper. It is only +// reachable if the caller bypasses panicIfNotStruct, which is a programming +// error internal to this package. Returning an error here would propagate +// complexity to every call site for a condition that cannot occur in correct code. func extractFieldTypes(structType reflect.Type) []reflect.Type { if structType.Kind() == reflect.Ptr { structType = structType.Elem() diff --git a/activity_test.go b/activity_test.go new file mode 100644 index 0000000..5b2d26c --- /dev/null +++ b/activity_test.go @@ -0,0 +1,215 @@ +package tempts_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/workflow" +) + +func TestNewActivity(t *testing.T) { + q := tempts.NewQueue("act-new-q") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q, "test-act") + if act.Name != "test-act" { + t.Fatalf("expected name 'test-act', got %q", act.Name) + } +} + +func TestNewActivity_EmptyName(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for empty activity name") + } + }() + q := tempts.NewQueue("act-empty-q") + type P struct{ V string } + type R struct{ V string } + tempts.NewActivity[P, R](q, "") +} + +func TestNewActivity_NonStructParam(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for non-struct param") + } + }() + q := tempts.NewQueue("act-nonstruct-q") + tempts.NewActivity[string, string](q, "non-struct-act") +} + +func TestNewActivityPositional(t *testing.T) { + q := tempts.NewQueue("act-pos-q") + type P struct { + First string + Last string + } + type R struct{ Full string } + + act := tempts.NewActivityPositional[P, R](q, "pos-act") + if act.Name != "pos-act" { + t.Fatalf("expected name 'pos-act', got %q", act.Name) + } +} + +func TestActivity_WithImplementation(t *testing.T) { + q := tempts.NewQueue("act-impl-q") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q, "impl-act") + impl := act.WithImplementation(func(_ context.Context, p P) (R, error) { + return R{V: p.V}, nil + }) + if impl == nil { + t.Fatal("expected non-nil ActivityWithImpl") + } +} + +func TestActivity_WithImplementationPositional(t *testing.T) { + q := tempts.NewQueue("act-impl-pos-q") + type P struct { + First string + Last string + } + type R struct{ Full string } + + act := tempts.NewActivityPositional[P, R](q, "impl-pos-act") + impl := act.WithImplementation(func(_ context.Context, p P) (R, error) { + return R{Full: p.First + " " + p.Last}, nil + }) + if impl == nil { + t.Fatal("expected non-nil ActivityWithImpl") + } +} + +func TestActivity_RunInWorkflow(t *testing.T) { + q := tempts.NewQueue("act-run-q") + type AP struct{ Name string } + type AR struct{ Name string } + type WP struct{ Name string } + type WR struct{ Result string } + + act := tempts.NewActivity[AP, AR](q, "run-act") + wf := tempts.NewWorkflow[WP, WR](q, "run-wf") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, p WP) (WR, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + }) + r, err := act.Run(ctx, AP{Name: p.Name}) + return WR{Result: r.Name}, err + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p AP) (AR, error) { + return AR{Name: strings.ToUpper(p.Name)}, nil + }), + wfImpl, + }) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + result, err := wfImpl.ExecuteInTest(env, WP{Name: "test"}) + if err != nil { + t.Fatal(err) + } + if result.Result != "TEST" { + t.Fatalf("expected 'TEST', got %q", result.Result) + } +} + +func TestActivity_RunPositionalInWorkflow(t *testing.T) { + q := tempts.NewQueue("act-run-pos-q") + type AP struct { + First string + Last string + } + type AR struct{ Full string } + type WP struct{ Name string } + type WR struct{ Result string } + + act := tempts.NewActivityPositional[AP, AR](q, "run-pos-act") + wf := tempts.NewWorkflow[WP, WR](q, "run-pos-wf") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, p WP) (WR, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + }) + r, err := act.Run(ctx, AP{First: "John", Last: "Doe"}) + return WR{Result: r.Full}, err + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p AP) (AR, error) { + return AR{Full: p.First + " " + p.Last}, nil + }), + wfImpl, + }) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + result, err := wfImpl.ExecuteInTest(env, WP{Name: "test"}) + if err != nil { + t.Fatal(err) + } + if result.Result != "John Doe" { + t.Fatalf("expected 'John Doe', got %q", result.Result) + } +} + +func TestActivity_MockStubReturnsError(t *testing.T) { + q := tempts.NewQueue("act-mock-stub-q") + type AP struct{ V string } + type AR struct{ V string } + type WP struct{ V string } + type WR struct{ V string } + + act := tempts.NewActivity[AP, AR](q, "mock-stub-act") + wf := tempts.NewWorkflow[WP, WR](q, "mock-stub-wf") + + // Workflow calls the unmocked activity + wfImpl := wf.WithImplementation(func(ctx workflow.Context, p WP) (WR, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + }) + r, err := act.Run(ctx, AP{V: p.V}) + return WR{V: r.V}, err + }) + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + + // Register mock fallbacks (not real implementations) + q.RegisterMockFallbacks(env) + + // Execute the workflow — the activity stub should return an error, not panic + _, err := wfImpl.ExecuteInTest(env, WP{V: "test"}) + if err == nil { + t.Fatal("expected error from unmocked activity") + } + if !strings.Contains(err.Error(), "not mocked") { + t.Fatalf("expected 'not mocked' in error, got: %v", err) + } +} diff --git a/client.go b/client.go index 0a48343..2638f75 100644 --- a/client.go +++ b/client.go @@ -1,8 +1,12 @@ package tempts -import "go.temporal.io/sdk/client" +import ( + "fmt" -// Client is a wrapper for the temporal SDK client that keeps track of which namepace the client is connected to to return more useful errors if the wrong namespace is used. + "go.temporal.io/sdk/client" +) + +// Client is a wrapper for the temporal SDK client that keeps track of which namespace the client is connected to to return more useful errors if the wrong namespace is used. type Client struct { namespace string Client client.Client @@ -39,8 +43,11 @@ func Dial(opts client.Options) (*Client, error) { return &Client{Client: c, namespace: namespace}, nil } -// NewFromSDK allows the caller to pass in an existing temporal SDK client and manually specify which name that client was connected to. +// NewFromSDK allows the caller to pass in an existing temporal SDK client and manually specify which namespace that client was connected to. func NewFromSDK(c client.Client, namespace string) (*Client, error) { + if c == nil { + return nil, fmt.Errorf("client must not be nil") + } if namespace == "" { namespace = client.DefaultNamespace } diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..c972b1f --- /dev/null +++ b/client_test.go @@ -0,0 +1,40 @@ +package tempts_test + +import ( + "testing" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/client" +) + +// mockSDKClient satisfies client.Client without connecting to a real server. +type mockSDKClient struct { + client.Client +} + +func TestNewFromSDK_NilClient(t *testing.T) { + _, err := tempts.NewFromSDK(nil, "default") + if err == nil { + t.Fatal("expected error for nil client") + } +} + +func TestNewFromSDK_ValidClient(t *testing.T) { + c, err := tempts.NewFromSDK(mockSDKClient{}, "test-ns") + if err != nil { + t.Fatal(err) + } + if c == nil { + t.Fatal("expected non-nil client") + } +} + +func TestNewFromSDK_EmptyNamespace(t *testing.T) { + c, err := tempts.NewFromSDK(mockSDKClient{}, "") + if err != nil { + t.Fatal(err) + } + if c == nil { + t.Fatal("expected non-nil client") + } +} diff --git a/example/main.go b/example/main.go index 521345f..a395c32 100644 --- a/example/main.go +++ b/example/main.go @@ -156,6 +156,10 @@ func workflowFormatAndGreet(ctx workflow.Context, params FormatAndGreetParams) ( newName := "unknown" suffix := "" + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Second * 10, + }) + workflowTypeFormatAndGreetGetName.SetHandler(ctx, func(struct{}) (FormatAndGreetGetNameResult, error) { return FormatAndGreetGetNameResult{Name: newName + suffix}, nil }) @@ -186,6 +190,9 @@ func workflowFormatAndGreet(ctx workflow.Context, params FormatAndGreetParams) ( } func workflowJustGreet(ctx workflow.Context, params JustGreetParams) (JustGreetResult, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Second * 10, + }) name, err := activityTypeGreet.Run(ctx, GreetParams{Name: params.Name}) return JustGreetResult{Name: name.Name}, err } diff --git a/go.mod b/go.mod index 9239f0c..94799aa 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,11 @@ module github.com/vikstrous/tempts +// TODO: Upgrade to go 1.25 for testing/synctest, sync.WaitGroup.Go(), t.Context(), maps.Keys(), reflect.TypeAssert go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 + github.com/stretchr/testify v1.8.4 go.temporal.io/api v1.24.0 go.temporal.io/sdk v1.25.1 ) @@ -22,7 +24,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.0 // indirect - github.com/stretchr/testify v1.8.4 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.14.0 // indirect golang.org/x/sys v0.11.0 // indirect diff --git a/query_update.go b/query_update.go index 9ebb3ce..09c3f93 100644 --- a/query_update.go +++ b/query_update.go @@ -14,6 +14,7 @@ type QueryHandler[Param, Return any] struct { // NewQueryHandler declares the name and types for a query to a workflow. func NewQueryHandler[Param, Return any](queryName string) *QueryHandler[Param, Return] { + validateName(queryName) panicIfNotStruct[Param]("NewQuery") return &QueryHandler[Param, Return]{name: queryName} } diff --git a/query_update_test.go b/query_update_test.go new file mode 100644 index 0000000..fabd960 --- /dev/null +++ b/query_update_test.go @@ -0,0 +1,86 @@ +package tempts_test + +import ( + "testing" + "time" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/workflow" +) + +func TestNewQueryHandler(t *testing.T) { + type QP struct{} + type QR struct{ Value string } + + qh := tempts.NewQueryHandler[QP, QR]("test-query") + if qh == nil { + t.Fatal("expected non-nil QueryHandler") + } +} + +func TestNewQueryHandler_EmptyName(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for empty query name") + } + }() + type QP struct{} + type QR struct{ Value string } + tempts.NewQueryHandler[QP, QR]("") +} + +func TestQueryHandler_SetHandlerAndQuery(t *testing.T) { + q := tempts.NewQueue("query-test-q") + type WP struct{} + type WR struct{} + type QP struct{} + type QR struct{ Value string } + + wf := tempts.NewWorkflow[WP, WR](q, "query-wf") + qh := tempts.NewQueryHandler[QP, QR]("query-handler") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, _ WP) (WR, error) { + qh.SetHandler(ctx, func(_ QP) (QR, error) { + return QR{Value: "query-result"}, nil + }) + // Sleep to flush the command queue so the query handler is registered + // before the delayed callback fires + workflow.Sleep(ctx, time.Millisecond) + // Keep the workflow alive for the query + workflow.GetSignalChannel(ctx, "done").Receive(ctx, nil) + return WR{}, nil + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{wfImpl}) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + env.RegisterDelayedCallback(func() { + result, err := env.QueryWorkflow("query-handler", QP{}) + if err != nil { + t.Fatalf("query failed: %v", err) + } + var qr QR + if err := result.Get(&qr); err != nil { + t.Fatalf("get query result: %v", err) + } + if qr.Value != "query-result" { + t.Fatalf("expected 'query-result', got %q", qr.Value) + } + // Signal workflow to complete + env.SignalWorkflow("done", nil) + }, time.Millisecond) + + _, err = wfImpl.ExecuteInTest(env, WP{}) + if err != nil { + t.Fatal(err) + } +} diff --git a/queue.go b/queue.go index 4dbf0d2..11f8cac 100644 --- a/queue.go +++ b/queue.go @@ -1,13 +1,17 @@ package tempts import ( + "strings" + "sync" + "go.temporal.io/sdk/activity" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) -// Queue is the declaration of a temporal, queue, which is used for routing workflows and activities to workers. +// Queue is the declaration of a temporal queue, which is used for routing workflows and activities to workers. type Queue struct { + mu sync.Mutex name string activities map[string]any workflows map[string]any @@ -19,22 +23,45 @@ type registry interface { SetActivityTaskQueue(string, ...any) } -// NewQueue declares the existence of a queue +// validateName panics on empty or whitespace-only names. +// +// Why panic instead of returning error: These constructors are designed for +// package-level var declarations (var q = tempts.NewQueue("main")) where +// error returns can't be handled. An empty name is always a programming +// mistake — it can never be correct at runtime — so it's the same class +// of invariant violation as panicIfNotStruct. The panic fires during +// process init, before any Temporal work is processed, giving immediate +// feedback during development. See regexp.MustCompile for the stdlib +// precedent of panicking on invalid compile-time-known inputs. +func validateName(name string) { + if strings.TrimSpace(name) == "" { + panic("tempts: name must not be empty or whitespace-only") + } +} + +// NewQueue declares the existence of a queue. func NewQueue(name string) *Queue { + validateName(name) return &Queue{name: name, activities: map[string]any{}, workflows: map[string]any{}} } func (q *Queue) registerActivity(activityName string, fn any) { + q.mu.Lock() + defer q.mu.Unlock() q.activities[activityName] = fn } func (q *Queue) registerWorkflow(workflowName string, fn any) { + q.mu.Lock() + defer q.mu.Unlock() q.workflows[workflowName] = fn } // RegisterMockFallbacks registers fake activities and workflows for queue in the context of a unit test. This is necessary so that the test environment knows their types and they can be mocked. // Any unmocked activities or workflows trigger a panic and fail the test with a descriptive error message. func (q *Queue) RegisterMockFallbacks(r registry) { + q.mu.Lock() + defer q.mu.Unlock() for k, fn := range q.activities { r.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: k}) r.SetActivityTaskQueue(q.name, k) diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 0000000..ee6e2ce --- /dev/null +++ b/queue_test.go @@ -0,0 +1,70 @@ +package tempts_test + +import ( + "fmt" + "sync" + "testing" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/testsuite" +) + +func TestNewQueue(t *testing.T) { + q := tempts.NewQueue("test-queue") + if q == nil { + t.Fatal("expected non-nil queue") + } +} + +func TestNewQueue_EmptyName(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for empty name") + } + }() + tempts.NewQueue("") +} + +func TestNewQueue_WhitespaceOnlyName(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for whitespace-only name") + } + }() + tempts.NewQueue(" ") +} + +func TestQueue_ConcurrentRegistration(t *testing.T) { + q := tempts.NewQueue("concurrent-queue") + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(2) + actName := fmt.Sprintf("act-%d", i) + wfName := fmt.Sprintf("wf-%d", i) + go func() { + defer wg.Done() + tempts.NewActivity[struct{ N int }, struct{ R string }](q, actName) + }() + go func() { + defer wg.Done() + tempts.NewWorkflow[struct{ N int }, struct{ R string }](q, wfName) + }() + } + wg.Wait() +} + +func TestRegisterMockFallbacks(t *testing.T) { + q := tempts.NewQueue("mock-fallback-queue") + type P struct{ V string } + type R struct{ V string } + tempts.NewActivity[P, R](q, "test-activity") + tempts.NewWorkflow[P, R](q, "test-workflow") + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + + // Should not panic — mock fallbacks registered successfully + q.RegisterMockFallbacks(env) +} diff --git a/replaytest.go b/replaytest.go index fdb020b..fb12f9b 100644 --- a/replaytest.go +++ b/replaytest.go @@ -41,7 +41,6 @@ func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowD hists := []*history.History{} for _, e := range allExecutions { var hist history.History - fmt.Println(e.Execution) iter := client.Client.GetWorkflowHistory(ctx, e.Execution.WorkflowId, e.Execution.RunId, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) for iter.HasNext() { event, err := iter.Next() @@ -63,7 +62,7 @@ func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowD } hBytes, err := proto.Marshal(h) if err != nil { - return nil, fmt.Errorf("failed to marshal history: %s", err) + return nil, fmt.Errorf("failed to marshal history: %w", err) } historiesData.Histories = append(historiesData.Histories, historyWithMetadata{ WorkflowID: allExecutions[i].Execution.WorkflowId, diff --git a/replaytest_test.go b/replaytest_test.go new file mode 100644 index 0000000..5b52d9a --- /dev/null +++ b/replaytest_test.go @@ -0,0 +1,47 @@ +package tempts_test + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) + +func TestReplayWorkflow_InvalidJSON(t *testing.T) { + err := tempts.ReplayWorkflow( + []byte("invalid json"), + func(workflow.Context, struct{}) (struct{}, error) { return struct{}{}, nil }, + worker.WorkflowReplayerOptions{}, + ) + if err == nil { + t.Fatal("expected error for invalid JSON") + } +} + +func TestReplayWorkflow_EmptyHistories(t *testing.T) { + data, err := json.Marshal(struct { + Histories []any + WorkflowName string + }{ + Histories: []any{}, + WorkflowName: "test", + }) + if err != nil { + t.Fatal(err) + } + + err = tempts.ReplayWorkflow( + data, + func(workflow.Context, struct{}) (struct{}, error) { return struct{}{}, nil }, + worker.WorkflowReplayerOptions{}, + ) + if err == nil { + t.Fatal("expected error for empty histories") + } + if !strings.Contains(err.Error(), "no histories available") { + t.Fatalf("expected 'no histories available', got: %v", err) + } +} diff --git a/signal.go b/signal.go index f37f5a8..d79c2e3 100644 --- a/signal.go +++ b/signal.go @@ -23,6 +23,7 @@ func NewWorkflowSignal[SignalParam any, WorkflowParam, WorkflowReturn any]( w *Workflow[WorkflowParam, WorkflowReturn], signalName string, ) *WorkflowSignal[WorkflowParam, WorkflowReturn, SignalParam] { + validateName(signalName) panicIfNotStruct[SignalParam]("NewWorkflowSignal") return &WorkflowSignal[WorkflowParam, WorkflowReturn, SignalParam]{ workflow: w, diff --git a/signal_test.go b/signal_test.go new file mode 100644 index 0000000..02355d2 --- /dev/null +++ b/signal_test.go @@ -0,0 +1,155 @@ +package tempts_test + +import ( + "testing" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/workflow" +) + +func TestNewWorkflowSignal(t *testing.T) { + q := tempts.NewQueue("sig-new-q") + type WP struct{ V string } + type WR struct{ V string } + type SP struct{ S string } + + wf := tempts.NewWorkflow[WP, WR](q, "sig-wf") + sig := tempts.NewWorkflowSignal[SP](&wf, "test-signal") + if sig.Name() != "test-signal" { + t.Fatalf("expected name 'test-signal', got %q", sig.Name()) + } +} + +func TestNewWorkflowSignal_EmptyName(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for empty signal name") + } + }() + q := tempts.NewQueue("sig-empty-q") + type WP struct{ V string } + type WR struct{ V string } + type SP struct{ S string } + + wf := tempts.NewWorkflow[WP, WR](q, "sig-empty-wf") + tempts.NewWorkflowSignal[SP](&wf, "") +} + +func TestWorkflowSignal_Receive(t *testing.T) { + q := tempts.NewQueue("sig-recv-q") + type WP struct{} + type WR struct{ V string } + type SP struct{ S string } + + wf := tempts.NewWorkflow[WP, WR](q, "sig-recv-wf") + sig := tempts.NewWorkflowSignal[SP](&wf, "recv-signal") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, _ WP) (WR, error) { + s := sig.Receive(ctx) + return WR{V: s.S}, nil + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{wfImpl}) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + env.RegisterDelayedCallback(func() { + env.SignalWorkflow(sig.Name(), SP{S: "hello-signal"}) + }, 0) + + result, err := wfImpl.ExecuteInTest(env, WP{}) + if err != nil { + t.Fatal(err) + } + if result.V != "hello-signal" { + t.Fatalf("expected 'hello-signal', got %q", result.V) + } +} + +func TestWorkflowSignal_TryReceive(t *testing.T) { + q := tempts.NewQueue("sig-try-q") + type WP struct{} + type WR struct{ V string } + type SP struct{ S string } + + wf := tempts.NewWorkflow[WP, WR](q, "sig-try-wf") + sig := tempts.NewWorkflowSignal[SP](&wf, "try-signal") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, _ WP) (WR, error) { + _, ok := sig.TryReceive(ctx) + if ok { + return WR{V: "unexpected"}, nil + } + return WR{V: "none"}, nil + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{wfImpl}) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + result, err := wfImpl.ExecuteInTest(env, WP{}) + if err != nil { + t.Fatal(err) + } + if result.V != "none" { + t.Fatalf("expected 'none', got %q", result.V) + } +} + +func TestWorkflowSignal_AddToSelector(t *testing.T) { + q := tempts.NewQueue("sig-sel-q") + type WP struct{} + type WR struct{ V string } + type SP struct{ S string } + + wf := tempts.NewWorkflow[WP, WR](q, "sig-sel-wf") + sig := tempts.NewWorkflowSignal[SP](&wf, "sel-signal") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, _ WP) (WR, error) { + selector := workflow.NewSelector(ctx) + var received string + sig.AddToSelector(ctx, selector, func(p SP) { + received = p.S + }) + selector.Select(ctx) + return WR{V: received}, nil + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{wfImpl}) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + env.RegisterDelayedCallback(func() { + env.SignalWorkflow(sig.Name(), SP{S: "selector-works"}) + }, 0) + + result, err := wfImpl.ExecuteInTest(env, WP{}) + if err != nil { + t.Fatal(err) + } + if result.V != "selector-works" { + t.Fatalf("expected 'selector-works', got %q", result.V) + } +} diff --git a/worker.go b/worker.go index 847f2f2..441a6c5 100644 --- a/worker.go +++ b/worker.go @@ -77,16 +77,19 @@ func (w *Worker) Register(wrk worker.Registry) { } // Run starts the worker. To stop it, cancel the context. This function returns when the worker completes. -// Make sure to always cancel the context eventually, or a goroutine will be leaked. func (w *Worker) Run(ctx context.Context, client *Client, options worker.Options) error { options.DisableRegistrationAliasing = true wrk := worker.New(client.Client, w.queue.name, options) w.Register(wrk) - + done := make(chan struct{}) go func() { - // There's no way to pass the channel from ctx.Done directly into Run because it's of the wrong type. - <-ctx.Done() - wrk.Stop() + select { + case <-ctx.Done(): + wrk.Stop() + case <-done: + } }() - return wrk.Run(nil) + err := wrk.Run(nil) + close(done) + return err } diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 0000000..4806b13 --- /dev/null +++ b/worker_test.go @@ -0,0 +1,172 @@ +package tempts_test + +import ( + "context" + "strings" + "testing" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/workflow" +) + +func TestNewWorker_Valid(t *testing.T) { + q := tempts.NewQueue("worker-valid-q") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q, "worker-act") + wf := tempts.NewWorkflow[P, R](q, "worker-wf") + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p P) (R, error) { return R{V: p.V}, nil }), + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{V: p.V}, nil }), + }) + if err != nil { + t.Fatal(err) + } + if wrk == nil { + t.Fatal("expected non-nil worker") + } +} + +func TestNewWorker_MissingActivityImpl(t *testing.T) { + q := tempts.NewQueue("worker-missing-act-q") + type P struct{ V string } + type R struct{ V string } + + tempts.NewActivity[P, R](q, "missing-act") + wf := tempts.NewWorkflow[P, R](q, "present-wf") + + _, err := tempts.NewWorker(q, []tempts.Registerable{ + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for missing activity implementation") + } + if !strings.Contains(err.Error(), "missing-act") { + t.Fatalf("expected error to mention activity name, got: %v", err) + } +} + +func TestNewWorker_MissingWorkflowImpl(t *testing.T) { + q := tempts.NewQueue("worker-missing-wf-q") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q, "present-act") + tempts.NewWorkflow[P, R](q, "missing-wf") + + _, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for missing workflow implementation") + } + if !strings.Contains(err.Error(), "missing-wf") { + t.Fatalf("expected error to mention workflow name, got: %v", err) + } +} + +func TestNewWorker_ExtraActivity(t *testing.T) { + q1 := tempts.NewQueue("worker-extra-act-q") + q2 := tempts.NewQueue("worker-extra-act-q") // same name, different queue object + type P struct{ V string } + type R struct{ V string } + + act1 := tempts.NewActivity[P, R](q1, "act1") + act2 := tempts.NewActivity[P, R](q2, "act2") // registered on q2, not q1 + wf := tempts.NewWorkflow[P, R](q1, "wf1") + + _, err := tempts.NewWorker(q1, []tempts.Registerable{ + act1.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + act2.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for extra activity") + } + if !strings.Contains(err.Error(), "act2") { + t.Fatalf("expected error to mention extra activity, got: %v", err) + } +} + +func TestNewWorker_ExtraWorkflow(t *testing.T) { + q1 := tempts.NewQueue("worker-extra-wf-q") + q2 := tempts.NewQueue("worker-extra-wf-q") // same name, different queue object + type P struct{ V string } + type R struct{ V string } + + wf1 := tempts.NewWorkflow[P, R](q1, "wf1") + wf2 := tempts.NewWorkflow[P, R](q2, "wf2") // registered on q2, not q1 + + _, err := tempts.NewWorker(q1, []tempts.Registerable{ + wf1.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + wf2.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for extra workflow") + } + if !strings.Contains(err.Error(), "wf2") { + t.Fatalf("expected error to mention extra workflow, got: %v", err) + } +} + +func TestNewWorker_DuplicateActivity(t *testing.T) { + q := tempts.NewQueue("worker-dup-act-q") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q, "dup-act") + wf := tempts.NewWorkflow[P, R](q, "dup-wf") + + _, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + act.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for duplicate activity") + } + if !strings.Contains(err.Error(), "duplicate") { + t.Fatalf("expected 'duplicate' in error, got: %v", err) + } +} + +func TestNewWorker_DuplicateWorkflow(t *testing.T) { + q := tempts.NewQueue("worker-dup-wf-q") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q, "dup-wf-act") + wf := tempts.NewWorkflow[P, R](q, "dup-wf") + + _, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for duplicate workflow") + } + if !strings.Contains(err.Error(), "duplicate") { + t.Fatalf("expected 'duplicate' in error, got: %v", err) + } +} + +func TestNewWorker_WrongQueue(t *testing.T) { + q1 := tempts.NewQueue("queue-1") + q2 := tempts.NewQueue("queue-2") + type P struct{ V string } + type R struct{ V string } + + act := tempts.NewActivity[P, R](q2, "wrong-q-act") // on q2 + wf := tempts.NewWorkflow[P, R](q1, "wrong-q-wf") + + _, err := tempts.NewWorker(q1, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p P) (R, error) { return R{}, nil }), + wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { return R{}, nil }), + }) + if err == nil { + t.Fatal("expected error for wrong queue") + } +} diff --git a/workflow.go b/workflow.go index 0664420..b968f3e 100644 --- a/workflow.go +++ b/workflow.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "reflect" - "time" "go.temporal.io/api/serviceerror" "go.temporal.io/sdk/client" @@ -16,7 +15,7 @@ import ( // WorkflowWithImpl is a temporary struct that implements Registerable. It's meant to be passed into `tempts.NewWorker`. type WorkflowWithImpl[Param any, Return any] struct { workflowName string - queue Queue + queue *Queue fn any positional bool } @@ -36,7 +35,7 @@ func (w WorkflowWithImpl[Param, Return]) validate(q *Queue, v *validationState) } _, ok := v.workflowsValidated[w.workflowName] if ok { - return fmt.Errorf("duplicate activtity name %s for queue %s", w.workflowName, q.name) + return fmt.Errorf("duplicate workflow name %s for queue %s", w.workflowName, q.name) } v.workflowsValidated[w.workflowName] = struct{}{} return nil @@ -79,9 +78,11 @@ func NewWorkflow[ Return any, ](queue *Queue, name string, ) Workflow[Param, Return] { + validateName(name) panicIfNotStruct[Param]("NewWorkflow") - queue.registerWorkflow(name, func(workflow.Context, Param) (Return, error) { - panic(fmt.Sprintf("Workflow %s execution not mocked", name)) + queue.registerWorkflow(name, func(_ workflow.Context, _ Param) (Return, error) { + var zero Return + return zero, fmt.Errorf("Workflow %s execution not mocked", name) }) return Workflow[Param, Return]{ name: name, @@ -93,6 +94,7 @@ func NewWorkflow[ // Instead of passing the Param struct directly to the workflow, it passes each field of the struct // as a separate positional argument in the order they are defined. func NewWorkflowPositional[Param any, Return any](queue *Queue, name string) Workflow[Param, Return] { + validateName(name) panicIfNotStruct[Param]("NewWorkflowPositional") // Get the type information for the Param struct @@ -115,9 +117,11 @@ func NewWorkflowPositional[Param any, Return any](queue *Queue, name string) Wor errorType := reflect.TypeOf((*error)(nil)).Elem() fnType := reflect.FuncOf(paramTypes, []reflect.Type{returnType, errorType}, false) - // Create a function that panics with the message "Function execution not mocked" + // Create a function that returns an error instead of panicking mockFn := reflect.MakeFunc(fnType, func(args []reflect.Value) []reflect.Value { - panic(fmt.Sprintf("Workflow %s execution not mocked", name)) + zero := reflect.New(returnType).Elem() + errVal := reflect.ValueOf(fmt.Errorf("Workflow %s execution not mocked", name)) + return []reflect.Value{zero, errVal} }) // Register the mock function @@ -143,13 +147,7 @@ func (w Workflow[Param, Return]) getQueue() *Queue { // WithImplementation should be called to create the parameters for NewWorker(). It declares which function implements the workflow. func (w Workflow[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl[Param, Return] { if !w.positional { - return &WorkflowWithImpl[Param, Return]{workflowName: w.name, queue: *w.queue, fn: func(ctx workflow.Context, param Param) (Return, error) { - // Set a default timeout so if a workflow doesn't need to customize it, it doesn't have to call this function. - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: time.Second * 10, - }) - return fn(ctx, param) - }} + return &WorkflowWithImpl[Param, Return]{workflowName: w.name, queue: w.queue, fn: fn} } // For positional workflows, create a wrapper function that converts positional arguments to a struct @@ -186,11 +184,6 @@ func (w Workflow[Param, Return]) WithImplementation(fn func(workflow.Context, Pa } } - // Set default timeout - ctx = reflect.ValueOf(workflow.WithActivityOptions(ctx.Interface().(workflow.Context), workflow.ActivityOptions{ - StartToCloseTimeout: time.Second * 10, - })) - // Call the implementation function with the context and constructed struct results := reflect.ValueOf(fn).Call([]reflect.Value{ctx, paramVal}) return results @@ -199,7 +192,7 @@ func (w Workflow[Param, Return]) WithImplementation(fn func(workflow.Context, Pa return &WorkflowWithImpl[Param, Return]{ workflowName: w.name, - queue: *w.queue, + queue: w.queue, fn: wrapper.Interface(), positional: true, } diff --git a/workflow_test.go b/workflow_test.go new file mode 100644 index 0000000..eda0b60 --- /dev/null +++ b/workflow_test.go @@ -0,0 +1,218 @@ +package tempts_test + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/vikstrous/tempts" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/workflow" +) + +func TestNewWorkflow(t *testing.T) { + q := tempts.NewQueue("wf-new-q") + type P struct{ V string } + type R struct{ V string } + + wf := tempts.NewWorkflow[P, R](q, "test-wf") + if wf.Name() != "test-wf" { + t.Fatalf("expected name 'test-wf', got %q", wf.Name()) + } +} + +func TestNewWorkflow_EmptyName(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for empty workflow name") + } + }() + q := tempts.NewQueue("wf-empty-q") + type P struct{ V string } + type R struct{ V string } + tempts.NewWorkflow[P, R](q, "") +} + +func TestNewWorkflow_NonStructParam(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic for non-struct param") + } + }() + q := tempts.NewQueue("wf-nonstruct-q") + tempts.NewWorkflow[string, string](q, "non-struct-wf") +} + +func TestNewWorkflowPositional(t *testing.T) { + q := tempts.NewQueue("wf-pos-q") + type P struct { + First string + Last string + } + type R struct{ Full string } + + wf := tempts.NewWorkflowPositional[P, R](q, "pos-wf") + if wf.Name() != "pos-wf" { + t.Fatalf("expected name 'pos-wf', got %q", wf.Name()) + } +} + +func TestWorkflow_WithImplementation_NoDefaultTimeout(t *testing.T) { + q := tempts.NewQueue("wf-no-timeout-q") + type AP struct{ Name string } + type AR struct{ Name string } + type WP struct{ Name string } + type WR struct{ Name string } + + act := tempts.NewActivity[AP, AR](q, "no-timeout-act") + wf := tempts.NewWorkflow[WP, WR](q, "no-timeout-wf") + + // Workflow that does NOT set activity options (no timeout) + wfImpl := wf.WithImplementation(func(ctx workflow.Context, p WP) (WR, error) { + r, err := act.Run(ctx, AP{Name: p.Name}) + return WR{Name: r.Name}, err + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p AP) (AR, error) { + return AR{Name: p.Name}, nil + }), + wfImpl, + }) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + // Without explicit activity timeout, activity execution should fail + _, err = wfImpl.ExecuteInTest(env, WP{Name: "test"}) + if err == nil { + t.Fatal("expected error when no activity timeout is set") + } +} + +func TestWorkflow_ExecuteInTest(t *testing.T) { + q := tempts.NewQueue("wf-exec-q") + type AP struct{ Name string } + type AR struct{ Name string } + type WP struct{ Name string } + type WR struct{ Result string } + + act := tempts.NewActivity[AP, AR](q, "exec-act") + wf := tempts.NewWorkflow[WP, WR](q, "exec-wf") + + wfImpl := wf.WithImplementation(func(ctx workflow.Context, p WP) (WR, error) { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + }) + r, err := act.Run(ctx, AP{Name: p.Name}) + return WR{Result: r.Name}, err + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{ + act.WithImplementation(func(_ context.Context, p AP) (AR, error) { + return AR{Name: strings.ToUpper(p.Name)}, nil + }), + wfImpl, + }) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + result, err := wfImpl.ExecuteInTest(env, WP{Name: "hello"}) + if err != nil { + t.Fatal(err) + } + if result.Result != "HELLO" { + t.Fatalf("expected 'HELLO', got %q", result.Result) + } +} + +func TestWorkflow_RunChild(t *testing.T) { + q := tempts.NewQueue("wf-child-q") + type CP struct{ V string } + type CR struct{ V string } + type PP struct{ V string } + type PR struct{ V string } + + childWf := tempts.NewWorkflow[CP, CR](q, "child-wf") + parentWf := tempts.NewWorkflow[PP, PR](q, "parent-wf") + + parentImpl := parentWf.WithImplementation(func(ctx workflow.Context, p PP) (PR, error) { + result, err := childWf.RunChild(ctx, workflow.ChildWorkflowOptions{}, CP{V: p.V}) + return PR{V: result.V}, err + }) + childImpl := childWf.WithImplementation(func(_ workflow.Context, p CP) (CR, error) { + return CR{V: "child-" + p.V}, nil + }) + + wrk, err := tempts.NewWorker(q, []tempts.Registerable{parentImpl, childImpl}) + if err != nil { + t.Fatal(err) + } + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + t.Cleanup(func() { env.AssertExpectations(t) }) + wrk.Register(env) + + result, err := parentImpl.ExecuteInTest(env, PP{V: "test"}) + if err != nil { + t.Fatal(err) + } + if result.V != "child-test" { + t.Fatalf("expected 'child-test', got %q", result.V) + } +} + +func TestWorkflowWithImpl_Name(t *testing.T) { + q := tempts.NewQueue("wf-impl-name-q") + type P struct{ V string } + type R struct{ V string } + + wf := tempts.NewWorkflow[P, R](q, "named-wf") + impl := wf.WithImplementation(func(_ workflow.Context, p P) (R, error) { + return R{V: p.V}, nil + }) + if impl.Name() != "named-wf" { + t.Fatalf("expected 'named-wf', got %q", impl.Name()) + } +} + +func TestWorkflow_MockStubReturnsError(t *testing.T) { + q := tempts.NewQueue("wf-mock-stub-q") + type P struct{ V string } + type R struct{ V string } + + wf := tempts.NewWorkflow[P, R](q, "mock-stub-wf") + + ts := testsuite.WorkflowTestSuite{} + ts.SetDisableRegistrationAliasing(true) + env := ts.NewTestWorkflowEnvironment() + + // Register mock fallbacks (the workflow stub returns an error, not a panic) + q.RegisterMockFallbacks(env) + + // Execute the workflow by name — uses the mock stub + env.ExecuteWorkflow(wf.Name(), P{V: "test"}) + err := env.GetWorkflowError() + if err == nil { + t.Fatal("expected error from unmocked workflow") + } + if !strings.Contains(err.Error(), "not mocked") { + t.Fatalf("expected 'not mocked' in error, got: %v", err) + } +}