Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion example/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/vikstrous/tempts"
"go.temporal.io/sdk/client"
Expand All @@ -33,7 +34,8 @@ func testReplayability(t *testing.T, workflowDeclaration tempts.WorkflowDeclarat
if err != nil {
t.Fatal(err)
}
historiesData, err = tempts.GetWorkflowHistoriesBundle(ctx, c, workflowDeclaration)
maxAge := 90 * 24 * time.Hour
historiesData, err = tempts.GetWorkflowHistoriesBundle(ctx, c, workflowDeclaration, &maxAge)
if err != nil {
t.Fatal(err)
}
Expand Down
20 changes: 18 additions & 2 deletions replaytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/gogo/protobuf/proto"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/filter/v1"
"go.temporal.io/api/history/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

// GetWorkflowHistoriesBundle connects to the temporal server and fetches the most recent 10 open and 10 closed executions.
// If maxAge is not nil, it filters out closed executions that were started more than maxAge ago.
// It returns a byte seralized piece of data that can be used immediately or in the future to call ReplayWorkflow.
func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration) ([]byte, error) {
func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowDeclaration, maxAge *time.Duration) ([]byte, error) {
closedExecutions, err := client.Client.WorkflowService().ListClosedWorkflowExecutions(ctx, &workflowservice.ListClosedWorkflowExecutionsRequest{
Namespace: client.namespace,
MaximumPageSize: 10,
Expand All @@ -27,6 +30,19 @@ func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowD
if err != nil {
return nil, fmt.Errorf("failed to get closed executions: %w", err)
}

// Filter out closed executions that were started more than maxAge ago (if maxAge is specified)
filteredClosedExecutions := closedExecutions.Executions
if maxAge != nil {
cutoffTime := time.Now().Add(-*maxAge)
filteredClosedExecutions = make([]*workflowpb.WorkflowExecutionInfo, 0, len(closedExecutions.Executions))
for _, exec := range closedExecutions.Executions {
if exec.StartTime != nil && exec.StartTime.After(cutoffTime) {
filteredClosedExecutions = append(filteredClosedExecutions, exec)
}
}
}

openExecutions, err := client.Client.WorkflowService().ListOpenWorkflowExecutions(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: client.namespace,
MaximumPageSize: 10,
Expand All @@ -37,7 +53,7 @@ func GetWorkflowHistoriesBundle(ctx context.Context, client *Client, w WorkflowD
if err != nil {
return nil, fmt.Errorf("failed to get open executions: %w", err)
}
allExecutions := append(closedExecutions.Executions, openExecutions.Executions...)
allExecutions := append(filteredClosedExecutions, openExecutions.Executions...)
hists := []*history.History{}
for _, e := range allExecutions {
var hist history.History
Expand Down