feat: expose container actions to Dozzle Cloud via gRPC (#4578)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Amir Raminfar
2026-04-03 06:53:26 -07:00
committed by GitHub
parent 481455a09b
commit 3ce1f9ebdc
12 changed files with 1576 additions and 14 deletions
-11
View File
@@ -74,7 +74,6 @@ github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151X
github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -142,7 +141,6 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gohugoio/go-i18n/v2 v2.1.3-0.20230805085216-e63c13218d0e h1:QArsSubW7eDh8APMXkByjQWvuljwPGAGQpJEFn0F0wY=
github.com/gohugoio/go-i18n/v2 v2.1.3-0.20230805085216-e63c13218d0e/go.mod h1:3Ltoo9Banwq0gOtcOwxuHG6omk+AwsQPADyw2vQYOJQ=
github.com/gohugoio/hashstructure v0.5.0 h1:G2fjSBU36RdwEJBWJ+919ERvOVqAg9tfcYp47K9swqg=
@@ -217,11 +215,8 @@ github.com/makeworld-the-better-one/dither/v2 v2.4.0 h1:Az/dYXiTcwcRSe59Hzw4RI1r
github.com/makeworld-the-better-one/dither/v2 v2.4.0/go.mod h1:VBtN8DXO7SNtyGmLiGA7IsFeKrBkQPze1/iAeM95arc=
github.com/marekm4/color-extractor v1.2.1 h1:3Zb2tQsn6bITZ8MBVhc33Qn1k5/SEuZ18mrXGUqIwn0=
github.com/marekm4/color-extractor v1.2.1/go.mod h1:90VjmiHI6M8ez9eYUaXLdcKnS+BAOp7w+NpwBdkJmpA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
@@ -296,9 +291,6 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
github.com/rs/zerolog v1.35.0 h1:VD0ykx7HMiMJytqINBsKcbLS+BJ4WYjz+05us+LRTdI=
github.com/rs/zerolog v1.35.0/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw=
github.com/samber/lo v1.53.0 h1:t975lj2py4kJPQ6haz1QMgtId2gtmfktACxIXArw3HM=
@@ -343,8 +335,6 @@ github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcY
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/goldmark v1.7.17 h1:p36OVWwRb246iHxA/U4p8OPEpOTESm4n+g+8t0EE5uA=
github.com/yuin/goldmark v1.7.17/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg=
github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE=
github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg=
github.com/yuin/goldmark-emoji v1.0.6 h1:QWfF2FYaXwL74tfGOW5izeiZepUDroDJfWubQI9HTHs=
@@ -421,7 +411,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+302
View File
@@ -0,0 +1,302 @@
package cloud
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"os"
"strings"
"sync"
"time"
"github.com/amir20/dozzle/internal/cloud/pb"
"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/notification/dispatcher"
"github.com/rs/zerolog/log"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const (
initialBackoff = 1 * time.Second
maxBackoff = 30 * time.Second
backoffFactor = 2
jitterFraction = 0.1
maxConcurrent = 5
)
// Client manages the gRPC connection to Dozzle Cloud
type Client struct {
enableActions bool
labels container.ContainerLabels
hostService ToolHostService
apiKeyFunc func() string
target string
plaintext bool
toolSem *semaphore.Weighted
cachedToolsJSON []string
cachedToolsOnce sync.Once
startCh chan struct{}
}
// NewClient creates a new cloud gRPC client.
// apiKeyFunc is called to get the current cloud API key — it may return ""
// if no cloud dispatcher is configured yet, in which case the client waits.
func NewClient(enableActions bool, labels container.ContainerLabels, hostService ToolHostService, apiKeyFunc func() string) *Client {
cloudURL := os.Getenv("AGENT_URL")
if cloudURL == "" {
cloudURL = "https://agent.doligence.dozzle.dev"
}
// Support plaintext for local dev (AGENT_URL=http://localhost:7008)
plaintext := strings.HasPrefix(cloudURL, "http://")
target := cloudURL
target = strings.TrimPrefix(target, "https://")
target = strings.TrimPrefix(target, "http://")
if !strings.Contains(target, ":") {
if plaintext {
target = target + ":80"
} else {
target = target + ":443"
}
}
return &Client{
enableActions: enableActions,
labels: labels,
hostService: hostService,
apiKeyFunc: apiKeyFunc,
target: target,
plaintext: plaintext,
toolSem: semaphore.NewWeighted(maxConcurrent),
startCh: make(chan struct{}, 1),
}
}
// Notify signals the client to attempt a connection. Safe to call multiple times.
// Use this when a cloud dispatcher is added or when the status page is viewed.
func (c *Client) Notify() {
select {
case c.startCh <- struct{}{}:
default:
}
}
// Run blocks until signaled via Notify(), then connects to the cloud gRPC endpoint
// and processes tool requests. Reconnects automatically on failure.
// Does nothing until Notify() is called — zero overhead for non-cloud users.
// Blocks until ctx is cancelled.
func (c *Client) Run(ctx context.Context) {
// Wait for signal to start
select {
case <-ctx.Done():
return
case <-c.startCh:
}
backoff := initialBackoff
backoffTimer := time.NewTimer(0)
backoffTimer.Stop()
defer backoffTimer.Stop()
for {
apiKey := c.apiKeyFunc()
if apiKey == "" {
// Cloud dispatcher was removed — go back to waiting for signal
select {
case <-ctx.Done():
return
case <-c.startCh:
}
continue
}
wasConnected, err := c.connect(ctx, apiKey)
if ctx.Err() != nil {
return
}
if wasConnected {
backoff = initialBackoff
}
if err != nil {
if isPermissionDenied(err) {
log.Debug().Msg("cloud account does not have pro plan, stopping cloud client")
return
}
log.Debug().Err(err).Dur("backoff", backoff).Msg("cloud connection failed, reconnecting")
}
jitter := time.Duration(float64(backoff) * jitterFraction * rand.Float64())
backoffTimer.Reset(backoff + jitter)
select {
case <-ctx.Done():
return
case <-backoffTimer.C:
}
backoff = min(backoff*backoffFactor, maxBackoff)
}
}
// connect establishes a gRPC stream to the cloud and processes requests.
// Returns wasConnected=true if the stream was successfully established and
// at least one message was received before disconnecting.
func (c *Client) connect(ctx context.Context, apiKey string) (wasConnected bool, err error) {
var creds grpc.DialOption
if c.plaintext {
creds = grpc.WithTransportCredentials(insecure.NewCredentials())
} else {
creds = grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, ""))
}
conn, err := grpc.NewClient(c.target, creds, grpc.WithUserAgent(dispatcher.UserAgent))
if err != nil {
return false, fmt.Errorf("failed to dial cloud: %w", err)
}
defer conn.Close()
client := pb.NewCloudToolServiceClient(conn)
md := metadata.Pairs("x-api-key", apiKey)
streamCtx := metadata.NewOutgoingContext(ctx, md)
stream, err := client.ToolStream(streamCtx)
if err != nil {
return false, fmt.Errorf("failed to open tool stream: %w", err)
}
log.Debug().Str("target", c.target).Msg("connected to cloud tool service")
streamLifetime, streamCancel := context.WithCancel(stream.Context())
var sendMu sync.Mutex
var wg sync.WaitGroup
sendResp := func(resp *pb.ToolResponse) error {
sendMu.Lock()
defer sendMu.Unlock()
return stream.Send(resp)
}
defer func() {
streamCancel()
wg.Wait()
}()
for {
req, err := stream.Recv()
if err != nil {
return wasConnected, fmt.Errorf("stream recv error: %w", err)
}
wasConnected = true
// List tools is fast — handle inline. Tool calls run concurrently with a semaphore.
if _, ok := req.Type.(*pb.ToolRequest_CallTool); ok {
if !c.toolSem.TryAcquire(1) {
resp := &pb.ToolResponse{
RequestId: req.RequestId,
Type: &pb.ToolResponse_CallTool{
CallTool: &pb.CallToolResponse{
Success: false,
Error: "too many concurrent tool calls",
},
},
}
if err := sendResp(resp); err != nil {
return wasConnected, fmt.Errorf("stream send error: %w", err)
}
continue
}
wg.Add(1)
go func() {
defer wg.Done()
defer c.toolSem.Release(1)
resp := c.handleRequest(streamLifetime, req)
if streamLifetime.Err() != nil {
return
}
if err := sendResp(resp); err != nil {
log.Debug().Err(err).Msg("failed to send tool response")
}
}()
} else {
resp := c.handleRequest(streamLifetime, req)
if err := sendResp(resp); err != nil {
return wasConnected, fmt.Errorf("stream send error: %w", err)
}
}
}
}
func (c *Client) handleRequest(ctx context.Context, req *pb.ToolRequest) *pb.ToolResponse {
resp := &pb.ToolResponse{
RequestId: req.RequestId,
}
switch t := req.Type.(type) {
case *pb.ToolRequest_ListTools:
log.Debug().Str("request_id", req.RequestId).Msg("cloud requested tool list")
resp.Type = &pb.ToolResponse_ListTools{
ListTools: &pb.ListToolsResponse{
ToolsJson: c.toolsJSON(),
},
}
case *pb.ToolRequest_CallTool:
log.Debug().Str("request_id", req.RequestId).Str("tool", t.CallTool.Name).Str("args", t.CallTool.ArgumentsJson).Msg("cloud tool call received")
result, err := ExecuteTool(ctx, t.CallTool.Name, t.CallTool.ArgumentsJson, c.enableActions, c.hostService, c.labels)
if err != nil {
log.Debug().Err(err).Str("request_id", req.RequestId).Str("tool", t.CallTool.Name).Msg("cloud tool call failed")
resp.Type = &pb.ToolResponse_CallTool{
CallTool: &pb.CallToolResponse{
Success: false,
Error: err.Error(),
},
}
} else {
log.Debug().Str("request_id", req.RequestId).Str("tool", t.CallTool.Name).Msg("cloud tool call completed")
resp.Type = &pb.ToolResponse_CallTool{
CallTool: &pb.CallToolResponse{
Success: true,
ResultJson: result,
},
}
}
default:
log.Warn().Msg("received unknown tool request type")
resp.Type = &pb.ToolResponse_CallTool{
CallTool: &pb.CallToolResponse{
Success: false,
Error: "unknown request type",
},
}
}
return resp
}
func (c *Client) toolsJSON() []string {
c.cachedToolsOnce.Do(func() {
c.cachedToolsJSON = marshalTools(c.enableActions)
})
return c.cachedToolsJSON
}
func isPermissionDenied(err error) bool {
for e := err; e != nil; e = errors.Unwrap(e) {
if s, ok := status.FromError(e); ok && s.Code() == codes.PermissionDenied {
return true
}
}
return false
}
+152
View File
@@ -0,0 +1,152 @@
package cloud
import (
"context"
"testing"
"github.com/amir20/dozzle/internal/cloud/pb"
"github.com/amir20/dozzle/internal/container"
container_support "github.com/amir20/dozzle/internal/support/container"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestNewClient_DefaultURL(t *testing.T) {
t.Setenv("AGENT_URL", "")
client := NewClient(true, nil, nil, func() string { return "test-key" })
assert.Equal(t, "agent.doligence.dozzle.dev:443", client.target)
}
func TestNewClient_CustomURL(t *testing.T) {
t.Setenv("AGENT_URL", "https://custom.cloud.dev")
client := NewClient(true, nil, nil, func() string { return "test-key" })
assert.Equal(t, "custom.cloud.dev:443", client.target)
assert.False(t, client.plaintext)
}
func TestNewClient_PlaintextURL(t *testing.T) {
t.Setenv("AGENT_URL", "http://localhost:7008")
client := NewClient(true, nil, nil, func() string { return "test-key" })
assert.Equal(t, "localhost:7008", client.target)
assert.True(t, client.plaintext)
}
func TestHandleRequest_ListTools(t *testing.T) {
client := &Client{
enableActions: true,
}
req := &pb.ToolRequest{
RequestId: "req-1",
Type: &pb.ToolRequest_ListTools{
ListTools: &pb.ListToolsRequest{},
},
}
resp := client.handleRequest(context.Background(), req)
assert.Equal(t, "req-1", resp.RequestId)
listResp := resp.GetListTools()
assert.NotNil(t, listResp)
assert.Len(t, listResp.ToolsJson, 4) // find_containers + 3 actions
}
func TestHandleRequest_ListTools_ActionsDisabled(t *testing.T) {
client := &Client{
enableActions: false,
}
req := &pb.ToolRequest{
RequestId: "req-2",
Type: &pb.ToolRequest_ListTools{
ListTools: &pb.ListToolsRequest{},
},
}
resp := client.handleRequest(context.Background(), req)
listResp := resp.GetListTools()
assert.Len(t, listResp.ToolsJson, 1) // only list_containers
}
func TestHandleRequest_CallTool_ListContainers(t *testing.T) {
mockHost := &MockHostService{}
mockHost.On("ListAllContainers", container.ContainerLabels(nil)).Return([]container.Container{
{ID: "abc", Name: "nginx", Image: "nginx:latest", State: "running", Host: "local"},
}, nil)
client := &Client{
hostService: mockHost,
}
req := &pb.ToolRequest{
RequestId: "req-3",
Type: &pb.ToolRequest_CallTool{
CallTool: &pb.CallToolRequest{
Name: "find_containers",
ArgumentsJson: "",
},
},
}
resp := client.handleRequest(context.Background(), req)
callResp := resp.GetCallTool()
assert.True(t, callResp.Success)
assert.Contains(t, callResp.ResultJson, "nginx")
}
func TestHandleRequest_CallTool_UnknownTool(t *testing.T) {
mockHost := &MockHostService{}
client := &Client{
hostService: mockHost,
}
req := &pb.ToolRequest{
RequestId: "req-4",
Type: &pb.ToolRequest_CallTool{
CallTool: &pb.CallToolRequest{
Name: "nonexistent",
ArgumentsJson: "",
},
},
}
resp := client.handleRequest(context.Background(), req)
callResp := resp.GetCallTool()
assert.False(t, callResp.Success)
assert.Contains(t, callResp.Error, "unknown tool")
}
func TestHandleRequest_CallTool_RestartContainer(t *testing.T) {
mockClient := &MockClientService{}
mockClient.On("ContainerAction", mock.Anything, mock.Anything, container.Restart).Return(nil)
cs := container_support.NewContainerService(mockClient, container.Container{ID: "abc123"})
mockHost := &MockHostService{}
mockHost.On("FindContainer", "local", "abc123", container.ContainerLabels(nil)).Return(cs, nil)
client := &Client{
hostService: mockHost,
enableActions: true,
}
req := &pb.ToolRequest{
RequestId: "req-5",
Type: &pb.ToolRequest_CallTool{
CallTool: &pb.CallToolRequest{
Name: "restart_container",
ArgumentsJson: `{"container_id": "abc123", "host": "local"}`,
},
},
}
resp := client.handleRequest(context.Background(), req)
callResp := resp.GetCallTool()
assert.True(t, callResp.Success)
mockClient.AssertCalled(t, "ContainerAction", mock.Anything, mock.Anything, container.Restart)
}
+496
View File
@@ -0,0 +1,496 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.11
// protoc v7.34.1
// source: cloud.proto
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ToolRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
// Types that are valid to be assigned to Type:
//
// *ToolRequest_ListTools
// *ToolRequest_CallTool
Type isToolRequest_Type `protobuf_oneof:"type"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ToolRequest) Reset() {
*x = ToolRequest{}
mi := &file_cloud_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ToolRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ToolRequest) ProtoMessage() {}
func (x *ToolRequest) ProtoReflect() protoreflect.Message {
mi := &file_cloud_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ToolRequest.ProtoReflect.Descriptor instead.
func (*ToolRequest) Descriptor() ([]byte, []int) {
return file_cloud_proto_rawDescGZIP(), []int{0}
}
func (x *ToolRequest) GetRequestId() string {
if x != nil {
return x.RequestId
}
return ""
}
func (x *ToolRequest) GetType() isToolRequest_Type {
if x != nil {
return x.Type
}
return nil
}
func (x *ToolRequest) GetListTools() *ListToolsRequest {
if x != nil {
if x, ok := x.Type.(*ToolRequest_ListTools); ok {
return x.ListTools
}
}
return nil
}
func (x *ToolRequest) GetCallTool() *CallToolRequest {
if x != nil {
if x, ok := x.Type.(*ToolRequest_CallTool); ok {
return x.CallTool
}
}
return nil
}
type isToolRequest_Type interface {
isToolRequest_Type()
}
type ToolRequest_ListTools struct {
ListTools *ListToolsRequest `protobuf:"bytes,2,opt,name=list_tools,json=listTools,proto3,oneof"`
}
type ToolRequest_CallTool struct {
CallTool *CallToolRequest `protobuf:"bytes,3,opt,name=call_tool,json=callTool,proto3,oneof"`
}
func (*ToolRequest_ListTools) isToolRequest_Type() {}
func (*ToolRequest_CallTool) isToolRequest_Type() {}
type ToolResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
// Types that are valid to be assigned to Type:
//
// *ToolResponse_ListTools
// *ToolResponse_CallTool
Type isToolResponse_Type `protobuf_oneof:"type"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ToolResponse) Reset() {
*x = ToolResponse{}
mi := &file_cloud_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ToolResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ToolResponse) ProtoMessage() {}
func (x *ToolResponse) ProtoReflect() protoreflect.Message {
mi := &file_cloud_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ToolResponse.ProtoReflect.Descriptor instead.
func (*ToolResponse) Descriptor() ([]byte, []int) {
return file_cloud_proto_rawDescGZIP(), []int{1}
}
func (x *ToolResponse) GetRequestId() string {
if x != nil {
return x.RequestId
}
return ""
}
func (x *ToolResponse) GetType() isToolResponse_Type {
if x != nil {
return x.Type
}
return nil
}
func (x *ToolResponse) GetListTools() *ListToolsResponse {
if x != nil {
if x, ok := x.Type.(*ToolResponse_ListTools); ok {
return x.ListTools
}
}
return nil
}
func (x *ToolResponse) GetCallTool() *CallToolResponse {
if x != nil {
if x, ok := x.Type.(*ToolResponse_CallTool); ok {
return x.CallTool
}
}
return nil
}
type isToolResponse_Type interface {
isToolResponse_Type()
}
type ToolResponse_ListTools struct {
ListTools *ListToolsResponse `protobuf:"bytes,2,opt,name=list_tools,json=listTools,proto3,oneof"`
}
type ToolResponse_CallTool struct {
CallTool *CallToolResponse `protobuf:"bytes,3,opt,name=call_tool,json=callTool,proto3,oneof"`
}
func (*ToolResponse_ListTools) isToolResponse_Type() {}
func (*ToolResponse_CallTool) isToolResponse_Type() {}
type ListToolsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListToolsRequest) Reset() {
*x = ListToolsRequest{}
mi := &file_cloud_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListToolsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListToolsRequest) ProtoMessage() {}
func (x *ListToolsRequest) ProtoReflect() protoreflect.Message {
mi := &file_cloud_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListToolsRequest.ProtoReflect.Descriptor instead.
func (*ListToolsRequest) Descriptor() ([]byte, []int) {
return file_cloud_proto_rawDescGZIP(), []int{2}
}
type ListToolsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
ToolsJson []string `protobuf:"bytes,1,rep,name=tools_json,json=toolsJson,proto3" json:"tools_json,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListToolsResponse) Reset() {
*x = ListToolsResponse{}
mi := &file_cloud_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListToolsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListToolsResponse) ProtoMessage() {}
func (x *ListToolsResponse) ProtoReflect() protoreflect.Message {
mi := &file_cloud_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListToolsResponse.ProtoReflect.Descriptor instead.
func (*ListToolsResponse) Descriptor() ([]byte, []int) {
return file_cloud_proto_rawDescGZIP(), []int{3}
}
func (x *ListToolsResponse) GetToolsJson() []string {
if x != nil {
return x.ToolsJson
}
return nil
}
type CallToolRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
ArgumentsJson string `protobuf:"bytes,2,opt,name=arguments_json,json=argumentsJson,proto3" json:"arguments_json,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CallToolRequest) Reset() {
*x = CallToolRequest{}
mi := &file_cloud_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CallToolRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CallToolRequest) ProtoMessage() {}
func (x *CallToolRequest) ProtoReflect() protoreflect.Message {
mi := &file_cloud_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CallToolRequest.ProtoReflect.Descriptor instead.
func (*CallToolRequest) Descriptor() ([]byte, []int) {
return file_cloud_proto_rawDescGZIP(), []int{4}
}
func (x *CallToolRequest) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (x *CallToolRequest) GetArgumentsJson() string {
if x != nil {
return x.ArgumentsJson
}
return ""
}
type CallToolResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
ResultJson string `protobuf:"bytes,2,opt,name=result_json,json=resultJson,proto3" json:"result_json,omitempty"`
Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CallToolResponse) Reset() {
*x = CallToolResponse{}
mi := &file_cloud_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CallToolResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CallToolResponse) ProtoMessage() {}
func (x *CallToolResponse) ProtoReflect() protoreflect.Message {
mi := &file_cloud_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CallToolResponse.ProtoReflect.Descriptor instead.
func (*CallToolResponse) Descriptor() ([]byte, []int) {
return file_cloud_proto_rawDescGZIP(), []int{5}
}
func (x *CallToolResponse) GetSuccess() bool {
if x != nil {
return x.Success
}
return false
}
func (x *CallToolResponse) GetResultJson() string {
if x != nil {
return x.ResultJson
}
return ""
}
func (x *CallToolResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
var File_cloud_proto protoreflect.FileDescriptor
const file_cloud_proto_rawDesc = "" +
"\n" +
"\vcloud.proto\x12\x05cloud\"\xa5\x01\n" +
"\vToolRequest\x12\x1d\n" +
"\n" +
"request_id\x18\x01 \x01(\tR\trequestId\x128\n" +
"\n" +
"list_tools\x18\x02 \x01(\v2\x17.cloud.ListToolsRequestH\x00R\tlistTools\x125\n" +
"\tcall_tool\x18\x03 \x01(\v2\x16.cloud.CallToolRequestH\x00R\bcallToolB\x06\n" +
"\x04type\"\xa8\x01\n" +
"\fToolResponse\x12\x1d\n" +
"\n" +
"request_id\x18\x01 \x01(\tR\trequestId\x129\n" +
"\n" +
"list_tools\x18\x02 \x01(\v2\x18.cloud.ListToolsResponseH\x00R\tlistTools\x126\n" +
"\tcall_tool\x18\x03 \x01(\v2\x17.cloud.CallToolResponseH\x00R\bcallToolB\x06\n" +
"\x04type\"\x12\n" +
"\x10ListToolsRequest\"2\n" +
"\x11ListToolsResponse\x12\x1d\n" +
"\n" +
"tools_json\x18\x01 \x03(\tR\ttoolsJson\"L\n" +
"\x0fCallToolRequest\x12\x12\n" +
"\x04name\x18\x01 \x01(\tR\x04name\x12%\n" +
"\x0earguments_json\x18\x02 \x01(\tR\rargumentsJson\"c\n" +
"\x10CallToolResponse\x12\x18\n" +
"\asuccess\x18\x01 \x01(\bR\asuccess\x12\x1f\n" +
"\vresult_json\x18\x02 \x01(\tR\n" +
"resultJson\x12\x14\n" +
"\x05error\x18\x03 \x01(\tR\x05error2M\n" +
"\x10CloudToolService\x129\n" +
"\n" +
"ToolStream\x12\x13.cloud.ToolResponse\x1a\x12.cloud.ToolRequest(\x010\x01B\x13Z\x11internal/cloud/pbb\x06proto3"
var (
file_cloud_proto_rawDescOnce sync.Once
file_cloud_proto_rawDescData []byte
)
func file_cloud_proto_rawDescGZIP() []byte {
file_cloud_proto_rawDescOnce.Do(func() {
file_cloud_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_cloud_proto_rawDesc), len(file_cloud_proto_rawDesc)))
})
return file_cloud_proto_rawDescData
}
var file_cloud_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_cloud_proto_goTypes = []any{
(*ToolRequest)(nil), // 0: cloud.ToolRequest
(*ToolResponse)(nil), // 1: cloud.ToolResponse
(*ListToolsRequest)(nil), // 2: cloud.ListToolsRequest
(*ListToolsResponse)(nil), // 3: cloud.ListToolsResponse
(*CallToolRequest)(nil), // 4: cloud.CallToolRequest
(*CallToolResponse)(nil), // 5: cloud.CallToolResponse
}
var file_cloud_proto_depIdxs = []int32{
2, // 0: cloud.ToolRequest.list_tools:type_name -> cloud.ListToolsRequest
4, // 1: cloud.ToolRequest.call_tool:type_name -> cloud.CallToolRequest
3, // 2: cloud.ToolResponse.list_tools:type_name -> cloud.ListToolsResponse
5, // 3: cloud.ToolResponse.call_tool:type_name -> cloud.CallToolResponse
1, // 4: cloud.CloudToolService.ToolStream:input_type -> cloud.ToolResponse
0, // 5: cloud.CloudToolService.ToolStream:output_type -> cloud.ToolRequest
5, // [5:6] is the sub-list for method output_type
4, // [4:5] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_cloud_proto_init() }
func file_cloud_proto_init() {
if File_cloud_proto != nil {
return
}
file_cloud_proto_msgTypes[0].OneofWrappers = []any{
(*ToolRequest_ListTools)(nil),
(*ToolRequest_CallTool)(nil),
}
file_cloud_proto_msgTypes[1].OneofWrappers = []any{
(*ToolResponse_ListTools)(nil),
(*ToolResponse_CallTool)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_cloud_proto_rawDesc), len(file_cloud_proto_rawDesc)),
NumEnums: 0,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_cloud_proto_goTypes,
DependencyIndexes: file_cloud_proto_depIdxs,
MessageInfos: file_cloud_proto_msgTypes,
}.Build()
File_cloud_proto = out.File
file_cloud_proto_goTypes = nil
file_cloud_proto_depIdxs = nil
}
+117
View File
@@ -0,0 +1,117 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.0
// - protoc v7.34.1
// source: cloud.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
CloudToolService_ToolStream_FullMethodName = "/cloud.CloudToolService/ToolStream"
)
// CloudToolServiceClient is the client API for CloudToolService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type CloudToolServiceClient interface {
// Dozzle sends ToolResponse, cloud sends ToolRequest
ToolStream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ToolResponse, ToolRequest], error)
}
type cloudToolServiceClient struct {
cc grpc.ClientConnInterface
}
func NewCloudToolServiceClient(cc grpc.ClientConnInterface) CloudToolServiceClient {
return &cloudToolServiceClient{cc}
}
func (c *cloudToolServiceClient) ToolStream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[ToolResponse, ToolRequest], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &CloudToolService_ServiceDesc.Streams[0], CloudToolService_ToolStream_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[ToolResponse, ToolRequest]{ClientStream: stream}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type CloudToolService_ToolStreamClient = grpc.BidiStreamingClient[ToolResponse, ToolRequest]
// CloudToolServiceServer is the server API for CloudToolService service.
// All implementations must embed UnimplementedCloudToolServiceServer
// for forward compatibility.
type CloudToolServiceServer interface {
// Dozzle sends ToolResponse, cloud sends ToolRequest
ToolStream(grpc.BidiStreamingServer[ToolResponse, ToolRequest]) error
mustEmbedUnimplementedCloudToolServiceServer()
}
// UnimplementedCloudToolServiceServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedCloudToolServiceServer struct{}
func (UnimplementedCloudToolServiceServer) ToolStream(grpc.BidiStreamingServer[ToolResponse, ToolRequest]) error {
return status.Error(codes.Unimplemented, "method ToolStream not implemented")
}
func (UnimplementedCloudToolServiceServer) mustEmbedUnimplementedCloudToolServiceServer() {}
func (UnimplementedCloudToolServiceServer) testEmbeddedByValue() {}
// UnsafeCloudToolServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to CloudToolServiceServer will
// result in compilation errors.
type UnsafeCloudToolServiceServer interface {
mustEmbedUnimplementedCloudToolServiceServer()
}
func RegisterCloudToolServiceServer(s grpc.ServiceRegistrar, srv CloudToolServiceServer) {
// If the following call panics, it indicates UnimplementedCloudToolServiceServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&CloudToolService_ServiceDesc, srv)
}
func _CloudToolService_ToolStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(CloudToolServiceServer).ToolStream(&grpc.GenericServerStream[ToolResponse, ToolRequest]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type CloudToolService_ToolStreamServer = grpc.BidiStreamingServer[ToolResponse, ToolRequest]
// CloudToolService_ServiceDesc is the grpc.ServiceDesc for CloudToolService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var CloudToolService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "cloud.CloudToolService",
HandlerType: (*CloudToolServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "ToolStream",
Handler: _CloudToolService_ToolStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "cloud.proto",
}
+250
View File
@@ -0,0 +1,250 @@
package cloud
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/amir20/dozzle/internal/container"
container_support "github.com/amir20/dozzle/internal/support/container"
"github.com/rs/zerolog/log"
)
// ToolHostService is the subset of HostService needed by tool execution
type ToolHostService interface {
ListAllContainers(labels container.ContainerLabels) ([]container.Container, []error)
FindContainer(host string, id string, labels container.ContainerLabels) (*container_support.ContainerService, error)
}
// FunctionDefinition describes a tool that can be called by the cloud service.
type FunctionDefinition struct {
Name string `json:"name"`
Description string `json:"description"`
Parameters ParameterDefinition `json:"parameters"`
}
// ParameterDefinition describes the JSON Schema parameters for a tool.
type ParameterDefinition struct {
Type string `json:"type"`
Properties map[string]PropertyDefinition `json:"properties"`
Required []string `json:"required,omitempty"`
}
// PropertyDefinition describes a single property in a tool's parameters.
type PropertyDefinition struct {
Type string `json:"type"`
Description string `json:"description,omitempty"`
}
type containerResult struct {
ID string `json:"id"`
Name string `json:"name"`
Image string `json:"image"`
Command string `json:"command"`
Created string `json:"created"`
StartedAt string `json:"startedAt"`
FinishedAt string `json:"finishedAt,omitempty"`
State string `json:"state"`
Health string `json:"health,omitempty"`
Host string `json:"host,omitempty"`
Group string `json:"group,omitempty"`
CPUPercent *float64 `json:"cpuPercent,omitempty"`
MaxCPU5Min *float64 `json:"maxCpu5Min,omitempty"`
MemoryPercent *float64 `json:"memoryPercent,omitempty"`
MaxMemory5Min *float64 `json:"maxMemory5Min,omitempty"`
}
var actionMap = map[string]container.ContainerAction{
"start_container": container.Start,
"stop_container": container.Stop,
"restart_container": container.Restart,
}
type actionResult struct {
Success bool `json:"success"`
ContainerID string `json:"containerId"`
Action string `json:"action"`
}
type containerActionArgs struct {
ContainerID string `json:"container_id"`
Host string `json:"host"`
}
// AvailableTools returns the list of tool definitions based on configuration.
// list_containers is always available. Action tools require enableActions.
func AvailableTools(enableActions bool) []FunctionDefinition {
tools := []FunctionDefinition{
{
Name: "find_containers",
Description: "List all Docker containers with their current state, name, image, and host",
Parameters: ParameterDefinition{
Type: "object",
Properties: map[string]PropertyDefinition{},
},
},
}
if enableActions {
actionParams := ParameterDefinition{
Type: "object",
Properties: map[string]PropertyDefinition{
"container_id": {
Type: "string",
Description: "The container ID",
},
"host": {
Type: "string",
Description: "The host name where the container is running",
},
},
Required: []string{"container_id", "host"},
}
tools = append(tools,
FunctionDefinition{
Name: "start_container",
Description: "Start a stopped Docker container",
Parameters: actionParams,
},
FunctionDefinition{
Name: "stop_container",
Description: "Stop a running Docker container",
Parameters: actionParams,
},
FunctionDefinition{
Name: "restart_container",
Description: "Restart a Docker container",
Parameters: actionParams,
},
)
}
return tools
}
// marshalTools serializes tool definitions to JSON strings for the gRPC response.
func marshalTools(enableActions bool) []string {
tools := AvailableTools(enableActions)
result := make([]string, 0, len(tools))
for _, tool := range tools {
data, err := json.Marshal(tool)
if err != nil {
log.Error().Err(err).Str("tool", tool.Name).Msg("failed to marshal tool definition")
continue
}
result = append(result, string(data))
}
return result
}
// ExecuteTool dispatches a tool call by name and returns JSON result.
// enableActions must be true for action tools (start/stop/restart) to execute.
func ExecuteTool(ctx context.Context, name string, argsJSON string, enableActions bool, hostService ToolHostService, labels container.ContainerLabels) (string, error) {
switch name {
case "find_containers":
if ctx.Err() != nil {
return "", ctx.Err()
}
return executeListContainers(hostService, labels)
case "start_container", "stop_container", "restart_container":
if !enableActions {
return "", fmt.Errorf("container actions are not enabled")
}
return executeContainerAction(ctx, argsJSON, actionMap[name], hostService, labels)
default:
return "", fmt.Errorf("unknown tool: %s", name)
}
}
func executeListContainers(hostService ToolHostService, labels container.ContainerLabels) (string, error) {
containers, errs := hostService.ListAllContainers(labels)
for _, err := range errs {
if err != nil {
log.Warn().Err(err).Msg("error listing containers from host")
}
}
results := make([]containerResult, len(containers))
for i, c := range containers {
r := containerResult{
ID: c.ID,
Name: c.Name,
Image: c.Image,
Command: c.Command,
Created: c.Created.UTC().Format(time.RFC3339),
StartedAt: c.StartedAt.UTC().Format(time.RFC3339),
FinishedAt: formatTimeOrEmpty(c.FinishedAt),
State: c.State,
Health: c.Health,
Host: c.Host,
Group: c.Group,
}
if c.Stats != nil && c.Stats.Len() > 0 {
stats := c.Stats.Data()
latest := stats[len(stats)-1]
r.CPUPercent = &latest.CPUPercent
r.MemoryPercent = &latest.MemoryPercent
var maxCPU, maxMem float64
for _, s := range stats {
maxCPU = max(maxCPU, s.CPUPercent)
maxMem = max(maxMem, s.MemoryPercent)
}
r.MaxCPU5Min = &maxCPU
r.MaxMemory5Min = &maxMem
}
results[i] = r
}
data, err := json.Marshal(results)
if err != nil {
return "", fmt.Errorf("failed to marshal containers: %w", err)
}
return string(data), nil
}
func formatTimeOrEmpty(t time.Time) string {
if t.IsZero() {
return ""
}
return t.UTC().Format(time.RFC3339)
}
func executeContainerAction(ctx context.Context, argsJSON string, action container.ContainerAction, hostService ToolHostService, labels container.ContainerLabels) (string, error) {
var args containerActionArgs
if err := json.Unmarshal([]byte(argsJSON), &args); err != nil {
return "", fmt.Errorf("failed to parse arguments: %w", err)
}
if args.ContainerID == "" {
return "", fmt.Errorf("container_id is required")
}
if args.Host == "" {
return "", fmt.Errorf("host is required")
}
cs, err := hostService.FindContainer(args.Host, args.ContainerID, labels)
if err != nil {
return "", fmt.Errorf("container not found: %w", err)
}
if err := cs.Action(ctx, action); err != nil {
return "", fmt.Errorf("action failed: %w", err)
}
result := actionResult{
Success: true,
ContainerID: args.ContainerID,
Action: string(action),
}
data, err := json.Marshal(result)
if err != nil {
return "", fmt.Errorf("failed to marshal result: %w", err)
}
return string(data), nil
}
+183
View File
@@ -0,0 +1,183 @@
package cloud
import (
"context"
"encoding/json"
"fmt"
"io"
"testing"
"time"
"github.com/amir20/dozzle/internal/container"
container_support "github.com/amir20/dozzle/internal/support/container"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestAvailableTools_WithActionsEnabled(t *testing.T) {
tools := AvailableTools(true)
names := make([]string, len(tools))
for i, tool := range tools {
names[i] = tool.Name
}
assert.Contains(t, names, "find_containers")
assert.Contains(t, names, "start_container")
assert.Contains(t, names, "stop_container")
assert.Contains(t, names, "restart_container")
assert.Len(t, tools, 4)
}
func TestAvailableTools_WithActionsDisabled(t *testing.T) {
tools := AvailableTools(false)
names := make([]string, len(tools))
for i, tool := range tools {
names[i] = tool.Name
}
assert.Contains(t, names, "find_containers")
assert.Len(t, tools, 1)
}
func TestAvailableTools_ParametersAreValid(t *testing.T) {
tools := AvailableTools(true)
for _, tool := range tools {
assert.NotEmpty(t, tool.Name)
assert.NotEmpty(t, tool.Description)
assert.NotNil(t, tool.Parameters)
}
}
// MockHostService mocks the HostService interface for testing
type MockHostService struct {
mock.Mock
}
func (m *MockHostService) ListAllContainers(labels container.ContainerLabels) ([]container.Container, []error) {
args := m.Called(labels)
containers := args.Get(0).([]container.Container)
var errs []error
if args.Get(1) != nil {
errs = args.Get(1).([]error)
}
return containers, errs
}
func (m *MockHostService) FindContainer(host string, id string, labels container.ContainerLabels) (*container_support.ContainerService, error) {
args := m.Called(host, id, labels)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*container_support.ContainerService), args.Error(1)
}
type MockClientService struct {
mock.Mock
}
func (m *MockClientService) FindContainer(_ context.Context, _ string, _ container.ContainerLabels) (container.Container, error) {
return container.Container{}, nil
}
func (m *MockClientService) ListContainers(_ context.Context, _ container.ContainerLabels) ([]container.Container, error) {
return nil, nil
}
func (m *MockClientService) Host(_ context.Context) (container.Host, error) {
return container.Host{}, nil
}
func (m *MockClientService) ContainerAction(ctx context.Context, c container.Container, action container.ContainerAction) error {
args := m.Called(ctx, c, action)
return args.Error(0)
}
func (m *MockClientService) LogsBetweenDates(_ context.Context, _ container.Container, _ time.Time, _ time.Time, _ container.StdType) (<-chan *container.LogEvent, error) {
return nil, nil
}
func (m *MockClientService) RawLogs(_ context.Context, _ container.Container, _ time.Time, _ time.Time, _ container.StdType) (io.ReadCloser, error) {
return nil, nil
}
func (m *MockClientService) SubscribeStats(_ context.Context, _ chan<- container.ContainerStat) {}
func (m *MockClientService) SubscribeEvents(_ context.Context, _ chan<- container.ContainerEvent) {
}
func (m *MockClientService) SubscribeContainersStarted(_ context.Context, _ chan<- container.Container) {
}
func (m *MockClientService) StreamLogs(_ context.Context, _ container.Container, _ time.Time, _ container.StdType, _ chan<- *container.LogEvent) error {
return nil
}
func (m *MockClientService) Attach(_ context.Context, _ container.Container, _ container.ExecEventReader, _ io.Writer) error {
return nil
}
func (m *MockClientService) Exec(_ context.Context, _ container.Container, _ []string, _ container.ExecEventReader, _ io.Writer) error {
return nil
}
func TestExecuteTool_ListContainers(t *testing.T) {
mockHost := &MockHostService{}
mockHost.On("ListAllContainers", container.ContainerLabels(nil)).Return([]container.Container{
{ID: "abc123", Name: "nginx", Image: "nginx:latest", State: "running", Host: "local"},
{ID: "def456", Name: "redis", Image: "redis:7", State: "running", Host: "local"},
}, nil)
result, err := ExecuteTool(context.Background(), "find_containers", "", false, mockHost, nil)
assert.NoError(t, err)
var containers []map[string]any
err = json.Unmarshal([]byte(result), &containers)
assert.NoError(t, err)
assert.Len(t, containers, 2)
assert.Equal(t, "abc123", containers[0]["id"])
assert.Equal(t, "nginx", containers[0]["name"])
}
func TestExecuteTool_RestartContainer(t *testing.T) {
mockClient := &MockClientService{}
mockClient.On("ContainerAction", mock.Anything, mock.Anything, container.Restart).Return(nil)
cs := container_support.NewContainerService(mockClient, container.Container{ID: "abc123"})
mockHost := &MockHostService{}
mockHost.On("FindContainer", "local", "abc123", container.ContainerLabels(nil)).Return(cs, nil)
argsJSON := `{"container_id": "abc123", "host": "local"}`
result, err := ExecuteTool(context.Background(), "restart_container", argsJSON, true, mockHost, nil)
assert.NoError(t, err)
assert.Contains(t, result, "success")
mockClient.AssertCalled(t, "ContainerAction", mock.Anything, mock.Anything, container.Restart)
}
func TestExecuteTool_RestartContainer_ActionsDisabled(t *testing.T) {
mockHost := &MockHostService{}
argsJSON := `{"container_id": "abc123"}`
_, err := ExecuteTool(context.Background(), "restart_container", argsJSON, false, mockHost, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "container actions are not enabled")
}
func TestExecuteTool_ListContainers_PartialHostError(t *testing.T) {
mockHost := &MockHostService{}
mockHost.On("ListAllContainers", container.ContainerLabels(nil)).Return(
[]container.Container{
{ID: "abc123", Name: "nginx", Image: "nginx:latest", State: "running", Host: "local"},
},
[]error{fmt.Errorf("host2 unreachable")},
)
result, err := ExecuteTool(context.Background(), "find_containers", "", false, mockHost, nil)
assert.NoError(t, err)
var containers []map[string]any
err = json.Unmarshal([]byte(result), &containers)
assert.NoError(t, err)
assert.Len(t, containers, 1)
}
func TestExecuteTool_UnknownTool(t *testing.T) {
mockHost := &MockHostService{}
_, err := ExecuteTool(context.Background(), "unknown_tool", "", false, mockHost, nil)
assert.Error(t, err)
assert.Contains(t, err.Error(), "unknown tool")
}
+1 -1
View File
@@ -277,7 +277,7 @@ func (s *ContainerStore) init() {
for {
select {
case event := <-s.events:
log.Debug().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
log.Trace().Str("event", event.Name).Str("id", event.ActorID).Msg("received container event")
switch event.Name {
case "create":
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+8
View File
@@ -93,6 +93,10 @@ func (h *handler) cloudCallback(w http.ResponseWriter, r *http.Request) {
id := h.hostService.AddDispatcher(cloudDispatcher)
if h.config.OnCloudSetup != nil {
h.config.OnCloudSetup()
}
base := h.config.Base
if base == "/" {
base = ""
@@ -148,6 +152,10 @@ func (h *handler) cloudStatus(w http.ResponseWriter, r *http.Request) {
return
}
if h.config.OnCloudSetup != nil {
h.config.OnCloudSetup()
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
io.Copy(w, resp.Body)
+1
View File
@@ -50,6 +50,7 @@ type Config struct {
DisableAvatars bool
ReleaseCheckMode ReleaseCheckMode
Labels container.ContainerLabels
OnCloudSetup func()
}
type Authorization struct {
+23 -2
View File
@@ -16,6 +16,7 @@ import (
"github.com/amir20/dozzle/internal/agent"
"github.com/amir20/dozzle/internal/auth"
"github.com/amir20/dozzle/internal/cloud"
"github.com/amir20/dozzle/internal/docker"
"github.com/amir20/dozzle/internal/k8s"
"github.com/amir20/dozzle/internal/notification/dispatcher"
@@ -33,6 +34,7 @@ var content embed.FS
var certs embed.FS
//go:generate protoc --go_out=. --go-grpc_out=. --proto_path=./protos ./protos/rpc.proto ./protos/types.proto
//go:generate protoc --go_out=. --go-grpc_out=. --proto_path=./protos ./protos/cloud.proto
func main() {
cli.ValidateEnvVars(cli.Args{}, cli.AgentCmd{})
args, subcommand := cli.ParseArgs()
@@ -122,7 +124,25 @@ func main() {
log.Fatal().Str("mode", args.Mode).Msg("Invalid mode")
}
srv := createServer(args, hostService)
// Create cloud tool client — does nothing until Notify() is called
apiKeyFunc := func() string {
for _, d := range hostService.Dispatchers() {
if d.Type == "cloud" && d.APIKey != "" {
return d.APIKey
}
}
return ""
}
cloudClient := cloud.NewClient(args.EnableActions, args.Filter, hostService, apiKeyFunc)
go cloudClient.Run(ctx)
// If cloud is already configured at startup, start the client immediately
if apiKeyFunc() != "" {
cloudClient.Notify()
}
srv := createServer(args, hostService, cloudClient.Notify)
go func() {
log.Info().Msgf("Accepting connections on %s", args.Addr)
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
@@ -149,7 +169,7 @@ func fileExists(filename string) bool {
return err == nil
}
func createServer(args cli.Args, hostService web.HostService) *http.Server {
func createServer(args cli.Args, hostService web.HostService, onCloudSetup func()) *http.Server {
_, dev := os.LookupEnv("DEV")
var releaseCheckMode web.ReleaseCheckMode = web.Automatic
@@ -228,6 +248,7 @@ func createServer(args cli.Args, hostService web.HostService) *http.Server {
DisableAvatars: args.DisableAvatars,
ReleaseCheckMode: releaseCheckMode,
Labels: args.Filter,
OnCloudSetup: onCloudSetup,
}
assets, err := fs.Sub(content, "dist")
+43
View File
@@ -0,0 +1,43 @@
syntax = "proto3";
package cloud;
option go_package = "internal/cloud/pb";
service CloudToolService {
// Dozzle sends ToolResponse, cloud sends ToolRequest
rpc ToolStream(stream ToolResponse) returns (stream ToolRequest);
}
message ToolRequest {
string request_id = 1;
oneof type {
ListToolsRequest list_tools = 2;
CallToolRequest call_tool = 3;
}
}
message ToolResponse {
string request_id = 1;
oneof type {
ListToolsResponse list_tools = 2;
CallToolResponse call_tool = 3;
}
}
message ListToolsRequest {}
message ListToolsResponse {
repeated string tools_json = 1;
}
message CallToolRequest {
string name = 1;
string arguments_json = 2;
}
message CallToolResponse {
bool success = 1;
string result_json = 2;
string error = 3;
}