diff --git a/fswatch/doc.go b/fswatch/doc.go new file mode 100644 index 00000000..a2586d93 --- /dev/null +++ b/fswatch/doc.go @@ -0,0 +1,21 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package fswatch is a small cross-platform filesystem watcher used by +// Kubernetes to remove direct fsnotify dependencies from the Linux +// build closure. The Linux implementation uses raw inotify; non-Linux +// platforms wrap fsnotify. +package fswatch diff --git a/fswatch/fswatch.go b/fswatch/fswatch.go new file mode 100644 index 00000000..15bcccb1 --- /dev/null +++ b/fswatch/fswatch.go @@ -0,0 +1,123 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch + +import ( + "errors" +) + +// Sentinel errors. 
+var ( + ErrClosed = errors.New("fswatch: watcher is closed") + ErrNonExistentWatch = errors.New("fswatch: path is not being watched") + // ErrEventOverflow is delivered on the errors channel when the + // kernel queue overflowed and events were dropped. + ErrEventOverflow = errors.New("fswatch: event queue overflow") +) + +// Op is the cross-platform event class. Bits match fsnotify v1's +// default set so direct callers can migrate by import-path swap. +type Op uint32 + +const ( + Create Op = 1 << iota + Write + Remove + Rename + Chmod +) + +// String returns a pipe-separated list of bits set in op. +func (op Op) String() string { + if op == 0 { + return "" + } + var s string + add := func(name string) { + if s != "" { + s += "|" + } + s += name + } + if op&Create != 0 { + add("Create") + } + if op&Write != 0 { + add("Write") + } + if op&Remove != 0 { + add("Remove") + } + if op&Rename != 0 { + add("Rename") + } + if op&Chmod != 0 { + add("Chmod") + } + return s +} + +// Event is a filesystem event. +type Event struct { + Name string + Op Op +} + +// Has reports whether op is set in e.Op. +func (e Event) Has(op Op) bool { return e.Op&op != 0 } + +// Watcher delivers filesystem events. Add, Remove, and Close are safe +// to call from any goroutine; Close is idempotent. Reading Events and +// Errors is the caller's responsibility. +type Watcher struct { + impl watcherImpl +} + +// watcherImpl is the platform-specific backing for Watcher. +type watcherImpl interface { + Add(path string) error + Remove(path string) error + Events() <-chan Event + Errors() <-chan error + Close() error +} + +// NewWatcher constructs a Watcher. +func NewWatcher() (*Watcher, error) { + impl, err := newWatcherImpl() + if err != nil { + return nil, err + } + return &Watcher{impl: impl}, nil +} + +// Add starts watching path. Returns ErrClosed if the watcher has been +// closed. +func (w *Watcher) Add(path string) error { return w.impl.Add(path) } + +// Remove stops watching path. 
+func (w *Watcher) Remove(path string) error { return w.impl.Remove(path) } + +// Events returns the channel of filesystem events. Closed after Close. +func (w *Watcher) Events() <-chan Event { return w.impl.Events() } + +// Errors returns the channel of backend errors. Closed after Close. +func (w *Watcher) Errors() <-chan error { return w.impl.Errors() } + +// Close stops the watcher. In-flight events are dropped. Subsequent +// Add and Remove return ErrClosed. Close is idempotent. +func (w *Watcher) Close() error { return w.impl.Close() } diff --git a/fswatch/fswatch_linux.go b/fswatch/fswatch_linux.go new file mode 100644 index 00000000..054d02a8 --- /dev/null +++ b/fswatch/fswatch_linux.go @@ -0,0 +1,326 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "syscall" + "unsafe" +) + +const portableDefaultMask = syscall.IN_CREATE | + syscall.IN_MOVED_TO | + syscall.IN_MODIFY | + syscall.IN_ATTRIB | + syscall.IN_DELETE | + syscall.IN_DELETE_SELF | + syscall.IN_MOVE_SELF | + syscall.IN_MOVED_FROM + +// inotifyImpl is the Linux watcher backed by raw inotify(7), epoll for +// shutdown multiplexing, and a pipe for shutdown wakeup. It deliberately +// keeps no fsnotify dependency so the Linux build closure stays clean. 
+type inotifyImpl struct { + fd int + epollF int + wakeRd int + wakeWr int + + exited chan struct{} + done chan struct{} + shuttingDown atomic.Bool + + mu sync.Mutex + closed bool + watches map[uint32]string + paths map[string]uint32 + + events chan Event + errors chan error +} + +func newWatcherImpl() (watcherImpl, error) { + fd, err := syscall.InotifyInit1(syscall.IN_CLOEXEC | syscall.IN_NONBLOCK) + if err != nil { + return nil, fmt.Errorf("inotify_init1: %w", err) + } + + var pipeFds [2]int + if err := syscall.Pipe2(pipeFds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil { + syscall.Close(fd) + return nil, fmt.Errorf("pipe2: %w", err) + } + + epollF, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC) + if err != nil { + syscall.Close(fd) + syscall.Close(pipeFds[0]) + syscall.Close(pipeFds[1]) + return nil, fmt.Errorf("epoll_create1: %w", err) + } + + if err := syscall.EpollCtl(epollF, syscall.EPOLL_CTL_ADD, fd, + &syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(fd)}); err != nil { + syscall.Close(fd) + syscall.Close(pipeFds[0]) + syscall.Close(pipeFds[1]) + syscall.Close(epollF) + return nil, fmt.Errorf("epoll_ctl(inotify): %w", err) + } + if err := syscall.EpollCtl(epollF, syscall.EPOLL_CTL_ADD, pipeFds[0], + &syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(pipeFds[0])}); err != nil { + syscall.Close(fd) + syscall.Close(pipeFds[0]) + syscall.Close(pipeFds[1]) + syscall.Close(epollF) + return nil, fmt.Errorf("epoll_ctl(wakeRd): %w", err) + } + + b := &inotifyImpl{ + fd: fd, + epollF: epollF, + wakeRd: pipeFds[0], + wakeWr: pipeFds[1], + exited: make(chan struct{}), + done: make(chan struct{}), + watches: make(map[uint32]string), + paths: make(map[string]uint32), + events: make(chan Event, 64), + errors: make(chan error, 64), + } + go b.readEvents() + return b, nil +} + +func (b *inotifyImpl) Add(path string) error { + // Hold mu across the syscall so a concurrent Close cannot close + // b.fd while we are issuing inotify_add_watch on it. 
Close waits + // for the read goroutine to exit before closing FDs anyway, but + // it sets b.closed under mu first; observing closed=false under + // mu and keeping the lock proves the FD is still valid. + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return ErrClosed + } + wd, err := syscall.InotifyAddWatch(b.fd, path, portableDefaultMask) + if err != nil { + return err + } + if existing, ok := b.paths[path]; ok && existing != uint32(wd) { + delete(b.watches, existing) + } + b.watches[uint32(wd)] = path + b.paths[path] = uint32(wd) + return nil +} + +func (b *inotifyImpl) Remove(path string) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return ErrClosed + } + wd, ok := b.paths[path] + if !ok { + return ErrNonExistentWatch + } + delete(b.paths, path) + delete(b.watches, wd) + if _, err := syscall.InotifyRmWatch(b.fd, wd); err != nil { + // EINVAL means the kernel already removed the watch (e.g. + // the watched path was deleted). + if errors.Is(err, syscall.EINVAL) { + return nil + } + return err + } + return nil +} + +func (b *inotifyImpl) Events() <-chan Event { return b.events } +func (b *inotifyImpl) Errors() <-chan error { return b.errors } + +func (b *inotifyImpl) Close() error { + b.mu.Lock() + if b.closed { + b.mu.Unlock() + return nil + } + b.closed = true + b.mu.Unlock() + + b.shuttingDown.Store(true) + close(b.done) + closeErr := syscall.Close(b.wakeWr) + <-b.exited + syscall.Close(b.fd) + syscall.Close(b.epollF) + syscall.Close(b.wakeRd) + close(b.events) + close(b.errors) + return closeErr +} + +func (b *inotifyImpl) readEvents() { + defer close(b.exited) + + var epollEvents [2]syscall.EpollEvent + var buf [syscall.SizeofInotifyEvent * 4096]byte + + for { + n, err := syscall.EpollWait(b.epollF, epollEvents[:], -1) + if err != nil { + if errors.Is(err, syscall.EINTR) { + continue + } + b.deliverError(err) + return + } + for i := 0; i < n; i++ { + switch epollEvents[i].Fd { + case int32(b.wakeRd): + return + case int32(b.fd): + if 
!b.drainInotify(buf[:]) { + return + } + } + } + } +} + +func (b *inotifyImpl) drainInotify(buf []byte) bool { + for { + select { + case <-b.done: + return false + default: + } + if b.shuttingDown.Load() { + return false + } + + n, err := syscall.Read(b.fd, buf) + if err != nil { + if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EWOULDBLOCK) { + return true + } + if errors.Is(err, syscall.EINTR) { + continue + } + b.deliverError(err) + return false + } + if n < syscall.SizeofInotifyEvent { + b.deliverError(errors.New("fswatch: short inotify read")) + continue + } + + var offset uint32 + for offset <= uint32(n)-syscall.SizeofInotifyEvent { + select { + case <-b.done: + return false + default: + } + raw := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset])) + b.deliverInotifyEvent(raw, buf, offset) + offset += syscall.SizeofInotifyEvent + raw.Len + } + } +} + +func (b *inotifyImpl) deliverInotifyEvent(raw *syscall.InotifyEvent, buf []byte, offset uint32) { + if uint32(raw.Mask)&syscall.IN_Q_OVERFLOW != 0 { + select { + case b.errors <- ErrEventOverflow: + default: + } + return + } + + op := translateMaskToOp(uint32(raw.Mask)) + if op == 0 { + return + } + + name := b.resolveName(raw, buf, offset) + ev := Event{Name: name, Op: op} + + select { + case <-b.done: + return + case b.events <- ev: + } +} + +// resolveName builds the absolute event path. When the kernel does not +// append a filename (events targeting the watched path itself), the +// watch's stored path is used. 
+func (b *inotifyImpl) resolveName(raw *syscall.InotifyEvent, buf []byte, offset uint32) string { + b.mu.Lock() + name := b.watches[uint32(raw.Wd)] + b.mu.Unlock() + if raw.Len > 0 { + nameBytes := buf[offset+syscall.SizeofInotifyEvent : offset+syscall.SizeofInotifyEvent+raw.Len] + for i, c := range nameBytes { + if c == 0 { + nameBytes = nameBytes[:i] + break + } + } + if len(nameBytes) > 0 { + name = name + "/" + string(nameBytes) + } + } + return name +} + +func (b *inotifyImpl) deliverError(err error) { + select { + case b.errors <- err: + default: + } +} + +// translateMaskToOp mirrors fsnotify v1's newEvent translation so +// direct fsnotify callers can migrate by import-path swap and observe +// the same Op set. IN_MOVED_TO maps to Create (file moved into watched +// dir), not Rename, matching fsnotify. +func translateMaskToOp(mask uint32) Op { + var op Op + if mask&syscall.IN_CREATE != 0 || mask&syscall.IN_MOVED_TO != 0 { + op |= Create + } + if mask&syscall.IN_DELETE != 0 || mask&syscall.IN_DELETE_SELF != 0 { + op |= Remove + } + if mask&syscall.IN_MODIFY != 0 { + op |= Write + } + if mask&syscall.IN_MOVE_SELF != 0 || mask&syscall.IN_MOVED_FROM != 0 { + op |= Rename + } + if mask&syscall.IN_ATTRIB != 0 { + op |= Chmod + } + return op +} diff --git a/fswatch/fswatch_others.go b/fswatch/fswatch_others.go new file mode 100644 index 00000000..465dd046 --- /dev/null +++ b/fswatch/fswatch_others.go @@ -0,0 +1,150 @@ +//go:build !linux + +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch + +import ( + "errors" + "sync" + + upstream "github.com/fsnotify/fsnotify" +) + +// fsnotifyImpl wraps github.com/fsnotify/fsnotify on non-Linux +// platforms. The wrapper exists so the public API is identical across +// platforms; on Linux a raw inotify implementation is used instead, and +// fsnotify is therefore absent from the Linux build closure. +type fsnotifyImpl struct { + w *upstream.Watcher + events chan Event + errors chan error + done chan struct{} + exited chan struct{} + closeMu sync.Mutex + closed bool +} + +func newWatcherImpl() (watcherImpl, error) { + w, err := upstream.NewWatcher() + if err != nil { + return nil, err + } + impl := &fsnotifyImpl{ + w: w, + events: make(chan Event, 64), + errors: make(chan error, 64), + done: make(chan struct{}), + exited: make(chan struct{}), + } + go impl.translate() + return impl, nil +} + +func (i *fsnotifyImpl) Add(path string) error { + // Hold closeMu across the upstream syscall so a concurrent Close + // cannot tear down fsnotify's internal FDs mid-call. + i.closeMu.Lock() + defer i.closeMu.Unlock() + if i.closed { + return ErrClosed + } + if err := i.w.Add(path); err != nil { + return translateUpstreamErr(err) + } + return nil +} + +func (i *fsnotifyImpl) Remove(path string) error { + i.closeMu.Lock() + defer i.closeMu.Unlock() + if i.closed { + return ErrClosed + } + if err := i.w.Remove(path); err != nil { + if errors.Is(err, upstream.ErrClosed) { + return ErrClosed + } + // fsnotify uses a generic error for non-existent watches. 
+ return ErrNonExistentWatch + } + return nil +} + +func translateUpstreamErr(err error) error { + switch { + case errors.Is(err, upstream.ErrClosed): + return ErrClosed + case errors.Is(err, upstream.ErrEventOverflow): + return ErrEventOverflow + default: + return err + } +} + +func (i *fsnotifyImpl) Events() <-chan Event { return i.events } +func (i *fsnotifyImpl) Errors() <-chan error { return i.errors } + +func (i *fsnotifyImpl) Close() error { + i.closeMu.Lock() + if i.closed { + i.closeMu.Unlock() + return nil + } + i.closed = true + // Close fsnotify under the lock so concurrent Add/Remove cannot + // issue an upstream call on the FD we are tearing down. + close(i.done) + err := i.w.Close() + i.closeMu.Unlock() + <-i.exited + close(i.events) + close(i.errors) + return err +} + +func (i *fsnotifyImpl) translate() { + defer close(i.exited) + for { + select { + case <-i.done: + return + case ev, ok := <-i.w.Events: + if !ok { + return + } + select { + case <-i.done: + return + case i.events <- Event{Name: ev.Name, Op: Op(ev.Op)}: + } + case err, ok := <-i.w.Errors: + if !ok { + return + } + translated := err + if errors.Is(err, upstream.ErrEventOverflow) { + translated = ErrEventOverflow + } + select { + case <-i.done: + return + case i.errors <- translated: + } + } + } +} diff --git a/fswatch/fswatch_test.go b/fswatch/fswatch_test.go new file mode 100644 index 00000000..dbc80cd6 --- /dev/null +++ b/fswatch/fswatch_test.go @@ -0,0 +1,253 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch_test + +import ( + "errors" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "k8s.io/utils/fswatch" +) + +func TestNewWatcher(t *testing.T) { + w, err := fswatch.NewWatcher() + if err != nil { + t.Fatal(err) + } + if err := w.Close(); err != nil { + t.Fatal(err) + } +} + +func TestWatcherCloseIdempotent(t *testing.T) { + w, err := fswatch.NewWatcher() + if err != nil { + t.Fatal(err) + } + if err := w.Close(); err != nil { + t.Fatal(err) + } + if err := w.Close(); err != nil { + t.Errorf("second Close: got %v, want nil", err) + } +} + +func TestWatcherAddRemove(t *testing.T) { + dir := t.TempDir() + w, err := fswatch.NewWatcher() + if err != nil { + t.Fatal(err) + } + defer w.Close() + + if err := w.Add(dir); err != nil { + t.Fatalf("Add: %v", err) + } + if err := w.Remove(dir); err != nil { + t.Fatalf("Remove: %v", err) + } + if err := w.Remove(dir); !errors.Is(err, fswatch.ErrNonExistentWatch) { + t.Errorf("second Remove: got %v, want ErrNonExistentWatch", err) + } +} + +func TestWatcherAddAfterClose(t *testing.T) { + dir := t.TempDir() + w, err := fswatch.NewWatcher() + if err != nil { + t.Fatal(err) + } + if err := w.Close(); err != nil { + t.Fatal(err) + } + if err := w.Add(dir); !errors.Is(err, fswatch.ErrClosed) { + t.Errorf("Add after Close: got %v, want ErrClosed", err) + } +} + +// TestWatcherDeliversCreate exercises the end-to-end happy path: a +// fresh file in the watched directory produces an event with the +// portable Op set. 
func TestWatcherDeliversCreate(t *testing.T) {
	dir := t.TempDir()
	w, err := fswatch.NewWatcher()
	if err != nil {
		t.Fatal(err)
	}
	defer w.Close()
	if err := w.Add(dir); err != nil {
		t.Fatal(err)
	}

	// Create the file only after the watch is registered so the
	// creation is guaranteed to happen while dir is being watched.
	target := filepath.Join(dir, "f")
	if err := os.WriteFile(target, []byte("hi"), 0644); err != nil {
		t.Fatal(err)
	}

	// Other events may arrive first; keep reading until one names
	// target and carries Create, or the 2s deadline passes.
	deadline := time.After(2 * time.Second)
	for {
		select {
		case ev := <-w.Events():
			if ev.Name == target && ev.Has(fswatch.Create) {
				return
			}
		case err := <-w.Errors():
			// Any backend error fails the test, including overflow.
			t.Fatalf("unexpected error: %v", err)
		case <-deadline:
			t.Fatal("Create event for target not received within 2s")
		}
	}
}

// TestWatcherCloseNoLeakUnderTraffic exercises the shutdown path under
// active event traffic. Without proper coordination between the read
// goroutine and Close, this test would deadlock or leak.
func TestWatcherCloseNoLeakUnderTraffic(t *testing.T) {
	dir := t.TempDir()
	w, err := fswatch.NewWatcher()
	if err != nil {
		t.Fatal(err)
	}
	if err := w.Add(dir); err != nil {
		t.Fatal(err)
	}

	// Pump events from a goroutine; we deliberately don't drain the
	// public channels.
	stop := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		i := 0
		for {
			select {
			case <-stop:
				return
			default:
				// Create and immediately delete a uniquely named
				// file. Errors are ignored on purpose: the point is
				// to generate traffic, not to verify the writes.
				name := filepath.Join(dir, "f"+itoa(i))
				_ = os.WriteFile(name, []byte("x"), 0644)
				_ = os.Remove(name)
				i++
			}
		}
	}()

	// Give the pump time to produce a backlog of undelivered events
	// before Close is attempted.
	time.Sleep(100 * time.Millisecond)

	// Run Close in its own goroutine so a hang shows up as a timeout
	// below instead of blocking the test forever.
	done := make(chan error, 1)
	go func() { done <- w.Close() }()

	select {
	case err := <-done:
		close(stop)
		wg.Wait()
		if err != nil {
			t.Fatalf("Close: %v", err)
		}
	case <-time.After(2 * time.Second):
		// Stop the pump first so its goroutine does not outlive the
		// failing test.
		close(stop)
		wg.Wait()
		t.Fatal("Close deadlocked under traffic")
	}
}

