Files
dozzle/internal/cloud/log_streamer_test.go
2026-05-24 06:15:13 -07:00

531 lines
15 KiB
Go

package cloud
import (
"context"
"io"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/amir20/dozzle/internal/container"
container_support "github.com/amir20/dozzle/internal/support/container"
pb "github.com/amir20/dozzle/proto/cloud"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// fakeClientService is a scripted ClientService test double: StreamLogs
// relays whatever is queued on logsCh and records that it was invoked.
type fakeClientService struct {
	// host is the value returned by Host.
	host container.Host

	// logsCh holds the log events that StreamLogs forwards to its caller.
	logsCh chan *container.LogEvent

	// wait is closed the first time StreamLogs is invoked; once makes sure
	// the close happens only a single time.
	wait chan struct{}
	once sync.Once

	// streamed is set to true by every StreamLogs invocation.
	streamed atomic.Bool
}
// newFakeClientService builds a fake client whose host ID and host name are
// both hostID. The log channel is buffered with room for 16 events and the
// wait channel starts out open.
func newFakeClientService(hostID string) *fakeClientService {
	svc := new(fakeClientService)
	svc.host = container.Host{ID: hostID, Name: hostID}
	svc.logsCh = make(chan *container.LogEvent, 16)
	svc.wait = make(chan struct{})
	return svc
}
// FindContainer is a stub that ignores its arguments and returns a zero
// Container together with a nil error.
func (f *fakeClientService) FindContainer(_ context.Context, _ string, _ container.ContainerLabels) (container.Container, error) {
	var none container.Container
	return none, nil
}
// ListContainers is a stub that always returns a nil slice and a nil error.
func (f *fakeClientService) ListContainers(_ context.Context, _ container.ContainerLabels) ([]container.Container, error) {
	var none []container.Container
	return none, nil
}
// Host returns the host stored on the fake; it never returns an error.
func (f *fakeClientService) Host(_ context.Context) (container.Host, error) {
	return f.host, nil
}
// ContainerAction is a no-op stub: it ignores the requested action and
// returns nil.
func (f *fakeClientService) ContainerAction(_ context.Context, _ container.Container, _ container.ContainerAction) error {
	return nil
}
// UpdateContainer publishes no progress: it closes progressCh right away and
// returns (false, nil).
func (f *fakeClientService) UpdateContainer(_ context.Context, _ container.Container, progressCh chan<- container.UpdateProgress) (bool, error) {
	// Closing the channel unblocks any receiver waiting on it.
	close(progressCh)

	return false, nil
}
// LogsBetweenDates is a stub that returns a nil channel and a nil error.
func (f *fakeClientService) LogsBetweenDates(_ context.Context, _ container.Container, _ time.Time, _ time.Time, _ container.StdType) (<-chan *container.LogEvent, error) {
	var none <-chan *container.LogEvent
	return none, nil
}
// RawLogs is a stub that returns a nil reader and a nil error.
func (f *fakeClientService) RawLogs(_ context.Context, _ container.Container, _ time.Time, _ time.Time, _ container.StdType) (io.ReadCloser, error) {
	var none io.ReadCloser
	return none, nil
}
// SubscribeStats does nothing; the supplied channel is neither written to nor
// closed by this fake.
func (f *fakeClientService) SubscribeStats(_ context.Context, _ chan<- container.ContainerStat) {
}
// SubscribeEvents does nothing; the supplied channel is neither written to
// nor closed by this fake.
func (f *fakeClientService) SubscribeEvents(_ context.Context, _ chan<- container.ContainerEvent) {
}
// SubscribeContainersStarted does nothing; the supplied channel is neither
// written to nor closed by this fake.
func (f *fakeClientService) SubscribeContainersStarted(_ context.Context, _ chan<- container.Container) {}
// StreamLogs records that streaming was requested and then relays events
// from f.logsCh to events until ctx is cancelled or f.logsCh is closed.
//
// It returns ctx.Err() when ctx is cancelled (either while idle or while
// blocked delivering an event) and io.EOF once f.logsCh has been closed.
func (f *fakeClientService) StreamLogs(ctx context.Context, _ container.Container, _ time.Time, _ container.StdType, events chan<- *container.LogEvent) error {
	// Store the flag before closing wait: a goroutine released by <-f.wait is
	// then guaranteed to observe streamed == true.
	f.streamed.Store(true)
	f.once.Do(func() { close(f.wait) })

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-f.logsCh:
			if !ok {
				return io.EOF
			}
			// Forward the event, but give up if the context ends while the
			// receiver is not ready.
			select {
			case events <- ev:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}
// Attach is a no-op stub: nothing is read or written and nil is returned.
func (f *fakeClientService) Attach(_ context.Context, _ container.Container, _ container.ExecEventReader, _ io.Writer) error {
	return nil
}
// Exec is a no-op stub: the command is ignored, nothing is read or written,
// and nil is returned.
func (f *fakeClientService) Exec(_ context.Context, _ container.Container, _ []string, _ container.ExecEventReader, _ io.Writer) error {
	return nil
}
// fakeHostService is a LogStreamHostService driven from a map of containers.
type fakeHostService struct {
mu sync.Mutex
containers []container.Container
clients map[string]*fakeClientService // hostID -> client
startedCh chan<- container.Container
}
// ListAllContainers returns a copy of the current container list, taken under
// the lock, so callers cannot alias the fake's own slice. The error slice is
// always nil.
func (f *fakeHostService) ListAllContainers(_ container.ContainerLabels) ([]container.Container, []error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	var snapshot []container.Container
	snapshot = append(snapshot, f.containers...)
	return snapshot, nil
}
// FindContainer looks up the container with the given id on the given host
// and wraps it, together with that host's fake client, in a ContainerService.
//
// assert.AnError is returned when the host has no registered client or when
// no container matches both the host and the id.
func (f *fakeHostService) FindContainer(host string, id string, _ container.ContainerLabels) (*container_support.ContainerService, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	client, known := f.clients[host]
	if !known {
		return nil, assert.AnError
	}

	for i := range f.containers {
		candidate := f.containers[i]
		if candidate.ID != id || candidate.Host != host {
			continue
		}
		return container_support.NewContainerService(client, candidate), nil
	}
	return nil, assert.AnError
}
// Hosts returns one Host per registered client, using the host ID as both ID
// and Name. The result follows map iteration order, so it is unordered.
func (f *fakeHostService) Hosts() []container.Host {
	f.mu.Lock()
	defer f.mu.Unlock()

	out := make([]container.Host, 0, len(f.clients))
	for hostID := range f.clients {
		h := container.Host{ID: hostID, Name: hostID}
		out = append(out, h)
	}
	return out
}
// SubscribeContainersStarted remembers ch as the channel that emitStart sends
// to. The context and the filter are ignored, and a later call replaces the
// previously stored channel.
func (f *fakeHostService) SubscribeContainersStarted(_ context.Context, ch chan<- container.Container, _ container_support.ContainerFilter) {
	f.mu.Lock()
	defer f.mu.Unlock()

	f.startedCh = ch
}
// emitStart adds c to the container list and, if a subscriber has registered,
// sends c on the subscriber's channel. The send happens outside the lock and
// blocks until the subscriber receives it.
func (f *fakeHostService) emitStart(c container.Container) {
	f.mu.Lock()
	f.containers = append(f.containers, c)
	subscriber := f.startedCh
	f.mu.Unlock()

	// Without a subscriber the container is only recorded.
	if subscriber == nil {
		return
	}
	subscriber <- c
}
// collectBatches polls *out (guarded by mu) every 10ms until the batches it
// holds contain at least wantEntries entries in total. If that has not
// happened once timeout has elapsed, the test is failed with t.Fatalf.
//
// The entry count is always evaluated before the deadline is consulted, so
// it is checked at least once and once more after the final sleep. A result
// that lands during the last sleep is therefore not reported as a timeout.
func collectBatches(t *testing.T, mu *sync.Mutex, out *[]*pb.LogBatch, wantEntries int, timeout time.Duration) {
	t.Helper()

	// countEntries sums the entries of every batch collected so far.
	countEntries := func() int {
		mu.Lock()
		defer mu.Unlock()
		total := 0
		for _, b := range *out {
			total += len(b.Entries)
		}
		return total
	}

	deadline := time.Now().Add(timeout)
	for {
		if countEntries() >= wantEntries {
			return
		}
		if !time.Now().Before(deadline) {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatalf("timed out waiting for %d entries", wantEntries)
}
// TestLogStreamer_InitialSnapshotAndBatching checks that a container already
// listed when run starts gets a log reader, and that the events it produces
// arrive as LogBatch entries carrying the host, container and event fields.
func TestLogStreamer_InitialSnapshotAndBatching(t *testing.T) {
	client := newFakeClientService("host-1")
	hs := &fakeHostService{
		clients: map[string]*fakeClientService{"host-1": client},
		containers: []container.Container{
			{ID: "c1", Name: "nginx", Host: "host-1", State: "running"},
		},
	}

	var (
		sendMu sync.Mutex
		sent   []*pb.LogBatch
	)
	// send keeps every log batch it is handed and ignores other responses.
	send := func(resp *pb.ToolResponse) error {
		lb := resp.GetLogBatch()
		if lb == nil {
			return nil
		}
		sendMu.Lock()
		sent = append(sent, lb)
		sendMu.Unlock()
		return nil
	}

	ls := newLogStreamer(hs, nil, send)
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	runDone := make(chan struct{})
	go func() {
		defer close(runDone)
		ls.run(ctx)
	}()

	// Block until the streamer has attached a reader to the fake client.
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("StreamLogs was never called")
	case <-client.wait:
	}

	ts := time.Now().UnixMilli()
	for i := 0; i < 3; i++ {
		client.logsCh <- &container.LogEvent{
			Timestamp:  ts,
			RawMessage: "hello world",
			Stream:     "stdout",
			Level:      "info",
		}
	}
	collectBatches(t, &sendMu, &sent, 3, 3*time.Second)

	// Flatten the batches; how entries are split across batches is not
	// asserted here.
	var allEntries []*pb.LogBatchEntry
	sendMu.Lock()
	for _, batch := range sent {
		allEntries = append(allEntries, batch.Entries...)
	}
	sendMu.Unlock()
	require.GreaterOrEqual(t, len(allEntries), 3)

	first := allEntries[0]
	assert.Equal(t, "host-1", first.HostId)
	assert.Equal(t, "c1", first.ContainerId)
	assert.Equal(t, "nginx", first.ContainerName)
	assert.Equal(t, "hello world", first.Message)
	assert.Equal(t, "stdout", first.Stream)
	assert.Equal(t, "info", first.Level)

	// The event timestamp is in milliseconds; the entry carries nanoseconds.
	wantNs := ts * int64(time.Millisecond)
	assert.Equal(t, wantNs, first.TimestampNs)

	cancel()
	<-runDone
}
// TestLogStreamer_NewContainerStartsReader checks that a container announced
// through the started-containers subscription, after run is already going,
// gets a log reader.
func TestLogStreamer_NewContainerStartsReader(t *testing.T) {
	client := newFakeClientService("host-1")
	// No containers exist up front; the only one arrives via emitStart.
	hs := &fakeHostService{
		clients: map[string]*fakeClientService{"host-1": client},
	}
	discard := func(_ *pb.ToolResponse) error { return nil }
	ls := newLogStreamer(hs, nil, discard)

	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	runDone := make(chan struct{})
	go func() {
		defer close(runDone)
		ls.run(ctx)
	}()

	// emitStart silently does nothing while startedCh is still nil, so wait
	// until run has registered its subscription before emitting.
	subscribed := func() bool {
		hs.mu.Lock()
		defer hs.mu.Unlock()
		return hs.startedCh != nil
	}
	require.Eventually(t, subscribed, 2*time.Second, 5*time.Millisecond, "subscription was never registered")

	started := container.Container{ID: "c-new", Name: "redis", Host: "host-1", State: "running"}
	hs.emitStart(started)

	select {
	case <-time.After(2 * time.Second):
		t.Fatal("reader was not started for new container")
	case <-client.wait:
	}

	cancel()
	<-runDone
}
// TestLogStreamer_LevelUnknownIsBlank checks that an event whose level is
// "unknown" is forwarded with an empty Level field.
func TestLogStreamer_LevelUnknownIsBlank(t *testing.T) {
	client := newFakeClientService("host-1")
	hs := &fakeHostService{
		containers: []container.Container{
			{ID: "c1", Name: "n", Host: "host-1", State: "running"},
		},
		clients: map[string]*fakeClientService{"host-1": client},
	}

	var sendMu sync.Mutex
	var sent []*pb.LogBatch
	send := func(resp *pb.ToolResponse) error {
		sendMu.Lock()
		defer sendMu.Unlock()
		if lb := resp.GetLogBatch(); lb != nil {
			sent = append(sent, lb)
		}
		return nil
	}

	ls := newLogStreamer(hs, nil, send)
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	// Track the run goroutine so the test can wait for it to exit instead of
	// leaving it running after the test returns.
	runDone := make(chan struct{})
	go func() {
		ls.run(ctx)
		close(runDone)
	}()

	// Bounded wait: fail quickly instead of hanging until the global test
	// timeout if the reader is never attached.
	select {
	case <-client.wait:
	case <-time.After(2 * time.Second):
		t.Fatal("StreamLogs was never called")
	}

	client.logsCh <- &container.LogEvent{Timestamp: time.Now().UnixMilli(), RawMessage: "m", Stream: "stdout", Level: "unknown"}
	collectBatches(t, &sendMu, &sent, 1, 2*time.Second)

	// Flatten across batches rather than indexing sent[0].Entries[0], which
	// would panic (while holding the mutex) if the first batch were empty.
	sendMu.Lock()
	var levels []string
	for _, b := range sent {
		for _, e := range b.Entries {
			levels = append(levels, e.Level)
		}
	}
	sendMu.Unlock()

	// Shut the streamer down before asserting so cleanup happens regardless
	// of the assertion outcome.
	cancel()
	<-runDone

	require.NotEmpty(t, levels)
	assert.Equal(t, "", levels[0])
}
// TestLogStreamer_LabelDisabledSkipsContainer checks that a container whose
// cloudMinLevelLabel is "disabled" never has StreamLogs called for it.
func TestLogStreamer_LabelDisabledSkipsContainer(t *testing.T) {
	disabledClient := newFakeClientService("host-1")
	canaryClient := newFakeClientService("host-2")

	// The disabled container comes first and a canary on another host comes
	// second. The test relies on run() evaluating the snapshot in listed
	// order: once the canary's reader is up, the disabled container must
	// already have been considered (and skipped).
	noisy := container.Container{
		ID: "c1", Name: "noisy", Host: "host-1", State: "running",
		Labels: map[string]string{cloudMinLevelLabel: "disabled"},
	}
	canary := container.Container{ID: "c2", Name: "canary", Host: "host-2", State: "running"}
	hs := &fakeHostService{
		containers: []container.Container{noisy, canary},
		clients: map[string]*fakeClientService{
			"host-1": disabledClient,
			"host-2": canaryClient,
		},
	}

	discard := func(_ *pb.ToolResponse) error { return nil }
	ls := newLogStreamer(hs, nil, discard)

	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()

	runDone := make(chan struct{})
	go func() {
		defer close(runDone)
		ls.run(ctx)
	}()

	// Synchronise on the canary reader instead of sleeping for a fixed
	// wall-clock delay.
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("canary reader was never started")
	case <-canaryClient.wait:
	}

	cancel()
	<-runDone

	wasStreamed := disabledClient.streamed.Load()
	assert.False(t, wasStreamed, "disabled container must not be streamed")
}
// TestLogStreamer_LabelMinLevelFiltersBelow checks that a container labelled
// with cloudMinLevelLabel "warn" only forwards warn, error and unknown-level
// events, while debug and info events are dropped.
func TestLogStreamer_LabelMinLevelFiltersBelow(t *testing.T) {
	client := newFakeClientService("host-1")
	hs := &fakeHostService{
		containers: []container.Container{
			{
				ID: "c1", Name: "n", Host: "host-1", State: "running",
				Labels: map[string]string{cloudMinLevelLabel: "warn"},
			},
		},
		clients: map[string]*fakeClientService{"host-1": client},
	}

	var sendMu sync.Mutex
	var sent []*pb.LogBatch
	send := func(resp *pb.ToolResponse) error {
		sendMu.Lock()
		defer sendMu.Unlock()
		if lb := resp.GetLogBatch(); lb != nil {
			sent = append(sent, lb)
		}
		return nil
	}

	ls := newLogStreamer(hs, nil, send)
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()
	runDone := make(chan struct{})
	go func() { ls.run(ctx); close(runDone) }()

	// Bounded wait: fail quickly instead of hanging until the global test
	// timeout if the reader is never attached.
	select {
	case <-client.wait:
	case <-time.After(2 * time.Second):
		t.Fatal("StreamLogs was never called")
	}

	ts := time.Now().UnixMilli()
	// debug + info should be dropped; warn + error pass; unknown passes.
	events := []struct{ msg, level string }{
		{"d", "debug"},
		{"i", "info"},
		{"w", "warn"},
		{"e", "error"},
		{"u", "unknown"},
	}
	for _, ev := range events {
		client.logsCh <- &container.LogEvent{Timestamp: ts, RawMessage: ev.msg, Stream: "stdout", Level: ev.level}
	}
	collectBatches(t, &sendMu, &sent, 3, 2*time.Second)

	sendMu.Lock()
	var msgs []string
	for _, b := range sent {
		for _, e := range b.Entries {
			msgs = append(msgs, e.Message)
		}
	}
	sendMu.Unlock()

	// Stop the streamer before asserting so cleanup happens regardless of the
	// assertion outcome.
	cancel()
	<-runDone

	assert.ElementsMatch(t, []string{"w", "e", "u"}, msgs)
}
// TestLogStreamer_InvalidLabelIgnoredStreamsAll checks that an unrecognised
// cloudMinLevelLabel value ("garbage") applies no level filter: debug, info
// and warn events are all forwarded.
func TestLogStreamer_InvalidLabelIgnoredStreamsAll(t *testing.T) {
	client := newFakeClientService("host-1")
	hs := &fakeHostService{
		containers: []container.Container{
			{
				ID: "c1", Name: "n", Host: "host-1", State: "running",
				Labels: map[string]string{cloudMinLevelLabel: "garbage"},
			},
		},
		clients: map[string]*fakeClientService{"host-1": client},
	}

	var sendMu sync.Mutex
	var sent []*pb.LogBatch
	send := func(resp *pb.ToolResponse) error {
		sendMu.Lock()
		defer sendMu.Unlock()
		if lb := resp.GetLogBatch(); lb != nil {
			sent = append(sent, lb)
		}
		return nil
	}

	ls := newLogStreamer(hs, nil, send)
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()
	runDone := make(chan struct{})
	go func() { ls.run(ctx); close(runDone) }()

	// Bounded wait: fail quickly instead of hanging until the global test
	// timeout if the reader is never attached.
	select {
	case <-client.wait:
	case <-time.After(2 * time.Second):
		t.Fatal("StreamLogs was never called")
	}

	ts := time.Now().UnixMilli()
	// An invalid label is ignored, so no filtering: every level passes through.
	events := []struct{ msg, level string }{
		{"d", "debug"},
		{"i", "info"},
		{"w", "warn"},
	}
	for _, ev := range events {
		client.logsCh <- &container.LogEvent{Timestamp: ts, RawMessage: ev.msg, Stream: "stdout", Level: ev.level}
	}
	collectBatches(t, &sendMu, &sent, 3, 2*time.Second)

	sendMu.Lock()
	var msgs []string
	for _, b := range sent {
		for _, e := range b.Entries {
			msgs = append(msgs, e.Message)
		}
	}
	sendMu.Unlock()

	// Stop the streamer before asserting so cleanup happens regardless of the
	// assertion outcome.
	cancel()
	<-runDone

	assert.ElementsMatch(t, []string{"d", "i", "w"}, msgs)
}
// TestParseMinLevel is a table test for parseMinLevel. It covers the empty
// string, "disabled" in several casings and with surrounding spaces, two
// level names, and an unrecognised value.
func TestParseMinLevel(t *testing.T) {
	type expectation struct {
		input    string
		rank     int
		disabled bool
		valid    bool
	}
	table := []expectation{
		{input: "", rank: 0, disabled: false, valid: true},
		{input: "disabled", rank: 0, disabled: true, valid: true},
		{input: "DISABLED", rank: 0, disabled: true, valid: true},
		{input: " disabled ", rank: 0, disabled: true, valid: true},
		{input: "info", rank: 3, disabled: false, valid: true},
		{input: "WARN", rank: 4, disabled: false, valid: true},
		{input: "garbage", rank: 0, disabled: false, valid: false},
	}

	for i := range table {
		want := table[i]
		gotRank, gotDisabled, gotValid := parseMinLevel(want.input)
		assert.Equal(t, want.rank, gotRank, "rank for %q", want.input)
		assert.Equal(t, want.disabled, gotDisabled, "disabled for %q", want.input)
		assert.Equal(t, want.valid, gotValid, "valid for %q", want.input)
	}
}
// TestLogStreamer_BatchFlushesOnMaxEntries checks that pushing more than
// logBatchMaxEntries events produces at least one batch holding
// logBatchMaxEntries or more entries within 500ms.
func TestLogStreamer_BatchFlushesOnMaxEntries(t *testing.T) {
	client := newFakeClientService("host-1")
	hs := &fakeHostService{
		containers: []container.Container{
			{ID: "c1", Name: "n", Host: "host-1", State: "running"},
		},
		clients: map[string]*fakeClientService{"host-1": client},
	}

	var sendMu sync.Mutex
	var sent []*pb.LogBatch
	send := func(resp *pb.ToolResponse) error {
		sendMu.Lock()
		defer sendMu.Unlock()
		if lb := resp.GetLogBatch(); lb != nil {
			sent = append(sent, lb)
		}
		return nil
	}

	ls := newLogStreamer(hs, nil, send)
	ctx, cancel := context.WithCancel(t.Context())
	defer cancel()
	runDone := make(chan struct{})
	go func() { ls.run(ctx); close(runDone) }()

	// Bounded wait: fail quickly instead of hanging until the global test
	// timeout if the reader is never attached.
	select {
	case <-client.wait:
	case <-time.After(2 * time.Second):
		t.Fatal("StreamLogs was never called")
	}

	// Push just over the max entries cap so we flush before the 1s timer.
	ts := time.Now().UnixMilli()
	total := logBatchMaxEntries + 10
	for range total {
		client.logsCh <- &container.LogEvent{Timestamp: ts, RawMessage: "x", Stream: "stdout", Level: "info"}
	}

	// hasFullBatch reports whether any batch sent so far reached the cap.
	hasFullBatch := func() bool {
		sendMu.Lock()
		defer sendMu.Unlock()
		for _, b := range sent {
			if len(b.Entries) >= logBatchMaxEntries {
				return true
			}
		}
		return false
	}

	// A full flush should happen well under 1s (we're below the timer).
	// The condition is re-evaluated after every sleep, so a flush that lands
	// during the final sleep is still seen.
	deadline := time.Now().Add(500 * time.Millisecond)
	flushed := hasFullBatch()
	for !flushed && time.Now().Before(deadline) {
		time.Sleep(10 * time.Millisecond)
		flushed = hasFullBatch()
	}

	// Single teardown path for both the success and the failure outcome.
	cancel()
	<-runDone

	if !flushed {
		t.Fatal("expected a batch with >= logBatchMaxEntries entries")
	}
}