From be352eec66b35df43e828966c9f928c67cc13de7 Mon Sep 17 00:00:00 2001 From: Amir Raminfar Date: Tue, 26 May 2026 07:03:05 -0700 Subject: [PATCH] fix: back off cloud notification dispatcher on invalid API key (#4747) Co-authored-by: Claude Opus 4.7 (1M context) --- internal/notification/dispatcher/cloud.go | 23 ++++++ .../notification/dispatcher/cloud_test.go | 72 +++++++++++++++++++ internal/notification/manager.go | 10 +++ internal/support/docker/multi_host_service.go | 6 ++ internal/support/k8s/k8s_cluster_service.go | 4 ++ internal/web/cloud.go | 4 ++ internal/web/routes.go | 1 + 7 files changed, 120 insertions(+) create mode 100644 internal/notification/dispatcher/cloud_test.go diff --git a/internal/notification/dispatcher/cloud.go b/internal/notification/dispatcher/cloud.go index 80df29e0..9df96720 100644 --- a/internal/notification/dispatcher/cloud.go +++ b/internal/notification/dispatcher/cloud.go @@ -53,6 +53,17 @@ func NewCloudDispatcher(name string, apiKey string, prefix string, expiresAt *ti const defaultRetryAfter = 60 * time.Second +// unauthorizedRetryAfter is how long to back off after an auth failure (invalid/expired +// API key). Retrying won't help until the user fixes their key, which recreates the +// dispatcher and resets the breaker. +const unauthorizedRetryAfter = 6 * time.Hour + +// ResetBreaker clears the circuit breaker so the next Send dials cloud again. +// Called when a cloud status check succeeds, proving the API key is valid. +func (c *CloudDispatcher) ResetBreaker() { + c.blockedUntil.Store(0) +} + // Send sends a notification to Dozzle Cloud func (c *CloudDispatcher) Send(ctx context.Context, notification types.Notification) error { if blockedUntil := c.blockedUntil.Load(); blockedUntil > 0 && time.Now().UnixNano() < blockedUntil { @@ -99,6 +110,18 @@ func (c *CloudDispatcher) Send(ctx context.Context, notification types.Notificat return fmt.Errorf("cloud rate limited, backing off for %s", retryAfter) } + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + limitedReader := io.LimitReader(resp.Body, 1024*1024) + responseBody, _ := io.ReadAll(limitedReader) + c.blockedUntil.Store(time.Now().Add(unauthorizedRetryAfter).UnixNano()) + log.Warn(). + Str("cloud", c.Name). + Int("status_code", resp.StatusCode). + Dur("retry_after", unauthorizedRetryAfter). + Msg("cloud rejected API key, circuit breaker tripped") + return fmt.Errorf("cloud returned status code %d: %s", resp.StatusCode, string(responseBody)) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { limitedReader := io.LimitReader(resp.Body, 1024*1024) responseBody, _ := io.ReadAll(limitedReader) diff --git a/internal/notification/dispatcher/cloud_test.go b/internal/notification/dispatcher/cloud_test.go new file mode 100644 index 00000000..90059134 --- /dev/null +++ b/internal/notification/dispatcher/cloud_test.go @@ -0,0 +1,72 @@ +package dispatcher + +import ( + "context" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestCloudDispatcher(url string) *CloudDispatcher { + return &CloudDispatcher{ + Name: "Dozzle Cloud", + URL: url, + APIKey: "test-key", + client: &http.Client{Timeout: 5 * time.Second}, + } +} + +// On a 401/403 the breaker trips and subsequent sends short-circuit without +// hitting cloud until the breaker is reset. +func TestCloudDispatcher_AuthFailureTripsBreaker(t *testing.T) { + for _, status := range []int{http.StatusUnauthorized, http.StatusForbidden} { + var hits atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + hits.Add(1) + rw.WriteHeader(status) + rw.Write([]byte("Invalid API key\n")) + })) + + d := newTestCloudDispatcher(srv.URL) + + err := d.Send(context.Background(), newTestNotification("first")) + require.Error(t, err) + assert.EqualValues(t, 1, hits.Load(), "first send should reach cloud") + + err = d.Send(context.Background(), newTestNotification("second")) + require.Error(t, err) + assert.Contains(t, err.Error(), "rate limited") + assert.EqualValues(t, 1, hits.Load(), "breaker should block second send (status %d)", status) + + srv.Close() + } +} + +// ResetBreaker clears the circuit so the next send dials cloud again. +func TestCloudDispatcher_ResetBreaker(t *testing.T) { + var hits atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + hits.Add(1) + rw.WriteHeader(http.StatusUnauthorized) + })) + defer srv.Close() + + d := newTestCloudDispatcher(srv.URL) + + require.Error(t, d.Send(context.Background(), newTestNotification("first"))) + require.EqualValues(t, 1, hits.Load()) + + // Blocked while breaker is open. + require.Error(t, d.Send(context.Background(), newTestNotification("blocked"))) + require.EqualValues(t, 1, hits.Load()) + + d.ResetBreaker() + + require.Error(t, d.Send(context.Background(), newTestNotification("after-reset"))) + assert.EqualValues(t, 2, hits.Load(), "send after reset should reach cloud again") +} diff --git a/internal/notification/manager.go b/internal/notification/manager.go index 78d516cd..1577d114 100644 --- a/internal/notification/manager.go +++ b/internal/notification/manager.go @@ -335,6 +335,16 @@ func (m *Manager) ClearCloudDispatcher() { log.Debug().Msg("Cleared cloud dispatcher") } +// ResetCloudDispatcherBreaker clears the cloud dispatcher's circuit breaker, if set. +// No-op when no cloud dispatcher is registered. +func (m *Manager) ResetCloudDispatcherBreaker() { + if p := m.cloudDispatcher.Load(); p != nil { + if cd, ok := (*p).(*dispatcher.CloudDispatcher); ok { + cd.ResetBreaker() + } + } +} + // getDispatcher resolves a dispatcher by subscription's DispatcherID. // DispatcherID == 0 means the cloud dispatcher; otherwise lookup in the dispatchers map. diff --git a/internal/support/docker/multi_host_service.go b/internal/support/docker/multi_host_service.go index 04c6c4c1..6bfd3362 100644 --- a/internal/support/docker/multi_host_service.go +++ b/internal/support/docker/multi_host_service.go @@ -263,6 +263,12 @@ func (m *MultiHostService) SetCloudStreamLogs(enabled bool) { m.persister.SetCloudStreamLogs(enabled) } +// ResetCloudDispatcherBreaker clears the cloud dispatcher's auth circuit breaker +// so notifications resume immediately once the key is known good again. +func (m *MultiHostService) ResetCloudDispatcherBreaker() { + m.notificationManager.ResetCloudDispatcherBreaker() +} + // RemoveCloudConfig clears the cloud config, removes the cloud dispatcher, deletes the file, // and broadcasts the change to all agents so they stop sending to cloud. func (m *MultiHostService) RemoveCloudConfig() { diff --git a/internal/support/k8s/k8s_cluster_service.go b/internal/support/k8s/k8s_cluster_service.go index fffcec18..b75fc85c 100644 --- a/internal/support/k8s/k8s_cluster_service.go +++ b/internal/support/k8s/k8s_cluster_service.go @@ -238,3 +238,7 @@ func (m *K8sClusterService) SetCloudStreamLogs(enabled bool) { func (m *K8sClusterService) RemoveCloudConfig() { m.persister.RemoveCloudConfig() } + +func (m *K8sClusterService) ResetCloudDispatcherBreaker() { + m.notificationManager.ResetCloudDispatcherBreaker() +} diff --git a/internal/web/cloud.go b/internal/web/cloud.go index 750c36ea..5f8a2a18 100644 --- a/internal/web/cloud.go +++ b/internal/web/cloud.go @@ -161,6 +161,10 @@ func (h *handler) cloudStatus(w http.ResponseWriter, r *http.Request) { return } + // A 200 proves the API key is valid, so clear any auth circuit breaker the + // notification dispatcher tripped on a prior 401/403. + h.hostService.ResetCloudDispatcherBreaker() + body, err := io.ReadAll(resp.Body) if err != nil { log.Error().Err(err).Msg("Failed to read cloud status response") diff --git a/internal/web/routes.go b/internal/web/routes.go index 6a731b6a..a4da7814 100644 --- a/internal/web/routes.go +++ b/internal/web/routes.go @@ -111,6 +111,7 @@ type HostService interface { SetCloudConfig(cc *notification.CloudConfig) SetCloudStreamLogs(enabled bool) RemoveCloudConfig() + ResetCloudDispatcherBreaker() } type handler struct {