Files
Amir Raminfar 8811dc82bd
Deploy VitePress site to Pages / build (push) Has been cancelled
Deploy VitePress site to Pages / Deploy (push) Has been cancelled
Push container / Push branches and PRs (push) Has been cancelled
Test / Typecheck (push) Has been cancelled
Test / JavaScript Tests (push) Has been cancelled
Test / Go Tests (push) Has been cancelled
Test / Go Staticcheck (push) Has been cancelled
Test / Integration Tests (push) Has been cancelled
fix(cloud): resolve read-only container tools in one shot (no extra LLM round-trip) (#4767)
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 07:59:19 -07:00

175 lines
4.1 KiB
Go

package cloud
import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"
"time"
"github.com/amir20/dozzle/internal/container"
pb "github.com/amir20/dozzle/proto/cloud"
"github.com/rs/zerolog/log"
)
// streamSender is a function that sends a single ToolResponse to the cloud.
// A non-nil return value means the response could not be delivered;
// executeStreamLogs stops batching and returns as soon as a regular
// (non-cancellation) send reports an error.
type streamSender func(resp *pb.ToolResponse) error
// parseStreamArgs decodes the JSON-encoded tool arguments and validates them.
//
// It returns the decoded arguments together with the compiled form of
// args.Regex, which is nil when no regex was supplied. An error is returned
// when the JSON cannot be decoded, when container_id is empty, or when the
// regex pattern does not compile; in that case both other results are nil.
func parseStreamArgs(argsJSON string) (*fetchLogsArgs, *regexp.Regexp, error) {
	parsed := new(fetchLogsArgs)
	if decodeErr := json.Unmarshal([]byte(argsJSON), parsed); decodeErr != nil {
		return nil, nil, fmt.Errorf("failed to parse arguments: %w", decodeErr)
	}

	if parsed.ContainerID == "" {
		return nil, nil, fmt.Errorf("container_id is required")
	}

	// The regex filter is optional: without a pattern there is nothing to compile.
	if parsed.Regex == "" {
		return parsed, nil, nil
	}

	pattern, compileErr := regexp.Compile(parsed.Regex)
	if compileErr != nil {
		return nil, nil, fmt.Errorf("invalid regex pattern: %w", compileErr)
	}
	return parsed, pattern, nil
}
// matchesFilters reports whether a log event passes the level, query and
// regex filters in args, and returns the message text the filters were
// evaluated against.
//
// The text is event.RawMessage, or the default formatting of event.Message
// when RawMessage is empty. The level filter is a case-insensitive equality
// check and is never inverted. The query and regex filters each reject the
// event when their match result equals args.Inverse, so with Inverse set only
// non-matching events pass. On rejection the returned string is empty.
func matchesFilters(event *container.LogEvent, args *fetchLogsArgs, re *regexp.Regexp) (string, bool) {
	if wantLevel := args.Level; wantLevel != "" {
		if !strings.EqualFold(event.Level, wantLevel) {
			return "", false
		}
	}

	text := event.RawMessage
	if text == "" {
		text = fmt.Sprintf("%v", event.Message)
	}

	// Substring filter: only consulted when a query was supplied.
	if args.Query != "" && containsIgnoreCase(text, args.Query) == args.Inverse {
		return "", false
	}

	// Regex filter: only consulted when a pattern was compiled.
	if re != nil && re.MatchString(text) == args.Inverse {
		return "", false
	}

	return text, true
}
// executeStreamLogs streams a container's log events to the cloud as batched
// FetchLogs tool responses.
//
// argsJSON is decoded by parseStreamArgs and the container reference is
// resolved via resolveContainerRefRead. Events that pass matchesFilters are
// collected into batches that are sent through send when a batch reaches
// batchSize entries or on the next flush tick, whichever comes first. Every
// response carries requestID; intermediate responses have Stream set, and the
// final one has EndStream set.
//
// It returns:
//   - the parse, resolve or lookup error if setup fails (nothing is sent);
//   - nil (or the final send's error) once the log stream ends;
//   - the error from send as soon as an intermediate batch fails to send;
//   - ctx.Err() when ctx is cancelled, after a best-effort end_stream send.
//
// The log producer goroutine is cancelled on every return path, so an early
// return caused by a failed send does not leave it running.
func executeStreamLogs(ctx context.Context, requestID string, argsJSON string, deps ToolDeps, send streamSender) error {
	const (
		eventBuffer   = 100                    // capacity of the producer -> batcher channel
		lookback      = 30 * time.Second       // how far before "now" the start time passed to StreamLogs lies
		flushInterval = 100 * time.Millisecond // upper bound on how long a partial batch waits
		batchSize     = 50                     // entries per response before an immediate flush
	)

	args, re, err := parseStreamArgs(argsJSON)
	if err != nil {
		return err
	}

	// Read-only: resolve an ambiguous name in one shot instead of erroring. note
	// is non-empty when the name resolved to one of several candidates; it is
	// surfaced once, on the first emitted batch, so the model learns the pick and
	// its siblings without a round-trip and without repeating on every batch.
	hostID, containerID, note, err := resolveContainerRefRead(args.ContainerID, args.Host, deps)
	if err != nil {
		return err
	}

	cs, err := deps.HostService.FindContainer(hostID, containerID, deps.Labels)
	if err != nil {
		return fmt.Errorf("container not found: %w", err)
	}

	// The producer gets its own cancellable context so that it is told to stop
	// on EVERY return path. Without this, returning early because send failed
	// would leave the goroutine streaming into a channel nobody reads until the
	// caller's ctx happens to end.
	// NOTE(review): this relies on StreamLogs honouring context cancellation —
	// confirm against its implementation.
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	events := make(chan *container.LogEvent, eventBuffer)
	go func() {
		// Closing the channel is how the batching loop learns the stream ended.
		defer close(events)
		if err := cs.StreamLogs(streamCtx, time.Now().Add(-lookback), container.STDOUT|container.STDERR, events); err != nil {
			log.Debug().Err(err).Str("container", cs.Container.Name).Msg("StreamLogs ended with error")
		}
	}()

	noteSent := false
	// sendBatch wraps entries in a ToolResponse and sends it. Empty batches are
	// skipped unless they terminate the stream.
	sendBatch := func(entries []*pb.LogEntry, endStream bool) error {
		if len(entries) == 0 && !endStream {
			return nil
		}
		name := cs.Container.Name
		if !noteSent {
			// Only the first response that is actually sent carries the note.
			name = withResolutionNote(name, note)
			noteSent = true
		}
		resp := &pb.ToolResponse{
			RequestId: requestID,
			Type: &pb.ToolResponse_CallTool{
				CallTool: &pb.CallToolResponse{
					Success:   true,
					Stream:    !endStream,
					EndStream: endStream,
					Result: &pb.CallToolResponse_FetchLogs{
						FetchLogs: &pb.FetchLogsResult{
							ContainerName: name,
							Entries:       entries,
						},
					},
				},
			},
		}
		return send(resp)
	}

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	batch := make([]*pb.LogEntry, 0, batchSize)
	// flush sends the pending batch and, on success, empties it while keeping
	// its backing array for reuse.
	// NOTE(review): reuse is only safe if send has finished with resp.Entries
	// by the time it returns — confirm for every streamSender implementation.
	flush := func(endStream bool) error {
		if err := sendBatch(batch, endStream); err != nil {
			return err
		}
		batch = batch[:0]
		return nil
	}

	for {
		select {
		case event, ok := <-events:
			if !ok {
				// Channel closed — drain and send end_stream
				return flush(true)
			}
			msg, matches := matchesFilters(event, args, re)
			if !matches {
				continue
			}
			batch = append(batch, &pb.LogEntry{
				Timestamp: event.Timestamp,
				Message:   msg,
				Stream:    event.Stream,
				Level:     event.Level,
			})
			if len(batch) >= batchSize {
				if err := flush(false); err != nil {
					return err
				}
			}
		case <-ticker.C:
			// Periodic flush so a slow trickle of logs is not held back
			// waiting for a full batch.
			if len(batch) > 0 {
				if err := flush(false); err != nil {
					return err
				}
			}
		case <-ctx.Done():
			// Watch the caller's ctx (not streamCtx) so the returned error is
			// exactly the caller's cancellation cause. The end_stream send is
			// best-effort: its failure is logged, not returned.
			if err := flush(true); err != nil {
				log.Debug().Err(err).Msg("failed to send end_stream on cancel")
			}
			return ctx.Err()
		}
	}
}