From 33207df7bf8d4c230992d0b1f5c8a15f4c20283c Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Tue, 29 May 2018 15:21:40 -0400 Subject: [PATCH 1/2] Add log lines when daemon set handling goroutine exits Occasionally we see problems in the DS farm where the main control loop gets blocked sending updates to daemon set handlers via channels. This is likely because the handler goroutines are exiting, but logs don't currently reveal the cause. This commit adds logs to all of the places where these goroutines might exit which should shed light on the issue --- pkg/ds/daemon_set.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/ds/daemon_set.go b/pkg/ds/daemon_set.go index f98c722e3..192299a23 100644 --- a/pkg/ds/daemon_set.go +++ b/pkg/ds/daemon_set.go @@ -361,6 +361,7 @@ func (ds *daemonSet) WatchDesires( // so that the timer would be stopped after err = nil case <-ctx.Done(): + ds.logger.Warnln("goroutine exiting because ") return } } else { @@ -380,6 +381,7 @@ func (ds *daemonSet) WatchDesires( case newDS, ok := <-updatedCh: if !ok { // channel closed + ds.logger.Warnln("goroutine exiting because updatedCh has closed") return } if ds.ID() != newDS.ID { @@ -462,6 +464,7 @@ func (ds *daemonSet) WatchDesires( case deleteDS, ok := <-deletedCh: if !ok { + ds.logger.Warnln("goroutine exiting because deletedCh has closed") return } // Deleting a daemon sets has no effect @@ -471,6 +474,7 @@ func (ds *daemonSet) WatchDesires( case _, ok := <-nodesChangedCh: if !ok { // channel closed + ds.logger.Warnln("goroutine exiting because nodesChangedCh has closed") return } if reportErr := ds.reportEligible(); reportErr != nil { @@ -530,6 +534,7 @@ func (ds *daemonSet) WatchDesires( nodesToAdd <- addedNodes case <-ctx.Done(): + ds.logger.Warnln("goroutine exiting because context was canceled") return } } From 782d378d35222badc71dcfa34b523a9bba51acff Mon Sep 17 00:00:00 2001 From: Michael Puncel Date: Tue, 29 May 2018 16:29:29 -0400 Subject: [PATCH 2/2] DS farm: buffer 
per-DS update channel Occasionally the daemon set farm locks up with the farm goroutine blocking forever attempting to send an update to a daemon set worker goroutine. This can happen due to a race where the worker thread might exit for a number of reasons after the farm goroutine checks the child map to determine a worker already exists but before sending an update. This commit sidesteps the problem by buffering the per-daemon set update channel so that the farm goroutine will never block sending to a worker. If a worker dies, an existing routine grabs a mutex protecting the child map and clears out the child entry and drains the buffered channel. The next time an update is seen for the daemon set, the farm loop should know that it needs to spawn another worker. --- pkg/ds/daemon_set.go | 2 +- pkg/ds/farm.go | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/ds/daemon_set.go b/pkg/ds/daemon_set.go index 192299a23..cb1e653a2 100644 --- a/pkg/ds/daemon_set.go +++ b/pkg/ds/daemon_set.go @@ -361,7 +361,7 @@ func (ds *daemonSet) WatchDesires( // so that the timer would be stopped after err = nil case <-ctx.Done(): - ds.logger.Warnln("goroutine exiting because ") + ds.logger.Warnln("goroutine exiting due to canceled context") return } } else { diff --git a/pkg/ds/farm.go b/pkg/ds/farm.go index dab525b82..e81a32fd5 100644 --- a/pkg/ds/farm.go +++ b/pkg/ds/farm.go @@ -87,7 +87,7 @@ type Farm struct { type childDS struct { ds DaemonSet cancel context.CancelFunc - updatedCh chan<- ds_fields.DaemonSet + updatedCh chan ds_fields.DaemonSet deletedCh chan<- ds_fields.DaemonSet errCh <-chan error unlocker consul.TxnUnlocker @@ -310,6 +310,10 @@ func (dsf *Farm) closeChild(dsID fields.ID) { if child, ok := dsf.children[dsID]; ok { dsf.logger.WithField("ds", dsID).Infoln("Releasing daemon set") child.cancel() + + // drain the updatedCh (it's buffered) + for range child.updatedCh { + } close(child.updatedCh) close(child.deletedCh) @@ -560,7 
+564,9 @@ func (dsf *Farm) spawnDaemonSet( dsf.statusWritingInterval, ) - updatedCh := make(chan ds_fields.DaemonSet) + // updatedCh is buffered by 1 to protect the control loop from slow (or + // dead) readers + updatedCh := make(chan ds_fields.DaemonSet, 1) deletedCh := make(chan ds_fields.DaemonSet) ctx, cancel := context.WithCancel(ctx) @@ -733,6 +739,14 @@ func (dsf *Farm) lockAndSpawn(ctx context.Context, dsFields ds_fields.DaemonSet, // If we already are running the daemon set, just pass the update. Otherwise spawn one if ok { + // try to drain the buffered value off the updatedCh if there is one (which + // indicates the worker goroutine was slow to read it or is dead) + select { + case <-child.updatedCh: + dsf.logger.Warnln("daemon set worker missed an update, sending a newer one") + default: + } + child.updatedCh <- dsFields } else { dsf.children[dsFields.ID] = dsf.spawnDaemonSet(