Abstract delivery into delivery engine

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-10 21:34:43 +01:00
parent 52aa3bfe96
commit dce448035d
4 changed files with 919 additions and 461 deletions
+514
View File
@@ -0,0 +1,514 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/consumer"
"github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
)
// RemoteRouter is the subset of cluster.Cluster needed by the delivery engine
// for cross-node message routing.
type RemoteRouter interface {
// ListQueueConsumers is called by the engine with a queue name to obtain the
// cluster-wide consumer entries for that queue. The engine groups the result
// by GroupID and ignores entries whose ProxyNodeID equals its own node ID.
ListQueueConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error)
// RouteQueueMessage is called by the engine once per claimed message, with
// the consumer's ProxyNodeID as nodeID and the consumer's ClientID as
// clientID. A non-nil error is logged by the engine and not retried there.
RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error
}
// DeliveryEngine claims messages from queues and routes them to local or
// remote consumers. It owns the scheduling loop and delivery state; the
// Manager delegates all delivery work here.
type DeliveryEngine struct {
// queueStore is used to list all queues and to look up one queue's config.
queueStore storage.QueueStore
// groupStore is used to list a queue's consumer groups and to re-read a
// group right before its claimed messages are routed.
groupStore storage.ConsumerGroupStore
// consumerManager claims message batches, reads committed offsets, creates
// groups for remote consumers, and records heartbeats.
consumerManager *consumer.Manager
// local delivers to clients attached to this node; the delivery code checks
// it for nil before use.
local Deliverer
remote RemoteRouter // nil for single-node
// localNodeID is compared with a consumer's ProxyNodeID to choose between
// local delivery and remote routing.
localNodeID string
// distributionMode gates the remote-consumer pass, which only runs when it
// equals DistributionForward.
distributionMode DistributionMode
// batchSize is the limit handed to every ClaimBatch/ClaimBatchStream call.
batchSize int
logger *slog.Logger
// mu guards enqueued.
mu sync.Mutex
// enqueued is the dedup set: queue names that were scheduled and have not
// yet been picked up by the delivery loop.
enqueued map[string]struct{}
// queue carries scheduled queue names to the delivery loop.
queue chan string
// stopCh is closed by Stop to make the delivery loop return.
stopCh chan struct{}
// wg tracks the delivery loop goroutine so Stop can wait for it.
wg sync.WaitGroup
}
// NewDeliveryEngine builds a delivery engine from its collaborators. The
// engine is returned idle; call Start to launch the delivery loop. remote may
// be nil for single-node deployments.
func NewDeliveryEngine(
	queueStore storage.QueueStore,
	groupStore storage.ConsumerGroupStore,
	consumerMgr *consumer.Manager,
	local Deliverer,
	remote RemoteRouter,
	localNodeID string,
	distributionMode DistributionMode,
	batchSize int,
	logger *slog.Logger,
) *DeliveryEngine {
	engine := new(DeliveryEngine)

	// Storage and claim machinery.
	engine.queueStore = queueStore
	engine.groupStore = groupStore
	engine.consumerManager = consumerMgr

	// Delivery targets and routing identity.
	engine.local = local
	engine.remote = remote
	engine.localNodeID = localNodeID
	engine.distributionMode = distributionMode

	engine.batchSize = batchSize
	engine.logger = logger

	// Scheduling state: dedup set, bounded trigger channel, stop signal.
	engine.enqueued = make(map[string]struct{})
	engine.queue = make(chan string, 4096)
	engine.stopCh = make(chan struct{})

	return engine
}
// Start spawns the background goroutine that runs the delivery loop. The
// goroutine is registered with the engine's WaitGroup before it is launched
// so that Stop can wait for it.
func (e *DeliveryEngine) Start() {
	e.wg.Add(1)
	go func() {
		e.run()
	}()
}
// Stop signals the delivery loop to exit and waits for it to finish.
//
// Stop is safe to call more than once and from multiple goroutines: only the
// first call closes the stop channel, later calls just wait for the loop.
// Closing an already-closed channel would panic, and the owning Manager
// invokes Stop on every Manager.Stop call.
func (e *DeliveryEngine) Stop() {
	// The mutex serialises concurrent Stop calls so the check-then-close
	// below cannot race. It is released before waiting because the delivery
	// loop takes the same mutex while it drains.
	e.mu.Lock()
	select {
	case <-e.stopCh:
		// Already stopped by an earlier call.
	default:
		close(e.stopCh)
	}
	e.mu.Unlock()

	e.wg.Wait()
}
// Schedule requests a delivery pass for queueName. An empty name is ignored.
// While a request for the same queue is still pending, further requests are
// coalesced into it. The send never blocks: when the trigger channel is full
// the request is dropped, a warning is logged, and the pending marker is
// cleared so that a later Schedule call can enqueue the queue again.
func (e *DeliveryEngine) Schedule(queueName string) {
	if queueName == "" {
		return
	}

	// Claim the pending marker for this queue unless someone already holds it.
	e.mu.Lock()
	_, pending := e.enqueued[queueName]
	if !pending {
		e.enqueued[queueName] = struct{}{}
	}
	e.mu.Unlock()
	if pending {
		return
	}

	select {
	case e.queue <- queueName:
		return
	default:
	}

	// Channel full: give the marker back so the queue is not stuck as pending.
	e.logger.Warn("delivery channel full, dropping trigger (will retry on next sweep)",
		slog.String("queue", queueName))
	e.markDelivered(queueName)
}
// ScheduleAll lists all queues and schedules each for delivery.
//
// If the queue store cannot be listed, nothing is scheduled and the failure
// is logged. The delivery loop calls this on every sweep tick, so a silent
// failure here would stall delivery with no trace.
func (e *DeliveryEngine) ScheduleAll(ctx context.Context) {
	queues, err := e.queueStore.ListQueues(ctx)
	if err != nil {
		e.logger.Warn("failed to list queues for delivery scheduling",
			slog.String("error", err.Error()))
		return
	}
	// Index instead of ranging by value to avoid copying each queue config.
	for i := range queues {
		e.Schedule(queues[i].Name)
	}
}
// Unschedule drops queueName from the dedup set of pending schedules. It is
// called when a queue is deleted. A name that is not in the set is a no-op.
func (e *DeliveryEngine) Unschedule(queueName string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	delete(e.enqueued, queueName)
}
// DeliverAll delivers messages for every queue (full sweep). Intended for
// tests and benchmarks that need synchronous delivery without the loop.
//
// If the queue store cannot be listed, no queue is delivered and the failure
// is logged instead of being dropped silently.
func (e *DeliveryEngine) DeliverAll(ctx context.Context) {
	queues, err := e.queueStore.ListQueues(ctx)
	if err != nil {
		e.logger.Warn("failed to list queues for delivery sweep",
			slog.String("error", err.Error()))
		return
	}
	for i := range queues {
		e.deliverQueueConfig(ctx, &queues[i])
	}
}
// DeliverQueue runs one delivery pass for the named queue and reports whether
// any messages were claimed for delivery. It returns false for an empty name
// and when the queue's config cannot be loaded from the store.
func (e *DeliveryEngine) DeliverQueue(ctx context.Context, queueName string) bool {
	if queueName != "" {
		if cfg, err := e.queueStore.GetQueue(ctx, queueName); err == nil {
			return e.deliverQueueConfig(ctx, cfg)
		}
	}
	return false
}
// markDelivered clears the pending marker for queueName so that the next
// Schedule call for it is enqueued rather than coalesced.
func (e *DeliveryEngine) markDelivered(queueName string) {
	e.mu.Lock()
	defer e.mu.Unlock()

	delete(e.enqueued, queueName)
}
// run is the delivery loop. It exits when the stop channel is closed.
// Otherwise it either serves one scheduled queue or, once per second,
// schedules every queue.
func (e *DeliveryEngine) run() {
	defer e.wg.Done()

	const sweepInterval = time.Second
	sweep := time.NewTicker(sweepInterval)
	defer sweep.Stop()

	ctx := context.Background()
	for {
		select {
		case <-e.stopCh:
			return

		case <-sweep.C:
			e.ScheduleAll(ctx)

		case name := <-e.queue:
			// Clear the pending marker before delivering, so that triggers
			// arriving during the pass are enqueued instead of coalesced away.
			e.markDelivered(name)
			if !e.DeliverQueue(ctx, name) {
				continue
			}
			// Something was delivered; come back for more.
			e.Schedule(name)
		}
	}
}
// deliverQueueConfig runs one delivery pass for a single queue. It first
// serves every consumer group in the group store. Then, when a remote router
// is configured and the distribution mode is DistributionForward, it serves
// the consumers reported by the cluster. It returns true if either step
// claimed at least one message. A nil config yields false.
func (e *DeliveryEngine) deliverQueueConfig(ctx context.Context, queueConfig *types.QueueConfig) bool {
	if queueConfig == nil {
		return false
	}

	// Committed offsets of the primary group are looked up at most once per
	// pattern within this pass. Failed lookups are not cached.
	workGroup := strings.TrimSpace(queueConfig.PrimaryGroup)
	offsetCache := make(map[string]uint64)
	lookupCommitted := func(pattern string) (uint64, bool) {
		if workGroup == "" {
			return 0, false
		}
		key := workGroup
		if pattern != "" {
			key = fmt.Sprintf("%s@%s", workGroup, pattern)
		}
		if cached, hit := offsetCache[key]; hit {
			return cached, true
		}
		offset, err := e.consumerManager.GetCommittedOffset(ctx, queueConfig.Name, key)
		if err != nil {
			return 0, false
		}
		offsetCache[key] = offset
		return offset, true
	}

	anyDelivered := false

	// A failure to list groups skips this step; the remote step still runs.
	if groups, err := e.groupStore.ListConsumerGroups(ctx, queueConfig.Name); err == nil {
		for _, g := range groups {
			anyDelivered = e.deliverToGroup(ctx, queueConfig, g, lookupCommitted) || anyDelivered
		}
	}

	forwardToCluster := e.remote != nil && e.distributionMode == DistributionForward
	if forwardToCluster && e.deliverToRemoteConsumers(ctx, queueConfig) {
		anyDelivered = true
	}

	return anyDelivered
}
// deliverToGroup claims a batch for every consumer of group and routes each
// claimed message either to a remote node or to the local deliverer.
//
// primaryCommitted, when non-nil, is consulted for stream-mode groups to
// obtain the primary group's committed offset for the group's pattern.
//
// The return value is true when any claim returned at least one message. It
// does not reflect whether routing or delivery of those messages succeeded.
func (e *DeliveryEngine) deliverToGroup(ctx context.Context, config *types.QueueConfig, group *types.ConsumerGroup, primaryCommitted func(pattern string) (uint64, bool)) bool {
if group.ConsumerCount() == 0 {
return false
}
// A group with an empty pattern claims with a nil filter.
var filter *consumer.Filter
if group.Pattern != "" {
filter = consumer.NewFilter(group.Pattern)
}
consumers := group.ConsumerIDs()
if len(consumers) == 0 {
return false
}
delivered := false
for _, consumerID := range consumers {
var msgs []*types.Message
var err error
// Stream-mode groups use the stream claim path, all others the queue claim path.
if group.Mode == types.GroupModeStream {
msgs, err = e.consumerManager.ClaimBatchStream(ctx, config.Name, group.ID, consumerID, filter, e.batchSize)
} else {
msgs, err = e.consumerManager.ClaimBatch(ctx, config.Name, group.ID, consumerID, filter, e.batchSize)
}
if err != nil {
// Claim errors are not logged; the next consumer is tried.
continue
}
if len(msgs) > 0 {
delivered = true
}
// Re-read the group from the store to obtain the consumer's current
// ClientID and ProxyNodeID.
// NOTE(review): this lookup happens after the claim. If it fails, or the
// consumer is gone, the already-claimed msgs are not delivered in this
// pass — presumably the consumer manager makes them claimable again
// later; confirm.
freshGroup, err := e.groupStore.GetConsumerGroup(ctx, config.Name, group.ID)
if err != nil {
continue
}
consumerInfo := freshGroup.GetConsumer(consumerID)
if consumerInfo == nil {
continue
}
if len(msgs) > 0 {
// Any value returned by UpdateHeartbeat is discarded here.
e.consumerManager.UpdateHeartbeat(ctx, config.Name, group.ID, consumerID)
}
var workCommitted uint64
var hasWorkCommitted bool
if group.Mode == types.GroupModeStream && primaryCommitted != nil {
workCommitted, hasWorkCommitted = primaryCommitted(group.Pattern)
}
for _, msg := range msgs {
// Route remotely only when a router exists and the consumer is proxied by
// a different node; otherwise fall back to the local deliverer.
if e.remote != nil && consumerInfo.ProxyNodeID != "" && consumerInfo.ProxyNodeID != e.localNodeID {
routeMsg := createRoutedQueueMessage(
msg,
group.ID,
config.Name,
group.Mode == types.GroupModeStream,
workCommitted,
hasWorkCommitted,
config.PrimaryGroup,
)
err := e.remote.RouteQueueMessage(
ctx,
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,
config.Name,
routeMsg,
)
if err != nil {
// Routing failures are logged only; the loop moves on to the next message.
e.logger.Warn("queue message remote routing failed",
slog.String("client", consumerInfo.ClientID),
slog.String("node", consumerInfo.ProxyNodeID),
slog.String("topic", msg.Topic),
slog.String("error", err.Error()))
}
} else if e.local != nil {
deliveryMsg := createDeliveryMessage(msg, group.ID, config.Name)
if group.Mode == types.GroupModeStream {
decorateStreamDelivery(deliveryMsg, msg, workCommitted, hasWorkCommitted, config.PrimaryGroup)
}
if err := e.local.Deliver(ctx, consumerInfo.ClientID, deliveryMsg); err != nil {
// Delivery failures are logged only; the loop moves on to the next message.
e.logger.Warn("queue message delivery failed",
slog.String("client", consumerInfo.ClientID),
slog.String("topic", msg.Topic),
slog.String("error", err.Error()))
}
}
// NOTE(review): when neither branch applies (no remote route and
// e.local is nil) the claimed message is skipped without a log line.
}
}
return delivered
}
// deliverToRemoteConsumers claims and routes messages for consumers that the
// cluster reports for this queue and that are proxied by other nodes.
//
// Consumers are grouped by GroupID. The first consumer of each group supplies
// the pattern and mode used to get or create the group that tracks its
// progress (mode defaults to queue mode when unset). Every claimed message is
// handed to the remote router; routing failures are logged and not retried
// here.
//
// For stream groups the primary group's committed offset is attached to each
// message. The primary group name is whitespace-trimmed before the lookup,
// matching how deliverQueueConfig derives the lookup key for groups from the
// group store. A blank or padded PrimaryGroup therefore behaves the same on
// both paths.
//
// It returns true when any claim returned at least one message. The caller
// must ensure e.remote is non-nil.
func (e *DeliveryEngine) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) bool {
	consumers, err := e.remote.ListQueueConsumers(ctx, config.Name)
	if err != nil {
		e.logger.Debug("failed to list cluster consumers",
			slog.String("queue", config.Name),
			slog.String("error", err.Error()))
		return false
	}

	// Only consumers proxied by other nodes are handled here.
	consumersByGroup := make(map[string][]*cluster.QueueConsumerInfo)
	for _, c := range consumers {
		if c.ProxyNodeID == e.localNodeID {
			continue
		}
		consumersByGroup[c.GroupID] = append(consumersByGroup[c.GroupID], c)
	}

	// Normalised once; used only to build the committed-offset lookup key.
	primaryGroup := strings.TrimSpace(config.PrimaryGroup)

	delivered := false
	for groupID, groupConsumers := range consumersByGroup {
		// Every slice in the map has at least one element by construction.
		first := groupConsumers[0]
		mode := types.GroupModeQueue
		if first.Mode != "" {
			mode = types.ConsumerGroupMode(first.Mode)
		}
		group, err := e.consumerManager.GetOrCreateGroup(ctx, config.Name, groupID, first.Pattern, mode, true)
		if err != nil {
			continue
		}

		var filter *consumer.Filter
		if group.Pattern != "" {
			filter = consumer.NewFilter(group.Pattern)
		}

		stream := group.Mode == types.GroupModeStream

		var workCommitted uint64
		var hasWorkCommitted bool
		if stream && primaryGroup != "" {
			patternGroupID := primaryGroup
			if group.Pattern != "" {
				patternGroupID = fmt.Sprintf("%s@%s", primaryGroup, group.Pattern)
			}
			if committed, err := e.consumerManager.GetCommittedOffset(ctx, config.Name, patternGroupID); err == nil {
				workCommitted = committed
				hasWorkCommitted = true
			}
		}

		for _, consumerInfo := range groupConsumers {
			var msgs []*types.Message
			var err error
			if stream {
				msgs, err = e.consumerManager.ClaimBatchStream(ctx, config.Name, groupID, consumerInfo.ConsumerID, filter, e.batchSize)
			} else {
				msgs, err = e.consumerManager.ClaimBatch(ctx, config.Name, groupID, consumerInfo.ConsumerID, filter, e.batchSize)
			}
			if err != nil {
				continue
			}
			if len(msgs) > 0 {
				delivered = true
			}

			for _, msg := range msgs {
				routeMsg := createRoutedQueueMessage(
					msg,
					groupID,
					config.Name,
					stream,
					workCommitted,
					hasWorkCommitted,
					config.PrimaryGroup,
				)
				err := e.remote.RouteQueueMessage(
					ctx,
					consumerInfo.ProxyNodeID,
					consumerInfo.ClientID,
					config.Name,
					routeMsg,
				)
				if err != nil {
					e.logger.Warn("remote queue message delivery failed",
						slog.String("client", consumerInfo.ClientID),
						slog.String("node", consumerInfo.ProxyNodeID),
						slog.String("queue", config.Name),
						slog.String("error", err.Error()))
				} else {
					e.logger.Debug("routed queue message to remote consumer",
						slog.String("client", consumerInfo.ClientID),
						slog.String("node", consumerInfo.ProxyNodeID),
						slog.String("queue", config.Name),
						slog.Uint64("offset", msg.Sequence))
				}
			}
		}
	}
	return delivered
}
// --- Message building helpers (stateless) ---
// createDeliveryMessage converts a claimed queue message into the broker
// message handed to a local client: same topic and payload, QoS 1, and the
// routing properties produced by createRouteProperties.
func createDeliveryMessage(msg *types.Message, groupID string, queueName string) *brokerstorage.Message {
	out := new(brokerstorage.Message)
	out.Topic = msg.Topic
	out.QoS = 1
	out.Properties = createRouteProperties(msg, groupID, queueName)
	out.SetPayloadFromBytes(msg.GetPayload())
	return out
}
// decorateStreamDelivery adds the stream-specific properties for msg to an
// outgoing delivery, creating the property map first when the delivery has
// none. It does nothing when delivery or msg is nil.
func decorateStreamDelivery(delivery *brokerstorage.Message, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
	if delivery != nil && msg != nil {
		if delivery.Properties == nil {
			delivery.Properties = map[string]string{}
		}
		decorateStreamProperties(delivery.Properties, msg, workCommitted, hasWorkCommitted, primaryGroup)
	}
}
// createRouteProperties returns a fresh property map for a delivery: a copy
// of the message's own properties, overlaid with the message ID
// ("<queue>:<sequence>"), group ID, queue name, and offset. The overlay is
// written last, so it replaces same-named keys from the message.
func createRouteProperties(msg *types.Message, groupID, queueName string) map[string]string {
	seq := strconv.FormatUint(msg.Sequence, 10)

	out := make(map[string]string, len(msg.Properties)+4)
	for key, value := range msg.Properties {
		out[key] = value
	}

	out[types.PropMessageID] = queueName + ":" + seq
	out[types.PropGroupID] = groupID
	out[types.PropQueueName] = queueName
	out[types.PropOffset] = seq
	return out
}
// createRoutedQueueMessage builds the cluster message used to forward a
// claimed queue message to another node. The message's properties are copied
// into UserProperties. When stream is true the stream offset is set, plus the
// timestamp when CreatedAt is non-zero. When hasWorkCommitted is also true the
// primary group's committed-offset fields are filled in.
func createRoutedQueueMessage(msg *types.Message, groupID, queueName string, stream bool, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) *cluster.QueueMessage {
	seq := int64(msg.Sequence)

	props := make(map[string]string, len(msg.Properties))
	for key, value := range msg.Properties {
		props[key] = value
	}

	routed := &cluster.QueueMessage{
		MessageID:      fmt.Sprintf("%s:%d", queueName, msg.Sequence),
		QueueName:      queueName,
		GroupID:        groupID,
		Payload:        msg.GetPayload(),
		Sequence:       seq,
		UserProperties: props,
		Stream:         stream,
	}
	if !stream {
		return routed
	}

	routed.StreamOffset = seq
	if !msg.CreatedAt.IsZero() {
		routed.StreamTimestamp = msg.CreatedAt.UnixMilli()
	}
	if !hasWorkCommitted {
		return routed
	}

	routed.HasWorkCommitted = true
	routed.WorkCommittedOffset = int64(workCommitted)
	// A message counts as acked when its sequence is below the committed offset.
	routed.WorkAcked = msg.Sequence < workCommitted
	routed.WorkGroup = primaryGroup
	return routed
}
// decorateStreamProperties writes the stream metadata for msg into
// properties. It always sets the stream offset, and sets the timestamp when
// CreatedAt is non-zero. When hasWorkCommitted is true it also sets the
// committed offset and the acked flag, plus the work group name if
// primaryGroup is non-empty. A nil map or nil message is a no-op.
func decorateStreamProperties(properties map[string]string, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
	if properties == nil || msg == nil {
		return
	}

	properties[types.PropStreamOffset] = strconv.FormatUint(msg.Sequence, 10)
	if !msg.CreatedAt.IsZero() {
		properties[types.PropStreamTimestamp] = strconv.FormatInt(msg.CreatedAt.UnixMilli(), 10)
	}

	if !hasWorkCommitted {
		return
	}
	properties[types.PropWorkCommittedOffset] = strconv.FormatUint(workCommitted, 10)
	// Acked means the message's sequence is below the committed offset.
	properties[types.PropWorkAcked] = strconv.FormatBool(msg.Sequence < workCommitted)
	if primaryGroup != "" {
		properties[types.PropWorkGroup] = primaryGroup
	}
}
+362
View File
@@ -0,0 +1,362 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"io"
"log/slog"
"sync"
"testing"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/consumer"
memlog "github.com/absmach/fluxmq/queue/storage/memory/log"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
)
// newTestEngine builds a DeliveryEngine backed by an in-memory log store and
// a mock group store, with logging discarded. The engine uses
// DistributionForward and a delivery batch size of 100. Its node ID is
// "node-1" when a remote router is supplied and empty otherwise.
func newTestEngine(t *testing.T, local Deliverer, remote RemoteRouter) (*DeliveryEngine, *memlog.Store, *mockGroupStore) {
	t.Helper()

	logStore := memlog.New()
	groupStore := newMockGroupStore()

	consumerMgr := consumer.NewManager(logStore, groupStore, consumer.Config{
		ClaimBatchSize:     10,
		MaxDeliveryCount:   5,
		MaxPELSize:         100_000,
		AutoCommitInterval: DefaultConfig().AutoCommitInterval,
		VisibilityTimeout:  DefaultConfig().VisibilityTimeout,
	})

	var nodeID string
	if remote != nil {
		nodeID = "node-1"
	}

	discard := slog.New(slog.NewTextHandler(io.Discard, nil))
	engine := NewDeliveryEngine(
		logStore, groupStore, consumerMgr,
		local, remote, nodeID,
		DistributionForward, 100, discard,
	)
	return engine, logStore, groupStore
}
// TestScheduleDedup verifies that scheduling the same queue twice before it
// is picked up leaves exactly one entry in the trigger channel.
func TestScheduleDedup(t *testing.T) {
	engine, _, _ := newTestEngine(t, nil, nil)

	// The second call is a duplicate and must be coalesced.
	for i := 0; i < 2; i++ {
		engine.Schedule("q1")
	}

	// poll does a non-blocking receive from the trigger channel.
	poll := func() (string, bool) {
		select {
		case name := <-engine.queue:
			return name, true
		default:
			return "", false
		}
	}

	name, ok := poll()
	if !ok {
		t.Fatal("expected one item in the queue channel")
	}
	if name != "q1" {
		t.Fatalf("expected q1, got %s", name)
	}

	// Nothing else may be queued.
	if extra, ok := poll(); ok {
		t.Fatalf("expected empty channel, got %s", extra)
	}
}
// TestScheduleEmptyQueueName verifies that scheduling an empty queue name
// puts nothing on the trigger channel.
func TestScheduleEmptyQueueName(t *testing.T) {
	engine, _, _ := newTestEngine(t, nil, nil)

	engine.Schedule("")

	// No loop is running, so anything buffered would still be in the channel.
	if len(engine.queue) != 0 {
		t.Fatal("expected empty string to be ignored")
	}
}
// TestScheduleAllQueues verifies that ScheduleAll enqueues every queue known
// to the store exactly once. Queue creation errors fail the test immediately
// so that a broken fixture is not misreported as a scheduling failure.
func TestScheduleAllQueues(t *testing.T) {
	engine, logStore, _ := newTestEngine(t, nil, nil)
	ctx := context.Background()

	names := []string{"a", "b", "c"}
	for _, name := range names {
		if err := logStore.CreateQueue(ctx, types.DefaultQueueConfig(name, "$queue/"+name+"/#")); err != nil {
			t.Fatalf("failed to create queue %s: %v", name, err)
		}
	}

	engine.ScheduleAll(ctx)

	seen := make(map[string]bool, len(names))
	for i := 0; i < len(names); i++ {
		select {
		case name := <-engine.queue:
			seen[name] = true
		default:
			t.Fatalf("expected 3 items, got %d", i)
		}
	}
	for _, name := range names {
		if !seen[name] {
			t.Fatalf("expected queue %s to be scheduled", name)
		}
	}
}
// TestUnschedule verifies that Unschedule removes a queue's pending marker
// from the dedup set.
func TestUnschedule(t *testing.T) {
	engine, _, _ := newTestEngine(t, nil, nil)

	const queueName = "q1"

	// Plant a pending marker directly, as if the queue had been scheduled.
	engine.mu.Lock()
	engine.enqueued[queueName] = struct{}{}
	engine.mu.Unlock()

	engine.Unschedule(queueName)

	engine.mu.Lock()
	_, stillPending := engine.enqueued[queueName]
	engine.mu.Unlock()
	if stillPending {
		t.Fatal("expected q1 to be removed from enqueued set")
	}
}
// TestDeliverQueueLocalConsumer verifies that a message appended to a queue
// with one locally registered consumer reaches the local deliverer exactly
// once and carries the queue-name property.
func TestDeliverQueueLocalConsumer(t *testing.T) {
	var (
		mu        sync.Mutex
		delivered []*brokerstorage.Message
	)
	record := func(ctx context.Context, clientID string, msg *brokerstorage.Message) error {
		mu.Lock()
		defer mu.Unlock()
		delivered = append(delivered, msg)
		return nil
	}

	engine, logStore, groupStore := newTestEngine(t, DeliveryTargetFunc(record), nil)
	ctx := context.Background()

	logStore.CreateQueue(ctx, types.DefaultQueueConfig("tasks", "$queue/tasks/#"))

	workers := types.NewConsumerGroupState("tasks", "workers", "")
	workers.SetConsumer("c1", &types.ConsumerInfo{ID: "c1", ClientID: "c1"})
	groupStore.CreateConsumerGroup(ctx, workers)

	logStore.Append(ctx, "tasks", &types.Message{
		ID:      "1",
		Topic:   "$queue/tasks/new",
		Payload: []byte("job1"),
	})

	if !engine.DeliverQueue(ctx, "tasks") {
		t.Fatal("expected DeliverQueue to return true")
	}

	mu.Lock()
	got := delivered
	mu.Unlock()

	if len(got) != 1 {
		t.Fatalf("expected 1 delivered message, got %d", len(got))
	}
	if queueName := got[0].Properties[types.PropQueueName]; queueName != "tasks" {
		t.Fatalf("expected queue name tasks, got %s", queueName)
	}
}
// TestDeliverQueueRemoteConsumer verifies that a message for a consumer the
// cluster reports on another node is handed to the remote router, addressed
// to that node.
func TestDeliverQueueRemoteConsumer(t *testing.T) {
	router := &mockRemoteRouter{}
	engine, logStore, _ := newTestEngine(t, nil, router)
	ctx := context.Background()

	logStore.CreateQueue(ctx, types.DefaultQueueConfig("tasks", "$queue/tasks/#"))

	// The cluster listing reports one consumer, proxied by node-2.
	remoteConsumer := &cluster.QueueConsumerInfo{
		QueueName:   "tasks",
		GroupID:     "workers",
		ConsumerID:  "remote-c1",
		ClientID:    "remote-client",
		Pattern:     "",
		Mode:        string(types.GroupModeQueue),
		ProxyNodeID: "node-2",
	}
	router.mu.Lock()
	router.consumers = []*cluster.QueueConsumerInfo{remoteConsumer}
	router.mu.Unlock()

	logStore.Append(ctx, "tasks", &types.Message{
		ID:      "1",
		Topic:   "$queue/tasks/new",
		Payload: []byte("job1"),
	})

	if !engine.DeliverQueue(ctx, "tasks") {
		t.Fatal("expected DeliverQueue to return true")
	}

	router.mu.Lock()
	routed := router.routed
	router.mu.Unlock()

	if len(routed) != 1 {
		t.Fatalf("expected 1 routed message, got %d", len(routed))
	}
	entry := routed[0]
	if entry.msg.QueueName != "tasks" {
		t.Fatalf("expected queue tasks, got %s", entry.msg.QueueName)
	}
	if entry.nodeID != "node-2" {
		t.Fatalf("expected node-2, got %s", entry.nodeID)
	}
}
func TestDeliverAllSweep(t *testing.T) {
var mu sync.Mutex
deliveryCount := 0
local := DeliveryTargetFunc(func(ctx context.Context, clientID string, msg *brokerstorage.Message) error {
mu.Lock()
deliveryCount++
mu.Unlock()
return nil
})
engine, logStore, groupStore := newTestEngine(t, local, nil)
ctx := context.Background()
for _, name := range []string{"q1", "q2"} {
logStore.CreateQueue(ctx, types.DefaultQueueConfig(name, "$queue/"+name+"/#"))
group := types.NewConsumerGroupState(name, "g-"+name, "")
group.SetConsumer("c1", &types.ConsumerInfo{
ID: "c1",
ClientID: "c1",
})
groupStore.CreateConsumerGroup(ctx, group)
logStore.Append(ctx, name, &types.Message{
ID: "msg-" + name,
Topic: "$queue/" + name + "/test",
Payload: []byte("data"),
})
}
engine.DeliverAll(ctx)
mu.Lock()
defer mu.Unlock()
if deliveryCount != 2 {
t.Fatalf("expected 2 deliveries across 2 queues, got %d", deliveryCount)
}
}
func TestDeliverQueueNilRemoteRouter(t *testing.T) {
var mu sync.Mutex
deliveryCount := 0
local := DeliveryTargetFunc(func(ctx context.Context, clientID string, msg *brokerstorage.Message) error {
mu.Lock()
deliveryCount++
mu.Unlock()
return nil
})
engine, logStore, groupStore := newTestEngine(t, local, nil)
ctx := context.Background()
logStore.CreateQueue(ctx, types.DefaultQueueConfig("q1", "$queue/q1/#"))
group := types.NewConsumerGroupState("q1", "g1", "")
group.SetConsumer("c1", &types.ConsumerInfo{
ID: "c1",
ClientID: "c1",
ProxyNodeID: "node-2", // remote proxy, but no remote router
})
groupStore.CreateConsumerGroup(ctx, group)
logStore.Append(ctx, "q1", &types.Message{
ID: "1",
Topic: "$queue/q1/test",
Payload: []byte("data"),
})
ok := engine.DeliverQueue(ctx, "q1")
if !ok {
t.Fatal("expected DeliverQueue to return true")
}
// With nil remote, the engine should deliver locally instead of remote routing.
mu.Lock()
defer mu.Unlock()
if deliveryCount != 1 {
t.Fatalf("expected 1 local delivery (nil remote falls through to local), got %d", deliveryCount)
}
}
// TestDeliverQueueEmptyName verifies that DeliverQueue reports no delivery
// for an empty queue name.
func TestDeliverQueueEmptyName(t *testing.T) {
	engine, _, _ := newTestEngine(t, nil, nil)

	delivered := engine.DeliverQueue(context.Background(), "")
	if delivered {
		t.Fatal("expected false for empty queue name")
	}
}
// --- mock RemoteRouter ---
// routedEntry records the arguments of one RouteQueueMessage call on the mock.
type routedEntry struct {
nodeID string
clientID string
queueName string
msg *cluster.QueueMessage
}
// mockRemoteRouter is a test RemoteRouter that serves a fixed consumer list
// and records every routed message.
type mockRemoteRouter struct {
// mu guards consumers and routed.
mu sync.Mutex
// consumers is the list ListQueueConsumers filters by queue name.
consumers []*cluster.QueueConsumerInfo
// routed collects one entry per RouteQueueMessage call, in call order.
routed []routedEntry
}
// ListQueueConsumers returns the configured consumers whose QueueName equals
// queueName, in their configured order. The result is nil when none match,
// and the error is always nil.
func (r *mockRemoteRouter) ListQueueConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	var matches []*cluster.QueueConsumerInfo
	for i := range r.consumers {
		if info := r.consumers[i]; info.QueueName == queueName {
			matches = append(matches, info)
		}
	}
	return matches, nil
}
// RouteQueueMessage records the call's arguments for later inspection by the
// test and always reports success.
func (r *mockRemoteRouter) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error {
	entry := routedEntry{
		nodeID:    nodeID,
		clientID:  clientID,
		queueName: queueName,
		msg:       msg,
	}

	r.mu.Lock()
	r.routed = append(r.routed, entry)
	r.mu.Unlock()

	return nil
}
+43 -368
View File
@@ -49,9 +49,7 @@ type Manager struct {
stopOnce sync.Once
wg sync.WaitGroup
deliveryMu sync.Mutex
deliverySet map[string]struct{}
deliveryQueue chan string
delivery *DeliveryEngine
// Metrics
metrics *consumer.Metrics
@@ -138,6 +136,23 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
localNodeID = cl.NodeID()
}
distMode := normalizeDistributionMode(config.DistributionMode)
var remote RemoteRouter
if cl != nil {
remote = cl
}
engine := NewDeliveryEngine(
queueStore, groupStore, consumerMgr,
dt,
remote,
localNodeID,
distMode,
config.DeliveryBatchSize,
logger,
)
return &Manager{
queueStore: queueStore,
groupStore: groupStore,
@@ -147,13 +162,12 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
logger: logger,
config: config,
writePolicy: normalizeWritePolicy(config.WritePolicy),
distributionMode: normalizeDistributionMode(config.DistributionMode),
distributionMode: distMode,
cluster: cl,
localNodeID: localNodeID,
subscriptions: make(map[string]map[string]*subscriptionRef),
stopCh: make(chan struct{}),
deliverySet: make(map[string]struct{}),
deliveryQueue: make(chan string, 4096),
delivery: engine,
metrics: metrics,
}
}
@@ -174,11 +188,10 @@ func (m *Manager) Start(ctx context.Context) error {
m.cleanupEphemeralQueues()
// Prime delivery for existing queues at startup.
m.scheduleAllQueues(ctx)
m.delivery.ScheduleAll(ctx)
// Start delivery workers
m.wg.Add(1)
go m.runDeliveryLoop()
// Start delivery engine
m.delivery.Start()
// Start work stealing if enabled
if m.config.StealEnabled {
@@ -202,46 +215,6 @@ func (m *Manager) Start(ctx context.Context) error {
return nil
}
func (m *Manager) scheduleAllQueues(ctx context.Context) {
queues, err := m.queueStore.ListQueues(ctx)
if err != nil {
return
}
for _, queueConfig := range queues {
m.scheduleQueueDelivery(queueConfig.Name)
}
}
func (m *Manager) scheduleQueueDelivery(queueName string) {
if queueName == "" {
return
}
m.deliveryMu.Lock()
if _, exists := m.deliverySet[queueName]; exists {
m.deliveryMu.Unlock()
return
}
m.deliverySet[queueName] = struct{}{}
m.deliveryMu.Unlock()
select {
case m.deliveryQueue <- queueName:
default:
// Channel full — delivery trigger dropped. The 1-second sweep ticker
// will reschedule, but there's a gap where this queue won't get delivered.
m.logger.Warn("delivery channel full, dropping trigger (will retry on next sweep)",
slog.String("queue", queueName))
// Prevent a dropped enqueue from getting stuck in the dedupe set forever.
m.markQueueDelivered(queueName)
}
}
func (m *Manager) markQueueDelivered(queueName string) {
m.deliveryMu.Lock()
delete(m.deliverySet, queueName)
m.deliveryMu.Unlock()
}
// ensureReservedQueues creates queues from config or the default mqtt queue if no config provided.
func (m *Manager) ensureReservedQueues(ctx context.Context) error {
@@ -269,6 +242,8 @@ func (m *Manager) ensureReservedQueues(ctx context.Context) error {
// Stop stops the manager and all workers.
func (m *Manager) Stop() error {
m.delivery.Stop()
m.stopOnce.Do(func() {
close(m.stopCh)
})
@@ -303,7 +278,7 @@ func (m *Manager) CreateQueue(ctx context.Context, config types.QueueConfig) err
if err := m.queueStore.CreateQueue(ctx, config); err != nil {
return err
}
m.scheduleQueueDelivery(config.Name)
m.delivery.Schedule(config.Name)
m.logger.Info("queue created",
slog.String("queue", config.Name),
@@ -345,7 +320,7 @@ func (m *Manager) DeleteQueue(ctx context.Context, queueName string) error {
if err := m.queueStore.DeleteQueue(ctx, queueName); err != nil {
return err
}
m.markQueueDelivered(queueName)
m.delivery.Unschedule(queueName)
return nil
}
@@ -481,7 +456,7 @@ func (m *Manager) publishLocal(ctx context.Context, publish types.PublishRequest
slog.String("topic", publish.Topic),
slog.Uint64("offset", offset))
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
}
return nil
@@ -703,7 +678,7 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
slog.String("cursor", fmt.Sprintf("%d", cursor.Position)),
slog.String("mode", string(mode)))
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
@@ -775,7 +750,7 @@ func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clie
slog.String("client", clientID),
slog.String("pattern", pattern))
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
@@ -820,7 +795,7 @@ func (m *Manager) Unsubscribe(ctx context.Context, queueName, pattern string, cl
slog.String("group", patternGroupID),
slog.String("client", clientID))
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
@@ -854,7 +829,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
slog.String("error", err.Error()))
}
}
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
}
@@ -888,7 +863,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
slog.String("error", err.Error()))
}
}
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
@@ -898,7 +873,7 @@ func (m *Manager) Ack(ctx context.Context, queueName, messageID, groupID string)
if err == nil {
m.metrics.RecordAck(0)
m.metrics.UpdatePELSize(uint64(group.PendingCount()))
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
}
@@ -917,7 +892,7 @@ func (m *Manager) Nack(ctx context.Context, queueName, messageID, groupID string
if groupID != "" {
if group, err := m.groupStore.GetConsumerGroup(ctx, queueName, groupID); err == nil {
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
}
@@ -933,7 +908,7 @@ func (m *Manager) Nack(ctx context.Context, queueName, messageID, groupID string
continue
}
if group.Mode == types.GroupModeStream {
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
@@ -941,7 +916,7 @@ func (m *Manager) Nack(ctx context.Context, queueName, messageID, groupID string
err := m.consumerManager.Nack(ctx, queueName, group.ID, consumerID, offset)
if err == nil {
m.metrics.RecordNack()
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
}
@@ -984,7 +959,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, messageID, groupID, rea
err := m.consumerManager.Reject(ctx, queueName, group.ID, consumerID, offset, reason)
if err == nil {
m.metrics.RecordReject()
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
return nil
}
}
@@ -1020,7 +995,7 @@ func (m *Manager) rejectStream(ctx context.Context, queueName string, group *typ
slog.Uint64("offset", offset),
slog.String("reason", reason))
m.metrics.RecordReject()
m.scheduleQueueDelivery(queueName)
m.delivery.Schedule(queueName)
}
// --- Heartbeat ---
@@ -1054,104 +1029,14 @@ func (m *Manager) UpdateHeartbeat(ctx context.Context, clientID string) error {
// --- Background Workers ---
func (m *Manager) runDeliveryLoop() {
defer m.wg.Done()
sweepInterval := time.Second
ticker := time.NewTicker(sweepInterval)
defer ticker.Stop()
for {
select {
case <-m.stopCh:
return
case queueName := <-m.deliveryQueue:
m.markQueueDelivered(queueName)
if m.deliverQueue(context.Background(), queueName) {
// Continue draining while this queue still has deliverable work.
m.scheduleQueueDelivery(queueName)
}
case <-ticker.C:
m.scheduleAllQueues(context.Background())
}
}
}
// deliverMessages is a thin forwarding method for test/bench compatibility.
func (m *Manager) deliverMessages() {
ctx := context.Background()
queues, err := m.queueStore.ListQueues(ctx)
if err != nil {
return
}
for i := range queues {
m.deliverQueueConfig(ctx, &queues[i])
}
m.delivery.DeliverAll(context.Background())
}
// deliverQueue is a thin forwarding method for test/bench compatibility.
func (m *Manager) deliverQueue(ctx context.Context, queueName string) bool {
if queueName == "" {
return false
}
queueConfig, err := m.queueStore.GetQueue(ctx, queueName)
if err != nil {
return false
}
return m.deliverQueueConfig(ctx, queueConfig)
}
func (m *Manager) deliverQueueConfig(ctx context.Context, queueConfig *types.QueueConfig) bool {
if queueConfig == nil {
return false
}
delivered := false
primaryGroup := strings.TrimSpace(queueConfig.PrimaryGroup)
primaryCommitted := make(map[string]uint64)
getPrimaryCommitted := func(pattern string) (uint64, bool) {
if primaryGroup == "" {
return 0, false
}
patternGroupID := primaryGroup
if pattern != "" {
patternGroupID = fmt.Sprintf("%s@%s", primaryGroup, pattern)
}
if val, ok := primaryCommitted[patternGroupID]; ok {
return val, true
}
committed, err := m.consumerManager.GetCommittedOffset(ctx, queueConfig.Name, patternGroupID)
if err != nil {
return 0, false
}
primaryCommitted[patternGroupID] = committed
return committed, true
}
// Deliver to local consumer groups.
groups, err := m.groupStore.ListConsumerGroups(ctx, queueConfig.Name)
if err == nil {
for _, group := range groups {
if m.deliverToGroup(ctx, queueConfig, group, getPrimaryCommitted) {
delivered = true
}
}
}
// Deliver to remote consumers registered in cluster.
if m.cluster != nil && m.distributionMode == DistributionForward {
if m.deliverToRemoteConsumers(ctx, queueConfig) {
delivered = true
}
}
return delivered
return m.delivery.DeliverQueue(ctx, queueName)
}
func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.PublishRequest) error {
@@ -1167,216 +1052,6 @@ func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.Publ
return m.cluster.ForwardQueuePublish(ctx, leaderID, publish.Topic, publish.Payload, publish.Properties, true)
}
// deliverToRemoteConsumers claims messages on behalf of consumers registered
// on other cluster nodes and routes every claimed message to the node that
// proxies the consumer. This enables cross-node queue message routing.
//
// Consumers are listed through m.cluster.ListQueueConsumers and bucketed by
// group ID so that each group is claimed against its own group state. The
// mode and pattern of a group are taken from the first consumer in its bucket.
//
// It returns true when at least one message was claimed for a remote
// consumer. Routing failures are logged and do not change the result.
func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) bool {
	// Get all consumers for this queue from the cluster.
	consumers, err := m.cluster.ListQueueConsumers(ctx, config.Name)
	if err != nil {
		m.logger.Debug("failed to list cluster consumers",
			slog.String("queue", config.Name),
			slog.String("error", err.Error()))
		return false
	}

	// Group consumers by groupID for proper cursor management.
	consumersByGroup := make(map[string][]*cluster.QueueConsumerInfo)
	for _, c := range consumers {
		// Only process consumers on remote nodes. An empty ProxyNodeID is
		// treated as local, consistent with deliverToGroup; claiming for it
		// here would route the messages to a node with an empty ID.
		if c.ProxyNodeID == "" || c.ProxyNodeID == m.localNodeID {
			continue
		}
		consumersByGroup[c.GroupID] = append(consumersByGroup[c.GroupID], c)
	}

	delivered := false
	for groupID, groupConsumers := range consumersByGroup {
		mode := types.GroupModeQueue
		if groupConsumers[0].Mode != "" {
			mode = types.ConsumerGroupMode(groupConsumers[0].Mode)
		}

		// Get or create a local consumer group state for tracking cursor
		// (default auto-commit for remote).
		group, err := m.consumerManager.GetOrCreateGroup(ctx, config.Name, groupID, groupConsumers[0].Pattern, mode, true)
		if err != nil {
			m.logger.Debug("failed to get or create consumer group for remote delivery",
				slog.String("queue", config.Name),
				slog.String("group", groupID),
				slog.String("error", err.Error()))
			continue
		}

		// Create filter from group pattern.
		var filter *consumer.Filter
		if group.Pattern != "" {
			filter = consumer.NewFilter(group.Pattern)
		}

		isStream := group.Mode == types.GroupModeStream

		// For stream groups, look up the primary group's committed offset once
		// per group so every routed message can carry the work-acked marker.
		var workCommitted uint64
		var hasWorkCommitted bool
		if isStream && config.PrimaryGroup != "" {
			patternGroupID := config.PrimaryGroup
			if group.Pattern != "" {
				patternGroupID = fmt.Sprintf("%s@%s", config.PrimaryGroup, group.Pattern)
			}
			if committed, err := m.consumerManager.GetCommittedOffset(ctx, config.Name, patternGroupID); err == nil {
				workCommitted = committed
				hasWorkCommitted = true
			}
		}

		// Every remote consumer in the group gets its own claim attempt.
		for _, consumerInfo := range groupConsumers {
			// Claim messages for this remote consumer.
			var msgs []*types.Message
			var err error
			if isStream {
				msgs, err = m.consumerManager.ClaimBatchStream(ctx, config.Name, groupID, consumerInfo.ConsumerID, filter, m.config.DeliveryBatchSize)
			} else {
				msgs, err = m.consumerManager.ClaimBatch(ctx, config.Name, groupID, consumerInfo.ConsumerID, filter, m.config.DeliveryBatchSize)
			}
			if err != nil {
				m.logger.Debug("failed to claim messages for remote consumer",
					slog.String("queue", config.Name),
					slog.String("group", groupID),
					slog.String("consumer", consumerInfo.ConsumerID),
					slog.String("error", err.Error()))
				continue
			}
			if len(msgs) > 0 {
				delivered = true
			}

			// Route each message to the remote node.
			for _, msg := range msgs {
				routeMsg := m.createRoutedQueueMessage(
					msg,
					groupID,
					config.Name,
					isStream,
					workCommitted,
					hasWorkCommitted,
					config.PrimaryGroup,
				)
				err := m.cluster.RouteQueueMessage(
					ctx,
					consumerInfo.ProxyNodeID,
					consumerInfo.ClientID,
					config.Name,
					routeMsg,
				)
				if err != nil {
					m.logger.Warn("remote queue message delivery failed",
						slog.String("client", consumerInfo.ClientID),
						slog.String("node", consumerInfo.ProxyNodeID),
						slog.String("queue", config.Name),
						slog.String("error", err.Error()))
					continue
				}
				m.logger.Debug("routed queue message to remote consumer",
					slog.String("client", consumerInfo.ClientID),
					slog.String("node", consumerInfo.ProxyNodeID),
					slog.String("queue", config.Name),
					slog.Uint64("offset", msg.Sequence))
			}
		}
	}

	return delivered
}
// deliverToGroup claims a batch of messages for every consumer in group and
// hands each claimed message either to the cluster (when the consumer is
// proxied by another node) or to the local delivery target.
//
// primaryCommitted, when non-nil, is consulted for stream groups to obtain the
// primary group's committed offset for the group's pattern; the value is used
// to decorate delivered messages.
//
// It returns true when at least one message was claimed. Delivery and routing
// failures are logged and do not change the result.
func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig, group *types.ConsumerGroup, primaryCommitted func(pattern string) (uint64, bool)) bool {
	if group.ConsumerCount() == 0 {
		return false
	}

	// Create filter from group pattern.
	var filter *consumer.Filter
	if group.Pattern != "" {
		filter = consumer.NewFilter(group.Pattern)
	}

	consumers := group.ConsumerIDs()
	if len(consumers) == 0 {
		return false
	}

	isStream := group.Mode == types.GroupModeStream

	delivered := false
	for _, consumerID := range consumers {
		// Resolve the consumer from fresh group state BEFORE claiming. If the
		// lookup fails or the consumer is gone, nothing has been claimed yet,
		// so no messages are left claimed without a delivery attempt.
		freshGroup, err := m.groupStore.GetConsumerGroup(ctx, config.Name, group.ID)
		if err != nil {
			m.logger.Debug("failed to load consumer group for delivery",
				slog.String("queue", config.Name),
				slog.String("group", group.ID),
				slog.String("error", err.Error()))
			continue
		}
		consumerInfo := freshGroup.GetConsumer(consumerID)
		if consumerInfo == nil {
			continue
		}

		var msgs []*types.Message
		if isStream {
			msgs, err = m.consumerManager.ClaimBatchStream(ctx, config.Name, group.ID, consumerID, filter, m.config.DeliveryBatchSize)
		} else {
			msgs, err = m.consumerManager.ClaimBatch(ctx, config.Name, group.ID, consumerID, filter, m.config.DeliveryBatchSize)
		}
		if err != nil {
			m.logger.Debug("failed to claim messages for consumer",
				slog.String("queue", config.Name),
				slog.String("group", group.ID),
				slog.String("consumer", consumerID),
				slog.String("error", err.Error()))
			continue
		}
		if len(msgs) == 0 {
			// Nothing to deliver: skip the heartbeat and offset lookup.
			continue
		}
		delivered = true

		// Update heartbeat on delivery so active consumers are never cleaned up as stale.
		m.consumerManager.UpdateHeartbeat(ctx, config.Name, group.ID, consumerID)

		var workCommitted uint64
		var hasWorkCommitted bool
		if isStream && primaryCommitted != nil {
			workCommitted, hasWorkCommitted = primaryCommitted(group.Pattern)
		}

		// The target of a consumer does not change within one batch, so decide
		// once whether it lives on a remote node.
		remote := m.cluster != nil && consumerInfo.ProxyNodeID != "" && consumerInfo.ProxyNodeID != m.localNodeID

		for _, msg := range msgs {
			if remote {
				// Route to remote node.
				routeMsg := m.createRoutedQueueMessage(
					msg,
					group.ID,
					config.Name,
					isStream,
					workCommitted,
					hasWorkCommitted,
					config.PrimaryGroup,
				)
				err := m.cluster.RouteQueueMessage(
					ctx,
					consumerInfo.ProxyNodeID,
					consumerInfo.ClientID,
					config.Name,
					routeMsg,
				)
				if err != nil {
					m.logger.Warn("queue message remote routing failed",
						slog.String("client", consumerInfo.ClientID),
						slog.String("node", consumerInfo.ProxyNodeID),
						slog.String("topic", msg.Topic),
						slog.String("error", err.Error()))
				}
				continue
			}

			if m.deliveryTarget == nil {
				continue
			}

			// Local delivery.
			deliveryMsg := m.createDeliveryMessage(msg, group.ID, config.Name)
			if isStream {
				m.decorateStreamDelivery(deliveryMsg, msg, group, workCommitted, hasWorkCommitted, config.PrimaryGroup)
			}
			if err := m.deliveryTarget.Deliver(ctx, consumerInfo.ClientID, deliveryMsg); err != nil {
				m.logger.Warn("queue message delivery failed",
					slog.String("client", consumerInfo.ClientID),
					slog.String("topic", msg.Topic),
					slog.String("error", err.Error()))
			}
		}
	}

	return delivered
}
func (m *Manager) runStealLoop() {
defer m.wg.Done()
-93
View File
@@ -5,102 +5,9 @@ package queue
import (
"fmt"
"strconv"
"time"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
)
// createDeliveryMessage converts a stored queue message into the broker
// message handed to a local consumer. The result keeps the original topic,
// carries the routing properties produced by createRouteProperties and a
// payload set from msg.GetPayload().
func (m *Manager) createDeliveryMessage(msg *types.Message, groupID string, queueName string) *brokerstorage.Message {
	routeProps := m.createRouteProperties(msg, groupID, queueName)

	out := &brokerstorage.Message{
		Topic:      msg.Topic,
		QoS:        1, // queue messages use QoS 1 by default
		Properties: routeProps,
	}
	out.SetPayloadFromBytes(msg.GetPayload())

	return out
}
// decorateStreamDelivery adds stream metadata to the properties of a delivery
// message, allocating the property map first when the message has none. It is
// a no-op when either delivery or msg is nil. The consumer group argument is
// accepted but not used.
func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *types.Message, _ *types.ConsumerGroup, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
	if msg == nil || delivery == nil {
		return
	}

	props := delivery.Properties
	if props == nil {
		props = make(map[string]string)
		delivery.Properties = props
	}

	m.decorateStreamProperties(props, msg, workCommitted, hasWorkCommitted, primaryGroup)
}
// createRouteProperties returns a new property map for a message about to be
// delivered: a copy of the message's own properties plus the queue routing
// keys (message ID, group ID, queue name and offset). The message ID has the
// form "<queueName>:<sequence>". The input message is not modified.
func (m *Manager) createRouteProperties(msg *types.Message, groupID, queueName string) map[string]string {
	props := make(map[string]string, len(msg.Properties)+4)
	for k, v := range msg.Properties {
		props[k] = v
	}

	// Format the sequence once; it is used for both the offset and the ID.
	offset := strconv.FormatUint(msg.Sequence, 10)

	props[types.PropMessageID] = queueName + ":" + offset
	props[types.PropGroupID] = groupID
	props[types.PropQueueName] = queueName
	props[types.PropOffset] = offset

	return props
}
// createRoutedQueueMessage builds the cluster message used to forward a queue
// message to a consumer on another node. User properties are copied so the
// routed message does not share a map with msg.
//
// When stream is true the stream offset and, if msg.CreatedAt is set, the
// stream timestamp in milliseconds are filled in. When hasWorkCommitted is
// also true the work-committed offset, the work group and the work-acked flag
// (sequence below workCommitted) are added.
func (m *Manager) createRoutedQueueMessage(msg *types.Message, groupID, queueName string, stream bool, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) *cluster.QueueMessage {
	seq := int64(msg.Sequence)

	copied := make(map[string]string, len(msg.Properties))
	for key, value := range msg.Properties {
		copied[key] = value
	}

	out := &cluster.QueueMessage{
		MessageID:      fmt.Sprintf("%s:%d", queueName, msg.Sequence),
		QueueName:      queueName,
		GroupID:        groupID,
		Payload:        msg.GetPayload(),
		Sequence:       seq,
		UserProperties: copied,
		Stream:         stream,
	}

	// Queue-mode messages carry no stream metadata.
	if !stream {
		return out
	}

	out.StreamOffset = seq
	if created := msg.CreatedAt; !created.IsZero() {
		out.StreamTimestamp = created.UnixMilli()
	}

	if !hasWorkCommitted {
		return out
	}

	out.HasWorkCommitted = true
	out.WorkCommittedOffset = int64(workCommitted)
	out.WorkAcked = msg.Sequence < workCommitted
	out.WorkGroup = primaryGroup

	return out
}
// decorateStreamProperties writes stream metadata for msg into properties.
// It is a no-op when properties or msg is nil.
//
// The stream offset is always set. The stream timestamp (milliseconds) is set
// only when msg.CreatedAt is non-zero. When hasWorkCommitted is true the
// work-committed offset and the work-acked flag (sequence below workCommitted)
// are set, and the work group is set when primaryGroup is non-empty.
func (m *Manager) decorateStreamProperties(properties map[string]string, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
	if properties == nil || msg == nil {
		return
	}

	properties[types.PropStreamOffset] = strconv.FormatUint(msg.Sequence, 10)
	if !msg.CreatedAt.IsZero() {
		properties[types.PropStreamTimestamp] = strconv.FormatInt(msg.CreatedAt.UnixMilli(), 10)
	}

	if !hasWorkCommitted {
		return
	}

	properties[types.PropWorkCommittedOffset] = strconv.FormatUint(workCommitted, 10)
	properties[types.PropWorkAcked] = strconv.FormatBool(msg.Sequence < workCommitted)
	if primaryGroup != "" {
		properties[types.PropWorkGroup] = primaryGroup
	}
}
// DeliveryMessage is the internal message format for queue delivery tracking.
type DeliveryMessage struct {
ID string