Skip to content

Middleware

Mauricio Gomes edited this page Jan 3, 2026 · 3 revisions

Senna uses a middleware pattern for cross-cutting concerns. Middleware wraps job handlers and can perform actions before and after job execution.

How Middleware Works

Middleware is a function that takes a handler and returns a new handler:

type Handler func(ctx context.Context, job *Job) error

type Middleware func(next Handler) Handler

When you register middleware with w.Use(), it wraps all handlers:

w.Use(middleware1)
w.Use(middleware2)
w.Register("job", handler)

// Execution order: middleware1 → middleware2 → handler → middleware2 → middleware1

Note

Middleware is executed in registration order going in, and reverse order going out (like an onion). The first middleware registered is the outermost layer.

Built-in Middleware

Recovery Middleware

Catches panics and converts them to errors. Automatically added to all workers.

// Automatically added - no need to register manually

If a handler panics, the job will be retried instead of crashing the worker.

Tip

You don't need to add recovery middleware manually—it's automatically included in every worker to prevent crashes from panicking handlers.

Logging Middleware

Logs job start, completion, and failures:

import "log/slog"

logger := slog.Default()
w.Use(senna.LoggingMiddleware(logger))

Output:

INFO  Starting job    job_id=abc123 job_type=send_email
INFO  Completed job   job_id=abc123 job_type=send_email duration=1.23s
ERROR Failed job      job_id=abc123 job_type=send_email error="connection refused"

Timeout Middleware

Sets a timeout for all jobs. If a job exceeds the timeout, its context is cancelled:

w.Use(senna.TimeoutMiddleware(5 * time.Minute))

The handler should respect context cancellation:

w.Register("long_job", func(ctx context.Context, job *senna.Job) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // Will be retried
        default:
            // Continue processing...
        }
    }
})

Retry Middleware

Automatic retry with custom backoff:

// Exponential backoff: 1s, 2s, 4s, 8s... (max 1 hour)
backoff := senna.ExponentialBackoff(time.Second, time.Hour)
w.Use(senna.RetryMiddleware(3, backoff))

Rate Limit Middleware

Apply rate limiting to all jobs:

limiter := ratelimit.Bucket(w.Redis(), ratelimit.BucketConfig{
    Name:     "global",
    Limit:    1000,
    Interval: time.Minute,
})

// Skip jobs when rate limited
w.Use(senna.RateLimitMiddleware(limiter))

Rate Limit with Reschedule

Instead of failing rate-limited jobs, reschedule them:

w.Use(senna.RateLimitMiddlewareWithReschedule(limiter))

Jobs will be retried after the rate limit window without counting against their retry limit.

Custom Middleware

Basic Structure

func MyMiddleware() senna.Middleware {
    return func(next senna.Handler) senna.Handler {
        return func(ctx context.Context, job *senna.Job) error {
            // Before job execution
            fmt.Printf("Starting job %s\n", job.ID)

            // Call the next handler
            err := next(ctx, job)

            // After job execution
            if err != nil {
                fmt.Printf("Job %s failed: %v\n", job.ID, err)
            } else {
                fmt.Printf("Job %s completed\n", job.ID)
            }

            return err
        }
    }
}

w.Use(MyMiddleware())

Metrics Middleware

Track job metrics:

func MetricsMiddleware(metrics *MyMetrics) senna.Middleware {
    return func(next senna.Handler) senna.Handler {
        return func(ctx context.Context, job *senna.Job) error {
            start := time.Now()
            metrics.JobStarted(job.Type)

            err := next(ctx, job)

            duration := time.Since(start)
            if err != nil {
                metrics.JobFailed(job.Type, duration, err)
            } else {
                metrics.JobSucceeded(job.Type, duration)
            }

            return err
        }
    }
}

w.Use(MetricsMiddleware(myMetrics))

Tracing Middleware

Add distributed tracing:

