Files
Amir Raminfar be352eec66 fix: back off cloud notification dispatcher on invalid API key (#4747)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 14:03:05 +00:00

438 lines
13 KiB
Go

package notification
import (
"context"
"fmt"
"slices"
"sync/atomic"
"time"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/notification/dispatcher"
"github.com/amir20/dozzle/internal/utils"
"github.com/amir20/dozzle/types"
"github.com/expr-lang/expr"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog/log"
"golang.org/x/sync/semaphore"
)
// Manager holds the notification subscriptions, the dispatchers they deliver
// through, and the listeners that feed it log, stat and Docker events.
type Manager struct {
	// subscriptions and dispatchers are keyed by their integer IDs.
	subscriptions *xsync.Map[int, *Subscription]
	dispatchers   *xsync.Map[int, dispatcher.Dispatcher]

	// cloudDispatcher is the dispatcher resolved for DispatcherID == 0;
	// a nil pointer means no cloud dispatcher is set.
	cloudDispatcher atomic.Pointer[dispatcher.Dispatcher]

	// Counters from which new subscription and dispatcher IDs are drawn.
	subscriptionCounter, dispatcherCounter atomic.Int32

	// Event sources handed to NewManager.
	listener      *ContainerLogListener
	statsListener *ContainerStatsListener
	eventListener *ContainerEventListener

	// ctx and cancel are created in NewManager from context.Background().
	ctx    context.Context
	cancel context.CancelFunc

	// sendSem is a weighted semaphore created with weight 5 in NewManager.
	// NOTE(review): presumably bounds concurrent notification sends; its use
	// is outside this part of the file — confirm.
	sendSem *semaphore.Weighted
}
// NewManager builds a Manager around the given log, stats and event listeners
// and immediately launches the three background goroutines that process log
// events, stat events and Docker events.
//
// The manager's context is derived from context.Background(); both the context
// and its cancel function are stored on the returned Manager.
func NewManager(listener *ContainerLogListener, statsListener *ContainerStatsListener, eventListener *ContainerEventListener) *Manager {
	// Weight given to the send semaphore.
	const sendSlots = 5

	manager := new(Manager)
	manager.subscriptions = xsync.NewMap[int, *Subscription]()
	manager.dispatchers = xsync.NewMap[int, dispatcher.Dispatcher]()
	manager.listener = listener
	manager.statsListener = statsListener
	manager.eventListener = eventListener
	manager.ctx, manager.cancel = context.WithCancel(context.Background())
	manager.sendSem = semaphore.NewWeighted(sendSlots)

	// One consumer goroutine per event source: logs, stats, Docker events.
	go manager.processLogEvents()
	go manager.processStatEvents()
	go manager.processDockerEvents()

	return manager
}
// Start starts the log listener, passing it this manager, and returns
// whatever error the listener reports.
func (m *Manager) Start() error {
	err := m.listener.Start(m)
	return err
}
// ShouldListenToContainer implements the ContainerMatcher interface.
//
// It reports whether at least one enabled subscription with a non-empty log
// expression matches the container. Subscriptions without a log expression
// (metric-only ones, for example) never cause log streaming.
func (m *Manager) ShouldListenToContainer(c container.Container) bool {
	// An empty host is passed because host fields aren't used in container expressions.
	candidate := FromContainerModel(c, container.Host{})

	matched := false
	m.subscriptions.Range(func(_ int, s *Subscription) bool {
		matched = s.Enabled && s.LogExpression != "" && s.MatchesContainer(candidate)
		// Keep iterating only while nothing has matched yet.
		return !matched
	})
	return matched
}
// AddSubscription registers sub under a newly generated ID.
//
// The subscription is forced to enabled and receives fresh cooldown and
// sample-buffer maps before its expressions are compiled. When compilation
// fails the error is returned and nothing is stored; sub has already been
// assigned its ID at that point. On success the listeners are refreshed.
func (m *Manager) AddSubscription(sub *Subscription) error {
	// IDs come from an atomic counter, so each call gets a distinct one.
	nextID := m.subscriptionCounter.Add(1)
	sub.ID, sub.Enabled = int(nextID), true

	// Start with empty runtime state.
	sub.MetricCooldowns = xsync.NewMap[string, time.Time]()
	sub.MetricSampleBuffers = xsync.NewMap[string, *utils.RingBuffer[bool]]()
	sub.EventCooldowns = xsync.NewMap[string, time.Time]()

	err := sub.CompileExpressions()
	if err != nil {
		return err
	}

	m.subscriptions.Store(sub.ID, sub)
	log.Debug().Str("name", sub.Name).Int("id", sub.ID).Msg("Added subscription")
	m.updateListeners()
	return nil
}
// RemoveSubscription deletes the subscription stored under id and refreshes
// the listeners. It does nothing when no such subscription exists.
func (m *Manager) RemoveSubscription(id int) {
	removed, found := m.subscriptions.LoadAndDelete(id)
	if !found {
		return
	}
	log.Debug().Int("id", id).Str("name", removed.Name).Msg("Removed subscription")
	m.updateListeners()
}
// ReplaceSubscription stores sub under sub.ID, replacing any subscription
// already registered with that ID.
//
// sub receives fresh cooldown and sample-buffer maps and has its expressions
// compiled first; a compile error is returned and nothing is stored. The
// Enabled flag is carried over from the existing subscription when there is
// one, and defaults to true otherwise. On success the listeners are refreshed.
func (m *Manager) ReplaceSubscription(sub *Subscription) error {
	sub.MetricCooldowns = xsync.NewMap[string, time.Time]()
	sub.MetricSampleBuffers = xsync.NewMap[string, *utils.RingBuffer[bool]]()
	sub.EventCooldowns = xsync.NewMap[string, time.Time]()

	if err := sub.CompileExpressions(); err != nil {
		return err
	}

	// Read the previous Enabled flag and store the replacement inside one
	// Compute call. A separate Load followed by Store would let a concurrent
	// UpdateSubscription or RemoveSubscription slip in between the two and
	// have its change to Enabled silently overwritten.
	m.subscriptions.Compute(sub.ID, func(existing *Subscription, loaded bool) (*Subscription, xsync.ComputeOp) {
		if loaded {
			sub.Enabled = existing.Enabled
		} else {
			sub.Enabled = true
		}
		return sub, xsync.UpdateOp
	})

	log.Debug().Str("name", sub.Name).Int("id", sub.ID).Msg("Replaced subscription")
	m.updateListeners()
	return nil
}
// UpdateSubscription applies a partial update to the subscription stored under id.
//
// updates maps field names to new values. Recognized keys are:
//   - "name" (string) and "enabled" (bool)
//   - "dispatcherId", "cooldown" and "sampleWindow" (integers, converted with
//     subscriptionUpdateInt)
//   - "containerExpression", "logExpression", "metricExpression" and
//     "eventExpression" (strings, compiled with expr before being stored)
//
// Unknown keys and values of an unexpected type are ignored. An empty log,
// metric or event expression clears that expression together with its compiled
// program; the container expression is always compiled. Setting "sampleWindow"
// also replaces the metric sample buffers with an empty map.
//
// All changes are made on a clone inside a single Compute call, so when an
// expression fails to compile the stored subscription is left untouched.
// An error is returned when the subscription does not exist or when an
// expression fails to compile. On success the listeners are refreshed.
func (m *Manager) UpdateSubscription(id int, updates map[string]any) error {
	var updateErr error

	_, ok := m.subscriptions.Compute(id, func(sub *Subscription, loaded bool) (*Subscription, xsync.ComputeOp) {
		if !loaded {
			updateErr = fmt.Errorf("subscription not found")
			return nil, xsync.CancelOp
		}

		// Clone the subscription so the stored value is only swapped once
		// every requested change has been validated.
		updated := &Subscription{
			ID:                    sub.ID,
			Name:                  sub.Name,
			Enabled:               sub.Enabled,
			DispatcherID:          sub.DispatcherID,
			ContainerExpression:   sub.ContainerExpression,
			ContainerProgram:      sub.ContainerProgram,
			LogExpression:         sub.LogExpression,
			LogProgram:            sub.LogProgram,
			MetricExpression:      sub.MetricExpression,
			MetricProgram:         sub.MetricProgram,
			EventExpression:       sub.EventExpression,
			EventProgram:          sub.EventProgram,
			EventCooldowns:        sub.EventCooldowns,
			Cooldown:              sub.Cooldown,
			SampleWindow:          sub.SampleWindow,
			MetricCooldowns:       sub.MetricCooldowns,
			MetricSampleBuffers:   sub.MetricSampleBuffers,
			TriggeredContainerIDs: sub.TriggeredContainerIDs,
		}

		// Preserve runtime stats (atomics can't be copied in struct literal)
		updated.TriggerCount.Store(sub.TriggerCount.Load())
		updated.LastTriggeredAt.Store(sub.LastTriggeredAt.Load())

		// Apply updates to the clone
		for key, value := range updates {
			switch key {
			case "name":
				if name, ok := value.(string); ok {
					updated.Name = name
				}

			case "enabled":
				if enabled, ok := value.(bool); ok {
					updated.Enabled = enabled
				}

			case "dispatcherId":
				if dispatcherID, ok := subscriptionUpdateInt(value); ok {
					updated.DispatcherID = dispatcherID
				}

			case "containerExpression":
				if exprStr, ok := value.(string); ok {
					program, err := expr.Compile(exprStr, expr.Env(types.NotificationContainer{}))
					if err != nil {
						updateErr = fmt.Errorf("failed to compile container expression: %w", err)
						return nil, xsync.CancelOp
					}
					updated.ContainerExpression = exprStr
					updated.ContainerProgram = program
				}

			case "logExpression":
				if exprStr, ok := value.(string); ok {
					if exprStr != "" {
						program, err := expr.Compile(exprStr, expr.Env(types.NotificationLog{}))
						if err != nil {
							updateErr = fmt.Errorf("failed to compile log expression: %w", err)
							return nil, xsync.CancelOp
						}
						updated.LogExpression = exprStr
						updated.LogProgram = program
					} else {
						updated.LogExpression = ""
						updated.LogProgram = nil
					}
				}

			case "metricExpression":
				if exprStr, ok := value.(string); ok {
					if exprStr != "" {
						program, err := expr.Compile(exprStr, expr.Env(types.NotificationStat{}))
						if err != nil {
							updateErr = fmt.Errorf("failed to compile metric expression: %w", err)
							return nil, xsync.CancelOp
						}
						updated.MetricExpression = exprStr
						updated.MetricProgram = program
					} else {
						updated.MetricExpression = ""
						updated.MetricProgram = nil
					}
				}

			case "eventExpression":
				if exprStr, ok := value.(string); ok {
					if exprStr != "" {
						program, err := expr.Compile(exprStr, expr.Env(types.NotificationEvent{}))
						if err != nil {
							updateErr = fmt.Errorf("failed to compile event expression: %w", err)
							return nil, xsync.CancelOp
						}
						updated.EventExpression = exprStr
						updated.EventProgram = program
					} else {
						updated.EventExpression = ""
						updated.EventProgram = nil
					}
				}

			case "cooldown":
				if cd, ok := subscriptionUpdateInt(value); ok {
					updated.Cooldown = cd
				}

			case "sampleWindow":
				if sw, ok := subscriptionUpdateInt(value); ok {
					updated.SampleWindow = sw
					// Start over with an empty set of sample buffers whenever
					// a sample window is supplied.
					updated.MetricSampleBuffers = xsync.NewMap[string, *utils.RingBuffer[bool]]()
				}
			}
		}

		return updated, xsync.UpdateOp
	})

	if updateErr != nil {
		return updateErr
	}
	if !ok {
		return fmt.Errorf("subscription not found")
	}

	log.Debug().Int("id", id).Interface("updates", updates).Msg("Updated subscription")
	m.updateListeners()
	return nil
}

// subscriptionUpdateInt converts a loosely typed update value to an int.
//
// Besides int it accepts int32, int64 and float64, so that numeric values
// which did not start out as a Go int (for example numbers decoded from JSON
// into an any, which arrive as float64) are not silently dropped. A float64
// is accepted only when it is a whole number that int can represent exactly.
// The second result is false when the value cannot be converted.
func subscriptionUpdateInt(value any) (int, bool) {
	switch n := value.(type) {
	case int:
		return n, true
	case int32:
		return int(n), true
	case int64:
		i := int(n)
		// Reject values that do not survive the conversion (32-bit int).
		if int64(i) != n {
			return 0, false
		}
		return i, true
	case float64:
		// float64 represents every integer of magnitude up to 2^53 exactly;
		// beyond that the conversion could not be trusted. NaN passes this
		// range test but fails the round-trip comparison below.
		const maxExact = 1 << 53
		if n < -maxExact || n > maxExact {
			return 0, false
		}
		i := int(n)
		if float64(i) != n {
			return 0, false
		}
		return i, true
	default:
		return 0, false
	}
}
// updateListeners brings the listeners in line with the current subscriptions.
//
// It first asks the log listener to update its streams, then starts the stats
// listener when at least one enabled subscription is a metric alert and stops
// it otherwise. The event listener is handled the same way for event alerts.
func (m *Manager) updateListeners() {
	m.listener.UpdateStreams()

	var needStats, needEvents bool
	m.subscriptions.Range(func(_ int, s *Subscription) bool {
		if !s.Enabled {
			return true
		}
		if s.IsMetricAlert() {
			needStats = true
		}
		if s.IsEventAlert() {
			needEvents = true
		}
		// Once both kinds have been seen there is nothing left to learn.
		return !(needStats && needEvents)
	})

	switch {
	case needStats:
		m.statsListener.Start()
	default:
		m.statsListener.Stop()
	}

	switch {
	case needEvents:
		m.eventListener.Start()
	default:
		m.eventListener.Stop()
	}
}
// AddDispatcher stores d under a newly generated ID taken from the dispatcher
// counter and returns that ID.
func (m *Manager) AddDispatcher(d dispatcher.Dispatcher) int {
	next := m.dispatcherCounter.Add(1)
	assigned := int(next)

	m.dispatchers.Store(assigned, d)
	log.Debug().Int("id", assigned).Msg("Added dispatcher")

	return assigned
}
// UpdateDispatcher stores d under id, overwriting whatever dispatcher was
// registered there. The store is unconditional: it does not check that id
// already exists.
func (m *Manager) UpdateDispatcher(id int, d dispatcher.Dispatcher) {
	m.dispatchers.Store(id, d)

	event := log.Debug().Int("id", id)
	event.Msg("Updated dispatcher")
}
// RemoveDispatcher deletes the dispatcher stored under id. It does nothing
// when no dispatcher is registered with that ID.
func (m *Manager) RemoveDispatcher(id int) {
	_, existed := m.dispatchers.LoadAndDelete(id)
	if !existed {
		return
	}
	log.Debug().Int("id", id).Msg("Removed dispatcher")
}
// SetCloudDispatcher sets the dedicated cloud dispatcher used for subscriptions with DispatcherID == 0.
//
// A nil d is treated like ClearCloudDispatcher. Storing a pointer to a nil
// interface would leave the stored pointer non-nil, so getDispatcher(0) would
// report (nil, true) and hand its caller a dispatcher that cannot be used.
func (m *Manager) SetCloudDispatcher(d dispatcher.Dispatcher) {
	if d == nil {
		m.ClearCloudDispatcher()
		return
	}
	m.cloudDispatcher.Store(&d)
	log.Debug().Msg("Set cloud dispatcher")
}
// ClearCloudDispatcher removes the cloud dispatcher by storing a nil pointer,
// after which lookups for DispatcherID == 0 find nothing.
func (m *Manager) ClearCloudDispatcher() {
	var none *dispatcher.Dispatcher
	m.cloudDispatcher.Store(none)
	log.Debug().Msg("Cleared cloud dispatcher")
}
// ResetCloudDispatcherBreaker calls ResetBreaker on the cloud dispatcher.
// It is a no-op when no cloud dispatcher is set or when the stored dispatcher
// is not a *dispatcher.CloudDispatcher.
func (m *Manager) ResetCloudDispatcherBreaker() {
	stored := m.cloudDispatcher.Load()
	if stored == nil {
		return
	}

	cloud, isCloud := (*stored).(*dispatcher.CloudDispatcher)
	if !isCloud {
		return
	}
	cloud.ResetBreaker()
}
// getDispatcher resolves the dispatcher for a subscription's DispatcherID.
// ID 0 refers to the cloud dispatcher; any other ID is looked up in the
// dispatchers map. The boolean is false when nothing is registered.
func (m *Manager) getDispatcher(id int) (dispatcher.Dispatcher, bool) {
	if id != 0 {
		return m.dispatchers.Load(id)
	}

	cloud := m.cloudDispatcher.Load()
	if cloud == nil {
		return nil, false
	}
	return *cloud, true
}
// Subscriptions returns all subscriptions sorted by ascending ID.
// The result is never nil: it is an empty slice when nothing is registered.
func (m *Manager) Subscriptions() []*Subscription {
	result := make([]*Subscription, 0)
	m.subscriptions.Range(func(_ int, sub *Subscription) bool {
		result = append(result, sub)
		return true
	})

	// Compare explicitly rather than returning a.ID - b.ID: the subtraction
	// can overflow for IDs of opposite sign and large magnitude, which would
	// flip the sign of the result and break the ordering.
	slices.SortFunc(result, func(a, b *Subscription) int {
		switch {
		case a.ID < b.ID:
			return -1
		case a.ID > b.ID:
			return 1
		default:
			return 0
		}
	})
	return result
}
// GetNotificationStats returns one runtime-stats entry per subscription, in
// the order the subscriptions map yields them (no sorting is applied).
//
// Each entry carries the subscription ID, its trigger count, the IDs of the
// containers recorded in TriggeredContainerIDs (nil when there are none), and
// the last trigger time, which stays nil when it was never set or is the zero
// time. The returned slice is nil when there are no subscriptions.
func (m *Manager) GetNotificationStats() []types.SubscriptionStats {
	var collected []types.SubscriptionStats

	m.subscriptions.Range(func(_ int, s *Subscription) bool {
		// Gather the triggered container IDs, if the set was ever created.
		var triggered []string
		if s.TriggeredContainerIDs != nil {
			s.TriggeredContainerIDs.Range(func(containerID string, _ struct{}) bool {
				triggered = append(triggered, containerID)
				return true
			})
		}

		// Report a timestamp only when a real (non-zero) one has been stored.
		var last *time.Time
		if stamp := s.LastTriggeredAt.Load(); stamp != nil {
			if !stamp.IsZero() {
				last = stamp
			}
		}

		entry := types.SubscriptionStats{
			SubscriptionID:        s.ID,
			TriggerCount:          s.TriggerCount.Load(),
			LastTriggeredAt:       last,
			TriggeredContainerIDs: triggered,
		}
		collected = append(collected, entry)
		return true
	})

	return collected
}
// Dispatchers returns all dispatchers as DispatcherConfig sorted by ascending ID.
//
// The cloud dispatcher is included with ID 0 and type "cloud" when one is set
// and is a *dispatcher.CloudDispatcher. From the dispatchers map only
// *dispatcher.WebhookDispatcher entries are reported (type "webhook"); any
// other dispatcher type is skipped. The result is never nil.
func (m *Manager) Dispatchers() []DispatcherConfig {
	result := make([]DispatcherConfig, 0)

	// Include cloud dispatcher if configured
	if p := m.cloudDispatcher.Load(); p != nil {
		if cd, ok := (*p).(*dispatcher.CloudDispatcher); ok {
			result = append(result, DispatcherConfig{
				ID:     0,
				Name:   cd.Name,
				Type:   "cloud",
				Prefix: cd.Prefix,
			})
		}
	}

	m.dispatchers.Range(func(id int, d dispatcher.Dispatcher) bool {
		if v, ok := d.(*dispatcher.WebhookDispatcher); ok {
			result = append(result, DispatcherConfig{
				ID:       id,
				Name:     v.Name,
				Type:     "webhook",
				URL:      v.URL,
				Template: v.TemplateText,
				Headers:  v.Headers,
			})
		}
		return true
	})

	// Compare explicitly rather than returning a.ID - b.ID: the subtraction
	// can overflow for IDs of opposite sign and large magnitude, which would
	// flip the sign of the result and break the ordering.
	slices.SortFunc(result, func(a, b DispatcherConfig) int {
		switch {
		case a.ID < b.ID:
			return -1
		case a.ID > b.ID:
			return 1
		default:
			return 0
		}
	})
	return result
}