mirror of
https://github.com/amir20/dozzle.git
synced 2026-06-23 04:10:12 +00:00
WIP: notificatons
This commit is contained in:
@@ -57,6 +57,7 @@ require (
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
|
||||
github.com/distribution/reference v0.6.0 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
|
||||
github.com/expr-lang/expr v1.17.7 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
|
||||
@@ -78,6 +78,8 @@ github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtz
|
||||
github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU=
|
||||
github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
|
||||
github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8=
|
||||
github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
|
||||
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
|
||||
|
||||
@@ -0,0 +1,325 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/amir20/dozzle/internal/container"
|
||||
container_support "github.com/amir20/dozzle/internal/support/container"
|
||||
"github.com/expr-lang/expr"
|
||||
"github.com/expr-lang/expr/vm"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Manager handles notification subscriptions and matching
|
||||
type Manager struct {
|
||||
subscriptions []*compiledSubscription
|
||||
mu sync.RWMutex
|
||||
activeStreams map[string]*activeStream // containerID -> stream info
|
||||
containerCache map[string]Container // containerID -> notification.Container
|
||||
containerService ContainerService
|
||||
dispatcher Dispatcher
|
||||
}
|
||||
|
||||
// activeStream tracks a single log stream and all subscriptions watching it
|
||||
type activeStream struct {
|
||||
container Container // notification.Container
|
||||
subscriptions []*compiledSubscription // All subscriptions watching this container
|
||||
cancel context.CancelFunc // Cancel function for the stream
|
||||
}
|
||||
|
||||
// compiledSubscription wraps a Subscription with compiled expr programs
|
||||
type compiledSubscription struct {
|
||||
subscription *Subscription
|
||||
containerProgram *vm.Program // Always non-nil
|
||||
logProgram *vm.Program // Always non-nil
|
||||
}
|
||||
|
||||
// ContainerService provides access to containers and log streaming
|
||||
type ContainerService interface {
|
||||
ListAllContainers(labels container.ContainerLabels) ([]container.Container, []error)
|
||||
FindContainer(host, id string, labels container.ContainerLabels) (*container_support.ContainerService, error)
|
||||
SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter)
|
||||
}
|
||||
|
||||
// NewManager creates a new notification manager
|
||||
func NewManager(containerService ContainerService, dispatcher Dispatcher) *Manager {
|
||||
return &Manager{
|
||||
subscriptions: make([]*compiledSubscription, 0),
|
||||
activeStreams: make(map[string]*activeStream),
|
||||
containerCache: make(map[string]Container),
|
||||
containerService: containerService,
|
||||
dispatcher: dispatcher,
|
||||
}
|
||||
}
|
||||
|
||||
// LoadSubscriptions loads and compiles multiple subscriptions (replaces existing)
|
||||
func (m *Manager) LoadSubscriptions(subs []*Subscription) error {
|
||||
m.mu.Lock()
|
||||
m.subscriptions = make([]*compiledSubscription, 0, len(subs))
|
||||
m.mu.Unlock()
|
||||
|
||||
for _, sub := range subs {
|
||||
if err := m.AddSubscription(sub); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Int("count", len(subs)).Msg("loaded notification subscriptions")
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddSubscription adds a single subscription
|
||||
func (m *Manager) AddSubscription(sub *Subscription) error {
|
||||
cs, err := m.compileSubscription(sub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.subscriptions = append(m.subscriptions, cs)
|
||||
m.mu.Unlock()
|
||||
|
||||
log.Info().Str("subscription", sub.Name).Msg("added notification subscription")
|
||||
return nil
|
||||
}
|
||||
|
||||
// compileSubscription compiles expr programs for a subscription
|
||||
func (m *Manager) compileSubscription(sub *Subscription) (*compiledSubscription, error) {
|
||||
if sub.ContainerFilter == "" {
|
||||
return nil, fmt.Errorf("container_filter is required for subscription %q", sub.Name)
|
||||
}
|
||||
if sub.LogFilter == "" {
|
||||
return nil, fmt.Errorf("log_filter is required for subscription %q", sub.Name)
|
||||
}
|
||||
|
||||
cs := &compiledSubscription{
|
||||
subscription: sub,
|
||||
}
|
||||
|
||||
// Compile container filter (required)
|
||||
containerProgram, err := expr.Compile(sub.ContainerFilter, expr.Env(Container{}))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("subscription", sub.Name).Msg("failed to compile container filter")
|
||||
return nil, fmt.Errorf("failed to compile container_filter for %q: %w", sub.Name, err)
|
||||
}
|
||||
cs.containerProgram = containerProgram
|
||||
|
||||
// Compile log filter (required)
|
||||
logProgram, err := expr.Compile(sub.LogFilter, expr.Env(Log{}))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("subscription", sub.Name).Msg("failed to compile log filter")
|
||||
return nil, fmt.Errorf("failed to compile log_filter for %q: %w", sub.Name, err)
|
||||
}
|
||||
cs.logProgram = logProgram
|
||||
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
// Start begins monitoring containers and streaming logs for matching subscriptions
|
||||
// Only starts if there are subscriptions configured
|
||||
func (m *Manager) Start(ctx context.Context) error {
|
||||
m.mu.RLock()
|
||||
hasSubscriptions := len(m.subscriptions) > 0
|
||||
m.mu.RUnlock()
|
||||
|
||||
if !hasSubscriptions {
|
||||
log.Debug().Msg("no subscriptions configured, skipping notification manager start")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe to new containers that match our filters
|
||||
newContainers := make(chan container.Container)
|
||||
m.containerService.SubscribeContainersStarted(ctx, newContainers, func(c *container.Container) bool {
|
||||
matchingSubs := m.getMatchingSubscriptions(c)
|
||||
return len(matchingSubs) > 0
|
||||
})
|
||||
|
||||
// Handle new containers in background
|
||||
go func() {
|
||||
for c := range newContainers {
|
||||
m.startContainerStream(ctx, &c)
|
||||
}
|
||||
}()
|
||||
|
||||
// Get all existing containers and start streaming matching ones
|
||||
containers, errs := m.containerService.ListAllContainers(nil)
|
||||
for _, err := range errs {
|
||||
log.Warn().Err(err).Msg("error listing containers for notifications")
|
||||
}
|
||||
|
||||
// Check each container and start streaming only if it matches any subscription
|
||||
for _, c := range containers {
|
||||
if c.State == "running" {
|
||||
matchingSubs := m.getMatchingSubscriptions(&c)
|
||||
if len(matchingSubs) > 0 {
|
||||
m.startContainerStream(ctx, &c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// startContainerStream starts a single log stream for a container
|
||||
// All matching subscriptions will receive logs from this one stream
|
||||
func (m *Manager) startContainerStream(ctx context.Context, c *container.Container) {
|
||||
// Check if already streaming
|
||||
m.mu.RLock()
|
||||
_, exists := m.activeStreams[c.ID]
|
||||
m.mu.RUnlock()
|
||||
|
||||
if exists {
|
||||
return
|
||||
}
|
||||
|
||||
// Find which subscriptions match this container
|
||||
matchingSubs := m.getMatchingSubscriptions(c)
|
||||
if len(matchingSubs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Str("container", c.Name).
|
||||
Int("subscriptions", len(matchingSubs)).
|
||||
Msg("starting log stream for notification subscriptions")
|
||||
|
||||
// Convert to notification.Container and cache it
|
||||
notifContainer := NewContainer(c)
|
||||
m.mu.Lock()
|
||||
m.containerCache[c.ID] = notifContainer
|
||||
m.mu.Unlock()
|
||||
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
stream := &activeStream{
|
||||
container: notifContainer,
|
||||
subscriptions: matchingSubs,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.activeStreams[c.ID] = stream
|
||||
m.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
m.mu.Lock()
|
||||
delete(m.activeStreams, c.ID)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
|
||||
logs := make(chan *container.LogEvent)
|
||||
|
||||
// Stream logs in separate goroutine
|
||||
go func() {
|
||||
containerService, err := m.containerService.FindContainer(c.Host, c.ID, nil)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error finding container for notification streaming")
|
||||
return
|
||||
}
|
||||
|
||||
err = containerService.StreamLogs(streamCtx, time.Now(), container.STDOUT|container.STDERR, logs)
|
||||
if err != nil && err != context.Canceled {
|
||||
log.Error().Err(err).Msg("error streaming logs for notification")
|
||||
}
|
||||
close(logs)
|
||||
}()
|
||||
|
||||
// Process logs - check against all matching subscriptions
|
||||
for logEvent := range logs {
|
||||
for _, cs := range matchingSubs {
|
||||
m.processLogEvent(logEvent, c.ID, cs)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// getMatchingSubscriptions returns subscriptions that match a container
|
||||
func (m *Manager) getMatchingSubscriptions(c *container.Container) []*compiledSubscription {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
containerCtx := NewContainer(c)
|
||||
matching := make([]*compiledSubscription, 0)
|
||||
|
||||
for _, cs := range m.subscriptions {
|
||||
if m.matchesContainer(cs.containerProgram, containerCtx) {
|
||||
matching = append(matching, cs)
|
||||
}
|
||||
}
|
||||
|
||||
return matching
|
||||
}
|
||||
|
||||
// matchesContainer evaluates the container filter expression
|
||||
func (m *Manager) matchesContainer(program *vm.Program, containerCtx Container) bool {
|
||||
result, err := expr.Run(program, containerCtx)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error evaluating container filter")
|
||||
return false
|
||||
}
|
||||
|
||||
match, ok := result.(bool)
|
||||
if !ok {
|
||||
log.Error().Msg("container filter did not return boolean")
|
||||
return false
|
||||
}
|
||||
|
||||
return match
|
||||
}
|
||||
|
||||
// processLogEvent evaluates a log event against the subscription's log filter
|
||||
func (m *Manager) processLogEvent(logEvent *container.LogEvent, containerID string, cs *compiledSubscription) {
|
||||
// Lookup notification.Container from cache
|
||||
m.mu.RLock()
|
||||
notifContainer, ok := m.containerCache[containerID]
|
||||
m.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
log.Warn().Str("container", containerID).Msg("container not in cache")
|
||||
return
|
||||
}
|
||||
|
||||
logCtx := NewLog(logEvent, notifContainer)
|
||||
|
||||
// Evaluate log filter (program is never nil)
|
||||
if m.matchesLog(cs.logProgram, logCtx) {
|
||||
m.triggerWebhook(cs.subscription, logCtx)
|
||||
}
|
||||
}
|
||||
|
||||
// matchesLog evaluates the log filter expression
|
||||
func (m *Manager) matchesLog(program *vm.Program, logCtx Log) bool {
|
||||
result, err := expr.Run(program, logCtx)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("error evaluating log filter")
|
||||
return false
|
||||
}
|
||||
|
||||
match, ok := result.(bool)
|
||||
if !ok {
|
||||
log.Error().Msg("log filter did not return boolean")
|
||||
return false
|
||||
}
|
||||
|
||||
return match
|
||||
}
|
||||
|
||||
// triggerWebhook sends the notification
|
||||
func (m *Manager) triggerWebhook(sub *Subscription, logCtx Log) {
|
||||
payload := WebhookPayload{
|
||||
SubscriptionName: sub.Name,
|
||||
Timestamp: time.Now(),
|
||||
Container: logCtx.Container,
|
||||
Log: LogEvent{
|
||||
Message: logCtx.Message,
|
||||
Level: logCtx.Level,
|
||||
Timestamp: logCtx.Timestamp,
|
||||
},
|
||||
}
|
||||
|
||||
m.dispatcher.Dispatch(sub, payload)
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// Config represents the YAML configuration structure
|
||||
type Config struct {
|
||||
Dispatcher DispatcherConfig `yaml:"dispatcher"`
|
||||
Subscriptions []*Subscription `yaml:"subscriptions"`
|
||||
}
|
||||
|
||||
// LoadFromFile loads config and returns a fully configured Manager
|
||||
func LoadFromFile(path string, containerService ContainerService) (*Manager, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil // No config file is fine, return nil
|
||||
}
|
||||
return nil, fmt.Errorf("failed to read notifications config: %w", err)
|
||||
}
|
||||
|
||||
var config Config
|
||||
if err := yaml.Unmarshal(data, &config); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse notifications config: %w", err)
|
||||
}
|
||||
|
||||
// Validate dispatcher config
|
||||
if config.Dispatcher.Type == "" {
|
||||
config.Dispatcher.Type = "simple" // Default to simple
|
||||
}
|
||||
if config.Dispatcher.URL == "" {
|
||||
return nil, fmt.Errorf("dispatcher.url is required")
|
||||
}
|
||||
|
||||
// Validate subscriptions
|
||||
for _, sub := range config.Subscriptions {
|
||||
if sub.Name == "" {
|
||||
return nil, fmt.Errorf("subscription missing name")
|
||||
}
|
||||
if sub.ContainerFilter == "" {
|
||||
return nil, fmt.Errorf("subscription %q missing container_filter", sub.Name)
|
||||
}
|
||||
if sub.LogFilter == "" {
|
||||
return nil, fmt.Errorf("subscription %q missing log_filter", sub.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Create dispatcher based on type
|
||||
var dispatcher Dispatcher
|
||||
switch config.Dispatcher.Type {
|
||||
case "simple":
|
||||
dispatcher = NewWebhookDispatcher(&config.Dispatcher)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported dispatcher type: %s", config.Dispatcher.Type)
|
||||
}
|
||||
|
||||
// Create and configure manager
|
||||
manager := NewManager(containerService, dispatcher)
|
||||
if err := manager.LoadSubscriptions(config.Subscriptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
|
||||
// SaveToFile saves dispatcher config and subscriptions to a YAML file
|
||||
func SaveToFile(path string, dispatcher *DispatcherConfig, subs []*Subscription) error {
|
||||
config := Config{
|
||||
Dispatcher: *dispatcher,
|
||||
Subscriptions: subs,
|
||||
}
|
||||
|
||||
data, err := yaml.Marshal(&config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal notifications config: %w", err)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(path, data, 0644); err != nil {
|
||||
return fmt.Errorf("failed to write notifications config: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/amir20/dozzle/internal/container"
|
||||
)
|
||||
|
||||
// DispatcherConfig represents global dispatcher configuration
|
||||
type DispatcherConfig struct {
|
||||
// Type determines the dispatcher type (webhook, dozzle-service, etc.)
|
||||
Type string `yaml:"type"` // "webhook" is default
|
||||
|
||||
// URL is the endpoint for the dispatcher service
|
||||
URL string `yaml:"url"`
|
||||
|
||||
// APIKey for authenticating with the dispatcher service
|
||||
APIKey string `yaml:"api_key,omitempty"`
|
||||
|
||||
// Headers to include in all requests
|
||||
Headers map[string]string `yaml:"headers,omitempty"`
|
||||
}
|
||||
|
||||
// Subscription represents a notification rule
|
||||
type Subscription struct {
|
||||
// Name is a human-readable name for this subscription
|
||||
Name string `yaml:"name"`
|
||||
|
||||
// ContainerFilter is an expr expression to match containers (REQUIRED)
|
||||
// Expression receives a Container object with: Name, Labels, Host, State
|
||||
// Example: Name == "nginx" || Labels["app"] == "web"
|
||||
ContainerFilter string `yaml:"container_filter"`
|
||||
|
||||
// LogFilter is an expr expression to match log events (REQUIRED)
|
||||
// Expression receives a Log object with: Message, Level, Timestamp, Container
|
||||
// Example: Level == "error" || Message contains "panic"
|
||||
LogFilter string `yaml:"log_filter"`
|
||||
}
|
||||
|
||||
// Container represents container information for expr evaluation (internal type)
|
||||
type Container struct {
|
||||
Name string `expr:"Name"`
|
||||
Labels map[string]string `expr:"Labels"`
|
||||
Host string `expr:"Host"`
|
||||
State string `expr:"State"`
|
||||
}
|
||||
|
||||
// Log represents log event information for expr evaluation
|
||||
type Log struct {
|
||||
Message any `expr:"Message"`
|
||||
Level string `expr:"Level"`
|
||||
Timestamp time.Time `expr:"Timestamp"`
|
||||
Container Container `expr:"Container"`
|
||||
}
|
||||
|
||||
// WebhookPayload is the structure sent to webhook endpoints
|
||||
type WebhookPayload struct {
|
||||
SubscriptionName string `json:"subscription_name"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Container Container `json:"container"`
|
||||
Log LogEvent `json:"log"`
|
||||
}
|
||||
|
||||
// LogEvent represents the log in the webhook payload
|
||||
type LogEvent struct {
|
||||
Message any `json:"message"`
|
||||
Level string `json:"level"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// NewContainer converts container.Container to notification.Container
|
||||
func NewContainer(c *container.Container) Container {
|
||||
return Container{
|
||||
Name: c.Name,
|
||||
Labels: c.Labels,
|
||||
Host: c.Host,
|
||||
State: c.State,
|
||||
}
|
||||
}
|
||||
|
||||
// NewLog creates a Log from container.LogEvent and notification.Container
|
||||
func NewLog(logEvent *container.LogEvent, notifContainer Container) Log {
|
||||
timestamp := time.Unix(0, logEvent.Timestamp)
|
||||
|
||||
// Extract message - keep as any type for expr flexibility
|
||||
var message any = logEvent.Message
|
||||
|
||||
// For grouped logs, join fragments into single string for easier matching
|
||||
if fragments, ok := logEvent.Message.([]container.LogFragment); ok {
|
||||
parts := make([]string, len(fragments))
|
||||
for i, frag := range fragments {
|
||||
parts[i] = frag.Message
|
||||
}
|
||||
message = strings.Join(parts, "\n")
|
||||
}
|
||||
|
||||
return Log{
|
||||
Message: message,
|
||||
Level: logEvent.Level,
|
||||
Timestamp: timestamp,
|
||||
Container: notifContainer,
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package notification
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Dispatcher is the interface for sending notifications
|
||||
type Dispatcher interface {
|
||||
Dispatch(subscription *Subscription, payload WebhookPayload)
|
||||
}
|
||||
|
||||
// WebhookDispatcher handles async webhook delivery
|
||||
type WebhookDispatcher struct {
|
||||
client *http.Client
|
||||
config *DispatcherConfig
|
||||
}
|
||||
|
||||
// NewWebhookDispatcher creates a new webhook dispatcher
|
||||
func NewWebhookDispatcher(config *DispatcherConfig) *WebhookDispatcher {
|
||||
return &WebhookDispatcher{
|
||||
client: &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
},
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch sends a webhook payload asynchronously
|
||||
func (d *WebhookDispatcher) Dispatch(subscription *Subscription, payload WebhookPayload) {
|
||||
url := d.config.URL
|
||||
go func() {
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("url", url).Msg("failed to marshal webhook payload")
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("url", url).Msg("failed to create webhook request")
|
||||
return
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", "Dozzle-Notifications/1.0")
|
||||
|
||||
// Add API key if configured
|
||||
if d.config.APIKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+d.config.APIKey)
|
||||
}
|
||||
|
||||
// Add custom headers
|
||||
for key, value := range d.config.Headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
|
||||
resp, err := d.client.Do(req)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("url", url).Msg("failed to send webhook")
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
log.Warn().
|
||||
Int("status", resp.StatusCode).
|
||||
Str("url", url).
|
||||
Str("subscription", payload.SubscriptionName).
|
||||
Msg("webhook returned error status")
|
||||
} else {
|
||||
log.Debug().
|
||||
Int("status", resp.StatusCode).
|
||||
Str("url", url).
|
||||
Str("subscription", payload.SubscriptionName).
|
||||
Msg("webhook delivered successfully")
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -122,6 +122,12 @@ func (m *MultiHostService) SubscribeEventsAndStats(ctx context.Context, events c
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MultiHostService) SubscribeEvents(ctx context.Context, events chan<- container.ContainerEvent) {
|
||||
for _, client := range m.manager.List() {
|
||||
client.SubscribeEvents(ctx, events)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) {
|
||||
newContainers := make(chan container.Container)
|
||||
for _, client := range m.manager.List() {
|
||||
|
||||
@@ -96,6 +96,10 @@ func (m *K8sClusterService) SubscribeEventsAndStats(ctx context.Context, events
|
||||
m.client.SubscribeStats(ctx, stats)
|
||||
}
|
||||
|
||||
func (m *K8sClusterService) SubscribeEvents(ctx context.Context, events chan<- container.ContainerEvent) {
|
||||
m.client.SubscribeEvents(ctx, events)
|
||||
}
|
||||
|
||||
func (m *K8sClusterService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) {
|
||||
newContainers := make(chan container.Container)
|
||||
m.client.SubscribeContainersStarted(ctx, newContainers)
|
||||
|
||||
+15
-13
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/amir20/dozzle/internal/auth"
|
||||
"github.com/amir20/dozzle/internal/container"
|
||||
"github.com/amir20/dozzle/internal/notification"
|
||||
container_support "github.com/amir20/dozzle/internal/support/container"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -34,19 +35,20 @@ const (
|
||||
|
||||
// Config is a struct for configuring the web service
|
||||
type Config struct {
|
||||
Base string
|
||||
Addr string
|
||||
Version string
|
||||
Hostname string
|
||||
NoAnalytics bool
|
||||
Dev bool
|
||||
Mode string
|
||||
Authorization Authorization
|
||||
EnableActions bool
|
||||
EnableShell bool
|
||||
DisableAvatars bool
|
||||
ReleaseCheckMode ReleaseCheckMode
|
||||
Labels container.ContainerLabels
|
||||
Base string
|
||||
Addr string
|
||||
Version string
|
||||
Hostname string
|
||||
NoAnalytics bool
|
||||
Dev bool
|
||||
Mode string
|
||||
Authorization Authorization
|
||||
EnableActions bool
|
||||
EnableShell bool
|
||||
DisableAvatars bool
|
||||
ReleaseCheckMode ReleaseCheckMode
|
||||
Labels container.ContainerLabels
|
||||
NotificationManager *notification.Manager
|
||||
}
|
||||
|
||||
type Authorization struct {
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/amir20/dozzle/internal/auth"
|
||||
"github.com/amir20/dozzle/internal/docker"
|
||||
"github.com/amir20/dozzle/internal/k8s"
|
||||
"github.com/amir20/dozzle/internal/notification"
|
||||
"github.com/amir20/dozzle/internal/support/cli"
|
||||
docker_support "github.com/amir20/dozzle/internal/support/docker"
|
||||
k8s_support "github.com/amir20/dozzle/internal/support/k8s"
|
||||
@@ -108,15 +109,18 @@ func main() {
|
||||
log.Fatal().Str("mode", args.Mode).Msg("Invalid mode")
|
||||
}
|
||||
|
||||
srv := createServer(args, hostService)
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
notificationManager := initializeNotifications(ctx, hostService)
|
||||
|
||||
srv := createServer(args, hostService, notificationManager)
|
||||
go func() {
|
||||
log.Info().Msgf("Accepting connections on %s", args.Addr)
|
||||
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
|
||||
log.Fatal().Err(err).Msg("failed to listen")
|
||||
}
|
||||
}()
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
<-ctx.Done()
|
||||
stop()
|
||||
@@ -137,7 +141,56 @@ func fileExists(filename string) bool {
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func createServer(args cli.Args, hostService web.HostService) *http.Server {
|
||||
func initializeSimpleAuth(authTTL string) web.Authorizer {
|
||||
log.Debug().Msg("Using simple authentication")
|
||||
|
||||
userFilePath := "./data/users.yml"
|
||||
if !fileExists(userFilePath) {
|
||||
userFilePath = "./data/users.yaml"
|
||||
if !fileExists(userFilePath) {
|
||||
log.Fatal().Msg("No users.yaml or users.yml file found.")
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msgf("Reading %s file", filepath.Base(userFilePath))
|
||||
|
||||
db, err := auth.ReadUsersFromFile(userFilePath)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msgf("Could not read users file: %s", userFilePath)
|
||||
}
|
||||
|
||||
log.Debug().Int("users", len(db.Users)).Msg("Loaded users")
|
||||
ttl := time.Duration(0)
|
||||
if authTTL != "session" {
|
||||
ttl, err = time.ParseDuration(authTTL)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Could not parse auth ttl")
|
||||
}
|
||||
}
|
||||
return auth.NewSimpleAuth(db, ttl)
|
||||
}
|
||||
|
||||
func initializeNotifications(ctx context.Context, hostService web.HostService) *notification.Manager {
|
||||
notificationsPath := "./data/notifications.yml"
|
||||
manager, err := notification.LoadFromFile(notificationsPath, hostService)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to load notifications config")
|
||||
return nil
|
||||
}
|
||||
|
||||
if manager == nil {
|
||||
return nil // No config file
|
||||
}
|
||||
|
||||
if err := manager.Start(ctx); err != nil {
|
||||
log.Error().Err(err).Msg("failed to start notification manager")
|
||||
return nil
|
||||
}
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
func createServer(args cli.Args, hostService web.HostService, notificationManager *notification.Manager) *http.Server {
|
||||
_, dev := os.LookupEnv("DEV")
|
||||
|
||||
var releaseCheckMode web.ReleaseCheckMode = web.Automatic
|
||||
@@ -158,33 +211,8 @@ func createServer(args cli.Args, hostService web.HostService) *http.Server {
|
||||
provider = web.FORWARD_PROXY
|
||||
authorizer = auth.NewForwardProxyAuth(args.AuthHeaderUser, args.AuthHeaderEmail, args.AuthHeaderName, args.AuthHeaderFilter, args.AuthHeaderRoles)
|
||||
} else if args.AuthProvider == "simple" {
|
||||
log.Debug().Msg("Using simple authentication")
|
||||
provider = web.SIMPLE
|
||||
|
||||
userFilePath := "./data/users.yml"
|
||||
if !fileExists(userFilePath) {
|
||||
userFilePath = "./data/users.yaml"
|
||||
if !fileExists(userFilePath) {
|
||||
log.Fatal().Msg("No users.yaml or users.yml file found.")
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msgf("Reading %s file", filepath.Base(userFilePath))
|
||||
|
||||
db, err := auth.ReadUsersFromFile(userFilePath)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msgf("Could not read users file: %s", userFilePath)
|
||||
}
|
||||
|
||||
log.Debug().Int("users", len(db.Users)).Msg("Loaded users")
|
||||
ttl := time.Duration(0)
|
||||
if args.AuthTTL != "session" {
|
||||
ttl, err = time.ParseDuration(args.AuthTTL)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("Could not parse auth ttl")
|
||||
}
|
||||
}
|
||||
authorizer = auth.NewSimpleAuth(db, ttl)
|
||||
authorizer = initializeSimpleAuth(args.AuthTTL)
|
||||
}
|
||||
|
||||
authTTL := time.Duration(0)
|
||||
@@ -211,11 +239,12 @@ func createServer(args cli.Args, hostService web.HostService) *http.Server {
|
||||
TTL: authTTL,
|
||||
LogoutUrl: args.AuthLogoutUrl,
|
||||
},
|
||||
EnableActions: args.EnableActions,
|
||||
EnableShell: args.EnableShell,
|
||||
DisableAvatars: args.DisableAvatars,
|
||||
ReleaseCheckMode: releaseCheckMode,
|
||||
Labels: args.Filter,
|
||||
EnableActions: args.EnableActions,
|
||||
EnableShell: args.EnableShell,
|
||||
DisableAvatars: args.DisableAvatars,
|
||||
ReleaseCheckMode: releaseCheckMode,
|
||||
Labels: args.Filter,
|
||||
NotificationManager: notificationManager,
|
||||
}
|
||||
|
||||
assets, err := fs.Sub(content, "dist")
|
||||
|
||||
Reference in New Issue
Block a user