feat: supports notifications for agents (#4372)

This commit is contained in:
Amir Raminfar
2026-01-22 16:54:53 -08:00
committed by GitHub
parent a7405ac2d6
commit 0ff0d486b1
25 changed files with 1382 additions and 533 deletions
+195 -11
View File
@@ -31,9 +31,6 @@ The application supports multiple deployment modes: standalone server, Docker Sw
# Install dependencies
pnpm install
# Install Go tools (protobuf, air hot-reloader)
make tools
# Generate certificates and protobuf files
make generate
```
@@ -130,8 +127,18 @@ The Go backend is organized into these key packages:
- Role-based authorization (`roles.go`)
- **`internal/container/`** - Container domain models and interfaces
- `event_generator.go`: Log parsing and grouping logic (multi-line, JSON detection)
- **`main.go`** - Application entry point with mode switching (server/swarm/k8s)
- **`internal/notification/`** - Alert and notification system
- `manager.go`: Notification rule evaluation and dispatching
- `log_listener.go`: Log pattern matching for alerts
- `dispatcher/`: Notification channel implementations (email, webhook, etc.)
- **`graph/`** - GraphQL API layer
- `schema.graphqls`: GraphQL schema definitions
- `*.resolvers.go`: GraphQL resolver implementations
- **`main.go`** - Application entry point with mode switching (server/swarm/k8s/agent)
### Frontend (Vue 3)
@@ -161,17 +168,23 @@ The frontend uses file-based routing with these conventions:
- `ContainerTable.vue`: Container table with historical stat visualization
- **`assets/stores/`** - Pinia stores (auto-imported)
- `config.ts`: App configuration and feature flags
- `container.ts`: Container state management
- `config.ts`: App configuration and feature flags (injected from backend HTML, frozen immutable)
- `container.ts`: Container state management with EventSource streaming (`/api/events/stream`)
- `hosts.ts`: Multi-host state
- `settings.ts`: User preferences
- `settings.ts`: User preferences (localStorage-backed via profileStorage)
- `pinned.ts`: Pinned container logs for side-by-side viewing
- `swarm.ts`, `k8s.ts`: Deployment mode-specific state
- `announcements.ts`: Feature announcements
- **`assets/composable/`** - Vue composables (auto-imported)
- `eventStreams.ts`: SSE connection management
- `eventStreams.ts`: SSE connection management with buffer-based flushing (250ms debounce)
- `historicalLogs.ts`: Historical log fetching
- `logContext.ts`: Log filtering and search context
- `storage.ts`: LocalStorage abstractions
- `logContext.ts`: Log filtering and search context (provide/inject pattern)
- `scrollContext.ts`: Scroll state management (paused, progress, currentDate)
- `storage.ts`: LocalStorage abstractions with reactivity
- `visible.ts`: Log filtering by visible keys for complex logs
- `containerActions.ts`: Container control operations
- `duckdb.ts`: DuckDB WASM for SQL queries on logs
- **`assets/modules/`** - Vue plugins
- `router.ts`: Vue Router configuration
@@ -185,6 +198,7 @@ The frontend uses file-based routing with these conventions:
3. **Stats**: Real-time CPU/memory stats streamed via SSE alongside events
4. **Actions**: POST to `/api/hosts/{host}/containers/{id}/actions/{action}` (start/stop/restart)
5. **Terminal**: WebSocket connections for container attach/exec at `/api/hosts/{host}/containers/{id}/attach`
6. **GraphQL**: POST to `/api/graphql` for queries and mutations (container metadata, historical logs, notifications)
### Build System
@@ -206,6 +220,8 @@ The frontend uses file-based routing with these conventions:
- `ComplexLogEntry`: Structured JSON logs (`JSONObject`)
- `GroupedLogEntry`: Multi-line grouped logs (`string[]`)
- **Type consistency**: Use `LogMessage` type alias instead of `string | string[] | JSONObject` for log entry messages
- **Log Entry Factory Pattern**: Use `LogEntry.create(logEvent)` to instantiate the correct entry type based on `logEvent.t` field
- **EventSource Buffering**: Log streams use buffer-based flushing (250ms debounce, 1000ms max) to batch UI updates
- **Charts/Visualizations**: Custom lightweight implementations (no D3.js)
- `BarChart.vue`: Self-contained bar chart with responsive downsampling
- Downsampling algorithm: Averages data into buckets based on available screen width
@@ -217,6 +233,13 @@ The frontend uses file-based routing with these conventions:
- Certificate generation is required (`make generate` creates shared_key.pem and shared_cert.pem)
- Protocol buffer generation happens via `go generate` directive in `main.go`
- Docker client uses API version negotiation for compatibility
- **GraphQL API**: Uses gqlgen with schema in `graph/schema.graphqls`, generated code in `graph/generated.go`
- Run `pnpm codegen` to regenerate GraphQL types
- Resolvers follow-schema layout in `graph/*.resolvers.go`
- **Service Layer Architecture**:
- `ClientService` interface abstracts Docker/K8s/Agent backends
- `MultiHostService` orchestrates multi-host operations
- `ClientManager` implementations: `RetriableClientManager` (server mode), `SwarmClientManager` (swarm mode)
### Authentication
@@ -247,7 +270,168 @@ The frontend uses file-based routing with these conventions:
### Deployment Modes
- **Server mode**: Single or multi-host Docker monitoring
- **Server mode** (default): Single or multi-host Docker monitoring
- Uses `RetriableClientManager` with local + remote agent clients
- **Swarm mode**: Automatic discovery of Swarm nodes via Docker API
- Creates gRPC agent server on each node (port 7007)
- Uses `SwarmClientManager` for node discovery
- **K8s mode**: Pod log monitoring in Kubernetes cluster
- Implements `container.Client` interface via Kubernetes API
- **Agent mode**: Lightweight gRPC agent for remote log collection
- Run with `dozzle agent` or `pnpm run agent:dev`
- Listens on port 7007 with TLS certificate authentication
## Key Architectural Patterns
### Backend Abstraction Layers
The backend follows a clean layered architecture:
```
HTTP Handlers (internal/web)
HostService Interface (MultiHostService)
ClientService Interface (per host)
container.Client Interface
Implementation (DockerClient, K8sClient, AgentClient)
```
**When adding new container operations:**
1. Define method in `container.Client` interface (`internal/container/client.go`)
2. Implement in `internal/docker/client.go` (and `internal/k8s/client.go` if applicable)
3. Add wrapper method in `ClientService` interface (`internal/support/container/service.go`)
4. Add HTTP handler in `internal/web/` with appropriate route
### Frontend Data Flow
**Real-time Log Viewing:**
1. User navigates to `/container/{id}` route
2. Page component calls `useContainerStream(container)` composable
3. Composable creates EventSource connection to `/api/hosts/{host}/containers/{id}/logs/stream`
4. Backend streams `LogEvent` objects via SSE
5. Frontend buffers events (250ms debounce, max 1000ms)
6. Batched buffer flushes update reactive `messages` array
7. `LogViewer.vue` renders using appropriate component (`SimpleLogItem`, `ComplexLogItem`, `GroupedLogItem`)
8. When messages exceed `maxLogs` (400), oldest entries replaced or marked as `SkippedLogsEntry`
**Stats Streaming:**
1. `container.ts` store connects to `/api/events/stream` on app init
2. Backend multiplexes container events and stats into single SSE stream
3. `container-stat` events update `Container._stat` and append to `_statsHistory`
4. EMA calculation provides smoothed `movingAverageStat` (alpha=0.2)
5. `ContainerTable.vue` displays mini bar charts using `statsHistory` with downsampling
### Protocol Buffer Flow (Agent Mode)
1. Main server creates `agent.NewClient(endpoint, certs)` for each remote host
2. AgentClient implements `container.Client` interface
3. Method calls translate to gRPC requests defined in `protos/rpc.proto`
4. Remote agent receives gRPC call, delegates to local `DockerClient`
5. Streaming RPCs (logs, stats, events) use bidirectional channels
6. Responses converted back to domain models via `FromProto()` methods
### Log Parsing Pipeline
1. Docker API returns multiplexed stream (8-byte headers + payload)
2. `log_reader.go` parses headers, extracts stdout/stderr type
3. `event_generator.go` receives raw log lines
4. Detection logic identifies:
- JSON structure → `ComplexLogEntry`
- Multi-line patterns (stack traces) → `GroupedLogEntry`
- Single lines → `SimpleLogEntry`
5. Log level extraction via regex patterns
6. `LogEvent` serialized to JSON and sent via SSE
7. Frontend deserializes and renders with appropriate component
## Adding New Features
### Adding a New HTTP Route
1. Define route in `internal/web/routes.go` using chi router:
```go
r.Get("/api/custom-endpoint", h.customHandler)
```
2. Implement handler method in appropriate file (e.g., `actions.go`, `logs.go`)
3. Use `hostService` to find container/host via `FindContainer()` or `FindHost()`
4. Return JSON response or establish SSE/WebSocket stream
### Adding a New Log View Type
1. Create route file in `assets/pages/` (e.g., `custom/[id].vue`)
2. Create composable in `assets/composable/eventStreams.ts` (e.g., `useCustomStream()`)
3. Composable should:
- Build API URL with appropriate filters
- Create EventSource connection
- Handle buffering and message batching
- Return reactive `messages` array and control methods
4. Use `LogViewer.vue` component to render messages
5. Add backend API endpoint if needed (see above)
### Adding a New GraphQL Query/Mutation
1. Define in `graph/schema.graphqls`
2. Run `pnpm codegen` to regenerate types
3. Implement resolver in `graph/schema.resolvers.go`
4. Use `hostService` from resolver context to access backend services
5. Frontend calls via urql client (auto-imported via `@urql/vue`)
### Adding Container Stats/Metrics
1. Add field to `Stat` type in `internal/container/types.go`
2. Update `stats_collector.go` to extract metric from Docker API response
3. Add calculation logic in `docker/calculation.go` if needed
4. Ensure protobuf definition includes field in `protos/rpc.proto`
5. Frontend automatically receives updates via existing SSE stream
6. Update `Container` model in `assets/models/Container.ts` if UI needs access
### Working with Notifications/Alerts
**Backend** (`internal/notification/`):
- `manager.go`: Rule evaluation engine, manages alert state
- `log_listener.go`: Subscribes to container log streams, evaluates rules against incoming logs
- `types.go`: Alert rule definitions (log pattern matching, thresholds)
- `dispatcher/`: Notification channel implementations
**Frontend** (`assets/pages/notifications.vue`, `assets/components/Notification/`):
- `AlertForm.vue`, `DestinationForm.vue`: UI for creating rules
- Rules stored via GraphQL mutations
- Alert state displayed in notification cards
**Adding a new notification channel:**
1. Implement dispatcher interface in `internal/notification/dispatcher/`
2. Register in `manager.go` dispatcher factory
3. Add UI form in `assets/components/Notification/DestinationForm.vue`
4. Add GraphQL schema fields if needed
## Common Development Patterns
### Testing
- Always run Go tests with race detector: `go test -race`
- Frontend tests require `TZ=UTC` for timestamp consistency
- Integration tests use Playwright with `make int` (runs docker-compose setup)
- Use `testify/assert` for Go test assertions
### Hot Reload Development
- `make dev` runs both backend (air) and frontend (vite) with hot reload
- `DEV=true` disables embedded asset serving
- `LIVE_FS=true` serves assets from filesystem instead of embedded
- Backend changes trigger air restart automatically
- Frontend changes trigger vite HMR
### Debugging
- Backend logs: Set `--level debug` flag or `DOZZLE_LEVEL=debug` env var
- Frontend: Vue DevTools browser extension
- GraphQL: Use GraphQL Playground at `/api/graphql` (when enabled)
- SSE streams: Browser DevTools Network tab shows EventSource connections
+13 -2
View File
@@ -27,7 +27,7 @@ build: dist generate
.PHONY: docker
docker: shared_key.pem shared_cert.pem
@docker build --build-arg TAG=local -t amir20/dozzle .
@docker build --build-arg TAG=local -t amir20/dozzle:local .
.PHONY: generate
generate: shared_key.pem shared_cert.pem
@@ -55,7 +55,18 @@ push: docker
@docker push amir20/dozzle:local-test
run: docker
docker run -it --rm -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock amir20/dozzle:latest
docker run -it --rm -p 8080:8080 -v /var/run/docker.sock:/var/run/docker.sock amir20/dozzle:local
preview: build
pnpm preview
.PHONY: agent-reload
agent-reload: docker
@VM_NAME=$${VM_NAME:-dozzle-agent}; \
echo "📦 Loading image into VM $$VM_NAME..."; \
docker save amir20/dozzle:local | orb exec -m $$VM_NAME docker load; \
echo "🔄 Recreating agent..."; \
orb exec -m $$VM_NAME docker stop dozzle-agent || true; \
orb exec -m $$VM_NAME docker rm dozzle-agent || true; \
orb exec -m $$VM_NAME docker run -d --name dozzle-agent -p 7007:7007 -v /var/run/docker.sock:/var/run/docker.sock -v ~/dozzle-certs:/certs -v ~/dozzle-data:/data -e DOZZLE_LEVEL=debug amir20/dozzle:local agent --cert /certs/shared_cert.pem --key /certs/shared_key.pem; \
echo "✅ Agent reloaded"
+61 -12
View File
@@ -1,14 +1,39 @@
#!/bin/bash
set -e
# Parse arguments
USE_LOCAL=false
POSITIONAL_ARGS=()
while [[ $# -gt 0 ]]; do
case $1 in
--local)
USE_LOCAL=true
shift
;;
*)
POSITIONAL_ARGS+=("$1")
shift
;;
esac
done
# Configuration
VM_NAME="${1:-dozzle-agent}"
DISTRO="${2:-ubuntu}"
AGENT_PORT="${3:-7007}"
VM_NAME="${POSITIONAL_ARGS[0]:-dozzle-agent}"
DISTRO="${POSITIONAL_ARGS[1]:-ubuntu}"
AGENT_PORT="${POSITIONAL_ARGS[2]:-7007}"
SHARED_CERT="./shared_cert.pem"
SHARED_KEY="./shared_key.pem"
DOZZLE_IMAGE="amir20/dozzle:latest"
if [ "$USE_LOCAL" = true ]; then
DOZZLE_IMAGE="amir20/dozzle:local"
fi
echo "🚀 Setting up Dozzle Agent on OrbStack VM: $VM_NAME"
if [ "$USE_LOCAL" = true ]; then
echo " Using locally built image"
fi
# Verify shared certificates exist
if [ ! -f "$SHARED_CERT" ]; then
@@ -40,10 +65,7 @@ sleep 3
# Step 2: Install Docker in the VM
echo "🐳 Installing Docker..."
if ! orb exec -m "$VM_NAME" bash -c '
curl -fsSL https://get.docker.com | sh
sudo usermod -aG docker $(whoami)
'; then
if ! orb exec -m "$VM_NAME" bash -c 'curl -fsSL https://get.docker.com | sh && sudo usermod -aG docker $(whoami)'; then
echo "❌ Docker installation failed"
exit 1
fi
@@ -62,17 +84,44 @@ cat "$SHARED_KEY" | orb exec -m "$VM_NAME" bash -c 'cat > ~/dozzle-certs/shared_
echo "✅ Certificates copied"
# Step 4: Start Dozzle agent
# Step 4: Load or pull Dozzle image
if [ "$USE_LOCAL" = true ]; then
echo "🔨 Building local Docker image..."
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
if ! (cd "$PROJECT_ROOT" && make docker); then
echo "❌ Failed to build Docker image"
exit 1
fi
echo "📦 Loading image into VM..."
if ! docker save amir20/dozzle:local | orb exec -m "$VM_NAME" docker load; then
echo "❌ Failed to load image into VM"
exit 1
fi
echo "✅ Local image loaded"
else
echo "📥 Pulling Dozzle image..."
if ! orb exec -m "$VM_NAME" docker pull amir20/dozzle:latest; then
echo "❌ Failed to pull image"
exit 1
fi
fi
# Step 5: Start Dozzle agent
echo "🎯 Starting Dozzle agent..."
orb exec -m "$VM_NAME" bash -c 'mkdir -p ~/dozzle-data'
if ! orb exec -m "$VM_NAME" bash -c "
set -e
docker pull amir20/dozzle:latest
docker run -d --name dozzle-agent \
--restart unless-stopped \
-v /var/run/docker.sock:/var/run/docker.sock \
-v ~/dozzle-certs:/certs \
-v ~/dozzle-data:/data \
-p $AGENT_PORT:7007 \
amir20/dozzle:latest agent \
-e DOZZLE_LEVEL=debug \
$DOZZLE_IMAGE agent \
--cert /certs/shared_cert.pem \
--key /certs/shared_key.pem
"; then
@@ -82,11 +131,11 @@ fi
echo "✅ Dozzle agent started"
# Step 5: Wait for agent to be ready
# Step 6: Wait for agent to be ready
echo "⏳ Waiting for agent to be ready..."
sleep 3
# Step 6: Verify agent is running
# Step 7: Verify agent is running
echo "🧪 Verifying agent is running..."
if orb exec -m "$VM_NAME" docker ps --filter name=dozzle-agent --format "{{.Status}}" | grep -q "Up"; then
echo "✅ Agent is running"
+89 -51
View File
@@ -8,12 +8,14 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"encoding/json"
"github.com/amir20/dozzle/internal/agent/pb"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/types"
"github.com/rs/zerolog/log"
orderedmap "github.com/wk8/go-ordered-map/v2"
"google.golang.org/grpc"
@@ -365,30 +367,41 @@ func (c *Client) ContainerAttach(ctx context.Context, containerId string) (*cont
defer stdoutWriter.Close()
for {
msg, err := stream.Recv()
if err != nil {
select {
case <-ctx.Done():
return
}
default:
msg, err := stream.Recv()
if err != nil {
return
}
stdoutWriter.Write(msg.Stdout)
stdoutWriter.Write(msg.Stdout)
}
}
}()
go func() {
defer stdinReader.Close()
buffer := make([]byte, 1024)
for {
n, err := stdinReader.Read(buffer)
if err != nil {
select {
case <-ctx.Done():
return
}
default:
n, err := stdinReader.Read(buffer)
if err != nil {
return
}
if err := stream.Send(&pb.ContainerAttachRequest{
Payload: &pb.ContainerAttachRequest_Stdin{
Stdin: buffer[:n],
},
}); err != nil {
return
if err := stream.Send(&pb.ContainerAttachRequest{
Payload: &pb.ContainerAttachRequest_Stdin{
Stdin: buffer[:n],
},
}); err != nil {
return
}
}
}
}()
@@ -412,76 +425,101 @@ func (c *Client) ContainerAttach(ctx context.Context, containerId string) (*cont
}, nil
}
func (c *Client) ContainerExec(ctx context.Context, containerId string, cmd []string) (*container.ExecSession, error) {
func (c *Client) Exec(ctx context.Context, containerId string, cmd []string, events container.ExecEventReader, stdout io.Writer) error {
stream, err := c.client.ContainerExec(ctx)
if err != nil {
return nil, err
return err
}
if err = stream.Send(&pb.ContainerExecRequest{
ContainerId: containerId,
Command: cmd,
}); err != nil {
return nil, err
return err
}
stdoutReader, stdoutWriter := io.Pipe()
stdinReader, stdinWriter := io.Pipe()
go func() {
defer stdoutWriter.Close()
var wg sync.WaitGroup
// Read from gRPC stream and write to stdout
wg.Go(func() {
for {
msg, err := stream.Recv()
if err != nil {
return
}
stdoutWriter.Write(msg.Stdout)
stdout.Write(msg.Stdout)
}
}()
go func() {
buffer := make([]byte, 1024)
})
// Read events and convert to gRPC messages
wg.Go(func() {
for {
n, err := stdinReader.Read(buffer)
event, err := events.ReadEvent()
if err != nil {
return
}
if err := stream.Send(&pb.ContainerExecRequest{
Payload: &pb.ContainerExecRequest_Stdin{
Stdin: buffer[:n],
},
}); err != nil {
return
switch event.Type {
case "userinput":
stream.Send(&pb.ContainerExecRequest{
Payload: &pb.ContainerExecRequest_Stdin{
Stdin: []byte(event.Data),
},
})
case "resize":
stream.Send(&pb.ContainerExecRequest{
Payload: &pb.ContainerExecRequest_Resize{
Resize: &pb.ResizePayload{
Width: uint32(event.Width),
Height: uint32(event.Height),
},
},
})
}
}
}()
})
// Create resize closure that sends via gRPC
resizeFn := func(width uint, height uint) error {
return stream.Send(&pb.ContainerExecRequest{
Payload: &pb.ContainerExecRequest_Resize{
Resize: &pb.ResizePayload{
Width: uint32(width),
Height: uint32(height),
},
},
})
}
return &container.ExecSession{
Writer: stdinWriter,
Reader: stdoutReader,
Resize: resizeFn,
}, nil
wg.Wait()
return nil
}
func (c *Client) Close() error {
return c.conn.Close()
}
func (c *Client) UpdateNotificationConfig(ctx context.Context, subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error {
// Convert to proto
pbSubs := make([]*pb.NotificationSubscription, len(subscriptions))
for i, sub := range subscriptions {
pbSubs[i] = &pb.NotificationSubscription{
Id: int32(sub.ID),
Name: sub.Name,
Enabled: sub.Enabled,
DispatcherId: int32(sub.DispatcherID),
LogExpression: sub.LogExpression,
ContainerExpression: sub.ContainerExpression,
}
}
pbDispatchers := make([]*pb.NotificationDispatcher, len(dispatchers))
for i, d := range dispatchers {
pbDispatchers[i] = &pb.NotificationDispatcher{
Id: int32(d.ID),
Name: d.Name,
Type: d.Type,
Url: d.URL,
Template: d.Template,
}
}
_, err := c.client.UpdateNotificationConfig(ctx, &pb.UpdateNotificationConfigRequest{
Subscriptions: pbSubs,
Dispatchers: pbDispatchers,
})
return err
}
func jsonBytesToOrderedMap(b []byte) *orderedmap.OrderedMap[string, any] {
var data *orderedmap.OrderedMap[string, any]
reader := bytes.NewReader(b)
+54 -44
View File
@@ -13,7 +13,6 @@ import (
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/utils"
"github.com/docker/docker/api/types/system"
"github.com/go-faker/faker/v4"
"github.com/go-faker/faker/v4/pkg/options"
"github.com/stretchr/testify/assert"
@@ -27,58 +26,67 @@ const bufSize = 1024 * 1024
var lis *bufconn.Listener
var certs tls.Certificate
var client *MockedClient
var mockService *MockedClientService
type MockedClient struct {
type MockedClientService struct {
mock.Mock
container.Client
}
func (m *MockedClient) FindContainer(ctx context.Context, id string) (container.Container, error) {
args := m.Called(ctx, id)
func (m *MockedClientService) FindContainer(ctx context.Context, id string, labels container.ContainerLabels) (container.Container, error) {
args := m.Called(ctx, id, labels)
return args.Get(0).(container.Container), args.Error(1)
}
func (m *MockedClient) ContainerActions(ctx context.Context, action container.ContainerAction, containerID string) error {
args := m.Called(ctx, action, containerID)
return args.Error(0)
}
func (m *MockedClient) ContainerEvents(ctx context.Context, events chan<- container.ContainerEvent) error {
args := m.Called(ctx, events)
return args.Error(0)
}
func (m *MockedClient) ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error) {
func (m *MockedClientService) ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error) {
args := m.Called(ctx, filter)
return args.Get(0).([]container.Container), args.Error(1)
}
func (m *MockedClient) ContainerLogs(ctx context.Context, id string, since time.Time, stdType container.StdType) (io.ReadCloser, error) {
args := m.Called(ctx, id, since, stdType)
func (m *MockedClientService) Host(ctx context.Context) (container.Host, error) {
args := m.Called(ctx)
return args.Get(0).(container.Host), args.Error(1)
}
func (m *MockedClientService) ContainerAction(ctx context.Context, c container.Container, action container.ContainerAction) error {
args := m.Called(ctx, c, action)
return args.Error(0)
}
func (m *MockedClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) {
args := m.Called(ctx, c, from, to, stdTypes)
return args.Get(0).(<-chan *container.LogEvent), args.Error(1)
}
func (m *MockedClientService) RawLogs(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (io.ReadCloser, error) {
args := m.Called(ctx, c, from, to, stdTypes)
return args.Get(0).(io.ReadCloser), args.Error(1)
}
func (m *MockedClient) ContainerStats(context.Context, string, chan<- container.ContainerStat) error {
return nil
func (m *MockedClientService) SubscribeStats(ctx context.Context, stats chan<- container.ContainerStat) {
m.Called(ctx, stats)
}
func (m *MockedClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time, stdType container.StdType) (io.ReadCloser, error) {
args := m.Called(ctx, id, from, to, stdType)
return args.Get(0).(io.ReadCloser), args.Error(1)
func (m *MockedClientService) SubscribeEvents(ctx context.Context, events chan<- container.ContainerEvent) {
m.Called(ctx, events)
}
func (m *MockedClient) Host() container.Host {
args := m.Called()
return args.Get(0).(container.Host)
func (m *MockedClientService) SubscribeContainersStarted(ctx context.Context, containers chan<- container.Container) {
m.Called(ctx, containers)
}
func (m *MockedClient) IsSwarmMode() bool {
return false
func (m *MockedClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error {
args := m.Called(ctx, c, from, stdTypes, events)
return args.Error(0)
}
func (m *MockedClient) SystemInfo() system.Info {
return system.Info{ID: "123"}
func (m *MockedClientService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error {
args := m.Called(ctx, c, events, stdout)
return args.Error(0)
}
func (m *MockedClientService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error {
args := m.Called(ctx, c, cmd, events, stdout)
return args.Error(0)
}
var wantedContainer = container.Container{}
@@ -103,28 +111,30 @@ func init() {
panic(err)
}
client = &MockedClient{}
client.On("ListContainers", mock.Anything, mock.Anything).Return([]container.Container{
{
ID: "123456",
Name: "test",
Host: "localhost",
State: "running",
},
mockService = &MockedClientService{}
mockService.On("ListContainers", mock.Anything, mock.Anything).Return([]container.Container{
wantedContainer,
}, nil)
client.On("Host").Return(container.Host{
mockService.On("Host", mock.Anything).Return(container.Host{
ID: "localhost",
Endpoint: "local",
Name: "local",
})
}, nil)
client.On("ContainerEvents", mock.Anything, mock.AnythingOfType("chan<- container.ContainerEvent")).Return(nil).Run(func(args mock.Arguments) {
mockService.On("SubscribeEvents", mock.Anything, mock.AnythingOfType("chan<- container.ContainerEvent")).Return().Run(func(args mock.Arguments) {
time.Sleep(5 * time.Second)
})
client.On("FindContainer", mock.Anything, "123456").Return(wantedContainer, nil)
server, _ := NewServer(client, certs, "test", container.ContainerLabels{})
mockService.On("SubscribeStats", mock.Anything, mock.AnythingOfType("chan<- container.ContainerStat")).Return()
mockService.On("SubscribeContainersStarted", mock.Anything, mock.AnythingOfType("chan<- container.Container")).Return()
mockService.On("FindContainer", mock.Anything, "123456", mock.Anything).Return(wantedContainer, nil)
mockService.On("Client").Return(nil)
server, _ := NewServer(mockService, certs, "test", nil)
go server.Serve(lis)
}
+186 -85
View File
@@ -1270,11 +1270,99 @@ func (x *ContainerAttachResponse) GetStdout() []byte {
return nil
}
type UpdateNotificationConfigRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Subscriptions []*NotificationSubscription `protobuf:"bytes,1,rep,name=subscriptions,proto3" json:"subscriptions,omitempty"`
Dispatchers []*NotificationDispatcher `protobuf:"bytes,2,rep,name=dispatchers,proto3" json:"dispatchers,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *UpdateNotificationConfigRequest) Reset() {
*x = UpdateNotificationConfigRequest{}
mi := &file_rpc_proto_msgTypes[25]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *UpdateNotificationConfigRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*UpdateNotificationConfigRequest) ProtoMessage() {}
func (x *UpdateNotificationConfigRequest) ProtoReflect() protoreflect.Message {
mi := &file_rpc_proto_msgTypes[25]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use UpdateNotificationConfigRequest.ProtoReflect.Descriptor instead.
func (*UpdateNotificationConfigRequest) Descriptor() ([]byte, []int) {
return file_rpc_proto_rawDescGZIP(), []int{25}
}
func (x *UpdateNotificationConfigRequest) GetSubscriptions() []*NotificationSubscription {
if x != nil {
return x.Subscriptions
}
return nil
}
func (x *UpdateNotificationConfigRequest) GetDispatchers() []*NotificationDispatcher {
if x != nil {
return x.Dispatchers
}
return nil
}
type UpdateNotificationConfigResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *UpdateNotificationConfigResponse) Reset() {
*x = UpdateNotificationConfigResponse{}
mi := &file_rpc_proto_msgTypes[26]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *UpdateNotificationConfigResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*UpdateNotificationConfigResponse) ProtoMessage() {}
func (x *UpdateNotificationConfigResponse) ProtoReflect() protoreflect.Message {
mi := &file_rpc_proto_msgTypes[26]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use UpdateNotificationConfigResponse.ProtoReflect.Descriptor instead.
func (*UpdateNotificationConfigResponse) Descriptor() ([]byte, []int) {
return file_rpc_proto_rawDescGZIP(), []int{26}
}
var File_rpc_proto protoreflect.FileDescriptor
const file_rpc_proto_rawDesc = "" +
"\n" +
"\trpc.proto\x12\bprotobuf\x1a\vtypes.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xb1\x01\n" +
"\trpc.proto\x12\bprotobuf\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\vtypes.proto\"\xb1\x01\n" +
"\x15ListContainersRequest\x12C\n" +
"\x06filter\x18\x01 \x03(\v2+.protobuf.ListContainersRequest.FilterEntryR\x06filter\x1aS\n" +
"\vFilterEntry\x12\x10\n" +
@@ -1345,7 +1433,11 @@ const file_rpc_proto_rawDesc = "" +
"\x06resize\x18\x03 \x01(\v2\x17.protobuf.ResizePayloadH\x00R\x06resizeB\t\n" +
"\apayload\"1\n" +
"\x17ContainerAttachResponse\x12\x16\n" +
"\x06stdout\x18\x01 \x01(\fR\x06stdout2\xa1\b\n" +
"\x06stdout\x18\x01 \x01(\fR\x06stdout\"\xaf\x01\n" +
"\x1fUpdateNotificationConfigRequest\x12H\n" +
"\rsubscriptions\x18\x01 \x03(\v2\".protobuf.NotificationSubscriptionR\rsubscriptions\x12B\n" +
"\vdispatchers\x18\x02 \x03(\v2 .protobuf.NotificationDispatcherR\vdispatchers\"\"\n" +
" UpdateNotificationConfigResponse2\x96\t\n" +
"\fAgentService\x12U\n" +
"\x0eListContainers\x12\x1f.protobuf.ListContainersRequest\x1a .protobuf.ListContainersResponse\"\x00\x12R\n" +
"\rFindContainer\x12\x1e.protobuf.FindContainerRequest\x1a\x1f.protobuf.FindContainerResponse\"\x00\x12K\n" +
@@ -1359,7 +1451,8 @@ const file_rpc_proto_rawDesc = "" +
"\bHostInfo\x12\x19.protobuf.HostInfoRequest\x1a\x1a.protobuf.HostInfoResponse\"\x00\x12X\n" +
"\x0fContainerAction\x12 .protobuf.ContainerActionRequest\x1a!.protobuf.ContainerActionResponse\"\x00\x12V\n" +
"\rContainerExec\x12\x1e.protobuf.ContainerExecRequest\x1a\x1f.protobuf.ContainerExecResponse\"\x00(\x010\x01\x12\\\n" +
"\x0fContainerAttach\x12 .protobuf.ContainerAttachRequest\x1a!.protobuf.ContainerAttachResponse\"\x00(\x010\x01B\x13Z\x11internal/agent/pbb\x06proto3"
"\x0fContainerAttach\x12 .protobuf.ContainerAttachRequest\x1a!.protobuf.ContainerAttachResponse\"\x00(\x010\x01\x12s\n" +
"\x18UpdateNotificationConfig\x12).protobuf.UpdateNotificationConfigRequest\x1a*.protobuf.UpdateNotificationConfigResponse\"\x00B\x13Z\x11internal/agent/pbb\x06proto3"
var (
file_rpc_proto_rawDescOnce sync.Once
@@ -1373,92 +1466,100 @@ func file_rpc_proto_rawDescGZIP() []byte {
return file_rpc_proto_rawDescData
}
var file_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 27)
var file_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 29)
var file_rpc_proto_goTypes = []any{
(*ListContainersRequest)(nil), // 0: protobuf.ListContainersRequest
(*RepeatedString)(nil), // 1: protobuf.RepeatedString
(*ListContainersResponse)(nil), // 2: protobuf.ListContainersResponse
(*FindContainerRequest)(nil), // 3: protobuf.FindContainerRequest
(*FindContainerResponse)(nil), // 4: protobuf.FindContainerResponse
(*StreamLogsRequest)(nil), // 5: protobuf.StreamLogsRequest
(*StreamLogsResponse)(nil), // 6: protobuf.StreamLogsResponse
(*LogsBetweenDatesRequest)(nil), // 7: protobuf.LogsBetweenDatesRequest
(*StreamRawBytesRequest)(nil), // 8: protobuf.StreamRawBytesRequest
(*StreamRawBytesResponse)(nil), // 9: protobuf.StreamRawBytesResponse
(*StreamEventsRequest)(nil), // 10: protobuf.StreamEventsRequest
(*StreamEventsResponse)(nil), // 11: protobuf.StreamEventsResponse
(*StreamStatsRequest)(nil), // 12: protobuf.StreamStatsRequest
(*StreamStatsResponse)(nil), // 13: protobuf.StreamStatsResponse
(*HostInfoRequest)(nil), // 14: protobuf.HostInfoRequest
(*HostInfoResponse)(nil), // 15: protobuf.HostInfoResponse
(*StreamContainerStartedRequest)(nil), // 16: protobuf.StreamContainerStartedRequest
(*StreamContainerStartedResponse)(nil), // 17: protobuf.StreamContainerStartedResponse
(*ContainerActionRequest)(nil), // 18: protobuf.ContainerActionRequest
(*ContainerActionResponse)(nil), // 19: protobuf.ContainerActionResponse
(*ContainerExecRequest)(nil), // 20: protobuf.ContainerExecRequest
(*ResizePayload)(nil), // 21: protobuf.ResizePayload
(*ContainerExecResponse)(nil), // 22: protobuf.ContainerExecResponse
(*ContainerAttachRequest)(nil), // 23: protobuf.ContainerAttachRequest
(*ContainerAttachResponse)(nil), // 24: protobuf.ContainerAttachResponse
nil, // 25: protobuf.ListContainersRequest.FilterEntry
nil, // 26: protobuf.FindContainerRequest.FilterEntry
(*Container)(nil), // 27: protobuf.Container
(*timestamppb.Timestamp)(nil), // 28: google.protobuf.Timestamp
(*LogEvent)(nil), // 29: protobuf.LogEvent
(*ContainerEvent)(nil), // 30: protobuf.ContainerEvent
(*ContainerStat)(nil), // 31: protobuf.ContainerStat
(*Host)(nil), // 32: protobuf.Host
(ContainerAction)(0), // 33: protobuf.ContainerAction
(*ListContainersRequest)(nil), // 0: protobuf.ListContainersRequest
(*RepeatedString)(nil), // 1: protobuf.RepeatedString
(*ListContainersResponse)(nil), // 2: protobuf.ListContainersResponse
(*FindContainerRequest)(nil), // 3: protobuf.FindContainerRequest
(*FindContainerResponse)(nil), // 4: protobuf.FindContainerResponse
(*StreamLogsRequest)(nil), // 5: protobuf.StreamLogsRequest
(*StreamLogsResponse)(nil), // 6: protobuf.StreamLogsResponse
(*LogsBetweenDatesRequest)(nil), // 7: protobuf.LogsBetweenDatesRequest
(*StreamRawBytesRequest)(nil), // 8: protobuf.StreamRawBytesRequest
(*StreamRawBytesResponse)(nil), // 9: protobuf.StreamRawBytesResponse
(*StreamEventsRequest)(nil), // 10: protobuf.StreamEventsRequest
(*StreamEventsResponse)(nil), // 11: protobuf.StreamEventsResponse
(*StreamStatsRequest)(nil), // 12: protobuf.StreamStatsRequest
(*StreamStatsResponse)(nil), // 13: protobuf.StreamStatsResponse
(*HostInfoRequest)(nil), // 14: protobuf.HostInfoRequest
(*HostInfoResponse)(nil), // 15: protobuf.HostInfoResponse
(*StreamContainerStartedRequest)(nil), // 16: protobuf.StreamContainerStartedRequest
(*StreamContainerStartedResponse)(nil), // 17: protobuf.StreamContainerStartedResponse
(*ContainerActionRequest)(nil), // 18: protobuf.ContainerActionRequest
(*ContainerActionResponse)(nil), // 19: protobuf.ContainerActionResponse
(*ContainerExecRequest)(nil), // 20: protobuf.ContainerExecRequest
(*ResizePayload)(nil), // 21: protobuf.ResizePayload
(*ContainerExecResponse)(nil), // 22: protobuf.ContainerExecResponse
(*ContainerAttachRequest)(nil), // 23: protobuf.ContainerAttachRequest
(*ContainerAttachResponse)(nil), // 24: protobuf.ContainerAttachResponse
(*UpdateNotificationConfigRequest)(nil), // 25: protobuf.UpdateNotificationConfigRequest
(*UpdateNotificationConfigResponse)(nil), // 26: protobuf.UpdateNotificationConfigResponse
nil, // 27: protobuf.ListContainersRequest.FilterEntry
nil, // 28: protobuf.FindContainerRequest.FilterEntry
(*Container)(nil), // 29: protobuf.Container
(*timestamppb.Timestamp)(nil), // 30: google.protobuf.Timestamp
(*LogEvent)(nil), // 31: protobuf.LogEvent
(*ContainerEvent)(nil), // 32: protobuf.ContainerEvent
(*ContainerStat)(nil), // 33: protobuf.ContainerStat
(*Host)(nil), // 34: protobuf.Host
(ContainerAction)(0), // 35: protobuf.ContainerAction
(*NotificationSubscription)(nil), // 36: protobuf.NotificationSubscription
(*NotificationDispatcher)(nil), // 37: protobuf.NotificationDispatcher
}
var file_rpc_proto_depIdxs = []int32{
25, // 0: protobuf.ListContainersRequest.filter:type_name -> protobuf.ListContainersRequest.FilterEntry
27, // 1: protobuf.ListContainersResponse.containers:type_name -> protobuf.Container
26, // 2: protobuf.FindContainerRequest.filter:type_name -> protobuf.FindContainerRequest.FilterEntry
27, // 3: protobuf.FindContainerResponse.container:type_name -> protobuf.Container
28, // 4: protobuf.StreamLogsRequest.since:type_name -> google.protobuf.Timestamp
29, // 5: protobuf.StreamLogsResponse.event:type_name -> protobuf.LogEvent
28, // 6: protobuf.LogsBetweenDatesRequest.since:type_name -> google.protobuf.Timestamp
28, // 7: protobuf.LogsBetweenDatesRequest.until:type_name -> google.protobuf.Timestamp
28, // 8: protobuf.StreamRawBytesRequest.since:type_name -> google.protobuf.Timestamp
28, // 9: protobuf.StreamRawBytesRequest.until:type_name -> google.protobuf.Timestamp
30, // 10: protobuf.StreamEventsResponse.event:type_name -> protobuf.ContainerEvent
31, // 11: protobuf.StreamStatsResponse.stat:type_name -> protobuf.ContainerStat
32, // 12: protobuf.HostInfoResponse.host:type_name -> protobuf.Host
27, // 13: protobuf.StreamContainerStartedResponse.container:type_name -> protobuf.Container
33, // 14: protobuf.ContainerActionRequest.action:type_name -> protobuf.ContainerAction
27, // 0: protobuf.ListContainersRequest.filter:type_name -> protobuf.ListContainersRequest.FilterEntry
29, // 1: protobuf.ListContainersResponse.containers:type_name -> protobuf.Container
28, // 2: protobuf.FindContainerRequest.filter:type_name -> protobuf.FindContainerRequest.FilterEntry
29, // 3: protobuf.FindContainerResponse.container:type_name -> protobuf.Container
30, // 4: protobuf.StreamLogsRequest.since:type_name -> google.protobuf.Timestamp
31, // 5: protobuf.StreamLogsResponse.event:type_name -> protobuf.LogEvent
30, // 6: protobuf.LogsBetweenDatesRequest.since:type_name -> google.protobuf.Timestamp
30, // 7: protobuf.LogsBetweenDatesRequest.until:type_name -> google.protobuf.Timestamp
30, // 8: protobuf.StreamRawBytesRequest.since:type_name -> google.protobuf.Timestamp
30, // 9: protobuf.StreamRawBytesRequest.until:type_name -> google.protobuf.Timestamp
32, // 10: protobuf.StreamEventsResponse.event:type_name -> protobuf.ContainerEvent
33, // 11: protobuf.StreamStatsResponse.stat:type_name -> protobuf.ContainerStat
34, // 12: protobuf.HostInfoResponse.host:type_name -> protobuf.Host
29, // 13: protobuf.StreamContainerStartedResponse.container:type_name -> protobuf.Container
35, // 14: protobuf.ContainerActionRequest.action:type_name -> protobuf.ContainerAction
21, // 15: protobuf.ContainerExecRequest.resize:type_name -> protobuf.ResizePayload
21, // 16: protobuf.ContainerAttachRequest.resize:type_name -> protobuf.ResizePayload
1, // 17: protobuf.ListContainersRequest.FilterEntry.value:type_name -> protobuf.RepeatedString
1, // 18: protobuf.FindContainerRequest.FilterEntry.value:type_name -> protobuf.RepeatedString
0, // 19: protobuf.AgentService.ListContainers:input_type -> protobuf.ListContainersRequest
3, // 20: protobuf.AgentService.FindContainer:input_type -> protobuf.FindContainerRequest
5, // 21: protobuf.AgentService.StreamLogs:input_type -> protobuf.StreamLogsRequest
7, // 22: protobuf.AgentService.LogsBetweenDates:input_type -> protobuf.LogsBetweenDatesRequest
8, // 23: protobuf.AgentService.StreamRawBytes:input_type -> protobuf.StreamRawBytesRequest
10, // 24: protobuf.AgentService.StreamEvents:input_type -> protobuf.StreamEventsRequest
12, // 25: protobuf.AgentService.StreamStats:input_type -> protobuf.StreamStatsRequest
16, // 26: protobuf.AgentService.StreamContainerStarted:input_type -> protobuf.StreamContainerStartedRequest
14, // 27: protobuf.AgentService.HostInfo:input_type -> protobuf.HostInfoRequest
18, // 28: protobuf.AgentService.ContainerAction:input_type -> protobuf.ContainerActionRequest
20, // 29: protobuf.AgentService.ContainerExec:input_type -> protobuf.ContainerExecRequest
23, // 30: protobuf.AgentService.ContainerAttach:input_type -> protobuf.ContainerAttachRequest
2, // 31: protobuf.AgentService.ListContainers:output_type -> protobuf.ListContainersResponse
4, // 32: protobuf.AgentService.FindContainer:output_type -> protobuf.FindContainerResponse
6, // 33: protobuf.AgentService.StreamLogs:output_type -> protobuf.StreamLogsResponse
6, // 34: protobuf.AgentService.LogsBetweenDates:output_type -> protobuf.StreamLogsResponse
9, // 35: protobuf.AgentService.StreamRawBytes:output_type -> protobuf.StreamRawBytesResponse
11, // 36: protobuf.AgentService.StreamEvents:output_type -> protobuf.StreamEventsResponse
13, // 37: protobuf.AgentService.StreamStats:output_type -> protobuf.StreamStatsResponse
17, // 38: protobuf.AgentService.StreamContainerStarted:output_type -> protobuf.StreamContainerStartedResponse
15, // 39: protobuf.AgentService.HostInfo:output_type -> protobuf.HostInfoResponse
19, // 40: protobuf.AgentService.ContainerAction:output_type -> protobuf.ContainerActionResponse
22, // 41: protobuf.AgentService.ContainerExec:output_type -> protobuf.ContainerExecResponse
24, // 42: protobuf.AgentService.ContainerAttach:output_type -> protobuf.ContainerAttachResponse
31, // [31:43] is the sub-list for method output_type
19, // [19:31] is the sub-list for method input_type
19, // [19:19] is the sub-list for extension type_name
19, // [19:19] is the sub-list for extension extendee
0, // [0:19] is the sub-list for field type_name
36, // 17: protobuf.UpdateNotificationConfigRequest.subscriptions:type_name -> protobuf.NotificationSubscription
37, // 18: protobuf.UpdateNotificationConfigRequest.dispatchers:type_name -> protobuf.NotificationDispatcher
1, // 19: protobuf.ListContainersRequest.FilterEntry.value:type_name -> protobuf.RepeatedString
1, // 20: protobuf.FindContainerRequest.FilterEntry.value:type_name -> protobuf.RepeatedString
0, // 21: protobuf.AgentService.ListContainers:input_type -> protobuf.ListContainersRequest
3, // 22: protobuf.AgentService.FindContainer:input_type -> protobuf.FindContainerRequest
5, // 23: protobuf.AgentService.StreamLogs:input_type -> protobuf.StreamLogsRequest
7, // 24: protobuf.AgentService.LogsBetweenDates:input_type -> protobuf.LogsBetweenDatesRequest
8, // 25: protobuf.AgentService.StreamRawBytes:input_type -> protobuf.StreamRawBytesRequest
10, // 26: protobuf.AgentService.StreamEvents:input_type -> protobuf.StreamEventsRequest
12, // 27: protobuf.AgentService.StreamStats:input_type -> protobuf.StreamStatsRequest
16, // 28: protobuf.AgentService.StreamContainerStarted:input_type -> protobuf.StreamContainerStartedRequest
14, // 29: protobuf.AgentService.HostInfo:input_type -> protobuf.HostInfoRequest
18, // 30: protobuf.AgentService.ContainerAction:input_type -> protobuf.ContainerActionRequest
20, // 31: protobuf.AgentService.ContainerExec:input_type -> protobuf.ContainerExecRequest
23, // 32: protobuf.AgentService.ContainerAttach:input_type -> protobuf.ContainerAttachRequest
25, // 33: protobuf.AgentService.UpdateNotificationConfig:input_type -> protobuf.UpdateNotificationConfigRequest
2, // 34: protobuf.AgentService.ListContainers:output_type -> protobuf.ListContainersResponse
4, // 35: protobuf.AgentService.FindContainer:output_type -> protobuf.FindContainerResponse
6, // 36: protobuf.AgentService.StreamLogs:output_type -> protobuf.StreamLogsResponse
6, // 37: protobuf.AgentService.LogsBetweenDates:output_type -> protobuf.StreamLogsResponse
9, // 38: protobuf.AgentService.StreamRawBytes:output_type -> protobuf.StreamRawBytesResponse
11, // 39: protobuf.AgentService.StreamEvents:output_type -> protobuf.StreamEventsResponse
13, // 40: protobuf.AgentService.StreamStats:output_type -> protobuf.StreamStatsResponse
17, // 41: protobuf.AgentService.StreamContainerStarted:output_type -> protobuf.StreamContainerStartedResponse
15, // 42: protobuf.AgentService.HostInfo:output_type -> protobuf.HostInfoResponse
19, // 43: protobuf.AgentService.ContainerAction:output_type -> protobuf.ContainerActionResponse
22, // 44: protobuf.AgentService.ContainerExec:output_type -> protobuf.ContainerExecResponse
24, // 45: protobuf.AgentService.ContainerAttach:output_type -> protobuf.ContainerAttachResponse
26, // 46: protobuf.AgentService.UpdateNotificationConfig:output_type -> protobuf.UpdateNotificationConfigResponse
34, // [34:47] is the sub-list for method output_type
21, // [21:34] is the sub-list for method input_type
21, // [21:21] is the sub-list for extension type_name
21, // [21:21] is the sub-list for extension extendee
0, // [0:21] is the sub-list for field type_name
}
func init() { file_rpc_proto_init() }
@@ -1481,7 +1582,7 @@ func file_rpc_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_rpc_proto_rawDesc), len(file_rpc_proto_rawDesc)),
NumEnums: 0,
NumMessages: 27,
NumMessages: 29,
NumExtensions: 0,
NumServices: 1,
},
+50 -12
View File
@@ -19,18 +19,19 @@ import (
const _ = grpc.SupportPackageIsVersion9
const (
AgentService_ListContainers_FullMethodName = "/protobuf.AgentService/ListContainers"
AgentService_FindContainer_FullMethodName = "/protobuf.AgentService/FindContainer"
AgentService_StreamLogs_FullMethodName = "/protobuf.AgentService/StreamLogs"
AgentService_LogsBetweenDates_FullMethodName = "/protobuf.AgentService/LogsBetweenDates"
AgentService_StreamRawBytes_FullMethodName = "/protobuf.AgentService/StreamRawBytes"
AgentService_StreamEvents_FullMethodName = "/protobuf.AgentService/StreamEvents"
AgentService_StreamStats_FullMethodName = "/protobuf.AgentService/StreamStats"
AgentService_StreamContainerStarted_FullMethodName = "/protobuf.AgentService/StreamContainerStarted"
AgentService_HostInfo_FullMethodName = "/protobuf.AgentService/HostInfo"
AgentService_ContainerAction_FullMethodName = "/protobuf.AgentService/ContainerAction"
AgentService_ContainerExec_FullMethodName = "/protobuf.AgentService/ContainerExec"
AgentService_ContainerAttach_FullMethodName = "/protobuf.AgentService/ContainerAttach"
AgentService_ListContainers_FullMethodName = "/protobuf.AgentService/ListContainers"
AgentService_FindContainer_FullMethodName = "/protobuf.AgentService/FindContainer"
AgentService_StreamLogs_FullMethodName = "/protobuf.AgentService/StreamLogs"
AgentService_LogsBetweenDates_FullMethodName = "/protobuf.AgentService/LogsBetweenDates"
AgentService_StreamRawBytes_FullMethodName = "/protobuf.AgentService/StreamRawBytes"
AgentService_StreamEvents_FullMethodName = "/protobuf.AgentService/StreamEvents"
AgentService_StreamStats_FullMethodName = "/protobuf.AgentService/StreamStats"
AgentService_StreamContainerStarted_FullMethodName = "/protobuf.AgentService/StreamContainerStarted"
AgentService_HostInfo_FullMethodName = "/protobuf.AgentService/HostInfo"
AgentService_ContainerAction_FullMethodName = "/protobuf.AgentService/ContainerAction"
AgentService_ContainerExec_FullMethodName = "/protobuf.AgentService/ContainerExec"
AgentService_ContainerAttach_FullMethodName = "/protobuf.AgentService/ContainerAttach"
AgentService_UpdateNotificationConfig_FullMethodName = "/protobuf.AgentService/UpdateNotificationConfig"
)
// AgentServiceClient is the client API for AgentService service.
@@ -49,6 +50,7 @@ type AgentServiceClient interface {
ContainerAction(ctx context.Context, in *ContainerActionRequest, opts ...grpc.CallOption) (*ContainerActionResponse, error)
ContainerExec(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ContainerExecRequest, ContainerExecResponse], error)
ContainerAttach(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ContainerAttachRequest, ContainerAttachResponse], error)
UpdateNotificationConfig(ctx context.Context, in *UpdateNotificationConfigRequest, opts ...grpc.CallOption) (*UpdateNotificationConfigResponse, error)
}
type agentServiceClient struct {
@@ -239,6 +241,16 @@ func (c *agentServiceClient) ContainerAttach(ctx context.Context, opts ...grpc.C
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type AgentService_ContainerAttachClient = grpc.BidiStreamingClient[ContainerAttachRequest, ContainerAttachResponse]
func (c *agentServiceClient) UpdateNotificationConfig(ctx context.Context, in *UpdateNotificationConfigRequest, opts ...grpc.CallOption) (*UpdateNotificationConfigResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(UpdateNotificationConfigResponse)
err := c.cc.Invoke(ctx, AgentService_UpdateNotificationConfig_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// AgentServiceServer is the server API for AgentService service.
// All implementations must embed UnimplementedAgentServiceServer
// for forward compatibility.
@@ -255,6 +267,7 @@ type AgentServiceServer interface {
ContainerAction(context.Context, *ContainerActionRequest) (*ContainerActionResponse, error)
ContainerExec(grpc.BidiStreamingServer[ContainerExecRequest, ContainerExecResponse]) error
ContainerAttach(grpc.BidiStreamingServer[ContainerAttachRequest, ContainerAttachResponse]) error
UpdateNotificationConfig(context.Context, *UpdateNotificationConfigRequest) (*UpdateNotificationConfigResponse, error)
mustEmbedUnimplementedAgentServiceServer()
}
@@ -301,6 +314,9 @@ func (UnimplementedAgentServiceServer) ContainerExec(grpc.BidiStreamingServer[Co
func (UnimplementedAgentServiceServer) ContainerAttach(grpc.BidiStreamingServer[ContainerAttachRequest, ContainerAttachResponse]) error {
return status.Error(codes.Unimplemented, "method ContainerAttach not implemented")
}
func (UnimplementedAgentServiceServer) UpdateNotificationConfig(context.Context, *UpdateNotificationConfigRequest) (*UpdateNotificationConfigResponse, error) {
return nil, status.Error(codes.Unimplemented, "method UpdateNotificationConfig not implemented")
}
func (UnimplementedAgentServiceServer) mustEmbedUnimplementedAgentServiceServer() {}
func (UnimplementedAgentServiceServer) testEmbeddedByValue() {}
@@ -474,6 +490,24 @@ func _AgentService_ContainerAttach_Handler(srv interface{}, stream grpc.ServerSt
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type AgentService_ContainerAttachServer = grpc.BidiStreamingServer[ContainerAttachRequest, ContainerAttachResponse]
func _AgentService_UpdateNotificationConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateNotificationConfigRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgentServiceServer).UpdateNotificationConfig(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: AgentService_UpdateNotificationConfig_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AgentServiceServer).UpdateNotificationConfig(ctx, req.(*UpdateNotificationConfigRequest))
}
return interceptor(ctx, in, info, handler)
}
// AgentService_ServiceDesc is the grpc.ServiceDesc for AgentService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -497,6 +531,10 @@ var AgentService_ServiceDesc = grpc.ServiceDesc{
MethodName: "ContainerAction",
Handler: _AgentService_ContainerAction_Handler,
},
{
MethodName: "UpdateNotificationConfig",
Handler: _AgentService_UpdateNotificationConfig_Handler,
},
},
Streams: []grpc.StreamDesc{
{
+200 -25
View File
@@ -820,6 +820,166 @@ func (x *Host) GetDockerVersion() string {
return ""
}
type NotificationSubscription struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
Enabled bool `protobuf:"varint,3,opt,name=enabled,proto3" json:"enabled,omitempty"`
DispatcherId int32 `protobuf:"varint,4,opt,name=dispatcherId,proto3" json:"dispatcherId,omitempty"`
LogExpression string `protobuf:"bytes,5,opt,name=logExpression,proto3" json:"logExpression,omitempty"`
ContainerExpression string `protobuf:"bytes,6,opt,name=containerExpression,proto3" json:"containerExpression,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *NotificationSubscription) Reset() {
*x = NotificationSubscription{}
mi := &file_types_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *NotificationSubscription) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*NotificationSubscription) ProtoMessage() {}
func (x *NotificationSubscription) ProtoReflect() protoreflect.Message {
mi := &file_types_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use NotificationSubscription.ProtoReflect.Descriptor instead.
func (*NotificationSubscription) Descriptor() ([]byte, []int) {
return file_types_proto_rawDescGZIP(), []int{9}
}
func (x *NotificationSubscription) GetId() int32 {
if x != nil {
return x.Id
}
return 0
}
func (x *NotificationSubscription) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *NotificationSubscription) GetEnabled() bool {
if x != nil {
return x.Enabled
}
return false
}
func (x *NotificationSubscription) GetDispatcherId() int32 {
if x != nil {
return x.DispatcherId
}
return 0
}
func (x *NotificationSubscription) GetLogExpression() string {
if x != nil {
return x.LogExpression
}
return ""
}
func (x *NotificationSubscription) GetContainerExpression() string {
if x != nil {
return x.ContainerExpression
}
return ""
}
type NotificationDispatcher struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"`
Url string `protobuf:"bytes,4,opt,name=url,proto3" json:"url,omitempty"`
Template string `protobuf:"bytes,5,opt,name=template,proto3" json:"template,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *NotificationDispatcher) Reset() {
*x = NotificationDispatcher{}
mi := &file_types_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *NotificationDispatcher) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*NotificationDispatcher) ProtoMessage() {}
func (x *NotificationDispatcher) ProtoReflect() protoreflect.Message {
mi := &file_types_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use NotificationDispatcher.ProtoReflect.Descriptor instead.
func (*NotificationDispatcher) Descriptor() ([]byte, []int) {
return file_types_proto_rawDescGZIP(), []int{10}
}
func (x *NotificationDispatcher) GetId() int32 {
if x != nil {
return x.Id
}
return 0
}
func (x *NotificationDispatcher) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *NotificationDispatcher) GetType() string {
if x != nil {
return x.Type
}
return ""
}
func (x *NotificationDispatcher) GetUrl() string {
if x != nil {
return x.Url
}
return ""
}
func (x *NotificationDispatcher) GetTemplate() string {
if x != nil {
return x.Template
}
return ""
}
var File_types_proto protoreflect.FileDescriptor
const file_types_proto_rawDesc = "" +
@@ -898,7 +1058,20 @@ const file_types_proto_rawDesc = "" +
"\rdockerVersion\x18\f \x01(\tR\rdockerVersion\x1a9\n" +
"\vLabelsEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01*3\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xd4\x01\n" +
"\x18NotificationSubscription\x12\x0e\n" +
"\x02id\x18\x01 \x01(\x05R\x02id\x12\x12\n" +
"\x04name\x18\x02 \x01(\tR\x04name\x12\x18\n" +
"\aenabled\x18\x03 \x01(\bR\aenabled\x12\"\n" +
"\fdispatcherId\x18\x04 \x01(\x05R\fdispatcherId\x12$\n" +
"\rlogExpression\x18\x05 \x01(\tR\rlogExpression\x120\n" +
"\x13containerExpression\x18\x06 \x01(\tR\x13containerExpression\"~\n" +
"\x16NotificationDispatcher\x12\x0e\n" +
"\x02id\x18\x01 \x01(\x05R\x02id\x12\x12\n" +
"\x04name\x18\x02 \x01(\tR\x04name\x12\x12\n" +
"\x04type\x18\x03 \x01(\tR\x04type\x12\x10\n" +
"\x03url\x18\x04 \x01(\tR\x03url\x12\x1a\n" +
"\btemplate\x18\x05 \x01(\tR\btemplate*3\n" +
"\x0fContainerAction\x12\t\n" +
"\x05Start\x10\x00\x12\b\n" +
"\x04Stop\x10\x01\x12\v\n" +
@@ -917,34 +1090,36 @@ func file_types_proto_rawDescGZIP() []byte {
}
var file_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
var file_types_proto_goTypes = []any{
(ContainerAction)(0), // 0: protobuf.ContainerAction
(*Container)(nil), // 1: protobuf.Container
(*ContainerStat)(nil), // 2: protobuf.ContainerStat
(*LogFragment)(nil), // 3: protobuf.LogFragment
(*LogEvent)(nil), // 4: protobuf.LogEvent
(*SingleMessage)(nil), // 5: protobuf.SingleMessage
(*GroupMessage)(nil), // 6: protobuf.GroupMessage
(*ComplexMessage)(nil), // 7: protobuf.ComplexMessage
(*ContainerEvent)(nil), // 8: protobuf.ContainerEvent
(*Host)(nil), // 9: protobuf.Host
nil, // 10: protobuf.Container.LabelsEntry
nil, // 11: protobuf.Host.LabelsEntry
(*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp
(*anypb.Any)(nil), // 13: google.protobuf.Any
(ContainerAction)(0), // 0: protobuf.ContainerAction
(*Container)(nil), // 1: protobuf.Container
(*ContainerStat)(nil), // 2: protobuf.ContainerStat
(*LogFragment)(nil), // 3: protobuf.LogFragment
(*LogEvent)(nil), // 4: protobuf.LogEvent
(*SingleMessage)(nil), // 5: protobuf.SingleMessage
(*GroupMessage)(nil), // 6: protobuf.GroupMessage
(*ComplexMessage)(nil), // 7: protobuf.ComplexMessage
(*ContainerEvent)(nil), // 8: protobuf.ContainerEvent
(*Host)(nil), // 9: protobuf.Host
(*NotificationSubscription)(nil), // 10: protobuf.NotificationSubscription
(*NotificationDispatcher)(nil), // 11: protobuf.NotificationDispatcher
nil, // 12: protobuf.Container.LabelsEntry
nil, // 13: protobuf.Host.LabelsEntry
(*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp
(*anypb.Any)(nil), // 15: google.protobuf.Any
}
var file_types_proto_depIdxs = []int32{
12, // 0: protobuf.Container.created:type_name -> google.protobuf.Timestamp
12, // 1: protobuf.Container.started:type_name -> google.protobuf.Timestamp
10, // 2: protobuf.Container.labels:type_name -> protobuf.Container.LabelsEntry
14, // 0: protobuf.Container.created:type_name -> google.protobuf.Timestamp
14, // 1: protobuf.Container.started:type_name -> google.protobuf.Timestamp
12, // 2: protobuf.Container.labels:type_name -> protobuf.Container.LabelsEntry
2, // 3: protobuf.Container.stats:type_name -> protobuf.ContainerStat
12, // 4: protobuf.Container.finished:type_name -> google.protobuf.Timestamp
13, // 5: protobuf.LogEvent.message:type_name -> google.protobuf.Any
12, // 6: protobuf.LogEvent.timestamp:type_name -> google.protobuf.Timestamp
14, // 4: protobuf.Container.finished:type_name -> google.protobuf.Timestamp
15, // 5: protobuf.LogEvent.message:type_name -> google.protobuf.Any
14, // 6: protobuf.LogEvent.timestamp:type_name -> google.protobuf.Timestamp
3, // 7: protobuf.GroupMessage.fragments:type_name -> protobuf.LogFragment
12, // 8: protobuf.ContainerEvent.timestamp:type_name -> google.protobuf.Timestamp
11, // 9: protobuf.Host.labels:type_name -> protobuf.Host.LabelsEntry
14, // 8: protobuf.ContainerEvent.timestamp:type_name -> google.protobuf.Timestamp
13, // 9: protobuf.Host.labels:type_name -> protobuf.Host.LabelsEntry
10, // [10:10] is the sub-list for method output_type
10, // [10:10] is the sub-list for method input_type
10, // [10:10] is the sub-list for extension type_name
@@ -963,7 +1138,7 @@ func file_types_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)),
NumEnums: 1,
NumMessages: 11,
NumMessages: 13,
NumExtensions: 0,
NumServices: 0,
},
+176 -142
View File
@@ -6,7 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"sync"
"io"
"encoding/json"
@@ -14,7 +14,7 @@ import (
"github.com/amir20/dozzle/internal/agent/pb"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/docker"
"github.com/amir20/dozzle/types"
"github.com/rs/zerolog/log"
orderedmap "github.com/wk8/go-ordered-map/v2"
"google.golang.org/grpc"
@@ -26,21 +26,40 @@ import (
"google.golang.org/grpc/status"
)
// NotificationConfigHandler handles notification config updates received from the main server
type NotificationConfigHandler interface {
HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error
}
// ClientService is the interface for container operations used by the agent server
type ClientService interface {
FindContainer(ctx context.Context, id string, labels container.ContainerLabels) (container.Container, error)
ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error)
Host(ctx context.Context) (container.Host, error)
ContainerAction(ctx context.Context, container container.Container, action container.ContainerAction) error
LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error)
RawLogs(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (io.ReadCloser, error)
SubscribeStats(context.Context, chan<- container.ContainerStat)
SubscribeEvents(context.Context, chan<- container.ContainerEvent)
SubscribeContainersStarted(context.Context, chan<- container.Container)
StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- *container.LogEvent) error
Attach(context.Context, container.Container, container.ExecEventReader, io.Writer) error
Exec(context.Context, container.Container, []string, container.ExecEventReader, io.Writer) error
}
type server struct {
client container.Client
store *container.ContainerStore
version string
service ClientService
version string
notificationConfigHandler NotificationConfigHandler
pb.UnimplementedAgentServiceServer
}
func newServer(client container.Client, dozzleVersion string, labels container.ContainerLabels) pb.AgentServiceServer {
statsCollector := docker.NewDockerStatsCollector(client, labels)
func newServer(service ClientService, dozzleVersion string, notificationHandler NotificationConfigHandler) pb.AgentServiceServer {
return &server{
client: client,
version: dozzleVersion,
store: container.NewContainerStore(context.Background(), client, statsCollector, labels),
service: service,
version: dozzleVersion,
notificationConfigHandler: notificationHandler,
}
}
@@ -50,20 +69,18 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream
since = in.Since.AsTime()
}
c, err := s.store.FindContainer(in.ContainerId, container.ContainerLabels{})
c, err := s.service.FindContainer(out.Context(), in.ContainerId, container.ContainerLabels{})
if err != nil {
return err
}
reader, err := s.client.ContainerLogs(out.Context(), in.ContainerId, since, container.StdType(in.StreamTypes))
if err != nil {
return err
}
events := make(chan *container.LogEvent)
go func() {
defer close(events)
s.service.StreamLogs(out.Context(), c, since, container.StdType(in.StreamTypes), events)
}()
dockerReader := docker.NewLogReader(reader, c.Tty)
g := container.NewEventGenerator(out.Context(), dockerReader, c)
for event := range g.Events {
for event := range events {
if event != nil {
out.Send(&pb.StreamLogsResponse{
Event: logEventToPb(event),
@@ -71,31 +88,23 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream
}
}
select {
case e := <-g.Errors:
return e
default:
return nil
}
return nil
}
func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentService_LogsBetweenDatesServer) error {
reader, err := s.client.ContainerLogsBetweenDates(out.Context(), in.ContainerId, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes))
c, err := s.service.FindContainer(out.Context(), in.ContainerId, container.ContainerLabels{})
if err != nil {
return err
}
c, err := s.client.FindContainer(out.Context(), in.ContainerId)
events, err := s.service.LogsBetweenDates(out.Context(), c, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes))
if err != nil {
return err
}
dockerReader := docker.NewLogReader(reader, c.Tty)
g := container.NewEventGenerator(out.Context(), dockerReader, c)
for {
select {
case event, ok := <-g.Events:
case event, ok := <-events:
if !ok {
// Channel closed, exit cleanly
return nil
@@ -103,8 +112,6 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe
out.Send(&pb.StreamLogsResponse{
Event: logEventToPb(event),
})
case e := <-g.Errors:
return e
case <-out.Context().Done():
return nil
}
@@ -112,16 +119,24 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe
}
func (s *server) StreamRawBytes(in *pb.StreamRawBytesRequest, out pb.AgentService_StreamRawBytesServer) error {
reader, err := s.client.ContainerLogsBetweenDates(out.Context(), in.ContainerId, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes))
c, err := s.service.FindContainer(out.Context(), in.ContainerId, container.ContainerLabels{})
if err != nil {
return err
}
reader, err := s.service.RawLogs(out.Context(), c, in.Since.AsTime(), in.Until.AsTime(), container.StdType(in.StreamTypes))
if err != nil {
return err
}
defer reader.Close()
buf := make([]byte, 1024)
for {
n, err := reader.Read(buf)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
@@ -142,7 +157,7 @@ func (s *server) StreamRawBytes(in *pb.StreamRawBytesRequest, out pb.AgentServic
func (s *server) StreamEvents(in *pb.StreamEventsRequest, out pb.AgentService_StreamEventsServer) error {
events := make(chan container.ContainerEvent)
s.store.SubscribeEvents(out.Context(), events)
s.service.SubscribeEvents(out.Context(), events)
for {
select {
@@ -164,7 +179,7 @@ func (s *server) StreamEvents(in *pb.StreamEventsRequest, out pb.AgentService_St
func (s *server) StreamStats(in *pb.StreamStatsRequest, out pb.AgentService_StreamStatsServer) error {
stats := make(chan container.ContainerStat)
s.store.SubscribeStats(out.Context(), stats)
s.service.SubscribeStats(out.Context(), stats)
for {
select {
@@ -193,13 +208,13 @@ func (s *server) FindContainer(ctx context.Context, in *pb.FindContainerRequest)
}
}
container, err := s.store.FindContainer(in.ContainerId, labels)
c, err := s.service.FindContainer(ctx, in.ContainerId, labels)
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
c := container.ToProto()
proto := c.ToProto()
return &pb.FindContainerResponse{
Container: &c,
Container: &proto,
}, nil
}
@@ -211,15 +226,15 @@ func (s *server) ListContainers(ctx context.Context, in *pb.ListContainersReques
}
}
containers, err := s.store.ListContainers(labels)
containers, err := s.service.ListContainers(ctx, labels)
if err != nil {
return nil, err
}
var pbContainers []*pb.Container
for _, container := range containers {
c := container.ToProto()
pbContainers = append(pbContainers, &c)
for _, c := range containers {
proto := c.ToProto()
pbContainers = append(pbContainers, &proto)
}
return &pb.ListContainersResponse{
@@ -228,7 +243,10 @@ func (s *server) ListContainers(ctx context.Context, in *pb.ListContainersReques
}
func (s *server) HostInfo(ctx context.Context, in *pb.HostInfoRequest) (*pb.HostInfoResponse, error) {
host := s.client.Host()
host, err := s.service.Host(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.HostInfoResponse{
Host: &pb.Host{
Id: host.ID,
@@ -244,7 +262,7 @@ func (s *server) HostInfo(ctx context.Context, in *pb.HostInfoRequest) (*pb.Host
func (s *server) StreamContainerStarted(in *pb.StreamContainerStartedRequest, out pb.AgentService_StreamContainerStartedServer) error {
containers := make(chan container.Container)
go s.store.SubscribeNewContainers(out.Context(), containers)
go s.service.SubscribeContainersStarted(out.Context(), containers)
for {
select {
@@ -275,8 +293,12 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ
return nil, status.Error(codes.InvalidArgument, "invalid action")
}
err := s.client.ContainerActions(ctx, action, in.ContainerId)
c, err := s.service.FindContainer(ctx, in.ContainerId, container.ContainerLabels{})
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
}
err = s.service.ContainerAction(ctx, c, action)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@@ -284,62 +306,62 @@ func (s *server) ContainerAction(ctx context.Context, in *pb.ContainerActionRequ
return &pb.ContainerActionResponse{}, nil
}
// terminalMessage represents a message from a terminal gRPC stream (exec or attach)
type terminalMessage interface {
GetStdin() []byte
GetResize() *pb.ResizePayload
}
// protoEventReader converts gRPC protobuf messages directly to ExecEvents (no JSON)
type protoEventReader struct {
recv func() (terminalMessage, error)
}
func (r *protoEventReader) ReadEvent() (*container.ExecEvent, error) {
msg, err := r.recv()
if err != nil {
return nil, err
}
if stdin := msg.GetStdin(); stdin != nil {
return &container.ExecEvent{Type: "userinput", Data: string(stdin)}, nil
} else if resize := msg.GetResize(); resize != nil {
return &container.ExecEvent{Type: "resize", Width: uint(resize.Width), Height: uint(resize.Height)}, nil
}
// Skip unknown message types
return r.ReadEvent()
}
// terminalStreamWriter adapts a gRPC terminal stream to io.Writer
type terminalStreamWriter struct {
send func([]byte) error
}
func (w *terminalStreamWriter) Write(p []byte) (int, error) {
if err := w.send(p); err != nil {
return 0, err
}
return len(p), nil
}
func (s *server) ContainerExec(stream pb.AgentService_ContainerExecServer) error {
request, err := stream.Recv()
if err != nil {
return status.Error(codes.Internal, err.Error())
}
cancelCtx, cancel := context.WithCancel(stream.Context())
session, err := s.client.ContainerExec(cancelCtx, request.ContainerId, request.Command)
c, err := s.service.FindContainer(stream.Context(), request.ContainerId, container.ContainerLabels{})
if err != nil {
cancel()
return status.Error(codes.Internal, err.Error())
return status.Error(codes.NotFound, err.Error())
}
var wg sync.WaitGroup
reader := &protoEventReader{recv: func() (terminalMessage, error) { return stream.Recv() }}
writer := &terminalStreamWriter{send: func(p []byte) error { return stream.Send(&pb.ContainerExecResponse{Stdout: p}) }}
// Read from container and send to client
wg.Go(func() {
defer cancel()
buffer := make([]byte, 1024)
for {
n, err := session.Reader.Read(buffer)
if err != nil {
return
}
if err := stream.Send(&pb.ContainerExecResponse{Stdout: buffer[:n]}); err != nil {
return
}
}
})
// Read from client stream and handle stdin/resize
wg.Go(func() {
defer cancel()
defer session.Writer.Close()
for {
req, err := stream.Recv()
if err != nil {
return
}
switch payload := req.Payload.(type) {
case *pb.ContainerExecRequest_Stdin:
if _, err := session.Writer.Write(payload.Stdin); err != nil {
log.Error().Err(err).Msg("error writing stdin to container")
return
}
case *pb.ContainerExecRequest_Resize:
if err := session.Resize(uint(payload.Resize.Width), uint(payload.Resize.Height)); err != nil {
log.Error().Err(err).Msg("error resizing terminal")
}
}
}
})
wg.Wait()
if err := s.service.Exec(stream.Context(), c, request.Command, reader, writer); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}
@@ -350,61 +372,73 @@ func (s *server) ContainerAttach(stream pb.AgentService_ContainerAttachServer) e
return status.Error(codes.Internal, err.Error())
}
cancelCtx, cancel := context.WithCancel(stream.Context())
session, err := s.client.ContainerAttach(cancelCtx, request.ContainerId)
c, err := s.service.FindContainer(stream.Context(), request.ContainerId, container.ContainerLabels{})
if err != nil {
cancel()
return status.Error(codes.Internal, err.Error())
return status.Error(codes.NotFound, err.Error())
}
var wg sync.WaitGroup
reader := &protoEventReader{recv: func() (terminalMessage, error) { return stream.Recv() }}
writer := &terminalStreamWriter{send: func(p []byte) error { return stream.Send(&pb.ContainerAttachResponse{Stdout: p}) }}
// Read from container and send to client
wg.Go(func() {
defer cancel()
buffer := make([]byte, 1024)
for {
n, err := session.Reader.Read(buffer)
if err != nil {
return
}
if err := stream.Send(&pb.ContainerAttachResponse{Stdout: buffer[:n]}); err != nil {
return
}
}
})
// Read from client stream and handle stdin/resize
wg.Go(func() {
defer cancel()
defer session.Writer.Close()
for {
req, err := stream.Recv()
if err != nil {
return
}
switch payload := req.Payload.(type) {
case *pb.ContainerAttachRequest_Stdin:
if _, err := session.Writer.Write(payload.Stdin); err != nil {
log.Error().Err(err).Msg("error writing stdin to container")
return
}
case *pb.ContainerAttachRequest_Resize:
if err := session.Resize(uint(payload.Resize.Width), uint(payload.Resize.Height)); err != nil {
log.Error().Err(err).Msg("error resizing terminal")
}
}
}
})
wg.Wait()
if err := s.service.Attach(stream.Context(), c, reader, writer); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}
func NewServer(client container.Client, certificates tls.Certificate, dozzleVersion string, labels container.ContainerLabels) (*grpc.Server, error) {
func (s *server) UpdateNotificationConfig(ctx context.Context, req *pb.UpdateNotificationConfigRequest) (*pb.UpdateNotificationConfigResponse, error) {
if s.notificationConfigHandler == nil {
log.Warn().Msg("No notification config handler registered, ignoring config update")
return &pb.UpdateNotificationConfigResponse{}, nil
}
// Validate request sizes to prevent memory exhaustion
const maxSubscriptions = 1000
const maxDispatchers = 100
if len(req.Subscriptions) > maxSubscriptions {
return nil, status.Errorf(codes.InvalidArgument, "too many subscriptions: %d (max %d)", len(req.Subscriptions), maxSubscriptions)
}
if len(req.Dispatchers) > maxDispatchers {
return nil, status.Errorf(codes.InvalidArgument, "too many dispatchers: %d (max %d)", len(req.Dispatchers), maxDispatchers)
}
// Convert proto subscriptions to types
subscriptions := make([]types.SubscriptionConfig, len(req.Subscriptions))
for i, sub := range req.Subscriptions {
subscriptions[i] = types.SubscriptionConfig{
ID: int(sub.Id),
Name: sub.Name,
Enabled: sub.Enabled,
DispatcherID: int(sub.DispatcherId),
LogExpression: sub.LogExpression,
ContainerExpression: sub.ContainerExpression,
}
}
// Convert proto dispatchers to types
dispatchers := make([]types.DispatcherConfig, len(req.Dispatchers))
for i, d := range req.Dispatchers {
dispatchers[i] = types.DispatcherConfig{
ID: int(d.Id),
Name: d.Name,
Type: d.Type,
URL: d.Url,
Template: d.Template,
}
}
// Call the handler (handler is responsible for persisting if needed)
if err := s.notificationConfigHandler.HandleNotificationConfig(subscriptions, dispatchers); err != nil {
log.Error().Err(err).Msg("Failed to handle notification config")
return nil, status.Error(codes.Internal, err.Error())
}
log.Info().Int("subscriptions", len(subscriptions)).Int("dispatchers", len(dispatchers)).Msg("Updated notification config from main server")
return &pb.UpdateNotificationConfigResponse{}, nil
}
func NewServer(service ClientService, certificates tls.Certificate, dozzleVersion string, notificationHandler NotificationConfigHandler) (*grpc.Server, error) {
caCertPool := x509.NewCertPool()
c, err := x509.ParseCertificate(certificates.Certificate[0])
if err != nil {
@@ -423,7 +457,7 @@ func NewServer(client container.Client, certificates tls.Certificate, dozzleVers
creds := credentials.NewTLS(tlsConfig)
grpcServer := grpc.NewServer(grpc.Creds(creds))
pb.RegisterAgentServiceServer(grpcServer, newServer(client, dozzleVersion, labels))
pb.RegisterAgentServiceServer(grpcServer, newServer(service, dozzleVersion, notificationHandler))
return grpcServer, nil
}
+5
View File
@@ -41,6 +41,11 @@ type ExecEvent struct {
Height uint `json:"height,omitempty"`
}
// ExecEventReader provides structured exec events (userinput, resize)
type ExecEventReader interface {
ReadEvent() (*ExecEvent, error)
}
type Client interface {
ListContainers(context.Context, ContainerLabels) ([]Container, error)
FindContainer(context.Context, string) (Container, error)
+1 -1
View File
@@ -122,7 +122,7 @@ func (l *ContainerLogListener) startListening(c container.Container, client cont
go func() {
log.Debug().Str("containerID", c.ID).Str("name", c.Name).Msg("Started listening to container")
if err := client.StreamLogs(streamCtx, c, time.Now(), container.STDALL, l.logChannel); err != nil && !errors.Is(err, io.EOF) {
if err := client.StreamLogs(streamCtx, c, time.Now(), container.STDALL, l.logChannel); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
log.Error().Err(err).Str("containerID", c.ID).Msg("Error streaming logs")
}
}()
+73
View File
@@ -451,6 +451,79 @@ func (m *Manager) LoadConfig(r io.Reader) error {
return nil
}
// HandleNotificationConfig implements agent.NotificationConfigHandler interface
// It atomically replaces all subscriptions and dispatchers with new state from the main server
func (m *Manager) HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error {
// Clear existing state (with nil checks for defensive programming)
if m.subscriptions != nil {
m.subscriptions.Clear()
} else {
m.subscriptions = xsync.NewMap[int, *Subscription]()
}
if m.dispatchers != nil {
m.dispatchers.Clear()
} else {
m.dispatchers = xsync.NewMap[int, dispatcher.Dispatcher]()
}
// Find max IDs to initialize counters
var maxSubID, maxDispatcherID int
for _, sub := range subscriptions {
if sub.ID > maxSubID {
maxSubID = sub.ID
}
}
for _, d := range dispatchers {
if d.ID > maxDispatcherID {
maxDispatcherID = d.ID
}
}
m.subscriptionCounter.Store(int32(maxSubID))
m.dispatcherCounter.Store(int32(maxDispatcherID))
// Load subscriptions (convert from types.SubscriptionConfig to Subscription)
for _, sub := range subscriptions {
s := &Subscription{
ID: sub.ID,
Name: sub.Name,
Enabled: sub.Enabled,
DispatcherID: sub.DispatcherID,
LogExpression: sub.LogExpression,
ContainerExpression: sub.ContainerExpression,
}
if err := m.loadSubscription(s); err != nil {
return fmt.Errorf("failed to load subscription %s: %w", sub.Name, err)
}
}
// Load dispatchers
for _, dispatcherConfig := range dispatchers {
var d dispatcher.Dispatcher
switch dispatcherConfig.Type {
case "webhook":
webhook, err := dispatcher.NewWebhookDispatcher(dispatcherConfig.Name, dispatcherConfig.URL, dispatcherConfig.Template)
if err != nil {
return fmt.Errorf("failed to create webhook dispatcher %s: %w", dispatcherConfig.Name, err)
}
d = webhook
default:
return fmt.Errorf("unknown dispatcher type: %s", dispatcherConfig.Type)
}
m.dispatchers.Store(dispatcherConfig.ID, d)
log.Debug().Int("id", dispatcherConfig.ID).Msg("Loaded dispatcher from state sync")
}
// Update listener to start/stop streams based on new subscriptions
if m.listener != nil {
if err := m.listener.UpdateStreams(); err != nil {
return fmt.Errorf("failed to update listener streams: %w", err)
}
}
log.Debug().Int("subscriptions", len(subscriptions)).Int("dispatchers", len(dispatchers)).Msg("Replaced notification state")
return nil
}
// loadSubscription loads a subscription with its existing ID (used when loading from config)
func (m *Manager) loadSubscription(sub *Subscription) error {
// Compile container expression if provided
+3 -1
View File
@@ -149,7 +149,9 @@ func (s *Subscription) MatchesLog(l types.NotificationLog) bool {
result, err := expr.Run(s.LogProgram, l)
if err != nil {
log.Warn().Err(err).Str("expression", s.LogExpression).Msg("log expression evaluation error")
// Type mismatches are expected when expression doesn't match log type
// e.g., "message contains X" on JSON logs or "message.field" on string logs
log.Debug().Err(err).Str("expression", s.LogExpression).Msg("log expression evaluation error")
return false
}
+69 -3
View File
@@ -12,6 +12,10 @@ import (
"github.com/amir20/dozzle/internal/agent"
"github.com/amir20/dozzle/internal/docker"
"github.com/amir20/dozzle/internal/notification"
container_support "github.com/amir20/dozzle/internal/support/container"
docker_support "github.com/amir20/dozzle/internal/support/docker"
"github.com/amir20/dozzle/types"
"github.com/rs/zerolog/log"
)
@@ -19,6 +23,37 @@ type AgentCmd struct {
Addr string `arg:"--agent-addr,env:DOZZLE_AGENT_ADDR" default:":7007" help:"sets the host:port to bind for the agent"`
}
// persistingNotificationHandler wraps a notification manager and saves config to disk after updates
type persistingNotificationHandler struct {
manager *notification.Manager
configPath string
}
func (h *persistingNotificationHandler) HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error {
// Update the manager
if err := h.manager.HandleNotificationConfig(subscriptions, dispatchers); err != nil {
return err
}
// Save to disk
if err := os.MkdirAll("./data", 0755); err != nil {
return fmt.Errorf("failed to create data directory: %w", err)
}
file, err := os.Create(h.configPath)
if err != nil {
return fmt.Errorf("failed to create config file: %w", err)
}
defer file.Close()
if err := h.manager.WriteConfig(file); err != nil {
return fmt.Errorf("failed to save config: %w", err)
}
log.Debug().Str("path", h.configPath).Msg("Saved notification config to disk")
return nil
}
func (a *AgentCmd) Run(args Args, embeddedCerts embed.FS) error {
if args.Mode != "server" {
return fmt.Errorf("agent command is only available in server mode")
@@ -43,12 +78,43 @@ func (a *AgentCmd) Run(args Args, embeddedCerts embed.FS) error {
io.WriteString(tempFile, listener.Addr().String())
log.Debug().Str("file", tempFile.Name()).Msg("Created temp file")
go StartEvent(args, "", client, "agent")
server, err := agent.NewServer(client, certs, args.Version(), args.Filter)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// Create shared client service (single ContainerStore for both agent server and notifications)
clientService := docker_support.NewDockerClientService(client, args.Filter)
// Create notification manager using the shared client service
const notificationConfigPath = "./data/notifications.yml"
clients := []container_support.ClientService{clientService}
notificationManager := notification.NewManager(notification.NewContainerLogListener(ctx, clients))
// Load existing notification config if available
if file, err := os.Open(notificationConfigPath); err == nil {
if err := notificationManager.LoadConfig(file); err != nil {
log.Warn().Err(err).Msg("Failed to load notification config, starting fresh")
} else {
log.Info().Str("path", notificationConfigPath).Msg("Loaded notification config from disk")
}
file.Close()
}
if err := notificationManager.Start(); err != nil {
return fmt.Errorf("failed to start notification manager: %w", err)
}
// Create handler that wraps manager and persists config to disk
notificationHandler := &persistingNotificationHandler{
manager: notificationManager,
configPath: notificationConfigPath,
}
// Create agent server using the same shared client service
server, err := agent.NewServer(clientService, certs, args.Version(), notificationHandler)
if err != nil {
return fmt.Errorf("failed to create agent server: %w", err)
}
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
go func() {
log.Info().Msgf("Dozzle agent version %s", args.Version())
log.Info().Msgf("Agent listening on %s", listener.Addr().String())
+18 -59
View File
@@ -2,21 +2,20 @@ package container_support
import (
"context"
"encoding/json"
"io"
"sync"
"sync/atomic"
"time"
"github.com/amir20/dozzle/internal/agent"
"github.com/amir20/dozzle/internal/container"
"github.com/docker/docker/pkg/stdcopy"
"github.com/amir20/dozzle/types"
"github.com/rs/zerolog/log"
)
type agentService struct {
client *agent.Client
host container.Host
host atomic.Pointer[container.Host]
}
func NewAgentService(client *agent.Client) ClientService {
@@ -49,13 +48,16 @@ func (a *agentService) ListContainers(ctx context.Context, labels container.Cont
func (a *agentService) Host(ctx context.Context) (container.Host, error) {
host, err := a.client.Host(ctx)
if err != nil {
host := a.host
host.Available = false
return host, err
if cached := a.host.Load(); cached != nil {
h := *cached
h.Available = false
return h, err
}
return container.Host{Available: false}, err
}
a.host = host
return a.host, err
a.host.Store(&host)
return host, nil
}
func (a *agentService) SubscribeStats(ctx context.Context, stats chan<- container.ContainerStat) {
@@ -74,57 +76,14 @@ func (a *agentService) ContainerAction(ctx context.Context, container container.
return a.client.ContainerAction(ctx, container.ID, action)
}
func (a *agentService) Attach(ctx context.Context, container container.Container, stdin io.Reader, stdout io.Writer) error {
func (a *agentService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error {
panic("not implemented")
}
func (a *agentService) Exec(ctx context.Context, c container.Container, cmd []string, stdin io.Reader, stdout io.Writer) error {
cancelCtx, cancel := context.WithCancel(ctx)
session, err := a.client.ContainerExec(cancelCtx, c.ID, cmd)
if err != nil {
cancel()
return err
}
var wg sync.WaitGroup
wg.Go(func() {
decoder := json.NewDecoder(stdin)
loop:
for {
var event container.ExecEvent
if err := decoder.Decode(&event); err != nil {
if err != io.EOF {
log.Error().Err(err).Msg("error decoding event from ws using agent")
}
break
}
switch event.Type {
case "userinput":
if _, err := session.Writer.Write([]byte(event.Data)); err != nil {
log.Error().Err(err).Msg("error writing to container using agent")
break loop
}
case "resize":
if err := session.Resize(event.Width, event.Height); err != nil {
log.Error().Err(err).Msg("error resizing terminal using agent")
}
}
}
cancel()
session.Writer.Close()
})
wg.Go(func() {
if _, err := stdcopy.StdCopy(stdout, stdout, session.Reader); err != nil {
log.Error().Err(err).Msg("error while writing to ws using agent")
}
cancel()
})
wg.Wait()
return nil
func (a *agentService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error {
return a.client.Exec(ctx, c.ID, cmd, events, stdout)
}
func (a *agentService) UpdateNotificationConfig(ctx context.Context, subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error {
return a.client.UpdateNotificationConfig(ctx, subscriptions, dispatchers)
}
+2 -2
View File
@@ -27,6 +27,6 @@ type ClientService interface {
StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- *container.LogEvent) error
// Terminal
Attach(context.Context, container.Container, io.Reader, io.Writer) error
Exec(context.Context, container.Container, []string, io.Reader, io.Writer) error
Attach(context.Context, container.Container, container.ExecEventReader, io.Writer) error
Exec(context.Context, container.Container, []string, container.ExecEventReader, io.Writer) error
}
@@ -36,10 +36,10 @@ func (c *ContainerService) Action(ctx context.Context, action container.Containe
return c.clientService.ContainerAction(ctx, c.Container, action)
}
func (c *ContainerService) Attach(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
return c.clientService.Attach(ctx, c.Container, stdin, stdout)
func (c *ContainerService) Attach(ctx context.Context, events container.ExecEventReader, stdout io.Writer) error {
return c.clientService.Attach(ctx, c.Container, events, stdout)
}
func (c *ContainerService) Exec(ctx context.Context, cmd []string, stdin io.Reader, stdout io.Writer) error {
return c.clientService.Exec(ctx, c.Container, cmd, stdin, stdout)
func (c *ContainerService) Exec(ctx context.Context, cmd []string, events container.ExecEventReader, stdout io.Writer) error {
return c.clientService.Exec(ctx, c.Container, cmd, events, stdout)
}
+8 -11
View File
@@ -2,7 +2,6 @@ package docker_support
import (
"context"
"encoding/json"
"io"
"sync"
"time"
@@ -112,7 +111,7 @@ func (d *DockerClientService) SubscribeContainersStarted(ctx context.Context, co
d.store.SubscribeNewContainers(ctx, containers)
}
func (d *DockerClientService) Attach(ctx context.Context, c container.Container, stdin io.Reader, stdout io.Writer) error {
func (d *DockerClientService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error {
cancelCtx, cancel := context.WithCancel(ctx)
session, err := d.client.ContainerAttach(cancelCtx, c.ID)
if err != nil {
@@ -123,13 +122,12 @@ func (d *DockerClientService) Attach(ctx context.Context, c container.Container,
var wg sync.WaitGroup
wg.Go(func() {
decoder := json.NewDecoder(stdin)
loop:
for {
var event container.ExecEvent
if err := decoder.Decode(&event); err != nil {
event, err := events.ReadEvent()
if err != nil {
if err != io.EOF {
log.Error().Err(err).Msg("error while decoding event from ws")
log.Error().Err(err).Msg("error while reading event")
}
break
}
@@ -170,7 +168,7 @@ func (d *DockerClientService) Attach(ctx context.Context, c container.Container,
return nil
}
func (d *DockerClientService) Exec(ctx context.Context, c container.Container, cmd []string, stdin io.Reader, stdout io.Writer) error {
func (d *DockerClientService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error {
cancelCtx, cancel := context.WithCancel(ctx)
session, err := d.client.ContainerExec(cancelCtx, c.ID, cmd)
if err != nil {
@@ -181,13 +179,12 @@ func (d *DockerClientService) Exec(ctx context.Context, c container.Container, c
var wg sync.WaitGroup
wg.Go(func() {
decoder := json.NewDecoder(stdin)
loop:
for {
var event container.ExecEvent
if err := decoder.Decode(&event); err != nil {
event, err := events.ReadEvent()
if err != nil {
if err != io.EOF {
log.Error().Err(err).Msg("error while decoding event from ws")
log.Error().Err(err).Msg("error while reading event")
}
break
}
+58 -8
View File
@@ -4,12 +4,14 @@ import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/notification"
"github.com/amir20/dozzle/internal/notification/dispatcher"
container_support "github.com/amir20/dozzle/internal/support/container"
"github.com/amir20/dozzle/types"
"github.com/rs/zerolog/log"
lop "github.com/samber/lo/parallel"
)
@@ -223,28 +225,76 @@ func (m *MultiHostService) saveNotificationConfig() {
if err := m.notificationManager.WriteConfig(file); err != nil {
log.Error().Err(err).Msg("Could not write notification config")
}
// Broadcast to all agents
m.broadcastNotificationConfig()
}
// NotificationConfigUpdater is an interface for clients that support notification config updates
type NotificationConfigUpdater interface {
UpdateNotificationConfig(ctx context.Context, subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error
}
// broadcastNotificationConfig sends current notification config to all agent clients
func (m *MultiHostService) broadcastNotificationConfig() {
notifSubs := m.notificationManager.Subscriptions()
notifDispatchers := m.notificationManager.Dispatchers()
// Convert notification.Subscription to types.SubscriptionConfig
subscriptions := make([]types.SubscriptionConfig, len(notifSubs))
for i, sub := range notifSubs {
subscriptions[i] = types.SubscriptionConfig{
ID: sub.ID,
Name: sub.Name,
Enabled: sub.Enabled,
DispatcherID: sub.DispatcherID,
LogExpression: sub.LogExpression,
ContainerExpression: sub.ContainerExpression,
}
}
// Convert notification.DispatcherConfig to types.DispatcherConfig
dispatchers := make([]types.DispatcherConfig, len(notifDispatchers))
for i, d := range notifDispatchers {
dispatchers[i] = types.DispatcherConfig{
ID: d.ID,
Name: d.Name,
Type: d.Type,
URL: d.URL,
Template: d.Template,
}
}
var wg sync.WaitGroup
for _, client := range m.manager.List() {
// Check if client supports notification config updates (agents do, local docker clients don't)
if updater, ok := client.(NotificationConfigUpdater); ok {
wg.Go(func() {
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
defer cancel()
if err := updater.UpdateNotificationConfig(ctx, subscriptions, dispatchers); err != nil {
log.Error().Err(err).Msg("Failed to broadcast notification config to agent")
} else {
log.Debug().Int("subscriptions", len(subscriptions)).Int("dispatchers", len(dispatchers)).Msg("Broadcasted notification config to agent")
}
})
}
}
wg.Wait()
}
// AddSubscription adds a subscription to local manager and broadcasts to agents
func (m *MultiHostService) AddSubscription(sub *notification.Subscription) error {
// Add to local manager
if err := m.notificationManager.AddSubscription(sub); err != nil {
return err
}
// TODO: Broadcast to agents via gRPC when agent notification support is added
m.saveNotificationConfig()
return nil
}
// RemoveSubscription removes a subscription from local manager and broadcasts to agents
func (m *MultiHostService) RemoveSubscription(id int) {
// Remove from local manager
m.notificationManager.RemoveSubscription(id)
// TODO: Broadcast to agents via gRPC when agent notification support is added
m.saveNotificationConfig()
}
+8 -11
View File
@@ -2,7 +2,6 @@ package k8s_support
import (
"context"
"encoding/json"
"io"
"sync"
@@ -93,7 +92,7 @@ func (k *K8sClientService) SubscribeContainersStarted(ctx context.Context, conta
k.store.SubscribeNewContainers(ctx, containers)
}
func (k *K8sClientService) Attach(ctx context.Context, c container.Container, stdin io.Reader, stdout io.Writer) error {
func (k *K8sClientService) Attach(ctx context.Context, c container.Container, events container.ExecEventReader, stdout io.Writer) error {
cancelCtx, cancel := context.WithCancel(ctx)
session, err := k.client.ContainerAttach(cancelCtx, c.ID)
if err != nil {
@@ -107,13 +106,12 @@ func (k *K8sClientService) Attach(ctx context.Context, c container.Container, st
defer session.Writer.Close()
defer cancel()
decoder := json.NewDecoder(stdin)
loop:
for {
var event container.ExecEvent
if err := decoder.Decode(&event); err != nil {
event, err := events.ReadEvent()
if err != nil {
if err != io.EOF {
log.Error().Err(err).Msg("error decoding event")
log.Error().Err(err).Msg("error reading event")
}
break
}
@@ -143,7 +141,7 @@ func (k *K8sClientService) Attach(ctx context.Context, c container.Container, st
return nil
}
func (k *K8sClientService) Exec(ctx context.Context, c container.Container, cmd []string, stdin io.Reader, stdout io.Writer) error {
func (k *K8sClientService) Exec(ctx context.Context, c container.Container, cmd []string, events container.ExecEventReader, stdout io.Writer) error {
cancelCtx, cancel := context.WithCancel(ctx)
session, err := k.client.ContainerExec(cancelCtx, c.ID, cmd)
if err != nil {
@@ -157,13 +155,12 @@ func (k *K8sClientService) Exec(ctx context.Context, c container.Container, cmd
defer session.Writer.Close()
defer cancel()
decoder := json.NewDecoder(stdin)
loop:
for {
var event container.ExecEvent
if err := decoder.Decode(&event); err != nil {
event, err := events.ReadEvent()
if err != nil {
if err != io.EOF {
log.Error().Err(err).Msg("error decoding event")
log.Error().Err(err).Msg("error reading event")
}
break
}
+16 -22
View File
@@ -1,9 +1,12 @@
package web
import (
"encoding/json"
"io"
"net/http"
"github.com/amir20/dozzle/internal/auth"
"github.com/amir20/dozzle/internal/container"
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
@@ -48,9 +51,9 @@ func (h *handler) attach(w http.ResponseWriter, r *http.Request) {
return
}
wsReader := &webSocketReader{conn: conn}
eventReader := &jsonEventReader{conn: conn}
wsWriter := &webSocketWriter{conn: conn}
if err = containerService.Attach(r.Context(), wsReader, wsWriter); err != nil {
if err = containerService.Attach(r.Context(), eventReader, wsWriter); err != nil {
log.Error().Err(err).Msg("error while trying to attach to container")
conn.WriteMessage(websocket.TextMessage, []byte("🚨 Error while trying to attach to container\r\n"))
return
@@ -88,9 +91,9 @@ func (h *handler) exec(w http.ResponseWriter, r *http.Request) {
return
}
wsReader := &webSocketReader{conn: conn}
eventReader := &jsonEventReader{conn: conn}
wsWriter := &webSocketWriter{conn: conn}
if err = containerService.Exec(r.Context(), []string{"sh", "-c", "command -v bash >/dev/null 2>&1 && exec bash || exec sh"}, wsReader, wsWriter); err != nil {
if err = containerService.Exec(r.Context(), []string{"sh", "-c", "command -v bash >/dev/null 2>&1 && exec bash || exec sh"}, eventReader, wsWriter); err != nil {
log.Error().Err(err).Msg("error while trying to attach to container")
conn.WriteMessage(websocket.TextMessage, []byte("🚨 Error while trying to attach to container\r\n"))
return
@@ -106,30 +109,21 @@ func (w *webSocketWriter) Write(p []byte) (int, error) {
return len(p), err
}
type webSocketReader struct {
conn *websocket.Conn
buffer []byte
// jsonEventReader reads JSON-encoded ExecEvents from a websocket connection
type jsonEventReader struct {
conn *websocket.Conn
}
func (r *webSocketReader) Read(p []byte) (n int, err error) {
if len(r.buffer) > 0 {
n = copy(p, r.buffer)
r.buffer = r.buffer[n:]
return n, nil
}
// Otherwise, read a new message
func (r *jsonEventReader) ReadEvent() (*container.ExecEvent, error) {
_, message, err := r.conn.ReadMessage()
if err != nil {
return 0, err
return nil, io.EOF
}
n = copy(p, message)
// If we couldn't copy the entire message, store the rest in our buffer
if n < len(message) {
r.buffer = message[n:]
var event container.ExecEvent
if err := json.Unmarshal(message, &event); err != nil {
return nil, err
}
return n, nil
return &event, nil
}
+4 -1
View File
@@ -89,7 +89,10 @@ func main() {
if err != nil {
log.Fatal().Err(err).Msg("failed to listen")
}
server, err := agent.NewServer(localClient, certs, args.Version(), args.Filter)
// Create client service for agent server in swarm mode
clientService := docker_support.NewDockerClientService(localClient, args.Filter)
// TODO add notification for swarm mode
server, err := agent.NewServer(clientService, certs, args.Version(), nil)
if err != nil {
log.Fatal().Err(err).Msg("failed to create agent")
}
+53 -26
View File
@@ -1,44 +1,48 @@
syntax = "proto3";
option go_package = "internal/agent/pb";
package protobuf;
import "types.proto";
import "google/protobuf/timestamp.proto";
import "types.proto";
option go_package = "internal/agent/pb";
service AgentService {
rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
rpc FindContainer(FindContainerRequest) returns (FindContainerResponse) {}
rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {}
rpc LogsBetweenDates(LogsBetweenDatesRequest)
returns (stream StreamLogsResponse) {}
rpc StreamRawBytes(StreamRawBytesRequest)
returns (stream StreamRawBytesResponse) {}
rpc LogsBetweenDates(LogsBetweenDatesRequest) returns (stream StreamLogsResponse) {}
rpc StreamRawBytes(StreamRawBytesRequest) returns (stream StreamRawBytesResponse) {}
rpc StreamEvents(StreamEventsRequest) returns (stream StreamEventsResponse) {}
rpc StreamStats(StreamStatsRequest) returns (stream StreamStatsResponse) {}
rpc StreamContainerStarted(StreamContainerStartedRequest)
returns (stream StreamContainerStartedResponse) {}
rpc StreamContainerStarted(StreamContainerStartedRequest) returns (stream StreamContainerStartedResponse) {}
rpc HostInfo(HostInfoRequest) returns (HostInfoResponse) {}
rpc ContainerAction(ContainerActionRequest)
returns (ContainerActionResponse) {}
rpc ContainerExec(stream ContainerExecRequest)
returns (stream ContainerExecResponse) {}
rpc ContainerAttach(stream ContainerAttachRequest)
returns (stream ContainerAttachResponse) {}
rpc ContainerAction(ContainerActionRequest) returns (ContainerActionResponse) {}
rpc ContainerExec(stream ContainerExecRequest) returns (stream ContainerExecResponse) {}
rpc ContainerAttach(stream ContainerAttachRequest) returns (stream ContainerAttachResponse) {}
rpc UpdateNotificationConfig(UpdateNotificationConfigRequest) returns (UpdateNotificationConfigResponse) {}
}
message ListContainersRequest { map<string, RepeatedString> filter = 1; }
message ListContainersRequest {
map<string, RepeatedString> filter = 1;
}
message RepeatedString { repeated string values = 1; }
message RepeatedString {
repeated string values = 1;
}
message ListContainersResponse { repeated Container containers = 1; }
message ListContainersResponse {
repeated Container containers = 1;
}
message FindContainerRequest {
string containerId = 1;
map<string, RepeatedString> filter = 2;
}
message FindContainerResponse { Container container = 1; }
message FindContainerResponse {
Container container = 1;
}
message StreamLogsRequest {
string containerId = 1;
@@ -46,7 +50,9 @@ message StreamLogsRequest {
int32 streamTypes = 3;
}
message StreamLogsResponse { LogEvent event = 1; }
message StreamLogsResponse {
LogEvent event = 1;
}
message LogsBetweenDatesRequest {
string containerId = 1;
@@ -61,20 +67,30 @@ message StreamRawBytesRequest {
google.protobuf.Timestamp until = 3;
int32 streamTypes = 4;
}
message StreamRawBytesResponse { bytes data = 1; }
message StreamRawBytesResponse {
bytes data = 1;
}
message StreamEventsRequest {}
message StreamEventsResponse { ContainerEvent event = 1; }
message StreamEventsResponse {
ContainerEvent event = 1;
}
message StreamStatsRequest {}
message StreamStatsResponse { ContainerStat stat = 1; }
message StreamStatsResponse {
ContainerStat stat = 1;
}
message HostInfoRequest {}
message HostInfoResponse { Host host = 1; }
message HostInfoResponse {
Host host = 1;
}
message StreamContainerStartedRequest {}
message StreamContainerStartedResponse { Container container = 1; }
message StreamContainerStartedResponse {
Container container = 1;
}
message ContainerActionRequest {
string containerId = 1;
@@ -97,7 +113,9 @@ message ResizePayload {
uint32 height = 2;
}
message ContainerExecResponse { bytes stdout = 1; }
message ContainerExecResponse {
bytes stdout = 1;
}
message ContainerAttachRequest {
string containerId = 1;
@@ -107,4 +125,13 @@ message ContainerAttachRequest {
}
}
message ContainerAttachResponse { bytes stdout = 1; }
message ContainerAttachResponse {
bytes stdout = 1;
}
message UpdateNotificationConfigRequest {
repeated NotificationSubscription subscriptions = 1;
repeated NotificationDispatcher dispatchers = 2;
}
message UpdateNotificationConfigResponse {}
+17
View File
@@ -92,3 +92,20 @@ enum ContainerAction {
Stop = 1;
Restart = 2;
}
message NotificationSubscription {
int32 id = 1;
string name = 2;
bool enabled = 3;
int32 dispatcherId = 4;
string logExpression = 5;
string containerExpression = 6;
}
message NotificationDispatcher {
int32 id = 1;
string name = 2;
string type = 3;
string url = 4;
string template = 5;
}
+19
View File
@@ -30,3 +30,22 @@ type NotificationLog struct {
Stream string `json:"stream" expr:"stream"`
Type string `json:"type" expr:"type"`
}
// SubscriptionConfig represents a notification subscription configuration
type SubscriptionConfig struct {
ID int
Name string
Enabled bool
DispatcherID int
LogExpression string
ContainerExpression string
}
// DispatcherConfig represents a notification dispatcher configuration
type DispatcherConfig struct {
ID int
Name string
Type string
URL string
Template string
}