From 73480e625866a1fdfec9c3551678e03d0f3b02ab Mon Sep 17 00:00:00 2001 From: adavila0703 Date: Sat, 9 May 2026 23:17:16 -0500 Subject: [PATCH] refactor: extract internal process-event module and split tests - Move processEvent implementation to process-event.ts (not re-exported from package entry) - Add processor-defaults.ts for shared defaults/backoff to avoid import cycles - Add processor-event.test.ts for direct processEvent coverage - Extend processor integration tests (wakeup, poll limits, shutdown timeout) - Keep defaultBackoff public API via processor re-export --- src/process-event.ts | 457 +++++++++++++++++++++++++++++++++ src/processor-defaults.ts | 16 ++ src/processor-event.test.ts | 165 ++++++++++++ src/processor.test.ts | 394 ++++++++++++++++++++++++++++- src/processor.ts | 488 ++++-------------------------------- 5 files changed, 1066 insertions(+), 454 deletions(-) create mode 100644 src/process-event.ts create mode 100644 src/processor-defaults.ts create mode 100644 src/processor-event.test.ts diff --git a/src/process-event.ts b/src/process-event.ts new file mode 100644 index 0000000..0d65f1c --- /dev/null +++ b/src/process-event.ts @@ -0,0 +1,457 @@ +import { getDate } from "./date.js"; +import pLimit from "p-limit"; +import { deepClone } from "./clone.js"; +import { ErrorUnprocessableEventHandler, TxOBError } from "./error.js"; +import { + defaultBackoff, + defaultMaxErrors, + defaultMaxHandlerConcurrency, +} from "./processor-defaults.js"; +import { + endTelemetrySpan, + recordTelemetryCounter, + recordTelemetryDuration, + setTelemetrySpanAttributes, + startTelemetrySpan, + TxOBTelemetryAttributeKey, + TxOBTelemetryEventOutcome, + TxOBTelemetryHandlerOutcome, + TxOBTelemetrySpanName, + type TxOBTelemetryAttributes, + type TxOBTelemetryInstruments, + type TxOBTelemetrySpan, +} from "./telemetry.js"; +import type { + Logger, + TxOBBackoffContext, + TxOBEvent, + TxOBEventDataMap, + TxOBEventHandlerMap, + TxOBProcessorClient, + TxOBTransactionProcessorClient, +} from "./processor.js"; + +type TxOBEventByType< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap, +> = { + [TType in TxOBEventType]: TxOBEvent; +}[TxOBEventType]; + +type TxOBProcessEventsOpts< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap, +> = { + maxErrors: number; + backoff: (context: TxOBBackoffContext) => Date; + signal?: AbortSignal; + logger?: Logger; + maxEventConcurrency?: number; + maxHandlerConcurrency?: number; + maxQueuedEvents?: number; + onEventMaxErrorsReached?: (opts: { + event: Readonly>; + txClient: TxOBTransactionProcessorClient; + signal?: AbortSignal; + }) => Promise; + telemetry?: TxOBTelemetryInstruments; +}; + +export const processEvent = async < + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap, +>({ + client, + handlerMap, + unlockedEvent, + opts, +}: { + client: TxOBProcessorClient; + handlerMap: TxOBEventHandlerMap; + unlockedEvent: Pick< + TxOBEventByType, + "id" | "errors" + >; + opts?: Partial>; +}): Promise<{ backoffUntil?: Date }> => { + const { + logger, + maxErrors = defaultMaxErrors, + signal, + backoff = defaultBackoff, + maxHandlerConcurrency = defaultMaxHandlerConcurrency, + onEventMaxErrorsReached, + telemetry, + } = opts ?? {}; + const eventStartedAt = Date.now(); + + if (signal?.aborted) { + return {}; + } + if (unlockedEvent.errors >= maxErrors) { + // Potential issue with client configuration on finding unprocessed events + // Events with maximum allowed errors should not be returned from `getEventsToProcess` + logger?.warn( + { + eventId: unlockedEvent.id, + errors: unlockedEvent.errors, + maxErrors, + }, + "unexpected event with max errors returned from `getEventsToProcess`", + ); + recordTelemetryCounter(telemetry?.eventCounter, telemetry, { + [TxOBTelemetryAttributeKey.EventOutcome]: + TxOBTelemetryEventOutcome.SkippedMaxErrors, + }); + recordTelemetryDuration( + telemetry?.eventDuration, + telemetry, + eventStartedAt, + { + [TxOBTelemetryAttributeKey.EventOutcome]: + TxOBTelemetryEventOutcome.SkippedMaxErrors, + }, + ); + return {}; + } + + let backoffUntil: Date | undefined; + let eventSpan: TxOBTelemetrySpan | undefined; + let eventError: unknown; + let eventOutcome: TxOBTelemetryEventOutcome | undefined; + let eventMetricAttributes: TxOBTelemetryAttributes = {}; + + try { + await client.transaction(async (txClient) => { + const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked( + unlockedEvent.id, + { signal, maxErrors }, + ); + if (!lockedEvent) { + eventOutcome = TxOBTelemetryEventOutcome.SkippedLocked; + logger?.debug( + { + eventId: unlockedEvent.id, + }, + "skipping locked or already processed event", + ); + return; + } + + eventMetricAttributes = { + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + }; + eventSpan = startTelemetrySpan( + telemetry, + TxOBTelemetrySpanName.EventProcess, + { + [TxOBTelemetryAttributeKey.EventId]: lockedEvent.id, + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + [TxOBTelemetryAttributeKey.EventCorrelationId]: + lockedEvent.correlation_id, + [TxOBTelemetryAttributeKey.EventErrors]: lockedEvent.errors, + }, + ); + + // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time + // that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked` + // `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources + if (lockedEvent.processed_at) { + eventOutcome = TxOBTelemetryEventOutcome.SkippedProcessed; + logger?.debug( + { + eventId: lockedEvent.id, + correlationId: lockedEvent.correlation_id, + }, + "skipping already processed event", + ); + return; + } + if (lockedEvent.errors >= maxErrors) { + eventOutcome = TxOBTelemetryEventOutcome.SkippedMaxErrors; + logger?.debug( + { + eventId: lockedEvent.id, + correlationId: lockedEvent.correlation_id, + }, + "skipping event with maximum errors", + ); + return; + } + + let errored = false; + + const eventHandlerMap = handlerMap[lockedEvent.type] ?? {}; + + // Typescript should prevent the caller from passing a handler map that doesn't specify all event types but we'll check for it anyway + // This is distinct from an empty handler map for an event type which is valid + // We just want the caller to be explicit about the event types they are interested in handling and not accidentally skip events + if (!(lockedEvent.type in handlerMap)) { + logger?.warn( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + correlationId: lockedEvent.correlation_id, + }, + "missing event handler map", + ); + errored = true; + lockedEvent.errors = maxErrors; + } + + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + correlationId: lockedEvent.correlation_id, + }, + `processing event`, + ); + + const backoffs: Date[] = []; + const backoffErrors: unknown[] = []; + let latestBackoffError: unknown; + + const handlerLimit = pLimit(maxHandlerConcurrency); + await Promise.allSettled( + Object.entries(eventHandlerMap).map(([handlerName, handler]) => + handlerLimit(async (): Promise => { + const handlerMetricAttributes = { + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + [TxOBTelemetryAttributeKey.HandlerName]: handlerName, + }; + const handlerResults = + lockedEvent.handler_results[handlerName] ?? {}; + if (handlerResults.processed_at) { + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }, + "handler already processed", + ); + recordTelemetryCounter(telemetry?.handlerCounter, telemetry, { + ...handlerMetricAttributes, + [TxOBTelemetryAttributeKey.HandlerOutcome]: + TxOBTelemetryHandlerOutcome.SkippedProcessed, + }); + return; + } + if (handlerResults.unprocessable_at) { + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }, + "handler unprocessable", + ); + recordTelemetryCounter(telemetry?.handlerCounter, telemetry, { + ...handlerMetricAttributes, + [TxOBTelemetryAttributeKey.HandlerOutcome]: + TxOBTelemetryHandlerOutcome.SkippedUnprocessable, + }); + return; + } + + handlerResults.errors ??= []; + const handlerStartedAt = Date.now(); + const handlerSpan = startTelemetrySpan( + telemetry, + TxOBTelemetrySpanName.HandlerProcess, + { + [TxOBTelemetryAttributeKey.EventId]: lockedEvent.id, + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + [TxOBTelemetryAttributeKey.EventCorrelationId]: + lockedEvent.correlation_id, + [TxOBTelemetryAttributeKey.HandlerName]: handlerName, + }, + ); + let handlerOutcome: TxOBTelemetryHandlerOutcome = + TxOBTelemetryHandlerOutcome.Success; + let handlerError: unknown; + + try { + await handler(lockedEvent, { signal }); + handlerResults.processed_at = getDate(); + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }, + "handler succeeded", + ); + } catch (error) { + handlerError = error; + latestBackoffError = error; + backoffErrors.push(error); + logger?.error( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + error, + correlationId: lockedEvent.correlation_id, + }, + "handler errored", + ); + + if (error instanceof ErrorUnprocessableEventHandler) { + handlerOutcome = TxOBTelemetryHandlerOutcome.Unprocessable; + handlerResults.unprocessable_at = getDate(); + handlerResults.errors?.push({ + error: error.message ?? error, + timestamp: getDate(), + }); + errored = true; + } else { + handlerOutcome = TxOBTelemetryHandlerOutcome.Error; + if (error instanceof TxOBError && error.backoffUntil) { + backoffs.push(error.backoffUntil); + } + + errored = true; + handlerResults.errors?.push({ + error: (error as Error)?.message ?? error, + timestamp: getDate(), + }); + } + } finally { + const handlerAttributes = { + ...handlerMetricAttributes, + [TxOBTelemetryAttributeKey.HandlerOutcome]: handlerOutcome, + }; + recordTelemetryCounter( + telemetry?.handlerCounter, + telemetry, + handlerAttributes, + ); + recordTelemetryDuration( + telemetry?.handlerDuration, + telemetry, + handlerStartedAt, + handlerAttributes, + ); + endTelemetrySpan(handlerSpan, handlerError); + } + + lockedEvent.handler_results[handlerName] = handlerResults; + }), + ), + ); + + // Check if all remaining handlers (those that haven't succeeded) are unprocessable + // If so, there's nothing left to retry, so set errors to maxErrors to stop processing + const remainingHandlers = Object.entries(eventHandlerMap).filter( + ([handlerName, _]) => { + const result = lockedEvent.handler_results[handlerName]; + return !result?.processed_at; + }, + ); + + const allRemainingHandlersUnprocessable = + remainingHandlers.length > 0 && + remainingHandlers.every(([handlerName, _]) => { + const result = lockedEvent.handler_results[handlerName]; + return result?.unprocessable_at; + }); + + if (allRemainingHandlersUnprocessable) { + lockedEvent.errors = maxErrors; + errored = true; + } + + if (errored) { + lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors); + const backoffContext: TxOBBackoffContext = + { + attempt: lockedEvent.errors, + error: latestBackoffError, + errors: backoffErrors, + event: deepClone(lockedEvent) as Readonly< + TxOBEventByType + >, + maxErrors, + }; + backoffs.push(backoff(backoffContext)); + const latestBackoff = backoffs.sort( + (a, b) => b.getTime() - a.getTime(), + )[0]; + lockedEvent.backoff_until = latestBackoff; + if (lockedEvent.errors === maxErrors) { + lockedEvent.backoff_until = null; + lockedEvent.processed_at = getDate(); + + if (onEventMaxErrorsReached) { + try { + await onEventMaxErrorsReached({ + event: deepClone(lockedEvent) as Readonly< + TxOBEventByType + >, + txClient, + signal, + }); + } catch (hookError) { + logger?.error( + { + eventId: lockedEvent.id, + error: hookError, + }, + "error in onEventMaxErrorsReached hook", + ); + + throw hookError; + } + } + } + } else { + lockedEvent.backoff_until = null; + lockedEvent.processed_at = getDate(); + } + + eventOutcome = errored + ? lockedEvent.errors === maxErrors + ? TxOBTelemetryEventOutcome.MaxErrors + : TxOBTelemetryEventOutcome.Error + : TxOBTelemetryEventOutcome.Success; + setTelemetrySpanAttributes(eventSpan, { + [TxOBTelemetryAttributeKey.EventOutcome]: eventOutcome, + [TxOBTelemetryAttributeKey.EventErrors]: lockedEvent.errors, + }); + + backoffUntil = lockedEvent.backoff_until ?? undefined; + + await txClient.updateEvent(lockedEvent); + }); + } catch (error) { + eventError = error; + eventOutcome ??= TxOBTelemetryEventOutcome.Error; + throw error; + } finally { + if (eventOutcome) { + const eventAttributes = { + ...eventMetricAttributes, + [TxOBTelemetryAttributeKey.EventOutcome]: eventOutcome, + }; + recordTelemetryCounter( + telemetry?.eventCounter, + telemetry, + eventAttributes, + ); + recordTelemetryDuration( + telemetry?.eventDuration, + telemetry, + eventStartedAt, + eventAttributes, + ); + } + endTelemetrySpan(eventSpan, eventError); + } + + return { backoffUntil }; +}; diff --git a/src/processor-defaults.ts b/src/processor-defaults.ts new file mode 100644 index 0000000..eca1200 --- /dev/null +++ b/src/processor-defaults.ts @@ -0,0 +1,16 @@ +export const defaultPollingIntervalMs = 5_000; +export const defaultMaxErrors = 5; +export const defaultMaxEventConcurrency = 20; +export const defaultMaxHandlerConcurrency = 10; +export const defaultMaxQueuedEvents = 500; +export const defaultWakeupTimeoutMs = 60_000; +export const defaultWakeupThrottleMs = 1_000; + +export const defaultBackoff = ({ attempt }: { attempt: number }): Date => { + const baseDelayMs = 1000; + const maxDelayMs = 1000 * 60; + const backoffMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs); + const retryTimestamp = new Date(Date.now() + backoffMs); + + return retryTimestamp; +}; diff --git a/src/processor-event.test.ts b/src/processor-event.test.ts new file mode 100644 index 0000000..f6827d4 --- /dev/null +++ b/src/processor-event.test.ts @@ -0,0 +1,165 @@ +import { describe, it, expect, vi, afterEach } from "vitest"; +import { + TxOBEvent, + TxOBEventDataMap, + TxOBEventHandlerMap, + defaultBackoff, +} from "./processor.js"; +import { processEvent } from "./process-event.js"; + +const mockTxClient = { + getEventByIdForUpdateSkipLocked: vi.fn(), + updateEvent: vi.fn(), + createEvent: vi.fn(), +}; +const mockClient = { + getEventsToProcess: vi.fn(), + transaction: vi.fn(async (fn) => fn(mockTxClient)), +}; + +const now = new Date(); +vi.mock("./date", async (getOg) => { + const mod = await getOg(); + return { + ...(mod as Object), + getDate: vi.fn(() => now), + }; +}); + +afterEach(() => { + vi.clearAllMocks(); +}); + +describe("processEvent", () => { + it("returns before opening a transaction when the signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + + const result = await processEvent({ + client: mockClient, + handlerMap: { evtType1: { h: vi.fn() } }, + unlockedEvent: { id: "1", errors: 0 }, + opts: { signal: ac.signal, maxErrors: 5 }, + }); + + expect(result).toEqual({}); + expect(mockClient.transaction).not.toHaveBeenCalled(); + }); + + it("skips work after lock when the row is already at max errors", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const locked: TxOBEvent<"evtType1"> = { + id: "1", + type: "evtType1", + timestamp: now, + data: {}, + correlation_id: "c1", + handler_results: {}, + errors: 5, + }; + mockTxClient.getEventByIdForUpdateSkipLocked.mockResolvedValue(locked); + + await processEvent({ + client: mockClient, + handlerMap: { evtType1: { h: vi.fn() } }, + unlockedEvent: { id: "1", errors: 0 }, + opts: { maxErrors: 5, logger }, + }); + + expect(logger.debug).toHaveBeenCalledWith( + expect.objectContaining({ eventId: "1", correlationId: "c1" }), + "skipping event with maximum errors", + ); + expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); + }); + + it("treats an event type absent from the handler map as a configuration error", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const handlerMap: TxOBEventHandlerMap> = { + evtType1: { h: vi.fn() }, + }; + const locked: TxOBEvent = { + id: "1", + type: "orphanType", + timestamp: now, + data: {}, + correlation_id: "c1", + handler_results: {}, + errors: 0, + }; + mockTxClient.getEventByIdForUpdateSkipLocked.mockResolvedValue(locked); + + await processEvent({ + client: mockClient, + handlerMap, + unlockedEvent: { id: "1", errors: 0 }, + opts: { maxErrors: 5, backoff: defaultBackoff, logger }, + }); + + expect(logger.warn).toHaveBeenCalledWith( + expect.objectContaining({ + eventId: "1", + type: "orphanType", + correlationId: "c1", + }), + "missing event handler map", + ); + expect(mockTxClient.updateEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: "orphanType", + errors: 5, + }), + ); + }); + + it("does not rerun a handler that is already marked unprocessable", async () => { + const handler = vi.fn(() => Promise.resolve()); + const locked: TxOBEvent<"evtType1"> = { + id: "1", + type: "evtType1", + timestamp: now, + data: {}, + correlation_id: "c1", + handler_results: { + h: { unprocessable_at: now }, + }, + errors: 0, + }; + mockTxClient.getEventByIdForUpdateSkipLocked.mockResolvedValue(locked); + + await processEvent({ + client: mockClient, + handlerMap: { evtType1: { h: handler } }, + unlockedEvent: { id: "1", errors: 0 }, + opts: { maxErrors: 5, backoff: defaultBackoff }, + }); + + expect(handler).not.toHaveBeenCalled(); + expect(mockTxClient.updateEvent).toHaveBeenCalled(); + }); + + it("propagates when transaction rejects before the callback runs", async () => { + mockClient.transaction.mockRejectedValueOnce( + new Error("transaction start failed"), + ); + + await expect( + processEvent({ + client: mockClient, + handlerMap: { evtType1: { h: vi.fn() } }, + unlockedEvent: { id: "1", errors: 0 }, + opts: { maxErrors: 5, backoff: defaultBackoff }, + }), + ).rejects.toThrow("transaction start failed"); + }); +}); diff --git a/src/processor.test.ts b/src/processor.test.ts index f4e3d33..910e551 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -16,6 +16,7 @@ import { TxOBTelemetryMetricName, TxOBTelemetrySpanName, } from "./telemetry.js"; +import * as sleepModule from "./sleep.js"; const mockTxClient = { getEventByIdForUpdateSkipLocked: vi.fn(), @@ -70,12 +71,33 @@ describe("createEventProcessor", () => { }); }); +function createTestWakeupEmitter() { + const listeners = new Map void>>(); + return { + on: vi.fn((event: "wakeup", listener: () => void) => { + if (event !== "wakeup") return; + let set = listeners.get(event); + if (!set) { + set = new Set(); + listeners.set(event, set); + } + set.add(listener); + }), + off: vi.fn((event: "wakeup", listener: () => void) => { + listeners.get(event)?.delete(listener); + }), + close: vi.fn(async () => {}), + emitWakeup: () => { + for (const l of listeners.get("wakeup") ?? []) l(); + }, + }; +} + describe("EventProcessor - schema typing", () => { it("infers handler event data from Standard Schema outputs", () => { - const createSchema = >(): StandardSchemaV1< - unknown, - TOutput - > => ({ + const createSchema = < + TOutput extends Record, + >(): StandardSchemaV1 => ({ "~standard": { version: 1, vendor: "test", @@ -1203,9 +1225,16 @@ describe("EventProcessor - lifecycle", () => { expect(aborted).toBe(true); }); it("should respect shutdown timeout and throw", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; const handlerMap = { evtType1: { - handler1: vi.fn(() => sleep(100)), + // Never resolves so work would stay pending if the queue did not abort it + handler1: vi.fn(() => new Promise(() => {})), }, }; const evt1: TxOBEvent = { @@ -1218,7 +1247,10 @@ describe("EventProcessor - lifecycle", () => { errors: 0, }; let callCount = 0; - mockClient.getEventsToProcess.mockImplementation(() => { + mockClient.getEventsToProcess.mockImplementation((opts) => { + if (opts?.signal?.aborted) { + return Promise.reject(new DOMException("Aborted", "AbortError")); + } callCount++; return Promise.resolve(callCount === 1 ? [evt1] : []); }); @@ -1231,17 +1263,35 @@ describe("EventProcessor - lifecycle", () => { client: mockClient, handlerMap, pollingIntervalMs: 10, + logger, }); processor.start(); - const start = Date.now(); - try { - await processor.stop({ timeoutMs: 10 }); - } catch (error: any) { - expect(error.message).toBe("shutdown timeout 10ms elapsed"); + for (let i = 0; i < 100; i++) { + if (handlerMap.evtType1.handler1.mock.calls.length > 0) break; + await sleep(5); } - const diff = Date.now() - start; - expect(diff).toBeLessThan(50); + expect(handlerMap.evtType1.handler1).toHaveBeenCalled(); + + // Abort clears in-flight queue work quickly, so onPendingZero() normally settles + // before the shutdown timer. Stub it to model a hung queue and exercise the + // timeout + error logging path. + const queue = ( + processor as unknown as { queue: { onPendingZero: () => Promise } } + ).queue; + vi.spyOn(queue, "onPendingZero").mockImplementation( + () => new Promise(() => {}), + ); + + const start = Date.now(); + await expect(processor.stop({ timeoutMs: 40 })).rejects.toThrow( + "shutdown timeout 40ms elapsed", + ); + expect(Date.now() - start).toBeLessThan(500); + expect(logger.error).toHaveBeenCalledWith( + { error: expect.any(Error) }, + "shutdown error", + ); }); it("should warn when stopping a processor that is not started", async () => { const logger = { @@ -1394,6 +1444,324 @@ describe("EventProcessor - lifecycle", () => { }); }); +describe("EventProcessor - wakeup and polling edge cases", () => { + it("skips a poll when the previous poll is still awaiting the client", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + let releaseGetEvents!: (value: { id: string; errors: number }[]) => void; + const getEventsBlocked = new Promise<{ id: string; errors: number }[]>( + (resolve) => { + releaseGetEvents = resolve; + }, + ); + mockClient.getEventsToProcess.mockImplementationOnce( + () => getEventsBlocked, + ); + + const wakeup = createTestWakeupEmitter(); + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 1000, + wakeupEmitter: wakeup, + wakeupThrottleMs: 0, + wakeupTimeoutMs: 60_000, + logger, + }); + processor.start(); + + await vi.waitFor( + () => { + expect(mockClient.getEventsToProcess).toHaveBeenCalled(); + }, + { timeout: 2000 }, + ); + + wakeup.emitWakeup(); + await sleep(15); + + expect(logger.debug).toHaveBeenCalledWith( + "skipping poll - already polling", + ); + + releaseGetEvents([]); + await processor.stop(); + }); + + it("skips polling when the in-memory queue is at capacity", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const handlerMap = { + evtType1: { + handler1: vi.fn(() => new Promise(() => {})), + }, + }; + const evt1: TxOBEvent = { + type: "evtType1", + id: "1", + timestamp: now, + data: {}, + correlation_id: "abc123", + handler_results: {}, + errors: 0, + }; + mockClient.getEventsToProcess.mockImplementation((opts) => { + if (opts?.signal?.aborted) { + return Promise.reject(new DOMException("Aborted", "AbortError")); + } + return Promise.resolve([evt1]); + }); + mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation(() => + Promise.resolve(evt1), + ); + mockTxClient.updateEvent.mockImplementation(() => Promise.resolve()); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap, + pollingIntervalMs: 15, + maxQueuedEvents: 1, + logger, + }); + processor.start(); + + await vi.waitFor( + () => { + expect(logger.debug).toHaveBeenCalledWith( + expect.objectContaining({ + queuedCount: 1, + maxQueuedEvents: 1, + }), + "skipping poll - queue at capacity", + ); + }, + { timeout: 3000 }, + ); + + await processor.stop(); + }); + + it("logs and continues when getEventsToProcess rejects", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const pollError = new Error("database unavailable"); + let calls = 0; + mockClient.getEventsToProcess.mockImplementation((opts) => { + if (opts?.signal?.aborted) { + return Promise.reject(new DOMException("Aborted", "AbortError")); + } + calls++; + if (calls === 1) return Promise.reject(pollError); + return Promise.resolve([]); + }); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 10, + logger, + }); + processor.start(); + + await vi.waitFor( + () => { + expect(logger.error).toHaveBeenCalledWith( + { error: pollError }, + "error polling for events, will retry", + ); + }, + { timeout: 2000 }, + ); + + expect(calls).toBeGreaterThanOrEqual(2); + await processor.stop(); + }); + + it("runs fallback polling when no wakeup arrives within wakeupTimeoutMs", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + mockClient.getEventsToProcess.mockImplementation((opts) => { + if (opts?.signal?.aborted) { + return Promise.reject(new DOMException("Aborted", "AbortError")); + } + return Promise.resolve([]); + }); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 15, + wakeupEmitter: createTestWakeupEmitter(), + wakeupThrottleMs: 60_000, + wakeupTimeoutMs: 5, + logger, + }); + processor.start(); + + await vi.waitFor( + () => { + expect(logger.debug).toHaveBeenCalledWith( + expect.objectContaining({ + timeSinceLastWakeup: expect.any(Number), + wakeupTimeoutMs: 5, + }), + "fallback poll triggered - no wakeup signal received", + ); + }, + { timeout: 3000 }, + ); + + await processor.stop(); + }); + + it("skips fallback polling after a recent wakeup", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + mockClient.getEventsToProcess.mockResolvedValue([]); + const wakeup = createTestWakeupEmitter(); + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 25, + wakeupEmitter: wakeup, + wakeupThrottleMs: 0, + wakeupTimeoutMs: 60_000, + logger, + }); + processor.start(); + await sleep(5); + wakeup.emitWakeup(); + await sleep(35); + + await vi.waitFor( + () => { + expect(logger.debug).toHaveBeenCalledWith( + expect.objectContaining({ + timeSinceLastWakeup: expect.any(Number), + wakeupTimeoutMs: 60_000, + }), + "skipping fallback poll - wakeup signal received recently", + ); + }, + { timeout: 3000 }, + ); + + await processor.stop(); + }); + + it("unregisters the wakeup listener and cancels throttle timers on stop", async () => { + mockClient.getEventsToProcess.mockResolvedValue([]); + const wakeup = createTestWakeupEmitter(); + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 100, + wakeupEmitter: wakeup, + wakeupThrottleMs: 50, + }); + processor.start(); + await sleep(20); + await processor.stop(); + + expect(wakeup.off).toHaveBeenCalled(); + }); + + it("logs when the standard polling loop's sleep step fails unexpectedly", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + mockClient.getEventsToProcess.mockResolvedValue([]); + const sleepSpy = vi + .spyOn(sleepModule, "sleep") + .mockImplementationOnce(() => { + throw new Error("sleep interrupted"); + }); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 10, + logger, + }); + processor.start(); + + await vi.waitFor( + () => { + expect(logger.error).toHaveBeenCalledWith( + { error: expect.any(Error) }, + "polling loop error", + ); + }, + { timeout: 2000 }, + ); + + sleepSpy.mockRestore(); + await processor.stop(); + }); + + it("logs when the wakeup fallback loop's sleep step fails unexpectedly", async () => { + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + mockClient.getEventsToProcess.mockResolvedValue([]); + const sleepSpy = vi + .spyOn(sleepModule, "sleep") + .mockImplementationOnce(() => { + throw new Error("sleep interrupted"); + }); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 10, + wakeupEmitter: createTestWakeupEmitter(), + wakeupThrottleMs: 10, + wakeupTimeoutMs: 60_000, + logger, + }); + processor.start(); + + await vi.waitFor( + () => { + expect(logger.error).toHaveBeenCalledWith( + { error: expect.any(Error) }, + "fallback polling loop error", + ); + }, + { timeout: 2000 }, + ); + + sleepSpy.mockRestore(); + await processor.stop(); + }); +}); + describe("EventProcessor - basic", () => { it("should call into processEvents", async () => { const opts = { diff --git a/src/processor.ts b/src/processor.ts index f66ce18..167592b 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -1,9 +1,5 @@ -import { getDate } from "./date.js"; import { sleep } from "./sleep.js"; -import pLimit from "p-limit"; -import { deepClone } from "./clone.js"; import PQueue from "p-queue"; -import { ErrorUnprocessableEventHandler, TxOBError } from "./error.js"; import { throttle } from "throttle-debounce"; import type { StandardSchemaV1 } from "@standard-schema/spec"; import { @@ -14,15 +10,24 @@ import { setTelemetrySpanAttributes, startTelemetrySpan, TxOBTelemetryAttributeKey, - TxOBTelemetryEventOutcome, - TxOBTelemetryHandlerOutcome, TxOBTelemetryPollOutcome, TxOBTelemetrySpanName, type TxOBTelemetry, - type TxOBTelemetryAttributes, type TxOBTelemetryInstruments, - type TxOBTelemetrySpan, } from "./telemetry.js"; +import { + defaultBackoff, + defaultMaxErrors, + defaultMaxEventConcurrency, + defaultMaxHandlerConcurrency, + defaultMaxQueuedEvents, + defaultPollingIntervalMs, + defaultWakeupThrottleMs, + defaultWakeupTimeoutMs, +} from "./processor-defaults.js"; +import { processEvent } from "./process-event.js"; + +export { defaultBackoff }; type TxOBEventHandlerResult = { processed_at?: Date; @@ -69,7 +74,9 @@ export type TxOBSchemaOutput = type TxOBEventDataMapFromSchemas< TEventSchemas extends TxOBEventSchemaMap, > = { - [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput< + TEventSchemas[TType] + >; }; type TxOBEventHandlerMapFromSchemas< @@ -79,8 +86,9 @@ type TxOBEventHandlerMapFromSchemas< TxOBEventDataMapFromSchemas >; -type TxOBEventTypeFromSchemas> = - keyof TEventSchemas & string; +type TxOBEventTypeFromSchemas< + TEventSchemas extends TxOBEventSchemaMap, +> = keyof TEventSchemas & string; type TxOBEventHandlerOpts = { signal?: AbortSignal; @@ -96,7 +104,8 @@ export type TxOBEventHandler< export type TxOBEventHandlerMap< TxOBEventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventDataMap extends TxOBEventDataMap = + TxOBEventDataMap, > = { [TType in TxOBEventType]: { [key: string]: TxOBEventHandler; @@ -140,27 +149,36 @@ export interface WakeupEmitter { export interface TxOBProcessorClient< TxOBEventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventDataMap extends TxOBEventDataMap = + TxOBEventDataMap, > { getEventsToProcess( opts: TxOBProcessorClientOpts, - ): Promise, "id" | "errors">[]>; + ): Promise< + Pick, "id" | "errors">[] + >; transaction( fn: ( - txProcessorClient: TxOBTransactionProcessorClient, + txProcessorClient: TxOBTransactionProcessorClient< + TxOBEventType, + TEventDataMap + >, ) => Promise, ): Promise; } export interface TxOBTransactionProcessorClient< TxOBEventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventDataMap extends TxOBEventDataMap = + TxOBEventDataMap, > { getEventByIdForUpdateSkipLocked( eventId: TxOBEventByType["id"], opts: TxOBProcessorClientOpts, ): Promise | null>; - updateEvent(event: TxOBEventByType): Promise; + updateEvent( + event: TxOBEventByType, + ): Promise; createEvent( event: Omit< TxOBEventByType, @@ -171,7 +189,8 @@ export interface TxOBTransactionProcessorClient< export type TxOBBackoffContext< TxOBEventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventDataMap extends TxOBEventDataMap = + TxOBEventDataMap, > = { attempt: number; error?: unknown; @@ -180,24 +199,6 @@ export type TxOBBackoffContext< maxErrors: number; }; -export const defaultBackoff = ({ - attempt, -}: TxOBBackoffContext): Date => { - const baseDelayMs = 1000; - const maxDelayMs = 1000 * 60; - const backoffMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs); - const retryTimestamp = new Date(Date.now() + backoffMs); - - return retryTimestamp; -}; -const defaultPollingIntervalMs = 5_000; -const defaultMaxErrors = 5; -const defaultMaxEventConcurrency = 20; -const defaultMaxHandlerConcurrency = 10; -const defaultMaxQueuedEvents = 500; -const defaultWakeupTimeoutMs = 60_000; -const defaultWakeupThrottleMs = 1_000; - type TxOBProcessEventsOpts< TxOBEventType extends string, TEventDataMap extends TxOBEventDataMap, @@ -217,403 +218,6 @@ type TxOBProcessEventsOpts< telemetry?: TxOBTelemetryInstruments; }; -const processEvent = async < - TxOBEventType extends string, - TEventDataMap extends TxOBEventDataMap, ->({ - client, - handlerMap, - unlockedEvent, - opts, -}: { - client: TxOBProcessorClient; - handlerMap: TxOBEventHandlerMap; - unlockedEvent: Pick, "id" | "errors">; - opts?: Partial>; -}): Promise<{ backoffUntil?: Date }> => { - const { - logger, - maxErrors = defaultMaxErrors, - signal, - backoff = defaultBackoff, - maxHandlerConcurrency = defaultMaxHandlerConcurrency, - onEventMaxErrorsReached, - telemetry, - } = opts ?? {}; - const eventStartedAt = Date.now(); - - if (signal?.aborted) { - return {}; - } - if (unlockedEvent.errors >= maxErrors) { - // Potential issue with client configuration on finding unprocessed events - // Events with maximum allowed errors should not be returned from `getEventsToProcess` - logger?.warn( - { - eventId: unlockedEvent.id, - errors: unlockedEvent.errors, - maxErrors, - }, - "unexpected event with max errors returned from `getEventsToProcess`", - ); - recordTelemetryCounter(telemetry?.eventCounter, telemetry, { - [TxOBTelemetryAttributeKey.EventOutcome]: - TxOBTelemetryEventOutcome.SkippedMaxErrors, - }); - recordTelemetryDuration( - telemetry?.eventDuration, - telemetry, - eventStartedAt, - { - [TxOBTelemetryAttributeKey.EventOutcome]: - TxOBTelemetryEventOutcome.SkippedMaxErrors, - }, - ); - return {}; - } - - let backoffUntil: Date | undefined; - let eventSpan: TxOBTelemetrySpan | undefined; - let eventError: unknown; - let eventOutcome: TxOBTelemetryEventOutcome | undefined; - let eventMetricAttributes: TxOBTelemetryAttributes = {}; - - try { - await client.transaction(async (txClient) => { - const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked( - unlockedEvent.id, - { signal, maxErrors }, - ); - if (!lockedEvent) { - eventOutcome = TxOBTelemetryEventOutcome.SkippedLocked; - logger?.debug( - { - eventId: unlockedEvent.id, - }, - "skipping locked or already processed event", - ); - return; - } - - eventMetricAttributes = { - [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, - }; - eventSpan = startTelemetrySpan( - telemetry, - TxOBTelemetrySpanName.EventProcess, - { - [TxOBTelemetryAttributeKey.EventId]: lockedEvent.id, - [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, - [TxOBTelemetryAttributeKey.EventCorrelationId]: - lockedEvent.correlation_id, - [TxOBTelemetryAttributeKey.EventErrors]: lockedEvent.errors, - }, - ); - - // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time - // that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked` - // `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources - if (lockedEvent.processed_at) { - eventOutcome = TxOBTelemetryEventOutcome.SkippedProcessed; - logger?.debug( - { - eventId: lockedEvent.id, - correlationId: lockedEvent.correlation_id, - }, - "skipping already processed event", - ); - return; - } - if (lockedEvent.errors >= maxErrors) { - eventOutcome = TxOBTelemetryEventOutcome.SkippedMaxErrors; - logger?.debug( - { - eventId: lockedEvent.id, - correlationId: lockedEvent.correlation_id, - }, - "skipping event with maximum errors", - ); - return; - } - - let errored = false; - - const eventHandlerMap = handlerMap[lockedEvent.type] ?? {}; - - // Typescript should prevent the caller from passing a handler map that doesn't specify all event types but we'll check for it anyway - // This is distinct from an empty handler map for an event type which is valid - // We just want the caller to be explicit about the event types they are interested in handling and not accidentally skip events - if (!(lockedEvent.type in handlerMap)) { - logger?.warn( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - correlationId: lockedEvent.correlation_id, - }, - "missing event handler map", - ); - errored = true; - lockedEvent.errors = maxErrors; - } - - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - correlationId: lockedEvent.correlation_id, - }, - `processing event`, - ); - - const backoffs: Date[] = []; - const backoffErrors: unknown[] = []; - let latestBackoffError: unknown; - - const handlerLimit = pLimit(maxHandlerConcurrency); - await Promise.allSettled( - Object.entries(eventHandlerMap).map(([handlerName, handler]) => - handlerLimit(async (): Promise => { - const handlerMetricAttributes = { - [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, - [TxOBTelemetryAttributeKey.HandlerName]: handlerName, - }; - const handlerResults = - lockedEvent.handler_results[handlerName] ?? {}; - if (handlerResults.processed_at) { - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - correlationId: lockedEvent.correlation_id, - }, - "handler already processed", - ); - recordTelemetryCounter(telemetry?.handlerCounter, telemetry, { - ...handlerMetricAttributes, - [TxOBTelemetryAttributeKey.HandlerOutcome]: - TxOBTelemetryHandlerOutcome.SkippedProcessed, - }); - return; - } - if (handlerResults.unprocessable_at) { - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - correlationId: lockedEvent.correlation_id, - }, - "handler unprocessable", - ); - recordTelemetryCounter(telemetry?.handlerCounter, telemetry, { - ...handlerMetricAttributes, - [TxOBTelemetryAttributeKey.HandlerOutcome]: - TxOBTelemetryHandlerOutcome.SkippedUnprocessable, - }); - return; - } - - handlerResults.errors ??= []; - const handlerStartedAt = Date.now(); - const handlerSpan = startTelemetrySpan( - telemetry, - TxOBTelemetrySpanName.HandlerProcess, - { - [TxOBTelemetryAttributeKey.EventId]: lockedEvent.id, - [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, - [TxOBTelemetryAttributeKey.EventCorrelationId]: - lockedEvent.correlation_id, - [TxOBTelemetryAttributeKey.HandlerName]: handlerName, - }, - ); - let handlerOutcome: TxOBTelemetryHandlerOutcome = - TxOBTelemetryHandlerOutcome.Success; - let handlerError: unknown; - - try { - await handler(lockedEvent, { signal }); - handlerResults.processed_at = getDate(); - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - correlationId: lockedEvent.correlation_id, - }, - "handler succeeded", - ); - } catch (error) { - handlerError = error; - latestBackoffError = error; - backoffErrors.push(error); - logger?.error( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - error, - correlationId: lockedEvent.correlation_id, - }, - "handler errored", - ); - - if (error instanceof ErrorUnprocessableEventHandler) { - handlerOutcome = TxOBTelemetryHandlerOutcome.Unprocessable; - handlerResults.unprocessable_at = getDate(); - handlerResults.errors?.push({ - error: error.message ?? error, - timestamp: getDate(), - }); - errored = true; - } else { - handlerOutcome = TxOBTelemetryHandlerOutcome.Error; - if (error instanceof TxOBError && error.backoffUntil) { - backoffs.push(error.backoffUntil); - } - - errored = true; - handlerResults.errors?.push({ - error: (error as Error)?.message ?? error, - timestamp: getDate(), - }); - } - } finally { - const handlerAttributes = { - ...handlerMetricAttributes, - [TxOBTelemetryAttributeKey.HandlerOutcome]: handlerOutcome, - }; - recordTelemetryCounter( - telemetry?.handlerCounter, - telemetry, - handlerAttributes, - ); - recordTelemetryDuration( - telemetry?.handlerDuration, - telemetry, - handlerStartedAt, - handlerAttributes, - ); - endTelemetrySpan(handlerSpan, handlerError); - } - - lockedEvent.handler_results[handlerName] = handlerResults; - }), - ), - ); - - // Check if all remaining handlers (those that haven't succeeded) are unprocessable - // If so, there's nothing left to retry, so set errors to maxErrors to stop processing - const remainingHandlers = Object.entries(eventHandlerMap).filter( - ([handlerName, _]) => { - const result = lockedEvent.handler_results[handlerName]; - return !result?.processed_at; - }, - ); - - const allRemainingHandlersUnprocessable = - remainingHandlers.length > 0 && - remainingHandlers.every(([handlerName, _]) => { - const result = lockedEvent.handler_results[handlerName]; - return result?.unprocessable_at; - }); - - if (allRemainingHandlersUnprocessable) { - lockedEvent.errors = maxErrors; - errored = true; - } - - if (errored) { - lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors); - const backoffContext: TxOBBackoffContext = { - attempt: lockedEvent.errors, - error: latestBackoffError, - errors: backoffErrors, - event: deepClone(lockedEvent) as Readonly< - TxOBEventByType - >, - maxErrors, - }; - backoffs.push( - backoff(backoffContext), - ); - const latestBackoff = backoffs.sort( - (a, b) => b.getTime() - a.getTime(), - )[0]; - lockedEvent.backoff_until = latestBackoff; - if (lockedEvent.errors === maxErrors) { - lockedEvent.backoff_until = null; - lockedEvent.processed_at = getDate(); - - if (onEventMaxErrorsReached) { - try { - await onEventMaxErrorsReached({ - event: deepClone(lockedEvent) as Readonly< - TxOBEventByType - >, - txClient, - signal, - }); - } catch (hookError) { - logger?.error( - { - eventId: lockedEvent.id, - error: hookError, - }, - "error in onEventMaxErrorsReached hook", - ); - - throw hookError; - } - } - } - } else { - lockedEvent.backoff_until = null; - lockedEvent.processed_at = getDate(); - } - - eventOutcome = errored - ? lockedEvent.errors === maxErrors - ? TxOBTelemetryEventOutcome.MaxErrors - : TxOBTelemetryEventOutcome.Error - : TxOBTelemetryEventOutcome.Success; - setTelemetrySpanAttributes(eventSpan, { - [TxOBTelemetryAttributeKey.EventOutcome]: eventOutcome, - [TxOBTelemetryAttributeKey.EventErrors]: lockedEvent.errors, - }); - - backoffUntil = lockedEvent.backoff_until ?? undefined; - - await txClient.updateEvent(lockedEvent); - }); - } catch (error) { - eventError = error; - eventOutcome ??= TxOBTelemetryEventOutcome.Error; - throw error; - } finally { - if (eventOutcome) { - const eventAttributes = { - ...eventMetricAttributes, - [TxOBTelemetryAttributeKey.EventOutcome]: eventOutcome, - }; - recordTelemetryCounter( - telemetry?.eventCounter, - telemetry, - eventAttributes, - ); - recordTelemetryDuration( - telemetry?.eventDuration, - telemetry, - eventStartedAt, - eventAttributes, - ); - } - endTelemetrySpan(eventSpan, eventError); - } - - return { backoffUntil }; -}; - export interface Logger { debug(message?: unknown, ...optionalParams: unknown[]): void; info(message?: unknown, ...optionalParams: unknown[]): void; @@ -628,11 +232,15 @@ export interface TxOBProcessor { export class EventProcessor< TxOBEventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventDataMap extends TxOBEventDataMap = + TxOBEventDataMap, > implements TxOBProcessor { private client: TxOBProcessorClient; private handlerMap: TxOBEventHandlerMap; - private opts: Omit, "signal"> & { + private opts: Omit< + TxOBProcessEventsOpts, + "signal" + > & { pollingIntervalMs: number; maxQueuedEvents: number; wakeupTimeoutMs: number; @@ -1001,12 +609,10 @@ export const createEventProcessor = < export const createEventHandlerMap = < const TEventSchemas extends TxOBEventSchemaMap, ->( - opts: { - eventSchemas: TEventSchemas; - handlerMap: TxOBEventHandlerMapFromSchemas; - }, -): TxOBEventHandlerMapFromSchemas => { +>(opts: { + eventSchemas: TEventSchemas; + handlerMap: TxOBEventHandlerMapFromSchemas; +}): TxOBEventHandlerMapFromSchemas => { const { eventSchemas: _eventSchemas, handlerMap } = opts; return handlerMap; };