// TestWatcherAddCloseRace exercises Add and Close racing each other.
+// Add must either succeed cleanly or return ErrClosed; the underlying +// inotify FD must never be closed mid-syscall (which would surface as +// a syscall errno or a panic under -race). +func TestWatcherAddCloseRace(t *testing.T) { + for i := 0; i < 50; i++ { + dir := t.TempDir() + w, err := fswatch.NewWatcher() + if err != nil { + t.Fatal(err) + } + + var addErrs sync.Map + var wg sync.WaitGroup + for j := 0; j < 4; j++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + err := w.Add(dir) + addErrs.Store(id, err) + }(j) + } + // Race Close against the Adds. + _ = w.Close() + wg.Wait() + + addErrs.Range(func(_, v any) bool { + if v == nil { + return true + } + err, _ := v.(error) + if err != nil && !errors.Is(err, fswatch.ErrClosed) { + t.Errorf("Add returned non-ErrClosed error during Close race: %v", err) + } + return true + }) + } +} + +func TestOpString(t *testing.T) { + cases := []struct { + op fswatch.Op + want string + }{ + {0, ""}, + {fswatch.Create, "Create"}, + {fswatch.Write, "Write"}, + {fswatch.Create | fswatch.Write, "Create|Write"}, + } + for _, c := range cases { + if got := c.op.String(); got != c.want { + t.Errorf("Op(%d).String() = %q, want %q", c.op, got, c.want) + } + } +} + +// itoa is a tiny helper avoiding strconv import in test loops. +func itoa(i int) string { + if i == 0 { + return "0" + } + var buf [20]byte + n := len(buf) + for i > 0 { + n-- + buf[n] = byte('0' + i%10) + i /= 10 + } + return string(buf[n:]) +} + +func waitFor(d time.Duration, cond func() bool) bool { + end := time.Now().Add(d) + for time.Now().Before(end) { + if cond() { + return true + } + time.Sleep(20 * time.Millisecond) + } + return cond() +} diff --git a/fswatch/watch_dir.go b/fswatch/watch_dir.go new file mode 100644 index 00000000..a4a91bf1 --- /dev/null +++ b/fswatch/watch_dir.go @@ -0,0 +1,150 @@ +/* +Copyright The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch + +import ( + "context" + "errors" + "time" +) + +// DirOption configures WatchDir. +type DirOption func(*dirConfig) + +type dirConfig struct { + recheckInterval time.Duration + errorHandler func(error) +} + +// WithDirRecheckInterval fires onChange every d regardless of +// filesystem events. Also drives retries of Add(dir) when the watch +// is inactive (initial Add failed or the directory was removed). +func WithDirRecheckInterval(d time.Duration) DirOption { + return func(c *dirConfig) { c.recheckInterval = d } +} + +// WithDirErrorHandler installs an error handler invoked on watcher +// errors and Add failures. +func WithDirErrorHandler(h func(error)) DirOption { + return func(c *dirConfig) { c.errorHandler = h } +} + +// WatchDir watches dir non-recursively and invokes onChange on every +// filesystem event in dir. Blocks until ctx is canceled. +// +// If NewWatcher or Add(dir) fails, WatchDir reports the error via the +// configured error handler and keeps ticking onChange on the recheck +// interval; it retries Add(dir) on each tick while the watch is +// inactive. Self Remove/Rename of dir also marks the watch inactive +// so a recreated directory regains event-driven updates on the next +// successful Add. +// +// A return value reports an unrecoverable startup error only when no +// recheck interval was configured. 
+func WatchDir(ctx context.Context, dir string, onChange func(), opts ...DirOption) error { + cfg := dirConfig{} + for _, o := range opts { + o(&cfg) + } + if onChange == nil { + onChange = func() {} + } + reportErr := func(err error) { + if cfg.errorHandler != nil { + cfg.errorHandler(err) + } + } + + w, werr := NewWatcher() + if werr != nil { + reportErr(werr) + if cfg.recheckInterval <= 0 { + return werr + } + } else { + defer w.Close() + } + + var ( + watchActive bool + eventsCh <-chan Event + errorsCh <-chan error + ) + if w != nil { + if err := w.Add(dir); err != nil { + reportErr(err) + if cfg.recheckInterval <= 0 { + return err + } + } else { + watchActive = true + eventsCh = w.Events() + errorsCh = w.Errors() + } + } + + var recheckCh <-chan time.Time + if cfg.recheckInterval > 0 { + t := time.NewTicker(cfg.recheckInterval) + defer t.Stop() + recheckCh = t.C + } + + // retryAdd attempts to (re)establish the watch when it is + // inactive (initial Add failed or the directory was removed). + retryAdd := func() { + if watchActive || w == nil { + return + } + if err := w.Add(dir); err != nil { + reportErr(err) + return + } + watchActive = true + eventsCh = w.Events() + errorsCh = w.Errors() + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-recheckCh: + retryAdd() + onChange() + case ev, ok := <-eventsCh: + if !ok { + eventsCh = nil + continue + } + if ev.Name == dir && (ev.Has(Remove) || ev.Has(Rename)) { + watchActive = false + } + onChange() + case err, ok := <-errorsCh: + if !ok { + errorsCh = nil + continue + } + if errors.Is(err, ErrEventOverflow) { + onChange() + continue + } + reportErr(err) + } + } +} diff --git a/fswatch/watch_dir_test.go b/fswatch/watch_dir_test.go new file mode 100644 index 00000000..730c165f --- /dev/null +++ b/fswatch/watch_dir_test.go @@ -0,0 +1,120 @@ +/* +Copyright The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch_test + +import ( + "context" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "k8s.io/utils/fswatch" +) + +// TestWatchDirFiresOnFileCreate verifies that creating a file in the +// watched directory fires onChange. +func TestWatchDirFiresOnFileCreate(t *testing.T) { + dir := t.TempDir() + + var changes atomic.Int32 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = fswatch.WatchDir(ctx, dir, func() { changes.Add(1) }) + }() + time.Sleep(100 * time.Millisecond) + + if err := os.WriteFile(filepath.Join(dir, "f"), []byte("x"), 0644); err != nil { + t.Fatal(err) + } + + if !waitFor(2*time.Second, func() bool { return changes.Load() >= 1 }) { + t.Fatal("onChange not invoked within 2s for file create") + } +} + +// TestWatchDirRecheckFiresOnTick verifies that WithDirRecheckInterval +// drives onChange callbacks at a regular cadence even when no +// filesystem events occur. 
+func TestWatchDirRecheckFiresOnTick(t *testing.T) { + dir := t.TempDir() + + var ticks atomic.Int32 + ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond) + defer cancel() + + _ = fswatch.WatchDir(ctx, dir, func() { + ticks.Add(1) + }, fswatch.WithDirRecheckInterval(100*time.Millisecond)) + + if got := ticks.Load(); got < 3 { + t.Errorf("recheck-on-tick fired %d times in 600ms with 100ms interval, want >= 3", got) + } +} + +// TestWatchDirRetriesAddOnRecheck verifies that an initial Add(dir) +// failure does not stop the loop: the error handler is invoked, the +// recheck ticker keeps firing onChange, and once the directory exists +// Add succeeds and event-driven updates resume. +func TestWatchDirRetriesAddOnRecheck(t *testing.T) { + parent := t.TempDir() + missing := filepath.Join(parent, "later") + + var ( + ticks atomic.Int32 + errs atomic.Int32 + ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan struct{}) + go func() { + defer close(done) + _ = fswatch.WatchDir(ctx, missing, func() { + ticks.Add(1) + }, + fswatch.WithDirRecheckInterval(100*time.Millisecond), + fswatch.WithDirErrorHandler(func(error) { errs.Add(1) }), + ) + }() + + // First few ticks should report the Add failure but keep ticking. + if !waitFor(2*time.Second, func() bool { return errs.Load() >= 1 && ticks.Load() >= 1 }) { + t.Fatalf("expected error handler + ticks before dir exists; got errs=%d ticks=%d", errs.Load(), ticks.Load()) + } + + // Create the directory; the next recheck should bring the watch + // online and a file create should produce a tick. + if err := os.Mkdir(missing, 0755); err != nil { + t.Fatal(err) + } + // Wait long enough for the next recheck to retry Add. 
+ time.Sleep(250 * time.Millisecond) + preTicks := ticks.Load() + if err := os.WriteFile(filepath.Join(missing, "f"), []byte("x"), 0644); err != nil { + t.Fatal(err) + } + if !waitFor(2*time.Second, func() bool { return ticks.Load() > preTicks }) { + t.Fatalf("expected event-driven tick after directory recreated; ticks=%d (was %d)", ticks.Load(), preTicks) + } + + cancel() + <-done +} diff --git a/fswatch/watch_file.go b/fswatch/watch_file.go new file mode 100644 index 00000000..2380ba59 --- /dev/null +++ b/fswatch/watch_file.go @@ -0,0 +1,208 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fswatch + +import ( + "context" + "errors" + "os" + "path/filepath" + "time" +) + +// FileOption configures WatchFile. +type FileOption func(*fileConfig) + +type fileConfig struct { + recheckInterval time.Duration + fallbackPolling time.Duration + initialCallback bool + errorHandler func(error) +} + +// WithRecheckInterval makes onChange fire every d, regardless of +// filesystem events. Used by reloaders that need to retry transient +// apply failures even when the watched file is otherwise stable. +func WithRecheckInterval(d time.Duration) FileOption { + return func(c *fileConfig) { c.recheckInterval = d } +} + +// WithFallbackPolling enables stat-based detection when watcher init +// or the initial Add fails. 
+func WithFallbackPolling(d time.Duration) FileOption {
+	return func(c *fileConfig) { c.fallbackPolling = d }
+}
+
+// WithInitialCallback fires onChange once at startup, after the watch
+// has been set up or WatchFile has fallen back to polling.
+func WithInitialCallback() FileOption {
+	return func(c *fileConfig) { c.initialCallback = true }
+}
+
+// WithErrorHandler installs a handler that WatchFile invokes, on its
+// own goroutine, for watcher setup failures and for errors delivered
+// by the watcher. ErrEventOverflow is not passed to the handler; it
+// triggers a re-check of the file instead.
+func WithErrorHandler(h func(error)) FileOption {
+	return func(c *fileConfig) { c.errorHandler = h }
+}
+
+// WatchFile watches path and invokes onChange whenever the file's
+// lstat info (size, mode, mtime, inode, or symlink target) changes.
+// It blocks until ctx is canceled and then returns ctx.Err(). If the
+// watch cannot be established and WithFallbackPolling was not given,
+// it returns the setup error immediately instead.
+//
+// The parent directory is watched, rather than path itself, so that
+// atomic rename-into-place updates are observed. onChange is always
+// called from the goroutine running WatchFile, so calls never overlap.
+// A nil onChange is treated as a no-op and nil options are ignored.
+func WatchFile(ctx context.Context, path string, onChange func(), opts ...FileOption) error {
+	var cfg fileConfig
+	for _, o := range opts {
+		if o != nil {
+			o(&cfg)
+		}
+	}
+	if onChange == nil {
+		onChange = func() {}
+	}
+
+	lastSnap := lstatSnapshot(path)
+
+	// notify fires onChange unconditionally. The snapshot is refreshed
+	// BEFORE the callback runs: if the file changes while onChange is
+	// still executing, the event that follows must compare against the
+	// pre-callback state, otherwise the change would be absorbed into
+	// lastSnap and never reported.
+	notify := func() {
+		lastSnap = lstatSnapshot(path)
+		onChange()
+	}
+
+	// changed fires onChange only when the file's snapshot differs from
+	// the last one recorded.
+	changed := func() {
+		s := lstatSnapshot(path)
+		if !s.equal(lastSnap) {
+			lastSnap = s
+			onChange()
+		}
+	}
+
+	var (
+		eventsCh <-chan Event
+		errorsCh <-chan error
+	)
+	w, err := NewWatcher()
+	if err == nil {
+		if err = w.Add(filepath.Dir(path)); err != nil {
+			// The watcher is useless without the watch; release it now
+			// rather than holding it for the whole polling lifetime.
+			// The Add error is the one worth reporting, so the Close
+			// error is deliberately dropped.
+			_ = w.Close()
+		} else {
+			defer w.Close()
+			eventsCh = w.Events()
+			errorsCh = w.Errors()
+		}
+	}
+	watchActive := err == nil
+	if !watchActive {
+		if cfg.errorHandler != nil {
+			cfg.errorHandler(err)
+		}
+		if cfg.fallbackPolling <= 0 {
+			return err
+		}
+	}
+
+	// Fire the initial callback only after the watch (or the decision
+	// to fall back to polling) is in place, so a change between the
+	// callback and the first event source coming online cannot be
+	// missed.
+	if cfg.initialCallback {
+		notify()
+	}
+
+	// A nil channel blocks forever in select, which disables the
+	// corresponding case when the feature is not configured.
+	var (
+		recheckCh <-chan time.Time
+		pollCh    <-chan time.Time
+	)
+	if cfg.recheckInterval > 0 {
+		t := time.NewTicker(cfg.recheckInterval)
+		defer t.Stop()
+		recheckCh = t.C
+	}
+	if !watchActive && cfg.fallbackPolling > 0 {
+		t := time.NewTicker(cfg.fallbackPolling)
+		defer t.Stop()
+		pollCh = t.C
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-recheckCh:
+			// WatchUntil-style tick: fire onChange unconditionally so
+			// callers can implement their own retry semantics.
+			notify()
+		case <-pollCh:
+			changed()
+		case _, ok := <-eventsCh:
+			if !ok {
+				// Channel closed: stop selecting on it.
+				eventsCh = nil
+				continue
+			}
+			// Any event in the parent directory triggers a re-check;
+			// the snapshot comparison filters out unrelated files.
+			changed()
+		case werr, ok := <-errorsCh:
+			if !ok {
+				errorsCh = nil
+				continue
+			}
+			if errors.Is(werr, ErrEventOverflow) {
+				// Events were dropped, so the file may have changed
+				// without an event reaching us: re-check it.
+				changed()
+				continue
+			}
+			if cfg.errorHandler != nil {
+				cfg.errorHandler(werr)
+			}
+		}
+	}
+}
+
+// fileSnapshot captures the parts of os.FileInfo we use for change
+// detection plus the resolved symlink target's info if any.
+type fileSnapshot struct {
+	// link is the Lstat result for the path itself; nil if Lstat failed.
+	link os.FileInfo
+	// target is the Stat result obtained by following the path; it is
+	// only populated when link is a symlink that resolves.
+	target os.FileInfo
+}
+
+// lstatSnapshot records the current state of path. Any Lstat failure
+// (including the file not existing) yields the zero snapshot.
+func lstatSnapshot(path string) fileSnapshot {
+	var snap fileSnapshot
+	info, err := os.Lstat(path)
+	if err != nil {
+		return snap
+	}
+	snap.link = info
+	if info.Mode()&os.ModeSymlink != 0 {
+		// A dangling link makes Stat fail; target then stays nil.
+		if resolved, statErr := os.Stat(path); statErr == nil {
+			snap.target = resolved
+		}
+	}
+	return snap
+}
+
+// equal reports whether both the link info and the target info of the
+// two snapshots match.
+func (s fileSnapshot) equal(other fileSnapshot) bool {
+	if !sameInfo(s.link, other.link) {
+		return false
+	}
+	return sameInfo(s.target, other.target)
+}
+
+// sameInfo reports whether a and b describe the same file in the same
+// state: equal size, mode and modification time, and the same
+// underlying file per os.SameFile. Two nil infos are considered equal;
+// a nil and a non-nil info are not.
+func sameInfo(a, b os.FileInfo) bool {
+	switch {
+	case a == nil && b == nil:
+		return true
+	case a == nil || b == nil:
+		return false
+	}
+	return a.Size() == b.Size() &&
+		a.Mode() == b.Mode() &&
+		a.ModTime().Equal(b.ModTime()) &&
+		os.SameFile(a, b)
+}
diff --git a/fswatch/watch_file_test.go b/fswatch/watch_file_test.go
new file mode 100644
index 00000000..2c581fcd
--- /dev/null
+++ b/fswatch/watch_file_test.go
@@ -0,0 +1,159 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package fswatch_test
+
+import (
+	"context"
+	"errors"
+	"os"
+	"path/filepath"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"k8s.io/utils/fswatch"
+)
+
+// TestWatchFileFiresOnInPlaceWrite verifies that an in-place write
+// triggers onChange.
+func TestWatchFileFiresOnInPlaceWrite(t *testing.T) {
+	dir := t.TempDir()
+	target := filepath.Join(dir, "config")
+	if err := os.WriteFile(target, []byte("v1"), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	var changes atomic.Int32
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	done := make(chan error, 1)
+	go func() {
+		done <- fswatch.WatchFile(ctx, target, func() {
+			changes.Add(1)
+		})
+	}()
+	// Give WatchFile time to register its watch before touching the file.
+	time.Sleep(100 * time.Millisecond)
+
+	if err := os.WriteFile(target, []byte("v22"), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	if !waitFor(2*time.Second, func() bool { return changes.Load() >= 1 }) {
+		// Stop the watcher and surface what it returned, so a setup
+		// failure is not mistaken for a missed event.
+		cancel()
+		t.Fatalf("onChange not invoked within 2s (WatchFile returned: %v)", <-done)
+	}
+
+	cancel()
+	if err := <-done; !errors.Is(err, context.Canceled) {
+		t.Errorf("got %v, want context.Canceled", err)
+	}
+}
+
+// TestWatchFileAtomicRename verifies that rename-into-place fires
+// onChange.
+func TestWatchFileAtomicRename(t *testing.T) {
+	dir := t.TempDir()
+	target := filepath.Join(dir, "config")
+	if err := os.WriteFile(target, []byte("v1"), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	var changes atomic.Int32
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Collect WatchFile's result so the test can wait for the goroutine
+	// to exit instead of leaking it past the end of the test.
+	done := make(chan error, 1)
+	go func() {
+		done <- fswatch.WatchFile(ctx, target, func() { changes.Add(1) })
+	}()
+	// Give WatchFile time to register its watch before touching the file.
+	time.Sleep(100 * time.Millisecond)
+
+	tmp := target + ".tmp"
+	if err := os.WriteFile(tmp, []byte("v2-different-size"), 0644); err != nil {
+		t.Fatal(err)
+	}
+	if err := os.Rename(tmp, target); err != nil {
+		t.Fatal(err)
+	}
+
+	if !waitFor(2*time.Second, func() bool { return changes.Load() >= 1 }) {
+		cancel()
+		t.Fatalf("onChange not invoked within 2s for atomic rename (WatchFile returned: %v)", <-done)
+	}
+
+	cancel()
+	if err := <-done; !errors.Is(err, context.Canceled) {
+		t.Errorf("got %v, want context.Canceled", err)
+	}
+}
+
+// TestWatchFileInitialCallback verifies that WithInitialCallback fires
+// onChange once at startup.
+func TestWatchFileInitialCallback(t *testing.T) {
+	dir := t.TempDir()
+	target := filepath.Join(dir, "config")
+	if err := os.WriteFile(target, []byte("v1"), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	var changes atomic.Int32
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+	err := fswatch.WatchFile(ctx, target, func() {
+		changes.Add(1)
+	}, fswatch.WithInitialCallback())
+	// WatchFile must have run until the deadline; any other result
+	// means setup failed and the count below would be meaningless.
+	if !errors.Is(err, context.DeadlineExceeded) {
+		t.Fatalf("WatchFile returned %v, want context.DeadlineExceeded", err)
+	}
+
+	if got := changes.Load(); got != 1 {
+		t.Errorf("onChange invoked %d times, want 1 (initial only)", got)
+	}
+}
+
+// TestWatchFileRecheckFiresUnconditionally verifies that
+// WithRecheckInterval fires onChange on every tick regardless of
+// whether the file changed. Callers rely on this for retry-on-failure
+// semantics that match the original WatchUntil eventHandler-on-tick
+// behavior.
+func TestWatchFileRecheckFiresUnconditionally(t *testing.T) {
+	dir := t.TempDir()
+	target := filepath.Join(dir, "config")
+	if err := os.WriteFile(target, []byte("hello"), 0644); err != nil {
+		t.Fatal(err)
+	}
+
+	var changes atomic.Int32
+	ctx, cancel := context.WithTimeout(context.Background(), 700*time.Millisecond)
+	defer cancel()
+
+	err := fswatch.WatchFile(ctx, target, func() {
+		changes.Add(1)
+	}, fswatch.WithRecheckInterval(100*time.Millisecond))
+	// An early return means the watch was never established, so the
+	// tick count would say nothing about recheck behavior.
+	if !errors.Is(err, context.DeadlineExceeded) {
+		t.Fatalf("WatchFile returned %v, want context.DeadlineExceeded", err)
+	}
+
+	if got := changes.Load(); got < 3 {
+		t.Errorf("recheck-on-tick fired %d times in 700ms with 100ms interval, want >= 3", got)
+	}
+}
+
+// TestWatchFileErrorHandlerCalledOnInitFailure verifies that the
+// error handler is invoked when the parent directory does not exist.
+func TestWatchFileErrorHandlerCalledOnInitFailure(t *testing.T) {
+	// The "missing" directory is never created, so the parent of target
+	// is guaranteed not to exist regardless of platform or host state.
+	target := filepath.Join(t.TempDir(), "missing", "file")
+
+	var hits atomic.Int32
+	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+	defer cancel()
+
+	err := fswatch.WatchFile(ctx, target, func() {},
+		fswatch.WithErrorHandler(func(error) { hits.Add(1) }),
+		fswatch.WithFallbackPolling(50*time.Millisecond),
+	)
+	// With fallback polling enabled, the setup failure must not end the
+	// watch: WatchFile should keep running until the deadline.
+	if !errors.Is(err, context.DeadlineExceeded) {
+		t.Errorf("WatchFile returned %v, want context.DeadlineExceeded", err)
+	}
+	if hits.Load() == 0 {
+		t.Errorf("error handler not invoked on init failure")
+	}
+}
diff --git a/go.mod b/go.mod
index 29ee77db..cfdb5732 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,11 @@ go 1.23
 
 require (
 	github.com/davecgh/go-spew v1.1.1
+	github.com/fsnotify/fsnotify v1.9.0
 	k8s.io/klog/v2 v2.80.1
 )
 
-require github.com/go-logr/logr v1.2.0 // indirect
+require (
+	github.com/go-logr/logr v1.2.0 // indirect
+	golang.org/x/sys v0.13.0 // indirect
+)
diff --git a/go.sum b/go.sum
index 6b8e4dbd..c9899894 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,10 @@
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
+github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
 github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE=
 github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
 k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=