Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions apps/bender/src/bender.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
prometheus_cowboy,
woody, % woody's hackney declares prometheus metrics on start
scoper, % should be before any scoper event handler usage
progressor,
epg_migrator, %% should be after epg_connector (progressor)
machinery,
epg_connector,
epg_migrator, %% should be after epg_connector
erl_health,
opentelemetry_api,
opentelemetry_exporter,
Expand Down
37 changes: 7 additions & 30 deletions apps/bender/src/bender.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ stop() ->
-spec start(normal, any()) -> {ok, pid()} | {error, any()}.
start(_StartType, _StartArgs) ->
ok = setup_metrics(),
ok = db_init(application:get_env(bender, backend_mode, machinery)),
ok = db_init(),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

-spec stop(any()) -> ok.
Expand All @@ -60,7 +60,7 @@ init([]) ->
shutdown_timeout => get_shutdown_timeout(),
event_handler => EventHandlers,
handlers => get_handler_spec(),
additional_routes => get_routes(EventHandlers, genlib_app:env(?MODULE, machinery_backend))
additional_routes => get_routes(EventHandlers)
}
),
Flags = #{strategy => one_for_all, intensity => 6, period => 30},
Expand Down Expand Up @@ -105,32 +105,11 @@ get_handler_spec() ->
}}
].

-spec get_routes(woody:ev_handlers(), machinegun | progressor | hybrid) -> [woody_server_thrift_http_handler:route(_)].
get_routes(_EventHandlers, progressor) ->
-spec get_routes(woody:ev_handlers()) -> [woody_server_thrift_http_handler:route(_)].
get_routes(_EventHandlers) ->
%% Shared routes
Check = enable_health_logging(genlib_app:env(?MODULE, health_check, #{})),
[erl_health_handle:get_route(Check), get_prometheus_route()];
get_routes(EventHandlers, Mode) when Mode == machinegun orelse Mode == hybrid ->
%% Machinegun specific routes
RouteOptsEnv = genlib_app:env(?MODULE, route_opts, #{}),
RouteOpts = RouteOptsEnv#{event_handler => EventHandlers},
Generator = genlib_app:env(bender, generator, #{}),
Sequence = genlib_app:env(bender, sequence, #{}),
Handlers = [
{bender_generator, #{
path => maps:get(path, Generator, <<"/v1/stateproc/bender_generator">>),
backend_config => #{
schema => maps:get(schema, Generator, machinery_mg_schema_generic)
}
}},
{bender_sequence, #{
path => maps:get(path, Sequence, <<"/v1/stateproc/bender_sequence">>),
backend_config => #{
schema => maps:get(schema, Sequence, machinery_mg_schema_generic)
}
}}
],
get_routes(EventHandlers, progressor) ++ machinery_mg_backend:get_routes(Handlers, RouteOpts).
[erl_health_handle:get_route(Check), get_prometheus_route()].

-spec enable_health_logging(erl_health:check()) -> erl_health:check().
enable_health_logging(Check) ->
Expand All @@ -148,7 +127,7 @@ setup_metrics() ->
ok = woody_ranch_prometheus_collector:setup(),
ok = woody_hackney_prometheus_collector:setup().

db_init(postgres) ->
db_init() ->
case code:priv_dir(bender) of
{error, _} ->
error({migration_error, cant_find_priv_dir});
Expand All @@ -159,9 +138,7 @@ db_init(postgres) ->
{ok, Databases} = application:get_env(epg_connector, databases),
DbOpts = maps:get(DbRef, Databases),
db_init(MigrationsDir, DbOpts)
end;
db_init(machinery) ->
ok.
end.

db_init(MigrationsDir, DbOpts) ->
MigrationOpts = application:get_env(bender, migration_opts, ?DEFAULT_MIGRATION_OPTS),
Expand Down
147 changes: 2 additions & 145 deletions apps/bender/src/bender_generator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,20 @@
-export([generate/2]).
-export([get_internal_id/2]).

%% Machinery callbacks

-behaviour(machinery).

-export([init/4]).
-export([process_call/4]).
-export([process_timeout/3]).
-export([process_repair/4]).
-export([process_notification/4]).

-type external_id() :: binary().
-type internal_id() :: binary() | {binary(), pos_integer()}.

-type schema() :: bender:schema().
-type user_context() :: msgp_msgpack_thrift:'Value'() | undefined.
-type state() :: #{
internal_id := internal_id(),
user_context := user_context()
}.

-type woody_context() :: woody_context:ctx().

-type args(T) :: machinery:args(T).
-type machine() :: machinery:machine(_, state()).
-type handler_args() :: machinery:handler_args(_).
-type handler_opts() :: machinery:handler_opts(_).
-type result(A) :: machinery:result(none(), A).

-include("bender_internal.hrl").

-define(NS, bender_generator).

%%% API

-spec bind(external_id(), schema(), user_context(), woody_context()) ->
{ok, internal_id(), user_context()} | no_return().
bind(ExternalID, Schema, UserCtx, WoodyCtx) ->
case backend_mode() of
machinery ->
bind_via_machinery(ExternalID, Schema, UserCtx, WoodyCtx);
postgres ->
bind_via_postgres(ExternalID, Schema, UserCtx, WoodyCtx)
end.

bind_via_machinery(ExternalID, Schema, UserCtx, WoodyCtx) ->
InternalID = generate(Schema, WoodyCtx),
case start(ExternalID, InternalID, UserCtx, WoodyCtx) of
ok ->
{ok, InternalID, undefined};
{error, exists} ->
get_internal_id_with_retry(ExternalID, WoodyCtx)
end.

