From 66639e83f7be4f1354817e45321f50bdb8e3227d Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Tue, 21 Apr 2026 23:21:48 -0700 Subject: [PATCH] proxy: replace fsnotify with stat-poll watcher and add SIGHUP reload (#685) The fsnotify-based config watcher does not work reliably when the config file is bind-mounted into a Docker container as an individual file, and mishandles k8s ConfigMap projections (atomically swapped symlinks). Replace it with a small os.Stat-polling watcher and add SIGHUP as an explicit reload signal. - new proxy/configwatcher package: 2s os.Stat poller, follows symlinks, fires on mtime/size change and on missing -> present transitions - SIGHUP triggers reload unconditionally (works without --watch-config) via the same ConfigFileChangedEvent pipeline so the UI sees identical state transitions - watcher goroutine now exits cleanly on shutdown via a context - drop github.com/fsnotify/fsnotify dependency fixes #682 --- go.mod | 1 - go.sum | 2 - llama-swap.go | 86 +++++++------ proxy/configwatcher/watcher.go | 85 +++++++++++++ proxy/configwatcher/watcher_test.go | 191 ++++++++++++++++++++++++++++ 5 files changed, 322 insertions(+), 43 deletions(-) create mode 100644 proxy/configwatcher/watcher.go create mode 100644 proxy/configwatcher/watcher_test.go diff --git a/go.mod b/go.mod index 8b679d82..d417f215 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.26.1 require ( github.com/billziss-gh/golib v0.2.0 - github.com/fsnotify/fsnotify v1.9.0 github.com/gin-gonic/gin v1.10.0 github.com/klauspost/compress v1.18.5 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 348a72cf..ca65bc88 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,6 @@ github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= -github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= diff --git a/llama-swap.go b/llama-swap.go index 9706e07d..431c3f4b 100644 --- a/llama-swap.go +++ b/llama-swap.go @@ -9,14 +9,15 @@ import ( "os" "os/signal" "path/filepath" + "runtime" "syscall" "time" - "github.com/fsnotify/fsnotify" "github.com/gin-gonic/gin" "github.com/mostlygeek/llama-swap/event" "github.com/mostlygeek/llama-swap/proxy" "github.com/mostlygeek/llama-swap/proxy/config" + "github.com/mostlygeek/llama-swap/proxy/configwatcher" ) var ( @@ -79,6 +80,17 @@ func main() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + // Reload signals (SIGHUP on POSIX, none on Windows — Windows does not + // deliver SIGHUP). Always wired up so `kill -HUP` works regardless of + // --watch-config. + reloadChan := make(chan os.Signal, 1) + if runtime.GOOS != "windows" { + signal.Notify(reloadChan, syscall.SIGHUP) + } + + // Context that bounds the lifetime of background watcher goroutines. + watcherCtx, watcherCancel := context.WithCancel(context.Background()) + // Create server with initial handler srv := &http.Server{ Addr: *listenStr, @@ -121,52 +133,45 @@ func main() { // load the initial proxy manager reloadProxyManager() debouncedReload := debounce(time.Second, reloadProxyManager) - if *watchConfig { - defer event.On(func(e proxy.ConfigFileChangedEvent) { - if e.ReloadingState == proxy.ReloadingStateStart { - debouncedReload() - } - })() - fmt.Println("Watching Configuration for changes") + // Listen for ConfigFileChangedEvent unconditionally so SIGHUP and the + // poll-based watcher both feed the same debounced reload pipeline. The + // UI also listens for the matching ReloadingStateEnd emitted from + // reloadProxyManager. + defer event.On(func(e proxy.ConfigFileChangedEvent) { + if e.ReloadingState == proxy.ReloadingStateStart { + debouncedReload() + } + })() + + // SIGHUP (or platform-equivalent) → reload. Back-to-back signals collapse + // to one reload via the debounce window, which is the desired behavior. + go func() { + for range reloadChan { + fmt.Println("Received reload signal, reloading configuration") + event.Emit(proxy.ConfigFileChangedEvent{ + ReloadingState: proxy.ReloadingStateStart, + }) + } + }() + + if *watchConfig { go func() { absConfigPath, err := filepath.Abs(*configPath) if err != nil { fmt.Printf("Error getting absolute path for watching config file: %v\n", err) return } - watcher, err := fsnotify.NewWatcher() - if err != nil { - fmt.Printf("Error creating file watcher: %v. File watching disabled.\n", err) - return - } - - configDir := filepath.Dir(absConfigPath) - err = watcher.Add(configDir) - if err != nil { - fmt.Printf("Error adding config path directory (%s) to watcher: %v. File watching disabled.", configDir, err) - return - } - - defer watcher.Close() - for { - select { - case changeEvent := <-watcher.Events: - if changeEvent.Name == absConfigPath && (changeEvent.Has(fsnotify.Write) || changeEvent.Has(fsnotify.Create) || changeEvent.Has(fsnotify.Remove)) { - event.Emit(proxy.ConfigFileChangedEvent{ - ReloadingState: proxy.ReloadingStateStart, - }) - } else if changeEvent.Name == filepath.Join(configDir, "..data") && changeEvent.Has(fsnotify.Create) { - // the change for k8s configmap - event.Emit(proxy.ConfigFileChangedEvent{ - ReloadingState: proxy.ReloadingStateStart, - }) - } - - case err := <-watcher.Errors: - log.Printf("File watcher error: %v", err) - } - } + fmt.Println("Watching configuration for changes (poll-based, 2s interval)") + (&configwatcher.Watcher{ + Path: absConfigPath, + Interval: configwatcher.DefaultInterval, + OnChange: func() { + event.Emit(proxy.ConfigFileChangedEvent{ + ReloadingState: proxy.ReloadingStateStart, + }) + }, + }).Run(watcherCtx) }() } @@ -174,6 +179,7 @@ func main() { go func() { sig := <-sigChan fmt.Printf("Received signal %v, shutting down...\n", sig) + watcherCancel() ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() diff --git a/proxy/configwatcher/watcher.go b/proxy/configwatcher/watcher.go new file mode 100644 index 00000000..c2b7ccb3 --- /dev/null +++ b/proxy/configwatcher/watcher.go @@ -0,0 +1,85 @@ +// Package configwatcher provides a simple cross-platform file watcher based +// on os.Stat polling. It works correctly inside Docker containers where the +// config file is bind-mounted as an individual file, and for k8s ConfigMap +// projections (which present the file as a symlink to an atomically swapped +// target) — both cases where inotify-based watchers are unreliable. +package configwatcher + +import ( + "context" + "errors" + "io/fs" + "log" + "os" + "time" +) + +const DefaultInterval = 2 * time.Second + +type Watcher struct { + Path string + Interval time.Duration + OnChange func() +} + +type snapshot struct { + exists bool + modTime time.Time + size int64 +} + +// Run blocks until ctx is canceled. It polls Path on Interval and invokes +// OnChange whenever the file's modification time or size changes, or when +// the file reappears after being missing. The baseline poll establishes +// initial state and does not fire OnChange. +func (w *Watcher) Run(ctx context.Context) { + interval := w.Interval + if interval <= 0 { + interval = DefaultInterval + } + + prev := stat(w.Path) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + cur := stat(w.Path) + if changed(prev, cur) && w.OnChange != nil { + w.OnChange() + } + prev = cur + } + } +} + +func stat(path string) snapshot { + fi, err := os.Stat(path) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + log.Printf("configwatcher: stat %s: %v", path, err) + } + return snapshot{} + } + return snapshot{ + exists: true, + modTime: fi.ModTime(), + size: fi.Size(), + } +} + +func changed(prev, cur snapshot) bool { + // Present → missing: stay quiet (likely a transient rename-style write). + // Missing → present: fire so we reload as soon as the file comes back. + if !cur.exists { + return false + } + if !prev.exists { + return true + } + return !prev.modTime.Equal(cur.modTime) || prev.size != cur.size +} diff --git a/proxy/configwatcher/watcher_test.go b/proxy/configwatcher/watcher_test.go new file mode 100644 index 00000000..6426dd58 --- /dev/null +++ b/proxy/configwatcher/watcher_test.go @@ -0,0 +1,191 @@ +package configwatcher + +import ( + "context" + "os" + "path/filepath" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +const testInterval = 25 * time.Millisecond + +// startWatcher launches w.Run in a goroutine and returns a function that +// cancels the context and waits for Run to return. +func startWatcher(t *testing.T, w *Watcher) func() { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + w.Run(ctx) + close(done) + }() + return func() { + cancel() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("watcher did not stop within 2s of cancel") + } + } +} + +// waitForCount blocks until counter reaches want or timeout elapses. +func waitForCount(t *testing.T, counter *int64, want int64, timeout time.Duration) bool { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if atomic.LoadInt64(counter) >= want { + return true + } + time.Sleep(5 * time.Millisecond) + } + return false +} + +func TestWatcher_NoFireOnBaseline(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + require.NoError(t, os.WriteFile(path, []byte("a"), 0o644)) + + var n int64 + stop := startWatcher(t, &Watcher{ + Path: path, + Interval: testInterval, + OnChange: func() { atomic.AddInt64(&n, 1) }, + }) + defer stop() + + time.Sleep(testInterval * 5) + require.Equal(t, int64(0), atomic.LoadInt64(&n), "baseline poll must not fire") +} + +func TestWatcher_DetectsModTimeChange(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + require.NoError(t, os.WriteFile(path, []byte("a"), 0o644)) + + // Force a known baseline mtime. + base := time.Now().Add(-1 * time.Hour).Truncate(time.Second) + require.NoError(t, os.Chtimes(path, base, base)) + + var n int64 + stop := startWatcher(t, &Watcher{ + Path: path, + Interval: testInterval, + OnChange: func() { atomic.AddInt64(&n, 1) }, + }) + defer stop() + + // Let the baseline settle. + time.Sleep(testInterval * 2) + + // Bump mtime well above the baseline so low-resolution filesystems still notice. + require.NoError(t, os.Chtimes(path, base.Add(10*time.Second), base.Add(10*time.Second))) + + require.True(t, waitForCount(t, &n, 1, time.Second), "callback should fire after mtime change") +} + +func TestWatcher_DetectsSizeChangeWithSameModTime(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + require.NoError(t, os.WriteFile(path, []byte("a"), 0o644)) + + fi, err := os.Stat(path) + require.NoError(t, err) + originalMtime := fi.ModTime() + + var n int64 + stop := startWatcher(t, &Watcher{ + Path: path, + Interval: testInterval, + OnChange: func() { atomic.AddInt64(&n, 1) }, + }) + defer stop() + time.Sleep(testInterval * 2) + + require.NoError(t, os.WriteFile(path, []byte("aaaaa"), 0o644)) + // Reset mtime back to the original so size is the only signal. + require.NoError(t, os.Chtimes(path, originalMtime, originalMtime)) + + require.True(t, waitForCount(t, &n, 1, time.Second), "callback should fire on size change") +} + +func TestWatcher_SymlinkTargetSwap(t *testing.T) { + dir := t.TempDir() + targetA := filepath.Join(dir, "targetA") + targetB := filepath.Join(dir, "targetB") + link := filepath.Join(dir, "config.yaml") + + require.NoError(t, os.WriteFile(targetA, []byte("AAAA"), 0o644)) + require.NoError(t, os.WriteFile(targetB, []byte("BBBBBBBB"), 0o644)) + + if err := os.Symlink(targetA, link); err != nil { + if runtime.GOOS == "windows" { + t.Skipf("symlink creation requires privilege on Windows: %v", err) + } + t.Fatalf("os.Symlink: %v", err) + } + + var n int64 + stop := startWatcher(t, &Watcher{ + Path: link, + Interval: testInterval, + OnChange: func() { atomic.AddInt64(&n, 1) }, + }) + defer stop() + time.Sleep(testInterval * 2) + + // Atomic symlink swap (k8s ConfigMap pattern): create new symlink at a + // temp name, then rename over the existing one. + tmpLink := filepath.Join(dir, "config.yaml.tmp") + require.NoError(t, os.Symlink(targetB, tmpLink)) + require.NoError(t, os.Rename(tmpLink, link)) + + require.True(t, waitForCount(t, &n, 1, time.Second), "callback should fire after symlink target swap") +} + +func TestWatcher_FileMissingThenReturns(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + require.NoError(t, os.WriteFile(path, []byte("a"), 0o644)) + + var n int64 + stop := startWatcher(t, &Watcher{ + Path: path, + Interval: testInterval, + OnChange: func() { atomic.AddInt64(&n, 1) }, + }) + defer stop() + time.Sleep(testInterval * 2) + + require.NoError(t, os.Remove(path)) + time.Sleep(testInterval * 3) + require.Equal(t, int64(0), atomic.LoadInt64(&n), "removal alone must not fire") + + require.NoError(t, os.WriteFile(path, []byte("b"), 0o644)) + require.True(t, waitForCount(t, &n, 1, time.Second), "callback should fire when file returns") +} + +func TestWatcher_ContextCancelStopsRun(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + require.NoError(t, os.WriteFile(path, []byte("a"), 0o644)) + + w := &Watcher{Path: path, Interval: testInterval} + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { w.Run(ctx); close(done) }() + + time.Sleep(testInterval * 2) + cancel() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Run did not return within 2s of cancel") + } +}