mirror of
https://github.com/amir20/dozzle.git
synced 2026-06-23 04:10:12 +00:00
be352eec66
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
438 lines
13 KiB
Go
438 lines
13 KiB
Go
package notification
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/amir20/dozzle/internal/container"
|
|
"github.com/amir20/dozzle/internal/notification/dispatcher"
|
|
"github.com/amir20/dozzle/internal/utils"
|
|
"github.com/amir20/dozzle/types"
|
|
"github.com/expr-lang/expr"
|
|
"github.com/puzpuzpuz/xsync/v4"
|
|
"github.com/rs/zerolog/log"
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
// Manager manages notification subscriptions and dispatches notifications
|
|
type Manager struct {
|
|
subscriptions *xsync.Map[int, *Subscription]
|
|
dispatchers *xsync.Map[int, dispatcher.Dispatcher]
|
|
cloudDispatcher atomic.Pointer[dispatcher.Dispatcher]
|
|
subscriptionCounter atomic.Int32
|
|
dispatcherCounter atomic.Int32
|
|
listener *ContainerLogListener
|
|
statsListener *ContainerStatsListener
|
|
eventListener *ContainerEventListener
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
sendSem *semaphore.Weighted
|
|
}
|
|
|
|
// NewManager creates a new notification manager
|
|
func NewManager(listener *ContainerLogListener, statsListener *ContainerStatsListener, eventListener *ContainerEventListener) *Manager {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
m := &Manager{
|
|
subscriptions: xsync.NewMap[int, *Subscription](),
|
|
dispatchers: xsync.NewMap[int, dispatcher.Dispatcher](),
|
|
listener: listener,
|
|
statsListener: statsListener,
|
|
eventListener: eventListener,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
sendSem: semaphore.NewWeighted(5),
|
|
}
|
|
|
|
// Start processing log events from the listener
|
|
go m.processLogEvents()
|
|
|
|
// Start processing stat events from the stats listener
|
|
go m.processStatEvents()
|
|
|
|
// Start processing Docker events from the event listener
|
|
go m.processDockerEvents()
|
|
|
|
return m
|
|
}
|
|
|
|
// Start initializes the manager and starts the log listener
|
|
func (m *Manager) Start() error {
|
|
return m.listener.Start(m)
|
|
}
|
|
|
|
// ShouldListenToContainer implements ContainerMatcher interface
|
|
// Only matches log-based subscriptions (metric-only subscriptions don't need log streaming)
|
|
func (m *Manager) ShouldListenToContainer(c container.Container) bool {
|
|
// Pass empty host for matching - host fields aren't used in container expressions
|
|
notificationContainer := FromContainerModel(c, container.Host{})
|
|
|
|
shouldListen := false
|
|
m.subscriptions.Range(func(_ int, sub *Subscription) bool {
|
|
if sub.Enabled && sub.LogExpression != "" && sub.MatchesContainer(notificationContainer) {
|
|
shouldListen = true
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
return shouldListen
|
|
}
|
|
|
|
// AddSubscription adds a new subscription with compiled expressions
|
|
func (m *Manager) AddSubscription(sub *Subscription) error {
|
|
// Auto-increment ID using atomic counter
|
|
sub.ID = int(m.subscriptionCounter.Add(1))
|
|
sub.Enabled = true
|
|
sub.MetricCooldowns = xsync.NewMap[string, time.Time]()
|
|
sub.MetricSampleBuffers = xsync.NewMap[string, *utils.RingBuffer[bool]]()
|
|
sub.EventCooldowns = xsync.NewMap[string, time.Time]()
|
|
|
|
if err := sub.CompileExpressions(); err != nil {
|
|
return err
|
|
}
|
|
|
|
m.subscriptions.Store(sub.ID, sub)
|
|
log.Debug().Str("name", sub.Name).Int("id", sub.ID).Msg("Added subscription")
|
|
|
|
m.updateListeners()
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemoveSubscription removes a subscription by ID
|
|
func (m *Manager) RemoveSubscription(id int) {
|
|
if sub, ok := m.subscriptions.LoadAndDelete(id); ok {
|
|
log.Debug().Int("id", id).Str("name", sub.Name).Msg("Removed subscription")
|
|
|
|
m.updateListeners()
|
|
}
|
|
}
|
|
|
|
// ReplaceSubscription replaces a subscription with new data
|
|
func (m *Manager) ReplaceSubscription(sub *Subscription) error {
|
|
sub.MetricCooldowns = xsync.NewMap[string, time.Time]()
|
|
sub.MetricSampleBuffers = xsync.NewMap[string, *utils.RingBuffer[bool]]()
|
|
sub.EventCooldowns = xsync.NewMap[string, time.Time]()
|
|
|
|
if err := sub.CompileExpressions(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Preserve enabled state from existing subscription if it exists
|
|
if existing, ok := m.subscriptions.Load(sub.ID); ok {
|
|
sub.Enabled = existing.Enabled
|
|
} else {
|
|
sub.Enabled = true
|
|
}
|
|
|
|
m.subscriptions.Store(sub.ID, sub)
|
|
log.Debug().Str("name", sub.Name).Int("id", sub.ID).Msg("Replaced subscription")
|
|
|
|
m.updateListeners()
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateSubscription updates a subscription with the provided fields
|
|
func (m *Manager) UpdateSubscription(id int, updates map[string]any) error {
|
|
var updateErr error
|
|
_, ok := m.subscriptions.Compute(id, func(sub *Subscription, loaded bool) (*Subscription, xsync.ComputeOp) {
|
|
if !loaded {
|
|
updateErr = fmt.Errorf("subscription not found")
|
|
return nil, xsync.CancelOp
|
|
}
|
|
|
|
// Clone the subscription
|
|
updated := &Subscription{
|
|
ID: sub.ID,
|
|
Name: sub.Name,
|
|
Enabled: sub.Enabled,
|
|
DispatcherID: sub.DispatcherID,
|
|
ContainerExpression: sub.ContainerExpression,
|
|
ContainerProgram: sub.ContainerProgram,
|
|
LogExpression: sub.LogExpression,
|
|
LogProgram: sub.LogProgram,
|
|
MetricExpression: sub.MetricExpression,
|
|
MetricProgram: sub.MetricProgram,
|
|
EventExpression: sub.EventExpression,
|
|
EventProgram: sub.EventProgram,
|
|
EventCooldowns: sub.EventCooldowns,
|
|
Cooldown: sub.Cooldown,
|
|
SampleWindow: sub.SampleWindow,
|
|
MetricCooldowns: sub.MetricCooldowns,
|
|
MetricSampleBuffers: sub.MetricSampleBuffers,
|
|
TriggeredContainerIDs: sub.TriggeredContainerIDs,
|
|
}
|
|
|
|
// Preserve runtime stats (atomics can't be copied in struct literal)
|
|
updated.TriggerCount.Store(sub.TriggerCount.Load())
|
|
updated.LastTriggeredAt.Store(sub.LastTriggeredAt.Load())
|
|
|
|
// Apply updates to the clone
|
|
for key, value := range updates {
|
|
switch key {
|
|
case "name":
|
|
if name, ok := value.(string); ok {
|
|
updated.Name = name
|
|
}
|
|
case "enabled":
|
|
if enabled, ok := value.(bool); ok {
|
|
updated.Enabled = enabled
|
|
}
|
|
case "dispatcherId":
|
|
if dispatcherID, ok := value.(int); ok {
|
|
updated.DispatcherID = dispatcherID
|
|
}
|
|
case "containerExpression":
|
|
if exprStr, ok := value.(string); ok {
|
|
program, err := expr.Compile(exprStr, expr.Env(types.NotificationContainer{}))
|
|
if err != nil {
|
|
updateErr = fmt.Errorf("failed to compile container expression: %w", err)
|
|
return nil, xsync.CancelOp
|
|
}
|
|
updated.ContainerExpression = exprStr
|
|
updated.ContainerProgram = program
|
|
}
|
|
case "logExpression":
|
|
if exprStr, ok := value.(string); ok {
|
|
if exprStr != "" {
|
|
program, err := expr.Compile(exprStr, expr.Env(types.NotificationLog{}))
|
|
if err != nil {
|
|
updateErr = fmt.Errorf("failed to compile log expression: %w", err)
|
|
return nil, xsync.CancelOp
|
|
}
|
|
updated.LogExpression = exprStr
|
|
updated.LogProgram = program
|
|
} else {
|
|
updated.LogExpression = ""
|
|
updated.LogProgram = nil
|
|
}
|
|
}
|
|
case "metricExpression":
|
|
if exprStr, ok := value.(string); ok {
|
|
if exprStr != "" {
|
|
program, err := expr.Compile(exprStr, expr.Env(types.NotificationStat{}))
|
|
if err != nil {
|
|
updateErr = fmt.Errorf("failed to compile metric expression: %w", err)
|
|
return nil, xsync.CancelOp
|
|
}
|
|
updated.MetricExpression = exprStr
|
|
updated.MetricProgram = program
|
|
} else {
|
|
updated.MetricExpression = ""
|
|
updated.MetricProgram = nil
|
|
}
|
|
}
|
|
case "eventExpression":
|
|
if exprStr, ok := value.(string); ok {
|
|
if exprStr != "" {
|
|
program, err := expr.Compile(exprStr, expr.Env(types.NotificationEvent{}))
|
|
if err != nil {
|
|
updateErr = fmt.Errorf("failed to compile event expression: %w", err)
|
|
return nil, xsync.CancelOp
|
|
}
|
|
updated.EventExpression = exprStr
|
|
updated.EventProgram = program
|
|
} else {
|
|
updated.EventExpression = ""
|
|
updated.EventProgram = nil
|
|
}
|
|
}
|
|
case "cooldown":
|
|
if cd, ok := value.(int); ok {
|
|
updated.Cooldown = cd
|
|
}
|
|
case "sampleWindow":
|
|
if sw, ok := value.(int); ok {
|
|
updated.SampleWindow = sw
|
|
updated.MetricSampleBuffers = xsync.NewMap[string, *utils.RingBuffer[bool]]()
|
|
}
|
|
}
|
|
}
|
|
|
|
return updated, xsync.UpdateOp
|
|
})
|
|
|
|
if updateErr != nil {
|
|
return updateErr
|
|
}
|
|
|
|
if !ok {
|
|
return fmt.Errorf("subscription not found")
|
|
}
|
|
|
|
log.Debug().Int("id", id).Interface("updates", updates).Msg("Updated subscription")
|
|
|
|
m.updateListeners()
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateListeners updates log and stats listeners based on current subscriptions
|
|
func (m *Manager) updateListeners() {
|
|
m.listener.UpdateStreams()
|
|
|
|
hasMetric := false
|
|
hasEvent := false
|
|
m.subscriptions.Range(func(_ int, sub *Subscription) bool {
|
|
if sub.Enabled && sub.IsMetricAlert() {
|
|
hasMetric = true
|
|
}
|
|
if sub.Enabled && sub.IsEventAlert() {
|
|
hasEvent = true
|
|
}
|
|
if hasMetric && hasEvent {
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
|
|
if hasMetric {
|
|
m.statsListener.Start()
|
|
} else {
|
|
m.statsListener.Stop()
|
|
}
|
|
|
|
if hasEvent {
|
|
m.eventListener.Start()
|
|
} else {
|
|
m.eventListener.Stop()
|
|
}
|
|
}
|
|
|
|
// AddDispatcher adds a dispatcher and returns its auto-generated ID
|
|
func (m *Manager) AddDispatcher(d dispatcher.Dispatcher) int {
|
|
id := int(m.dispatcherCounter.Add(1))
|
|
m.dispatchers.Store(id, d)
|
|
log.Debug().Int("id", id).Msg("Added dispatcher")
|
|
return id
|
|
}
|
|
|
|
// UpdateDispatcher updates a dispatcher by ID
|
|
func (m *Manager) UpdateDispatcher(id int, d dispatcher.Dispatcher) {
|
|
m.dispatchers.Store(id, d)
|
|
log.Debug().Int("id", id).Msg("Updated dispatcher")
|
|
}
|
|
|
|
// RemoveDispatcher removes a dispatcher by ID
|
|
func (m *Manager) RemoveDispatcher(id int) {
|
|
if _, ok := m.dispatchers.LoadAndDelete(id); ok {
|
|
log.Debug().Int("id", id).Msg("Removed dispatcher")
|
|
}
|
|
}
|
|
|
|
// SetCloudDispatcher sets the dedicated cloud dispatcher used for subscriptions with DispatcherID == 0.
|
|
func (m *Manager) SetCloudDispatcher(d dispatcher.Dispatcher) {
|
|
m.cloudDispatcher.Store(&d)
|
|
log.Debug().Msg("Set cloud dispatcher")
|
|
}
|
|
|
|
// ClearCloudDispatcher removes the cloud dispatcher.
|
|
func (m *Manager) ClearCloudDispatcher() {
|
|
m.cloudDispatcher.Store(nil)
|
|
log.Debug().Msg("Cleared cloud dispatcher")
|
|
}
|
|
|
|
// ResetCloudDispatcherBreaker clears the cloud dispatcher's circuit breaker, if set.
|
|
// No-op when no cloud dispatcher is registered.
|
|
func (m *Manager) ResetCloudDispatcherBreaker() {
|
|
if p := m.cloudDispatcher.Load(); p != nil {
|
|
if cd, ok := (*p).(*dispatcher.CloudDispatcher); ok {
|
|
cd.ResetBreaker()
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// getDispatcher resolves a dispatcher by subscription's DispatcherID.
|
|
// DispatcherID == 0 means the cloud dispatcher; otherwise lookup in the dispatchers map.
|
|
func (m *Manager) getDispatcher(id int) (dispatcher.Dispatcher, bool) {
|
|
if id == 0 {
|
|
if p := m.cloudDispatcher.Load(); p != nil {
|
|
return *p, true
|
|
}
|
|
return nil, false
|
|
}
|
|
return m.dispatchers.Load(id)
|
|
}
|
|
|
|
// Subscriptions returns all subscriptions sorted by ID
|
|
func (m *Manager) Subscriptions() []*Subscription {
|
|
result := make([]*Subscription, 0)
|
|
m.subscriptions.Range(func(_ int, sub *Subscription) bool {
|
|
result = append(result, sub)
|
|
return true
|
|
})
|
|
slices.SortFunc(result, func(a, b *Subscription) int {
|
|
return a.ID - b.ID
|
|
})
|
|
return result
|
|
}
|
|
|
|
// GetNotificationStats returns runtime stats for all subscriptions
|
|
func (m *Manager) GetNotificationStats() []types.SubscriptionStats {
|
|
var stats []types.SubscriptionStats
|
|
m.subscriptions.Range(func(_ int, sub *Subscription) bool {
|
|
var containerIDs []string
|
|
if sub.TriggeredContainerIDs != nil {
|
|
sub.TriggeredContainerIDs.Range(func(id string, _ struct{}) bool {
|
|
containerIDs = append(containerIDs, id)
|
|
return true
|
|
})
|
|
}
|
|
|
|
var lastTriggered *time.Time
|
|
if t := sub.LastTriggeredAt.Load(); t != nil && !t.IsZero() {
|
|
lastTriggered = t
|
|
}
|
|
|
|
stats = append(stats, types.SubscriptionStats{
|
|
SubscriptionID: sub.ID,
|
|
TriggerCount: sub.TriggerCount.Load(),
|
|
LastTriggeredAt: lastTriggered,
|
|
TriggeredContainerIDs: containerIDs,
|
|
})
|
|
return true
|
|
})
|
|
return stats
|
|
}
|
|
|
|
// Dispatchers returns all dispatchers as DispatcherConfig sorted by ID.
|
|
// Includes the cloud dispatcher (ID 0) when configured.
|
|
func (m *Manager) Dispatchers() []DispatcherConfig {
|
|
result := make([]DispatcherConfig, 0)
|
|
|
|
// Include cloud dispatcher if configured
|
|
if p := m.cloudDispatcher.Load(); p != nil {
|
|
if cd, ok := (*p).(*dispatcher.CloudDispatcher); ok {
|
|
result = append(result, DispatcherConfig{
|
|
ID: 0,
|
|
Name: cd.Name,
|
|
Type: "cloud",
|
|
Prefix: cd.Prefix,
|
|
})
|
|
}
|
|
}
|
|
|
|
m.dispatchers.Range(func(id int, d dispatcher.Dispatcher) bool {
|
|
switch v := d.(type) {
|
|
case *dispatcher.WebhookDispatcher:
|
|
result = append(result, DispatcherConfig{
|
|
ID: id,
|
|
Name: v.Name,
|
|
Type: "webhook",
|
|
URL: v.URL,
|
|
Template: v.TemplateText,
|
|
Headers: v.Headers,
|
|
})
|
|
}
|
|
return true
|
|
})
|
|
slices.SortFunc(result, func(a, b DispatcherConfig) int {
|
|
return a.ID - b.ID
|
|
})
|
|
return result
|
|
}
|