bind_via_postgres(ExternalID, Schema, UserCtx, WoodyCtx) ->
InternalID = generate(Schema, WoodyCtx),
SQL = "INSERT INTO bender_generator_states (id, state) values ($1, $2) ON CONFLICT (id) DO NOTHING RETURNING state",
State = term_to_binary(
Expand All @@ -73,7 +33,7 @@ bind_via_postgres(ExternalID, Schema, UserCtx, WoodyCtx) ->
case Result of
{ok, _, _, []} ->
%% already inserted
get_internal_id_via_postgres(ExternalID, WoodyCtx);
get_internal_id(ExternalID, WoodyCtx);
{ok, _, _, [{_SavedState}]} ->
%% first insert
{ok, InternalID, undefined};
Expand All @@ -83,27 +43,7 @@ bind_via_postgres(ExternalID, Schema, UserCtx, WoodyCtx) ->
end.

-spec get_internal_id(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return().
get_internal_id(ExternalID, WoodyCtx) ->
case backend_mode() of
machinery ->
get_internal_id_via_machinery(ExternalID, WoodyCtx);
postgres ->
get_internal_id_via_postgres(ExternalID, WoodyCtx)
end.

get_internal_id_via_machinery(ExternalID, WoodyCtx) ->
case machinery:get(?NS, ExternalID, get_backend(WoodyCtx)) of
{ok, Machine} ->
#{
internal_id := InternalID,
user_context := UserCtx
} = get_machine_state(Machine),
{ok, InternalID, UserCtx};
{error, notfound} ->
throw({not_found, ExternalID})
end.

get_internal_id_via_postgres(ExternalID, _WoodyCtx) ->
get_internal_id(ExternalID, _WoodyCtx) ->
Pool = application:get_env(bender, generator_pool, default_pool),
SQL = "SELECT state FROM bender_generator_states WHERE id = $1",
Result = epg_pool:query(Pool, SQL, [ExternalID]),
Expand All @@ -121,86 +61,6 @@ get_internal_id_via_postgres(ExternalID, _WoodyCtx) ->
error({internal_error, Error})
end.

-spec get_internal_id_with_retry(external_id(), woody_context()) -> {ok, internal_id(), user_context()} | no_return().
get_internal_id_with_retry(ExternalID, WoodyCtx) ->
with_retry(get_retry_strategy(), fun() ->
try
{done, get_internal_id(ExternalID, WoodyCtx)}
catch
%% NOTE Underlying machinery backend can experience race
%% condition on machine writes. Thus it MAY occur that
%% 'aux_state' is undefined on machine read.
error:({badmatch, undefined}):_Stacktrace ->
retry
end
end).

%%% Machinery callbacks

-spec init(args({internal_id(), user_context()}), machine(), handler_args(), handler_opts()) -> result(state()).
init({InternalID, UserCtx}, _Machine, _HandlerArgs, _HandlerOpts) ->
#{
aux_state => #{
internal_id => InternalID,
user_context => UserCtx
}
}.

-spec process_call(args(_), machine(), handler_args(), handler_opts()) -> no_return().
process_call(_Args, _Machine, _HandlerArgs, _HandlerOpts) ->
not_implemented(call).

-spec process_timeout(machine(), handler_args(), handler_opts()) -> no_return().
process_timeout(_Machine, _HandlerArgs, _HandlerOpts) ->
not_implemented(timeout).

-spec process_repair(args(_), machine(), handler_args(), handler_opts()) -> no_return().
process_repair(_Args, _Machine, _HandlerArgs, _HandlerOpts) ->
not_implemented(repair).

-spec process_notification(args(_), machine(), handler_args(), handler_opts()) -> no_return().
process_notification(_Args, _Machine, _HandlerArgs, _HandlerOpts) ->
not_implemented(notification).

%%% Internal functions

-spec get_retry_strategy() -> genlib_retry:strategy().
get_retry_strategy() ->
Opts = genlib_app:env(bender, generator),
DefaultPolicy = genlib_retry:exponential(5, 2, {jitter, 200, 100}),
genlib_retry:new_strategy(maps:get(retry_policy, Opts, DefaultPolicy)).

-spec with_retry(genlib_retry:strategy(), fun(() -> {done, T} | retry)) -> T | no_return().
with_retry(Strategy, Fun) ->
case Fun() of
{done, Result} ->
Result;
retry ->
case genlib_retry:next_step(Strategy) of
{wait, Timeout, NextStrategy} ->
_ = timer:sleep(Timeout),
with_retry(NextStrategy, Fun);
finish ->
erlang:error(retries_exhausted)
end
end.

-spec start(external_id(), internal_id(), user_context(), woody_context()) -> ok | {error, exists}.
start(ExternalID, InternalID, UserCtx, WoodyCtx) ->
machinery:start(?NS, ExternalID, {InternalID, UserCtx}, get_backend(WoodyCtx)).

-spec get_machine_state(machine()) -> state().
get_machine_state(#{aux_state := State}) ->
State.

-spec get_backend(woody_context()) -> machinery_mg_backend:backend().
get_backend(WoodyCtx) ->
bender_utils:get_backend(generator, WoodyCtx).

-spec not_implemented(any()) -> no_return().
not_implemented(What) ->
erlang:error({not_implemented, What}).

-spec generate(schema(), woody_context()) -> internal_id().
generate(snowflake, _WoodyCtx) ->
<<IntegerID:64>> = snowflake:new(),
Expand All @@ -211,6 +71,3 @@ generate(#constant{internal_id = InternalID}, _WoodyCtx) ->
generate(#sequence{id = SequenceID, minimum = Minimum}, WoodyCtx) ->
{ok, IntegerID} = bender_sequence:get_next(SequenceID, Minimum, WoodyCtx),
{integer_to_binary(IntegerID), IntegerID}.

backend_mode() ->
application:get_env(bender, backend_mode, machinery).
Loading
Loading