func TracingMiddleware(tracer Tracer) senna.Middleware {
    return func(next senna.Handler) senna.Handler {
        return func(ctx context.Context, job *senna.Job) error {
            span := tracer.StartSpan("job.execute",
                WithTag("job.type", job.Type),
                WithTag("job.id", job.ID),
            )
            defer span.Finish()

            ctx = tracer.ContextWithSpan(ctx, span)

            err := next(ctx, job)
            if err != nil {
                span.SetTag("error", true)
                span.LogError(err)
            }

            return err
        }
    }
}

w.Use(TracingMiddleware(tracer))

Validation Middleware

Validate job arguments before execution:

func ValidationMiddleware() senna.Middleware {
    return func(next senna.Handler) senna.Handler {
        return func(ctx context.Context, job *senna.Job) error {
            // Validate common fields
            if job.Type == "" {
                return errors.New("job type is required")
            }

            // Validate specific job types
            switch job.Type {
            case "send_email":
                if _, ok := job.Args["to"]; !ok {
                    return errors.New("'to' is required for send_email")
                }
            }

            return next(ctx, job)
        }
    }
}

w.Use(ValidationMiddleware())

Context Enrichment Middleware

Add values to the context for use in handlers:

type contextKey string

const requestIDKey contextKey = "request_id"

func RequestIDMiddleware() senna.Middleware {
    return func(next senna.Handler) senna.Handler {
        return func(ctx context.Context, job *senna.Job) error {
            // Use job ID as request ID, or generate new one
            requestID := job.ID

            ctx = context.WithValue(ctx, requestIDKey, requestID)

            return next(ctx, job)
        }
    }
}

// In handler:
w.Register("job", func(ctx context.Context, job *senna.Job) error {
    requestID := ctx.Value(requestIDKey).(string)
    // Use requestID...
    return nil
})

Middleware Order

Middleware is executed in the order it's registered:

w.Use(LoggingMiddleware(logger))  // 1st - outermost
w.Use(MetricsMiddleware(metrics)) // 2nd
w.Use(TimeoutMiddleware(5*time.Minute)) // 3rd - innermost

// Execution: Logging → Metrics → Timeout → Handler → Timeout → Metrics → Logging

Recommended Order

  1. Logging - Log all job activity
  2. Recovery - (automatic) Catch panics
  3. Tracing - Start trace spans
  4. Metrics - Record timing
  5. Timeout - Enforce time limits
  6. Rate Limiting - Control throughput
  7. Validation - Validate inputs

Per-Handler Middleware

You can also apply middleware to specific handlers using handler options:

// Rate limiter for specific job type
limiter := ratelimit.Bucket(w.Redis(), config)
w.Register("api_call", handler, worker.WithRateLimiter(limiter))

// Timeout for specific job type
w.Register("slow_job", handler, worker.WithJobTimeout(10*time.Minute))

Complete Example

package main

import (
    "context"
    "log/slog"
    "time"

    "github.com/mgomes/senna"
    "github.com/mgomes/senna/ratelimit"
    "github.com/mgomes/senna/worker"
)

func main() {
    w, _ := worker.New(&worker.Config{
        Redis:     senna.RedisConfig{Addr: "localhost:6379"},
        Namespace: "myapp",
    })

    // Add middleware in order
    logger := slog.Default()
    w.Use(senna.LoggingMiddleware(logger))
    w.Use(MetricsMiddleware(myMetrics))
    w.Use(senna.TimeoutMiddleware(5 * time.Minute))

    // Rate limiting for external API calls
    apiLimiter := ratelimit.Bucket(w.Redis(), ratelimit.BucketConfig{
        Name:     "external-api",
        Limit:    60,
        Interval: time.Minute,
    })
    w.Use(senna.RateLimitMiddlewareWithReschedule(apiLimiter))

    // Register handlers
    w.Register("job", handler)

    w.Run(context.Background())
}

func MetricsMiddleware(m *Metrics) senna.Middleware {
    return func(next senna.Handler) senna.Handler {
        return func(ctx context.Context, job *senna.Job) error {
            start := time.Now()
            err := next(ctx, job)
            m.RecordJob(job.Type, time.Since(start), err)
            return err
        }
    }
}

Next Steps

Clone this wiki locally