mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:20:18 +00:00
@@ -0,0 +1,140 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/absmach/fluxmq/cluster"
|
||||
"github.com/absmach/fluxmq/queue/types"
|
||||
)
|
||||
|
||||
// ClusterAdapter abstracts all cluster interactions for the queue Manager.
|
||||
// Implementations are nil-safe: a noopClusterAdapter is used in single-node mode.
|
||||
type ClusterAdapter interface {
|
||||
RegisterConsumer(ctx context.Context, info *cluster.QueueConsumerInfo) error
|
||||
UnregisterConsumer(ctx context.Context, queueName, groupID, consumerID string) error
|
||||
ForwardPublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error
|
||||
RouteMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error
|
||||
ListConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error)
|
||||
ListAllConsumers(ctx context.Context) ([]*cluster.QueueConsumerInfo, error)
|
||||
ForwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, queueExists func(string) bool)
|
||||
LocalNodeID() string
|
||||
IsRemote(nodeID string) bool
|
||||
DistributionMode() DistributionMode
|
||||
}
|
||||
|
||||
// clusterAdapter wraps a cluster.Cluster with distribution-mode awareness.
|
||||
type clusterAdapter struct {
|
||||
cluster cluster.Cluster
|
||||
localNodeID string
|
||||
distributionMode DistributionMode
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) RegisterConsumer(ctx context.Context, info *cluster.QueueConsumerInfo) error {
|
||||
return a.cluster.RegisterQueueConsumer(ctx, info)
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) UnregisterConsumer(ctx context.Context, queueName, groupID, consumerID string) error {
|
||||
return a.cluster.UnregisterQueueConsumer(ctx, queueName, groupID, consumerID)
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) ForwardPublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
|
||||
return a.cluster.ForwardQueuePublish(ctx, nodeID, topic, payload, properties, forwardToLeader)
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) RouteMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error {
|
||||
return a.cluster.RouteQueueMessage(ctx, nodeID, clientID, queueName, msg)
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) ListConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error) {
|
||||
return a.cluster.ListQueueConsumers(ctx, queueName)
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) ListAllConsumers(ctx context.Context) ([]*cluster.QueueConsumerInfo, error) {
|
||||
return a.cluster.ListAllQueueConsumers(ctx)
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) ForwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, queueExists func(string) bool) {
|
||||
switch a.distributionMode {
|
||||
case DistributionForward:
|
||||
a.forwardToRemoteNodes(ctx, publish, false, queueExists)
|
||||
case DistributionReplicate:
|
||||
a.forwardToRemoteNodes(ctx, publish, true, queueExists)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) forwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, unknownOnly bool, queueExists func(string) bool) {
|
||||
consumers, err := a.cluster.ListAllQueueConsumers(ctx)
|
||||
if err != nil {
|
||||
a.logger.Debug("failed to list cluster consumers for forwarding",
|
||||
slog.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
remoteNodes := make(map[string]bool)
|
||||
for _, c := range consumers {
|
||||
if c.ProxyNodeID == a.localNodeID {
|
||||
continue
|
||||
}
|
||||
|
||||
if unknownOnly && queueExists(c.QueueName) {
|
||||
continue
|
||||
}
|
||||
|
||||
queuePattern := "$queue/" + c.QueueName + "/#"
|
||||
if matchesTopic(queuePattern, publish.Topic) {
|
||||
remoteNodes[c.ProxyNodeID] = true
|
||||
}
|
||||
}
|
||||
|
||||
for nodeID := range remoteNodes {
|
||||
if err := a.cluster.ForwardQueuePublish(ctx, nodeID, publish.Topic, publish.Payload, publish.Properties, false); err != nil {
|
||||
a.logger.Warn("failed to forward publish to remote node",
|
||||
slog.String("node", nodeID),
|
||||
slog.String("topic", publish.Topic),
|
||||
slog.String("error", err.Error()))
|
||||
} else {
|
||||
a.logger.Debug("forwarded publish to remote node",
|
||||
slog.String("node", nodeID),
|
||||
slog.String("topic", publish.Topic))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *clusterAdapter) LocalNodeID() string { return a.localNodeID }
|
||||
func (a *clusterAdapter) DistributionMode() DistributionMode { return a.distributionMode }
|
||||
|
||||
func (a *clusterAdapter) IsRemote(nodeID string) bool {
|
||||
return nodeID != "" && nodeID != a.localNodeID
|
||||
}
|
||||
|
||||
// noopClusterAdapter is used in single-node mode. All methods are no-ops.
|
||||
type noopClusterAdapter struct{}
|
||||
|
||||
func (noopClusterAdapter) RegisterConsumer(context.Context, *cluster.QueueConsumerInfo) error {
|
||||
return nil
|
||||
}
|
||||
func (noopClusterAdapter) UnregisterConsumer(context.Context, string, string, string) error {
|
||||
return nil
|
||||
}
|
||||
func (noopClusterAdapter) ForwardPublish(context.Context, string, string, []byte, map[string]string, bool) error {
|
||||
return nil
|
||||
}
|
||||
func (noopClusterAdapter) RouteMessage(context.Context, string, string, string, *cluster.QueueMessage) error {
|
||||
return nil
|
||||
}
|
||||
func (noopClusterAdapter) ListConsumers(context.Context, string) ([]*cluster.QueueConsumerInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (noopClusterAdapter) ListAllConsumers(context.Context) ([]*cluster.QueueConsumerInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (noopClusterAdapter) ForwardToRemoteNodes(context.Context, types.PublishRequest, func(string) bool) {
|
||||
}
|
||||
func (noopClusterAdapter) LocalNodeID() string { return "" }
|
||||
func (noopClusterAdapter) IsRemote(string) bool { return false }
|
||||
func (noopClusterAdapter) DistributionMode() DistributionMode { return "" }
|
||||
@@ -0,0 +1,230 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/absmach/fluxmq/cluster"
|
||||
"github.com/absmach/fluxmq/queue/types"
|
||||
)
|
||||
|
||||
func TestNoopClusterAdapter(t *testing.T) {
|
||||
var a ClusterAdapter = noopClusterAdapter{}
|
||||
ctx := context.Background()
|
||||
|
||||
if err := a.RegisterConsumer(ctx, &cluster.QueueConsumerInfo{}); err != nil {
|
||||
t.Fatalf("expected nil, got %v", err)
|
||||
}
|
||||
if err := a.UnregisterConsumer(ctx, "q", "g", "c"); err != nil {
|
||||
t.Fatalf("expected nil, got %v", err)
|
||||
}
|
||||
if err := a.ForwardPublish(ctx, "n", "t", nil, nil, false); err != nil {
|
||||
t.Fatalf("expected nil, got %v", err)
|
||||
}
|
||||
if err := a.RouteMessage(ctx, "n", "c", "q", nil); err != nil {
|
||||
t.Fatalf("expected nil, got %v", err)
|
||||
}
|
||||
|
||||
consumers, err := a.ListConsumers(ctx, "q")
|
||||
if err != nil || consumers != nil {
|
||||
t.Fatalf("expected (nil, nil), got (%v, %v)", consumers, err)
|
||||
}
|
||||
|
||||
all, err := a.ListAllConsumers(ctx)
|
||||
if err != nil || all != nil {
|
||||
t.Fatalf("expected (nil, nil), got (%v, %v)", all, err)
|
||||
}
|
||||
|
||||
// ForwardToRemoteNodes should be a no-op (no panic)
|
||||
a.ForwardToRemoteNodes(ctx, types.PublishRequest{}, func(string) bool { return false })
|
||||
|
||||
if id := a.LocalNodeID(); id != "" {
|
||||
t.Fatalf("expected empty, got %q", id)
|
||||
}
|
||||
if a.IsRemote("any-node") {
|
||||
t.Fatal("noop adapter should never report remote")
|
||||
}
|
||||
if a.IsRemote("") {
|
||||
t.Fatal("noop adapter should not report empty string as remote")
|
||||
}
|
||||
if mode := a.DistributionMode(); mode != "" {
|
||||
t.Fatalf("expected empty, got %q", mode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterAdapterDelegates(t *testing.T) {
|
||||
spy := newMockCluster("node-1")
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
|
||||
a := &clusterAdapter{
|
||||
cluster: spy,
|
||||
localNodeID: "node-1",
|
||||
distributionMode: DistributionForward,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// RegisterConsumer
|
||||
info := &cluster.QueueConsumerInfo{QueueName: "q", ConsumerID: "c"}
|
||||
if err := a.RegisterConsumer(ctx, info); err != nil {
|
||||
t.Fatalf("RegisterConsumer: %v", err)
|
||||
}
|
||||
registered := spy.GetRegisteredQueueConsumers()
|
||||
if len(registered) != 1 || registered[0].QueueName != "q" {
|
||||
t.Fatalf("expected 1 registered consumer, got %d", len(registered))
|
||||
}
|
||||
|
||||
// ForwardPublish
|
||||
if err := a.ForwardPublish(ctx, "node-2", "topic", []byte("p"), nil, true); err != nil {
|
||||
t.Fatalf("ForwardPublish: %v", err)
|
||||
}
|
||||
calls := spy.GetForwardCalls()
|
||||
if len(calls) != 1 || calls[0].nodeID != "node-2" || !calls[0].forwardToLeader {
|
||||
t.Fatalf("unexpected forward call: %+v", calls)
|
||||
}
|
||||
|
||||
// RouteMessage
|
||||
if err := a.RouteMessage(ctx, "node-2", "client-1", "q", &cluster.QueueMessage{MessageID: "m1"}); err != nil {
|
||||
t.Fatalf("RouteMessage: %v", err)
|
||||
}
|
||||
routed := spy.GetRoutedMessages()
|
||||
if len(routed) != 1 || routed[0].nodeID != "node-2" {
|
||||
t.Fatalf("unexpected routed messages: %+v", routed)
|
||||
}
|
||||
|
||||
// ListConsumers
|
||||
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
|
||||
{QueueName: "q", ProxyNodeID: "node-1"},
|
||||
{QueueName: "q", ProxyNodeID: "node-2"},
|
||||
{QueueName: "other", ProxyNodeID: "node-3"},
|
||||
})
|
||||
consumers, err := a.ListConsumers(ctx, "q")
|
||||
if err != nil {
|
||||
t.Fatalf("ListConsumers: %v", err)
|
||||
}
|
||||
if len(consumers) != 2 {
|
||||
t.Fatalf("expected 2 consumers for queue q, got %d", len(consumers))
|
||||
}
|
||||
|
||||
// ListAllConsumers
|
||||
all, err := a.ListAllConsumers(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("ListAllConsumers: %v", err)
|
||||
}
|
||||
if len(all) != 3 {
|
||||
t.Fatalf("expected 3 total consumers, got %d", len(all))
|
||||
}
|
||||
|
||||
// LocalNodeID / IsRemote
|
||||
if a.LocalNodeID() != "node-1" {
|
||||
t.Fatalf("expected node-1, got %q", a.LocalNodeID())
|
||||
}
|
||||
if a.IsRemote("node-1") {
|
||||
t.Fatal("local node should not be remote")
|
||||
}
|
||||
if !a.IsRemote("node-2") {
|
||||
t.Fatal("node-2 should be remote")
|
||||
}
|
||||
if a.IsRemote("") {
|
||||
t.Fatal("empty nodeID should not be remote")
|
||||
}
|
||||
|
||||
// DistributionMode
|
||||
if a.DistributionMode() != DistributionForward {
|
||||
t.Fatalf("expected forward, got %q", a.DistributionMode())
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterAdapterForwardToRemoteNodes(t *testing.T) {
|
||||
spy := newMockCluster("node-1")
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
|
||||
a := &clusterAdapter{
|
||||
cluster: spy,
|
||||
localNodeID: "node-1",
|
||||
distributionMode: DistributionForward,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
|
||||
{QueueName: "orders", ProxyNodeID: "node-1"}, // local — should be skipped
|
||||
{QueueName: "orders", ProxyNodeID: "node-2"}, // remote — should forward
|
||||
})
|
||||
|
||||
ctx := context.Background()
|
||||
a.ForwardToRemoteNodes(ctx, types.PublishRequest{
|
||||
Topic: "$queue/orders/new",
|
||||
Payload: []byte("data"),
|
||||
}, func(string) bool { return false })
|
||||
|
||||
calls := spy.GetForwardCalls()
|
||||
if len(calls) != 1 {
|
||||
t.Fatalf("expected 1 forward call, got %d", len(calls))
|
||||
}
|
||||
if calls[0].nodeID != "node-2" {
|
||||
t.Fatalf("expected forward to node-2, got %s", calls[0].nodeID)
|
||||
}
|
||||
if calls[0].forwardToLeader {
|
||||
t.Fatal("expected forwardToLeader=false for remote forwarding")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterAdapterForwardToRemoteNodesReplicateMode(t *testing.T) {
|
||||
spy := newMockCluster("node-1")
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
|
||||
a := &clusterAdapter{
|
||||
cluster: spy,
|
||||
localNodeID: "node-1",
|
||||
distributionMode: DistributionReplicate,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
|
||||
{QueueName: "orders", ProxyNodeID: "node-2"},
|
||||
})
|
||||
|
||||
var mu sync.Mutex
|
||||
knownQueues := map[string]bool{"orders": true}
|
||||
|
||||
ctx := context.Background()
|
||||
a.ForwardToRemoteNodes(ctx, types.PublishRequest{
|
||||
Topic: "$queue/orders/new",
|
||||
Payload: []byte("data"),
|
||||
}, func(name string) bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return knownQueues[name]
|
||||
})
|
||||
|
||||
calls := spy.GetForwardCalls()
|
||||
if len(calls) != 0 {
|
||||
t.Fatalf("replicate mode should skip known queues, got %d forward calls", len(calls))
|
||||
}
|
||||
|
||||
// Unknown queue should forward
|
||||
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
|
||||
{QueueName: "unknown", ProxyNodeID: "node-2"},
|
||||
})
|
||||
|
||||
a.ForwardToRemoteNodes(ctx, types.PublishRequest{
|
||||
Topic: "$queue/unknown/msg",
|
||||
Payload: []byte("data"),
|
||||
}, func(name string) bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return knownQueues[name]
|
||||
})
|
||||
|
||||
calls = spy.GetForwardCalls()
|
||||
if len(calls) != 1 {
|
||||
t.Fatalf("replicate mode should forward unknown queues, got %d forward calls", len(calls))
|
||||
}
|
||||
}
|
||||
+83
-134
@@ -24,21 +24,19 @@ import (
|
||||
// Manager is the queue-based queue manager.
|
||||
// It uses append-only logs with cursor-based consumer groups, NATS JetQueue-style.
|
||||
type Manager struct {
|
||||
queueStore storage.QueueStore
|
||||
groupStore storage.ConsumerGroupStore
|
||||
consumerManager *consumer.Manager
|
||||
deliveryTarget Deliverer
|
||||
logger *slog.Logger
|
||||
config Config
|
||||
writePolicy WritePolicy
|
||||
distributionMode DistributionMode
|
||||
queueStore storage.QueueStore
|
||||
groupStore storage.ConsumerGroupStore
|
||||
consumerManager *consumer.Manager
|
||||
deliveryTarget Deliverer
|
||||
logger *slog.Logger
|
||||
config Config
|
||||
writePolicy WritePolicy
|
||||
|
||||
// Raft replication manager
|
||||
raftManager *raft.Manager
|
||||
|
||||
// Cluster support for cross-node message routing
|
||||
cluster cluster.Cluster
|
||||
localNodeID string
|
||||
// Cluster adapter for cross-node message routing
|
||||
clusterAdapter ClusterAdapter
|
||||
|
||||
// Lightweight heartbeat index keyed by client/queue/group.
|
||||
// Stores only metadata needed to route heartbeat updates.
|
||||
@@ -133,9 +131,16 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
|
||||
|
||||
consumerMgr := consumer.NewManager(queueStore, groupStore, consumerCfg)
|
||||
|
||||
var localNodeID string
|
||||
var ca ClusterAdapter
|
||||
if cl != nil {
|
||||
localNodeID = cl.NodeID()
|
||||
ca = &clusterAdapter{
|
||||
cluster: cl,
|
||||
localNodeID: cl.NodeID(),
|
||||
distributionMode: normalizeDistributionMode(config.DistributionMode),
|
||||
logger: logger,
|
||||
}
|
||||
} else {
|
||||
ca = noopClusterAdapter{}
|
||||
}
|
||||
|
||||
return &Manager{
|
||||
@@ -144,25 +149,25 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
|
||||
consumerManager: consumerMgr,
|
||||
deliveryTarget: dt,
|
||||
|
||||
logger: logger,
|
||||
config: config,
|
||||
writePolicy: normalizeWritePolicy(config.WritePolicy),
|
||||
distributionMode: normalizeDistributionMode(config.DistributionMode),
|
||||
cluster: cl,
|
||||
localNodeID: localNodeID,
|
||||
subscriptions: make(map[string]map[string]*subscriptionRef),
|
||||
stopCh: make(chan struct{}),
|
||||
deliverySet: make(map[string]struct{}),
|
||||
deliveryQueue: make(chan string, 4096),
|
||||
metrics: metrics,
|
||||
logger: logger,
|
||||
config: config,
|
||||
writePolicy: normalizeWritePolicy(config.WritePolicy),
|
||||
clusterAdapter: ca,
|
||||
subscriptions: make(map[string]map[string]*subscriptionRef),
|
||||
stopCh: make(chan struct{}),
|
||||
deliverySet: make(map[string]struct{}),
|
||||
deliveryQueue: make(chan string, 4096),
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts background workers.
|
||||
func (m *Manager) Start(ctx context.Context) error {
|
||||
if m.distributionMode == DistributionReplicate && (m.raftManager == nil || !m.raftManager.IsEnabled()) {
|
||||
if m.clusterAdapter.DistributionMode() == DistributionReplicate && (m.raftManager == nil || !m.raftManager.IsEnabled()) {
|
||||
m.logger.Warn("distribution_mode=replicate requires raft to be enabled; falling back to forward")
|
||||
m.distributionMode = DistributionForward
|
||||
if a, ok := m.clusterAdapter.(*clusterAdapter); ok {
|
||||
a.distributionMode = DistributionForward
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure reserved queues exist
|
||||
@@ -388,16 +393,7 @@ func (m *Manager) Publish(ctx context.Context, publish types.PublishRequest) err
|
||||
}
|
||||
|
||||
// Forward to remote nodes that have consumers
|
||||
if m.cluster != nil {
|
||||
switch m.distributionMode {
|
||||
case DistributionForward:
|
||||
m.forwardToRemoteNodes(ctx, publish, false)
|
||||
case DistributionReplicate:
|
||||
// In replicate mode, only forward to nodes whose queues do not exist locally.
|
||||
// This avoids duplicates while still supporting locally-created queues on remote nodes.
|
||||
m.forwardToRemoteNodes(ctx, publish, true)
|
||||
}
|
||||
}
|
||||
m.clusterAdapter.ForwardToRemoteNodes(ctx, publish, m.queueExists(ctx))
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -502,25 +498,17 @@ func autoQueueFromTopic(topic string) (queueName, pattern string) {
|
||||
return topic, topic
|
||||
}
|
||||
|
||||
// forwardToRemoteNodes forwards a publish to nodes that have consumers for the topic.
|
||||
func (m *Manager) forwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, unknownOnly bool) {
|
||||
// Get all consumers from the cluster
|
||||
consumers, err := m.cluster.ListAllQueueConsumers(ctx)
|
||||
if err != nil {
|
||||
m.logger.Debug("failed to list cluster consumers for forwarding",
|
||||
slog.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
queueExistsCache := make(map[string]bool)
|
||||
queueExists := func(queueName string) bool {
|
||||
if exists, ok := queueExistsCache[queueName]; ok {
|
||||
// queueExists returns a cached closure that checks whether a queue exists in the local store.
|
||||
func (m *Manager) queueExists(ctx context.Context) func(string) bool {
|
||||
cache := make(map[string]bool)
|
||||
return func(queueName string) bool {
|
||||
if exists, ok := cache[queueName]; ok {
|
||||
return exists
|
||||
}
|
||||
|
||||
_, err := m.queueStore.GetQueue(ctx, queueName)
|
||||
if err == nil {
|
||||
queueExistsCache[queueName] = true
|
||||
cache[queueName] = true
|
||||
return true
|
||||
}
|
||||
if err != storage.ErrQueueNotFound {
|
||||
@@ -529,42 +517,9 @@ func (m *Manager) forwardToRemoteNodes(ctx context.Context, publish types.Publis
|
||||
slog.String("error", err.Error()))
|
||||
}
|
||||
|
||||
queueExistsCache[queueName] = false
|
||||
cache[queueName] = false
|
||||
return false
|
||||
}
|
||||
|
||||
// Find unique remote nodes that have consumers for queues matching this topic
|
||||
remoteNodes := make(map[string]bool)
|
||||
for _, c := range consumers {
|
||||
// Skip local consumers
|
||||
if c.ProxyNodeID == m.localNodeID {
|
||||
continue
|
||||
}
|
||||
|
||||
if unknownOnly && queueExists(c.QueueName) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this consumer's queue pattern matches the topic
|
||||
queuePattern := "$queue/" + c.QueueName + "/#"
|
||||
if matchesTopic(queuePattern, publish.Topic) {
|
||||
remoteNodes[c.ProxyNodeID] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Forward to each unique remote node
|
||||
for nodeID := range remoteNodes {
|
||||
if err := m.cluster.ForwardQueuePublish(ctx, nodeID, publish.Topic, publish.Payload, publish.Properties, false); err != nil {
|
||||
m.logger.Warn("failed to forward publish to remote node",
|
||||
slog.String("node", nodeID),
|
||||
slog.String("topic", publish.Topic),
|
||||
slog.String("error", err.Error()))
|
||||
} else {
|
||||
m.logger.Debug("forwarded publish to remote node",
|
||||
slog.String("node", nodeID),
|
||||
slog.String("topic", publish.Topic))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// matchesTopic checks if a filter pattern matches a topic using MQTT wildcard rules.
|
||||
@@ -585,8 +540,8 @@ func (m *Manager) Enqueue(ctx context.Context, topic string, payload []byte, pro
|
||||
|
||||
// SubscribeWithCursor adds a consumer with explicit cursor positioning.
|
||||
func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern string, clientID, groupID, proxyNodeID string, cursor *types.CursorOption) error {
|
||||
if proxyNodeID == "" && m.localNodeID != "" {
|
||||
proxyNodeID = m.localNodeID
|
||||
if proxyNodeID == "" {
|
||||
proxyNodeID = m.clusterAdapter.LocalNodeID()
|
||||
}
|
||||
|
||||
mode := types.GroupModeQueue
|
||||
@@ -676,22 +631,20 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
|
||||
// Clear ephemeral disconnect timestamp since we now have a consumer
|
||||
m.clearEphemeralDisconnect(ctx, queueName)
|
||||
|
||||
if m.cluster != nil {
|
||||
info := &cluster.QueueConsumerInfo{
|
||||
QueueName: queueName,
|
||||
GroupID: patternGroupID,
|
||||
ConsumerID: clientID,
|
||||
ClientID: clientID,
|
||||
Pattern: pattern,
|
||||
Mode: string(mode),
|
||||
ProxyNodeID: proxyNodeID,
|
||||
RegisteredAt: time.Now(),
|
||||
}
|
||||
if err := m.cluster.RegisterQueueConsumer(ctx, info); err != nil {
|
||||
m.logger.Warn("failed to register consumer in cluster",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
info := &cluster.QueueConsumerInfo{
|
||||
QueueName: queueName,
|
||||
GroupID: patternGroupID,
|
||||
ConsumerID: clientID,
|
||||
ClientID: clientID,
|
||||
Pattern: pattern,
|
||||
Mode: string(mode),
|
||||
ProxyNodeID: proxyNodeID,
|
||||
RegisteredAt: time.Now(),
|
||||
}
|
||||
if err := m.clusterAdapter.RegisterConsumer(ctx, info); err != nil {
|
||||
m.logger.Warn("failed to register consumer in cluster",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
|
||||
m.trackSubscription(clientID, queueName, patternGroupID)
|
||||
@@ -710,8 +663,8 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
|
||||
|
||||
// Subscribe adds a consumer to a stream with optional pattern matching.
|
||||
func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clientID, groupID, proxyNodeID string) error {
|
||||
if proxyNodeID == "" && m.localNodeID != "" {
|
||||
proxyNodeID = m.localNodeID
|
||||
if proxyNodeID == "" {
|
||||
proxyNodeID = m.clusterAdapter.LocalNodeID()
|
||||
}
|
||||
|
||||
// Ensure queue exists (auto-create if not)
|
||||
@@ -748,22 +701,20 @@ func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clie
|
||||
m.clearEphemeralDisconnect(ctx, queueName)
|
||||
|
||||
// Register consumer in cluster for cross-node visibility
|
||||
if m.cluster != nil {
|
||||
info := &cluster.QueueConsumerInfo{
|
||||
QueueName: queueName,
|
||||
GroupID: patternGroupID,
|
||||
ConsumerID: clientID,
|
||||
ClientID: clientID,
|
||||
Pattern: pattern,
|
||||
Mode: string(types.GroupModeQueue),
|
||||
ProxyNodeID: proxyNodeID,
|
||||
RegisteredAt: time.Now(),
|
||||
}
|
||||
if err := m.cluster.RegisterQueueConsumer(ctx, info); err != nil {
|
||||
m.logger.Warn("failed to register consumer in cluster",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
info := &cluster.QueueConsumerInfo{
|
||||
QueueName: queueName,
|
||||
GroupID: patternGroupID,
|
||||
ConsumerID: clientID,
|
||||
ClientID: clientID,
|
||||
Pattern: pattern,
|
||||
Mode: string(types.GroupModeQueue),
|
||||
ProxyNodeID: proxyNodeID,
|
||||
RegisteredAt: time.Now(),
|
||||
}
|
||||
if err := m.clusterAdapter.RegisterConsumer(ctx, info); err != nil {
|
||||
m.logger.Warn("failed to register consumer in cluster",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
|
||||
// Track subscription
|
||||
@@ -801,12 +752,10 @@ func (m *Manager) Unsubscribe(ctx context.Context, queueName, pattern string, cl
|
||||
}
|
||||
|
||||
// Unregister consumer from cluster
|
||||
if m.cluster != nil {
|
||||
if err := m.cluster.UnregisterQueueConsumer(ctx, queueName, patternGroupID, clientID); err != nil {
|
||||
m.logger.Warn("failed to unregister consumer from cluster",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
if err := m.clusterAdapter.UnregisterConsumer(ctx, queueName, patternGroupID, clientID); err != nil {
|
||||
m.logger.Warn("failed to unregister consumer from cluster",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("client", clientID))
|
||||
}
|
||||
|
||||
// Untrack subscription
|
||||
@@ -1145,7 +1094,7 @@ func (m *Manager) deliverQueueConfig(ctx context.Context, queueConfig *types.Que
|
||||
}
|
||||
|
||||
// Deliver to remote consumers registered in cluster.
|
||||
if m.cluster != nil && m.distributionMode == DistributionForward {
|
||||
if m.clusterAdapter.DistributionMode() == DistributionForward {
|
||||
if m.deliverToRemoteConsumers(ctx, queueConfig) {
|
||||
delivered = true
|
||||
}
|
||||
@@ -1155,7 +1104,7 @@ func (m *Manager) deliverQueueConfig(ctx context.Context, queueConfig *types.Que
|
||||
}
|
||||
|
||||
func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.PublishRequest) error {
|
||||
if m.cluster == nil {
|
||||
if m.clusterAdapter.LocalNodeID() == "" {
|
||||
return fmt.Errorf("cluster not configured for leader forward")
|
||||
}
|
||||
|
||||
@@ -1164,14 +1113,14 @@ func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.Publ
|
||||
return fmt.Errorf("raft leader unavailable")
|
||||
}
|
||||
|
||||
return m.cluster.ForwardQueuePublish(ctx, leaderID, publish.Topic, publish.Payload, publish.Properties, true)
|
||||
return m.clusterAdapter.ForwardPublish(ctx, leaderID, publish.Topic, publish.Payload, publish.Properties, true)
|
||||
}
|
||||
|
||||
// deliverToRemoteConsumers delivers messages to consumers registered on remote nodes.
|
||||
// This enables cross-node queue message routing.
|
||||
func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) bool {
|
||||
// Get all consumers for this queue from the cluster
|
||||
consumers, err := m.cluster.ListQueueConsumers(ctx, config.Name)
|
||||
consumers, err := m.clusterAdapter.ListConsumers(ctx, config.Name)
|
||||
if err != nil {
|
||||
m.logger.Debug("failed to list cluster consumers",
|
||||
slog.String("queue", config.Name),
|
||||
@@ -1183,7 +1132,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
|
||||
consumersByGroup := make(map[string][]*cluster.QueueConsumerInfo)
|
||||
for _, c := range consumers {
|
||||
// Only process consumers on remote nodes
|
||||
if c.ProxyNodeID == m.localNodeID {
|
||||
if !m.clusterAdapter.IsRemote(c.ProxyNodeID) {
|
||||
continue
|
||||
}
|
||||
consumersByGroup[c.GroupID] = append(consumersByGroup[c.GroupID], c)
|
||||
@@ -1249,7 +1198,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
|
||||
config.PrimaryGroup,
|
||||
)
|
||||
|
||||
err := m.cluster.RouteQueueMessage(
|
||||
err := m.clusterAdapter.RouteMessage(
|
||||
ctx,
|
||||
consumerInfo.ProxyNodeID,
|
||||
consumerInfo.ClientID,
|
||||
@@ -1333,7 +1282,7 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
|
||||
|
||||
for _, msg := range msgs {
|
||||
// Check if consumer is on a remote node
|
||||
if m.cluster != nil && consumerInfo.ProxyNodeID != "" && consumerInfo.ProxyNodeID != m.localNodeID {
|
||||
if m.clusterAdapter.IsRemote(consumerInfo.ProxyNodeID) {
|
||||
// Route to remote node
|
||||
routeMsg := m.createRoutedQueueMessage(
|
||||
msg,
|
||||
@@ -1344,7 +1293,7 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
|
||||
hasWorkCommitted,
|
||||
config.PrimaryGroup,
|
||||
)
|
||||
err := m.cluster.RouteQueueMessage(
|
||||
err := m.clusterAdapter.RouteMessage(
|
||||
ctx,
|
||||
consumerInfo.ProxyNodeID,
|
||||
consumerInfo.ClientID,
|
||||
|
||||
Reference in New Issue
Block a user