An Elixir library providing an Off-Broadway producer for resilient WebSocket ingestion with :gun.
It is designed for exchange-style or session-oriented WebSocket feeds where a Broadway pipeline needs:
- demand-aware delivery
- reconnect and backoff handling
- ping/pong and idle-timeout monitoring
- optional post-upgrade bootstrap frames
- optional stateful inbound frame handling
HexDocs includes the API reference and focused guides:
- Getting Started
- Configuration
- Auth Refresh and Handshake Failures
- On-Upgrade Bootstrap
- Frame Handler
- Retry and Liveness
- Telemetry
Add off_broadway_websocket to your list of dependencies in mix.exs:
def deps do
  [
    {:off_broadway_websocket, "~> 1.2.1"}
  ]
end

Then fetch dependencies:

mix deps.get

defmodule MyApp.Broadway do
  use Broadway

  alias Broadway.Message
  alias Broadway.NoopAcknowledger

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          url: "wss://example.com:443",
          path: "/stream/trades",
          ws_timeout: 15_000,
          telemetry_id: :my_app_ws,
          gun_opts: %{
            transport: :tls,
            protocols: [:http],
            tls_opts: [
              verify: :verify_peer,
              cacertfile: CAStore.file_path(),
              verify_fun: {
                &:ssl_verify_hostname.verify_fun/3,
                [check_hostname: String.to_charlist("example.com")]
              }
            ],
            http_opts: %{version: :"HTTP/1.1"},
            ws_opts: %{keepalive: 10_000, silence_pings: false}
          },
          on_upgrade: {MyApp.WebSocket, :subscription_frames, [[]]},
          frame_handler: {MyApp.WebSocket, :handle_frame, []},
          frame_handler_state: %{subscriptions: %{}}
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [default: [min_demand: 0, max_demand: 100, concurrency: 4]]
    )
  end

  # Placeholder processor: passes the payload through unchanged.
  # Decode and validate the payload here in a real pipeline.
  @impl true
  def handle_message(_stage, %Message{data: payload} = message, _context) do
    message
    |> Map.put(:data, payload)
  end

  # Wraps each payload emitted by the producer in a Broadway message.
  def transform(payload, _opts) do
    %Message{
      data: payload,
      acknowledger: NoopAcknowledger.init()
    }
  end
end

Use :on_upgrade when a WebSocket API requires subscribe or auth frames to be sent
immediately after the upgrade, before the connection is considered ready.
The callback must return one of:
- {:ok, []}
- {:ok, [{:text | :binary, iodata()}, ...]}
- {:error, reason}
Immediate callback failure or immediate :gun.ws_send/3 failure is treated as a
bootstrap failure and follows the reconnect/backoff path.
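
The return contract above can be sketched as a small bootstrap callback. This is a minimal illustration, not the library's reference implementation: it assumes the callback is invoked with exactly the arguments listed in the MFA tuple (so `subscription_frames/1` receives one list), and the JSON subscribe message is an invented wire format. Only the bootstrap callback is shown.

```elixir
defmodule MyApp.WebSocket do
  @moduledoc false

  # Returns the frames to send right after the WebSocket upgrade.
  # Assumption: called as subscription_frames(channels), where `channels`
  # is the single argument from the :on_upgrade MFA tuple.
  def subscription_frames(channels) when is_list(channels) do
    if Enum.all?(channels, &is_binary/1) do
      # An empty channel list yields {:ok, []}, meaning "send nothing".
      {:ok, Enum.map(channels, &subscribe_frame/1)}
    else
      {:error, :invalid_channel}
    end
  end

  # Builds one text frame as iodata. The JSON shape is hypothetical.
  defp subscribe_frame(channel) do
    {:text, [~s({"op":"subscribe","channel":"), channel, ~s("})]}
  end
end
```

Under the same assumption, configuring `on_upgrade: {MyApp.WebSocket, :subscription_frames, [["trades"]]}` would produce a single subscribe frame for the `"trades"` channel.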
Use :frame_handler when interpreting raw WebSocket frames depends on connection-local
session state, for example subscription ids or channel mappings.
The callback receives a normalized inbound payload and the current handler state and must return one of:
- {:emit, [payload, ...], new_state}
- {:skip, new_state}
- {:error, reason, new_state}
frame_handler_state resets to its initial value on reconnect.
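
A stateful handler following this contract might look like the sketch below. It assumes the callback is called as `handle_frame(payload, state)` and that the normalized payload is a binary. The `"sub:<id>:<channel>"` and `"data:<id>:<body>"` frame formats are invented for illustration, and the state shape matches the `%{subscriptions: %{}}` value from the pipeline example. Only the frame handler is shown.

```elixir
defmodule MyApp.WebSocket do
  @moduledoc false

  # Assumption: `payload` is a binary in one of two hypothetical formats:
  #   "sub:<id>:<channel>"  - the server confirms a subscription id
  #   "data:<id>:<body>"    - data for a previously confirmed subscription
  def handle_frame(payload, state) when is_binary(payload) do
    case String.split(payload, ":", parts: 3) do
      ["sub", id, channel] ->
        # Remember the id-to-channel mapping; nothing goes downstream.
        {:skip, put_in(state, [:subscriptions, id], channel)}

      ["data", id, body] ->
        case Map.fetch(state.subscriptions, id) do
          {:ok, channel} -> {:emit, [{channel, body}], state}
          :error -> {:error, {:unknown_subscription, id}, state}
        end

      _other ->
        # Unrecognized frames are dropped without changing state.
        {:skip, state}
    end
  end

  # Non-binary payloads are ignored in this sketch.
  def handle_frame(_payload, state), do: {:skip, state}
end
```

Because the mapping lives in the handler state, it is rebuilt from the server's confirmations after each reconnect rather than carried over from a previous session.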
Required startup options:
- :url
- :path
Common optional startup options:
- :headers
- :headers_fn
- :ws_timeout
- :await_timeout
- :telemetry_id
- :gun_opts
- :ws_retry_opts
- :ws_retry_fun
- :on_upgrade
- :frame_handler
- :frame_handler_state
See the Configuration guide for the full option contract and defaults.
See the Auth Refresh and Handshake Failures guide for rotating WebSocket auth headers and troubleshooting failed upgrades.
Connection telemetry is emitted under:
- [:<telemetry_id>, :connection, :success]
- [:<telemetry_id>, :connection, :failure]
- [:<telemetry_id>, :connection, :disconnected]
- [:<telemetry_id>, :connection, :timeout]
- [:<telemetry_id>, :connection, :status]
See the Telemetry guide for event shapes and usage examples.
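
As a rough sketch, the connection events listed above could be attached to a logging handler with `:telemetry.attach_many/4`. The module name `MyApp.WebSocketTelemetry` and the handler id are hypothetical, and the handler makes no assumption about the contents of the measurements or metadata; it only inspects them. The `:telemetry` package must be available, which is normally the case because Broadway depends on it.

```elixir
defmodule MyApp.WebSocketTelemetry do
  @moduledoc false
  require Logger

  @statuses [:success, :failure, :disconnected, :timeout, :status]

  # Builds the five connection event names for a given telemetry_id.
  def events(telemetry_id) do
    for status <- @statuses, do: [telemetry_id, :connection, status]
  end

  # Attaches one handler to all connection events.
  # The handler id string is arbitrary but must be unique per attachment.
  def attach(telemetry_id) do
    :telemetry.attach_many(
      "#{telemetry_id}-connection-logger",
      events(telemetry_id),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Logs every event as-is, without relying on any specific field.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(
      "ws event=#{inspect(event)} " <>
        "measurements=#{inspect(measurements)} metadata=#{inspect(metadata)}"
    )
  end
end
```

Calling `MyApp.WebSocketTelemetry.attach(:my_app_ws)` once at application start, before the pipeline starts, would match the `telemetry_id: :my_app_ws` used in the pipeline example.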
OffBroadwayWebSocket owns:
- websocket connection lifecycle
- reconnect/backoff behavior
- idle-timeout and keepalive handling
- optional bootstrap frame dispatch
- optional connection-local inbound frame handling
Application code owns:
- payload decoding and validation
- domain-specific adaptation
- downstream routing and publishing
- session policy beyond the transport boundary
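
To illustrate the application side of this boundary, here is a minimal decoding and validation sketch. The `MyApp.TradeDecoder` module and the `"SYMBOL,PRICE,QTY"` payload format are assumptions made for the example; a real feed would more likely carry JSON or a binary protocol.

```elixir
defmodule MyApp.TradeDecoder do
  @moduledoc false

  # Decodes a hypothetical "SYMBOL,PRICE,QTY" text payload.
  # Returns {:ok, trade_map} or {:error, :invalid_trade}.
  def decode(payload) when is_binary(payload) do
    with [symbol, price, qty] <- String.split(payload, ","),
         {price, ""} <- Float.parse(price),
         {qty, ""} <- Integer.parse(qty),
         true <- price > 0 and qty > 0 do
      {:ok, %{symbol: symbol, price: price, qty: qty}}
    else
      # Wrong field count, unparsable numbers, or non-positive values.
      _ -> {:error, :invalid_trade}
    end
  end

  def decode(_payload), do: {:error, :invalid_trade}
end
```

In a pipeline, `handle_message/3` could call `MyApp.TradeDecoder.decode/1` on the message data and mark the message with `Broadway.Message.failed/2` when it returns an error, keeping decoding concerns out of the producer.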
Run the test suite with:

mix test

Apache License 2.0. See LICENSE.