diff --git a/CLAUDE.md b/CLAUDE.md index 4237f5a3..7ed12954 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,9 +31,6 @@ The application supports multiple deployment modes: standalone server, Docker Sw # Install dependencies pnpm install -# Install Go tools (protobuf, air hot-reloader) -make tools - # Generate certificates and protobuf files make generate ``` @@ -130,8 +127,18 @@ The Go backend is organized into these key packages: - Role-based authorization (`roles.go`) - **`internal/container/`** - Container domain models and interfaces + - `event_generator.go`: Log parsing and grouping logic (multi-line, JSON detection) -- **`main.go`** - Application entry point with mode switching (server/swarm/k8s) +- **`internal/notification/`** - Alert and notification system + - `manager.go`: Notification rule evaluation and dispatching + - `log_listener.go`: Log pattern matching for alerts + - `dispatcher/`: Notification channel implementations (email, webhook, etc.) + +- **`graph/`** - GraphQL API layer + - `schema.graphqls`: GraphQL schema definitions + - `*.resolvers.go`: GraphQL resolver implementations + +- **`main.go`** - Application entry point with mode switching (server/swarm/k8s/agent) ### Frontend (Vue 3) @@ -161,17 +168,23 @@ The frontend uses file-based routing with these conventions: - `ContainerTable.vue`: Container table with historical stat visualization - **`assets/stores/`** - Pinia stores (auto-imported) - - `config.ts`: App configuration and feature flags - - `container.ts`: Container state management + - `config.ts`: App configuration and feature flags (injected from backend HTML, frozen immutable) + - `container.ts`: Container state management with EventSource streaming (`/api/events/stream`) - `hosts.ts`: Multi-host state - - `settings.ts`: User preferences + - `settings.ts`: User preferences (localStorage-backed via profileStorage) + - `pinned.ts`: Pinned container logs for side-by-side viewing + - `swarm.ts`, `k8s.ts`: Deployment mode-specific state + - `announcements.ts`: Feature announcements - **`assets/composable/`** - Vue composables (auto-imported) - - `eventStreams.ts`: SSE connection management + - `eventStreams.ts`: SSE connection management with buffer-based flushing (250ms debounce) - `historicalLogs.ts`: Historical log fetching - - `logContext.ts`: Log filtering and search context - - `storage.ts`: LocalStorage abstractions + - `logContext.ts`: Log filtering and search context (provide/inject pattern) + - `scrollContext.ts`: Scroll state management (paused, progress, currentDate) + - `storage.ts`: LocalStorage abstractions with reactivity - `visible.ts`: Log filtering by visible keys for complex logs + - `containerActions.ts`: Container control operations + - `duckdb.ts`: DuckDB WASM for SQL queries on logs - **`assets/modules/`** - Vue plugins - `router.ts`: Vue Router configuration @@ -185,6 +198,7 @@ The frontend uses file-based routing with these conventions: 3. **Stats**: Real-time CPU/memory stats streamed via SSE alongside events 4. **Actions**: POST to `/api/hosts/{host}/containers/{id}/actions/{action}` (start/stop/restart) 5. **Terminal**: WebSocket connections for container attach/exec at `/api/hosts/{host}/containers/{id}/attach` +6. **GraphQL**: POST to `/api/graphql` for queries and mutations (container metadata, historical logs, notifications) ### Build System @@ -206,6 +220,8 @@ The frontend uses file-based routing with these conventions: - `ComplexLogEntry`: Structured JSON logs (`JSONObject`) - `GroupedLogEntry`: Multi-line grouped logs (`string[]`) - **Type consistency**: Use `LogMessage` type alias instead of `string | string[] | JSONObject` for log entry messages +- **Log Entry Factory Pattern**: Use `LogEntry.create(logEvent)` to instantiate the correct entry type based on `logEvent.t` field +- **EventSource Buffering**: Log streams use buffer-based flushing (250ms debounce, 1000ms max) to batch UI updates - **Charts/Visualizations**: Custom lightweight implementations (no D3.js) - `BarChart.vue`: Self-contained bar chart with responsive downsampling - Downsampling algorithm: Averages data into buckets based on available screen width @@ -217,6 +233,13 @@ The frontend uses file-based routing with these conventions: - Certificate generation is required (`make generate` creates shared_key.pem and shared_cert.pem) - Protocol buffer generation happens via `go generate` directive in `main.go` - Docker client uses API version negotiation for compatibility +- **GraphQL API**: Uses gqlgen with schema in `graph/schema.graphqls`, generated code in `graph/generated.go` + - Run `pnpm codegen` to regenerate GraphQL types + - Resolvers follow-schema layout in `graph/*.resolvers.go` +- **Service Layer Architecture**: + - `ClientService` interface abstracts Docker/K8s/Agent backends + - `MultiHostService` orchestrates multi-host operations + - `ClientManager` implementations: `RetriableClientManager` (server mode), `SwarmClientManager` (swarm mode) ### Authentication @@ -247,7 +270,168 @@ The frontend uses file-based routing with these conventions: ### Deployment Modes -- **Server mode**: Single or multi-host Docker monitoring +- **Server mode** (default): Single or multi-host Docker monitoring + - Uses `RetriableClientManager` with local + remote agent clients - **Swarm mode**: Automatic discovery of Swarm nodes via Docker API + - Creates gRPC agent server on each node (port 7007) + - Uses `SwarmClientManager` for node discovery - **K8s mode**: Pod log monitoring in Kubernetes cluster + - Implements `container.Client` interface via Kubernetes API - **Agent mode**: Lightweight gRPC agent for remote log collection + - Run with `dozzle agent` or `pnpm run agent:dev` + - Listens on port 7007 with TLS certificate authentication + +## Key Architectural Patterns + +### Backend Abstraction Layers + +The backend follows a clean layered architecture: + +``` +HTTP Handlers (internal/web) + ↓ +HostService Interface (MultiHostService) + ↓ +ClientService Interface (per host) + ↓ +container.Client Interface + ↓ +Implementation (DockerClient, K8sClient, AgentClient) +``` + +**When adding new container operations:** + +1. Define method in `container.Client` interface (`internal/container/client.go`) +2. Implement in `internal/docker/client.go` (and `internal/k8s/client.go` if applicable) +3. Add wrapper method in `ClientService` interface (`internal/support/container/service.go`) +4. Add HTTP handler in `internal/web/` with appropriate route + +### Frontend Data Flow + +**Real-time Log Viewing:** + +1. User navigates to `/container/{id}` route +2. Page component calls `useContainerStream(container)` composable +3. Composable creates EventSource connection to `/api/hosts/{host}/containers/{id}/logs/stream` +4. Backend streams `LogEvent` objects via SSE +5. Frontend buffers events (250ms debounce, max 1000ms) +6. Batched buffer flushes update reactive `messages` array +7. `LogViewer.vue` renders using appropriate component (`SimpleLogItem`, `ComplexLogItem`, `GroupedLogItem`) +8. When messages exceed `maxLogs` (400), oldest entries replaced or marked as `SkippedLogsEntry` + +**Stats Streaming:** + +1. `container.ts` store connects to `/api/events/stream` on app init +2. Backend multiplexes container events and stats into single SSE stream +3. `container-stat` events update `Container._stat` and append to `_statsHistory` +4. EMA calculation provides smoothed `movingAverageStat` (alpha=0.2) +5. `ContainerTable.vue` displays mini bar charts using `statsHistory` with downsampling + +### Protocol Buffer Flow (Agent Mode) + +1. Main server creates `agent.NewClient(endpoint, certs)` for each remote host +2. AgentClient implements `container.Client` interface +3. Method calls translate to gRPC requests defined in `protos/rpc.proto` +4. Remote agent receives gRPC call, delegates to local `DockerClient` +5. Streaming RPCs (logs, stats, events) use bidirectional channels +6. Responses converted back to domain models via `FromProto()` methods + +### Log Parsing Pipeline + +1. Docker API returns multiplexed stream (8-byte headers + payload) +2. `log_reader.go` parses headers, extracts stdout/stderr type +3. `event_generator.go` receives raw log lines +4. Detection logic identifies: + - JSON structure → `ComplexLogEntry` + - Multi-line patterns (stack traces) → `GroupedLogEntry` + - Single lines → `SimpleLogEntry` +5. Log level extraction via regex patterns +6. `LogEvent` serialized to JSON and sent via SSE +7. Frontend deserializes and renders with appropriate component + +## Adding New Features + +### Adding a New HTTP Route + +1. Define route in `internal/web/routes.go` using chi router: + ```go + r.Get("/api/custom-endpoint", h.customHandler) + ``` +2. Implement handler method in appropriate file (e.g., `actions.go`, `logs.go`) +3. Use `hostService` to find container/host via `FindContainer()` or `FindHost()` +4. Return JSON response or establish SSE/WebSocket stream + +### Adding a New Log View Type + +1. Create route file in `assets/pages/` (e.g., `custom/[id].vue`) +2. Create composable in `assets/composable/eventStreams.ts` (e.g., `useCustomStream()`) +3. Composable should: + - Build API URL with appropriate filters + - Create EventSource connection + - Handle buffering and message batching + - Return reactive `messages` array and control methods +4. Use `LogViewer.vue` component to render messages +5. Add backend API endpoint if needed (see above) + +### Adding a New GraphQL Query/Mutation + +1. Define in `graph/schema.graphqls` +2. Run `pnpm codegen` to regenerate types +3. Implement resolver in `graph/schema.resolvers.go` +4. Use `hostService` from resolver context to access backend services +5. Frontend calls via urql client (auto-imported via `@urql/vue`) + +### Adding Container Stats/Metrics + +1. Add field to `Stat` type in `internal/container/types.go` +2. Update `stats_collector.go` to extract metric from Docker API response +3. Add calculation logic in `docker/calculation.go` if needed +4. Ensure protobuf definition includes field in `protos/rpc.proto` +5. Frontend automatically receives updates via existing SSE stream +6. Update `Container` model in `assets/models/Container.ts` if UI needs access + +### Working with Notifications/Alerts + +**Backend** (`internal/notification/`): + +- `manager.go`: Rule evaluation engine, manages alert state +- `log_listener.go`: Subscribes to container log streams, evaluates rules against incoming logs +- `types.go`: Alert rule definitions (log pattern matching, thresholds) +- `dispatcher/`: Notification channel implementations + +**Frontend** (`assets/pages/notifications.vue`, `assets/components/Notification/`): + +- `AlertForm.vue`, `DestinationForm.vue`: UI for creating rules +- Rules stored via GraphQL mutations +- Alert state displayed in notification cards + +**Adding a new notification channel:** + +1. Implement dispatcher interface in `internal/notification/dispatcher/` +2. Register in `manager.go` dispatcher factory +3. Add UI form in `assets/components/Notification/DestinationForm.vue` +4. Add GraphQL schema fields if needed + +## Common Development Patterns + +### Testing + +- Always run Go tests with race detector: `go test -race` +- Frontend tests require `TZ=UTC` for timestamp consistency +- Integration tests use Playwright with `make int` (runs docker-compose setup) +- Use `testify/assert` for Go test assertions + +### Hot Reload Development + +- `make dev` runs both backend (air) and frontend (vite) with hot reload +- `DEV=true` disables embedded asset serving +- `LIVE_FS=true` serves assets from filesystem instead of embedded +- Backend changes trigger air restart automatically +- Frontend changes trigger vite HMR + +### Debugging + +- Backend logs: Set `--level debug` flag or `DOZZLE_LEVEL=debug` env var +- Frontend: Vue DevTools browser extension +- GraphQL: Use GraphQL Playground at `/api/graphql` (when enabled) +- SSE streams: Browser DevTools Network tab shows EventSource connections diff --git a/Makefile b/Makefile index 68efa84b..d84d9d5d 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ build: dist generate .PHONY: docker docker: shared_key.pem shared_cert.pem - @docker build --build-arg TAG=local -t amir20/dozzle . + @docker build --build-arg TAG=local -t amir20/dozzle:local . .PHONY: generate generate: shared_key.pem shared_cert.pem @@ -55,7 +55,18 @@ push: docker @docker push amir20/dozzle:local-test run: docker - docker run -it --rm -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock amir20/dozzle:latest + docker run -it --rm -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock amir20/dozzle:local preview: build pnpm preview + +.PHONY: agent-reload +agent-reload: docker + @VM_NAME=$${VM_NAME:-dozzle-agent}; \ + echo "📦 Loading image into VM $$VM_NAME..."; \ + docker save amir20/dozzle:local | orb exec -m $$VM_NAME docker load; \ + echo "🔄 Recreating agent..."; \ + orb exec -m $$VM_NAME docker stop dozzle-agent || true; \ + orb exec -m $$VM_NAME docker rm dozzle-agent || true; \ + orb exec -m $$VM_NAME docker run -d --name dozzle-agent -p 7007:7007 -v /var/run/docker.sock:/var/run/docker.sock -v ~/dozzle-certs:/certs -v ~/dozzle-data:/data -e DOZZLE_LEVEL=debug amir20/dozzle:local agent --cert /certs/shared_cert.pem --key /certs/shared_key.pem; \ + echo "✅ Agent reloaded" diff --git a/examples/setup-remote-agent.sh b/examples/setup-remote-agent.sh index cb7dfdf6..966fc005 100755 --- a/examples/setup-remote-agent.sh +++ b/examples/setup-remote-agent.sh @@ -1,14 +1,39 @@ #!/bin/bash set -e +# Parse arguments +USE_LOCAL=false +POSITIONAL_ARGS=() + +while [[ $# -gt 0 ]]; do + case $1 in + --local) + USE_LOCAL=true + shift + ;; + *) + POSITIONAL_ARGS+=("$1") + shift + ;; + esac +done + # Configuration -VM_NAME="${1:-dozzle-agent}" -DISTRO="${2:-ubuntu}" -AGENT_PORT="${3:-7007}" +VM_NAME="${POSITIONAL_ARGS[0]:-dozzle-agent}" +DISTRO="${POSITIONAL_ARGS[1]:-ubuntu}" +AGENT_PORT="${POSITIONAL_ARGS[2]:-7007}" SHARED_CERT="./shared_cert.pem" SHARED_KEY="./shared_key.pem" +DOZZLE_IMAGE="amir20/dozzle:latest" + +if [ "$USE_LOCAL" = true ]; then + DOZZLE_IMAGE="amir20/dozzle:local" +fi echo "🚀 Setting up Dozzle Agent on OrbStack VM: $VM_NAME" +if [ "$USE_LOCAL" = true ]; then + echo " Using locally built image" +fi # Verify shared certificates exist if [ ! -f "$SHARED_CERT" ]; then @@ -40,10 +65,7 @@ sleep 3 # Step 2: Install Docker in the VM echo "🐳 Installing Docker..." -if ! orb exec -m "$VM_NAME" bash -c ' -curl -fsSL https://get.docker.com | sh -sudo usermod -aG docker $(whoami) -'; then +if ! orb exec -m "$VM_NAME" bash -c 'curl -fsSL https://get.docker.com | sh && sudo usermod -aG docker $(whoami)'; then echo "❌ Docker installation failed" exit 1 fi @@ -62,17 +84,44 @@ cat "$SHARED_KEY" | orb exec -m "$VM_NAME" bash -c 'cat > ~/dozzle-certs/shared_ echo "✅ Certificates copied" -# Step 4: Start Dozzle agent +# Step 4: Load or pull Dozzle image +if [ "$USE_LOCAL" = true ]; then + echo "🔨 Building local Docker image..." + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" + + if ! (cd "$PROJECT_ROOT" && make docker); then + echo "❌ Failed to build Docker image" + exit 1 + fi + + echo "📦 Loading image into VM..." + if ! docker save amir20/dozzle:local | orb exec -m "$VM_NAME" docker load; then + echo "❌ Failed to load image into VM" + exit 1 + fi + echo "✅ Local image loaded" +else + echo "📥 Pulling Dozzle image..." + if ! orb exec -m "$VM_NAME" docker pull amir20/dozzle:latest; then + echo "❌ Failed to pull image" + exit 1 + fi +fi + +# Step 5: Start Dozzle agent echo "🎯 Starting Dozzle agent..." +orb exec -m "$VM_NAME" bash -c 'mkdir -p ~/dozzle-data' if ! orb exec -m "$VM_NAME" bash -c " set -e -docker pull amir20/dozzle:latest docker run -d --name dozzle-agent \ --restart unless-stopped \ -v /var/run/docker.sock:/var/run/docker.sock \ -v ~/dozzle-certs:/certs \ + -v ~/dozzle-data:/data \ -p $AGENT_PORT:7007 \ - amir20/dozzle:latest agent \ + -e DOZZLE_LEVEL=debug \ + $DOZZLE_IMAGE agent \ --cert /certs/shared_cert.pem \ --key /certs/shared_key.pem "; then @@ -82,11 +131,11 @@ fi echo "✅ Dozzle agent started" -# Step 5: Wait for agent to be ready +# Step 6: Wait for agent to be ready echo "⏳ Waiting for agent to be ready..." sleep 3 -# Step 6: Verify agent is running +# Step 7: Verify agent is running echo "🧪 Verifying agent is running..." if orb exec -m "$VM_NAME" docker ps --filter name=dozzle-agent --format "{{.Status}}" | grep -q "Up"; then echo "✅ Agent is running" diff --git a/internal/agent/client.go b/internal/agent/client.go index 807a4674..c20b8295 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -8,12 +8,14 @@ import ( "errors" "fmt" "io" + "sync" "time" "encoding/json" "github.com/amir20/dozzle/internal/agent/pb" "github.com/amir20/dozzle/internal/container" + "github.com/amir20/dozzle/types" "github.com/rs/zerolog/log" orderedmap "github.com/wk8/go-ordered-map/v2" "google.golang.org/grpc" @@ -365,30 +367,41 @@ func (c *Client) ContainerAttach(ctx context.Context, containerId string) (*cont defer stdoutWriter.Close() for { - msg, err := stream.Recv() - if err != nil { + select { + case <-ctx.Done(): return - } + default: + msg, err := stream.Recv() + if err != nil { + return + } - stdoutWriter.Write(msg.Stdout) + stdoutWriter.Write(msg.Stdout) + } } }() go func() { + defer stdinReader.Close() buffer := make([]byte, 1024) for { - n, err := stdinReader.Read(buffer) - if err != nil { + select { + case <-ctx.Done(): return - } + default: + n, err := stdinReader.Read(buffer) + if err != nil { + return + } - if err := stream.Send(&pb.ContainerAttachRequest{ - Payload: &pb.ContainerAttachRequest_Stdin{ - Stdin: buffer[:n], - }, - }); err != nil { - return + if err := stream.Send(&pb.ContainerAttachRequest{ + Payload: &pb.ContainerAttachRequest_Stdin{ + Stdin: buffer[:n], + }, + }); err != nil { + return + } } } }() @@ -412,76 +425,101 @@ func (c *Client) ContainerAttach(ctx context.Context, containerId string) (*cont }, nil } -func (c *Client) ContainerExec(ctx context.Context, containerId string, cmd []string) (*container.ExecSession, error) { +func (c *Client) Exec(ctx context.Context, containerId string, cmd []string, events container.ExecEventReader, stdout io.Writer) error { stream, err := c.client.ContainerExec(ctx) if err != nil { - return nil, err + return err } if err = stream.Send(&pb.ContainerExecRequest{ ContainerId: containerId, Command: cmd, }); err != nil { - return nil, err + return err } - stdoutReader, stdoutWriter := io.Pipe() - stdinReader, stdinWriter := io.Pipe() - go func() { - defer stdoutWriter.Close() + var wg sync.WaitGroup + // Read from gRPC stream and write to stdout + wg.Go(func() { for { msg, err := stream.Recv() if err != nil { return } - - stdoutWriter.Write(msg.Stdout) + stdout.Write(msg.Stdout) } - }() - - go func() { - buffer := make([]byte, 1024) + }) + // Read events and convert to gRPC messages + wg.Go(func() { for { - n, err := stdinReader.Read(buffer) + event, err := events.ReadEvent() if err != nil { return } - if err := stream.Send(&pb.ContainerExecRequest{ - Payload: &pb.ContainerExecRequest_Stdin{ - Stdin: buffer[:n], - }, - }); err != nil { - return + switch event.Type { + case "userinput": + stream.Send(&pb.ContainerExecRequest{ + Payload: &pb.ContainerExecRequest_Stdin{ + Stdin: []byte(event.Data), + }, + }) + case "resize": + stream.Send(&pb.ContainerExecRequest{ + Payload: &pb.ContainerExecRequest_Resize{ + Resize: &pb.ResizePayload{ + Width: uint32(event.Width), + Height: uint32(event.Height), + }, + }, + }) } } - }() + }) - // Create resize closure that sends via gRPC - resizeFn := func(width uint, height uint) error { - return stream.Send(&pb.ContainerExecRequest{ - Payload: &pb.ContainerExecRequest_Resize{ - Resize: &pb.ResizePayload{ - Width: uint32(width), - Height: uint32(height), - }, - }, - }) - } - - return &container.ExecSession{ - Writer: stdinWriter, - Reader: stdoutReader, - Resize: resizeFn, - }, nil + wg.Wait() + return nil } func (c *Client) Close() error { return c.conn.Close() } +func (c *Client) UpdateNotificationConfig(ctx context.Context, subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error { + // Convert to proto + pbSubs := make([]*pb.NotificationSubscription, len(subscriptions)) + for i, sub := range subscriptions { + pbSubs[i] = &pb.NotificationSubscription{ + Id: int32(sub.ID), + Name: sub.Name, + Enabled: sub.Enabled, + DispatcherId: int32(sub.DispatcherID), + LogExpression: sub.LogExpression, + ContainerExpression: sub.ContainerExpression, + } + } + + pbDispatchers := make([]*pb.NotificationDispatcher, len(dispatchers)) + for i, d := range dispatchers { + pbDispatchers[i] = &pb.NotificationDispatcher{ + Id: int32(d.ID), + Name: d.Name, + Type: d.Type, + Url: d.URL, + Template: d.Template, + } + } + + _, err := c.client.UpdateNotificationConfig(ctx, &pb.UpdateNotificationConfigRequest{ + Subscriptions: pbSubs, + Dispatchers: pbDispatchers, + }) + + return err +} + func jsonBytesToOrderedMap(b []byte) *orderedmap.OrderedMap[string, any] { var data *orderedmap.OrderedMap[string, any] reader := bytes.NewReader(b) diff --git a/internal/agent/client_test.go b/internal/agent/client_test.go index 029edeb8..48a2ed59 100644 --- a/internal/agent/client_test.go +++ b/internal/agent/client_test.go @@ -13,7 +13,6 @@ import ( "github.com/amir20/dozzle/internal/container" "github.com/amir20/dozzle/internal/utils" - "github.com/docker/docker/api/types/system" "github.com/go-faker/faker/v4" "github.com/go-faker/faker/v4/pkg/options" "github.com/stretchr/testify/assert" @@ -27,58 +26,67 @@ const bufSize = 1024 * 1024 var lis *bufconn.Listener var certs tls.Certificate -var client *MockedClient +var mockService *MockedClientService -type MockedClient struct { +type MockedClientService struct { mock.Mock - container.Client } -func (m *MockedClient) FindContainer(ctx context.Context, id string) (container.Container, error) { - args := m.Called(ctx, id) +func (m *MockedClientService) FindContainer(ctx context.Context, id string, labels container.ContainerLabels) (container.Container, error) { + args := m.Called(ctx, id, labels) return args.Get(0).(container.Container), args.Error(1) } -func (m *MockedClient) ContainerActions(ctx context.Context, action container.ContainerAction, containerID string) error { - args := m.Called(ctx, action, containerID) - return args.Error(0) -} - -func (m *MockedClient) ContainerEvents(ctx context.Context, events chan<- container.ContainerEvent) error { - args := m.Called(ctx, events) - return args.Error(0) -} - -func (m *MockedClient) ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error) { +func (m *MockedClientService) ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error) { args := m.Called(ctx, filter) return args.Get(0).([]container.Container), args.Error(1) } -func (m *MockedClient) ContainerLogs(ctx context.Context, id string, since time.Time, stdType container.StdType) (io.ReadCloser, error) { - args := m.Called(ctx, id, since, stdType) +func (m *MockedClientService) Host(ctx context.Context) (container.Host, error) { + args := m.Called(ctx) + return args.Get(0).(container.Host), args.Error(1) +} + +func (m *MockedClientService) ContainerAction(ctx context.Context, c container.Container, action container.ContainerAction) error { + args := m.Called(ctx, c, action) + return args.Error(0) +} + +func (m *MockedClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) { + args := m.Called(ctx, c, from, to, stdTypes) + return args.Get(0).(<-chan *container.LogEvent), args.Error(1) +} + +func (m *MockedClientService) RawLogs(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (io.ReadCloser, error) { + args := m.Called(ctx, c, from, to, stdTypes) return args.Get(0).(io.ReadCloser), args.Error(1) } -func (m *MockedClient) ContainerStats(context.Context, string, chan<- container.ContainerStat) error { - return nil +func (m *MockedClientService) SubscribeStats(ctx context.Context, stats chan<- container.ContainerStat) { + m.Called(ctx, stats) } -func (m *MockedClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType container.StdType) (io.ReadCloser, error) { - args := m.Called(ctx, id, from, to, stdType) - return args.Get(0).(io.ReadCloser), args.Error(1) +func (m *MockedClientService) SubscribeEvents(ctx context.Context, events chan<- container.ContainerEvent) { + m.Called(ctx, events) } -func (m *MockedClient) Host() container.Host { - args := m.Called() - return args.Get(0).(container.Host) +func (m *MockedClientService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container) { + m.Called(ctx, containers) } -func (m *MockedClient) IsSwarmMode() bool { - return false +func (m *MockedClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error { + args := m.Called(ctx, c, from, stdTypes, events) + return args.Error(0) } -func (m *MockedClient) SystemInfo() system.Info { - return system.Info{ID: "123"} +func (m *MockedClientService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error { + args := m.Called(ctx, c, events, stdout) + return args.Error(0) +} + +func (m *MockedClientService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error { + args := m.Called(ctx, c, cmd, events, stdout) + return args.Error(0) } var wantedContainer = container.Container{} @@ -103,28 +111,30 @@ func init() { panic(err) } - client = &MockedClient{} - client.On("ListContainers", mock.Anything, mock.Anything).Return([]container.Container{ - { - ID: "123456", - Name: "test", - Host: "localhost", - State: "running", - }, + mockService = &MockedClientService{} + mockService.On("ListContainers", mock.Anything, mock.Anything).Return([]container.Container{ + wantedContainer, }, nil) - client.On("Host").Return(container.Host{ + mockService.On("Host", mock.Anything).Return(container.Host{ ID: "localhost", Endpoint: "local", Name: "local", - }) + }, nil) - client.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- container.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) { + mockService.On("SubscribeEvents", mock.Anything, mock.AnythingOfType("chan<- container.ContainerEvent")).Return().Run(func(args mock.Arguments) { time.Sleep(5 * time.Second) }) - client.On("FindContainer", mock.Anything, "123456").Return(wantedContainer, nil) - server, _ := NewServer(client, certs, "test", container.ContainerLabels{}) + mockService.On("SubscribeStats", mock.Anything, mock.AnythingOfType("chan<- container.ContainerStat")).Return() + + mockService.On("SubscribeContainersStarted", mock.Anything, mock.AnythingOfType("chan<- container.Container")).Return() + + mockService.On("FindContainer", mock.Anything, "123456", mock.Anything).Return(wantedContainer, nil) + + mockService.On("Client").Return(nil) + + server, _ := NewServer(mockService, certs, "test", nil) go server.Serve(lis) } diff --git a/internal/agent/pb/rpc.pb.go b/internal/agent/pb/rpc.pb.go index 5b63e261..eebdb59d 100644 --- a/internal/agent/pb/rpc.pb.go +++ b/internal/agent/pb/rpc.pb.go @@ -1270,11 +1270,99 @@ func (x *ContainerAttachResponse) GetStdout() []byte { return nil } +type UpdateNotificationConfigRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Subscriptions []*NotificationSubscription `protobuf:"bytes,1,rep,name=subscriptions,proto3" json:"subscriptions,omitempty"` + Dispatchers []*NotificationDispatcher `protobuf:"bytes,2,rep,name=dispatchers,proto3" json:"dispatchers,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UpdateNotificationConfigRequest) Reset() { + *x = UpdateNotificationConfigRequest{} + mi := &file_rpc_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UpdateNotificationConfigRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateNotificationConfigRequest) ProtoMessage() {} + +func (x *UpdateNotificationConfigRequest) ProtoReflect() protoreflect.Message { + mi := &file_rpc_proto_msgTypes[25] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateNotificationConfigRequest.ProtoReflect.Descriptor instead. +func (*UpdateNotificationConfigRequest) Descriptor() ([]byte, []int) { + return file_rpc_proto_rawDescGZIP(), []int{25} +} + +func (x *UpdateNotificationConfigRequest) GetSubscriptions() []*NotificationSubscription { + if x != nil { + return x.Subscriptions + } + return nil +} + +func (x *UpdateNotificationConfigRequest) GetDispatchers() []*NotificationDispatcher { + if x != nil { + return x.Dispatchers + } + return nil +} + +type UpdateNotificationConfigResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UpdateNotificationConfigResponse) Reset() { + *x = UpdateNotificationConfigResponse{} + mi := &file_rpc_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UpdateNotificationConfigResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateNotificationConfigResponse) ProtoMessage() {} + +func (x *UpdateNotificationConfigResponse) ProtoReflect() protoreflect.Message { + mi := &file_rpc_proto_msgTypes[26] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateNotificationConfigResponse.ProtoReflect.Descriptor instead. +func (*UpdateNotificationConfigResponse) Descriptor() ([]byte, []int) { + return file_rpc_proto_rawDescGZIP(), []int{26} +} + var File_rpc_proto protoreflect.FileDescriptor const file_rpc_proto_rawDesc = "" + "\n" + - "\trpc.proto\x12\bprotobuf\x1a\vtypes.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xb1\x01\n" + + "\trpc.proto\x12\bprotobuf\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\vtypes.proto\"\xb1\x01\n" + "\x15ListContainersRequest\x12C\n" + "\x06filter\x18\x01 \x03(\v2+.protobuf.ListContainersRequest.FilterEntryR\x06filter\x1aS\n" + "\vFilterEntry\x12\x10\n" + @@ -1345,7 +1433,11 @@ const file_rpc_proto_rawDesc = "" + "\x06resize\x18\x03 \x01(\v2\x17.protobuf.ResizePayloadH\x00R\x06resizeB\t\n" + "\apayload\"1\n" + "\x17ContainerAttachResponse\x12\x16\n" + - "\x06stdout\x18\x01 \x01(\fR\x06stdout2\xa1\b\n" + + "\x06stdout\x18\x01 \x01(\fR\x06stdout\"\xaf\x01\n" + + "\x1fUpdateNotificationConfigRequest\x12H\n" + + "\rsubscriptions\x18\x01 \x03(\v2\".protobuf.NotificationSubscriptionR\rsubscriptions\x12B\n" + + "\vdispatchers\x18\x02 \x03(\v2 .protobuf.NotificationDispatcherR\vdispatchers\"\"\n" + + " UpdateNotificationConfigResponse2\x96\t\n" + "\fAgentService\x12U\n" + "\x0eListContainers\x12\x1f.protobuf.ListContainersRequest\x1a .protobuf.ListContainersResponse\"\x00\x12R\n" + "\rFindContainer\x12\x1e.protobuf.FindContainerRequest\x1a\x1f.protobuf.FindContainerResponse\"\x00\x12K\n" + @@ -1359,7 +1451,8 @@ const file_rpc_proto_rawDesc = "" + "\bHostInfo\x12\x19.protobuf.HostInfoRequest\x1a\x1a.protobuf.HostInfoResponse\"\x00\x12X\n" + "\x0fContainerAction\x12 .protobuf.ContainerActionRequest\x1a!.protobuf.ContainerActionResponse\"\x00\x12V\n" + "\rContainerExec\x12\x1e.protobuf.ContainerExecRequest\x1a\x1f.protobuf.ContainerExecResponse\"\x00(\x010\x01\x12\\\n" + - "\x0fContainerAttach\x12 .protobuf.ContainerAttachRequest\x1a!.protobuf.ContainerAttachResponse\"\x00(\x010\x01B\x13Z\x11internal/agent/pbb\x06proto3" + "\x0fContainerAttach\x12 .protobuf.ContainerAttachRequest\x1a!.protobuf.ContainerAttachResponse\"\x00(\x010\x01\x12s\n" + + "\x18UpdateNotificationConfig\x12).protobuf.UpdateNotificationConfigRequest\x1a*.protobuf.UpdateNotificationConfigResponse\"\x00B\x13Z\x11internal/agent/pbb\x06proto3" var ( file_rpc_proto_rawDescOnce sync.Once @@ -1373,92 +1466,100 @@ func file_rpc_proto_rawDescGZIP() []byte { return file_rpc_proto_rawDescData } -var file_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 27) +var file_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 29) var file_rpc_proto_goTypes = []any{ - (*ListContainersRequest)(nil), // 0: protobuf.ListContainersRequest - (*RepeatedString)(nil), // 1: protobuf.RepeatedString - (*ListContainersResponse)(nil), // 2: protobuf.ListContainersResponse - (*FindContainerRequest)(nil), // 3: protobuf.FindContainerRequest - (*FindContainerResponse)(nil), // 4: protobuf.FindContainerResponse - (*StreamLogsRequest)(nil), // 5: protobuf.StreamLogsRequest - (*StreamLogsResponse)(nil), // 6: protobuf.StreamLogsResponse - (*LogsBetweenDatesRequest)(nil), // 7: protobuf.LogsBetweenDatesRequest - (*StreamRawBytesRequest)(nil), // 8: protobuf.StreamRawBytesRequest - (*StreamRawBytesResponse)(nil), // 9: protobuf.StreamRawBytesResponse - (*StreamEventsRequest)(nil), // 10: protobuf.StreamEventsRequest - (*StreamEventsResponse)(nil), // 11: protobuf.StreamEventsResponse - (*StreamStatsRequest)(nil), // 12: protobuf.StreamStatsRequest - (*StreamStatsResponse)(nil), // 13: protobuf.StreamStatsResponse - (*HostInfoRequest)(nil), // 14: protobuf.HostInfoRequest - (*HostInfoResponse)(nil), // 15: protobuf.HostInfoResponse - (*StreamContainerStartedRequest)(nil), // 16: protobuf.StreamContainerStartedRequest - (*StreamContainerStartedResponse)(nil), // 17: protobuf.StreamContainerStartedResponse - (*ContainerActionRequest)(nil), // 18: protobuf.ContainerActionRequest - (*ContainerActionResponse)(nil), // 19: protobuf.ContainerActionResponse - (*ContainerExecRequest)(nil), // 20: protobuf.ContainerExecRequest - (*ResizePayload)(nil), // 21: protobuf.ResizePayload - (*ContainerExecResponse)(nil), // 22: protobuf.ContainerExecResponse - (*ContainerAttachRequest)(nil), // 23: protobuf.ContainerAttachRequest - (*ContainerAttachResponse)(nil), // 24: protobuf.ContainerAttachResponse - nil, // 25: protobuf.ListContainersRequest.FilterEntry - nil, // 26: protobuf.FindContainerRequest.FilterEntry - (*Container)(nil), // 27: protobuf.Container - (*timestamppb.Timestamp)(nil), // 28: google.protobuf.Timestamp - (*LogEvent)(nil), // 29: protobuf.LogEvent - (*ContainerEvent)(nil), // 30: protobuf.ContainerEvent - (*ContainerStat)(nil), // 31: protobuf.ContainerStat - (*Host)(nil), // 32: protobuf.Host - (ContainerAction)(0), // 33: protobuf.ContainerAction + (*ListContainersRequest)(nil), // 0: protobuf.ListContainersRequest + (*RepeatedString)(nil), // 1: protobuf.RepeatedString + (*ListContainersResponse)(nil), // 2: protobuf.ListContainersResponse + (*FindContainerRequest)(nil), // 3: protobuf.FindContainerRequest + (*FindContainerResponse)(nil), // 4: protobuf.FindContainerResponse + (*StreamLogsRequest)(nil), // 5: protobuf.StreamLogsRequest + (*StreamLogsResponse)(nil), // 6: protobuf.StreamLogsResponse + (*LogsBetweenDatesRequest)(nil), // 7: protobuf.LogsBetweenDatesRequest + (*StreamRawBytesRequest)(nil), // 8: protobuf.StreamRawBytesRequest + (*StreamRawBytesResponse)(nil), // 9: protobuf.StreamRawBytesResponse + (*StreamEventsRequest)(nil), // 10: protobuf.StreamEventsRequest + (*StreamEventsResponse)(nil), // 11: protobuf.StreamEventsResponse + (*StreamStatsRequest)(nil), // 12: protobuf.StreamStatsRequest + (*StreamStatsResponse)(nil), // 13: protobuf.StreamStatsResponse + (*HostInfoRequest)(nil), // 14: protobuf.HostInfoRequest + (*HostInfoResponse)(nil), // 15: protobuf.HostInfoResponse + (*StreamContainerStartedRequest)(nil), // 16: protobuf.StreamContainerStartedRequest + (*StreamContainerStartedResponse)(nil), // 17: protobuf.StreamContainerStartedResponse + (*ContainerActionRequest)(nil), // 18: protobuf.ContainerActionRequest + (*ContainerActionResponse)(nil), // 19: protobuf.ContainerActionResponse + (*ContainerExecRequest)(nil), // 20: protobuf.ContainerExecRequest + (*ResizePayload)(nil), // 21: protobuf.ResizePayload + (*ContainerExecResponse)(nil), // 22: protobuf.ContainerExecResponse + (*ContainerAttachRequest)(nil), // 23: protobuf.ContainerAttachRequest + (*ContainerAttachResponse)(nil), // 24: protobuf.ContainerAttachResponse + (*UpdateNotificationConfigRequest)(nil), // 25: protobuf.UpdateNotificationConfigRequest + (*UpdateNotificationConfigResponse)(nil), // 26: protobuf.UpdateNotificationConfigResponse + nil, // 27: protobuf.ListContainersRequest.FilterEntry + nil, // 28: protobuf.FindContainerRequest.FilterEntry + (*Container)(nil), // 29: protobuf.Container + (*timestamppb.Timestamp)(nil), // 30: google.protobuf.Timestamp + (*LogEvent)(nil), // 31: protobuf.LogEvent + (*ContainerEvent)(nil), // 32: protobuf.ContainerEvent + (*ContainerStat)(nil), // 33: protobuf.ContainerStat + (*Host)(nil), // 34: protobuf.Host + (ContainerAction)(0), // 35: protobuf.ContainerAction + (*NotificationSubscription)(nil), // 36: protobuf.NotificationSubscription + (*NotificationDispatcher)(nil), // 37: protobuf.NotificationDispatcher } var file_rpc_proto_depIdxs = []int32{ - 25, // 0: protobuf.ListContainersRequest.filter:type_name -> protobuf.ListContainersRequest.FilterEntry - 27, // 1: protobuf.ListContainersResponse.containers:type_name -> protobuf.Container - 26, // 2: protobuf.FindContainerRequest.filter:type_name -> protobuf.FindContainerRequest.FilterEntry - 27, // 3: protobuf.FindContainerResponse.container:type_name -> protobuf.Container - 28, // 4: protobuf.StreamLogsRequest.since:type_name -> google.protobuf.Timestamp - 29, // 5: protobuf.StreamLogsResponse.event:type_name -> protobuf.LogEvent - 28, // 6: protobuf.LogsBetweenDatesRequest.since:type_name -> google.protobuf.Timestamp - 28, // 7: protobuf.LogsBetweenDatesRequest.until:type_name -> google.protobuf.Timestamp - 28, // 8: protobuf.StreamRawBytesRequest.since:type_name -> google.protobuf.Timestamp - 28, // 9: protobuf.StreamRawBytesRequest.until:type_name -> google.protobuf.Timestamp - 30, // 10: protobuf.StreamEventsResponse.event:type_name -> protobuf.ContainerEvent - 31, // 11: protobuf.StreamStatsResponse.stat:type_name -> protobuf.ContainerStat - 32, // 12: protobuf.HostInfoResponse.host:type_name -> protobuf.Host - 27, // 13: protobuf.StreamContainerStartedResponse.container:type_name -> protobuf.Container - 33, // 14: protobuf.ContainerActionRequest.action:type_name -> protobuf.ContainerAction + 27, // 0: protobuf.ListContainersRequest.filter:type_name -> protobuf.ListContainersRequest.FilterEntry + 29, // 1: protobuf.ListContainersResponse.containers:type_name -> protobuf.Container + 28, // 2: protobuf.FindContainerRequest.filter:type_name -> protobuf.FindContainerRequest.FilterEntry + 29, // 3: protobuf.FindContainerResponse.container:type_name -> protobuf.Container + 30, // 4: protobuf.StreamLogsRequest.since:type_name -> google.protobuf.Timestamp + 31, // 5: protobuf.StreamLogsResponse.event:type_name -> protobuf.LogEvent + 30, // 6: protobuf.LogsBetweenDatesRequest.since:type_name -> google.protobuf.Timestamp + 30, // 7: protobuf.LogsBetweenDatesRequest.until:type_name -> google.protobuf.Timestamp + 30, // 8: protobuf.StreamRawBytesRequest.since:type_name -> google.protobuf.Timestamp + 30, // 9: protobuf.StreamRawBytesRequest.until:type_name -> google.protobuf.Timestamp + 32, // 10: protobuf.StreamEventsResponse.event:type_name -> protobuf.ContainerEvent + 33, // 11: protobuf.StreamStatsResponse.stat:type_name -> protobuf.ContainerStat + 34, // 12: protobuf.HostInfoResponse.host:type_name -> protobuf.Host + 29, // 13: protobuf.StreamContainerStartedResponse.container:type_name -> protobuf.Container + 35, // 14: protobuf.ContainerActionRequest.action:type_name -> protobuf.ContainerAction 21, // 15: protobuf.ContainerExecRequest.resize:type_name -> protobuf.ResizePayload 21, // 16: protobuf.ContainerAttachRequest.resize:type_name -> protobuf.ResizePayload - 1, // 17: protobuf.ListContainersRequest.FilterEntry.value:type_name -> protobuf.RepeatedString - 1, // 18: protobuf.FindContainerRequest.FilterEntry.value:type_name -> protobuf.RepeatedString - 0, // 19: protobuf.AgentService.ListContainers:input_type -> protobuf.ListContainersRequest - 3, // 20: protobuf.AgentService.FindContainer:input_type -> protobuf.FindContainerRequest - 5, // 21: protobuf.AgentService.StreamLogs:input_type -> protobuf.StreamLogsRequest - 7, // 22: protobuf.AgentService.LogsBetweenDates:input_type -> protobuf.LogsBetweenDatesRequest - 8, // 23: protobuf.AgentService.StreamRawBytes:input_type -> protobuf.StreamRawBytesRequest - 10, // 24: protobuf.AgentService.StreamEvents:input_type -> protobuf.StreamEventsRequest - 12, // 25: protobuf.AgentService.StreamStats:input_type -> protobuf.StreamStatsRequest - 16, // 26: protobuf.AgentService.StreamContainerStarted:input_type -> protobuf.StreamContainerStartedRequest - 14, // 27: protobuf.AgentService.HostInfo:input_type -> protobuf.HostInfoRequest - 18, // 28: protobuf.AgentService.ContainerAction:input_type -> protobuf.ContainerActionRequest - 20, // 29: protobuf.AgentService.ContainerExec:input_type -> protobuf.ContainerExecRequest - 23, // 30: protobuf.AgentService.ContainerAttach:input_type -> protobuf.ContainerAttachRequest - 2, // 31: protobuf.AgentService.ListContainers:output_type -> protobuf.ListContainersResponse - 4, // 32: protobuf.AgentService.FindContainer:output_type -> protobuf.FindContainerResponse - 6, // 33: protobuf.AgentService.StreamLogs:output_type -> protobuf.StreamLogsResponse - 6, // 34: protobuf.AgentService.LogsBetweenDates:output_type -> protobuf.StreamLogsResponse - 9, // 35: protobuf.AgentService.StreamRawBytes:output_type -> protobuf.StreamRawBytesResponse - 11, // 36: protobuf.AgentService.StreamEvents:output_type -> protobuf.StreamEventsResponse - 13, // 37: protobuf.AgentService.StreamStats:output_type -> protobuf.StreamStatsResponse - 17, // 38: protobuf.AgentService.StreamContainerStarted:output_type -> protobuf.StreamContainerStartedResponse - 15, // 39: protobuf.AgentService.HostInfo:output_type -> protobuf.HostInfoResponse - 19, // 40: protobuf.AgentService.ContainerAction:output_type -> protobuf.ContainerActionResponse - 22, // 41: protobuf.AgentService.ContainerExec:output_type -> protobuf.ContainerExecResponse - 24, // 42: protobuf.AgentService.ContainerAttach:output_type -> protobuf.ContainerAttachResponse - 31, // [31:43] is the sub-list for method output_type - 19, // [19:31] is the sub-list for method input_type - 19, // [19:19] is the sub-list for extension type_name - 19, // [19:19] is the sub-list for extension extendee - 0, // [0:19] is the sub-list for field type_name + 36, // 17: protobuf.UpdateNotificationConfigRequest.subscriptions:type_name -> protobuf.NotificationSubscription + 37, // 18: protobuf.UpdateNotificationConfigRequest.dispatchers:type_name -> protobuf.NotificationDispatcher + 1, // 19: protobuf.ListContainersRequest.FilterEntry.value:type_name -> protobuf.RepeatedString + 1, // 20: protobuf.FindContainerRequest.FilterEntry.value:type_name -> protobuf.RepeatedString + 0, // 21: protobuf.AgentService.ListContainers:input_type -> protobuf.ListContainersRequest + 3, // 22: protobuf.AgentService.FindContainer:input_type -> protobuf.FindContainerRequest + 5, // 23: protobuf.AgentService.StreamLogs:input_type -> protobuf.StreamLogsRequest + 7, // 24: protobuf.AgentService.LogsBetweenDates:input_type -> protobuf.LogsBetweenDatesRequest + 8, // 25: protobuf.AgentService.StreamRawBytes:input_type -> protobuf.StreamRawBytesRequest + 10, // 26: protobuf.AgentService.StreamEvents:input_type -> protobuf.StreamEventsRequest + 12, // 27: protobuf.AgentService.StreamStats:input_type -> protobuf.StreamStatsRequest + 16, // 28: protobuf.AgentService.StreamContainerStarted:input_type -> protobuf.StreamContainerStartedRequest + 14, // 29: protobuf.AgentService.HostInfo:input_type -> protobuf.HostInfoRequest + 18, // 30: protobuf.AgentService.ContainerAction:input_type -> protobuf.ContainerActionRequest + 20, // 31: protobuf.AgentService.ContainerExec:input_type -> protobuf.ContainerExecRequest + 23, // 32: protobuf.AgentService.ContainerAttach:input_type -> protobuf.ContainerAttachRequest + 25, // 33: protobuf.AgentService.UpdateNotificationConfig:input_type -> protobuf.UpdateNotificationConfigRequest + 2, // 34: protobuf.AgentService.ListContainers:output_type -> protobuf.ListContainersResponse + 4, // 35: protobuf.AgentService.FindContainer:output_type -> protobuf.FindContainerResponse + 6, // 36: protobuf.AgentService.StreamLogs:output_type -> protobuf.StreamLogsResponse + 6, // 37: protobuf.AgentService.LogsBetweenDates:output_type -> protobuf.StreamLogsResponse + 9, // 38: protobuf.AgentService.StreamRawBytes:output_type -> protobuf.StreamRawBytesResponse + 11, // 39: protobuf.AgentService.StreamEvents:output_type -> protobuf.StreamEventsResponse + 13, // 40: protobuf.AgentService.StreamStats:output_type -> protobuf.StreamStatsResponse + 17, // 41: protobuf.AgentService.StreamContainerStarted:output_type -> protobuf.StreamContainerStartedResponse + 15, // 42: protobuf.AgentService.HostInfo:output_type -> protobuf.HostInfoResponse + 19, // 43: protobuf.AgentService.ContainerAction:output_type -> protobuf.ContainerActionResponse + 22, // 44: protobuf.AgentService.ContainerExec:output_type -> protobuf.ContainerExecResponse + 24, // 45: protobuf.AgentService.ContainerAttach:output_type -> protobuf.ContainerAttachResponse + 26, // 46: protobuf.AgentService.UpdateNotificationConfig:output_type -> protobuf.UpdateNotificationConfigResponse + 34, // [34:47] is the sub-list for method output_type + 21, // [21:34] is the sub-list for method input_type + 21, // [21:21] is the sub-list for extension type_name + 21, // [21:21] is the sub-list for extension extendee + 0, // [0:21] is the sub-list for field type_name } func init() { file_rpc_proto_init() } @@ -1481,7 +1582,7 @@ func file_rpc_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_rpc_proto_rawDesc), len(file_rpc_proto_rawDesc)), NumEnums: 0, - NumMessages: 27, + NumMessages: 29, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/agent/pb/rpc_grpc.pb.go b/internal/agent/pb/rpc_grpc.pb.go index be0f8441..7a3f8f20 100644 --- a/internal/agent/pb/rpc_grpc.pb.go +++ b/internal/agent/pb/rpc_grpc.pb.go @@ -19,18 +19,19 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - AgentService_ListContainers_FullMethodName = "/protobuf.AgentService/ListContainers" - AgentService_FindContainer_FullMethodName = "/protobuf.AgentService/FindContainer" - AgentService_StreamLogs_FullMethodName = "/protobuf.AgentService/StreamLogs" - AgentService_LogsBetweenDates_FullMethodName = "/protobuf.AgentService/LogsBetweenDates" - AgentService_StreamRawBytes_FullMethodName = "/protobuf.AgentService/StreamRawBytes" - AgentService_StreamEvents_FullMethodName = "/protobuf.AgentService/StreamEvents" - AgentService_StreamStats_FullMethodName = "/protobuf.AgentService/StreamStats" - AgentService_StreamContainerStarted_FullMethodName = "/protobuf.AgentService/StreamContainerStarted" - AgentService_HostInfo_FullMethodName = "/protobuf.AgentService/HostInfo" - AgentService_ContainerAction_FullMethodName = "/protobuf.AgentService/ContainerAction" - AgentService_ContainerExec_FullMethodName = "/protobuf.AgentService/ContainerExec" - AgentService_ContainerAttach_FullMethodName = "/protobuf.AgentService/ContainerAttach" + AgentService_ListContainers_FullMethodName = "/protobuf.AgentService/ListContainers" + AgentService_FindContainer_FullMethodName = "/protobuf.AgentService/FindContainer" + AgentService_StreamLogs_FullMethodName = "/protobuf.AgentService/StreamLogs" + AgentService_LogsBetweenDates_FullMethodName = "/protobuf.AgentService/LogsBetweenDates" + AgentService_StreamRawBytes_FullMethodName = "/protobuf.AgentService/StreamRawBytes" + AgentService_StreamEvents_FullMethodName = "/protobuf.AgentService/StreamEvents" + AgentService_StreamStats_FullMethodName = "/protobuf.AgentService/StreamStats" + AgentService_StreamContainerStarted_FullMethodName = "/protobuf.AgentService/StreamContainerStarted" + AgentService_HostInfo_FullMethodName = "/protobuf.AgentService/HostInfo" + AgentService_ContainerAction_FullMethodName = "/protobuf.AgentService/ContainerAction" + AgentService_ContainerExec_FullMethodName = "/protobuf.AgentService/ContainerExec" + AgentService_ContainerAttach_FullMethodName = "/protobuf.AgentService/ContainerAttach" + AgentService_UpdateNotificationConfig_FullMethodName = "/protobuf.AgentService/UpdateNotificationConfig" ) // AgentServiceClient is the client API for AgentService service. @@ -49,6 +50,7 @@ type AgentServiceClient interface { ContainerAction(ctx context.Context, in *ContainerActionRequest, opts ...grpc.CallOption) (*ContainerActionResponse, error) ContainerExec(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ContainerExecRequest, ContainerExecResponse], error) ContainerAttach(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ContainerAttachRequest, ContainerAttachResponse], error) + UpdateNotificationConfig(ctx context.Context, in *UpdateNotificationConfigRequest, opts ...grpc.CallOption) (*UpdateNotificationConfigResponse, error) } type agentServiceClient struct { @@ -239,6 +241,16 @@ func (c *agentServiceClient) ContainerAttach(ctx context.Context, opts ...grpc.C // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type AgentService_ContainerAttachClient = grpc.BidiStreamingClient[ContainerAttachRequest, ContainerAttachResponse] +func (c *agentServiceClient) UpdateNotificationConfig(ctx context.Context, in *UpdateNotificationConfigRequest, opts ...grpc.CallOption) (*UpdateNotificationConfigResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(UpdateNotificationConfigResponse) + err := c.cc.Invoke(ctx, AgentService_UpdateNotificationConfig_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // AgentServiceServer is the server API for AgentService service. // All implementations must embed UnimplementedAgentServiceServer // for forward compatibility. @@ -255,6 +267,7 @@ type AgentServiceServer interface { ContainerAction(context.Context, *ContainerActionRequest) (*ContainerActionResponse, error) ContainerExec(grpc.BidiStreamingServer[ContainerExecRequest, ContainerExecResponse]) error ContainerAttach(grpc.BidiStreamingServer[ContainerAttachRequest, ContainerAttachResponse]) error + UpdateNotificationConfig(context.Context, *UpdateNotificationConfigRequest) (*UpdateNotificationConfigResponse, error) mustEmbedUnimplementedAgentServiceServer() } @@ -301,6 +314,9 @@ func (UnimplementedAgentServiceServer) ContainerExec(grpc.BidiStreamingServer[Co func (UnimplementedAgentServiceServer) ContainerAttach(grpc.BidiStreamingServer[ContainerAttachRequest, ContainerAttachResponse]) error { return status.Error(codes.Unimplemented, "method ContainerAttach not implemented") } +func (UnimplementedAgentServiceServer) UpdateNotificationConfig(context.Context, *UpdateNotificationConfigRequest) (*UpdateNotificationConfigResponse, error) { + return nil, status.Error(codes.Unimplemented, "method UpdateNotificationConfig not implemented") +} func (UnimplementedAgentServiceServer) mustEmbedUnimplementedAgentServiceServer() {} func (UnimplementedAgentServiceServer) testEmbeddedByValue() {} @@ -474,6 +490,24 @@ func _AgentService_ContainerAttach_Handler(srv interface{}, stream grpc.ServerSt // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type AgentService_ContainerAttachServer = grpc.BidiStreamingServer[ContainerAttachRequest, ContainerAttachResponse] +func _AgentService_UpdateNotificationConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateNotificationConfigRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AgentServiceServer).UpdateNotificationConfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: AgentService_UpdateNotificationConfig_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AgentServiceServer).UpdateNotificationConfig(ctx, req.(*UpdateNotificationConfigRequest)) + } + return interceptor(ctx, in, info, handler) +} + // AgentService_ServiceDesc is the grpc.ServiceDesc for AgentService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -497,6 +531,10 @@ var AgentService_ServiceDesc = grpc.ServiceDesc{ MethodName: "ContainerAction", Handler: _AgentService_ContainerAction_Handler, }, + { + MethodName: "UpdateNotificationConfig", + Handler: _AgentService_UpdateNotificationConfig_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/internal/agent/pb/types.pb.go b/internal/agent/pb/types.pb.go index 0041ffac..11943fad 100644 --- a/internal/agent/pb/types.pb.go +++ b/internal/agent/pb/types.pb.go @@ -820,6 +820,166 @@ func (x *Host) GetDockerVersion() string { return "" } +type NotificationSubscription struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Enabled bool `protobuf:"varint,3,opt,name=enabled,proto3" json:"enabled,omitempty"` + DispatcherId int32 `protobuf:"varint,4,opt,name=dispatcherId,proto3" json:"dispatcherId,omitempty"` + LogExpression string `protobuf:"bytes,5,opt,name=logExpression,proto3" json:"logExpression,omitempty"` + ContainerExpression string `protobuf:"bytes,6,opt,name=containerExpression,proto3" json:"containerExpression,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NotificationSubscription) Reset() { + *x = NotificationSubscription{} + mi := &file_types_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NotificationSubscription) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NotificationSubscription) ProtoMessage() {} + +func (x *NotificationSubscription) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NotificationSubscription.ProtoReflect.Descriptor instead. +func (*NotificationSubscription) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{9} +} + +func (x *NotificationSubscription) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *NotificationSubscription) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *NotificationSubscription) GetEnabled() bool { + if x != nil { + return x.Enabled + } + return false +} + +func (x *NotificationSubscription) GetDispatcherId() int32 { + if x != nil { + return x.DispatcherId + } + return 0 +} + +func (x *NotificationSubscription) GetLogExpression() string { + if x != nil { + return x.LogExpression + } + return "" +} + +func (x *NotificationSubscription) GetContainerExpression() string { + if x != nil { + return x.ContainerExpression + } + return "" +} + +type NotificationDispatcher struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"` + Url string `protobuf:"bytes,4,opt,name=url,proto3" json:"url,omitempty"` + Template string `protobuf:"bytes,5,opt,name=template,proto3" json:"template,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NotificationDispatcher) Reset() { + *x = NotificationDispatcher{} + mi := &file_types_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NotificationDispatcher) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NotificationDispatcher) ProtoMessage() {} + +func (x *NotificationDispatcher) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NotificationDispatcher.ProtoReflect.Descriptor instead. +func (*NotificationDispatcher) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{10} +} + +func (x *NotificationDispatcher) GetId() int32 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *NotificationDispatcher) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *NotificationDispatcher) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *NotificationDispatcher) GetUrl() string { + if x != nil { + return x.Url + } + return "" +} + +func (x *NotificationDispatcher) GetTemplate() string { + if x != nil { + return x.Template + } + return "" +} + var File_types_proto protoreflect.FileDescriptor const file_types_proto_rawDesc = "" + @@ -898,7 +1058,20 @@ const file_types_proto_rawDesc = "" + "\rdockerVersion\x18\f \x01(\tR\rdockerVersion\x1a9\n" + "\vLabelsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01*3\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xd4\x01\n" + + "\x18NotificationSubscription\x12\x0e\n" + + "\x02id\x18\x01 \x01(\x05R\x02id\x12\x12\n" + + "\x04name\x18\x02 \x01(\tR\x04name\x12\x18\n" + + "\aenabled\x18\x03 \x01(\bR\aenabled\x12\"\n" + + "\fdispatcherId\x18\x04 \x01(\x05R\fdispatcherId\x12$\n" + + "\rlogExpression\x18\x05 \x01(\tR\rlogExpression\x120\n" + + "\x13containerExpression\x18\x06 \x01(\tR\x13containerExpression\"~\n" + + "\x16NotificationDispatcher\x12\x0e\n" + + "\x02id\x18\x01 \x01(\x05R\x02id\x12\x12\n" + + "\x04name\x18\x02 \x01(\tR\x04name\x12\x12\n" + + "\x04type\x18\x03 \x01(\tR\x04type\x12\x10\n" + + "\x03url\x18\x04 \x01(\tR\x03url\x12\x1a\n" + + "\btemplate\x18\x05 \x01(\tR\btemplate*3\n" + "\x0fContainerAction\x12\t\n" + "\x05Start\x10\x00\x12\b\n" + "\x04Stop\x10\x01\x12\v\n" + @@ -917,34 +1090,36 @@ func file_types_proto_rawDescGZIP() []byte { } var file_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_types_proto_goTypes = []any{ - (ContainerAction)(0), // 0: protobuf.ContainerAction - (*Container)(nil), // 1: protobuf.Container - (*ContainerStat)(nil), // 2: protobuf.ContainerStat - (*LogFragment)(nil), // 3: protobuf.LogFragment - (*LogEvent)(nil), // 4: protobuf.LogEvent - (*SingleMessage)(nil), // 5: protobuf.SingleMessage - (*GroupMessage)(nil), // 6: protobuf.GroupMessage - (*ComplexMessage)(nil), // 7: protobuf.ComplexMessage - (*ContainerEvent)(nil), // 8: protobuf.ContainerEvent - (*Host)(nil), // 9: protobuf.Host - nil, // 10: protobuf.Container.LabelsEntry - nil, // 11: protobuf.Host.LabelsEntry - (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp - (*anypb.Any)(nil), // 13: google.protobuf.Any + (ContainerAction)(0), // 0: protobuf.ContainerAction + (*Container)(nil), // 1: protobuf.Container + (*ContainerStat)(nil), // 2: protobuf.ContainerStat + (*LogFragment)(nil), // 3: protobuf.LogFragment + (*LogEvent)(nil), // 4: protobuf.LogEvent + (*SingleMessage)(nil), // 5: protobuf.SingleMessage + (*GroupMessage)(nil), // 6: protobuf.GroupMessage + (*ComplexMessage)(nil), // 7: protobuf.ComplexMessage + (*ContainerEvent)(nil), // 8: protobuf.ContainerEvent + (*Host)(nil), // 9: protobuf.Host + (*NotificationSubscription)(nil), // 10: protobuf.NotificationSubscription + (*NotificationDispatcher)(nil), // 11: protobuf.NotificationDispatcher + nil, // 12: protobuf.Container.LabelsEntry + nil, // 13: protobuf.Host.LabelsEntry + (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp + (*anypb.Any)(nil), // 15: google.protobuf.Any } var file_types_proto_depIdxs = []int32{ - 12, // 0: protobuf.Container.created:type_name -> google.protobuf.Timestamp - 12, // 1: protobuf.Container.started:type_name -> google.protobuf.Timestamp - 10, // 2: protobuf.Container.labels:type_name -> protobuf.Container.LabelsEntry + 14, // 0: protobuf.Container.created:type_name -> google.protobuf.Timestamp + 14, // 1: protobuf.Container.started:type_name -> google.protobuf.Timestamp + 12, // 2: protobuf.Container.labels:type_name -> protobuf.Container.LabelsEntry 2, // 3: protobuf.Container.stats:type_name -> protobuf.ContainerStat - 12, // 4: protobuf.Container.finished:type_name -> google.protobuf.Timestamp - 13, // 5: protobuf.LogEvent.message:type_name -> google.protobuf.Any - 12, // 6: protobuf.LogEvent.timestamp:type_name -> google.protobuf.Timestamp + 14, // 4: protobuf.Container.finished:type_name -> google.protobuf.Timestamp + 15, // 5: protobuf.LogEvent.message:type_name -> google.protobuf.Any + 14, // 6: protobuf.LogEvent.timestamp:type_name -> google.protobuf.Timestamp 3, // 7: protobuf.GroupMessage.fragments:type_name -> protobuf.LogFragment - 12, // 8: protobuf.ContainerEvent.timestamp:type_name -> google.protobuf.Timestamp - 11, // 9: protobuf.Host.labels:type_name -> protobuf.Host.LabelsEntry + 14, // 8: protobuf.ContainerEvent.timestamp:type_name -> google.protobuf.Timestamp + 13, // 9: protobuf.Host.labels:type_name -> protobuf.Host.LabelsEntry 10, // [10:10] is the sub-list for method output_type 10, // [10:10] is the sub-list for method input_type 10, // [10:10] is the sub-list for extension type_name @@ -963,7 +1138,7 @@ func file_types_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)), NumEnums: 1, - NumMessages: 11, + NumMessages: 13, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/agent/server.go b/internal/agent/server.go index 3beec0e4..90dcb675 100644 --- a/internal/agent/server.go +++ b/internal/agent/server.go @@ -6,7 +6,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" - "sync" + "io" "encoding/json" @@ -14,7 +14,7 @@ import ( "github.com/amir20/dozzle/internal/agent/pb" "github.com/amir20/dozzle/internal/container" - "github.com/amir20/dozzle/internal/docker" + "github.com/amir20/dozzle/types" "github.com/rs/zerolog/log" orderedmap "github.com/wk8/go-ordered-map/v2" "google.golang.org/grpc" @@ -26,21 +26,40 @@ import ( "google.golang.org/grpc/status" ) +// NotificationConfigHandler handles notification config updates received from the main server +type NotificationConfigHandler interface { + HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error +} + +// ClientService is the interface for container operations used by the agent server +type ClientService interface { + FindContainer(ctx context.Context, id string, labels container.ContainerLabels) (container.Container, error) + ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error) + Host(ctx context.Context) (container.Host, error) + ContainerAction(ctx context.Context, container container.Container, action container.ContainerAction) error + LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) + RawLogs(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (io.ReadCloser, error) + SubscribeStats(context.Context, chan<- container.ContainerStat) + SubscribeEvents(context.Context, chan<- container.ContainerEvent) + SubscribeContainersStarted(context.Context, chan<- container.Container) + StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- *container.LogEvent) error + Attach(context.Context, container.Container, container.ExecEventReader, io.Writer) error + Exec(context.Context, container.Container, []string, container.ExecEventReader, io.Writer) error +} + type server struct { - client container.Client - store *container.ContainerStore - version string + service ClientService + version string + notificationConfigHandler NotificationConfigHandler pb.UnimplementedAgentServiceServer } -func newServer(client container.Client, dozzleVersion string, labels container.ContainerLabels) pb.AgentServiceServer { - statsCollector := docker.NewDockerStatsCollector(client, labels) +func newServer(service ClientService, dozzleVersion string, notificationHandler NotificationConfigHandler) pb.AgentServiceServer { return &server{ - client: client, - version: dozzleVersion, - - store: container.NewContainerStore(context.Background(), client, statsCollector, labels), + service: service, + version: dozzleVersion, + notificationConfigHandler: notificationHandler, } } @@ -50,20 +69,18 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream since = in.Since.AsTime() } - c, err := s.store.FindContainer(in.ContainerId, container.ContainerLabels{}) + c, err := s.service.FindContainer(out.Context(), in.ContainerId, container.ContainerLabels{}) if err != nil { return err } - reader, err := s.client.ContainerLogs(out.Context(), in.ContainerId, since, container.StdType(in.StreamTypes)) - if err != nil { - return err - } + events := make(chan *container.LogEvent) + go func() { + defer close(events) + s.service.StreamLogs(out.Context(), c, since, container.StdType(in.StreamTypes), events) + }() - dockerReader := docker.NewLogReader(reader, c.Tty) - g := container.NewEventGenerator(out.Context(), dockerReader, c) - - for event := range g.Events { + for event := range events { if event != nil { out.Send(&pb.StreamLogsResponse{ Event: logEventToPb(event), @@ -71,31 +88,23 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream } } - select { - case e := <-g.Errors: - return e - default: - return nil - } + return nil } func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentService_LogsBetweenDatesServer) error { - reader, err := s.client.ContainerLogsBetweenDates(out.Context(), in.ContainerId, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes)) + c, err := s.service.FindContainer(out.Context(), in.ContainerId, container.ContainerLabels{}) if err != nil { return err } - c, err := s.client.FindContainer(out.Context(), in.ContainerId) + events, err := s.service.LogsBetweenDates(out.Context(), c, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes)) if err != nil { return err } - dockerReader := docker.NewLogReader(reader, c.Tty) - g := container.NewEventGenerator(out.Context(), dockerReader, c) - for { select { - case event, ok := <-g.Events: + case event, ok := <-events: if !ok { // Channel closed, exit cleanly return nil @@ -103,8 +112,6 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe out.Send(&pb.StreamLogsResponse{ Event: logEventToPb(event), }) - case e := <-g.Errors: - return e case <-out.Context().Done(): return nil } @@ -112,16 +119,24 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe } func (s *server) StreamRawBytes(in *pb.StreamRawBytesRequest, out pb.AgentService_StreamRawBytesServer) error { - reader, err := s.client.ContainerLogsBetweenDates(out.Context(), in.ContainerId, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes)) - + c, err := s.service.FindContainer(out.Context(), in.ContainerId, container.ContainerLabels{}) if err != nil { return err } + reader, err := s.service.RawLogs(out.Context(), c, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes)) + if err != nil { + return err + } + defer reader.Close() + buf := make([]byte, 1024) for { n, err := reader.Read(buf) if err != nil { + if err == io.EOF { + return nil + } return err } @@ -142,7 +157,7 @@ func (s *server) StreamRawBytes(in *pb.StreamRawBytesRequest, out pb.AgentServic func (s *server) StreamEvents(in *pb.StreamEventsRequest, out pb.AgentService_StreamEventsServer) error { events := make(chan container.ContainerEvent) - s.store.SubscribeEvents(out.Context(), events) + s.service.SubscribeEvents(out.Context(), events) for { select { @@ -164,7 +179,7 @@ func (s *server) StreamEvents(in *pb.StreamEventsRequest, out pb.AgentService_St func (s *server) StreamStats(in *pb.StreamStatsRequest, out pb.AgentService_StreamStatsServer) error { stats := make(chan container.ContainerStat) - s.store.SubscribeStats(out.Context(), stats) + s.service.SubscribeStats(out.Context(), stats) for { select { @@ -193,13 +208,13 @@ func (s *server) FindContainer(ctx context.Context, in *pb.FindContainerRequest) } } - container, err := s.store.FindContainer(in.ContainerId, labels) + c, err := s.service.FindContainer(ctx, in.ContainerId, labels) if err != nil { return nil, status.Error(codes.NotFound, err.Error()) } - c := container.ToProto() + proto := c.ToProto() return &pb.FindContainerResponse{ - Container: &c, + Container: &proto, }, nil } @@ -211,15 +226,15 @@ func (s *server) ListContainers(ctx context.Context, in *pb.ListContainersReques } } - containers, err := s.store.ListContainers(labels) + containers, err := s.service.ListContainers(ctx, labels) if err != nil { return nil, err } var pbContainers []*pb.Container - for _, container := range containers { - c := container.ToProto() - pbContainers = append(pbContainers, &c) + for _, c := range containers { + proto := c.ToProto() + pbContainers = append(pbContainers, &proto) } return &pb.ListContainersResponse{ @@ -228,7 +243,10 @@ func (s *server) ListContainers(ctx context.Context, in *pb.ListContainersReques } func (s *server) HostInfo(ctx context.Context, in *pb.HostInfoRequest) (*pb.HostInfoResponse, error) { - host := s.client.Host() + host, err := s.service.Host(ctx) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } return &pb.HostInfoResponse{ Host: &pb.Host{ Id: host.ID, @@ -244,7 +262,7 @@ func (s *server) HostInfo(ctx context.Context, in *pb.HostInfoRequest) (*pb.Host func (s *server) StreamContainerStarted(in *pb.StreamContainerStartedRequest, out pb.AgentService_StreamContainerStartedServer) error { containers := make(chan container.Container) - go s.store.SubscribeNewContainers(out.Context(), containers) + go s.service.SubscribeContainersStarted(out.Context(), containers) for { select { @@ -275,8 +293,12 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ return nil, status.Error(codes.InvalidArgument, "invalid action") } - err := s.client.ContainerActions(ctx, action, in.ContainerId) + c, err := s.service.FindContainer(ctx, in.ContainerId, container.ContainerLabels{}) + if err != nil { + return nil, status.Error(codes.NotFound, err.Error()) + } + err = s.service.ContainerAction(ctx, c, action) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -284,62 +306,62 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ return &pb.ContainerActionResponse{}, nil } +// terminalMessage represents a message from a terminal gRPC stream (exec or attach) +type terminalMessage interface { + GetStdin() []byte + GetResize() *pb.ResizePayload +} + +// protoEventReader converts gRPC protobuf messages directly to ExecEvents (no JSON) +type protoEventReader struct { + recv func() (terminalMessage, error) +} + +func (r *protoEventReader) ReadEvent() (*container.ExecEvent, error) { + msg, err := r.recv() + if err != nil { + return nil, err + } + + if stdin := msg.GetStdin(); stdin != nil { + return &container.ExecEvent{Type: "userinput", Data: string(stdin)}, nil + } else if resize := msg.GetResize(); resize != nil { + return &container.ExecEvent{Type: "resize", Width: uint(resize.Width), Height: uint(resize.Height)}, nil + } + + // Skip unknown message types + return r.ReadEvent() +} + +// terminalStreamWriter adapts a gRPC terminal stream to io.Writer +type terminalStreamWriter struct { + send func([]byte) error +} + +func (w *terminalStreamWriter) Write(p []byte) (int, error) { + if err := w.send(p); err != nil { + return 0, err + } + return len(p), nil +} + func (s *server) ContainerExec(stream pb.AgentService_ContainerExecServer) error { request, err := stream.Recv() if err != nil { return status.Error(codes.Internal, err.Error()) } - cancelCtx, cancel := context.WithCancel(stream.Context()) - session, err := s.client.ContainerExec(cancelCtx, request.ContainerId, request.Command) + c, err := s.service.FindContainer(stream.Context(), request.ContainerId, container.ContainerLabels{}) if err != nil { - cancel() - return status.Error(codes.Internal, err.Error()) + return status.Error(codes.NotFound, err.Error()) } - var wg sync.WaitGroup + reader := &protoEventReader{recv: func() (terminalMessage, error) { return stream.Recv() }} + writer := &terminalStreamWriter{send: func(p []byte) error { return stream.Send(&pb.ContainerExecResponse{Stdout: p}) }} - // Read from container and send to client - wg.Go(func() { - defer cancel() - buffer := make([]byte, 1024) - for { - n, err := session.Reader.Read(buffer) - if err != nil { - return - } - - if err := stream.Send(&pb.ContainerExecResponse{Stdout: buffer[:n]}); err != nil { - return - } - } - }) - - // Read from client stream and handle stdin/resize - wg.Go(func() { - defer cancel() - defer session.Writer.Close() - for { - req, err := stream.Recv() - if err != nil { - return - } - - switch payload := req.Payload.(type) { - case *pb.ContainerExecRequest_Stdin: - if _, err := session.Writer.Write(payload.Stdin); err != nil { - log.Error().Err(err).Msg("error writing stdin to container") - return - } - case *pb.ContainerExecRequest_Resize: - if err := session.Resize(uint(payload.Resize.Width), uint(payload.Resize.Height)); err != nil { - log.Error().Err(err).Msg("error resizing terminal") - } - } - } - }) - - wg.Wait() + if err := s.service.Exec(stream.Context(), c, request.Command, reader, writer); err != nil { + return status.Error(codes.Internal, err.Error()) + } return nil } @@ -350,61 +372,73 @@ func (s *server) ContainerAttach(stream pb.AgentService_ContainerAttachServer) e return status.Error(codes.Internal, err.Error()) } - cancelCtx, cancel := context.WithCancel(stream.Context()) - session, err := s.client.ContainerAttach(cancelCtx, request.ContainerId) + c, err := s.service.FindContainer(stream.Context(), request.ContainerId, container.ContainerLabels{}) if err != nil { - cancel() - return status.Error(codes.Internal, err.Error()) + return status.Error(codes.NotFound, err.Error()) } - var wg sync.WaitGroup + reader := &protoEventReader{recv: func() (terminalMessage, error) { return stream.Recv() }} + writer := &terminalStreamWriter{send: func(p []byte) error { return stream.Send(&pb.ContainerAttachResponse{Stdout: p}) }} - // Read from container and send to client - wg.Go(func() { - defer cancel() - buffer := make([]byte, 1024) - for { - n, err := session.Reader.Read(buffer) - if err != nil { - return - } - - if err := stream.Send(&pb.ContainerAttachResponse{Stdout: buffer[:n]}); err != nil { - return - } - } - }) - - // Read from client stream and handle stdin/resize - wg.Go(func() { - defer cancel() - defer session.Writer.Close() - for { - req, err := stream.Recv() - if err != nil { - return - } - - switch payload := req.Payload.(type) { - case *pb.ContainerAttachRequest_Stdin: - if _, err := session.Writer.Write(payload.Stdin); err != nil { - log.Error().Err(err).Msg("error writing stdin to container") - return - } - case *pb.ContainerAttachRequest_Resize: - if err := session.Resize(uint(payload.Resize.Width), uint(payload.Resize.Height)); err != nil { - log.Error().Err(err).Msg("error resizing terminal") - } - } - } - }) - - wg.Wait() + if err := s.service.Attach(stream.Context(), c, reader, writer); err != nil { + return status.Error(codes.Internal, err.Error()) + } return nil } -func NewServer(client container.Client, certificates tls.Certificate, dozzleVersion string, labels container.ContainerLabels) (*grpc.Server, error) { +func (s *server) UpdateNotificationConfig(ctx context.Context, req *pb.UpdateNotificationConfigRequest) (*pb.UpdateNotificationConfigResponse, error) { + if s.notificationConfigHandler == nil { + log.Warn().Msg("No notification config handler registered, ignoring config update") + return &pb.UpdateNotificationConfigResponse{}, nil + } + + // Validate request sizes to prevent memory exhaustion + const maxSubscriptions = 1000 + const maxDispatchers = 100 + if len(req.Subscriptions) > maxSubscriptions { + return nil, status.Errorf(codes.InvalidArgument, "too many subscriptions: %d (max %d)", len(req.Subscriptions), maxSubscriptions) + } + if len(req.Dispatchers) > maxDispatchers { + return nil, status.Errorf(codes.InvalidArgument, "too many dispatchers: %d (max %d)", len(req.Dispatchers), maxDispatchers) + } + + // Convert proto subscriptions to types + subscriptions := make([]types.SubscriptionConfig, len(req.Subscriptions)) + for i, sub := range req.Subscriptions { + subscriptions[i] = types.SubscriptionConfig{ + ID: int(sub.Id), + Name: sub.Name, + Enabled: sub.Enabled, + DispatcherID: int(sub.DispatcherId), + LogExpression: sub.LogExpression, + ContainerExpression: sub.ContainerExpression, + } + } + + // Convert proto dispatchers to types + dispatchers := make([]types.DispatcherConfig, len(req.Dispatchers)) + for i, d := range req.Dispatchers { + dispatchers[i] = types.DispatcherConfig{ + ID: int(d.Id), + Name: d.Name, + Type: d.Type, + URL: d.Url, + Template: d.Template, + } + } + + // Call the handler (handler is responsible for persisting if needed) + if err := s.notificationConfigHandler.HandleNotificationConfig(subscriptions, dispatchers); err != nil { + log.Error().Err(err).Msg("Failed to handle notification config") + return nil, status.Error(codes.Internal, err.Error()) + } + + log.Info().Int("subscriptions", len(subscriptions)).Int("dispatchers", len(dispatchers)).Msg("Updated notification config from main server") + return &pb.UpdateNotificationConfigResponse{}, nil +} + +func NewServer(service ClientService, certificates tls.Certificate, dozzleVersion string, notificationHandler NotificationConfigHandler) (*grpc.Server, error) { caCertPool := x509.NewCertPool() c, err := x509.ParseCertificate(certificates.Certificate[0]) if err != nil { @@ -423,7 +457,7 @@ func NewServer(client container.Client, certificates tls.Certificate, dozzleVers creds := credentials.NewTLS(tlsConfig) grpcServer := grpc.NewServer(grpc.Creds(creds)) - pb.RegisterAgentServiceServer(grpcServer, newServer(client, dozzleVersion, labels)) + pb.RegisterAgentServiceServer(grpcServer, newServer(service, dozzleVersion, notificationHandler)) return grpcServer, nil } diff --git a/internal/container/client.go b/internal/container/client.go index 7cd3523e..2acffb5c 100644 --- a/internal/container/client.go +++ b/internal/container/client.go @@ -41,6 +41,11 @@ type ExecEvent struct { Height uint `json:"height,omitempty"` } +// ExecEventReader provides structured exec events (userinput, resize) +type ExecEventReader interface { + ReadEvent() (*ExecEvent, error) +} + type Client interface { ListContainers(context.Context, ContainerLabels) ([]Container, error) FindContainer(context.Context, string) (Container, error) diff --git a/internal/notification/log_listener.go b/internal/notification/log_listener.go index d4adcaea..17a7b04d 100644 --- a/internal/notification/log_listener.go +++ b/internal/notification/log_listener.go @@ -122,7 +122,7 @@ func (l *ContainerLogListener) startListening(c container.Container, client cont go func() { log.Debug().Str("containerID", c.ID).Str("name", c.Name).Msg("Started listening to container") - if err := client.StreamLogs(streamCtx, c, time.Now(), container.STDALL, l.logChannel); err != nil && !errors.Is(err, io.EOF) { + if err := client.StreamLogs(streamCtx, c, time.Now(), container.STDALL, l.logChannel); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { log.Error().Err(err).Str("containerID", c.ID).Msg("Error streaming logs") } }() diff --git a/internal/notification/manager.go b/internal/notification/manager.go index abcbda28..07d117a6 100644 --- a/internal/notification/manager.go +++ b/internal/notification/manager.go @@ -451,6 +451,79 @@ func (m *Manager) LoadConfig(r io.Reader) error { return nil } +// HandleNotificationConfig implements agent.NotificationConfigHandler interface +// It atomically replaces all subscriptions and dispatchers with new state from the main server +func (m *Manager) HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error { + // Clear existing state (with nil checks for defensive programming) + if m.subscriptions != nil { + m.subscriptions.Clear() + } else { + m.subscriptions = xsync.NewMap[int, *Subscription]() + } + if m.dispatchers != nil { + m.dispatchers.Clear() + } else { + m.dispatchers = xsync.NewMap[int, dispatcher.Dispatcher]() + } + + // Find max IDs to initialize counters + var maxSubID, maxDispatcherID int + for _, sub := range subscriptions { + if sub.ID > maxSubID { + maxSubID = sub.ID + } + } + for _, d := range dispatchers { + if d.ID > maxDispatcherID { + maxDispatcherID = d.ID + } + } + m.subscriptionCounter.Store(int32(maxSubID)) + m.dispatcherCounter.Store(int32(maxDispatcherID)) + + // Load subscriptions (convert from types.SubscriptionConfig to Subscription) + for _, sub := range subscriptions { + s := &Subscription{ + ID: sub.ID, + Name: sub.Name, + Enabled: sub.Enabled, + DispatcherID: sub.DispatcherID, + LogExpression: sub.LogExpression, + ContainerExpression: sub.ContainerExpression, + } + if err := m.loadSubscription(s); err != nil { + return fmt.Errorf("failed to load subscription %s: %w", sub.Name, err) + } + } + + // Load dispatchers + for _, dispatcherConfig := range dispatchers { + var d dispatcher.Dispatcher + switch dispatcherConfig.Type { + case "webhook": + webhook, err := dispatcher.NewWebhookDispatcher(dispatcherConfig.Name, dispatcherConfig.URL, dispatcherConfig.Template) + if err != nil { + return fmt.Errorf("failed to create webhook dispatcher %s: %w", dispatcherConfig.Name, err) + } + d = webhook + default: + return fmt.Errorf("unknown dispatcher type: %s", dispatcherConfig.Type) + } + m.dispatchers.Store(dispatcherConfig.ID, d) + log.Debug().Int("id", dispatcherConfig.ID).Msg("Loaded dispatcher from state sync") + } + + // Update listener to start/stop streams based on new subscriptions + if m.listener != nil { + if err := m.listener.UpdateStreams(); err != nil { + return fmt.Errorf("failed to update listener streams: %w", err) + } + } + + log.Debug().Int("subscriptions", len(subscriptions)).Int("dispatchers", len(dispatchers)).Msg("Replaced notification state") + return nil +} + // loadSubscription loads a subscription with its existing ID (used when loading from config) func (m *Manager) loadSubscription(sub *Subscription) error { // Compile container expression if provided diff --git a/internal/notification/types.go b/internal/notification/types.go index d5e15800..0020afa5 100644 --- a/internal/notification/types.go +++ b/internal/notification/types.go @@ -149,7 +149,9 @@ func (s *Subscription) MatchesLog(l types.NotificationLog) bool { result, err := expr.Run(s.LogProgram, l) if err != nil { - log.Warn().Err(err).Str("expression", s.LogExpression).Msg("log expression evaluation error") + // Type mismatches are expected when expression doesn't match log type + // e.g., "message contains X" on JSON logs or "message.field" on string logs + log.Debug().Err(err).Str("expression", s.LogExpression).Msg("log expression evaluation error") return false } diff --git a/internal/support/cli/agent_command.go b/internal/support/cli/agent_command.go index d2417657..3e8ab36e 100644 --- a/internal/support/cli/agent_command.go +++ b/internal/support/cli/agent_command.go @@ -12,6 +12,10 @@ import ( "github.com/amir20/dozzle/internal/agent" "github.com/amir20/dozzle/internal/docker" + "github.com/amir20/dozzle/internal/notification" + container_support "github.com/amir20/dozzle/internal/support/container" + docker_support "github.com/amir20/dozzle/internal/support/docker" + "github.com/amir20/dozzle/types" "github.com/rs/zerolog/log" ) @@ -19,6 +23,37 @@ type AgentCmd struct { Addr string `arg:"--agent-addr,env:DOZZLE_AGENT_ADDR" default:":7007" help:"sets the host:port to bind for the agent"` } +// persistingNotificationHandler wraps a notification manager and saves config to disk after updates +type persistingNotificationHandler struct { + manager *notification.Manager + configPath string +} + +func (h *persistingNotificationHandler) HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error { + // Update the manager + if err := h.manager.HandleNotificationConfig(subscriptions, dispatchers); err != nil { + return err + } + + // Save to disk + if err := os.MkdirAll("./data", 0755); err != nil { + return fmt.Errorf("failed to create data directory: %w", err) + } + + file, err := os.Create(h.configPath) + if err != nil { + return fmt.Errorf("failed to create config file: %w", err) + } + defer file.Close() + + if err := h.manager.WriteConfig(file); err != nil { + return fmt.Errorf("failed to save config: %w", err) + } + + log.Debug().Str("path", h.configPath).Msg("Saved notification config to disk") + return nil +} + func (a *AgentCmd) Run(args Args, embeddedCerts embed.FS) error { if args.Mode != "server" { return fmt.Errorf("agent command is only available in server mode") @@ -43,12 +78,43 @@ func (a *AgentCmd) Run(args Args, embeddedCerts embed.FS) error { io.WriteString(tempFile, listener.Addr().String()) log.Debug().Str("file", tempFile.Name()).Msg("Created temp file") go StartEvent(args, "", client, "agent") - server, err := agent.NewServer(client, certs, args.Version(), args.Filter) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + // Create shared client service (single ContainerStore for both agent server and notifications) + clientService := docker_support.NewDockerClientService(client, args.Filter) + + // Create notification manager using the shared client service + const notificationConfigPath = "./data/notifications.yml" + clients := []container_support.ClientService{clientService} + notificationManager := notification.NewManager(notification.NewContainerLogListener(ctx, clients)) + + // Load existing notification config if available + if file, err := os.Open(notificationConfigPath); err == nil { + if err := notificationManager.LoadConfig(file); err != nil { + log.Warn().Err(err).Msg("Failed to load notification config, starting fresh") + } else { + log.Info().Str("path", notificationConfigPath).Msg("Loaded notification config from disk") + } + file.Close() + } + + if err := notificationManager.Start(); err != nil { + return fmt.Errorf("failed to start notification manager: %w", err) + } + + // Create handler that wraps manager and persists config to disk + notificationHandler := &persistingNotificationHandler{ + manager: notificationManager, + configPath: notificationConfigPath, + } + + // Create agent server using the same shared client service + server, err := agent.NewServer(clientService, certs, args.Version(), notificationHandler) if err != nil { return fmt.Errorf("failed to create agent server: %w", err) } - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer stop() go func() { log.Info().Msgf("Dozzle agent version %s", args.Version()) log.Info().Msgf("Agent listening on %s", listener.Addr().String()) diff --git a/internal/support/container/agent_service.go b/internal/support/container/agent_service.go index 4ac42fa5..718ff002 100644 --- a/internal/support/container/agent_service.go +++ b/internal/support/container/agent_service.go @@ -2,21 +2,20 @@ package container_support import ( "context" - "encoding/json" "io" - "sync" + "sync/atomic" "time" "github.com/amir20/dozzle/internal/agent" "github.com/amir20/dozzle/internal/container" - "github.com/docker/docker/pkg/stdcopy" + "github.com/amir20/dozzle/types" "github.com/rs/zerolog/log" ) type agentService struct { client *agent.Client - host container.Host + host atomic.Pointer[container.Host] } func NewAgentService(client *agent.Client) ClientService { @@ -49,13 +48,16 @@ func (a *agentService) ListContainers(ctx context.Context, labels container.Cont func (a *agentService) Host(ctx context.Context) (container.Host, error) { host, err := a.client.Host(ctx) if err != nil { - host := a.host - host.Available = false - return host, err + if cached := a.host.Load(); cached != nil { + h := *cached + h.Available = false + return h, err + } + return container.Host{Available: false}, err } - a.host = host - return a.host, err + a.host.Store(&host) + return host, nil } func (a *agentService) SubscribeStats(ctx context.Context, stats chan<- container.ContainerStat) { @@ -74,57 +76,14 @@ func (a *agentService) ContainerAction(ctx context.Context, container container. return a.client.ContainerAction(ctx, container.ID, action) } -func (a *agentService) Attach(ctx context.Context, container container.Container, stdin io.Reader, stdout io.Writer) error { +func (a *agentService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error { panic("not implemented") } -func (a *agentService) Exec(ctx context.Context, c container.Container, cmd []string, stdin io.Reader, stdout io.Writer) error { - cancelCtx, cancel := context.WithCancel(ctx) - session, err := a.client.ContainerExec(cancelCtx, c.ID, cmd) - - if err != nil { - cancel() - return err - } - - var wg sync.WaitGroup - - wg.Go(func() { - decoder := json.NewDecoder(stdin) - loop: - for { - var event container.ExecEvent - if err := decoder.Decode(&event); err != nil { - if err != io.EOF { - log.Error().Err(err).Msg("error decoding event from ws using agent") - } - break - } - - switch event.Type { - case "userinput": - if _, err := session.Writer.Write([]byte(event.Data)); err != nil { - log.Error().Err(err).Msg("error writing to container using agent") - break loop - } - case "resize": - if err := session.Resize(event.Width, event.Height); err != nil { - log.Error().Err(err).Msg("error resizing terminal using agent") - } - } - } - cancel() - session.Writer.Close() - }) - - wg.Go(func() { - if _, err := stdcopy.StdCopy(stdout, stdout, session.Reader); err != nil { - log.Error().Err(err).Msg("error while writing to ws using agent") - } - cancel() - }) - - wg.Wait() - - return nil +func (a *agentService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error { + return a.client.Exec(ctx, c.ID, cmd, events, stdout) +} + +func (a *agentService) UpdateNotificationConfig(ctx context.Context, subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error { + return a.client.UpdateNotificationConfig(ctx, subscriptions, dispatchers) } diff --git a/internal/support/container/client_service.go b/internal/support/container/client_service.go index 77e244b5..9b586abe 100644 --- a/internal/support/container/client_service.go +++ b/internal/support/container/client_service.go @@ -27,6 +27,6 @@ type ClientService interface { StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- *container.LogEvent) error // Terminal - Attach(context.Context, container.Container, io.Reader, io.Writer) error - Exec(context.Context, container.Container, []string, io.Reader, io.Writer) error + Attach(context.Context, container.Container, container.ExecEventReader, io.Writer) error + Exec(context.Context, container.Container, []string, container.ExecEventReader, io.Writer) error } diff --git a/internal/support/container/container_service.go b/internal/support/container/container_service.go index 07a97760..7e2296ff 100644 --- a/internal/support/container/container_service.go +++ b/internal/support/container/container_service.go @@ -36,10 +36,10 @@ func (c *ContainerService) Action(ctx context.Context, action container.Containe return c.clientService.ContainerAction(ctx, c.Container, action) } -func (c *ContainerService) Attach(ctx context.Context, stdin io.Reader, stdout io.Writer) error { - return c.clientService.Attach(ctx, c.Container, stdin, stdout) +func (c *ContainerService) Attach(ctx context.Context, events container.ExecEventReader, stdout io.Writer) error { + return c.clientService.Attach(ctx, c.Container, events, stdout) } -func (c *ContainerService) Exec(ctx context.Context, cmd []string, stdin io.Reader, stdout io.Writer) error { - return c.clientService.Exec(ctx, c.Container, cmd, stdin, stdout) +func (c *ContainerService) Exec(ctx context.Context, cmd []string, events container.ExecEventReader, stdout io.Writer) error { + return c.clientService.Exec(ctx, c.Container, cmd, events, stdout) } diff --git a/internal/support/docker/docker_service.go b/internal/support/docker/docker_service.go index c5df78a0..50492083 100644 --- a/internal/support/docker/docker_service.go +++ b/internal/support/docker/docker_service.go @@ -2,7 +2,6 @@ package docker_support import ( "context" - "encoding/json" "io" "sync" "time" @@ -112,7 +111,7 @@ func (d *DockerClientService) SubscribeContainersStarted(ctx context.Context, co d.store.SubscribeNewContainers(ctx, containers) } -func (d *DockerClientService) Attach(ctx context.Context, c container.Container, stdin io.Reader, stdout io.Writer) error { +func (d *DockerClientService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error { cancelCtx, cancel := context.WithCancel(ctx) session, err := d.client.ContainerAttach(cancelCtx, c.ID) if err != nil { @@ -123,13 +122,12 @@ func (d *DockerClientService) Attach(ctx context.Context, c container.Container, var wg sync.WaitGroup wg.Go(func() { - decoder := json.NewDecoder(stdin) loop: for { - var event container.ExecEvent - if err := decoder.Decode(&event); err != nil { + event, err := events.ReadEvent() + if err != nil { if err != io.EOF { - log.Error().Err(err).Msg("error while decoding event from ws") + log.Error().Err(err).Msg("error while reading event") } break } @@ -170,7 +168,7 @@ func (d *DockerClientService) Attach(ctx context.Context, c container.Container, return nil } -func (d *DockerClientService) Exec(ctx context.Context, c container.Container, cmd []string, stdin io.Reader, stdout io.Writer) error { +func (d *DockerClientService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error { cancelCtx, cancel := context.WithCancel(ctx) session, err := d.client.ContainerExec(cancelCtx, c.ID, cmd) if err != nil { @@ -181,13 +179,12 @@ func (d *DockerClientService) Exec(ctx context.Context, c container.Container, c var wg sync.WaitGroup wg.Go(func() { - decoder := json.NewDecoder(stdin) loop: for { - var event container.ExecEvent - if err := decoder.Decode(&event); err != nil { + event, err := events.ReadEvent() + if err != nil { if err != io.EOF { - log.Error().Err(err).Msg("error while decoding event from ws") + log.Error().Err(err).Msg("error while reading event") } break } diff --git a/internal/support/docker/multi_host_service.go b/internal/support/docker/multi_host_service.go index 6b777ae6..9b2fa923 100644 --- a/internal/support/docker/multi_host_service.go +++ b/internal/support/docker/multi_host_service.go @@ -4,12 +4,14 @@ import ( "context" "fmt" "os" + "sync" "time" "github.com/amir20/dozzle/internal/container" "github.com/amir20/dozzle/internal/notification" "github.com/amir20/dozzle/internal/notification/dispatcher" container_support "github.com/amir20/dozzle/internal/support/container" + "github.com/amir20/dozzle/types" "github.com/rs/zerolog/log" lop "github.com/samber/lo/parallel" ) @@ -223,28 +225,76 @@ func (m *MultiHostService) saveNotificationConfig() { if err := m.notificationManager.WriteConfig(file); err != nil { log.Error().Err(err).Msg("Could not write notification config") } + + // Broadcast to all agents + m.broadcastNotificationConfig() +} + +// NotificationConfigUpdater is an interface for clients that support notification config updates +type NotificationConfigUpdater interface { + UpdateNotificationConfig(ctx context.Context, subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error +} + +// broadcastNotificationConfig sends current notification config to all agent clients +func (m *MultiHostService) broadcastNotificationConfig() { + notifSubs := m.notificationManager.Subscriptions() + notifDispatchers := m.notificationManager.Dispatchers() + + // Convert notification.Subscription to types.SubscriptionConfig + subscriptions := make([]types.SubscriptionConfig, len(notifSubs)) + for i, sub := range notifSubs { + subscriptions[i] = types.SubscriptionConfig{ + ID: sub.ID, + Name: sub.Name, + Enabled: sub.Enabled, + DispatcherID: sub.DispatcherID, + LogExpression: sub.LogExpression, + ContainerExpression: sub.ContainerExpression, + } + } + + // Convert notification.DispatcherConfig to types.DispatcherConfig + dispatchers := make([]types.DispatcherConfig, len(notifDispatchers)) + for i, d := range notifDispatchers { + dispatchers[i] = types.DispatcherConfig{ + ID: d.ID, + Name: d.Name, + Type: d.Type, + URL: d.URL, + Template: d.Template, + } + } + + var wg sync.WaitGroup + for _, client := range m.manager.List() { + // Check if client supports notification config updates (agents do, local docker clients don't) + if updater, ok := client.(NotificationConfigUpdater); ok { + wg.Go(func() { + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + if err := updater.UpdateNotificationConfig(ctx, subscriptions, dispatchers); err != nil { + log.Error().Err(err).Msg("Failed to broadcast notification config to agent") + } else { + log.Debug().Int("subscriptions", len(subscriptions)).Int("dispatchers", len(dispatchers)).Msg("Broadcasted notification config to agent") + } + }) + } + } + wg.Wait() } // AddSubscription adds a subscription to local manager and broadcasts to agents func (m *MultiHostService) AddSubscription(sub *notification.Subscription) error { - // Add to local manager if err := m.notificationManager.AddSubscription(sub); err != nil { return err } - - // TODO: Broadcast to agents via gRPC when agent notification support is added - m.saveNotificationConfig() return nil } // RemoveSubscription removes a subscription from local manager and broadcasts to agents func (m *MultiHostService) RemoveSubscription(id int) { - // Remove from local manager m.notificationManager.RemoveSubscription(id) - - // TODO: Broadcast to agents via gRPC when agent notification support is added - m.saveNotificationConfig() } diff --git a/internal/support/k8s/k8s_service.go b/internal/support/k8s/k8s_service.go index b2885d12..403c00b8 100644 --- a/internal/support/k8s/k8s_service.go +++ b/internal/support/k8s/k8s_service.go @@ -2,7 +2,6 @@ package k8s_support import ( "context" - "encoding/json" "io" "sync" @@ -93,7 +92,7 @@ func (k *K8sClientService) SubscribeContainersStarted(ctx context.Context, conta k.store.SubscribeNewContainers(ctx, containers) } -func (k *K8sClientService) Attach(ctx context.Context, c container.Container, stdin io.Reader, stdout io.Writer) error { +func (k *K8sClientService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error { cancelCtx, cancel := context.WithCancel(ctx) session, err := k.client.ContainerAttach(cancelCtx, c.ID) if err != nil { @@ -107,13 +106,12 @@ func (k *K8sClientService) Attach(ctx context.Context, c container.Container, st defer session.Writer.Close() defer cancel() - decoder := json.NewDecoder(stdin) loop: for { - var event container.ExecEvent - if err := decoder.Decode(&event); err != nil { + event, err := events.ReadEvent() + if err != nil { if err != io.EOF { - log.Error().Err(err).Msg("error decoding event") + log.Error().Err(err).Msg("error reading event") } break } @@ -143,7 +141,7 @@ func (k *K8sClientService) Attach(ctx context.Context, c container.Container, st return nil } -func (k *K8sClientService) Exec(ctx context.Context, c container.Container, cmd []string, stdin io.Reader, stdout io.Writer) error { +func (k *K8sClientService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error { cancelCtx, cancel := context.WithCancel(ctx) session, err := k.client.ContainerExec(cancelCtx, c.ID, cmd) if err != nil { @@ -157,13 +155,12 @@ func (k *K8sClientService) Exec(ctx context.Context, c container.Container, cmd defer session.Writer.Close() defer cancel() - decoder := json.NewDecoder(stdin) loop: for { - var event container.ExecEvent - if err := decoder.Decode(&event); err != nil { + event, err := events.ReadEvent() + if err != nil { if err != io.EOF { - log.Error().Err(err).Msg("error decoding event") + log.Error().Err(err).Msg("error reading event") } break } diff --git a/internal/web/terminal.go b/internal/web/terminal.go index 33edbc04..b69a6b24 100644 --- a/internal/web/terminal.go +++ b/internal/web/terminal.go @@ -1,9 +1,12 @@ package web import ( + "encoding/json" + "io" "net/http" "github.com/amir20/dozzle/internal/auth" + "github.com/amir20/dozzle/internal/container" "github.com/go-chi/chi/v5" "github.com/gorilla/websocket" "github.com/rs/zerolog/log" @@ -48,9 +51,9 @@ func (h *handler) attach(w http.ResponseWriter, r *http.Request) { return } - wsReader := &webSocketReader{conn: conn} + eventReader := &jsonEventReader{conn: conn} wsWriter := &webSocketWriter{conn: conn} - if err = containerService.Attach(r.Context(), wsReader, wsWriter); err != nil { + if err = containerService.Attach(r.Context(), eventReader, wsWriter); err != nil { log.Error().Err(err).Msg("error while trying to attach to container") conn.WriteMessage(websocket.TextMessage, []byte("🚨 Error while trying to attach to container\r\n")) return @@ -88,9 +91,9 @@ func (h *handler) exec(w http.ResponseWriter, r *http.Request) { return } - wsReader := &webSocketReader{conn: conn} + eventReader := &jsonEventReader{conn: conn} wsWriter := &webSocketWriter{conn: conn} - if err = containerService.Exec(r.Context(), []string{"sh", "-c", "command -v bash >/dev/null 2>&1 && exec bash || exec sh"}, wsReader, wsWriter); err != nil { + if err = containerService.Exec(r.Context(), []string{"sh", "-c", "command -v bash >/dev/null 2>&1 && exec bash || exec sh"}, eventReader, wsWriter); err != nil { log.Error().Err(err).Msg("error while trying to attach to container") conn.WriteMessage(websocket.TextMessage, []byte("🚨 Error while trying to attach to container\r\n")) return @@ -106,30 +109,21 @@ func (w *webSocketWriter) Write(p []byte) (int, error) { return len(p), err } -type webSocketReader struct { - conn *websocket.Conn - buffer []byte +// jsonEventReader reads JSON-encoded ExecEvents from a websocket connection +type jsonEventReader struct { + conn *websocket.Conn } -func (r *webSocketReader) Read(p []byte) (n int, err error) { - if len(r.buffer) > 0 { - n = copy(p, r.buffer) - r.buffer = r.buffer[n:] - return n, nil - } - - // Otherwise, read a new message +func (r *jsonEventReader) ReadEvent() (*container.ExecEvent, error) { _, message, err := r.conn.ReadMessage() if err != nil { - return 0, err + return nil, io.EOF } - n = copy(p, message) - - // If we couldn't copy the entire message, store the rest in our buffer - if n < len(message) { - r.buffer = message[n:] + var event container.ExecEvent + if err := json.Unmarshal(message, &event); err != nil { + return nil, err } - return n, nil + return &event, nil } diff --git a/main.go b/main.go index dd9b37b3..07cac66c 100644 --- a/main.go +++ b/main.go @@ -89,7 +89,10 @@ func main() { if err != nil { log.Fatal().Err(err).Msg("failed to listen") } - server, err := agent.NewServer(localClient, certs, args.Version(), args.Filter) + // Create client service for agent server in swarm mode + clientService := docker_support.NewDockerClientService(localClient, args.Filter) + // TODO add notification for swarm mode + server, err := agent.NewServer(clientService, certs, args.Version(), nil) if err != nil { log.Fatal().Err(err).Msg("failed to create agent") } diff --git a/protos/rpc.proto b/protos/rpc.proto index 741b4542..369f9e2e 100644 --- a/protos/rpc.proto +++ b/protos/rpc.proto @@ -1,44 +1,48 @@ syntax = "proto3"; -option go_package = "internal/agent/pb"; package protobuf; -import "types.proto"; import "google/protobuf/timestamp.proto"; +import "types.proto"; + +option go_package = "internal/agent/pb"; service AgentService { rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {} rpc FindContainer(FindContainerRequest) returns (FindContainerResponse) {} rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {} - rpc LogsBetweenDates(LogsBetweenDatesRequest) - returns (stream StreamLogsResponse) {} - rpc StreamRawBytes(StreamRawBytesRequest) - returns (stream StreamRawBytesResponse) {} + rpc LogsBetweenDates(LogsBetweenDatesRequest) returns (stream StreamLogsResponse) {} + rpc StreamRawBytes(StreamRawBytesRequest) returns (stream StreamRawBytesResponse) {} rpc StreamEvents(StreamEventsRequest) returns (stream StreamEventsResponse) {} rpc StreamStats(StreamStatsRequest) returns (stream StreamStatsResponse) {} - rpc StreamContainerStarted(StreamContainerStartedRequest) - returns (stream StreamContainerStartedResponse) {} + rpc StreamContainerStarted(StreamContainerStartedRequest) returns (stream StreamContainerStartedResponse) {} rpc HostInfo(HostInfoRequest) returns (HostInfoResponse) {} - rpc ContainerAction(ContainerActionRequest) - returns (ContainerActionResponse) {} - rpc ContainerExec(stream ContainerExecRequest) - returns (stream ContainerExecResponse) {} - rpc ContainerAttach(stream ContainerAttachRequest) - returns (stream ContainerAttachResponse) {} + rpc ContainerAction(ContainerActionRequest) returns (ContainerActionResponse) {} + rpc ContainerExec(stream ContainerExecRequest) returns (stream ContainerExecResponse) {} + rpc ContainerAttach(stream ContainerAttachRequest) returns (stream ContainerAttachResponse) {} + rpc UpdateNotificationConfig(UpdateNotificationConfigRequest) returns (UpdateNotificationConfigResponse) {} } -message ListContainersRequest { map filter = 1; } +message ListContainersRequest { + map filter = 1; +} -message RepeatedString { repeated string values = 1; } +message RepeatedString { + repeated string values = 1; +} -message ListContainersResponse { repeated Container containers = 1; } +message ListContainersResponse { + repeated Container containers = 1; +} message FindContainerRequest { string containerId = 1; map filter = 2; } -message FindContainerResponse { Container container = 1; } +message FindContainerResponse { + Container container = 1; +} message StreamLogsRequest { string containerId = 1; @@ -46,7 +50,9 @@ message StreamLogsRequest { int32 streamTypes = 3; } -message StreamLogsResponse { LogEvent event = 1; } +message StreamLogsResponse { + LogEvent event = 1; +} message LogsBetweenDatesRequest { string containerId = 1; @@ -61,20 +67,30 @@ message StreamRawBytesRequest { google.protobuf.Timestamp until = 3; int32 streamTypes = 4; } -message StreamRawBytesResponse { bytes data = 1; } +message StreamRawBytesResponse { + bytes data = 1; +} message StreamEventsRequest {} -message StreamEventsResponse { ContainerEvent event = 1; } +message StreamEventsResponse { + ContainerEvent event = 1; +} message StreamStatsRequest {} -message StreamStatsResponse { ContainerStat stat = 1; } +message StreamStatsResponse { + ContainerStat stat = 1; +} message HostInfoRequest {} -message HostInfoResponse { Host host = 1; } +message HostInfoResponse { + Host host = 1; +} message StreamContainerStartedRequest {} -message StreamContainerStartedResponse { Container container = 1; } +message StreamContainerStartedResponse { + Container container = 1; +} message ContainerActionRequest { string containerId = 1; @@ -97,7 +113,9 @@ message ResizePayload { uint32 height = 2; } -message ContainerExecResponse { bytes stdout = 1; } +message ContainerExecResponse { + bytes stdout = 1; +} message ContainerAttachRequest { string containerId = 1; @@ -107,4 +125,13 @@ message ContainerAttachRequest { } } -message ContainerAttachResponse { bytes stdout = 1; } +message ContainerAttachResponse { + bytes stdout = 1; +} + +message UpdateNotificationConfigRequest { + repeated NotificationSubscription subscriptions = 1; + repeated NotificationDispatcher dispatchers = 2; +} + +message UpdateNotificationConfigResponse {} diff --git a/protos/types.proto b/protos/types.proto index 744ec8ab..5f4708a8 100644 --- a/protos/types.proto +++ b/protos/types.proto @@ -92,3 +92,20 @@ enum ContainerAction { Stop = 1; Restart = 2; } + +message NotificationSubscription { + int32 id = 1; + string name = 2; + bool enabled = 3; + int32 dispatcherId = 4; + string logExpression = 5; + string containerExpression = 6; +} + +message NotificationDispatcher { + int32 id = 1; + string name = 2; + string type = 3; + string url = 4; + string template = 5; +} diff --git a/types/notification.go b/types/notification.go index 18725bb1..282b9da5 100644 --- a/types/notification.go +++ b/types/notification.go @@ -30,3 +30,22 @@ type NotificationLog struct { Stream string `json:"stream" expr:"stream"` Type string `json:"type" expr:"type"` } + +// SubscriptionConfig represents a notification subscription configuration +type SubscriptionConfig struct { + ID int + Name string + Enabled bool + DispatcherID int + LogExpression string + ContainerExpression string +} + +// DispatcherConfig represents a notification dispatcher configuration +type DispatcherConfig struct { + ID int + Name string + Type string + URL string + Template string +}