diff --git a/internal/container/event_generator.go b/internal/container/event_generator.go index 3c147ecb..6d2233b0 100644 --- a/internal/container/event_generator.go +++ b/internal/container/event_generator.go @@ -98,13 +98,22 @@ func (g *EventGenerator) emitAsSingles(events []*LogEvent) bool { return true } +// maxOrphanLines bounds how many leading lines skipOrphanedLines will buffer +// before giving up. A genuine orphan run is the tail of a single group split at +// a fetch boundary, which is small. A busy container streaming sustained +// level-less lines (all spaced under maxGroupTimeDelta) would otherwise look +// like one endless orphan run and buffer forever, so the live view shows +// nothing until a timing gap appears. Past this many lines it clearly isn't a +// leftover fragment — emit what we have and resume normal processing. +const maxOrphanLines = 1000 + // skipOrphanedLines drains leading simple events without a level that look // like orphaned continuation lines from a group already emitted in a prior // fetch. Returns the first non-orphan event (or nil if the stream ends). -// If no non-orphan event arrives (stream ends or times out waiting), the -// buffered events are emitted as singles — they weren't really orphans. -// Lines near the container start time are never skipped since nothing can -// precede them. +// If no non-orphan event arrives (stream ends, times out waiting, or the run +// exceeds maxOrphanLines), the buffered events are emitted as singles — they +// weren't really orphans. Lines near the container start time are never +// skipped since nothing can precede them. func (g *EventGenerator) skipOrphanedLines() *LogEvent { var orphanBuffer []*LogEvent var lastTimestamp int64 @@ -148,6 +157,13 @@ func (g *EventGenerator) skipOrphanedLines() *LogEvent { lastTimestamp = current.Timestamp orphanBuffer = append(orphanBuffer, current) + // A sustained run this long isn't a leftover group fragment — it's real + // content (a busy container). Stop skipping and emit it, then resume. + if len(orphanBuffer) >= maxOrphanLines { + g.emitAsSingles(orphanBuffer) + return g.nextEvent() + } + // Use peek (with timeout) so we don't block forever on a live stream. if next := g.peek(); next == nil { // No more events within the timeout — these aren't orphans. diff --git a/internal/container/event_generator_test.go b/internal/container/event_generator_test.go index a4c5f799..c6ca0684 100644 --- a/internal/container/event_generator_test.go +++ b/internal/container/event_generator_test.go @@ -88,6 +88,50 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { } } +// steadyLevellessReader emits simple, level-less, timestamped lines forever, +// spaced under both maxGroupTimeDelta (so they look groupable) and the peek +// timeout (so peek never reports a gap). It stops when ctx is cancelled. +type steadyLevellessReader struct { + ctx context.Context + base time.Time + step time.Duration // timestamp spacing between consecutive lines + delay time.Duration // wall-clock spacing between Read() calls + i int +} + +func (r *steadyLevellessReader) Read() (string, StdType, error) { + select { + case <-r.ctx.Done(): + return "", 0, io.EOF + case <-time.After(r.delay): + } + ts := r.base.Add(time.Duration(r.i) * r.step).Format(time.RFC3339Nano) + r.i++ + return ts + " lorem ipsum dolor sit amet", STDOUT, nil +} + +func TestEventGenerator_doesNotStallOnSustainedLevellessStream(t *testing.T) { + // Reproduces the live-log stall: a busy container whose backlog has rotated + // away streams from a point far past its start, so skipOrphanedLines never + // short-circuits. With sustained level-less lines spaced under + // maxGroupTimeDelta, every line looks like an orphaned continuation, so the + // skip loop buffers forever and the UI shows "no logs". The generator must + // give up and emit instead. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + reader := &steadyLevellessReader{ctx: ctx, base: time.Now(), step: time.Millisecond, delay: time.Millisecond} + // startedAt far in the past so the near-start short-circuit cannot fire + g := NewEventGenerator(ctx, reader, Container{StartedAt: time.Now().Add(-time.Hour)}) + + select { + case event := <-g.Events: + require.NotNil(t, event) + case <-time.After(5 * time.Second): + t.Fatal("no event within 5s: skipOrphanedLines stalled on a sustained level-less stream") + } +} + func Test_createEvent(t *testing.T) { data := orderedmap.New[string, any]() data.Set("xyz", "value")