mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
Fix silent failure for stale consumers
Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
+12
-3
@@ -7,6 +7,7 @@ package consumer
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -21,6 +22,7 @@ type GroupManager struct {
|
||||
groups map[string]*Group
|
||||
consumerStore queueStorage.ConsumerStore
|
||||
heartbeatTimeout time.Duration
|
||||
logger *slog.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
mu sync.RWMutex
|
||||
@@ -34,6 +36,7 @@ func NewGroupManager(queueName string, consumerStore queueStorage.ConsumerStore,
|
||||
groups: make(map[string]*Group),
|
||||
consumerStore: consumerStore,
|
||||
heartbeatTimeout: heartbeatTimeout,
|
||||
logger: slog.Default(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
@@ -206,10 +209,16 @@ func (cgm *GroupManager) checkStaleConsumers() {
|
||||
consumers := group.ListConsumers()
|
||||
for _, consumer := range consumers {
|
||||
if now.Sub(consumer.LastHeartbeat) > timeout {
|
||||
// Consumer heartbeat is stale, remove it
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_ = cgm.RemoveConsumer(ctx, group.ID(), consumer.ID)
|
||||
cancel() // Always cancel immediately after use to prevent context leak
|
||||
if err := cgm.RemoveConsumer(ctx, group.ID(), consumer.ID); err != nil {
|
||||
cgm.logger.Warn("failed to remove stale consumer, phantom consumer may persist",
|
||||
slog.String("queue", cgm.queueName),
|
||||
slog.String("group", group.ID()),
|
||||
slog.String("consumer", consumer.ID),
|
||||
slog.Duration("stale_for", now.Sub(consumer.LastHeartbeat)),
|
||||
slog.String("error", err.Error()))
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+7
-1
@@ -225,6 +225,10 @@ func (m *Manager) scheduleQueueDelivery(queueName string) {
|
||||
select {
|
||||
case m.deliveryQueue <- queueName:
|
||||
default:
|
||||
// Channel full — delivery trigger dropped. The 1-second sweep ticker
|
||||
// will reschedule, but there's a gap where this queue won't get delivered.
|
||||
m.logger.Warn("delivery channel full, dropping trigger (will retry on next sweep)",
|
||||
slog.String("queue", queueName))
|
||||
// Prevent a dropped enqueue from getting stuck in the dedupe set forever.
|
||||
m.markQueueDelivered(queueName)
|
||||
}
|
||||
@@ -786,8 +790,10 @@ func (m *Manager) Unsubscribe(ctx context.Context, queueName, pattern string, cl
|
||||
|
||||
// Unregister consumer locally
|
||||
if err := m.consumerManager.UnregisterConsumer(ctx, queueName, patternGroupID, clientID); err != nil {
|
||||
m.logger.Debug("unregister consumer error",
|
||||
m.logger.Warn("failed to unregister consumer, may become phantom",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("queue", queueName),
|
||||
slog.String("group", patternGroupID),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user