diff --git a/v2/inflightmessages.go b/v2/inflightmessages.go new file mode 100644 index 0000000..984682f --- /dev/null +++ b/v2/inflightmessages.go @@ -0,0 +1,62 @@ +package shuttle + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +const inFlightMessageAbandonTimeout = 10 * time.Second + +type inFlightMessages struct { + mu sync.RWMutex + tracked map[*azservicebus.ReceivedMessage]struct{} +} + +func newInFlightMessages() *inFlightMessages { + return &inFlightMessages{ + tracked: make(map[*azservicebus.ReceivedMessage]struct{}), + } +} + +func (m *inFlightMessages) track(message *azservicebus.ReceivedMessage) { + m.mu.Lock() + defer m.mu.Unlock() + m.tracked[message] = struct{}{} +} + +func (m *inFlightMessages) forget(message *azservicebus.ReceivedMessage) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.tracked, message) +} + +func (m *inFlightMessages) close(ctx context.Context, settler MessageSettler) error { + var errs []error + for _, message := range m.messages() { + abandonCtx, cancel := context.WithTimeout(ctx, inFlightMessageAbandonTimeout) + err := settler.AbandonMessage(abandonCtx, message, nil) + cancel() + if err != nil { + errs = append(errs, fmt.Errorf("failed to abandon message %s during processor close: %w", message.MessageID, err)) + continue + } + m.forget(message) + } + return errors.Join(errs...) +} + +func (m *inFlightMessages) messages() []*azservicebus.ReceivedMessage { + m.mu.RLock() + defer m.mu.RUnlock() + + messages := make([]*azservicebus.ReceivedMessage, 0, len(m.tracked)) + for message := range m.tracked { + messages = append(messages, message) + } + return messages +} diff --git a/v2/inflightmessages_test.go b/v2/inflightmessages_test.go new file mode 100644 index 0000000..e84cc45 --- /dev/null +++ b/v2/inflightmessages_test.go @@ -0,0 +1,167 @@ +package shuttle + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/stretchr/testify/require" +) + +func TestInFlightMessages_CloseAbandonsTrackedMessages(t *testing.T) { + inFlight := newInFlightMessages() + settler := &inFlightMessageSettler{} + trackedMessage := &azservicebus.ReceivedMessage{MessageID: "tracked"} + forgottenMessage := &azservicebus.ReceivedMessage{MessageID: "forgotten"} + + inFlight.track(trackedMessage) + inFlight.track(forgottenMessage) + inFlight.forget(forgottenMessage) + + require.NoError(t, inFlight.close(context.Background(), settler)) + + messages := settler.abandonedMessages() + require.Len(t, messages, 1) + require.Same(t, trackedMessage, messages[0]) +} + +func TestInFlightMessages_CloseRemovesTrackedMessages(t *testing.T) { + inFlight := newInFlightMessages() + settler := &inFlightMessageSettler{} + message := &azservicebus.ReceivedMessage{MessageID: "message"} + + inFlight.track(message) + + require.NoError(t, inFlight.close(context.Background(), settler)) + require.NoError(t, inFlight.close(context.Background(), settler)) + + messages := settler.abandonedMessages() + require.Len(t, messages, 1) + require.Same(t, message, messages[0]) +} + +func TestInFlightMessages_CloseReturnsAbandonErrors(t *testing.T) { + firstErr := errors.New("first abandon failed") + secondErr := errors.New("second abandon failed") + inFlight := newInFlightMessages() + settler := &inFlightMessageSettler{ + abandonErrors: []error{firstErr, secondErr}, + } + + inFlight.track(&azservicebus.ReceivedMessage{MessageID: "first"}) + inFlight.track(&azservicebus.ReceivedMessage{MessageID: "second"}) + + err := inFlight.close(context.Background(), settler) + + require.Error(t, err) + require.ErrorIs(t, err, firstErr) + require.ErrorIs(t, err, secondErr) +} + +func TestInFlightMessages_CloseKeepsMessagesWhenAbandonFails(t *testing.T) { + abandonErr := errors.New("abandon failed") + inFlight := newInFlightMessages() + failingSettler := &inFlightMessageSettler{ + abandonErrors: []error{abandonErr}, + } + message := &azservicebus.ReceivedMessage{MessageID: "message"} + + inFlight.track(message) + + err := inFlight.close(context.Background(), failingSettler) + + require.ErrorIs(t, err, abandonErr) + messages := inFlight.messages() + require.Len(t, messages, 1) + require.Same(t, message, messages[0]) + + successfulSettler := &inFlightMessageSettler{} + require.NoError(t, inFlight.close(context.Background(), successfulSettler)) + require.Empty(t, inFlight.messages()) + abandonedMessages := successfulSettler.abandonedMessages() + require.Len(t, abandonedMessages, 1) + require.Same(t, message, abandonedMessages[0]) +} + +func TestInFlightMessages_CloseUsesAbandonTimeout(t *testing.T) { + inFlight := newInFlightMessages() + settler := &inFlightMessageSettler{} + + inFlight.track(&azservicebus.ReceivedMessage{MessageID: "message"}) + + beforeClose := time.Now() + require.NoError(t, inFlight.close(context.Background(), settler)) + afterClose := time.Now() + + deadlines := settler.abandonDeadlines() + require.Len(t, deadlines, 1) + require.True(t, deadlines[0].ok) + require.True(t, deadlines[0].deadline.After(beforeClose.Add(9*time.Second))) + require.True(t, deadlines[0].deadline.Before(afterClose.Add(11*time.Second))) +} + +type inFlightMessageSettler struct { + mu sync.Mutex + abandoned []*azservicebus.ReceivedMessage + deadlines []abandonDeadline + abandonErrors []error + abandonAttempt int +} + +type abandonDeadline struct { + deadline time.Time + ok bool +} + +func (s *inFlightMessageSettler) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error { + deadline, ok := ctx.Deadline() + + s.mu.Lock() + defer s.mu.Unlock() + s.abandoned = append(s.abandoned, message) + s.deadlines = append(s.deadlines, abandonDeadline{ + deadline: deadline, + ok: ok, + }) + err := error(nil) + if s.abandonAttempt < len(s.abandonErrors) { + err = s.abandonErrors[s.abandonAttempt] + } + s.abandonAttempt++ + return err +} + +func (s *inFlightMessageSettler) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error { + return nil +} + +func (s *inFlightMessageSettler) DeadLetterMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) error { + return nil +} + +func (s *inFlightMessageSettler) DeferMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeferMessageOptions) error { + return nil +} + +func (s *inFlightMessageSettler) RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error { + return nil +} + +func (s *inFlightMessageSettler) abandonedMessages() []*azservicebus.ReceivedMessage { + s.mu.Lock() + defer s.mu.Unlock() + messages := make([]*azservicebus.ReceivedMessage, len(s.abandoned)) + copy(messages, s.abandoned) + return messages +} + +func (s *inFlightMessageSettler) abandonDeadlines() []abandonDeadline { + s.mu.Lock() + defer s.mu.Unlock() + deadlines := make([]abandonDeadline, len(s.deadlines)) + copy(deadlines, s.deadlines) + return deadlines +} diff --git a/v2/processor.go b/v2/processor.go index 814de7b..117bd56 100644 --- a/v2/processor.go +++ b/v2/processor.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" @@ -42,7 +43,11 @@ type Processor struct { receiver Receiver options ProcessorOptions handle Handler - concurrencyTokens chan struct{} // tracks how many concurrent messages are currently being handled by the processor + concurrencyTokens chan struct{} // TODO: remove once receive sizing and in-flight tracking share a simpler lifecycle model. + inFlightMessages *inFlightMessages + shutdownCtx context.Context + shutdownCancel context.CancelFunc + receiveMu sync.Mutex } // ProcessorOptions configures the processor @@ -101,11 +106,15 @@ func applyProcessorOptions(options *ProcessorOptions) *ProcessorOptions { // NewProcessor creates a new processor with the provided receiver and handler. func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor { opts := applyProcessorOptions(options) + shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) return &Processor{ receiver: receiver, handle: handler, options: *opts, concurrencyTokens: make(chan struct{}, opts.MaxConcurrency), + inFlightMessages: newInFlightMessages(), + shutdownCtx: shutdownCtx, + shutdownCancel: shutdownCancel, } } @@ -113,6 +122,11 @@ func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOpti // It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy. // Returns a combined list of errors encountered during each processor start attempt. func (p *Processor) Start(ctx context.Context) (err error) { + if p.isClosed() { + return context.Canceled + } + ctx, cancel := p.startContext(ctx) + defer cancel() defer func() { if rec := recover(); rec != nil { err = fmt.Errorf("panic recovered from processor: %s", rec) @@ -121,6 +135,15 @@ func (p *Processor) Start(ctx context.Context) (err error) { return p.startWithRetries(ctx) } +// Close stops receiving new messages, cancels in-flight message handlers, and +// abandons messages currently held by the processor. +func (p *Processor) Close(ctx context.Context) error { + p.shutdownCancel() + p.receiveMu.Lock() + defer p.receiveMu.Unlock() + return p.inFlightMessages.close(ctx, p.receiver) +} + // startWithRetries starts a processor and blocks until an error occurs or the context is canceled. // It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy. // Returns a combined list of errors during the start attempts or ctx.Err() if the context @@ -147,18 +170,32 @@ func (p *Processor) startWithRetries(ctx context.Context) error { return savedError } +func (p *Processor) startContext(ctx context.Context) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(ctx) + if p.isClosed() { + cancel() + return ctx, cancel + } + go func() { + select { + case <-p.shutdownCtx.Done(): + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} + +func (p *Processor) isClosed() bool { + return p.shutdownCtx.Err() != nil +} + // start starts the processor and blocks until an error occurs or the context is canceled. func (p *Processor) start(ctx context.Context) error { logger := getLogger(ctx) logger.Info("starting processor") - messages, err := p.receiver.ReceiveMessages(ctx, p.options.MaxReceiveCount, nil) - if err != nil { - return fmt.Errorf("failed to receive messages: %w", err) - } - logger.Info(fmt.Sprintf("received %d messages - initial", len(messages))) - processor.Metric.IncMessageReceived(float64(len(messages))) - for _, msg := range messages { - p.process(ctx, msg) + if err := p.receiveAndProcess(ctx, p.options.MaxReceiveCount, "initial"); err != nil { + return err } for ctx.Err() == nil { select { @@ -167,14 +204,8 @@ func (p *Processor) start(ctx context.Context) error { if ctx.Err() != nil || maxMessages == 0 { break } - messages, err := p.receiver.ReceiveMessages(ctx, maxMessages, nil) - if err != nil { - return fmt.Errorf("failed to receive messages: %w", err) - } - logger.Info(fmt.Sprintf("received %d messages from processor loop", len(messages))) - processor.Metric.IncMessageReceived(float64(len(messages))) - for _, msg := range messages { - p.process(ctx, msg) + if err := p.receiveAndProcess(ctx, maxMessages, "from processor loop"); err != nil { + return err } case <-ctx.Done(): logger.Info("context done, stop receiving from processor") @@ -184,13 +215,32 @@ func (p *Processor) start(ctx context.Context) error { return ctx.Err() } +func (p *Processor) receiveAndProcess(ctx context.Context, maxMessages int, source string) error { + p.receiveMu.Lock() + defer p.receiveMu.Unlock() + + messages, err := p.receiver.ReceiveMessages(ctx, maxMessages, nil) + if err != nil { + return fmt.Errorf("failed to receive messages: %w", err) + } + getLogger(ctx).Info(fmt.Sprintf("received %d messages - %s", len(messages), source)) + processor.Metric.IncMessageReceived(float64(len(messages))) + for _, msg := range messages { + p.process(ctx, msg) + } + return nil +} + func (p *Processor) process(ctx context.Context, message *azservicebus.ReceivedMessage) { p.concurrencyTokens <- struct{}{} + p.inFlightMessages.track(message) + go func() { msgContext, cancel := context.WithCancel(ctx) // cancel messageContext when we get out of this goroutine defer cancel() defer func() { + p.inFlightMessages.forget(message) <-p.concurrencyTokens processor.Metric.IncMessageHandled(message) processor.Metric.DecConcurrentMessageCount(message) diff --git a/v2/processor_fake_test.go b/v2/processor_fake_test.go index 4c90798..49e53ec 100644 --- a/v2/processor_fake_test.go +++ b/v2/processor_fake_test.go @@ -3,6 +3,7 @@ package shuttle_test import ( "context" "fmt" + "sync" "sync/atomic" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" @@ -18,11 +19,18 @@ type fakeSettler struct { DeadLetterCalled atomic.Int32 DeferCalled atomic.Int32 RenewCalled atomic.Int32 + SetupAbandonErr error + + mu sync.Mutex + AbandonedMessages []*azservicebus.ReceivedMessage } func (f *fakeSettler) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error { f.AbandonCalled.Add(1) - return nil + f.mu.Lock() + f.AbandonedMessages = append(f.AbandonedMessages, message) + f.mu.Unlock() + return f.SetupAbandonErr } func (f *fakeSettler) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error { @@ -45,6 +53,14 @@ func (f *fakeSettler) RenewMessageLock(ctx context.Context, message *azservicebu return nil } +func (f *fakeSettler) abandonedMessages() []*azservicebus.ReceivedMessage { + f.mu.Lock() + defer f.mu.Unlock() + messages := make([]*azservicebus.ReceivedMessage, len(f.AbandonedMessages)) + copy(messages, f.AbandonedMessages) + return messages +} + type fakeReceiver struct { // outcomes to verify ReceiveCalls []int // array of maxMessage value passed to receive calls in the lifetime of the fake receiver @@ -55,21 +71,51 @@ type fakeReceiver struct { *fakeSettler SetupMaxReceiveCalls int SetupReceivePanic string + SetupRespectContext bool + SetupReceiveStarted chan struct{} } -func (f *fakeReceiver) ReceiveMessages(_ context.Context, maxMessages int, _ *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) { +func (f *fakeReceiver) ReceiveMessages(ctx context.Context, maxMessages int, _ *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) { f.ReceiveCalls = append(f.ReceiveCalls, maxMessages) + if f.SetupReceiveStarted != nil { + select { + case f.SetupReceiveStarted <- struct{}{}: + default: + } + } if maxMessages == 0 && len(f.SetupReceivedMessages) > 0 { return nil, nil } var result []*azservicebus.ReceivedMessage - for msg := range f.SetupReceivedMessages { - result = append(result, msg) - if len(result) == maxMessages || len(f.SetupReceivedMessages) == 0 { - break + for len(result) < maxMessages { + if f.SetupRespectContext { + select { + case msg, ok := <-f.SetupReceivedMessages: + if !ok { + return f.receiveResult(result) + } + result = append(result, msg) + if len(f.SetupReceivedMessages) == 0 { + return f.receiveResult(result) + } + case <-ctx.Done(): + return result, ctx.Err() + } + continue + } + for msg := range f.SetupReceivedMessages { + result = append(result, msg) + if len(result) == maxMessages || len(f.SetupReceivedMessages) == 0 { + break + } } + break } + return f.receiveResult(result) +} + +func (f *fakeReceiver) receiveResult(result []*azservicebus.ReceivedMessage) ([]*azservicebus.ReceivedMessage, error) { if f.SetupReceivePanic != "" { panic(f.SetupReceivePanic) } diff --git a/v2/processor_test.go b/v2/processor_test.go index 9e2c3d3..49c5148 100644 --- a/v2/processor_test.go +++ b/v2/processor_test.go @@ -2,6 +2,7 @@ package shuttle_test import ( "context" + "errors" "fmt" "testing" "time" @@ -377,6 +378,195 @@ func TestProcessorStart_RecoversReceiverPanic(t *testing.T) { g.Expect(err.Error()).To(ContainSubstring("panic recovered from processor: receive panic!")) } +func TestProcessorClose_CancelsAndAbandonsInflightMessages(t *testing.T) { + messages := messagesChannel(2) + close(messages) + settler := &fakeSettler{} + rcv := &fakeReceiver{ + fakeSettler: settler, + SetupReceivedMessages: messages, + SetupMaxReceiveCalls: 10, + } + started := make(chan *azservicebus.ReceivedMessage, 2) + canceled := make(chan *azservicebus.ReceivedMessage, 2) + processor := shuttle.NewProcessor(rcv, func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) { + started <- message + <-ctx.Done() + canceled <- message + }, &shuttle.ProcessorOptions{ + MaxConcurrency: 2, + ReceiveInterval: to.Ptr(1 * time.Hour), + }) + + errCh := make(chan error, 1) + go func() { errCh <- processor.Start(context.Background()) }() + + g := NewWithT(t) + g.Eventually(started).Should(Receive()) + g.Eventually(started).Should(Receive()) + + g.Expect(processor.Close(context.Background())).To(Succeed()) + g.Eventually(canceled).Should(Receive()) + g.Eventually(canceled).Should(Receive()) + g.Eventually(errCh).Should(Receive(MatchError(context.Canceled))) + g.Expect(settler.AbandonCalled.Load()).To(Equal(int32(2))) + g.Expect(settler.abandonedMessages()).To(HaveLen(2)) +} + +func TestProcessorClose_StopsStartReceiveLoop(t *testing.T) { + rcv := &fakeReceiver{ + fakeSettler: &fakeSettler{}, + SetupReceivedMessages: make(chan *azservicebus.ReceivedMessage), + SetupMaxReceiveCalls: 10, + SetupRespectContext: true, + SetupReceiveStarted: make(chan struct{}, 1), + } + processor := shuttle.NewProcessor(rcv, MyHandler(0*time.Second), &shuttle.ProcessorOptions{ + ReceiveInterval: to.Ptr(10 * time.Millisecond), + }) + errCh := make(chan error, 1) + go func() { errCh <- processor.Start(context.Background()) }() + + g := NewWithT(t) + g.Eventually(rcv.SetupReceiveStarted).Should(Receive()) + g.Expect(processor.Close(context.Background())).To(Succeed()) + g.Eventually(errCh).Should(Receive(MatchError(MatchRegexp("failed to receive messages: context canceled")))) + g.Expect(rcv.ReceiveCalls).To(HaveLen(1)) +} + +func TestProcessorClose_WaitsForReceiveBatchBeforeAbandoning(t *testing.T) { + messages := make(chan *azservicebus.ReceivedMessage) + settler := &fakeSettler{} + rcv := &fakeReceiver{ + fakeSettler: settler, + SetupReceivedMessages: messages, + SetupMaxReceiveCalls: 10, + SetupReceiveStarted: make(chan struct{}, 1), + } + releaseHandler := make(chan struct{}) + processor := shuttle.NewProcessor(rcv, func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) { + <-releaseHandler + }, &shuttle.ProcessorOptions{ + ReceiveInterval: to.Ptr(1 * time.Hour), + }) + errCh := make(chan error, 1) + go func() { errCh <- processor.Start(context.Background()) }() + + g := NewWithT(t) + g.Eventually(rcv.SetupReceiveStarted).Should(Receive()) + + closeErrCh := make(chan error, 1) + go func() { closeErrCh <- processor.Close(context.Background()) }() + g.Consistently(closeErrCh, 20*time.Millisecond).ShouldNot(Receive()) + + message := &azservicebus.ReceivedMessage{MessageID: "received-during-close"} + messages <- message + + g.Eventually(closeErrCh).Should(Receive(Succeed())) + abandoned := settler.abandonedMessages() + g.Expect(abandoned).To(HaveLen(1)) + g.Expect(abandoned[0]).To(BeIdenticalTo(message)) + + close(releaseHandler) + g.Eventually(errCh).Should(Receive(MatchError(context.Canceled))) +} + +func TestProcessorClose_ReturnsAbandonErrors(t *testing.T) { + abandonErr := errors.New("abandon failed") + messages := messagesChannel(2) + close(messages) + settler := &fakeSettler{SetupAbandonErr: abandonErr} + rcv := &fakeReceiver{ + fakeSettler: settler, + SetupReceivedMessages: messages, + SetupMaxReceiveCalls: 10, + } + started := make(chan struct{}, 2) + release := make(chan struct{}) + processor := shuttle.NewProcessor(rcv, func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) { + started <- struct{}{} + <-release + }, &shuttle.ProcessorOptions{ + MaxConcurrency: 2, + ReceiveInterval: to.Ptr(1 * time.Hour), + }) + errCh := make(chan error, 1) + go func() { errCh <- processor.Start(context.Background()) }() + defer func() { + close(release) + <-errCh + }() + + g := NewWithT(t) + g.Eventually(started).Should(Receive()) + g.Eventually(started).Should(Receive()) + + err := processor.Close(context.Background()) + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.Is(err, abandonErr)).To(BeTrue()) + g.Expect(err.Error()).To(ContainSubstring("failed to abandon message")) + g.Expect(settler.AbandonCalled.Load()).To(Equal(int32(2))) +} + +func TestProcessorClose_IsIdempotent(t *testing.T) { + messages := messagesChannel(1) + close(messages) + settler := &fakeSettler{} + rcv := &fakeReceiver{ + fakeSettler: settler, + SetupReceivedMessages: messages, + SetupMaxReceiveCalls: 10, + } + started := make(chan struct{}, 1) + processor := shuttle.NewProcessor(rcv, func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) { + started <- struct{}{} + <-ctx.Done() + }, &shuttle.ProcessorOptions{ + ReceiveInterval: to.Ptr(1 * time.Hour), + }) + errCh := make(chan error, 1) + go func() { errCh <- processor.Start(context.Background()) }() + + g := NewWithT(t) + g.Eventually(started).Should(Receive()) + g.Expect(processor.Close(context.Background())).To(Succeed()) + g.Expect(processor.Close(context.Background())).To(Succeed()) + g.Eventually(errCh).Should(Receive(MatchError(context.Canceled))) + g.Expect(settler.AbandonCalled.Load()).To(Equal(int32(1))) +} + +func TestProcessorClose_DoesNotWaitForHandlerToExit(t *testing.T) { + messages := messagesChannel(1) + close(messages) + settler := &fakeSettler{} + rcv := &fakeReceiver{ + fakeSettler: settler, + SetupReceivedMessages: messages, + SetupMaxReceiveCalls: 10, + } + started := make(chan struct{}, 1) + release := make(chan struct{}) + processor := shuttle.NewProcessor(rcv, func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) { + started <- struct{}{} + <-release + }, &shuttle.ProcessorOptions{ + ReceiveInterval: to.Ptr(1 * time.Hour), + }) + errCh := make(chan error, 1) + go func() { errCh <- processor.Start(context.Background()) }() + + g := NewWithT(t) + g.Eventually(started).Should(Receive()) + + closeCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + g.Expect(processor.Close(closeCtx)).To(Succeed()) + g.Expect(settler.AbandonCalled.Load()).To(Equal(int32(1))) + g.Eventually(errCh).Should(Receive(MatchError(context.Canceled))) + + close(release) +} + func messagesChannel(messageCount int) chan *azservicebus.ReceivedMessage { messages := make(chan *azservicebus.ReceivedMessage, messageCount) for i := 0; i < messageCount; i++ {