Fix queue manager (#20)

* Fix multiple log stores in main and topic index

Signed-off-by: dusan <borovcanindusan1@gmail.com>

* Fix dangling queue deliveries

Signed-off-by: dusan <borovcanindusan1@gmail.com>

* Simplify lease takeover

Signed-off-by: dusan <borovcanindusan1@gmail.com>

* Make use more responsive

Signed-off-by: dusan <borovcanindusan1@gmail.com>

* Simplify consumer tracking in queue

Signed-off-by: dusan <borovcanindusan1@gmail.com>

---------

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2026-02-07 21:18:16 +01:00
committed by GitHub
parent 8262cef0e5
commit 72650fed37
13 changed files with 1262 additions and 222 deletions
+86
View File
@@ -4,9 +4,12 @@
package cluster_test
import (
"context"
"fmt"
"testing"
"time"
qtypes "github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -129,6 +132,89 @@ func TestCrossNode_QoS2_PublishSubscribe(t *testing.T) {
assert.Equal(t, byte(2), msg.QoS)
}
// TestCrossNode_StreamReplayFromFirstOffsetAfterLateConsumer mirrors the reported cluster stream issue:
// publish stream messages first, then attach a stream consumer at earliest offset and verify replay.
func TestCrossNode_StreamReplayFromFirstOffsetAfterLateConsumer(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
cluster := testutil.NewTestCluster(t, 3)
defer cluster.Stop()
require.NoError(t, cluster.Start())
require.NoError(t, cluster.WaitForClusterReady(30*time.Second))
pubNode := cluster.GetNodeByIndex(0)
subNode := cluster.GetNodeByIndex(2)
require.NotNil(t, pubNode)
require.NotNil(t, subNode)
publisher := testutil.NewTestMQTTClient(t, pubNode, "stream-pub-late")
require.NoError(t, publisher.Connect(true))
defer publisher.Disconnect()
consumer := testutil.NewTestMQTTClient(t, subNode, "stream-sub-late")
require.NoError(t, consumer.Connect(true))
defer consumer.Disconnect()
pubQM := pubNode.Broker.GetQueueManager()
subQM := subNode.Broker.GetQueueManager()
require.NotNil(t, pubQM)
require.NotNil(t, subQM)
queueName := fmt.Sprintf("demo-events-%d", time.Now().UnixNano())
queueTopic := "$queue/" + queueName
ctx := context.Background()
queueCfg := qtypes.DefaultQueueConfig(queueName, queueTopic+"/#")
queueCfg.Type = qtypes.QueueTypeStream
require.NoError(t, pubQM.CreateQueue(ctx, queueCfg))
const messageCount = 5
expectedPayloads := make([][]byte, 0, messageCount)
for i := 1; i <= messageCount; i++ {
payload := []byte(fmt.Sprintf(`{"event":"user.action","seq":%d}`, i))
expectedPayloads = append(expectedPayloads, payload)
require.NoError(t, publisher.Publish(queueTopic, 1, payload, false))
}
// Ensure publishes land before stream consumer registration.
time.Sleep(300 * time.Millisecond)
cursor := &qtypes.CursorOption{
Position: qtypes.CursorEarliest,
Mode: qtypes.GroupModeStream,
}
require.NoError(t, subQM.SubscribeWithCursor(ctx, queueName, "#", consumer.ClientID, "demo-readers", "", cursor))
msgs, err := consumer.WaitForMessages(messageCount, 12*time.Second)
require.NoError(t, err)
require.Len(t, msgs, messageCount)
for i, msg := range msgs {
assert.Equal(t, queueTopic, msg.Topic)
assert.Equal(t, expectedPayloads[i], msg.Payload)
}
deliveries := subNode.QueueDeliveries()
offsets := make([]string, 0, messageCount)
for _, d := range deliveries {
if d.ClientID != consumer.ClientID || d.Message == nil {
continue
}
if d.Message.Properties["queue"] != queueName {
continue
}
offset := d.Message.Properties["x-stream-offset"]
if offset != "" {
offsets = append(offsets, offset)
}
}
require.GreaterOrEqual(t, len(offsets), messageCount)
assert.Equal(t, []string{"0", "1", "2", "3", "4"}, offsets[:messageCount])
}
// TestCrossNode_MultipleSubscribers verifies message delivery to multiple subscribers on different nodes.
func TestCrossNode_MultipleSubscribers(t *testing.T) {
if testing.Short() {
+109 -24
View File
@@ -6,6 +6,7 @@ package cluster
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/url"
@@ -48,6 +49,8 @@ type EtcdCluster struct {
// Lease for session ownership (with auto-renewal)
sessionLease clientv3.LeaseID
leaseMu sync.Mutex
leaseCancel context.CancelFunc
// gRPC transport for inter-broker communication
transport *Transport
@@ -76,6 +79,9 @@ type EtcdCluster struct {
wg sync.WaitGroup
stopCh chan struct{}
lifecycleCtx context.Context
cancelLifecycle context.CancelFunc
}
// EtcdConfig holds embedded etcd configuration.
@@ -197,26 +203,12 @@ func NewEtcdCluster(cfg *EtcdConfig, localStore storage.Store, logger *slog.Logg
localStore: localStore,
stopCh: make(chan struct{}),
}
c.lifecycleCtx, c.cancelLifecycle = context.WithCancel(context.Background())
// Create a lease for session ownership with auto-renewal
leaseResp, err := client.Grant(context.Background(), 30) // 30 second TTL
if err != nil {
return nil, fmt.Errorf("failed to create lease: %w", err)
if err := c.refreshSessionLease(context.Background()); err != nil {
return nil, fmt.Errorf("failed to create session lease: %w", err)
}
c.sessionLease = leaseResp.ID
// Keep lease alive
ch, err := client.KeepAlive(context.Background(), c.sessionLease)
if err != nil {
return nil, fmt.Errorf("failed to keep lease alive: %w", err)
}
// Consume keepalive responses in background
go func() {
for range ch {
// Lease kept alive
}
}()
// Initialize gRPC transport if configured
if cfg.TransportAddr != "" {
@@ -325,8 +317,18 @@ func (c *EtcdCluster) Start() error {
// Stop gracefully shuts down the cluster.
func (c *EtcdCluster) Stop() error {
close(c.stopCh)
if c.cancelLifecycle != nil {
c.cancelLifecycle()
}
c.wg.Wait()
c.leaseMu.Lock()
if c.leaseCancel != nil {
c.leaseCancel()
c.leaseCancel = nil
}
c.leaseMu.Unlock()
// Stop gRPC transport
if c.transport != nil {
c.transport.Stop()
@@ -473,7 +475,7 @@ func (c *EtcdCluster) loadRetainedCache() error {
// watchRetained watches etcd for retained message changes and updates the local cache.
func (c *EtcdCluster) watchRetained() {
for {
watchCh := c.client.Watch(context.Background(), retainedPrefix, clientv3.WithPrefix())
watchCh := c.client.Watch(c.lifecycleCtx, retainedPrefix, clientv3.WithPrefix())
for {
select {
@@ -481,6 +483,9 @@ func (c *EtcdCluster) watchRetained() {
return
case watchResp, ok := <-watchCh:
if !ok {
if c.lifecycleCtx.Err() != nil {
return
}
c.logger.Warn("retained watch channel closed, reloading cache")
if err := c.loadRetainedCache(); err != nil {
c.logger.Error("failed to reload retained messages", slog.String("error", err.Error()))
@@ -531,14 +536,20 @@ func (c *EtcdCluster) campaignLeader() {
// This prevents racing to campaign before the cluster is ready
time.Sleep(3 * time.Second)
ctx := context.Background()
ctx := c.lifecycleCtx
retryDelay := 2 * time.Second
maxRetryDelay := 30 * time.Second
for {
if ctx.Err() != nil {
return
}
c.logger.Info("Campaigning for leadership", slog.String("node_id", c.nodeID))
if err := c.election.Campaign(ctx, c.nodeID); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || ctx.Err() != nil {
return
}
c.logger.Warn("Failed to campaign for leader",
slog.String("node_id", c.nodeID),
slog.String("error", err.Error()),
@@ -557,6 +568,8 @@ func (c *EtcdCluster) campaignLeader() {
case <-c.stopCh:
c.logger.Info("Cluster stopping, ending campaign", slog.String("node_id", c.nodeID))
return
case <-ctx.Done():
return
case <-time.After(retryDelay):
// Exponential backoff with max cap
retryDelay *= 2
@@ -597,6 +610,77 @@ func (c *EtcdCluster) recreateSessionAndElection() error {
return nil
}
func (c *EtcdCluster) refreshSessionLease(ctx context.Context) error {
c.leaseMu.Lock()
defer c.leaseMu.Unlock()
return c.refreshSessionLeaseLocked(ctx)
}
func (c *EtcdCluster) refreshSessionLeaseLocked(ctx context.Context) error {
leaseResp, err := c.client.Grant(ctx, 30)
if err != nil {
return fmt.Errorf("failed to create lease: %w", err)
}
if c.leaseCancel != nil {
c.leaseCancel()
c.leaseCancel = nil
}
keepAliveCtx, cancel := context.WithCancel(context.Background())
ch, err := c.client.KeepAlive(keepAliveCtx, leaseResp.ID)
if err != nil {
cancel()
return fmt.Errorf("failed to keep lease alive: %w", err)
}
c.sessionLease = leaseResp.ID
c.leaseCancel = cancel
go func() {
for {
select {
case <-c.stopCh:
return
case _, ok := <-ch:
if !ok {
return
}
}
}
}()
return nil
}
func isLeaseNotFoundErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "requested lease not found")
}
func (c *EtcdCluster) putWithSessionLease(ctx context.Context, key, value string) error {
c.leaseMu.Lock()
leaseID := c.sessionLease
c.leaseMu.Unlock()
_, err := c.client.Put(ctx, key, value, clientv3.WithLease(leaseID))
if err == nil {
return nil
}
if !isLeaseNotFoundErr(err) {
return err
}
if err := c.refreshSessionLease(ctx); err != nil {
return err
}
c.leaseMu.Lock()
leaseID = c.sessionLease
c.leaseMu.Unlock()
_, err = c.client.Put(ctx, key, value, clientv3.WithLease(leaseID))
return err
}
// AcquireSession registers this node as the owner of a session.
// Uses a leased Put so ownership auto-expires if this node dies.
// This is called after takeover has completed (if needed), so it's safe
@@ -604,8 +688,7 @@ func (c *EtcdCluster) recreateSessionAndElection() error {
func (c *EtcdCluster) AcquireSession(ctx context.Context, clientID, nodeID string) error {
key := sessionsPrefix + clientID + "/owner"
_, err := c.client.Put(ctx, key, nodeID, clientv3.WithLease(c.sessionLease))
if err != nil {
if err := c.putWithSessionLease(ctx, key, nodeID); err != nil {
return err
}
@@ -1117,8 +1200,7 @@ func (c *EtcdCluster) RegisterQueueConsumer(ctx context.Context, info *QueueCons
return fmt.Errorf("failed to marshal consumer info: %w", err)
}
_, err = c.client.Put(ctx, key, string(data))
if err != nil {
if err := c.putWithSessionLease(ctx, key, string(data)); err != nil {
return fmt.Errorf("failed to store consumer in etcd: %w", err)
}
@@ -1309,7 +1391,7 @@ func (c *EtcdCluster) loadSubscriptionCache() error {
// watchSubscriptions watches etcd for subscription changes and updates the local cache.
func (c *EtcdCluster) watchSubscriptions() {
for {
watchCh := c.client.Watch(context.Background(), subscriptionsPrefix, clientv3.WithPrefix())
watchCh := c.client.Watch(c.lifecycleCtx, subscriptionsPrefix, clientv3.WithPrefix())
for {
select {
@@ -1317,6 +1399,9 @@ func (c *EtcdCluster) watchSubscriptions() {
return
case watchResp, ok := <-watchCh:
if !ok {
if c.lifecycleCtx.Err() != nil {
return
}
// Watch channel closed, reload and restart
c.logger.Warn("subscription watch channel closed, reloading cache")
if err := c.loadSubscriptionCache(); err != nil {
+126
View File
@@ -0,0 +1,126 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package cluster
import (
"context"
"fmt"
"io"
"log/slog"
"net"
"path/filepath"
"testing"
"time"
"github.com/absmach/fluxmq/storage/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func freeLocalPort(t *testing.T) int {
t.Helper()
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer ln.Close()
return ln.Addr().(*net.TCPAddr).Port
}
func newSingleNodeEtcdCluster(t *testing.T) *EtcdCluster {
t.Helper()
peerPort := freeLocalPort(t)
clientPort := freeLocalPort(t)
nodeID := "lease-test-node"
cfg := &EtcdConfig{
NodeID: nodeID,
DataDir: filepath.Join(t.TempDir(), "etcd"),
BindAddr: fmt.Sprintf("127.0.0.1:%d", peerPort),
ClientAddr: fmt.Sprintf("127.0.0.1:%d", clientPort),
AdvertiseAddr: fmt.Sprintf("127.0.0.1:%d", peerPort),
InitialCluster: fmt.Sprintf("%s=http://127.0.0.1:%d", nodeID, peerPort),
Bootstrap: true,
}
c, err := NewEtcdCluster(cfg, memory.New(), slog.New(slog.NewTextHandler(io.Discard, nil)))
require.NoError(t, err)
require.NoError(t, c.Start())
t.Cleanup(func() {
_ = c.Stop()
})
return c
}
func TestRegisterQueueConsumerStoresLeasedKey(t *testing.T) {
c := newSingleNodeEtcdCluster(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
info := &QueueConsumerInfo{
QueueName: "events",
GroupID: "workers@#",
ConsumerID: "consumer-1",
ClientID: "client-1",
Pattern: "#",
Mode: "stream",
ProxyNodeID: c.nodeID,
RegisteredAt: time.Now(),
}
require.NoError(t, c.RegisterQueueConsumer(ctx, info))
key := fmt.Sprintf("%s%s/%s/%s", queueConsumersPrefix, info.QueueName, info.GroupID, info.ConsumerID)
resp, err := c.client.Get(ctx, key)
require.NoError(t, err)
require.Len(t, resp.Kvs, 1)
c.leaseMu.Lock()
leaseID := c.sessionLease
c.leaseMu.Unlock()
assert.NotZero(t, resp.Kvs[0].Lease)
assert.Equal(t, int64(leaseID), resp.Kvs[0].Lease)
}
func TestRegisterQueueConsumerRefreshesExpiredLease(t *testing.T) {
c := newSingleNodeEtcdCluster(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.leaseMu.Lock()
oldLease := c.sessionLease
c.leaseMu.Unlock()
_, err := c.client.Revoke(ctx, oldLease)
require.NoError(t, err)
info := &QueueConsumerInfo{
QueueName: "events",
GroupID: "workers@#",
ConsumerID: "consumer-2",
ClientID: "client-2",
Pattern: "#",
Mode: "stream",
ProxyNodeID: c.nodeID,
RegisteredAt: time.Now(),
}
require.NoError(t, c.RegisterQueueConsumer(ctx, info))
c.leaseMu.Lock()
newLease := c.sessionLease
c.leaseMu.Unlock()
assert.NotEqual(t, oldLease, newLease)
key := fmt.Sprintf("%s%s/%s/%s", queueConsumersPrefix, info.QueueName, info.GroupID, info.ConsumerID)
resp, err := c.client.Get(ctx, key)
require.NoError(t, err)
require.Len(t, resp.Kvs, 1)
assert.Equal(t, int64(newLease), resp.Kvs[0].Lease)
}
+23 -31
View File
@@ -305,6 +305,11 @@ func main() {
amqp091Broker := amqpbroker.New(nil, logger)
defer amqp091Broker.Close()
var (
qm *queue.Manager
queueLogStore *logStorage.Adapter
)
if metrics != nil {
amqpMetrics, err := amqp1broker.NewMetrics()
if err != nil {
@@ -324,12 +329,12 @@ func main() {
queueDir += "queue"
// Use file-based AOL storage (implements both LogStore and ConsumerGroupStore)
logStore, err := logStorage.NewAdapter(queueDir, logStorage.DefaultAdapterConfig())
queueLogStore, err = logStorage.NewAdapter(queueDir, logStorage.DefaultAdapterConfig())
if err != nil {
slog.Error("Failed to initialize queue log storage", "error", err)
os.Exit(1)
}
defer logStore.Close()
defer queueLogStore.Close()
// Convert queue configs from main config to queue types
queueCfg := queue.DefaultConfig()
@@ -372,9 +377,9 @@ func main() {
}
// Create log-based queue manager with wildcard support
qm := queue.NewManager(
logStore,
logStore,
qm = queue.NewManager(
queueLogStore,
queueLogStore,
deliverFn,
queueCfg,
logger,
@@ -399,8 +404,8 @@ func main() {
cfg.Cluster.NodeID,
cfg.Cluster.Raft.BindAddr,
cfg.Cluster.Raft.DataDir,
logStore,
logStore,
queueLogStore,
queueLogStore,
cfg.Cluster.Raft.Peers,
raftCfg,
logger,
@@ -725,32 +730,19 @@ func main() {
ShutdownTimeout: cfg.Server.ShutdownTimeout,
}
// Get the queue manager from the broker and log store
queueManager := b.GetQueueManager()
if qm, ok := queueManager.(*queue.Manager); ok {
queueDir := cfg.Storage.BadgerDir
if !strings.HasSuffix(queueDir, "/") {
queueDir += "/"
}
queueDir += "queue"
logStore, err := logStorage.NewAdapter(queueDir, logStorage.DefaultAdapterConfig())
if err == nil {
defer logStore.Close()
apiServer := api.New(apiCfg, qm, logStore, logStore, logger)
if qm != nil && queueLogStore != nil {
apiServer := api.New(apiCfg, qm, queueLogStore, queueLogStore, logger)
wg.Add(1)
go func() {
defer wg.Done()
slog.Info("Starting Queue API server", "address", cfg.Server.APIAddr)
if err := apiServer.Listen(ctx); err != nil {
serverErr <- err
}
}()
} else {
slog.Warn("Failed to create API server log store", "error", err)
}
wg.Add(1)
go func() {
defer wg.Done()
slog.Info("Starting Queue API server", "address", cfg.Server.APIAddr)
if err := apiServer.Listen(ctx); err != nil {
serverErr <- err
}
}()
} else {
slog.Warn("Queue manager not available or wrong type, API server disabled")
slog.Warn("Queue manager or log storage not available, API server disabled")
}
}
+8 -1
View File
@@ -133,7 +133,14 @@ func (a *Adapter) CreateQueue(ctx context.Context, config types.QueueConfig) err
// UpdateQueue updates an existing queue's configuration.
func (a *Adapter) UpdateQueue(ctx context.Context, config types.QueueConfig) error {
return a.queueStore.Save(config)
if err := a.queueStore.Save(config); err != nil {
return err
}
// Refresh topic index to ensure matcher reflects updated topic patterns.
a.topicIndex.AddQueue(config.Name, config.Topics)
return nil
}
// GetQueue retrieves a queue's configuration.
+29
View File
@@ -79,3 +79,32 @@ func TestAdapter_StreamCursorAndCommitDoNotRegress(t *testing.T) {
assert.Equal(t, uint64(7), got.GetCursor().Cursor)
assert.Equal(t, uint64(7), got.GetCursor().Committed)
}
func TestAdapter_UpdateQueueRefreshesTopicIndex(t *testing.T) {
dir := t.TempDir()
adapter, err := NewAdapter(dir, DefaultAdapterConfig())
require.NoError(t, err)
defer adapter.Close()
ctx := context.Background()
cfg := types.DefaultQueueConfig("orders", "$queue/orders/#")
require.NoError(t, adapter.CreateQueue(ctx, cfg))
matches, err := adapter.FindMatchingQueues(ctx, "$queue/orders/new")
require.NoError(t, err)
require.Len(t, matches, 1)
assert.Equal(t, "orders", matches[0])
cfg.Topics = []string{"$queue/payments/#"}
require.NoError(t, adapter.UpdateQueue(ctx, cfg))
matches, err = adapter.FindMatchingQueues(ctx, "$queue/orders/new")
require.NoError(t, err)
assert.Len(t, matches, 0)
matches, err = adapter.FindMatchingQueues(ctx, "$queue/payments/new")
require.NoError(t, err)
require.Len(t, matches, 1)
assert.Equal(t, "orders", matches[0])
}
+395 -136
View File
@@ -43,21 +43,37 @@ type Manager struct {
cluster cluster.Cluster
localNodeID string
// Active consumer groups: queueName -> groupID -> group state
groups sync.Map // map[string]*sync.Map
// Subscription patterns: clientID -> []pattern
subscriptions sync.Map // map[string][]string
// Lightweight heartbeat index keyed by client/queue/group.
// Stores only metadata needed to route heartbeat updates.
subscriptionsMu sync.RWMutex
subscriptions map[string]map[string]*subscriptionRef // clientID -> refKey -> ref
mu sync.RWMutex
stopCh chan struct{}
stopOnce sync.Once
wg sync.WaitGroup
deliveryMu sync.Mutex
deliverySet map[string]struct{}
deliveryQueue chan string
// Metrics
metrics *consumer.Metrics
}
type subscriptionRef struct {
queueName string
groupID string
refCount int
lastSeen time.Time
}
type subscriptionTarget struct {
key string
queueName string
groupID string
}
// Config holds configuration for the queue-based queue manager.
type Config struct {
// Consumer configuration
@@ -145,7 +161,10 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
distributionMode: normalizeDistributionMode(config.DistributionMode),
cluster: cl,
localNodeID: localNodeID,
subscriptions: make(map[string]map[string]*subscriptionRef),
stopCh: make(chan struct{}),
deliverySet: make(map[string]struct{}),
deliveryQueue: make(chan string, 4096),
metrics: metrics,
}
}
@@ -165,6 +184,9 @@ func (m *Manager) Start(ctx context.Context) error {
// Cleanup ephemeral queues that expired while broker was down
m.cleanupEphemeralQueues()
// Prime delivery for existing queues at startup.
m.scheduleAllQueues(ctx)
// Start delivery workers
m.wg.Add(1)
go m.runDeliveryLoop()
@@ -191,6 +213,43 @@ func (m *Manager) Start(ctx context.Context) error {
return nil
}
func (m *Manager) scheduleAllQueues(ctx context.Context) {
queues, err := m.queueStore.ListQueues(ctx)
if err != nil {
return
}
for _, queueConfig := range queues {
m.scheduleQueueDelivery(queueConfig.Name)
}
}
func (m *Manager) scheduleQueueDelivery(queueName string) {
if queueName == "" {
return
}
m.deliveryMu.Lock()
if _, exists := m.deliverySet[queueName]; exists {
m.deliveryMu.Unlock()
return
}
m.deliverySet[queueName] = struct{}{}
m.deliveryMu.Unlock()
select {
case m.deliveryQueue <- queueName:
default:
// Prevent a dropped enqueue from getting stuck in the dedupe set forever.
m.markQueueDelivered(queueName)
}
}
func (m *Manager) markQueueDelivered(queueName string) {
m.deliveryMu.Lock()
delete(m.deliverySet, queueName)
m.deliveryMu.Unlock()
}
// ensureReservedQueues creates queues from config or the default mqtt queue if no config provided.
func (m *Manager) ensureReservedQueues(ctx context.Context) error {
// If no queue configs provided, use the default mqtt queue
@@ -251,6 +310,7 @@ func (m *Manager) CreateQueue(ctx context.Context, config types.QueueConfig) err
if err := m.queueStore.CreateQueue(ctx, config); err != nil {
return err
}
m.scheduleQueueDelivery(config.Name)
m.logger.Info("queue created",
slog.String("queue", config.Name),
@@ -289,7 +349,11 @@ func (m *Manager) GetOrCreateQueue(ctx context.Context, queueName string, topics
// DeleteQueue deletes a queue.
func (m *Manager) DeleteQueue(ctx context.Context, queueName string) error {
return m.queueStore.DeleteQueue(ctx, queueName)
if err := m.queueStore.DeleteQueue(ctx, queueName); err != nil {
return err
}
m.markQueueDelivered(queueName)
return nil
}
// GetQueue returns the configuration for a queue.
@@ -423,6 +487,8 @@ func (m *Manager) publishLocal(ctx context.Context, publish types.PublishRequest
slog.String("queue", queueName),
slog.String("topic", publish.Topic),
slog.Uint64("offset", offset))
m.scheduleQueueDelivery(queueName)
}
return nil
@@ -635,7 +701,7 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
}
}
m.trackSubscription(clientID, fmt.Sprintf("%s/%s", queueName, pattern))
m.trackSubscription(clientID, queueName, patternGroupID)
m.logger.Info("consumer subscribed with cursor",
slog.String("queue", queueName),
@@ -644,6 +710,8 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
slog.String("cursor", fmt.Sprintf("%d", cursor.Position)),
slog.String("mode", string(mode)))
m.scheduleQueueDelivery(queueName)
return nil
}
@@ -706,7 +774,7 @@ func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clie
}
// Track subscription
m.trackSubscription(clientID, fmt.Sprintf("%s/%s", queueName, pattern))
m.trackSubscription(clientID, queueName, patternGroupID)
m.logger.Info("consumer subscribed",
slog.String("queue", queueName),
@@ -714,6 +782,8 @@ func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clie
slog.String("client", clientID),
slog.String("pattern", pattern))
m.scheduleQueueDelivery(queueName)
return nil
}
@@ -745,7 +815,7 @@ func (m *Manager) Unsubscribe(ctx context.Context, queueName, pattern string, cl
}
// Untrack subscription
m.untrackSubscription(clientID, fmt.Sprintf("%s/%s", queueName, pattern))
m.untrackSubscription(clientID, queueName, patternGroupID)
// Track last consumer disconnect for ephemeral queues
m.checkEphemeralDisconnect(ctx, queueName)
@@ -755,6 +825,8 @@ func (m *Manager) Unsubscribe(ctx context.Context, queueName, pattern string, cl
slog.String("group", patternGroupID),
slog.String("client", clientID))
m.scheduleQueueDelivery(queueName)
return nil
}
@@ -787,6 +859,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
slog.String("error", err.Error()))
}
}
m.scheduleQueueDelivery(queueName)
return nil
}
}
@@ -820,6 +893,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
slog.String("error", err.Error()))
}
}
m.scheduleQueueDelivery(queueName)
return nil
}
@@ -828,6 +902,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
err := m.consumerManager.Ack(ctx, queueName, group.ID, consumerID, offset)
if err == nil {
m.metrics.RecordAck(0)
m.scheduleQueueDelivery(queueName)
return nil
}
}
@@ -846,6 +921,7 @@ func (m *Manager) Nack(ctx context.Context, queueName, messageID, groupID string
if groupID != "" {
if group, err := m.groupStore.GetConsumerGroup(ctx, queueName, groupID); err == nil {
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
return nil
}
}
@@ -861,6 +937,7 @@ func (m *Manager) Nack(ctx context.Context, queueName, messageID, groupID string
continue
}
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
return nil
}
@@ -868,6 +945,7 @@ func (m *Manager) Nack(ctx context.Context, queueName, messageID, groupID string
err := m.consumerManager.Nack(ctx, queueName, group.ID, consumerID, offset)
if err == nil {
m.metrics.RecordNack()
m.scheduleQueueDelivery(queueName)
return nil
}
}
@@ -886,6 +964,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
if groupID != "" {
if group, err := m.groupStore.GetConsumerGroup(ctx, queueName, groupID); err == nil {
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
return nil
}
}
@@ -901,6 +980,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
continue
}
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
return nil
}
@@ -908,6 +988,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
err := m.consumerManager.Reject(ctx, queueName, group.ID, consumerID, offset, reason)
if err == nil {
m.metrics.RecordReject()
m.scheduleQueueDelivery(queueName)
return nil
}
}
@@ -920,28 +1001,28 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
// UpdateHeartbeat updates the heartbeat for a consumer.
func (m *Manager) UpdateHeartbeat(ctx context.Context, clientID string) error {
m.subscriptions.Range(func(key, value any) bool {
if key.(string) != clientID {
return true
targets := m.getSubscriptionTargets(clientID)
if len(targets) == 0 {
return nil
}
now := time.Now()
var staleKeys []string
for _, target := range targets {
err := m.consumerManager.UpdateHeartbeat(ctx, target.queueName, target.groupID, clientID)
if err == nil {
m.touchSubscription(clientID, target.key, now)
continue
}
patterns := value.([]string)
for _, filter := range patterns {
queueName, pattern := parseSubscriptionFilter(filter)
if queueName == "" {
continue
}
groupID := extractGroupFromClientID(clientID)
if pattern != "" {
groupID = fmt.Sprintf("%s@%s", groupID, pattern)
}
m.consumerManager.UpdateHeartbeat(ctx, queueName, groupID, clientID)
if err == storage.ErrConsumerNotFound || err == consumer.ErrConsumerNotFound {
staleKeys = append(staleKeys, target.key)
}
}
if len(staleKeys) > 0 {
m.removeSubscriptionKeys(clientID, staleKeys)
}
return true
})
return nil
}
@@ -950,15 +1031,22 @@ func (m *Manager) UpdateHeartbeat(ctx context.Context, clientID string) error {
func (m *Manager) runDeliveryLoop() {
defer m.wg.Done()
ticker := time.NewTicker(m.config.DeliveryInterval)
sweepInterval := time.Second
ticker := time.NewTicker(sweepInterval)
defer ticker.Stop()
for {
select {
case <-m.stopCh:
return
case queueName := <-m.deliveryQueue:
m.markQueueDelivered(queueName)
if m.deliverQueue(context.Background(), queueName) {
// Continue draining while this queue still has deliverable work.
m.scheduleQueueDelivery(queueName)
}
case <-ticker.C:
m.deliverMessages()
m.scheduleAllQueues(context.Background())
}
}
}
@@ -971,47 +1059,73 @@ func (m *Manager) deliverMessages() {
return
}
for _, queueConfig := range queues {
primaryGroup := strings.TrimSpace(queueConfig.PrimaryGroup)
primaryCommitted := make(map[string]uint64)
getPrimaryCommitted := func(pattern string) (uint64, bool) {
if primaryGroup == "" {
return 0, false
}
for i := range queues {
m.deliverQueueConfig(ctx, &queues[i])
}
}
patternGroupID := primaryGroup
if pattern != "" {
patternGroupID = fmt.Sprintf("%s@%s", primaryGroup, pattern)
}
func (m *Manager) deliverQueue(ctx context.Context, queueName string) bool {
if queueName == "" {
return false
}
if val, ok := primaryCommitted[patternGroupID]; ok {
return val, true
}
queueConfig, err := m.queueStore.GetQueue(ctx, queueName)
if err != nil {
return false
}
committed, err := m.consumerManager.GetCommittedOffset(ctx, queueConfig.Name, patternGroupID)
if err != nil {
return 0, false
}
return m.deliverQueueConfig(ctx, queueConfig)
}
primaryCommitted[patternGroupID] = committed
return committed, true
func (m *Manager) deliverQueueConfig(ctx context.Context, queueConfig *types.QueueConfig) bool {
if queueConfig == nil {
return false
}
delivered := false
primaryGroup := strings.TrimSpace(queueConfig.PrimaryGroup)
primaryCommitted := make(map[string]uint64)
getPrimaryCommitted := func(pattern string) (uint64, bool) {
if primaryGroup == "" {
return 0, false
}
// Deliver to local consumer groups
groups, err := m.groupStore.ListConsumerGroups(ctx, queueConfig.Name)
patternGroupID := primaryGroup
if pattern != "" {
patternGroupID = fmt.Sprintf("%s@%s", primaryGroup, pattern)
}
if val, ok := primaryCommitted[patternGroupID]; ok {
return val, true
}
committed, err := m.consumerManager.GetCommittedOffset(ctx, queueConfig.Name, patternGroupID)
if err != nil {
continue
return 0, false
}
primaryCommitted[patternGroupID] = committed
return committed, true
}
// Deliver to local consumer groups.
groups, err := m.groupStore.ListConsumerGroups(ctx, queueConfig.Name)
if err == nil {
for _, group := range groups {
m.deliverToGroup(ctx, &queueConfig, group, getPrimaryCommitted)
}
// Also deliver to remote consumers registered in cluster
if m.cluster != nil && m.distributionMode == DistributionForward {
m.deliverToRemoteConsumers(ctx, &queueConfig)
if m.deliverToGroup(ctx, queueConfig, group, getPrimaryCommitted) {
delivered = true
}
}
}
// Deliver to remote consumers registered in cluster.
if m.cluster != nil && m.distributionMode == DistributionForward {
if m.deliverToRemoteConsumers(ctx, queueConfig) {
delivered = true
}
}
return delivered
}
func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.PublishRequest) error {
@@ -1029,14 +1143,14 @@ func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.Publ
// deliverToRemoteConsumers delivers messages to consumers registered on remote nodes.
// This enables cross-node queue message routing.
func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) {
func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) bool {
// Get all consumers for this queue from the cluster
consumers, err := m.cluster.ListQueueConsumers(ctx, config.Name)
if err != nil {
m.logger.Debug("failed to list cluster consumers",
slog.String("queue", config.Name),
slog.String("error", err.Error()))
return
return false
}
// Group consumers by groupID for proper cursor management
@@ -1049,6 +1163,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
consumersByGroup[c.GroupID] = append(consumersByGroup[c.GroupID], c)
}
delivered := false
for groupID, groupConsumers := range consumersByGroup {
mode := types.GroupModeQueue
if groupConsumers[0].Mode != "" {
@@ -1092,27 +1207,16 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
if err != nil {
continue
}
if len(msgs) > 0 {
delivered = true
}
// Route each message to the remote node
for _, msg := range msgs {
payload := msg.GetPayload()
properties := msg.Properties
properties := m.createRouteProperties(msg, groupID, config.Name)
if group.Mode == types.GroupModeStream {
propsCopy := make(map[string]string, len(msg.Properties)+4)
for k, v := range msg.Properties {
propsCopy[k] = v
}
// Decorate properties for stream consumers.
propsCopy["x-stream-offset"] = fmt.Sprintf("%d", msg.Sequence)
if !msg.CreatedAt.IsZero() {
propsCopy["x-stream-timestamp"] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
}
if hasWorkCommitted {
propsCopy["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
propsCopy["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
propsCopy["x-work-group"] = config.PrimaryGroup
}
properties = propsCopy
m.decorateStreamProperties(properties, msg, workCommitted, hasWorkCommitted, config.PrimaryGroup)
}
err := m.cluster.RouteQueueMessage(
@@ -1120,7 +1224,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,
config.Name,
msg.ID,
properties["message-id"],
payload,
properties,
int64(msg.Sequence),
@@ -1141,11 +1245,13 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
}
}
}
return delivered
}
func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig, group *types.ConsumerGroupState, primaryCommitted func(pattern string) (uint64, bool)) {
func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig, group *types.ConsumerGroupState, primaryCommitted func(pattern string) (uint64, bool)) bool {
if group.ConsumerCount() == 0 {
return
return false
}
// Create filter from group pattern
@@ -1157,9 +1263,10 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
// Round-robin delivery across consumers
consumers := group.ConsumerIDs()
if len(consumers) == 0 {
return
return false
}
delivered := false
for _, consumerID := range consumers {
var msgs []*types.Message
var err error
@@ -1171,6 +1278,9 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
if err != nil {
continue
}
if len(msgs) > 0 {
delivered = true
}
// Fetch fresh group state to get current consumer info
freshGroup, err := m.groupStore.GetConsumerGroup(ctx, config.Name, group.ID)
@@ -1198,14 +1308,18 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
// Check if consumer is on a remote node
if m.cluster != nil && consumerInfo.ProxyNodeID != "" && consumerInfo.ProxyNodeID != m.localNodeID {
// Route to remote node
properties := m.createRouteProperties(msg, group.ID, config.Name)
if group.Mode == types.GroupModeStream {
m.decorateStreamProperties(properties, msg, workCommitted, hasWorkCommitted, config.PrimaryGroup)
}
err := m.cluster.RouteQueueMessage(
ctx,
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,
config.Name,
msg.ID,
properties["message-id"],
msg.GetPayload(),
msg.Properties,
properties,
int64(msg.Sequence),
)
if err != nil {
@@ -1231,6 +1345,8 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
}
}
}
return delivered
}
func (m *Manager) runStealLoop() {
@@ -1261,6 +1377,7 @@ func (m *Manager) runCleanupLoop() {
return
case <-ticker.C:
m.cleanupStaleConsumers()
m.pruneStaleSubscriptions()
}
}
}
@@ -1454,48 +1571,155 @@ func (m *Manager) runEphemeralCleanupLoop() {
// --- Helper Functions ---
func (m *Manager) trackSubscription(clientID, filter string) {
val, _ := m.subscriptions.LoadOrStore(clientID, []string{})
patterns := val.([]string)
patterns = append(patterns, filter)
m.subscriptions.Store(clientID, patterns)
func (m *Manager) subscriptionRefKey(queueName, groupID string) string {
return queueName + "\x00" + groupID
}
func (m *Manager) untrackSubscription(clientID, filter string) {
val, ok := m.subscriptions.Load(clientID)
func (m *Manager) trackSubscription(clientID, queueName, groupID string) {
if clientID == "" || queueName == "" || groupID == "" {
return
}
key := m.subscriptionRefKey(queueName, groupID)
now := time.Now()
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
refs = make(map[string]*subscriptionRef)
m.subscriptions[clientID] = refs
}
if ref, ok := refs[key]; ok {
ref.refCount++
ref.lastSeen = now
return
}
refs[key] = &subscriptionRef{
queueName: queueName,
groupID: groupID,
refCount: 1,
lastSeen: now,
}
}
func (m *Manager) untrackSubscription(clientID, queueName, groupID string) {
if clientID == "" || queueName == "" || groupID == "" {
return
}
key := m.subscriptionRefKey(queueName, groupID)
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
patterns := val.([]string)
newPatterns := make([]string, 0, len(patterns))
for _, p := range patterns {
if p != filter {
newPatterns = append(newPatterns, p)
}
ref, ok := refs[key]
if !ok {
return
}
if len(newPatterns) == 0 {
m.subscriptions.Delete(clientID)
} else {
m.subscriptions.Store(clientID, newPatterns)
ref.refCount--
if ref.refCount <= 0 {
delete(refs, key)
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
func (m *Manager) getSubscriptionTargets(clientID string) []subscriptionTarget {
m.subscriptionsMu.RLock()
defer m.subscriptionsMu.RUnlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return nil
}
targets := make([]subscriptionTarget, 0, len(refs))
for key, ref := range refs {
targets = append(targets, subscriptionTarget{
key: key,
queueName: ref.queueName,
groupID: ref.groupID,
})
}
return targets
}
func (m *Manager) touchSubscription(clientID, key string, ts time.Time) {
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
ref, ok := refs[key]
if !ok {
return
}
ref.lastSeen = ts
}
func (m *Manager) removeSubscriptionKeys(clientID string, keys []string) {
if len(keys) == 0 {
return
}
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
for _, key := range keys {
delete(refs, key)
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
func (m *Manager) pruneStaleSubscriptions() {
maxIdle := m.config.ConsumerTimeout * 2
if maxIdle <= 0 {
maxIdle = 5 * time.Minute
}
cutoff := time.Now().Add(-maxIdle)
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
for clientID, refs := range m.subscriptions {
for key, ref := range refs {
if ref.lastSeen.Before(cutoff) {
delete(refs, key)
}
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
}
func (m *Manager) createDeliveryMessage(msg *types.Message, groupID string, queueName string) *brokerstorage.Message {
messageID := fmt.Sprintf("%s:%d", queueName, msg.Sequence)
// Create properties map with ack info
props := make(map[string]string)
if msg.Properties != nil {
for k, v := range msg.Properties {
props[k] = v
}
}
props["message-id"] = messageID
props["group-id"] = groupID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", msg.Sequence)
props := m.createRouteProperties(msg, groupID, queueName)
deliveryMsg := &brokerstorage.Message{
Topic: msg.Topic,
@@ -1515,16 +1739,37 @@ func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *t
delivery.Properties = make(map[string]string)
}
delivery.Properties["x-stream-offset"] = fmt.Sprintf("%d", msg.Sequence)
m.decorateStreamProperties(delivery.Properties, msg, workCommitted, hasWorkCommitted, primaryGroup)
}
func (m *Manager) createRouteProperties(msg *types.Message, groupID, queueName string) map[string]string {
props := make(map[string]string, len(msg.Properties)+4)
for k, v := range msg.Properties {
props[k] = v
}
props["message-id"] = fmt.Sprintf("%s:%d", queueName, msg.Sequence)
props["group-id"] = groupID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", msg.Sequence)
return props
}
func (m *Manager) decorateStreamProperties(properties map[string]string, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if properties == nil || msg == nil {
return
}
properties["x-stream-offset"] = fmt.Sprintf("%d", msg.Sequence)
if !msg.CreatedAt.IsZero() {
delivery.Properties["x-stream-timestamp"] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
properties["x-stream-timestamp"] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
}
if hasWorkCommitted {
delivery.Properties["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
delivery.Properties["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
properties["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
properties["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
if primaryGroup != "" {
delivery.Properties["x-work-group"] = primaryGroup
properties["x-work-group"] = primaryGroup
}
}
}
@@ -1633,15 +1878,6 @@ func (m *Manager) computeRetentionOffset(ctx context.Context, config *types.Queu
return offset, hasRetention
}
func parseSubscriptionFilter(filter string) (queueName, pattern string) {
for i, c := range filter {
if c == '/' {
return filter[:i], filter[i+1:]
}
}
return filter, ""
}
// --- Metrics ---
// GetMetrics returns the current metrics snapshot.
@@ -1695,19 +1931,42 @@ func (m *Manager) DeliverQueueMessage(ctx context.Context, clientID string, msg
payload, _ := msgMap["payload"].([]byte)
properties, _ := msgMap["properties"].(map[string]string)
messageID, _ := msgMap["id"].(string)
sequence, _ := msgMap["sequence"].(int64)
if properties == nil {
properties = make(map[string]string)
var sequence int64
switch v := msgMap["sequence"].(type) {
case int64:
sequence = v
case uint64:
sequence = int64(v)
case int:
sequence = int64(v)
case float64:
sequence = int64(v)
}
props := make(map[string]string, len(properties)+4)
for k, v := range properties {
props[k] = v
}
if messageID == "" {
if props["message-id"] != "" {
messageID = props["message-id"]
} else {
messageID = fmt.Sprintf("%s:%d", queueName, sequence)
}
}
props["message-id"] = messageID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", sequence)
topic := queueName
if topic != "" && !strings.HasPrefix(topic, "$queue/") {
topic = "$queue/" + topic
}
properties["message-id"] = messageID
properties["queue"] = queueName
properties["offset"] = fmt.Sprintf("%d", sequence)
deliveryMsg := &brokerstorage.Message{
Topic: queueName,
Topic: topic,
QoS: 1,
Properties: properties,
Properties: props,
}
deliveryMsg.SetPayloadFromBytes(payload)
+101
View File
@@ -0,0 +1,101 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"fmt"
"io"
"log/slog"
"testing"
"github.com/absmach/fluxmq/queue/storage"
memlog "github.com/absmach/fluxmq/queue/storage/memory/log"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
)
func benchmarkQueueDeliveryPath(b *testing.B, queueCount int, fullSweep bool) {
b.Helper()
logStore := memlog.New()
groupStore := newMockGroupStore()
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
var lastMessageID string
var lastGroupID string
deliverFn := func(ctx context.Context, clientID string, msg any) error {
deliveryMsg, ok := msg.(*brokerstorage.Message)
if !ok {
return nil
}
if deliveryMsg.Properties != nil {
lastMessageID = deliveryMsg.Properties["message-id"]
lastGroupID = deliveryMsg.Properties["group-id"]
}
return nil
}
cfg := DefaultConfig()
cfg.DeliveryBatchSize = 1
mgr := NewManager(logStore, groupStore, deliverFn, cfg, logger, nil)
ctx := context.Background()
for i := 0; i < queueCount; i++ {
queueName := fmt.Sprintf("q-%d", i)
queueCfg := types.DefaultQueueConfig(queueName, "$queue/"+queueName+"/#")
if err := mgr.CreateQueue(ctx, queueCfg); err != nil {
b.Fatalf("CreateQueue(%s) failed: %v", queueName, err)
}
}
if err := mgr.Subscribe(ctx, "q-0", "", "worker-1", "workers", ""); err != nil {
b.Fatalf("Subscribe failed: %v", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
lastMessageID = ""
lastGroupID = ""
if err := mgr.Publish(ctx, types.PublishRequest{
Topic: "$queue/q-0/jobs",
Payload: []byte("x"),
}); err != nil {
b.Fatalf("Publish failed: %v", err)
}
if fullSweep {
mgr.deliverMessages()
} else if ok := mgr.deliverQueue(ctx, "q-0"); !ok {
b.Fatalf("deliverQueue returned no delivery")
}
if lastMessageID == "" {
b.Fatalf("expected delivered message-id")
}
if lastGroupID == "" {
b.Fatalf("expected delivered group-id")
}
if err := mgr.Ack(ctx, "q-0", lastMessageID, lastGroupID); err != nil {
if err == storage.ErrConsumerNotFound {
b.Fatalf("Ack failed with consumer not found: %v", err)
}
b.Fatalf("Ack failed: %v", err)
}
}
}
func BenchmarkQueueDeliveryScanVsTargeted(b *testing.B) {
for _, queueCount := range []int{100, 1000} {
b.Run(fmt.Sprintf("full_sweep_%dq", queueCount), func(b *testing.B) {
benchmarkQueueDeliveryPath(b, queueCount, true)
})
b.Run(fmt.Sprintf("targeted_queue_%dq", queueCount), func(b *testing.B) {
benchmarkQueueDeliveryPath(b, queueCount, false)
})
}
}
+247 -14
View File
@@ -718,17 +718,19 @@ type mockCluster struct {
forwardCalls []forwardPublishCall
forwardCallsMu sync.Mutex
queueConsumers []*cluster.QueueConsumerInfo
queueConsumersMu sync.RWMutex
registered []*cluster.QueueConsumerInfo
registeredMu sync.Mutex
}
type routedMessage struct {
nodeID string
clientID string
queueName string
messageID string
payload []byte
sequence int64
nodeID string
clientID string
queueName string
messageID string
payload []byte
properties map[string]string
sequence int64
}
type forwardPublishCall struct {
@@ -750,15 +752,20 @@ func newMockCluster(nodeID string) *mockCluster {
func (c *mockCluster) NodeID() string { return c.nodeID }
func (c *mockCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
propsCopy := make(map[string]string, len(properties))
for k, v := range properties {
propsCopy[k] = v
}
c.routedMessagesMu.Lock()
defer c.routedMessagesMu.Unlock()
c.routedMessages = append(c.routedMessages, routedMessage{
nodeID: nodeID,
clientID: clientID,
queueName: queueName,
messageID: messageID,
payload: payload,
sequence: sequence,
nodeID: nodeID,
clientID: clientID,
queueName: queueName,
messageID: messageID,
payload: payload,
properties: propsCopy,
sequence: sequence,
})
return nil
}
@@ -780,6 +787,8 @@ func (c *mockCluster) GetForwardCalls() []forwardPublishCall {
}
func (c *mockCluster) SetQueueConsumers(consumers []*cluster.QueueConsumerInfo) {
c.queueConsumersMu.Lock()
defer c.queueConsumersMu.Unlock()
c.queueConsumers = consumers
}
@@ -846,7 +855,21 @@ func (c *mockCluster) UnregisterQueueConsumer(ctx context.Context, queueName, gr
}
func (c *mockCluster) ListQueueConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error) {
return nil, nil
c.queueConsumersMu.RLock()
defer c.queueConsumersMu.RUnlock()
if c.queueConsumers == nil {
return nil, nil
}
consumers := make([]*cluster.QueueConsumerInfo, 0, len(c.queueConsumers))
for _, consumer := range c.queueConsumers {
if consumer != nil && consumer.QueueName == queueName {
consumers = append(consumers, consumer)
}
}
return consumers, nil
}
func (c *mockCluster) ListQueueConsumersByGroup(ctx context.Context, queueName, groupID string) ([]*cluster.QueueConsumerInfo, error) {
@@ -854,6 +877,9 @@ func (c *mockCluster) ListQueueConsumersByGroup(ctx context.Context, queueName,
}
func (c *mockCluster) ListAllQueueConsumers(ctx context.Context) ([]*cluster.QueueConsumerInfo, error) {
c.queueConsumersMu.RLock()
defer c.queueConsumersMu.RUnlock()
if c.queueConsumers == nil {
return nil, nil
}
@@ -979,6 +1005,12 @@ func TestCrossNodeMessageRouting(t *testing.T) {
if rm.clientID != remoteClientID {
t.Errorf("Expected routed message to client %s, got %s", remoteClientID, rm.clientID)
}
if rm.messageID == "" {
t.Error("Expected routed message-id to be set")
}
if rm.properties["group-id"] == "" {
t.Error("Expected routed message to include group-id property")
}
}
}
}
@@ -1010,6 +1042,123 @@ func TestSubscribeDefaultsProxyNodeIDFromCluster(t *testing.T) {
}
}
func TestRemoteRoutingIncludesAckMetadata(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
mockCl := newMockCluster("node-1")
manager := NewManager(
logStore,
groupStore,
func(ctx context.Context, clientID string, msg any) error { return nil },
DefaultConfig(),
slog.New(slog.NewTextHandler(io.Discard, nil)),
mockCl,
)
ctx := context.Background()
if err := manager.Subscribe(ctx, "tasks", "", "remote-client", "workers", "node-2"); err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
if err := manager.Enqueue(ctx, "$queue/tasks/new", []byte("job"), map[string]string{"custom": "value"}); err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
manager.deliverMessages()
routed := mockCl.GetRoutedMessages()
if len(routed) != 1 {
t.Fatalf("expected 1 routed message, got %d", len(routed))
}
msg := routed[0]
if msg.messageID != "tasks:0" {
t.Fatalf("expected message-id tasks:0, got %q", msg.messageID)
}
if got := msg.properties["message-id"]; got != "tasks:0" {
t.Fatalf("expected properties message-id tasks:0, got %q", got)
}
if got := msg.properties["group-id"]; got != "workers" {
t.Fatalf("expected properties group-id workers, got %q", got)
}
if got := msg.properties["queue"]; got != "tasks" {
t.Fatalf("expected properties queue tasks, got %q", got)
}
if got := msg.properties["offset"]; got != "0" {
t.Fatalf("expected properties offset 0, got %q", got)
}
}
func TestRemoteStreamBacklogDeliveredByFallbackSweep(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
mockCl := newMockCluster("node-1")
cfg := DefaultConfig()
cfg.DeliveryInterval = 5 * time.Second
manager := NewManager(
logStore,
groupStore,
func(ctx context.Context, clientID string, msg any) error { return nil },
cfg,
slog.New(slog.NewTextHandler(io.Discard, nil)),
mockCl,
)
if err := manager.Start(context.Background()); err != nil {
t.Fatalf("Start failed: %v", err)
}
defer manager.Stop()
ctx := context.Background()
queueCfg := types.DefaultQueueConfig("events", "$queue/events/#")
queueCfg.Type = types.QueueTypeStream
if err := manager.CreateQueue(ctx, queueCfg); err != nil && err != storage.ErrQueueAlreadyExists {
t.Fatalf("CreateQueue failed: %v", err)
}
if err := manager.Enqueue(ctx, "$queue/events/user.action", []byte("event-1"), nil); err != nil {
t.Fatalf("Enqueue failed: %v", err)
}
time.Sleep(200 * time.Millisecond)
// Simulate remote stream consumer registration that happens after publish.
mockCl.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{
QueueName: "events",
GroupID: "demo-readers@#",
ConsumerID: "remote-consumer-1",
ClientID: "amqp091-conn-remote",
Pattern: "#",
Mode: string(types.GroupModeStream),
ProxyNodeID: "node-2",
RegisteredAt: time.Now(),
},
})
deadline := time.After(3 * time.Second)
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
routed := mockCl.GetRoutedMessages()
if len(routed) > 0 {
if got := routed[0].properties["x-stream-offset"]; got != "0" {
t.Fatalf("expected x-stream-offset=0, got %q", got)
}
return
}
select {
case <-deadline:
t.Fatal("expected fallback sweep to deliver backlog to remote stream consumer")
case <-ticker.C:
}
}
}
func TestSubscribeWithCursorDefaultsProxyNodeIDFromCluster(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
@@ -1157,7 +1306,7 @@ func TestDeliverQueueMessage(t *testing.T) {
msg := map[string]interface{}{
"id": "msg-123",
"queueName": "$queue/test",
"queueName": "test",
"payload": []byte("routed payload"),
"properties": map[string]string{"custom": "prop"},
"sequence": int64(42),
@@ -1190,6 +1339,9 @@ func TestDeliverQueueMessage(t *testing.T) {
if deliveredMsg.Properties["message-id"] != "msg-123" {
t.Errorf("Expected message-id 'msg-123', got '%s'", deliveredMsg.Properties["message-id"])
}
if deliveredMsg.Properties["queue"] != "test" {
t.Errorf("Expected queue 'test', got '%s'", deliveredMsg.Properties["queue"])
}
}
func TestGetOrCreateQueue_CreatesEphemeral(t *testing.T) {
@@ -1456,3 +1608,84 @@ func TestEnqueueLocal(t *testing.T) {
t.Error("Expected message to be stored in mqtt queue")
}
}
func TestSubscriptionTrackingReferenceCounts(t *testing.T) {
manager := NewManager(
memlog.New(),
newMockGroupStore(),
func(context.Context, string, any) error { return nil },
DefaultConfig(),
slog.New(slog.NewTextHandler(io.Discard, nil)),
nil,
)
manager.trackSubscription("client-1", "orders", "workers@#")
manager.trackSubscription("client-1", "orders", "workers@#")
targets := manager.getSubscriptionTargets("client-1")
if len(targets) != 1 {
t.Fatalf("expected 1 tracked target after duplicate subscriptions, got %d", len(targets))
}
manager.untrackSubscription("client-1", "orders", "workers@#")
targets = manager.getSubscriptionTargets("client-1")
if len(targets) != 1 {
t.Fatalf("expected tracked target to remain after first untrack, got %d", len(targets))
}
manager.untrackSubscription("client-1", "orders", "workers@#")
targets = manager.getSubscriptionTargets("client-1")
if len(targets) != 0 {
t.Fatalf("expected no tracked targets after reference count reaches zero, got %d", len(targets))
}
}
func TestSubscriptionTrackingPrunesStaleEntries(t *testing.T) {
cfg := DefaultConfig()
cfg.ConsumerTimeout = 20 * time.Millisecond
manager := NewManager(
memlog.New(),
newMockGroupStore(),
func(context.Context, string, any) error { return nil },
cfg,
slog.New(slog.NewTextHandler(io.Discard, nil)),
nil,
)
manager.trackSubscription("client-1", "orders", "workers@#")
key := manager.subscriptionRefKey("orders", "workers@#")
manager.subscriptionsMu.Lock()
manager.subscriptions["client-1"][key].lastSeen = time.Now().Add(-time.Minute)
manager.subscriptionsMu.Unlock()
manager.pruneStaleSubscriptions()
targets := manager.getSubscriptionTargets("client-1")
if len(targets) != 0 {
t.Fatalf("expected stale tracked target to be pruned, got %d entries", len(targets))
}
}
func TestUpdateHeartbeatRemovesStaleTrackedTargets(t *testing.T) {
manager := NewManager(
memlog.New(),
newMockGroupStore(),
func(context.Context, string, any) error { return nil },
DefaultConfig(),
slog.New(slog.NewTextHandler(io.Discard, nil)),
nil,
)
manager.trackSubscription("client-1", "orders", "workers@#")
if err := manager.UpdateHeartbeat(context.Background(), "client-1"); err != nil {
t.Fatalf("UpdateHeartbeat failed: %v", err)
}
targets := manager.getSubscriptionTargets("client-1")
if len(targets) != 0 {
t.Fatalf("expected stale tracked target to be removed after heartbeat update, got %d entries", len(targets))
}
}
+132 -12
View File
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"log/slog"
"net"
"os"
"sync"
"testing"
@@ -14,8 +15,11 @@ import (
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/config"
logStorage "github.com/absmach/fluxmq/logstorage"
"github.com/absmach/fluxmq/mqtt/broker"
"github.com/absmach/fluxmq/queue"
"github.com/absmach/fluxmq/server/tcp"
brokerstorage "github.com/absmach/fluxmq/storage"
"github.com/absmach/fluxmq/storage/badger"
"github.com/stretchr/testify/require"
)
@@ -28,6 +32,23 @@ type TestCluster struct {
stopped bool
}
func allocateUniquePort(t *testing.T, used map[int]struct{}) int {
t.Helper()
for {
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := ln.Addr().(*net.TCPAddr).Port
_ = ln.Close()
if _, exists := used[port]; exists {
continue
}
used[port] = struct{}{}
return port
}
}
// TestNode represents a single node in the test cluster.
type TestNode struct {
ID string
@@ -46,6 +67,71 @@ type TestNode struct {
ctx context.Context
cancel context.CancelFunc
tcpStopped chan struct{}
queueStore *logStorage.Adapter
queueDeliveriesMu sync.RWMutex
queueDeliveries []QueueDelivery
}
// QueueDelivery captures a queue delivery observed by the test node queue manager.
type QueueDelivery struct {
ClientID string
Message *brokerstorage.Message
}
func cloneDeliveryMessage(msg *brokerstorage.Message) *brokerstorage.Message {
if msg == nil {
return nil
}
clone := &brokerstorage.Message{
Topic: msg.Topic,
ContentType: msg.ContentType,
ResponseTopic: msg.ResponseTopic,
QoS: msg.QoS,
Retain: msg.Retain,
}
if len(msg.Properties) > 0 {
clone.Properties = make(map[string]string, len(msg.Properties))
for k, v := range msg.Properties {
clone.Properties[k] = v
}
}
if payload := msg.GetPayload(); len(payload) > 0 {
clone.Payload = make([]byte, len(payload))
copy(clone.Payload, payload)
}
return clone
}
func (n *TestNode) recordQueueDelivery(clientID string, msg any) {
deliveryMsg, ok := msg.(*brokerstorage.Message)
if !ok || deliveryMsg == nil {
return
}
n.queueDeliveriesMu.Lock()
n.queueDeliveries = append(n.queueDeliveries, QueueDelivery{
ClientID: clientID,
Message: cloneDeliveryMessage(deliveryMsg),
})
n.queueDeliveriesMu.Unlock()
}
// QueueDeliveries returns a snapshot of captured queue deliveries.
func (n *TestNode) QueueDeliveries() []QueueDelivery {
n.queueDeliveriesMu.RLock()
defer n.queueDeliveriesMu.RUnlock()
out := make([]QueueDelivery, len(n.queueDeliveries))
for i, d := range n.queueDeliveries {
out[i] = QueueDelivery{
ClientID: d.ClientID,
Message: cloneDeliveryMessage(d.Message),
}
}
return out
}
// NewTestCluster creates a new test cluster with the specified number of nodes.
@@ -59,18 +145,24 @@ func NewTestCluster(t *testing.T, nodeCount int) *TestCluster {
Nodes: make([]*TestNode, nodeCount),
}
// Base ports for allocation
// Make sure etcd client and peer port ranges don't overlap!
baseTCPPort := 10883 // MQTT TCP: 10883, 10884, 10885, ...
baseGRPCPort := 19000 // gRPC: 19000, 19001, 19002, ...
baseEtcdPort := 12379 // etcd client: 12379, 12380, 12381, ...
basePeerPort := 12390 // etcd peer: 12390, 12391, 12392, ...
// Allocate ports dynamically to avoid collisions across concurrent/stale test runs.
usedPorts := make(map[int]struct{})
tcpPorts := make([]int, nodeCount)
grpcPorts := make([]int, nodeCount)
etcdPorts := make([]int, nodeCount)
peerPorts := make([]int, nodeCount)
for i := 0; i < nodeCount; i++ {
tcpPorts[i] = allocateUniquePort(t, usedPorts)
grpcPorts[i] = allocateUniquePort(t, usedPorts)
etcdPorts[i] = allocateUniquePort(t, usedPorts)
peerPorts[i] = allocateUniquePort(t, usedPorts)
}
// Build initial cluster string for etcd
initialCluster := make([]string, nodeCount)
for i := 0; i < nodeCount; i++ {
nodeID := fmt.Sprintf("node-%d", i)
peerAddr := fmt.Sprintf("127.0.0.1:%d", basePeerPort+i)
peerAddr := fmt.Sprintf("127.0.0.1:%d", peerPorts[i])
initialCluster[i] = fmt.Sprintf("%s=http://%s", nodeID, peerAddr)
}
initialClusterStr := ""
@@ -83,7 +175,7 @@ func NewTestCluster(t *testing.T, nodeCount int) *TestCluster {
// Create nodes
for i := 0; i < nodeCount; i++ {
node := tc.createNode(i, baseTCPPort+i, baseGRPCPort+i, baseEtcdPort+i, basePeerPort+i, initialClusterStr)
node := tc.createNode(i, tcpPorts[i], grpcPorts[i], etcdPorts[i], peerPorts[i], initialClusterStr)
tc.Nodes[i] = node
}
@@ -196,6 +288,18 @@ func (tc *TestCluster) startNode(node *TestNode, bootstrap bool, peerTransports
}
b := broker.NewBroker(store, clust, nullLogger, nil, nil, nil, nil, config.SessionConfig{})
// Create log-backed queue manager for queue/stream integration paths.
queueStore, err := logStorage.NewAdapter(fmt.Sprintf("%s/queue", node.DataDir), logStorage.DefaultAdapterConfig())
if err != nil {
return fmt.Errorf("failed to create queue log storage: %w", err)
}
queueCfg := queue.DefaultConfig()
deliverFn := func(ctx context.Context, clientID string, msg any) error {
node.recordQueueDelivery(clientID, msg)
return b.DeliverToSessionByID(ctx, clientID, msg)
}
qm := queue.NewManager(queueStore, queueStore, deliverFn, queueCfg, nullLogger, clust)
// Wire broker as message handler (includes session management)
clust.SetMessageHandler(b)
@@ -204,6 +308,13 @@ func (tc *TestCluster) startNode(node *TestNode, bootstrap bool, peerTransports
return fmt.Errorf("failed to start cluster: %w", err)
}
if err := b.SetQueueManager(qm); err != nil {
_ = queueStore.Close()
return fmt.Errorf("failed to set queue manager: %w", err)
}
clust.SetQueueHandler(qm)
node.queueStore = queueStore
// Create and start TCP server
tcpCfg := tcp.Config{
Address: node.TCPAddr,
@@ -268,14 +379,19 @@ func (tc *TestCluster) stopNode(node *TestNode) {
}
}
// Stop cluster first
// Stop broker first so queue/session cleanup finishes before etcd client closes.
if node.Broker != nil {
node.Broker.Close()
}
// Stop cluster after broker/queue shutdown.
if node.Cluster != nil {
node.Cluster.Stop()
}
// Close broker
if node.Broker != nil {
node.Broker.Close()
if node.queueStore != nil {
_ = node.queueStore.Close()
node.queueStore = nil
}
// Clean up data directory
@@ -283,6 +399,10 @@ func (tc *TestCluster) stopNode(node *TestNode) {
os.RemoveAll(node.DataDir)
}
node.queueDeliveriesMu.Lock()
node.queueDeliveries = nil
node.queueDeliveriesMu.Unlock()
tc.t.Logf("Stopped node %s", node.ID)
}
+1 -1
View File
@@ -3,7 +3,7 @@ import { NodeNetwork } from "@/components/node-network";
export function HeroSection() {
return (
<section className="relative border-b-2 border-(--flux-border) py-20 md:py-20 h-[90vh]">
<section className="relative border-b-2 border-(--flux-border) py-20 md:py-20 min-h-[90vh]">
<div className="container mx-auto px-6">
<div className="grid md:grid-cols-2 gap-12 items-center">
{/* Left side - Content */}
@@ -18,17 +18,17 @@ export function NewsletterSection() {
target="_blank"
className="max-w-md mx-auto"
>
<div className="flex gap-0">
<div className="flex flex-col sm:flex-row gap-0">
<input
type="email"
name="EMAIL"
placeholder="Enter your email"
required
className="flex-1 brutalist-border border-r-0 px-4 py-3 focus:outline-none focus:border-(--flux-orange)"
className="flex-1 brutalist-border sm:border-r-0 px-4 py-3 focus:outline-none focus:border-(--flux-orange)"
/>
<button
type="submit"
className="brutalist-border bg-(--flux-orange) hover:bg-(--flux-blue) text-white px-6 py-3 font-bold transition-colors"
className="brutalist-border border-t-0 sm:border-t-2 bg-(--flux-orange) hover:bg-(--flux-blue) text-white px-6 py-3 font-bold transition-colors"
>
SUBSCRIBE
</button>
@@ -12,6 +12,7 @@ export function PerformanceSection() {
</h2>
<div className="max-w-4xl mx-auto">
<div className="overflow-x-auto">
<table className="metrics-table w-full mono text-sm md:text-base">
<thead>
<tr>
@@ -42,6 +43,7 @@ export function PerformanceSection() {
</tr>
</tbody>
</table>
</div>
<div className="mt-8 brutalist-border bg-theme p-6">
<h3 className="font-bold mb-4 mono text-lg">CLUSTER SCALING</h3>