diff --git a/Dockerfile b/Dockerfile index 8610f993..0c97cd10 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,9 @@ COPY assets ./assets COPY locales ./locales COPY public ./public +ARG CLOUD_URL +ENV CLOUD_URL=$CLOUD_URL + # Build assets RUN pnpm build diff --git a/Makefile b/Makefile index a979d615..3da2c03a 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ build: dist generate .PHONY: docker docker: generate - @docker build --build-arg TAG=local -t amir20/dozzle:local . + @docker build --build-arg TAG=local --build-arg CLOUD_URL=$(CLOUD_URL) -t amir20/dozzle:local . .PHONY: generate generate: shared_key.pem shared_cert.pem diff --git a/docs/guide/k8s.md b/docs/guide/k8s.md index 9a2712be..0700ad21 100644 --- a/docs/guide/k8s.md +++ b/docs/guide/k8s.md @@ -44,6 +44,23 @@ roleRef: name: pod-viewer-role apiGroup: rbac.authorization.k8s.io --- +# pvc.yaml +# ReadWriteOnce + Recreate strategy means cloud config / notification rules +# briefly become unavailable during pod rollouts (the new pod can't mount until +# the old one releases). For zero-downtime config persistence, use a +# ReadWriteMany storage class (NFS, CephFS, etc.) and switch the strategy +# below to RollingUpdate. +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: dozzle-data +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- # deployment.yaml apiVersion: apps/v1 kind: Deployment @@ -53,6 +70,8 @@ spec: selector: matchLabels: app: dozzle + strategy: + type: Recreate template: metadata: labels: @@ -67,6 +86,13 @@ spec: env: - name: DOZZLE_MODE value: "k8s" + volumeMounts: + - name: data + mountPath: /data + volumes: + - name: data + persistentVolumeClaim: + claimName: dozzle-data --- # service.yaml apiVersion: v1 @@ -84,6 +110,7 @@ spec: ``` This configuration creates a service account, a cluster role, and a cluster role binding to allow Dozzle to access the necessary Kubernetes resources. 
It also creates a deployment for Dozzle and exposes it via a service. + > [!WARNING] > When deploying this with any GitOps tool (like Flux CD or Argo CD) in a specific namespace apart from `default`, make sure to change the **namespace** in the **ClusterRoleBinding Subject** diff --git a/docs/guide/swarm-mode.md b/docs/guide/swarm-mode.md index ca5b6297..2b987d8a 100644 --- a/docs/guide/swarm-mode.md +++ b/docs/guide/swarm-mode.md @@ -26,6 +26,7 @@ services: - DOZZLE_MODE=swarm volumes: - /var/run/docker.sock:/var/run/docker.sock + - /opt/dozzle/data:/data ports: - 8080:8080 networks: @@ -39,6 +40,8 @@ networks: Note that the `DOZZLE_MODE` environment variable is set to `swarm`. This tells Dozzle to automatically discover other Dozzle instances in the swarm. The `overlay` network is used to create the mesh network between the different Dozzle instances. +The `/data` volume is mounted to persist Dozzle's configuration (notifications, cloud settings, custom stacks). Since Dozzle is deployed globally on every node, mount a host path on each node so each instance keeps its local state across restarts. + > [!WARNING] > Socket-proxy cannot be used in Docker Swarm mode. This limitation stems from Docker itself, not Dozzle. In Swarm mode, services can only communicate with other services, but Dozzle requires direct connections to individual proxy instances—which isn't supported. If you have a solution for using socket-proxy in Swarm mode, we'd love to hear from you!
@@ -56,6 +59,7 @@ services: - DOZZLE_AUTH_PROVIDER=simple volumes: - /var/run/docker.sock:/var/run/docker.sock + - /opt/dozzle/data:/data secrets: - source: users target: /data/users.yml @@ -95,6 +99,7 @@ services: - DOZZLE_REMOTE_AGENT=agent:7007 volumes: - /var/run/docker.sock:/var/run/docker.sock + - /opt/dozzle/data:/data ports: - 8080:8080 networks: diff --git a/examples/setup-swarm.fish b/examples/setup-swarm.fish index 98e5e0ad..3ce03be1 100755 --- a/examples/setup-swarm.fish +++ b/examples/setup-swarm.fish @@ -15,7 +15,7 @@ docker run -d --name manager \ -p 2377:2377 \ -p 7946:7946 \ -p 4789:4789 \ - -p 8080:8080 \ + -p 8090:8090 \ -v ./examples:/examples \ docker @@ -57,6 +57,14 @@ end echo "โœ… Swarm is ready. Deploying Dozzle..." +echo "๐Ÿ”จ Building local image..." +env CLOUD_URL=http://localhost:3000 make -C (git rev-parse --show-toplevel) docker + +echo "๐Ÿ“ฆ Loading image into swarm nodes..." +for node in manager worker-1 worker-2 + docker save amir20/dozzle:local | docker exec -i $node docker load +end + # Create the stack file inside the manager docker exec manager sh -c 'cat > /dozzle-stack.yml << "EOF" services: @@ -64,21 +72,32 @@ services: image: amir20/dozzle:local environment: - DOZZLE_MODE=swarm + - DOZZLE_ENABLE_ACTIONS=true + - DOLIGENCE_URL=http://doligence-api:8080 + - AGENT_URL=http://doligence-api:8082 volumes: - /var/run/docker.sock:/var/run/docker.sock + - dozzle-data:/data ports: - - 8080:8080 + - 8090:8080 networks: - dozzle + # 172.16.48.100 is a static IP for the doligence-api dev container, + # assigned via doligence/compose.override.yaml on the swarm-net + # external network (the network this script creates). Local-dev only. + extra_hosts: + - "doligence-api:172.16.48.100" deploy: mode: global networks: dozzle: driver: overlay +volumes: + dozzle-data: EOF' docker exec manager docker stack deploy -c /dozzle-stack.yml dozzle -echo "๐Ÿš€ Dozzle deployed! Access at http://localhost:8080" +echo "๐Ÿš€ Dozzle deployed! 
Access at http://localhost:8090" function swarm-cleanup docker exec manager docker stack rm dozzle 2>/dev/null @@ -95,7 +114,7 @@ end function swarm-deploy echo "๐Ÿ”จ Building local image..." - make -C (git rev-parse --show-toplevel) docker + env CLOUD_URL=http://localhost:3000 make -C (git rev-parse --show-toplevel) docker echo "๐Ÿ“ฆ Loading image into swarm nodes..." for node in manager worker-1 worker-2 docker save amir20/dozzle:local | docker exec -i $node docker load diff --git a/internal/cloud/client.go b/internal/cloud/client.go index 7acb5a4f..bae1e8bc 100644 --- a/internal/cloud/client.go +++ b/internal/cloud/client.go @@ -11,7 +11,6 @@ import ( "time" "github.com/amir20/dozzle/internal/notification/dispatcher" - "github.com/amir20/dozzle/internal/support/cli" pb "github.com/amir20/dozzle/proto/cloud" "github.com/rs/zerolog/log" "golang.org/x/sync/semaphore" @@ -38,6 +37,8 @@ const ( type Client struct { deps ToolDeps apiKeyFunc func() string + instanceID string + version string streamLogsFunc func() bool target string plaintext bool @@ -55,7 +56,11 @@ type Client struct { // NewClient creates a new cloud gRPC client. // apiKeyFunc is called to get the current cloud API key โ€” it may return "" // if no cloud dispatcher is configured yet, in which case the client waits. -func NewClient(apiKeyFunc func() string, deps ToolDeps) *Client { +// instanceID is a stable per-process identifier (typically the local host ID) +// sent as `x-instance-id` metadata so the cloud registry can keep multiple +// connections per API key (e.g. one per swarm replica or remote agent). +// version is reported back to the cloud in ListToolsResponse. 
+func NewClient(apiKeyFunc func() string, instanceID string, version string, deps ToolDeps) *Client { cloudURL := os.Getenv("AGENT_URL") if cloudURL == "" { cloudURL = "https://agent.doligence.dozzle.dev" @@ -78,6 +83,8 @@ func NewClient(apiKeyFunc func() string, deps ToolDeps) *Client { return &Client{ deps: deps, apiKeyFunc: apiKeyFunc, + instanceID: instanceID, + version: version, target: target, plaintext: plaintext, toolSem: semaphore.NewWeighted(maxConcurrent), @@ -225,7 +232,11 @@ func (c *Client) connect(ctx context.Context, apiKey string) (wasConnected bool, c.connMu.Unlock() }() - md := metadata.Pairs("x-api-key", apiKey) + mdPairs := []string{"x-api-key", apiKey} + if c.instanceID != "" { + mdPairs = append(mdPairs, "x-instance-id", c.instanceID) + } + md := metadata.Pairs(mdPairs...) streamCtx := metadata.NewOutgoingContext(connCtx, md) stream, err := client.ToolStream(streamCtx) @@ -371,7 +382,7 @@ func (c *Client) handleRequest(ctx context.Context, req *pb.ToolRequest) *pb.Too resp.Type = &pb.ToolResponse_ListTools{ ListTools: &pb.ListToolsResponse{ Tools: c.tools(), - Version: cli.Version, + Version: c.version, }, } diff --git a/internal/cloud/client_test.go b/internal/cloud/client_test.go index 4f48446d..e4a8401b 100644 --- a/internal/cloud/client_test.go +++ b/internal/cloud/client_test.go @@ -13,20 +13,20 @@ import ( func TestNewClient_DefaultURL(t *testing.T) { t.Setenv("AGENT_URL", "") - client := NewClient(func() string { return "test-key" }, ToolDeps{EnableActions: true}) + client := NewClient(func() string { return "test-key" }, "test-instance", "test", ToolDeps{EnableActions: true}) assert.Equal(t, "agent.doligence.dozzle.dev:443", client.target) } func TestNewClient_CustomURL(t *testing.T) { t.Setenv("AGENT_URL", "https://custom.cloud.dev") - client := NewClient(func() string { return "test-key" }, ToolDeps{EnableActions: true}) + client := NewClient(func() string { return "test-key" }, "test-instance", "test", ToolDeps{EnableActions: 
true}) assert.Equal(t, "custom.cloud.dev:443", client.target) assert.False(t, client.plaintext) } func TestNewClient_PlaintextURL(t *testing.T) { t.Setenv("AGENT_URL", "http://localhost:7008") - client := NewClient(func() string { return "test-key" }, ToolDeps{EnableActions: true}) + client := NewClient(func() string { return "test-key" }, "test-instance", "test", ToolDeps{EnableActions: true}) assert.Equal(t, "localhost:7008", client.target) assert.True(t, client.plaintext) } diff --git a/internal/support/cli/agent_command.go b/internal/support/cli/agent_command.go index a7a276cf..661afea5 100644 --- a/internal/support/cli/agent_command.go +++ b/internal/support/cli/agent_command.go @@ -7,9 +7,11 @@ import ( "net" "os" "os/signal" + "sync/atomic" "syscall" "github.com/amir20/dozzle/internal/agent" + "github.com/amir20/dozzle/internal/cloud" "github.com/amir20/dozzle/internal/docker" "github.com/amir20/dozzle/internal/notification" "github.com/amir20/dozzle/internal/notification/dispatcher" @@ -25,8 +27,20 @@ type AgentCmd struct { // persistingNotificationHandler wraps a notification manager and saves config to disk after updates type persistingNotificationHandler struct { - manager *notification.Manager - configPath string + manager *notification.Manager + configPath string + cloudConfig atomic.Pointer[notification.CloudConfig] + onCloudSet func() +} + +// CloudConfig returns the agent's currently active cloud config, or nil if +// none has been pushed by the main server / loaded from disk. 
+func (h *persistingNotificationHandler) CloudConfig() *notification.CloudConfig { + return h.cloudConfig.Load() +} + +func (h *persistingNotificationHandler) setCloudConfig(cc *notification.CloudConfig) { + h.cloudConfig.Store(cc) } func (h *persistingNotificationHandler) GetNotificationStats() []types.SubscriptionStats { @@ -72,6 +86,10 @@ func (h *persistingNotificationHandler) SetCloudDispatcher(d dispatcher.Dispatch Prefix: cd.Prefix, ExpiresAt: cd.ExpiresAt, } + h.cloudConfig.Store(&cc) + if h.onCloudSet != nil { + h.onCloudSet() + } if err := os.MkdirAll("./data", 0755); err != nil { log.Error().Err(err).Msg("Could not create data directory for cloud config") return @@ -91,6 +109,10 @@ func (h *persistingNotificationHandler) SetCloudDispatcher(d dispatcher.Dispatch func (h *persistingNotificationHandler) ClearCloudDispatcher() { h.manager.ClearCloudDispatcher() + h.cloudConfig.Store(nil) + if h.onCloudSet != nil { + h.onCloudSet() + } if err := os.Remove("./data/cloud.yml"); err != nil && !os.IsNotExist(err) { log.Error().Err(err).Msg("Could not remove cloud.yml on agent") } @@ -149,6 +171,12 @@ func (a *AgentCmd) Run(args Args, embeddedCerts embed.FS) error { file.Close() } + // Create handler that wraps manager and persists config to disk + notificationHandler := &persistingNotificationHandler{ + manager: notificationManager, + configPath: notificationConfigPath, + } + // Load cloud config if available if file, err := os.Open("./data/cloud.yml"); err == nil { cc, err := notification.LoadCloudConfig(file) @@ -161,15 +189,43 @@ func (a *AgentCmd) Run(args Args, embeddedCerts embed.FS) error { log.Error().Err(err).Msg("Failed to create cloud dispatcher on agent") } else { notificationManager.SetCloudDispatcher(d) + notificationHandler.setCloudConfig(&cc) log.Info().Msg("Loaded cloud config from disk") } } } - // Create handler that wraps manager and persists config to disk - notificationHandler := &persistingNotificationHandler{ - manager: 
notificationManager, - configPath: notificationConfigPath, + // Create a single-host MultiHostService so the cloud client has a + // HostService for tool execution (list_containers, fetch_logs, etc.). + agentManager := docker_support.NewRetriableClientManager(nil, args.Timeout, certs, clientService) + agentHostService := docker_support.NewMultiHostService(agentManager, args.Timeout) + + // Cloud gRPC client โ€” connects directly to Dozzle Cloud with this agent's + // own host ID as instance_id, so log streaming and tool dispatch happen + // here instead of funneling through the main server. + var instanceID string + if h, err := agentHostService.LocalHost(); err == nil { + instanceID = h.ID + } + apiKeyFunc := func() string { + if cc := notificationHandler.CloudConfig(); cc != nil { + return cc.APIKey + } + return "" + } + cloudClient := cloud.NewClient(apiKeyFunc, instanceID, args.Version(), cloud.ToolDeps{ + EnableActions: false, // agents don't host action tools today + HostService: agentHostService, + Labels: args.Filter, + }) + cloudClient.SetStreamLogsFunc(func() bool { + cc := notificationHandler.CloudConfig() + return cc != nil && cc.StreamLogsEnabled() + }) + notificationHandler.onCloudSet = cloudClient.Notify + go cloudClient.Run(ctx) + if apiKeyFunc() != "" { + cloudClient.Notify() } // Create agent server using the same shared client service diff --git a/internal/support/docker/multi_host_service.go b/internal/support/docker/multi_host_service.go index 80fdc1c7..04c6c4c1 100644 --- a/internal/support/docker/multi_host_service.go +++ b/internal/support/docker/multi_host_service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/amir20/dozzle/internal/container" @@ -40,6 +41,7 @@ type MultiHostService struct { timeout time.Duration notificationManager *notification.Manager persister *notification.Persister + cloudNotifyFn atomic.Pointer[func()] } func NewMultiHostService(manager ClientManager, timeout time.Duration) 
*MultiHostService { @@ -165,6 +167,12 @@ func (m *MultiHostService) LocalHost() (container.Host, error) { return host, nil } } + // Swarm mode marks every host as "swarm" so the loop above never matches. + // Fall back to the local docker client directly โ€” its host ID is stable + // per node and is what callers (cloud client instance ID, etc.) actually want. + for _, client := range m.manager.LocalClients() { + return client.Host(), nil + } return container.Host{}, fmt.Errorf("local host not found") } @@ -357,6 +365,66 @@ func (m *MultiHostService) NotificationHandler() *notification.Manager { return m.notificationManager } +// SetCloudNotifyFunc registers a callback the agent server invokes after a +// peer broadcast so the local cloud client reconnects with the new API key. +func (m *MultiHostService) SetCloudNotifyFunc(fn func()) { + m.cloudNotifyFn.Store(&fn) +} + +func (m *MultiHostService) cloudNotify() { + if fn := m.cloudNotifyFn.Load(); fn != nil { + (*fn)() + } +} + +// SwarmNotificationHandler returns the agent-server handler for swarm replicas. +// Broadcasts persist to disk and update this replica's persister + cloud client. 
+func (m *MultiHostService) SwarmNotificationHandler() *swarmNotificationHandler {
+	return &swarmNotificationHandler{
+		Manager:   m.notificationManager,
+		persister: m.persister,
+		notify:    m.cloudNotify,
+	}
+}
+
+type swarmNotificationHandler struct {
+	*notification.Manager
+	persister *notification.Persister
+	notify    func()
+}
+
+func (h *swarmNotificationHandler) HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error {
+	if err := h.Manager.HandleNotificationConfig(subscriptions, dispatchers); err != nil {
+		return err
+	}
+	h.persister.SaveNotifications()
+	return nil
+}
+
+// persister.SetCloudConfig calls applyCloudDispatcher → Manager.SetCloudDispatcher,
+// and persister.RemoveCloudConfig calls Manager.ClearCloudDispatcher; we route
+// through the persister so disk + manager stay in lockstep on every replica.
+func (h *swarmNotificationHandler) SetCloudDispatcher(d dispatcher.Dispatcher) {
+	cd, ok := d.(*dispatcher.CloudDispatcher)
+	if !ok {
+		log.Warn().Str("type", fmt.Sprintf("%T", d)).Msg("Cloud dispatcher type assertion failed in swarm handler, falling back to in-memory only")
+		h.Manager.SetCloudDispatcher(d)
+		return
+	}
+	cc := &notification.CloudConfig{
+		APIKey:    cd.APIKey,
+		Prefix:    cd.Prefix,
+		ExpiresAt: cd.ExpiresAt,
+	}
+	h.persister.SetCloudConfig(cc)
+	h.notify()
+}
+
+func (h *swarmNotificationHandler) ClearCloudDispatcher() {
+	h.persister.RemoveCloudConfig()
+	h.notify()
+}
+
 // AddSubscription adds a subscription to local manager and broadcasts to agents
 func (m *MultiHostService) AddSubscription(sub *notification.Subscription) error {
 	if err := m.notificationManager.AddSubscription(sub); err != nil {
diff --git a/main.go b/main.go
index 33e1a644..0bae82f1 100644
--- a/main.go
+++ b/main.go
@@ -11,17 +11,20 @@ import (
 	"os"
 	"os/signal"
 	"path/filepath"
+	"sync"
 	"syscall"
 	"time"
 
 	"github.com/amir20/dozzle/internal/agent"
 	"github.com/amir20/dozzle/internal/auth"
"github.com/amir20/dozzle/internal/cloud" + "github.com/amir20/dozzle/internal/container" "github.com/amir20/dozzle/internal/deploy" "github.com/amir20/dozzle/internal/docker" "github.com/amir20/dozzle/internal/k8s" "github.com/amir20/dozzle/internal/notification/dispatcher" "github.com/amir20/dozzle/internal/support/cli" + container_support "github.com/amir20/dozzle/internal/support/container" docker_support "github.com/amir20/dozzle/internal/support/docker" k8s_support "github.com/amir20/dozzle/internal/support/k8s" "github.com/amir20/dozzle/internal/web" @@ -100,7 +103,7 @@ func main() { } // Create client service for agent server in swarm mode clientService := docker_support.NewDockerClientService(localClient, args.Filter) - server, err := agent.NewServer(clientService, certs, args.Version(), multiHostService.NotificationHandler()) + server, err := agent.NewServer(clientService, certs, args.Version(), multiHostService.SwarmNotificationHandler()) if err != nil { log.Fatal().Err(err).Msg("failed to create agent") } @@ -153,9 +156,16 @@ func main() { } } - cloudClient := cloud.NewClient(apiKeyFunc, cloud.ToolDeps{ + var instanceID string + if h, err := hostService.LocalHost(); err == nil { + instanceID = h.ID + } + + cloudHostService := newLocalCloudHostService(hostService) + + cloudClient := cloud.NewClient(apiKeyFunc, instanceID, args.Version(), cloud.ToolDeps{ EnableActions: args.EnableActions, - HostService: hostService, + HostService: cloudHostService, Labels: args.Filter, DeployManager: deployManager, NotificationService: notificationService, @@ -165,6 +175,12 @@ func main() { }) go cloudClient.Run(ctx) + // In swarm mode, peer broadcasts of cloud config should kick this + // replica's cloud client too, so every replica holds its own connection. 
+	if mhs, ok := hostService.(*docker_support.MultiHostService); ok {
+		mhs.SetCloudNotifyFunc(cloudClient.Notify)
+	}
+
 	// If cloud is already configured at startup, start the client immediately
 	if apiKeyFunc() != "" {
 		cloudClient.Notify()
@@ -307,3 +323,145 @@ func createServer(args cli.Args, hostService web.HostService, onCloudSetup func(
 
 	return web.CreateServer(hostService, assets, config)
 }
+
+// localCloudHostService scopes the cloud client to local docker only;
+// otherwise every connection re-reports its peers, multiplying hosts
+// by connection count on the cloud side.
+type localCloudHostService struct {
+	services    []container_support.ClientService
+	hostIDByIdx []string // parallel to services, populated lazily; "" means the lookup failed
+	hostIDOnce  sync.Once
+}
+
+// newLocalCloudHostService wraps hs so the cloud client only sees the
+// services returned by hs.LocalClientServices. When there are none, hs itself
+// is returned.
+func newLocalCloudHostService(hs web.HostService) cloud.LogStreamHostService {
+	services := hs.LocalClientServices()
+	if len(services) == 0 {
+		// k8s has no docker LocalClientServices but its HostService already
+		// exposes only what this process can see, so use it directly.
+		return hs
+	}
+	return &localCloudHostService{services: services}
+}
+
+// localHostTimeout returns a 5-second context for one call to a local service.
+// The caller must call the returned cancel function.
+func (l *localCloudHostService) localHostTimeout() (context.Context, context.CancelFunc) {
+	return context.WithTimeout(context.Background(), 5*time.Second)
+}
+
+// resolveHostIDs caches each service's host ID once. Local docker host IDs
+// are stable for the process lifetime, so a one-time lookup is enough.
+// A failed lookup leaves "" in the cache and is not retried here; FindContainer
+// re-queries such services itself.
+func (l *localCloudHostService) resolveHostIDs() []string {
+	l.hostIDOnce.Do(func() {
+		l.hostIDByIdx = make([]string, len(l.services))
+		for i, s := range l.services {
+			ctx, cancel := l.localHostTimeout()
+			h, err := s.Host(ctx)
+			cancel()
+			if err == nil {
+				l.hostIDByIdx[i] = h.ID
+			}
+		}
+	})
+	return l.hostIDByIdx
+}
+
+// Hosts returns the host of every local service whose Host call succeeds,
+// with Available set to true. Services that fail are left out.
+func (l *localCloudHostService) Hosts() []container.Host {
+	hosts := make([]container.Host, 0, len(l.services))
+	for _, s := range l.services {
+		ctx, cancel := l.localHostTimeout()
+		h, err := s.Host(ctx)
+		cancel()
+		if err != nil {
+			continue
+		}
+		h.Available = true
+		hosts = append(hosts, h)
+	}
+	return hosts
+}
+
+// ListAllContainers lists the containers of every local service that match
+// labels. A failing service contributes an error instead of containers.
+func (l *localCloudHostService) ListAllContainers(labels container.ContainerLabels) ([]container.Container, []error) {
+	var all []container.Container
+	var errs []error
+	for _, s := range l.services {
+		ctx, cancel := l.localHostTimeout()
+		list, err := s.ListContainers(ctx, labels)
+		cancel()
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+		all = append(all, list...)
+	}
+	return all, errs
+}
+
+// FindContainer finds container id on the local service whose host ID is host.
+// It returns an error if the container lookup fails or no local service has
+// that host ID.
+func (l *localCloudHostService) FindContainer(host string, id string, labels container.ContainerLabels) (*container_support.ContainerService, error) {
+	hostIDs := l.resolveHostIDs()
+	for i, s := range l.services {
+		hostID := hostIDs[i]
+		if hostID == "" {
+			// The one-time cache holds "" when the initial lookup failed.
+			// Ask the service again so a transient failure does not make
+			// this host unreachable for the rest of the process lifetime.
+			ctx, cancel := l.localHostTimeout()
+			h, err := s.Host(ctx)
+			cancel()
+			if err != nil {
+				continue
+			}
+			hostID = h.ID
+		}
+		if hostID != host {
+			continue
+		}
+		ctx, cancel := l.localHostTimeout()
+		cont, err := s.FindContainer(ctx, id, labels)
+		cancel()
+		if err != nil {
+			return nil, err
+		}
+		return container_support.NewContainerService(s, cont), nil
+	}
+	return nil, fmt.Errorf("host %s not local to this process", host)
+}
+
+// SubscribeContainersStarted subscribes to every local service and forwards
+// the containers accepted by filter to containers until ctx is done.
+func (l *localCloudHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) {
+	// One inbound channel + forwarder goroutine per service so a slow consumer
+	// or a burst on one service can't cause the others to drop events.
+	for _, s := range l.services {
+		ch := make(chan container.Container, 64)
+		s.SubscribeContainersStarted(ctx, ch)
+		go func() {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case c := <-ch:
+					if filter(&c) {
+						select {
+						case containers <- c:
+						case <-ctx.Done():
+							return
+						}
+					}
+				}
+			}
+		}()
+	}
+}