mirror of
https://github.com/amir20/dozzle.git
synced 2026-06-23 04:10:12 +00:00
fix: back off cloud notification dispatcher on invalid API key (#4747)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -53,6 +53,17 @@ func NewCloudDispatcher(name string, apiKey string, prefix string, expiresAt *ti
|
||||
|
||||
const defaultRetryAfter = 60 * time.Second
|
||||
|
||||
// unauthorizedRetryAfter is how long to back off after an auth failure (invalid/expired
|
||||
// API key). Retrying won't help until the user fixes their key, which recreates the
|
||||
// dispatcher and resets the breaker.
|
||||
const unauthorizedRetryAfter = 6 * time.Hour
|
||||
|
||||
// ResetBreaker clears the circuit breaker so the next Send dials cloud again.
|
||||
// Called when a cloud status check succeeds, proving the API key is valid.
|
||||
func (c *CloudDispatcher) ResetBreaker() {
|
||||
c.blockedUntil.Store(0)
|
||||
}
|
||||
|
||||
// Send sends a notification to Dozzle Cloud
|
||||
func (c *CloudDispatcher) Send(ctx context.Context, notification types.Notification) error {
|
||||
if blockedUntil := c.blockedUntil.Load(); blockedUntil > 0 && time.Now().UnixNano() < blockedUntil {
|
||||
@@ -99,6 +110,18 @@ func (c *CloudDispatcher) Send(ctx context.Context, notification types.Notificat
|
||||
return fmt.Errorf("cloud rate limited, backing off for %s", retryAfter)
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
|
||||
limitedReader := io.LimitReader(resp.Body, 1024*1024)
|
||||
responseBody, _ := io.ReadAll(limitedReader)
|
||||
c.blockedUntil.Store(time.Now().Add(unauthorizedRetryAfter).UnixNano())
|
||||
log.Warn().
|
||||
Str("cloud", c.Name).
|
||||
Int("status_code", resp.StatusCode).
|
||||
Dur("retry_after", unauthorizedRetryAfter).
|
||||
Msg("cloud rejected API key, circuit breaker tripped")
|
||||
return fmt.Errorf("cloud returned status code %d: %s", resp.StatusCode, string(responseBody))
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
limitedReader := io.LimitReader(resp.Body, 1024*1024)
|
||||
responseBody, _ := io.ReadAll(limitedReader)
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
package dispatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func newTestCloudDispatcher(url string) *CloudDispatcher {
|
||||
return &CloudDispatcher{
|
||||
Name: "Dozzle Cloud",
|
||||
URL: url,
|
||||
APIKey: "test-key",
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
// On a 401/403 the breaker trips and subsequent sends short-circuit without
|
||||
// hitting cloud until the breaker is reset.
|
||||
func TestCloudDispatcher_AuthFailureTripsBreaker(t *testing.T) {
|
||||
for _, status := range []int{http.StatusUnauthorized, http.StatusForbidden} {
|
||||
var hits atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
hits.Add(1)
|
||||
rw.WriteHeader(status)
|
||||
rw.Write([]byte("Invalid API key\n"))
|
||||
}))
|
||||
|
||||
d := newTestCloudDispatcher(srv.URL)
|
||||
|
||||
err := d.Send(context.Background(), newTestNotification("first"))
|
||||
require.Error(t, err)
|
||||
assert.EqualValues(t, 1, hits.Load(), "first send should reach cloud")
|
||||
|
||||
err = d.Send(context.Background(), newTestNotification("second"))
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "rate limited")
|
||||
assert.EqualValues(t, 1, hits.Load(), "breaker should block second send (status %d)", status)
|
||||
|
||||
srv.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// ResetBreaker clears the circuit so the next send dials cloud again.
|
||||
func TestCloudDispatcher_ResetBreaker(t *testing.T) {
|
||||
var hits atomic.Int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
hits.Add(1)
|
||||
rw.WriteHeader(http.StatusUnauthorized)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
d := newTestCloudDispatcher(srv.URL)
|
||||
|
||||
require.Error(t, d.Send(context.Background(), newTestNotification("first")))
|
||||
require.EqualValues(t, 1, hits.Load())
|
||||
|
||||
// Blocked while breaker is open.
|
||||
require.Error(t, d.Send(context.Background(), newTestNotification("blocked")))
|
||||
require.EqualValues(t, 1, hits.Load())
|
||||
|
||||
d.ResetBreaker()
|
||||
|
||||
require.Error(t, d.Send(context.Background(), newTestNotification("after-reset")))
|
||||
assert.EqualValues(t, 2, hits.Load(), "send after reset should reach cloud again")
|
||||
}
|
||||
@@ -335,6 +335,16 @@ func (m *Manager) ClearCloudDispatcher() {
|
||||
log.Debug().Msg("Cleared cloud dispatcher")
|
||||
}
|
||||
|
||||
// ResetCloudDispatcherBreaker clears the cloud dispatcher's circuit breaker, if set.
|
||||
// No-op when no cloud dispatcher is registered.
|
||||
func (m *Manager) ResetCloudDispatcherBreaker() {
|
||||
if p := m.cloudDispatcher.Load(); p != nil {
|
||||
if cd, ok := (*p).(*dispatcher.CloudDispatcher); ok {
|
||||
cd.ResetBreaker()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// getDispatcher resolves a dispatcher by subscription's DispatcherID.
|
||||
// DispatcherID == 0 means the cloud dispatcher; otherwise lookup in the dispatchers map.
|
||||
|
||||
@@ -263,6 +263,12 @@ func (m *MultiHostService) SetCloudStreamLogs(enabled bool) {
|
||||
m.persister.SetCloudStreamLogs(enabled)
|
||||
}
|
||||
|
||||
// ResetCloudDispatcherBreaker clears the cloud dispatcher's auth circuit breaker
|
||||
// so notifications resume immediately once the key is known good again.
|
||||
func (m *MultiHostService) ResetCloudDispatcherBreaker() {
|
||||
m.notificationManager.ResetCloudDispatcherBreaker()
|
||||
}
|
||||
|
||||
// RemoveCloudConfig clears the cloud config, removes the cloud dispatcher, deletes the file,
|
||||
// and broadcasts the change to all agents so they stop sending to cloud.
|
||||
func (m *MultiHostService) RemoveCloudConfig() {
|
||||
|
||||
@@ -238,3 +238,7 @@ func (m *K8sClusterService) SetCloudStreamLogs(enabled bool) {
|
||||
func (m *K8sClusterService) RemoveCloudConfig() {
|
||||
m.persister.RemoveCloudConfig()
|
||||
}
|
||||
|
||||
func (m *K8sClusterService) ResetCloudDispatcherBreaker() {
|
||||
m.notificationManager.ResetCloudDispatcherBreaker()
|
||||
}
|
||||
|
||||
@@ -161,6 +161,10 @@ func (h *handler) cloudStatus(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// A 200 proves the API key is valid, so clear any auth circuit breaker the
|
||||
// notification dispatcher tripped on a prior 401/403.
|
||||
h.hostService.ResetCloudDispatcherBreaker()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to read cloud status response")
|
||||
|
||||
@@ -111,6 +111,7 @@ type HostService interface {
|
||||
SetCloudConfig(cc *notification.CloudConfig)
|
||||
SetCloudStreamLogs(enabled bool)
|
||||
RemoveCloudConfig()
|
||||
ResetCloudDispatcherBreaker()
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
|
||||
Reference in New Issue
Block a user