-
Notifications
You must be signed in to change notification settings - Fork 0
Middleware
Senna uses a middleware pattern for cross-cutting concerns. Middleware wraps job handlers and can perform actions before and after job execution.
Middleware is a function that takes a handler and returns a new handler:
type Handler func(ctx context.Context, job *Job) error
type Middleware func(next Handler) Handler
When you register middleware with w.Use(), it wraps all handlers:
w.Use(middleware1)
w.Use(middleware2)
w.Register("job", handler)
// Execution order: middleware1 → middleware2 → handler → middleware2 → middleware1
Note
Middleware is executed in registration order going in, and reverse order going out (like an onion). The first middleware registered is the outermost layer.
Catches panics and converts them to errors. Automatically added to all workers.
// Automatically added - no need to register manually
If a handler panics, the job will be retried instead of crashing the worker.
Tip
You don't need to add recovery middleware manually—it's automatically included in every worker to prevent crashes from panicking handlers.
Logs job start, completion, and failures:
import "log/slog"
logger := slog.Default()
w.Use(senna.LoggingMiddleware(logger))
Output:
INFO Starting job job_id=abc123 job_type=send_email
INFO Completed job job_id=abc123 job_type=send_email duration=1.23s
ERROR Failed job job_id=abc123 job_type=send_email error="connection refused"
Sets a timeout for all jobs. If a job exceeds the timeout, its context is cancelled:
w.Use(senna.TimeoutMiddleware(5 * time.Minute))
The handler should respect context cancellation:
w.Register("long_job", func(ctx context.Context, job *senna.Job) error {
for {
select {
case <-ctx.Done():
return ctx.Err() // Will be retried
default:
// Continue processing...
}
}
})
Automatic retry with custom backoff:
// Exponential backoff: 1s, 2s, 4s, 8s... (max 1 hour)
backoff := senna.ExponentialBackoff(time.Second, time.Hour)
w.Use(senna.RetryMiddleware(3, backoff))
Apply rate limiting to all jobs:
limiter := ratelimit.Bucket(w.Redis(), ratelimit.BucketConfig{
Name: "global",
Limit: 1000,
Interval: time.Minute,
})
// Skip jobs when rate limited
w.Use(senna.RateLimitMiddleware(limiter))
Instead of failing rate-limited jobs, reschedule them:
w.Use(senna.RateLimitMiddlewareWithReschedule(limiter))
Jobs will be retried after the rate limit window without counting against their retry limit.
// MyMiddleware returns a middleware that prints a line before the wrapped
// handler runs and another after it returns, reporting either completion or
// the error the handler produced. The handler's error is passed through
// unchanged.
func MyMiddleware() senna.Middleware {
	return func(next senna.Handler) senna.Handler {
		return func(ctx context.Context, job *senna.Job) error {
			// Before job execution.
			fmt.Printf("Starting job %s\n", job.ID)

			// Hand control to the next handler in the chain.
			result := next(ctx, job)

			// After job execution.
			if result == nil {
				fmt.Printf("Job %s completed\n", job.ID)
				return nil
			}
			fmt.Printf("Job %s failed: %v\n", job.ID, result)
			return result
		}
	}
}
w.Use(MyMiddleware())
Track job metrics:
// MetricsMiddleware returns a middleware that reports every job to metrics:
// JobStarted before the wrapped handler runs, then either JobSucceeded or
// JobFailed afterwards, together with the elapsed time. The handler's error is
// passed through unchanged.
func MetricsMiddleware(metrics *MyMetrics) senna.Middleware {
	return func(next senna.Handler) senna.Handler {
		return func(ctx context.Context, job *senna.Job) error {
			began := time.Now()
			metrics.JobStarted(job.Type)

			result := next(ctx, job)
			elapsed := time.Since(began)

			if result == nil {
				metrics.JobSucceeded(job.Type, elapsed)
				return nil
			}
			metrics.JobFailed(job.Type, elapsed, result)
			return result
		}
	}
}
w.Use(MetricsMiddleware(myMetrics))
Add distributed tracing:
// TracingMiddleware returns a middleware that runs each job inside a
// "job.execute" span tagged with the job's type and ID. The span is attached
// to the context passed to the wrapped handler and is finished when the
// handler returns. If the handler returns an error, the span is tagged with
// error=true and the error is logged on it before being passed through.
func TracingMiddleware(tracer Tracer) senna.Middleware {
	return func(next senna.Handler) senna.Handler {
		return func(ctx context.Context, job *senna.Job) error {
			typeTag := WithTag("job.type", job.Type)
			idTag := WithTag("job.id", job.ID)
			span := tracer.StartSpan("job.execute", typeTag, idTag)
			defer span.Finish()

			// Downstream handlers see the context that carries the span.
			result := next(tracer.ContextWithSpan(ctx, span), job)
			if result == nil {
				return nil
			}
			span.SetTag("error", true)
			span.LogError(result)
			return result
		}
	}
}
w.Use(TracingMiddleware(tracer))
Validate job arguments before execution:
// ValidationMiddleware returns a middleware that rejects malformed jobs
// before they reach the wrapped handler. A job with an empty type is refused,
// and a "send_email" job must carry a "to" argument. Jobs that pass are handed
// to the next handler and its result is returned as-is.
func ValidationMiddleware() senna.Middleware {
	return func(next senna.Handler) senna.Handler {
		return func(ctx context.Context, job *senna.Job) error {
			switch job.Type {
			case "":
				// Common check: every job needs a type.
				return errors.New("job type is required")
			case "send_email":
				// Type-specific check: an email needs a recipient.
				if _, present := job.Args["to"]; !present {
					return errors.New("'to' is required for send_email")
				}
			}
			return next(ctx, job)
		}
	}
}
w.Use(ValidationMiddleware())
Add values to the context for use in handlers:
type contextKey string
const requestIDKey contextKey = "request_id"
// RequestIDMiddleware returns a middleware that stores a request ID in the
// context under requestIDKey before calling the wrapped handler, so handlers
// can read it back with ctx.Value(requestIDKey).
func RequestIDMiddleware() senna.Middleware {
	return func(next senna.Handler) senna.Handler {
		return func(ctx context.Context, job *senna.Job) error {
			// The job ID doubles as the request ID here; a freshly
			// generated ID could be stored instead.
			withID := context.WithValue(ctx, requestIDKey, job.ID)
			return next(withID, job)
		}
	}
}
// In handler:
w.Register("job", func(ctx context.Context, job *senna.Job) error {
	// Use the comma-ok form: a bare type assertion panics when the value is
	// absent (for example, RequestIDMiddleware was never registered) or is
	// not a string.
	requestID, ok := ctx.Value(requestIDKey).(string)
	if !ok {
		return errors.New("request ID missing from context")
	}
	// Use requestID...
	return nil
})
Middleware is executed in the order it's registered:
w.Use(LoggingMiddleware(logger)) // 1st - outermost
w.Use(MetricsMiddleware(metrics)) // 2nd
w.Use(TimeoutMiddleware(5*time.Minute)) // 3rd - innermost
// Execution: Logging → Metrics → Timeout → Handler → Timeout → Metrics → Logging
- Logging - Log all job activity
- Recovery - (automatic) Catch panics
- Tracing - Start trace spans
- Metrics - Record timing
- Timeout - Enforce time limits
- Rate Limiting - Control throughput
- Validation - Validate inputs
You can also apply middleware to specific handlers using handler options:
// Rate limiter for specific job type
limiter := ratelimit.Bucket(w.Redis(), config)
w.Register("api_call", handler, worker.WithRateLimiter(limiter))
// Timeout for specific job type
w.Register("slow_job", handler, worker.WithJobTimeout(10*time.Minute))
package main
import (
"context"
"log/slog"
"time"
"github.com/mgomes/senna"
"github.com/mgomes/senna/ratelimit"
"github.com/mgomes/senna/worker"
)
// main builds a worker backed by Redis, installs the middleware chain
// (logging, metrics, timeout, then rate limiting with rescheduling), registers
// a handler, and runs the worker.
//
// NOTE(review): myMetrics and handler are not defined in this snippet; they
// are assumed to be provided by the application.
func main() {
	logger := slog.Default()

	w, err := worker.New(&worker.Config{
		Redis:     senna.RedisConfig{Addr: "localhost:6379"},
		Namespace: "myapp",
	})
	if err != nil {
		// Without a worker there is nothing to configure or run, so report
		// the failure instead of discarding it.
		logger.Error("creating worker", "error", err)
		return
	}

	// Add middleware in order
	w.Use(senna.LoggingMiddleware(logger))
	w.Use(MetricsMiddleware(myMetrics))
	w.Use(senna.TimeoutMiddleware(5 * time.Minute))

	// Rate limiting for external API calls
	apiLimiter := ratelimit.Bucket(w.Redis(), ratelimit.BucketConfig{
		Name:     "external-api",
		Limit:    60,
		Interval: time.Minute,
	})
	w.Use(senna.RateLimitMiddlewareWithReschedule(apiLimiter))

	// Register handlers
	w.Register("job", handler)

	w.Run(context.Background())
}
func MetricsMiddleware(m *Metrics) senna.Middleware {
return func(next senna.Handler) senna.Handler {
return func(ctx context.Context, job *senna.Job) error {
start := time.Now()
err := next(ctx, job)
m.RecordJob(job.Type, time.Since(start), err)
return err
}
}
}- Handle Errors and configure retries
- Use Rate Limiters to control throughput
- Group jobs with Batches