fix: live log view stalls on busy containers with rotated logs (#4776)
Deploy VitePress site to Pages / build (push) Has been cancelled
Deploy VitePress site to Pages / Deploy (push) Has been cancelled
Push container / Push branches and PRs (push) Has been cancelled
Test / Typecheck (push) Has been cancelled
Test / JavaScript Tests (push) Has been cancelled
Test / Go Tests (push) Has been cancelled
Test / Go Staticcheck (push) Has been cancelled
Test / Integration Tests (push) Has been cancelled

This commit is contained in:
Amir Raminfar
2026-06-02 08:05:37 -07:00
committed by GitHub
parent 87bc48338d
commit fd0a4850f0
2 changed files with 64 additions and 4 deletions
+20 -4
View File
@@ -98,13 +98,22 @@ func (g *EventGenerator) emitAsSingles(events []*LogEvent) bool {
return true
}
// maxOrphanLines bounds how many leading lines skipOrphanedLines will buffer
// before giving up. A genuine orphan run is the tail of a single group split at
// a fetch boundary, which is small. A busy container streaming sustained
// level-less lines (all spaced under maxGroupTimeDelta) would otherwise look
// like one endless orphan run and buffer forever, so the live view shows
// nothing until a timing gap appears. Past this many lines it clearly isn't a
// leftover fragment — emit what we have and resume normal processing.
const maxOrphanLines = 1000
// skipOrphanedLines drains leading simple events without a level that look
// like orphaned continuation lines from a group already emitted in a prior
// fetch. Returns the first non-orphan event (or nil if the stream ends).
// If no non-orphan event arrives (stream ends or times out waiting), the
// buffered events are emitted as singles — they weren't really orphans.
// Lines near the container start time are never skipped since nothing can
// precede them.
// If no non-orphan event arrives (stream ends, times out waiting, or the run
// exceeds maxOrphanLines), the buffered events are emitted as singles — they
// weren't really orphans. Lines near the container start time are never
// skipped since nothing can precede them.
func (g *EventGenerator) skipOrphanedLines() *LogEvent {
var orphanBuffer []*LogEvent
var lastTimestamp int64
@@ -148,6 +157,13 @@ func (g *EventGenerator) skipOrphanedLines() *LogEvent {
lastTimestamp = current.Timestamp
orphanBuffer = append(orphanBuffer, current)
// A sustained run this long isn't a leftover group fragment — it's real
// content (a busy container). Stop skipping and emit it, then resume.
if len(orphanBuffer) >= maxOrphanLines {
g.emitAsSingles(orphanBuffer)
return g.nextEvent()
}
// Use peek (with timeout) so we don't block forever on a live stream.
if next := g.peek(); next == nil {
// No more events within the timeout — these aren't orphans.
@@ -88,6 +88,50 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
}
}
// steadyLevellessReader emits simple, level-less, timestamped lines forever,
// spaced under both maxGroupTimeDelta (so they look groupable) and the peek
// timeout (so peek never reports a gap). It stops when ctx is cancelled.
type steadyLevellessReader struct {
ctx context.Context
base time.Time
step time.Duration // timestamp spacing between consecutive lines
delay time.Duration // wall-clock spacing between Read() calls
i int
}
func (r *steadyLevellessReader) Read() (string, StdType, error) {
select {
case <-r.ctx.Done():
return "", 0, io.EOF
case <-time.After(r.delay):
}
ts := r.base.Add(time.Duration(r.i) * r.step).Format(time.RFC3339Nano)
r.i++
return ts + " lorem ipsum dolor sit amet", STDOUT, nil
}
func TestEventGenerator_doesNotStallOnSustainedLevellessStream(t *testing.T) {
// Reproduces the live-log stall: a busy container whose backlog has rotated
// away streams from a point far past its start, so skipOrphanedLines never
// short-circuits. With sustained level-less lines spaced under
// maxGroupTimeDelta, every line looks like an orphaned continuation, so the
// skip loop buffers forever and the UI shows "no logs". The generator must
// give up and emit instead.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
reader := &steadyLevellessReader{ctx: ctx, base: time.Now(), step: time.Millisecond, delay: time.Millisecond}
// startedAt far in the past so the near-start short-circuit cannot fire
g := NewEventGenerator(ctx, reader, Container{StartedAt: time.Now().Add(-time.Hour)})
select {
case event := <-g.Events:
require.NotNil(t, event)
case <-time.After(5 * time.Second):
t.Fatal("no event within 5s: skipOrphanedLines stalled on a sustained level-less stream")
}
}
func Test_createEvent(t *testing.T) {
data := orderedmap.New[string, any]()
data.Set("xyz", "value")