diff --git a/go.mod b/go.mod index fccd2562..b4538206 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/emicklei/go-restful/v3 v3.12.2 // indirect + github.com/expr-lang/expr v1.17.7 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect diff --git a/go.sum b/go.sum index e74daf12..6b26742b 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtz github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8= +github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= diff --git a/internal/notification/manager.go b/internal/notification/manager.go new file mode 100644 index 00000000..c9609db4 --- /dev/null +++ b/internal/notification/manager.go @@ -0,0 +1,325 @@ +package notification + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/amir20/dozzle/internal/container" + container_support "github.com/amir20/dozzle/internal/support/container" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/vm" + "github.com/rs/zerolog/log" +) + +// Manager handles notification subscriptions and matching +type Manager struct { + subscriptions []*compiledSubscription + mu sync.RWMutex + activeStreams map[string]*activeStream // containerID -> stream info + containerCache map[string]Container // containerID -> notification.Container + containerService ContainerService + dispatcher Dispatcher +} + +// activeStream tracks a single log stream and all subscriptions watching it +type activeStream struct { + container Container // notification.Container + subscriptions []*compiledSubscription // All subscriptions watching this container + cancel context.CancelFunc // Cancel function for the stream +} + +// compiledSubscription wraps a Subscription with compiled expr programs +type compiledSubscription struct { + subscription *Subscription + containerProgram *vm.Program // Always non-nil + logProgram *vm.Program // Always non-nil +} + +// ContainerService provides access to containers and log streaming +type ContainerService interface { + ListAllContainers(labels container.ContainerLabels) ([]container.Container, []error) + FindContainer(host, id string, labels container.ContainerLabels) (*container_support.ContainerService, error) + SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) +} + +// NewManager creates a new notification manager +func NewManager(containerService ContainerService, dispatcher Dispatcher) *Manager { + return &Manager{ + subscriptions: make([]*compiledSubscription, 0), + activeStreams: make(map[string]*activeStream), + containerCache: make(map[string]Container), + containerService: containerService, + dispatcher: dispatcher, + } +} + +// LoadSubscriptions loads and compiles multiple subscriptions (replaces existing) +func (m *Manager) LoadSubscriptions(subs []*Subscription) error { + m.mu.Lock() + m.subscriptions = make([]*compiledSubscription, 0, len(subs)) + m.mu.Unlock() + + for _, sub := range subs { + if err := m.AddSubscription(sub); err != nil { + return err + } + } + + log.Info().Int("count", len(subs)).Msg("loaded notification subscriptions") + return nil +} + +// AddSubscription adds a single subscription +func (m *Manager) AddSubscription(sub *Subscription) error { + cs, err := m.compileSubscription(sub) + if err != nil { + return err + } + + m.mu.Lock() + m.subscriptions = append(m.subscriptions, cs) + m.mu.Unlock() + + log.Info().Str("subscription", sub.Name).Msg("added notification subscription") + return nil +} + +// compileSubscription compiles expr programs for a subscription +func (m *Manager) compileSubscription(sub *Subscription) (*compiledSubscription, error) { + if sub.ContainerFilter == "" { + return nil, fmt.Errorf("container_filter is required for subscription %q", sub.Name) + } + if sub.LogFilter == "" { + return nil, fmt.Errorf("log_filter is required for subscription %q", sub.Name) + } + + cs := &compiledSubscription{ + subscription: sub, + } + + // Compile container filter (required) + containerProgram, err := expr.Compile(sub.ContainerFilter, expr.Env(Container{})) + if err != nil { + log.Error().Err(err).Str("subscription", sub.Name).Msg("failed to compile container filter") + return nil, fmt.Errorf("failed to compile container_filter for %q: %w", sub.Name, err) + } + cs.containerProgram = containerProgram + + // Compile log filter (required) + logProgram, err := expr.Compile(sub.LogFilter, expr.Env(Log{})) + if err != nil { + log.Error().Err(err).Str("subscription", sub.Name).Msg("failed to compile log filter") + return nil, fmt.Errorf("failed to compile log_filter for %q: %w", sub.Name, err) + } + cs.logProgram = logProgram + + return cs, nil +} + +// Start begins monitoring containers and streaming logs for matching subscriptions +// Only starts if there are subscriptions configured +func (m *Manager) Start(ctx context.Context) error { + m.mu.RLock() + hasSubscriptions := len(m.subscriptions) > 0 + m.mu.RUnlock() + + if !hasSubscriptions { + log.Debug().Msg("no subscriptions configured, skipping notification manager start") + return nil + } + + // Subscribe to new containers that match our filters + newContainers := make(chan container.Container) + m.containerService.SubscribeContainersStarted(ctx, newContainers, func(c *container.Container) bool { + matchingSubs := m.getMatchingSubscriptions(c) + return len(matchingSubs) > 0 + }) + + // Handle new containers in background + go func() { + for c := range newContainers { + m.startContainerStream(ctx, &c) + } + }() + + // Get all existing containers and start streaming matching ones + containers, errs := m.containerService.ListAllContainers(nil) + for _, err := range errs { + log.Warn().Err(err).Msg("error listing containers for notifications") + } + + // Check each container and start streaming only if it matches any subscription + for _, c := range containers { + if c.State == "running" { + matchingSubs := m.getMatchingSubscriptions(&c) + if len(matchingSubs) > 0 { + m.startContainerStream(ctx, &c) + } + } + } + + return nil +} + +// startContainerStream starts a single log stream for a container +// All matching subscriptions will receive logs from this one stream +func (m *Manager) startContainerStream(ctx context.Context, c *container.Container) { + // Check if already streaming + m.mu.RLock() + _, exists := m.activeStreams[c.ID] + m.mu.RUnlock() + + if exists { + return + } + + // Find which subscriptions match this container + matchingSubs := m.getMatchingSubscriptions(c) + if len(matchingSubs) == 0 { + return + } + + log.Debug(). + Str("container", c.Name). + Int("subscriptions", len(matchingSubs)). + Msg("starting log stream for notification subscriptions") + + // Convert to notification.Container and cache it + notifContainer := NewContainer(c) + m.mu.Lock() + m.containerCache[c.ID] = notifContainer + m.mu.Unlock() + + streamCtx, cancel := context.WithCancel(ctx) + + stream := &activeStream{ + container: notifContainer, + subscriptions: matchingSubs, + cancel: cancel, + } + + m.mu.Lock() + m.activeStreams[c.ID] = stream + m.mu.Unlock() + + go func() { + defer func() { + m.mu.Lock() + delete(m.activeStreams, c.ID) + m.mu.Unlock() + }() + + logs := make(chan *container.LogEvent) + + // Stream logs in separate goroutine + go func() { + containerService, err := m.containerService.FindContainer(c.Host, c.ID, nil) + if err != nil { + log.Error().Err(err).Msg("error finding container for notification streaming") + return + } + + err = containerService.StreamLogs(streamCtx, time.Now(), container.STDOUT|container.STDERR, logs) + if err != nil && err != context.Canceled { + log.Error().Err(err).Msg("error streaming logs for notification") + } + close(logs) + }() + + // Process logs - check against all matching subscriptions + for logEvent := range logs { + for _, cs := range matchingSubs { + m.processLogEvent(logEvent, c.ID, cs) + } + } + }() +} + +// getMatchingSubscriptions returns subscriptions that match a container +func (m *Manager) getMatchingSubscriptions(c *container.Container) []*compiledSubscription { + m.mu.RLock() + defer m.mu.RUnlock() + + containerCtx := NewContainer(c) + matching := make([]*compiledSubscription, 0) + + for _, cs := range m.subscriptions { + if m.matchesContainer(cs.containerProgram, containerCtx) { + matching = append(matching, cs) + } + } + + return matching +} + +// matchesContainer evaluates the container filter expression +func (m *Manager) matchesContainer(program *vm.Program, containerCtx Container) bool { + result, err := expr.Run(program, containerCtx) + if err != nil { + log.Error().Err(err).Msg("error evaluating container filter") + return false + } + + match, ok := result.(bool) + if !ok { + log.Error().Msg("container filter did not return boolean") + return false + } + + return match +} + +// processLogEvent evaluates a log event against the subscription's log filter +func (m *Manager) processLogEvent(logEvent *container.LogEvent, containerID string, cs *compiledSubscription) { + // Lookup notification.Container from cache + m.mu.RLock() + notifContainer, ok := m.containerCache[containerID] + m.mu.RUnlock() + + if !ok { + log.Warn().Str("container", containerID).Msg("container not in cache") + return + } + + logCtx := NewLog(logEvent, notifContainer) + + // Evaluate log filter (program is never nil) + if m.matchesLog(cs.logProgram, logCtx) { + m.triggerWebhook(cs.subscription, logCtx) + } +} + +// matchesLog evaluates the log filter expression +func (m *Manager) matchesLog(program *vm.Program, logCtx Log) bool { + result, err := expr.Run(program, logCtx) + if err != nil { + log.Error().Err(err).Msg("error evaluating log filter") + return false + } + + match, ok := result.(bool) + if !ok { + log.Error().Msg("log filter did not return boolean") + return false + } + + return match +} + +// triggerWebhook sends the notification +func (m *Manager) triggerWebhook(sub *Subscription, logCtx Log) { + payload := WebhookPayload{ + SubscriptionName: sub.Name, + Timestamp: time.Now(), + Container: logCtx.Container, + Log: LogEvent{ + Message: logCtx.Message, + Level: logCtx.Level, + Timestamp: logCtx.Timestamp, + }, + } + + m.dispatcher.Dispatch(sub, payload) +} diff --git a/internal/notification/persist.go b/internal/notification/persist.go new file mode 100644 index 00000000..ba0d32fe --- /dev/null +++ b/internal/notification/persist.go @@ -0,0 +1,87 @@ +package notification + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +// Config represents the YAML configuration structure +type Config struct { + Dispatcher DispatcherConfig `yaml:"dispatcher"` + Subscriptions []*Subscription `yaml:"subscriptions"` +} + +// LoadFromFile loads config and returns a fully configured Manager +func LoadFromFile(path string, containerService ContainerService) (*Manager, error) { + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil // No config file is fine, return nil + } + return nil, fmt.Errorf("failed to read notifications config: %w", err) + } + + var config Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("failed to parse notifications config: %w", err) + } + + // Validate dispatcher config + if config.Dispatcher.Type == "" { + config.Dispatcher.Type = "simple" // Default to simple + } + if config.Dispatcher.URL == "" { + return nil, fmt.Errorf("dispatcher.url is required") + } + + // Validate subscriptions + for _, sub := range config.Subscriptions { + if sub.Name == "" { + return nil, fmt.Errorf("subscription missing name") + } + if sub.ContainerFilter == "" { + return nil, fmt.Errorf("subscription %q missing container_filter", sub.Name) + } + if sub.LogFilter == "" { + return nil, fmt.Errorf("subscription %q missing log_filter", sub.Name) + } + } + + // Create dispatcher based on type + var dispatcher Dispatcher + switch config.Dispatcher.Type { + case "simple": + dispatcher = NewWebhookDispatcher(&config.Dispatcher) + default: + return nil, fmt.Errorf("unsupported dispatcher type: %s", config.Dispatcher.Type) + } + + // Create and configure manager + manager := NewManager(containerService, dispatcher) + if err := manager.LoadSubscriptions(config.Subscriptions); err != nil { + return nil, err + } + + return manager, nil +} + +// SaveToFile saves dispatcher config and subscriptions to a YAML file +func SaveToFile(path string, dispatcher *DispatcherConfig, subs []*Subscription) error { + config := Config{ + Dispatcher: *dispatcher, + Subscriptions: subs, + } + + data, err := yaml.Marshal(&config) + if err != nil { + return fmt.Errorf("failed to marshal notifications config: %w", err) + } + + if err := os.WriteFile(path, data, 0644); err != nil { + return fmt.Errorf("failed to write notifications config: %w", err) + } + + return nil +} diff --git a/internal/notification/types.go b/internal/notification/types.go new file mode 100644 index 00000000..65a262b0 --- /dev/null +++ b/internal/notification/types.go @@ -0,0 +1,104 @@ +package notification + +import ( + "strings" + "time" + + "github.com/amir20/dozzle/internal/container" +) + +// DispatcherConfig represents global dispatcher configuration +type DispatcherConfig struct { + // Type determines the dispatcher type (webhook, dozzle-service, etc.) + Type string `yaml:"type"` // "webhook" is default + + // URL is the endpoint for the dispatcher service + URL string `yaml:"url"` + + // APIKey for authenticating with the dispatcher service + APIKey string `yaml:"api_key,omitempty"` + + // Headers to include in all requests + Headers map[string]string `yaml:"headers,omitempty"` +} + +// Subscription represents a notification rule +type Subscription struct { + // Name is a human-readable name for this subscription + Name string `yaml:"name"` + + // ContainerFilter is an expr expression to match containers (REQUIRED) + // Expression receives a Container object with: Name, Labels, Host, State + // Example: Name == "nginx" || Labels["app"] == "web" + ContainerFilter string `yaml:"container_filter"` + + // LogFilter is an expr expression to match log events (REQUIRED) + // Expression receives a Log object with: Message, Level, Timestamp, Container + // Example: Level == "error" || Message contains "panic" + LogFilter string `yaml:"log_filter"` +} + +// Container represents container information for expr evaluation (internal type) +type Container struct { + Name string `expr:"Name"` + Labels map[string]string `expr:"Labels"` + Host string `expr:"Host"` + State string `expr:"State"` +} + +// Log represents log event information for expr evaluation +type Log struct { + Message any `expr:"Message"` + Level string `expr:"Level"` + Timestamp time.Time `expr:"Timestamp"` + Container Container `expr:"Container"` +} + +// WebhookPayload is the structure sent to webhook endpoints +type WebhookPayload struct { + SubscriptionName string `json:"subscription_name"` + Timestamp time.Time `json:"timestamp"` + Container Container `json:"container"` + Log LogEvent `json:"log"` +} + +// LogEvent represents the log in the webhook payload +type LogEvent struct { + Message any `json:"message"` + Level string `json:"level"` + Timestamp time.Time `json:"timestamp"` +} + +// NewContainer converts container.Container to notification.Container +func NewContainer(c *container.Container) Container { + return Container{ + Name: c.Name, + Labels: c.Labels, + Host: c.Host, + State: c.State, + } +} + +// NewLog creates a Log from container.LogEvent and notification.Container +func NewLog(logEvent *container.LogEvent, notifContainer Container) Log { + timestamp := time.Unix(0, logEvent.Timestamp) + + // Extract message - keep as any type for expr flexibility + var message any = logEvent.Message + + // For grouped logs, join fragments into single string for easier matching + if fragments, ok := logEvent.Message.([]container.LogFragment); ok { + parts := make([]string, len(fragments)) + for i, frag := range fragments { + parts[i] = frag.Message + } + message = strings.Join(parts, "\n") + } + + return Log{ + Message: message, + Level: logEvent.Level, + Timestamp: timestamp, + Container: notifContainer, + } +} diff --git a/internal/notification/webhook.go b/internal/notification/webhook.go new file mode 100644 index 00000000..da3694d1 --- /dev/null +++ b/internal/notification/webhook.go @@ -0,0 +1,83 @@ +package notification + +import ( + "bytes" + "encoding/json" + "net/http" + "time" + + "github.com/rs/zerolog/log" +) + +// Dispatcher is the interface for sending notifications +type Dispatcher interface { + Dispatch(subscription *Subscription, payload WebhookPayload) +} + +// WebhookDispatcher handles async webhook delivery +type WebhookDispatcher struct { + client *http.Client + config *DispatcherConfig +} + +// NewWebhookDispatcher creates a new webhook dispatcher +func NewWebhookDispatcher(config *DispatcherConfig) *WebhookDispatcher { + return &WebhookDispatcher{ + client: &http.Client{ + Timeout: 10 * time.Second, + }, + config: config, + } +} + +// Dispatch sends a webhook payload asynchronously +func (d *WebhookDispatcher) Dispatch(subscription *Subscription, payload WebhookPayload) { + url := d.config.URL + go func() { + body, err := json.Marshal(payload) + if err != nil { + log.Error().Err(err).Str("url", url).Msg("failed to marshal webhook payload") + return + } + + req, err := http.NewRequest("POST", url, bytes.NewReader(body)) + if err != nil { + log.Error().Err(err).Str("url", url).Msg("failed to create webhook request") + return + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "Dozzle-Notifications/1.0") + + // Add API key if configured + if d.config.APIKey != "" { + req.Header.Set("Authorization", "Bearer "+d.config.APIKey) + } + + // Add custom headers + for key, value := range d.config.Headers { + req.Header.Set(key, value) + } + + resp, err := d.client.Do(req) + if err != nil { + log.Error().Err(err).Str("url", url).Msg("failed to send webhook") + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + log.Warn(). + Int("status", resp.StatusCode). + Str("url", url). + Str("subscription", payload.SubscriptionName). + Msg("webhook returned error status") + } else { + log.Debug(). + Int("status", resp.StatusCode). + Str("url", url). + Str("subscription", payload.SubscriptionName). + Msg("webhook delivered successfully") + } + }() +} diff --git a/internal/support/docker/multi_host_service.go b/internal/support/docker/multi_host_service.go index 312032a1..3c4bd8b2 100644 --- a/internal/support/docker/multi_host_service.go +++ b/internal/support/docker/multi_host_service.go @@ -122,6 +122,12 @@ func (m *MultiHostService) SubscribeEventsAndStats(ctx context.Context, events c } } +func (m *MultiHostService) SubscribeEvents(ctx context.Context, events chan<- container.ContainerEvent) { + for _, client := range m.manager.List() { + client.SubscribeEvents(ctx, events) + } +} + func (m *MultiHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) { newContainers := make(chan container.Container) for _, client := range m.manager.List() { diff --git a/internal/support/k8s/k8s_cluster_service.go b/internal/support/k8s/k8s_cluster_service.go index ee5aa2f1..65afdaac 100644 --- a/internal/support/k8s/k8s_cluster_service.go +++ b/internal/support/k8s/k8s_cluster_service.go @@ -96,6 +96,10 @@ func (m *K8sClusterService) SubscribeEventsAndStats(ctx context.Context, events m.client.SubscribeStats(ctx, stats) } +func (m *K8sClusterService) SubscribeEvents(ctx context.Context, events chan<- container.ContainerEvent) { + m.client.SubscribeEvents(ctx, events) +} + func (m *K8sClusterService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) { newContainers := make(chan container.Container) m.client.SubscribeContainersStarted(ctx, newContainers) diff --git a/internal/web/routes.go b/internal/web/routes.go index c2eef374..b3e6c696 100644 --- a/internal/web/routes.go +++ b/internal/web/routes.go @@ -10,6 +10,7 @@ import ( "github.com/amir20/dozzle/internal/auth" "github.com/amir20/dozzle/internal/container" + "github.com/amir20/dozzle/internal/notification" container_support "github.com/amir20/dozzle/internal/support/container" "github.com/go-chi/chi/v5" @@ -34,19 +35,20 @@ const ( // Config is a struct for configuring the web service type Config struct { - Base string - Addr string - Version string - Hostname string - NoAnalytics bool - Dev bool - Mode string - Authorization Authorization - EnableActions bool - EnableShell bool - DisableAvatars bool - ReleaseCheckMode ReleaseCheckMode - Labels container.ContainerLabels + Base string + Addr string + Version string + Hostname string + NoAnalytics bool + Dev bool + Mode string + Authorization Authorization + EnableActions bool + EnableShell bool + DisableAvatars bool + ReleaseCheckMode ReleaseCheckMode + Labels container.ContainerLabels + NotificationManager *notification.Manager } type Authorization struct { diff --git a/main.go b/main.go index b425913e..971a3e90 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "github.com/amir20/dozzle/internal/auth" "github.com/amir20/dozzle/internal/docker" "github.com/amir20/dozzle/internal/k8s" + "github.com/amir20/dozzle/internal/notification" "github.com/amir20/dozzle/internal/support/cli" docker_support "github.com/amir20/dozzle/internal/support/docker" k8s_support "github.com/amir20/dozzle/internal/support/k8s" @@ -108,15 +109,18 @@ func main() { log.Fatal().Str("mode", args.Mode).Msg("Invalid mode") } - srv := createServer(args, hostService) + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + notificationManager := initializeNotifications(ctx, hostService) + + srv := createServer(args, hostService, notificationManager) go func() { log.Info().Msgf("Accepting connections on %s", args.Addr) if err := srv.ListenAndServe(); err != http.ErrServerClosed { log.Fatal().Err(err).Msg("failed to listen") } }() - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer stop() <-ctx.Done() stop() @@ -137,7 +141,56 @@ func fileExists(filename string) bool { return err == nil } -func createServer(args cli.Args, hostService web.HostService) *http.Server { +func initializeSimpleAuth(authTTL string) web.Authorizer { + log.Debug().Msg("Using simple authentication") + + userFilePath := "./data/users.yml" + if !fileExists(userFilePath) { + userFilePath = "./data/users.yaml" + if !fileExists(userFilePath) { + log.Fatal().Msg("No users.yaml or users.yml file found.") + } + } + + log.Debug().Msgf("Reading %s file", filepath.Base(userFilePath)) + + db, err := auth.ReadUsersFromFile(userFilePath) + if err != nil { + log.Fatal().Err(err).Msgf("Could not read users file: %s", userFilePath) + } + + log.Debug().Int("users", len(db.Users)).Msg("Loaded users") + ttl := time.Duration(0) + if authTTL != "session" { + ttl, err = time.ParseDuration(authTTL) + if err != nil { + log.Fatal().Err(err).Msg("Could not parse auth ttl") + } + } + return auth.NewSimpleAuth(db, ttl) +} + +func initializeNotifications(ctx context.Context, hostService web.HostService) *notification.Manager { + notificationsPath := "./data/notifications.yml" + manager, err := notification.LoadFromFile(notificationsPath, hostService) + if err != nil { + log.Error().Err(err).Msg("failed to load notifications config") + return nil + } + + if manager == nil { + return nil // No config file + } + + if err := manager.Start(ctx); err != nil { + log.Error().Err(err).Msg("failed to start notification manager") + return nil + } + + return manager +} + +func createServer(args cli.Args, hostService web.HostService, notificationManager *notification.Manager) *http.Server { _, dev := os.LookupEnv("DEV") var releaseCheckMode web.ReleaseCheckMode = web.Automatic @@ -158,33 +211,8 @@ func createServer(args cli.Args, hostService web.HostService) *http.Server { provider = web.FORWARD_PROXY authorizer = auth.NewForwardProxyAuth(args.AuthHeaderUser, args.AuthHeaderEmail, args.AuthHeaderName, args.AuthHeaderFilter, args.AuthHeaderRoles) } else if args.AuthProvider == "simple" { - log.Debug().Msg("Using simple authentication") provider = web.SIMPLE - - userFilePath := "./data/users.yml" - if !fileExists(userFilePath) { - userFilePath = "./data/users.yaml" - if !fileExists(userFilePath) { - log.Fatal().Msg("No users.yaml or users.yml file found.") - } - } - - log.Debug().Msgf("Reading %s file", filepath.Base(userFilePath)) - - db, err := auth.ReadUsersFromFile(userFilePath) - if err != nil { - log.Fatal().Err(err).Msgf("Could not read users file: %s", userFilePath) - } - - log.Debug().Int("users", len(db.Users)).Msg("Loaded users") - ttl := time.Duration(0) - if args.AuthTTL != "session" { - ttl, err = time.ParseDuration(args.AuthTTL) - if err != nil { - log.Fatal().Err(err).Msg("Could not parse auth ttl") - } - } - authorizer = auth.NewSimpleAuth(db, ttl) + authorizer = initializeSimpleAuth(args.AuthTTL) } authTTL := time.Duration(0) @@ -211,11 +239,12 @@ func createServer(args cli.Args, hostService web.HostService) *http.Server { TTL: authTTL, LogoutUrl: args.AuthLogoutUrl, }, - EnableActions: args.EnableActions, - EnableShell: args.EnableShell, - DisableAvatars: args.DisableAvatars, - ReleaseCheckMode: releaseCheckMode, - Labels: args.Filter, + EnableActions: args.EnableActions, + EnableShell: args.EnableShell, + DisableAvatars: args.DisableAvatars, + ReleaseCheckMode: releaseCheckMode, + Labels: args.Filter, + NotificationManager: notificationManager, } assets, err := fs.Sub(content, "dist")