diff --git a/example/replay_test.go b/example/replay_test.go index 45fa9fd..cc0e610 100644 --- a/example/replay_test.go +++ b/example/replay_test.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "testing" + "time" "github.com/vikstrous/tempts" "go.temporal.io/sdk/client" @@ -33,7 +34,8 @@ func testReplayability(t *testing.T, workflowDeclaration tempts.WorkflowDeclarat if err != nil { t.Fatal(err) } - historiesData, err = tempts.GetWorkflowHistoriesBundle(ctx, c, workflowDeclaration) + maxAge := 90 * 24 * time.Hour + historiesData, err = tempts.GetWorkflowHistoriesBundle(ctx, c, workflowDeclaration, &maxAge) if err != nil { t.Fatal(err) } diff --git a/replaytest.go b/replaytest.go index fdb020b..f518512 100644 --- a/replaytest.go +++ b/replaytest.go @@ -4,19 +4,22 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/gogo/protobuf/proto" "go.temporal.io/api/enums/v1" "go.temporal.io/api/filter/v1" "go.temporal.io/api/history/v1" + workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) // GetWorkflowHistoriesBundle connects to the temporal server and fetches the most recent 10 open and 10 closed executions. +// If maxAge is not nil, it filters out closed executions that were started more than maxAge ago. // It returns a byte seralized piece of data that can be used immediately or in the future to call ReplayWorkflow. -func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration) ([]byte, error) { +func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration, maxAge *time.Duration) ([]byte, error) { closedExecutions, err := client.Client.WorkflowService().ListClosedWorkflowExecutions(ctx, &workflowservice.ListClosedWorkflowExecutionsRequest{ Namespace: client.namespace, MaximumPageSize: 10, @@ -27,6 +30,19 @@ func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowD if err != nil { return nil, fmt.Errorf("failed to get closed executions: %w", err) } + + // Filter out closed executions that were started more than maxAge ago (if maxAge is specified) + filteredClosedExecutions := closedExecutions.Executions + if maxAge != nil { + cutoffTime := time.Now().Add(-*maxAge) + filteredClosedExecutions = make([]*workflowpb.WorkflowExecutionInfo, 0, len(closedExecutions.Executions)) + for _, exec := range closedExecutions.Executions { + if exec.StartTime != nil && exec.StartTime.After(cutoffTime) { + filteredClosedExecutions = append(filteredClosedExecutions, exec) + } + } + } + openExecutions, err := client.Client.WorkflowService().ListOpenWorkflowExecutions(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{ Namespace: client.namespace, MaximumPageSize: 10, @@ -37,7 +53,7 @@ func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowD if err != nil { return nil, fmt.Errorf("failed to get open executions: %w", err) } - allExecutions := append(closedExecutions.Executions, openExecutions.Executions...) + allExecutions := append(filteredClosedExecutions, openExecutions.Executions...) hists := []*history.History{} for _, e := range allExecutions { var hist history.History