mirror of
https://github.com/amir20/dozzle.git
synced 2026-06-23 04:10:12 +00:00
428 lines
13 KiB
Go
428 lines
13 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"fmt"
|
|
"io/fs"
|
|
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/amir20/dozzle/internal/agent"
|
|
"github.com/amir20/dozzle/internal/auth"
|
|
"github.com/amir20/dozzle/internal/cloud"
|
|
"github.com/amir20/dozzle/internal/container"
|
|
"github.com/amir20/dozzle/internal/docker"
|
|
"github.com/amir20/dozzle/internal/k8s"
|
|
"github.com/amir20/dozzle/internal/notification/dispatcher"
|
|
"github.com/amir20/dozzle/internal/support/cli"
|
|
container_support "github.com/amir20/dozzle/internal/support/container"
|
|
docker_support "github.com/amir20/dozzle/internal/support/docker"
|
|
k8s_support "github.com/amir20/dozzle/internal/support/k8s"
|
|
"github.com/amir20/dozzle/internal/web"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
//go:embed all:dist
|
|
var content embed.FS
|
|
|
|
//go:embed shared_cert.pem shared_key.pem
|
|
var certs embed.FS
|
|
|
|
//go:generate protoc --go_out=. --go-grpc_out=. --proto_path=./protos ./protos/rpc.proto ./protos/types.proto
|
|
//go:generate protoc --go_out=. --go-grpc_out=. --proto_path=./protos --go_opt=module=github.com/amir20/dozzle --go-grpc_opt=module=github.com/amir20/dozzle ./protos/cloud.proto
|
|
func main() {
|
|
cli.ValidateEnvVars(cli.Args{}, cli.AgentCmd{})
|
|
args, subcommand := cli.ParseArgs()
|
|
if subcommand != nil {
|
|
runnable, ok := subcommand.(cli.Runnable)
|
|
if !ok {
|
|
log.Fatal().Msg("Invalid command")
|
|
}
|
|
err := runnable.Run(args, certs)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Failed to run command")
|
|
}
|
|
|
|
os.Exit(0)
|
|
}
|
|
|
|
if args.AuthProvider != "none" && args.AuthProvider != "forward-proxy" && args.AuthProvider != "simple" {
|
|
log.Fatal().Str("provider", args.AuthProvider).Msg("Invalid auth provider")
|
|
}
|
|
|
|
log.Info().Msgf("Dozzle version %s", args.Version())
|
|
dispatcher.UserAgent = fmt.Sprintf("Dozzle/%s", args.Version())
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
|
defer stop()
|
|
|
|
var hostService web.HostService
|
|
var notificationService cloud.NotificationService
|
|
if args.Mode == "server" {
|
|
multiHostService := cli.CreateMultiHostService(certs, args)
|
|
if multiHostService.TotalClients() == 0 {
|
|
log.Fatal().Msg("Could not connect to any Docker Engine")
|
|
} else {
|
|
log.Info().Int("clients", multiHostService.TotalClients()).Msg("Connected to Docker")
|
|
}
|
|
if err := multiHostService.StartNotificationManager(ctx); err != nil {
|
|
log.Fatal().Err(err).Msg("Could not start notification manager")
|
|
}
|
|
hostService = multiHostService
|
|
notificationService = multiHostService
|
|
} else if args.Mode == "swarm" {
|
|
localClient, err := docker.NewLocalClient("")
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not create docker client")
|
|
}
|
|
certs, err := cli.ReadCertificates(certs, args.CertPath, args.KeyPath)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not read certificates")
|
|
}
|
|
agentManager := docker_support.NewRetriableClientManager(args.RemoteAgent, args.Timeout, certs)
|
|
manager := docker_support.NewSwarmClientManager(localClient, certs, args.Timeout, agentManager, args.Filter)
|
|
multiHostService := docker_support.NewMultiHostService(manager, args.Timeout)
|
|
if err := multiHostService.StartNotificationManager(ctx); err != nil {
|
|
log.Fatal().Err(err).Msg("Could not start notification manager")
|
|
}
|
|
hostService = multiHostService
|
|
notificationService = multiHostService
|
|
log.Info().Msg("Starting in swarm mode")
|
|
listener, err := net.Listen("tcp", ":7007")
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("failed to listen")
|
|
}
|
|
// Create client service for agent server in swarm mode
|
|
clientService := docker_support.NewDockerClientService(localClient, args.Filter)
|
|
server, err := agent.NewServer(clientService, certs, args.Version(), multiHostService.SwarmNotificationHandler())
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("failed to create agent")
|
|
}
|
|
go cli.StartEvent(args, "swarm", localClient, "")
|
|
go func() {
|
|
log.Info().Msgf("Dozzle agent version in swarm mode %s", args.Version())
|
|
if err := server.Serve(listener); err != nil {
|
|
log.Error().Err(err).Msg("failed to serve")
|
|
}
|
|
}()
|
|
} else if args.Mode == "k8s" {
|
|
localClient, err := k8s.NewK8sClient(args.Namespace)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not create k8s client")
|
|
}
|
|
|
|
clusterService, err := k8s_support.NewK8sClusterService(localClient, args.Timeout)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not create k8s cluster service")
|
|
}
|
|
|
|
if err := clusterService.StartNotificationManager(ctx); err != nil {
|
|
log.Fatal().Err(err).Msg("Could not start notification manager")
|
|
}
|
|
|
|
go cli.StartEvent(args, "k8s", localClient, "")
|
|
hostService = clusterService
|
|
} else {
|
|
log.Fatal().Str("mode", args.Mode).Msg("Invalid mode")
|
|
}
|
|
|
|
// Create cloud tool client — does nothing until Notify() is called
|
|
apiKeyFunc := func() string {
|
|
if cc := hostService.CloudConfig(); cc != nil {
|
|
return cc.APIKey
|
|
}
|
|
return ""
|
|
}
|
|
|
|
var instanceID string
|
|
if h, err := hostService.LocalHost(); err == nil {
|
|
instanceID = h.ID
|
|
}
|
|
|
|
cloudHostService := newLocalCloudHostService(hostService)
|
|
|
|
cloudClient := cloud.NewClient(apiKeyFunc, instanceID, args.Version(), cloud.ToolDeps{
|
|
EnableActions: args.EnableActions,
|
|
HostService: cloudHostService,
|
|
Labels: args.Filter,
|
|
NotificationService: notificationService,
|
|
})
|
|
cloudClient.SetStreamLogsFunc(func() bool {
|
|
return hostService.CloudConfig().StreamLogsEnabled()
|
|
})
|
|
go cloudClient.Run(ctx)
|
|
|
|
// In swarm mode, peer broadcasts of cloud config should kick this
|
|
// replica's cloud client too, so every replica holds its own connection.
|
|
if mhs, ok := hostService.(*docker_support.MultiHostService); ok {
|
|
mhs.SetCloudNotifyFunc(cloudClient.Notify)
|
|
}
|
|
|
|
// If cloud is already configured at startup, start the client immediately
|
|
if apiKeyFunc() != "" {
|
|
cloudClient.Notify()
|
|
}
|
|
|
|
srv := createServer(args, hostService, web.CloudHooks{
|
|
OnSetup: cloudClient.Notify,
|
|
OnUpdate: cloudClient.Reconnect,
|
|
SearchLogs: cloudClient.SearchLogs,
|
|
})
|
|
|
|
go func() {
|
|
log.Info().Msgf("Accepting connections on %s", args.Addr)
|
|
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
|
|
log.Fatal().Err(err).Msg("failed to listen")
|
|
}
|
|
}()
|
|
|
|
<-ctx.Done()
|
|
stop()
|
|
log.Info().Msg("shutting down gracefully, press Ctrl+C again to force")
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
if err := srv.Shutdown(ctx); err != nil {
|
|
log.Error().Err(err).Msg("failed to shut down")
|
|
}
|
|
log.Debug().Msg("shut down complete")
|
|
}
|
|
|
|
func fileExists(filename string) bool {
|
|
_, err := os.Stat(filename)
|
|
if os.IsNotExist(err) {
|
|
return false
|
|
}
|
|
return err == nil
|
|
}
|
|
|
|
func createServer(args cli.Args, hostService web.HostService, cloudHooks web.CloudHooks) *http.Server {
|
|
_, dev := os.LookupEnv("DEV")
|
|
|
|
var releaseCheckMode web.ReleaseCheckMode = web.Automatic
|
|
|
|
switch args.ReleaseCheckMode {
|
|
case "automatic":
|
|
releaseCheckMode = web.Automatic
|
|
case "manual":
|
|
releaseCheckMode = web.Manual
|
|
default:
|
|
log.Fatal().Str("releaseCheckMode", args.ReleaseCheckMode).Msg("Invalid release check mode")
|
|
}
|
|
|
|
var provider web.AuthProvider = web.NONE
|
|
var authorizer web.Authorizer
|
|
if args.AuthProvider == "forward-proxy" {
|
|
log.Debug().Msg("Using forward proxy authentication")
|
|
provider = web.FORWARD_PROXY
|
|
authorizer = auth.NewForwardProxyAuth(args.AuthHeaderUser, args.AuthHeaderEmail, args.AuthHeaderName, args.AuthHeaderFilter, args.AuthHeaderRoles)
|
|
} else if args.AuthProvider == "simple" {
|
|
log.Debug().Msg("Using simple authentication")
|
|
provider = web.SIMPLE
|
|
|
|
userFilePath := "./data/users.yml"
|
|
if !fileExists(userFilePath) {
|
|
userFilePath = "./data/users.yaml"
|
|
if !fileExists(userFilePath) {
|
|
log.Fatal().Msg("No users.yaml or users.yml file found.")
|
|
}
|
|
}
|
|
|
|
log.Debug().Msgf("Reading %s file", filepath.Base(userFilePath))
|
|
|
|
db, err := auth.ReadUsersFromFile(userFilePath)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msgf("Could not read users file: %s", userFilePath)
|
|
}
|
|
|
|
log.Debug().Int("users", len(db.Users)).Msg("Loaded users")
|
|
ttl := time.Duration(0)
|
|
if args.AuthTTL != "session" {
|
|
ttl, err = time.ParseDuration(args.AuthTTL)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not parse auth ttl")
|
|
}
|
|
}
|
|
authorizer = auth.NewSimpleAuth(db, ttl)
|
|
}
|
|
|
|
authTTL := time.Duration(0)
|
|
|
|
if args.AuthTTL != "session" {
|
|
ttl, err := time.ParseDuration(args.AuthTTL)
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not parse auth ttl")
|
|
}
|
|
authTTL = ttl
|
|
}
|
|
|
|
config := web.Config{
|
|
Addr: args.Addr,
|
|
Base: args.Base,
|
|
Version: args.Version(),
|
|
Hostname: args.Hostname,
|
|
NoAnalytics: args.NoAnalytics,
|
|
Dev: dev,
|
|
Mode: args.Mode,
|
|
Authorization: web.Authorization{
|
|
Provider: provider,
|
|
Authorizer: authorizer,
|
|
TTL: authTTL,
|
|
LogoutUrl: args.AuthLogoutUrl,
|
|
},
|
|
EnableActions: args.EnableActions,
|
|
EnableShell: args.EnableShell,
|
|
EnableMCP: args.EnableMCP,
|
|
DisableAvatars: args.DisableAvatars,
|
|
ReleaseCheckMode: releaseCheckMode,
|
|
Labels: args.Filter,
|
|
Cloud: cloudHooks,
|
|
}
|
|
|
|
assets, err := fs.Sub(content, "dist")
|
|
if err != nil {
|
|
log.Fatal().Err(err).Msg("Could not get sub filesystem")
|
|
}
|
|
|
|
if _, ok := os.LookupEnv("LIVE_FS"); ok {
|
|
if dev {
|
|
log.Info().Msg("Using live filesystem at ./public")
|
|
assets = os.DirFS("./public")
|
|
} else {
|
|
log.Info().Msg("Using live filesystem at ./dist")
|
|
assets = os.DirFS("./dist")
|
|
}
|
|
}
|
|
|
|
if !dev {
|
|
if _, err := assets.Open(".vite/manifest.json"); err != nil {
|
|
log.Fatal().Msg("manifest.json not found")
|
|
}
|
|
if _, err := assets.Open("index.html"); err != nil {
|
|
log.Fatal().Msg("index.html not found")
|
|
}
|
|
}
|
|
|
|
return web.CreateServer(hostService, assets, config)
|
|
}
|
|
|
|
// localCloudHostService scopes the cloud client to local docker only;
|
|
// otherwise every connection re-reports its peers, multiplying hosts
|
|
// by connection count on the cloud side.
|
|
type localCloudHostService struct {
|
|
services []container_support.ClientService
|
|
hostIDByIdx []string // parallel to services, populated lazily
|
|
hostIDOnce sync.Once
|
|
}
|
|
|
|
func newLocalCloudHostService(hs web.HostService) cloud.LogStreamHostService {
|
|
services := hs.LocalClientServices()
|
|
if len(services) == 0 {
|
|
// k8s has no docker LocalClientServices but its HostService already
|
|
// exposes only what this process can see, so use it directly.
|
|
return hs
|
|
}
|
|
return &localCloudHostService{services: services}
|
|
}
|
|
|
|
func (l *localCloudHostService) localHostTimeout() (context.Context, context.CancelFunc) {
|
|
return context.WithTimeout(context.Background(), 5*time.Second)
|
|
}
|
|
|
|
// resolveHostIDs caches each service's host ID once. Local docker host IDs
|
|
// are stable for the process lifetime, so a one-time lookup is enough.
|
|
func (l *localCloudHostService) resolveHostIDs() []string {
|
|
l.hostIDOnce.Do(func() {
|
|
l.hostIDByIdx = make([]string, len(l.services))
|
|
for i, s := range l.services {
|
|
ctx, cancel := l.localHostTimeout()
|
|
h, err := s.Host(ctx)
|
|
cancel()
|
|
if err == nil {
|
|
l.hostIDByIdx[i] = h.ID
|
|
}
|
|
}
|
|
})
|
|
return l.hostIDByIdx
|
|
}
|
|
|
|
func (l *localCloudHostService) Hosts() []container.Host {
|
|
hosts := make([]container.Host, 0, len(l.services))
|
|
for _, s := range l.services {
|
|
ctx, cancel := l.localHostTimeout()
|
|
h, err := s.Host(ctx)
|
|
cancel()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
h.Available = true
|
|
hosts = append(hosts, h)
|
|
}
|
|
return hosts
|
|
}
|
|
|
|
func (l *localCloudHostService) ListAllContainers(labels container.ContainerLabels) ([]container.Container, []error) {
|
|
var all []container.Container
|
|
var errs []error
|
|
for _, s := range l.services {
|
|
ctx, cancel := l.localHostTimeout()
|
|
list, err := s.ListContainers(ctx, labels)
|
|
cancel()
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
continue
|
|
}
|
|
all = append(all, list...)
|
|
}
|
|
return all, errs
|
|
}
|
|
|
|
func (l *localCloudHostService) FindContainer(host string, id string, labels container.ContainerLabels) (*container_support.ContainerService, error) {
|
|
hostIDs := l.resolveHostIDs()
|
|
for i, s := range l.services {
|
|
if hostIDs[i] != host {
|
|
continue
|
|
}
|
|
ctx, cancel := l.localHostTimeout()
|
|
cont, err := s.FindContainer(ctx, id, labels)
|
|
cancel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return container_support.NewContainerService(s, cont), nil
|
|
}
|
|
return nil, fmt.Errorf("host %s not local to this process", host)
|
|
}
|
|
|
|
func (l *localCloudHostService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container, filter container_support.ContainerFilter) {
|
|
// One inbound channel + forwarder goroutine per service so a slow consumer
|
|
// or a burst on one service can't cause the others to drop events.
|
|
for _, s := range l.services {
|
|
ch := make(chan container.Container, 64)
|
|
s.SubscribeContainersStarted(ctx, ch)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case c := <-ch:
|
|
if filter(&c) {
|
|
select {
|
|
case containers <- c:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|