Reject stream message and advance

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-09 21:05:40 +01:00
parent 565dcd5e20
commit 4cdd74cfca
2 changed files with 65 additions and 2 deletions
+32 -2
View File
@@ -956,7 +956,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
if groupID != "" {
if group, err := m.groupStore.GetConsumerGroup(ctx, queueName, groupID); err == nil {
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
m.rejectStream(ctx, queueName, group, offset, reason)
return nil
}
}
@@ -972,7 +972,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
continue
}
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
m.rejectStream(ctx, queueName, group, offset, reason)
return nil
}
@@ -989,6 +989,36 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
return consumer.ErrMessageNotPending
}
// rejectStream handles reject for stream-mode consumer groups.
// Stream queues don't have PEL, so reject advances the cursor past the
// rejected message (same as ack) to prevent infinite redelivery.
func (m *Manager) rejectStream(ctx context.Context, queueName string, group *types.ConsumerGroup, offset uint64, reason string) {
cursor := group.GetCursor()
next := offset + 1
if next > cursor.Cursor {
if err := m.groupStore.UpdateCursor(ctx, queueName, group.ID, next); err != nil {
m.logger.Warn("failed to update stream cursor on reject",
slog.String("queue", queueName),
slog.String("group", group.ID),
slog.String("error", err.Error()))
}
if err := m.groupStore.UpdateCommitted(ctx, queueName, group.ID, next); err != nil {
m.logger.Warn("failed to update stream committed offset on reject",
slog.String("queue", queueName),
slog.String("group", group.ID),
slog.String("error", err.Error()))
}
}
m.logger.Info("stream message rejected",
slog.String("queue", queueName),
slog.String("group", group.ID),
slog.Uint64("offset", offset),
slog.String("reason", reason))
m.metrics.RecordReject()
m.scheduleQueueDelivery(queueName)
}
// --- Heartbeat ---
// UpdateHeartbeat updates the heartbeat for a consumer.
+33
View File
@@ -425,6 +425,39 @@ func TestStreamAckAdvancesCursor(t *testing.T) {
}
}
func TestStreamRejectAdvancesCursor(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
mgr := NewManager(logStore, groupStore, nil, DefaultConfig(), logger, nil)
queueCfg := types.DefaultQueueConfig("events", "$queue/events/#")
queueCfg.Type = types.QueueTypeStream
if err := mgr.CreateQueue(context.Background(), queueCfg); err != nil {
t.Fatalf("CreateQueue failed: %v", err)
}
cursor := &types.CursorOption{Position: types.CursorEarliest, Mode: types.GroupModeStream}
if err := mgr.SubscribeWithCursor(context.Background(), "events", "", "client-1", "streamer", "", cursor); err != nil {
t.Fatalf("SubscribeWithCursor failed: %v", err)
}
if err := mgr.Reject(context.Background(), "events", "events:0", "streamer", "bad message"); err != nil {
t.Fatalf("Reject failed: %v", err)
}
group, err := groupStore.GetConsumerGroup(context.Background(), "events", "streamer")
if err != nil {
t.Fatalf("GetConsumerGroup failed: %v", err)
}
if c := group.GetCursor().Cursor; c != 1 {
t.Fatalf("expected cursor 1 after reject, got %d", c)
}
if c := group.GetCursor().Committed; c != 1 {
t.Fatalf("expected committed 1 after reject, got %d", c)
}
}
func TestRetentionOffsetMessages(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()