goflux

Generic, transport-agnostic messaging patterns for Go.

Write business logic against core interfaces. Swap transports without touching handler code.

Architecture

Layer               What it provides
Core Interfaces     Publisher[T], Subscriber[T], Requester[Req, Resp], Responder[Req, Resp], Message[T], Handler[T]
Transports          Channel (in-process), NATS, JetStream, HTTP; each implements the core interfaces
Middleware          Chain, AutoAck, RetryAck, InjectMessageID, InjectHeader, ForwardMessageID
Pipeline Operators  pipe.New, pipe.NewMap, pipe.NewFlatMap, ToChan, bridge.ToStream, bridge.FromStream, BindPublisher, RetryPublisher
Stream Processing   Fan-out, fan-in, round-robin, filtering, dedup, throttling via goflow
Lifecycle           Group: coordinated startup and fail-fast shutdown for multiple handlers
Telemetry           OpenTelemetry tracing and metrics built into every transport

Supported Patterns

  • Fire & Forget — publish with no delivery guarantee (channels, NATS core)
  • At-Least-Once — ack/nak with auto-ack or manual control (JetStream)
  • Pull Consumer — JetStream pull consumers via Subscriber[T] with middleware composition (JetStream)
  • Request-Reply — typed request/response (NATS, HTTP)
  • Queue Groups — competing consumers (NATS)
  • Stream Processing — bridge to goflow via bridge.ToStream/bridge.FromStream for bounded concurrency, filtering, dedup, fan-out/fan-in, and more
  • Fan-Out / Fan-In — broadcast, merge, round-robin via goflow stream operators
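For readers unfamiliar with the fan-out pattern listed above, the following sketch shows the general idea using only plain Go channels. The `fanOut` and `collect` helpers are hypothetical and written for this example; they are not goflux or goflow APIs, whose stream operators may behave differently.

```go
package main

import "fmt"

// fanOut broadcasts every value from in to n buffered output channels and
// closes them once in is closed. Hypothetical helper, not a library function.
func fanOut[T any](in <-chan T, n, buf int) []<-chan T {
	outs := make([]chan T, n)
	views := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T, buf)
		views[i] = outs[i]
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // blocks when a consumer falls behind by more than buf
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return views
}

// collect drains a channel into a slice.
func collect[T any](ch <-chan T) []T {
	var got []T
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	in := make(chan string, 2)
	in <- "a"
	in <- "b"
	close(in)

	// The buffer size matches the number of messages, so the outputs can be
	// drained one after another without blocking the broadcaster.
	for i, out := range fanOut[string](in, 2, 2) {
		fmt.Println(i, collect(out))
	}
}
```

Each output receives every message in the original order; a slow consumer applies backpressure to the broadcaster once its buffer is full.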

Transport Feature Matrix

Interface             Channel  NATS  JetStream  HTTP
Publisher[T]          yes      yes   yes        yes
Subscriber[T]         yes      yes   yes        yes
Requester[Req, Resp]  -        yes   -          yes
Responder[Req, Resp]  -        yes   -          yes

Installation

go get github.com/foomo/goflux

Quick Start

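package main

import (
  "context"
  "fmt"
  "log"

  "github.com/foomo/goflux"
  "github.com/foomo/goflux/transport/channel"
)

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  // In-process bus carrying string payloads, with one publisher and one subscriber.
  bus := channel.NewBus[string]()
  pub := channel.NewPublisher(bus)
  sub, err := channel.NewSubscriber(bus, 1)
  if err != nil {
    log.Fatal(err)
  }

  // Subscribe in the background; the handler cancels ctx after the first message.
  go sub.Subscribe(ctx, "greetings", func(_ context.Context, msg goflux.Message[string]) error {
    fmt.Println(msg.Subject, msg.Payload)
    cancel()
    return nil
  })

  if err := pub.Publish(ctx, "greetings", "Hello, goflux!"); err != nil {
    log.Fatal(err)
  }

  // Wait until the handler has processed the message.
  <-ctx.Done()
}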

Swap to NATS by changing the import and constructor — the handler stays the same. See the Getting Started guide.

Documentation

Full documentation: https://foomo.github.io/goflux/

Contributing

make check   # tidy + generate + lint + test + audit (full CI flow)

See CONTRIBUTING.md for details.

License

Distributed under the MIT License; see LICENSE for details.

Made with ♥ foomo by bestbytes
