Set PEL size limit

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-09 23:00:41 +01:00
parent 3681f881d4
commit da51648874
3 changed files with 85 additions and 0 deletions
+15
View File
@@ -23,6 +23,7 @@ var (
ErrInvalidOffset = errors.New("invalid offset")
ErrGroupModeMismatch = errors.New("consumer group mode mismatch")
ErrCommitOffsetOnlyForStreamMode = errors.New("commit offset only supported for stream groups")
ErrPELFull = errors.New("pending entry list at capacity")
)
// Manager handles consumer group operations including claiming,
@@ -53,6 +54,11 @@ type Config struct {
// AutoCommitInterval controls how often stream groups auto-commit offsets.
// Zero means commit on every delivery batch.
AutoCommitInterval time.Duration
// MaxPELSize is the maximum number of pending entries per consumer group.
// When reached, new claims are rejected until entries are acknowledged.
// Zero means unlimited (not recommended for production).
MaxPELSize int
}
// DefaultConfig returns default manager configuration.
@@ -63,6 +69,7 @@ func DefaultConfig() Config {
ClaimBatchSize: 10,
StealBatchSize: 5,
AutoCommitInterval: 5 * time.Second,
MaxPELSize: 100_000,
}
}
@@ -285,6 +292,14 @@ func (m *Manager) ClaimBatchStream(ctx context.Context, queueName, groupID, cons
// claimFromCursor tries to claim a message from the cursor position.
func (m *Manager) claimFromCursor(ctx context.Context, group *types.ConsumerGroup, consumerID string, filter *Filter) (*types.Message, error) {
// Check PEL capacity before claiming
if m.config.MaxPELSize > 0 {
pelCount := group.PendingCount()
if pelCount >= m.config.MaxPELSize {
return nil, ErrPELFull
}
}
cursor := group.GetCursor()
// Get log tail
+6
View File
@@ -81,6 +81,9 @@ type Config struct {
StealInterval time.Duration
StealEnabled bool
// PEL configuration
MaxPELSize int
// Retention configuration
RetentionCheckInterval time.Duration
@@ -103,6 +106,7 @@ func DefaultConfig() Config {
DeliveryBatchSize: 100,
HeartbeatInterval: 10 * time.Second,
ConsumerTimeout: 2 * time.Minute,
MaxPELSize: 100_000,
DLQTopicPrefix: "$dlq/",
StealInterval: 5 * time.Second,
StealEnabled: true,
@@ -127,6 +131,7 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
ClaimBatchSize: config.ClaimBatchSize,
StealBatchSize: 5,
AutoCommitInterval: config.AutoCommitInterval,
MaxPELSize: config.MaxPELSize,
}
consumerMgr := consumer.NewManager(queueStore, groupStore, consumerCfg)
@@ -894,6 +899,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
err := m.consumerManager.Ack(ctx, queueName, group.ID, consumerID, offset)
if err == nil {
m.metrics.RecordAck(0)
m.metrics.UpdatePELSize(uint64(group.PendingCount()))
m.scheduleQueueDelivery(queueName)
return nil
}
+64
View File
@@ -5,6 +5,7 @@ package queue
import (
"context"
"fmt"
"io"
"log/slog"
"reflect"
@@ -15,6 +16,7 @@ import (
"github.com/absmach/fluxmq/cluster"
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
"github.com/absmach/fluxmq/queue/consumer"
queueraft "github.com/absmach/fluxmq/queue/raft"
"github.com/absmach/fluxmq/queue/storage"
memlog "github.com/absmach/fluxmq/queue/storage/memory/log"
@@ -1738,3 +1740,65 @@ func TestUpdateHeartbeatRemovesStaleTrackedTargets(t *testing.T) {
t.Fatalf("expected stale tracked target to be removed after heartbeat update, got %d entries", len(targets))
}
}
func TestPELCapRejectsClaim(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
config := DefaultConfig()
config.MaxPELSize = 3
mgr := NewManager(logStore, groupStore, nil, config, logger, nil)
ctx := context.Background()
queueCfg := types.DefaultQueueConfig("pelcap", "$queue/pelcap/#")
if err := mgr.CreateQueue(ctx, queueCfg); err != nil {
t.Fatalf("CreateQueue failed: %v", err)
}
// Publish more messages than MaxPELSize
for i := 0; i < 5; i++ {
if err := mgr.Publish(ctx, types.PublishRequest{
Topic: "$queue/pelcap/test",
Payload: []byte("msg"),
}); err != nil {
t.Fatalf("Publish failed: %v", err)
}
}
// Set up consumer group + consumer via Subscribe
if err := mgr.Subscribe(ctx, "pelcap", "", "c1", "g1", ""); err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
// Claim exactly MaxPELSize messages (should succeed)
msgs, err := mgr.consumerManager.ClaimBatch(ctx, "pelcap", "g1", "c1", nil, 3)
if err != nil {
t.Fatalf("ClaimBatch should succeed: %v", err)
}
if len(msgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(msgs))
}
// Next claim should fail — PEL is full, so ClaimBatch returns ErrNoMessages
_, err = mgr.consumerManager.ClaimBatch(ctx, "pelcap", "g1", "c1", nil, 1)
if err != consumer.ErrNoMessages {
t.Fatalf("expected ErrNoMessages (PEL full), got: %v", err)
}
// Ack one message to free PEL space (message ID format is queueName:offset)
ackID := fmt.Sprintf("pelcap:%d", msgs[0].Sequence)
if err := mgr.Ack(ctx, "pelcap", ackID, "g1"); err != nil {
t.Fatalf("Ack failed: %v", err)
}
// Now claim should succeed again
msgs2, err := mgr.consumerManager.ClaimBatch(ctx, "pelcap", "g1", "c1", nil, 1)
if err != nil {
t.Fatalf("ClaimBatch after ack should succeed: %v", err)
}
if len(msgs2) != 1 {
t.Fatalf("expected 1 message after ack, got %d", len(msgs2))
}
}