Revert "Split Queue manager"

This reverts commit 25fa02b763.
This commit is contained in:
dusan
2026-02-10 21:12:12 +01:00
parent 8edb3e5e5d
commit 52aa3bfe96
3 changed files with 134 additions and 453 deletions
-140
View File
@@ -1,140 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"log/slog"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/types"
)
// ClusterAdapter abstracts all cluster interactions for the queue Manager.
// Implementations are nil-safe: a noopClusterAdapter is used in single-node mode.
type ClusterAdapter interface {
RegisterConsumer(ctx context.Context, info *cluster.QueueConsumerInfo) error
UnregisterConsumer(ctx context.Context, queueName, groupID, consumerID string) error
ForwardPublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error
RouteMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error
ListConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error)
ListAllConsumers(ctx context.Context) ([]*cluster.QueueConsumerInfo, error)
ForwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, queueExists func(string) bool)
LocalNodeID() string
IsRemote(nodeID string) bool
DistributionMode() DistributionMode
}
// clusterAdapter wraps a cluster.Cluster with distribution-mode awareness.
type clusterAdapter struct {
cluster cluster.Cluster
localNodeID string
distributionMode DistributionMode
logger *slog.Logger
}
func (a *clusterAdapter) RegisterConsumer(ctx context.Context, info *cluster.QueueConsumerInfo) error {
return a.cluster.RegisterQueueConsumer(ctx, info)
}
func (a *clusterAdapter) UnregisterConsumer(ctx context.Context, queueName, groupID, consumerID string) error {
return a.cluster.UnregisterQueueConsumer(ctx, queueName, groupID, consumerID)
}
func (a *clusterAdapter) ForwardPublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
return a.cluster.ForwardQueuePublish(ctx, nodeID, topic, payload, properties, forwardToLeader)
}
func (a *clusterAdapter) RouteMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error {
return a.cluster.RouteQueueMessage(ctx, nodeID, clientID, queueName, msg)
}
func (a *clusterAdapter) ListConsumers(ctx context.Context, queueName string) ([]*cluster.QueueConsumerInfo, error) {
return a.cluster.ListQueueConsumers(ctx, queueName)
}
func (a *clusterAdapter) ListAllConsumers(ctx context.Context) ([]*cluster.QueueConsumerInfo, error) {
return a.cluster.ListAllQueueConsumers(ctx)
}
func (a *clusterAdapter) ForwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, queueExists func(string) bool) {
switch a.distributionMode {
case DistributionForward:
a.forwardToRemoteNodes(ctx, publish, false, queueExists)
case DistributionReplicate:
a.forwardToRemoteNodes(ctx, publish, true, queueExists)
}
}
func (a *clusterAdapter) forwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, unknownOnly bool, queueExists func(string) bool) {
consumers, err := a.cluster.ListAllQueueConsumers(ctx)
if err != nil {
a.logger.Debug("failed to list cluster consumers for forwarding",
slog.String("error", err.Error()))
return
}
remoteNodes := make(map[string]bool)
for _, c := range consumers {
if c.ProxyNodeID == a.localNodeID {
continue
}
if unknownOnly && queueExists(c.QueueName) {
continue
}
queuePattern := "$queue/" + c.QueueName + "/#"
if matchesTopic(queuePattern, publish.Topic) {
remoteNodes[c.ProxyNodeID] = true
}
}
for nodeID := range remoteNodes {
if err := a.cluster.ForwardQueuePublish(ctx, nodeID, publish.Topic, publish.Payload, publish.Properties, false); err != nil {
a.logger.Warn("failed to forward publish to remote node",
slog.String("node", nodeID),
slog.String("topic", publish.Topic),
slog.String("error", err.Error()))
} else {
a.logger.Debug("forwarded publish to remote node",
slog.String("node", nodeID),
slog.String("topic", publish.Topic))
}
}
}
func (a *clusterAdapter) LocalNodeID() string { return a.localNodeID }
func (a *clusterAdapter) DistributionMode() DistributionMode { return a.distributionMode }
func (a *clusterAdapter) IsRemote(nodeID string) bool {
return nodeID != "" && nodeID != a.localNodeID
}
// noopClusterAdapter is used in single-node mode. All methods are no-ops.
type noopClusterAdapter struct{}
func (noopClusterAdapter) RegisterConsumer(context.Context, *cluster.QueueConsumerInfo) error {
return nil
}
func (noopClusterAdapter) UnregisterConsumer(context.Context, string, string, string) error {
return nil
}
func (noopClusterAdapter) ForwardPublish(context.Context, string, string, []byte, map[string]string, bool) error {
return nil
}
func (noopClusterAdapter) RouteMessage(context.Context, string, string, string, *cluster.QueueMessage) error {
return nil
}
func (noopClusterAdapter) ListConsumers(context.Context, string) ([]*cluster.QueueConsumerInfo, error) {
return nil, nil
}
func (noopClusterAdapter) ListAllConsumers(context.Context) ([]*cluster.QueueConsumerInfo, error) {
return nil, nil
}
func (noopClusterAdapter) ForwardToRemoteNodes(context.Context, types.PublishRequest, func(string) bool) {
}
func (noopClusterAdapter) LocalNodeID() string { return "" }
func (noopClusterAdapter) IsRemote(string) bool { return false }
func (noopClusterAdapter) DistributionMode() DistributionMode { return "" }
-230
View File
@@ -1,230 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"io"
"log/slog"
"sync"
"testing"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/types"
)
func TestNoopClusterAdapter(t *testing.T) {
var a ClusterAdapter = noopClusterAdapter{}
ctx := context.Background()
if err := a.RegisterConsumer(ctx, &cluster.QueueConsumerInfo{}); err != nil {
t.Fatalf("expected nil, got %v", err)
}
if err := a.UnregisterConsumer(ctx, "q", "g", "c"); err != nil {
t.Fatalf("expected nil, got %v", err)
}
if err := a.ForwardPublish(ctx, "n", "t", nil, nil, false); err != nil {
t.Fatalf("expected nil, got %v", err)
}
if err := a.RouteMessage(ctx, "n", "c", "q", nil); err != nil {
t.Fatalf("expected nil, got %v", err)
}
consumers, err := a.ListConsumers(ctx, "q")
if err != nil || consumers != nil {
t.Fatalf("expected (nil, nil), got (%v, %v)", consumers, err)
}
all, err := a.ListAllConsumers(ctx)
if err != nil || all != nil {
t.Fatalf("expected (nil, nil), got (%v, %v)", all, err)
}
// ForwardToRemoteNodes should be a no-op (no panic)
a.ForwardToRemoteNodes(ctx, types.PublishRequest{}, func(string) bool { return false })
if id := a.LocalNodeID(); id != "" {
t.Fatalf("expected empty, got %q", id)
}
if a.IsRemote("any-node") {
t.Fatal("noop adapter should never report remote")
}
if a.IsRemote("") {
t.Fatal("noop adapter should not report empty string as remote")
}
if mode := a.DistributionMode(); mode != "" {
t.Fatalf("expected empty, got %q", mode)
}
}
func TestClusterAdapterDelegates(t *testing.T) {
spy := newMockCluster("node-1")
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
a := &clusterAdapter{
cluster: spy,
localNodeID: "node-1",
distributionMode: DistributionForward,
logger: logger,
}
ctx := context.Background()
// RegisterConsumer
info := &cluster.QueueConsumerInfo{QueueName: "q", ConsumerID: "c"}
if err := a.RegisterConsumer(ctx, info); err != nil {
t.Fatalf("RegisterConsumer: %v", err)
}
registered := spy.GetRegisteredQueueConsumers()
if len(registered) != 1 || registered[0].QueueName != "q" {
t.Fatalf("expected 1 registered consumer, got %d", len(registered))
}
// ForwardPublish
if err := a.ForwardPublish(ctx, "node-2", "topic", []byte("p"), nil, true); err != nil {
t.Fatalf("ForwardPublish: %v", err)
}
calls := spy.GetForwardCalls()
if len(calls) != 1 || calls[0].nodeID != "node-2" || !calls[0].forwardToLeader {
t.Fatalf("unexpected forward call: %+v", calls)
}
// RouteMessage
if err := a.RouteMessage(ctx, "node-2", "client-1", "q", &cluster.QueueMessage{MessageID: "m1"}); err != nil {
t.Fatalf("RouteMessage: %v", err)
}
routed := spy.GetRoutedMessages()
if len(routed) != 1 || routed[0].nodeID != "node-2" {
t.Fatalf("unexpected routed messages: %+v", routed)
}
// ListConsumers
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{QueueName: "q", ProxyNodeID: "node-1"},
{QueueName: "q", ProxyNodeID: "node-2"},
{QueueName: "other", ProxyNodeID: "node-3"},
})
consumers, err := a.ListConsumers(ctx, "q")
if err != nil {
t.Fatalf("ListConsumers: %v", err)
}
if len(consumers) != 2 {
t.Fatalf("expected 2 consumers for queue q, got %d", len(consumers))
}
// ListAllConsumers
all, err := a.ListAllConsumers(ctx)
if err != nil {
t.Fatalf("ListAllConsumers: %v", err)
}
if len(all) != 3 {
t.Fatalf("expected 3 total consumers, got %d", len(all))
}
// LocalNodeID / IsRemote
if a.LocalNodeID() != "node-1" {
t.Fatalf("expected node-1, got %q", a.LocalNodeID())
}
if a.IsRemote("node-1") {
t.Fatal("local node should not be remote")
}
if !a.IsRemote("node-2") {
t.Fatal("node-2 should be remote")
}
if a.IsRemote("") {
t.Fatal("empty nodeID should not be remote")
}
// DistributionMode
if a.DistributionMode() != DistributionForward {
t.Fatalf("expected forward, got %q", a.DistributionMode())
}
}
func TestClusterAdapterForwardToRemoteNodes(t *testing.T) {
spy := newMockCluster("node-1")
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
a := &clusterAdapter{
cluster: spy,
localNodeID: "node-1",
distributionMode: DistributionForward,
logger: logger,
}
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{QueueName: "orders", ProxyNodeID: "node-1"}, // local — should be skipped
{QueueName: "orders", ProxyNodeID: "node-2"}, // remote — should forward
})
ctx := context.Background()
a.ForwardToRemoteNodes(ctx, types.PublishRequest{
Topic: "$queue/orders/new",
Payload: []byte("data"),
}, func(string) bool { return false })
calls := spy.GetForwardCalls()
if len(calls) != 1 {
t.Fatalf("expected 1 forward call, got %d", len(calls))
}
if calls[0].nodeID != "node-2" {
t.Fatalf("expected forward to node-2, got %s", calls[0].nodeID)
}
if calls[0].forwardToLeader {
t.Fatal("expected forwardToLeader=false for remote forwarding")
}
}
func TestClusterAdapterForwardToRemoteNodesReplicateMode(t *testing.T) {
spy := newMockCluster("node-1")
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
a := &clusterAdapter{
cluster: spy,
localNodeID: "node-1",
distributionMode: DistributionReplicate,
logger: logger,
}
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{QueueName: "orders", ProxyNodeID: "node-2"},
})
var mu sync.Mutex
knownQueues := map[string]bool{"orders": true}
ctx := context.Background()
a.ForwardToRemoteNodes(ctx, types.PublishRequest{
Topic: "$queue/orders/new",
Payload: []byte("data"),
}, func(name string) bool {
mu.Lock()
defer mu.Unlock()
return knownQueues[name]
})
calls := spy.GetForwardCalls()
if len(calls) != 0 {
t.Fatalf("replicate mode should skip known queues, got %d forward calls", len(calls))
}
// Unknown queue should forward
spy.SetQueueConsumers([]*cluster.QueueConsumerInfo{
{QueueName: "unknown", ProxyNodeID: "node-2"},
})
a.ForwardToRemoteNodes(ctx, types.PublishRequest{
Topic: "$queue/unknown/msg",
Payload: []byte("data"),
}, func(name string) bool {
mu.Lock()
defer mu.Unlock()
return knownQueues[name]
})
calls = spy.GetForwardCalls()
if len(calls) != 1 {
t.Fatalf("replicate mode should forward unknown queues, got %d forward calls", len(calls))
}
}
+134 -83
View File
@@ -24,19 +24,21 @@ import (
// Manager is the queue-based queue manager.
// It uses append-only logs with cursor-based consumer groups, NATS JetQueue-style.
type Manager struct {
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
consumerManager *consumer.Manager
deliveryTarget Deliverer
logger *slog.Logger
config Config
writePolicy WritePolicy
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
consumerManager *consumer.Manager
deliveryTarget Deliverer
logger *slog.Logger
config Config
writePolicy WritePolicy
distributionMode DistributionMode
// Raft replication manager
raftManager *raft.Manager
// Cluster adapter for cross-node message routing
clusterAdapter ClusterAdapter
// Cluster support for cross-node message routing
cluster cluster.Cluster
localNodeID string
// Lightweight heartbeat index keyed by client/queue/group.
// Stores only metadata needed to route heartbeat updates.
@@ -131,16 +133,9 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
consumerMgr := consumer.NewManager(queueStore, groupStore, consumerCfg)
var ca ClusterAdapter
var localNodeID string
if cl != nil {
ca = &clusterAdapter{
cluster: cl,
localNodeID: cl.NodeID(),
distributionMode: normalizeDistributionMode(config.DistributionMode),
logger: logger,
}
} else {
ca = noopClusterAdapter{}
localNodeID = cl.NodeID()
}
return &Manager{
@@ -149,25 +144,25 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
consumerManager: consumerMgr,
deliveryTarget: dt,
logger: logger,
config: config,
writePolicy: normalizeWritePolicy(config.WritePolicy),
clusterAdapter: ca,
subscriptions: make(map[string]map[string]*subscriptionRef),
stopCh: make(chan struct{}),
deliverySet: make(map[string]struct{}),
deliveryQueue: make(chan string, 4096),
metrics: metrics,
logger: logger,
config: config,
writePolicy: normalizeWritePolicy(config.WritePolicy),
distributionMode: normalizeDistributionMode(config.DistributionMode),
cluster: cl,
localNodeID: localNodeID,
subscriptions: make(map[string]map[string]*subscriptionRef),
stopCh: make(chan struct{}),
deliverySet: make(map[string]struct{}),
deliveryQueue: make(chan string, 4096),
metrics: metrics,
}
}
// Start starts background workers.
func (m *Manager) Start(ctx context.Context) error {
if m.clusterAdapter.DistributionMode() == DistributionReplicate && (m.raftManager == nil || !m.raftManager.IsEnabled()) {
if m.distributionMode == DistributionReplicate && (m.raftManager == nil || !m.raftManager.IsEnabled()) {
m.logger.Warn("distribution_mode=replicate requires raft to be enabled; falling back to forward")
if a, ok := m.clusterAdapter.(*clusterAdapter); ok {
a.distributionMode = DistributionForward
}
m.distributionMode = DistributionForward
}
// Ensure reserved queues exist
@@ -393,7 +388,16 @@ func (m *Manager) Publish(ctx context.Context, publish types.PublishRequest) err
}
// Forward to remote nodes that have consumers
m.clusterAdapter.ForwardToRemoteNodes(ctx, publish, m.queueExists(ctx))
if m.cluster != nil {
switch m.distributionMode {
case DistributionForward:
m.forwardToRemoteNodes(ctx, publish, false)
case DistributionReplicate:
// In replicate mode, only forward to nodes whose queues do not exist locally.
// This avoids duplicates while still supporting locally-created queues on remote nodes.
m.forwardToRemoteNodes(ctx, publish, true)
}
}
return nil
}
@@ -498,17 +502,25 @@ func autoQueueFromTopic(topic string) (queueName, pattern string) {
return topic, topic
}
// queueExists returns a cached closure that checks whether a queue exists in the local store.
func (m *Manager) queueExists(ctx context.Context) func(string) bool {
cache := make(map[string]bool)
return func(queueName string) bool {
if exists, ok := cache[queueName]; ok {
// forwardToRemoteNodes forwards a publish to nodes that have consumers for the topic.
func (m *Manager) forwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, unknownOnly bool) {
// Get all consumers from the cluster
consumers, err := m.cluster.ListAllQueueConsumers(ctx)
if err != nil {
m.logger.Debug("failed to list cluster consumers for forwarding",
slog.String("error", err.Error()))
return
}
queueExistsCache := make(map[string]bool)
queueExists := func(queueName string) bool {
if exists, ok := queueExistsCache[queueName]; ok {
return exists
}
_, err := m.queueStore.GetQueue(ctx, queueName)
if err == nil {
cache[queueName] = true
queueExistsCache[queueName] = true
return true
}
if err != storage.ErrQueueNotFound {
@@ -517,9 +529,42 @@ func (m *Manager) queueExists(ctx context.Context) func(string) bool {
slog.String("error", err.Error()))
}
cache[queueName] = false
queueExistsCache[queueName] = false
return false
}
// Find unique remote nodes that have consumers for queues matching this topic
remoteNodes := make(map[string]bool)
for _, c := range consumers {
// Skip local consumers
if c.ProxyNodeID == m.localNodeID {
continue
}
if unknownOnly && queueExists(c.QueueName) {
continue
}
// Check if this consumer's queue pattern matches the topic
queuePattern := "$queue/" + c.QueueName + "/#"
if matchesTopic(queuePattern, publish.Topic) {
remoteNodes[c.ProxyNodeID] = true
}
}
// Forward to each unique remote node
for nodeID := range remoteNodes {
if err := m.cluster.ForwardQueuePublish(ctx, nodeID, publish.Topic, publish.Payload, publish.Properties, false); err != nil {
m.logger.Warn("failed to forward publish to remote node",
slog.String("node", nodeID),
slog.String("topic", publish.Topic),
slog.String("error", err.Error()))
} else {
m.logger.Debug("forwarded publish to remote node",
slog.String("node", nodeID),
slog.String("topic", publish.Topic))
}
}
}
// matchesTopic checks if a filter pattern matches a topic using MQTT wildcard rules.
@@ -540,8 +585,8 @@ func (m *Manager) Enqueue(ctx context.Context, topic string, payload []byte, pro
// SubscribeWithCursor adds a consumer with explicit cursor positioning.
func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern string, clientID, groupID, proxyNodeID string, cursor *types.CursorOption) error {
if proxyNodeID == "" {
proxyNodeID = m.clusterAdapter.LocalNodeID()
if proxyNodeID == "" && m.localNodeID != "" {
proxyNodeID = m.localNodeID
}
mode := types.GroupModeQueue
@@ -631,20 +676,22 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
// Clear ephemeral disconnect timestamp since we now have a consumer
m.clearEphemeralDisconnect(ctx, queueName)
info := &cluster.QueueConsumerInfo{
QueueName: queueName,
GroupID: patternGroupID,
ConsumerID: clientID,
ClientID: clientID,
Pattern: pattern,
Mode: string(mode),
ProxyNodeID: proxyNodeID,
RegisteredAt: time.Now(),
}
if err := m.clusterAdapter.RegisterConsumer(ctx, info); err != nil {
m.logger.Warn("failed to register consumer in cluster",
slog.String("error", err.Error()),
slog.String("client", clientID))
if m.cluster != nil {
info := &cluster.QueueConsumerInfo{
QueueName: queueName,
GroupID: patternGroupID,
ConsumerID: clientID,
ClientID: clientID,
Pattern: pattern,
Mode: string(mode),
ProxyNodeID: proxyNodeID,
RegisteredAt: time.Now(),
}
if err := m.cluster.RegisterQueueConsumer(ctx, info); err != nil {
m.logger.Warn("failed to register consumer in cluster",
slog.String("error", err.Error()),
slog.String("client", clientID))
}
}
m.trackSubscription(clientID, queueName, patternGroupID)
@@ -663,8 +710,8 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
// Subscribe adds a consumer to a stream with optional pattern matching.
func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clientID, groupID, proxyNodeID string) error {
if proxyNodeID == "" {
proxyNodeID = m.clusterAdapter.LocalNodeID()
if proxyNodeID == "" && m.localNodeID != "" {
proxyNodeID = m.localNodeID
}
// Ensure queue exists (auto-create if not)
@@ -701,20 +748,22 @@ func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clie
m.clearEphemeralDisconnect(ctx, queueName)
// Register consumer in cluster for cross-node visibility
info := &cluster.QueueConsumerInfo{
QueueName: queueName,
GroupID: patternGroupID,
ConsumerID: clientID,
ClientID: clientID,
Pattern: pattern,
Mode: string(types.GroupModeQueue),
ProxyNodeID: proxyNodeID,
RegisteredAt: time.Now(),
}
if err := m.clusterAdapter.RegisterConsumer(ctx, info); err != nil {
m.logger.Warn("failed to register consumer in cluster",
slog.String("error", err.Error()),
slog.String("client", clientID))
if m.cluster != nil {
info := &cluster.QueueConsumerInfo{
QueueName: queueName,
GroupID: patternGroupID,
ConsumerID: clientID,
ClientID: clientID,
Pattern: pattern,
Mode: string(types.GroupModeQueue),
ProxyNodeID: proxyNodeID,
RegisteredAt: time.Now(),
}
if err := m.cluster.RegisterQueueConsumer(ctx, info); err != nil {
m.logger.Warn("failed to register consumer in cluster",
slog.String("error", err.Error()),
slog.String("client", clientID))
}
}
// Track subscription
@@ -752,10 +801,12 @@ func (m *Manager) Unsubscribe(ctx context.Context, queueName, pattern string, cl
}
// Unregister consumer from cluster
if err := m.clusterAdapter.UnregisterConsumer(ctx, queueName, patternGroupID, clientID); err != nil {
m.logger.Warn("failed to unregister consumer from cluster",
slog.String("error", err.Error()),
slog.String("client", clientID))
if m.cluster != nil {
if err := m.cluster.UnregisterQueueConsumer(ctx, queueName, patternGroupID, clientID); err != nil {
m.logger.Warn("failed to unregister consumer from cluster",
slog.String("error", err.Error()),
slog.String("client", clientID))
}
}
// Untrack subscription
@@ -1094,7 +1145,7 @@ func (m *Manager) deliverQueueConfig(ctx context.Context, queueConfig *types.Que
}
// Deliver to remote consumers registered in cluster.
if m.clusterAdapter.DistributionMode() == DistributionForward {
if m.cluster != nil && m.distributionMode == DistributionForward {
if m.deliverToRemoteConsumers(ctx, queueConfig) {
delivered = true
}
@@ -1104,7 +1155,7 @@ func (m *Manager) deliverQueueConfig(ctx context.Context, queueConfig *types.Que
}
func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.PublishRequest) error {
if m.clusterAdapter.LocalNodeID() == "" {
if m.cluster == nil {
return fmt.Errorf("cluster not configured for leader forward")
}
@@ -1113,14 +1164,14 @@ func (m *Manager) forwardPublishToLeader(ctx context.Context, publish types.Publ
return fmt.Errorf("raft leader unavailable")
}
return m.clusterAdapter.ForwardPublish(ctx, leaderID, publish.Topic, publish.Payload, publish.Properties, true)
return m.cluster.ForwardQueuePublish(ctx, leaderID, publish.Topic, publish.Payload, publish.Properties, true)
}
// deliverToRemoteConsumers delivers messages to consumers registered on remote nodes.
// This enables cross-node queue message routing.
func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.QueueConfig) bool {
// Get all consumers for this queue from the cluster
consumers, err := m.clusterAdapter.ListConsumers(ctx, config.Name)
consumers, err := m.cluster.ListQueueConsumers(ctx, config.Name)
if err != nil {
m.logger.Debug("failed to list cluster consumers",
slog.String("queue", config.Name),
@@ -1132,7 +1183,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
consumersByGroup := make(map[string][]*cluster.QueueConsumerInfo)
for _, c := range consumers {
// Only process consumers on remote nodes
if !m.clusterAdapter.IsRemote(c.ProxyNodeID) {
if c.ProxyNodeID == m.localNodeID {
continue
}
consumersByGroup[c.GroupID] = append(consumersByGroup[c.GroupID], c)
@@ -1198,7 +1249,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
config.PrimaryGroup,
)
err := m.clusterAdapter.RouteMessage(
err := m.cluster.RouteQueueMessage(
ctx,
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,
@@ -1282,7 +1333,7 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
for _, msg := range msgs {
// Check if consumer is on a remote node
if m.clusterAdapter.IsRemote(consumerInfo.ProxyNodeID) {
if m.cluster != nil && consumerInfo.ProxyNodeID != "" && consumerInfo.ProxyNodeID != m.localNodeID {
// Route to remote node
routeMsg := m.createRoutedQueueMessage(
msg,
@@ -1293,7 +1344,7 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
hasWorkCommitted,
config.PrimaryGroup,
)
err := m.clusterAdapter.RouteMessage(
err := m.cluster.RouteQueueMessage(
ctx,
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,