Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tools/prow-job-executor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.5

require (
github.com/Azure/ARO-Tools/tools/cmdutils v0.0.0-20260227032723-11f678744bf9
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azsecrets v1.4.0
github.com/go-logr/logr v1.4.3
github.com/google/uuid v1.6.0
Expand All @@ -25,7 +26,6 @@ require (
cloud.google.com/go/trace v1.11.5 // indirect
contrib.go.opencensus.io/exporter/ocagent v0.7.1-0.20200907061046-05415f1de66d // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.2.0 // indirect
Expand Down
97 changes: 97 additions & 0 deletions tools/prow-job-executor/internal/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2025 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package retry provides a small, generic exponential-backoff retry helper shared
// by the prow-job-executor's transient-failure paths (Gangway job submission, job
// status polling, and the Key Vault prow-token lookup).
package retry

import (
"context"
"fmt"

"github.com/go-logr/logr"

"k8s.io/apimachinery/pkg/util/wait"
)

// WithValue invokes fn with exponential backoff and returns its value once it
// succeeds, retrying only errors that isRetryable classifies as transient.
//
// Behavior:
// - fn is called at least once. On success its value is returned immediately.
// - When fn returns an error that isRetryable reports as false (a permanent or
// deterministic failure), WithValue stops immediately and propagates that error
// unchanged, without consuming the remaining backoff budget.
// - When isRetryable reports true, the error is logged at info level (if a logr
// logger is present on ctx) and the call is retried until the backoff budget is
// exhausted, after which the last transient error is wrapped and returned.
// - A cancelled or expired parent context always takes precedence: its error is
// returned as-is rather than masked behind the last transient error.
//
// fn must respect ctx for cancellation. The parent context bounds the total runtime
// regardless of the backoff schedule.
func WithValue[T any](ctx context.Context, backoff wait.Backoff, isRetryable func(error) bool, fn func(ctx context.Context) (T, error)) (T, error) {
logger, err := logr.FromContext(ctx)
if err != nil {
logger = logr.Discard()
}

var result T
var lastErr error
condition := func(ctx context.Context) (bool, error) {
v, err := fn(ctx)
if err != nil {
// A cancelled/expired parent context is terminal: stop immediately,
// without logging a misleading "will retry" or recording it as the last
// transient error. (Some callers' isRetryable does not special-case
// context errors, e.g. GetJobStatus.)
if ctxErr := ctx.Err(); ctxErr != nil {
return false, ctxErr
}

// Permanent/deterministic failures surface immediately instead of
// after a long backoff.
if !isRetryable(err) {
return false, err // Stop retrying and propagate the error as-is.
}

lastErr = err
logger.Info("Operation failed with a transient error, will retry", "error", err.Error())
return false, nil
}

result = v
return true, nil // Success, stop retrying.
}

if err := wait.ExponentialBackoffWithContext(ctx, backoff, condition); err != nil {
// A cancelled/expired parent context takes precedence: report it as-is
// rather than masking it behind the last transient error.
if ctxErr := ctx.Err(); ctxErr != nil {
var zero T
return zero, ctxErr
}
// Retries were exhausted: surface the last transient error for context.
if lastErr != nil {
var zero T
return zero, fmt.Errorf("retry budget exhausted after transient errors: %w", lastErr)
}
// A permanent error returned by the condition propagates unchanged.
var zero T
return zero, err
}

return result, nil
}
170 changes: 170 additions & 0 deletions tools/prow-job-executor/internal/retry/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2025 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"errors"
"strings"
"testing"
"time"

"github.com/go-logr/logr"

"k8s.io/apimachinery/pkg/util/wait"
)

// fastBackoff returns a backoff with negligible delays so retry tests run quickly
// while still exercising a deterministic number of attempts. No Cap is set so the
// attempt count is exactly Steps (apimachinery's Cap would also halt retries early).
func fastBackoff(steps int) wait.Backoff {
return wait.Backoff{
Duration: time.Millisecond,
Factor: 2.0,
Jitter: 0.0,
Steps: steps,
}
}

func testContext() context.Context {
return logr.NewContext(context.Background(), logr.Discard())
}

// retryAll treats every error as transient.
func retryAll(error) bool { return true }

func TestWithValueSucceedsFirstAttempt(t *testing.T) {
calls := 0
got, err := WithValue(testContext(), fastBackoff(4), retryAll, func(context.Context) (int, error) {
calls++
return 42, nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != 42 {
t.Fatalf("got %d, want 42", got)
}
if calls != 1 {
t.Fatalf("fn called %d times, want 1", calls)
}
}

func TestWithValueRetriesTransientThenSucceeds(t *testing.T) {
calls := 0
got, err := WithValue(testContext(), fastBackoff(4), retryAll, func(context.Context) (string, error) {
calls++
if calls < 3 {
return "", errors.New("transient")
}
return "ok", nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != "ok" {
t.Fatalf("got %q, want %q", got, "ok")
}
if calls != 3 {
t.Fatalf("fn called %d times, want 3", calls)
}
}

func TestWithValueFailsFastOnNonRetryable(t *testing.T) {
sentinel := errors.New("permanent")
calls := 0
_, err := WithValue(testContext(), fastBackoff(4), func(error) bool { return false }, func(context.Context) (int, error) {
calls++
return 0, sentinel
})
if !errors.Is(err, sentinel) {
t.Fatalf("error = %v, want it to wrap sentinel", err)
}
if calls != 1 {
t.Fatalf("fn called %d times, want 1 (no retries on permanent error)", calls)
}
// A permanent error must propagate unchanged, not behind the "retry budget" wrapper.
if strings.Contains(err.Error(), "retry budget exhausted") {
t.Fatalf("permanent error should propagate as-is, got %q", err.Error())
}
}

func TestWithValueWrapsLastErrorWhenExhausted(t *testing.T) {
sentinel := errors.New("still failing")
calls := 0
_, err := WithValue(testContext(), fastBackoff(3), retryAll, func(context.Context) (int, error) {
calls++
return 0, sentinel
})
if err == nil {
t.Fatal("expected error, got nil")
}
if !errors.Is(err, sentinel) {
t.Fatalf("error = %v, want it to wrap the last transient error", err)
}
if !strings.Contains(err.Error(), "retry budget exhausted") {
t.Fatalf("exhausted error %q should mention the retry budget", err.Error())
}
if calls != 3 {
t.Fatalf("fn called %d times, want 3", calls)
}
}

// TestWithValueContextCanceledBeforeStart verifies a context cancelled before the
// first attempt surfaces as-is and never invokes fn (ExponentialBackoffWithContext
// checks the context before the first condition call).
func TestWithValueContextCanceledBeforeStart(t *testing.T) {
ctx, cancel := context.WithCancel(testContext())
cancel()

calls := 0
_, err := WithValue(ctx, fastBackoff(4), retryAll, func(ctx context.Context) (int, error) {
calls++
return 0, ctx.Err()
})
if !errors.Is(err, context.Canceled) {
t.Fatalf("error = %v, want context.Canceled", err)
}
if strings.Contains(err.Error(), "retry budget exhausted") {
t.Fatalf("context error should surface as-is, got %q", err.Error())
}
if calls != 0 {
t.Fatalf("fn called %d times, want 0 (context checked before first attempt)", calls)
}
}

// TestWithValueContextCanceledDuringCall verifies that when the parent context is
// cancelled while fn is running, the next iteration fails fast on the context error
// without logging a "will retry" or wrapping it as an exhausted-budget error — even
// though fn returned an otherwise-retryable error.
func TestWithValueContextCanceledDuringCall(t *testing.T) {
ctx, cancel := context.WithCancel(testContext())

calls := 0
_, err := WithValue(ctx, fastBackoff(4), retryAll, func(ctx context.Context) (int, error) {
calls++
cancel() // cancel the parent mid-call
return 0, errors.New("transient") // a normally-retryable error
})
if !errors.Is(err, context.Canceled) {
t.Fatalf("error = %v, want context.Canceled", err)
}
if strings.Contains(err.Error(), "retry budget exhausted") {
t.Fatalf("cancelled context should not be wrapped as exhausted budget, got %q", err.Error())
}
if calls != 1 {
t.Fatalf("fn called %d times, want 1 (no retry after cancellation)", calls)
}
}
71 changes: 14 additions & 57 deletions tools/prow-job-executor/prowjob/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
prowjobs "sigs.k8s.io/prow/pkg/apis/prowjobs/v1"
prowgangway "sigs.k8s.io/prow/pkg/gangway"
"sigs.k8s.io/yaml"

"github.com/Azure/ARO-Tools/tools/prow-job-executor/internal/retry"
)

// jobSubmissionResponse represents the minimal JSON response from Gangway API for job submission
Expand Down Expand Up @@ -85,40 +87,9 @@ func NewClient(token, gangwayURL, prowURL string) *Client {
// forcing the entire deployment job to be restarted. Only transient failures
// (HTTP 429, 5xx, and network errors) are retried; everything else fails fast.
func (c *Client) SubmitJob(ctx context.Context, request *prowgangway.CreateJobExecutionRequest) (string, error) {
logger, err := logr.FromContext(ctx)
if err != nil {
return "", err
}

var executionID string
var lastErr error
condition := func(ctx context.Context) (bool, error) {
id, err := c.submitJobOnce(ctx, request)
if err != nil {
lastErr = err
// Only retry known-transient errors. Deterministic failures (marshal,
// request construction, response decode, 4xx other than 429) are not
// retried so they surface immediately instead of after a long backoff.
if !isRetryableError(err) {
return false, err // Stop retrying and propagate the error
}

logger.Info("Job submission failed with a transient error, will retry", "error", err.Error())
return false, nil
}

executionID = id
return true, nil // Success, stop retrying
}

if err := wait.ExponentialBackoffWithContext(ctx, c.submitBackoff, condition); err != nil {
if lastErr != nil {
return "", fmt.Errorf("failed to submit job after retries: %w", lastErr)
}
return "", err
}

return executionID, nil
return retry.WithValue(ctx, c.submitBackoff, isRetryableError, func(ctx context.Context) (string, error) {
return c.submitJobOnce(ctx, request)
})
}

// submitJobOnce performs a single job submission request without retry logic.
Expand Down Expand Up @@ -167,39 +138,25 @@ func (c *Client) submitJobOnce(ctx context.Context, request *prowgangway.CreateJ

// GetJobStatus retrieves the full job information by Prow execution ID with retry logic
func (c *Client) GetJobStatus(ctx context.Context, prowExecutionID string) (*prowjobs.ProwJob, error) {
var result *prowjobs.ProwJob

// Configure exponential backoff with jitter
backoff := wait.Backoff{
Duration: time.Second, // Initial delay
Factor: 2.0, // Exponential factor
Jitter: 0.1, // 10% jitter
Steps: 3, // Maximum retries
Steps: 3, // Maximum attempts
Cap: 10 * time.Second, // Maximum delay cap
Comment thread
raelga marked this conversation as resolved.
}

condition := func(ctx context.Context) (bool, error) {
job, err := c.getJobStatusOnce(ctx, prowExecutionID)
if err != nil {
// Check if this is a non-retryable error (e.g., 403 Forbidden)
if isNonRetryableHTTPError(err) {
return false, err // Stop retrying and propagate the error
}

// For retryable errors continue
return false, nil
}

result = job
return true, nil // Success, stop retrying
}

err := wait.ExponentialBackoffWithContext(ctx, backoff, condition)
if err != nil {
return nil, err
// Everything except a non-retryable HTTP status (e.g. 401/403) is retried: a
// freshly submitted job's status may 404 until it propagates, and transport
// errors are transient.
isRetryable := func(err error) bool {
return !isNonRetryableHTTPError(err)
}

return result, nil
return retry.WithValue(ctx, backoff, isRetryable, func(ctx context.Context) (*prowjobs.ProwJob, error) {
return c.getJobStatusOnce(ctx, prowExecutionID)
})
}

// getJobStatusOnce performs a single job status request without retry logic
Expand Down
Loading
Loading