Fix queue routing in cluster mode

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-07 01:40:57 +01:00
parent b00b19e141
commit 8262cef0e5
2 changed files with 180 additions and 2 deletions
+25 -2
View File
@@ -368,8 +368,8 @@ func (m *Manager) publishLocal(ctx context.Context, publish types.PublishRequest
if len(queues) == 0 {
m.logger.Debug("no queues match topic, creating new queue", slog.String("topic", publish.Topic))
// Use the topic as the queue name and the topic itself as the matching pattern.
if _, err := m.GetOrCreateQueue(ctx, publish.Topic, publish.Topic); err != nil {
queueName, queuePattern := autoQueueFromTopic(publish.Topic)
if _, err := m.GetOrCreateQueue(ctx, queueName, queuePattern); err != nil {
m.logger.Error("failed to create ephemeral queue", slog.String("topic", publish.Topic), slog.String("error", err.Error()))
return err
}
@@ -428,6 +428,21 @@ func (m *Manager) publishLocal(ctx context.Context, publish types.PublishRequest
return nil
}
func autoQueueFromTopic(topic string) (queueName, pattern string) {
if strings.HasPrefix(topic, "$queue/") {
rest := strings.TrimPrefix(topic, "$queue/")
if rest != "" {
parts := strings.SplitN(rest, "/", 2)
if parts[0] != "" {
queueName = parts[0]
return queueName, "$queue/" + queueName + "/#"
}
}
}
return topic, topic
}
// forwardToRemoteNodes forwards a publish to nodes that have consumers for the topic.
func (m *Manager) forwardToRemoteNodes(ctx context.Context, publish types.PublishRequest, unknownOnly bool) {
// Get all consumers from the cluster
@@ -511,6 +526,10 @@ func (m *Manager) Enqueue(ctx context.Context, topic string, payload []byte, pro
// SubscribeWithCursor adds a consumer with explicit cursor positioning.
func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern string, clientID, groupID, proxyNodeID string, cursor *types.CursorOption) error {
if proxyNodeID == "" && m.localNodeID != "" {
proxyNodeID = m.localNodeID
}
mode := types.GroupModeQueue
if cursor != nil && cursor.Mode != "" {
mode = cursor.Mode
@@ -630,6 +649,10 @@ func (m *Manager) SubscribeWithCursor(ctx context.Context, queueName, pattern st
// Subscribe adds a consumer to a stream with optional pattern matching.
func (m *Manager) Subscribe(ctx context.Context, queueName, pattern string, clientID, groupID, proxyNodeID string) error {
if proxyNodeID == "" && m.localNodeID != "" {
proxyNodeID = m.localNodeID
}
// Ensure queue exists (auto-create if not)
// Use $queue/<name>/# as the topic pattern so messages published to $queue/<name>/... are captured
queueTopicPattern := "$queue/" + queueName + "/#"
+155
View File
@@ -718,6 +718,8 @@ type mockCluster struct {
forwardCalls []forwardPublishCall
forwardCallsMu sync.Mutex
queueConsumers []*cluster.QueueConsumerInfo
registered []*cluster.QueueConsumerInfo
registeredMu sync.Mutex
}
type routedMessage struct {
@@ -781,6 +783,15 @@ func (c *mockCluster) SetQueueConsumers(consumers []*cluster.QueueConsumerInfo)
c.queueConsumers = consumers
}
func (c *mockCluster) GetRegisteredQueueConsumers() []*cluster.QueueConsumerInfo {
c.registeredMu.Lock()
defer c.registeredMu.Unlock()
out := make([]*cluster.QueueConsumerInfo, len(c.registered))
copy(out, c.registered)
return out
}
func (c *mockCluster) Start() error { return nil }
func (c *mockCluster) Stop() error { return nil }
func (c *mockCluster) IsLeader() bool { return true }
@@ -824,6 +835,9 @@ func (c *mockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, t
}
func (c *mockCluster) RegisterQueueConsumer(ctx context.Context, info *cluster.QueueConsumerInfo) error {
c.registeredMu.Lock()
defer c.registeredMu.Unlock()
c.registered = append(c.registered, info)
return nil
}
@@ -969,6 +983,64 @@ func TestCrossNodeMessageRouting(t *testing.T) {
}
}
func TestSubscribeDefaultsProxyNodeIDFromCluster(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
mockCl := newMockCluster("node-1")
manager := NewManager(
logStore,
groupStore,
func(ctx context.Context, clientID string, msg any) error { return nil },
DefaultConfig(),
slog.New(slog.NewTextHandler(io.Discard, nil)),
mockCl,
)
if err := manager.Subscribe(context.Background(), "demo-orders", "#", "amqp091-conn-1", "demo-workers", ""); err != nil {
t.Fatalf("Subscribe failed: %v", err)
}
registered := mockCl.GetRegisteredQueueConsumers()
if len(registered) != 1 {
t.Fatalf("expected 1 registered consumer, got %d", len(registered))
}
if registered[0].ProxyNodeID != "node-1" {
t.Fatalf("expected proxy node id node-1, got %q", registered[0].ProxyNodeID)
}
}
func TestSubscribeWithCursorDefaultsProxyNodeIDFromCluster(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
mockCl := newMockCluster("node-1")
manager := NewManager(
logStore,
groupStore,
func(ctx context.Context, clientID string, msg any) error { return nil },
DefaultConfig(),
slog.New(slog.NewTextHandler(io.Discard, nil)),
mockCl,
)
cursor := &types.CursorOption{
Position: types.CursorEarliest,
Mode: types.GroupModeStream,
}
if err := manager.SubscribeWithCursor(context.Background(), "demo-events", "#", "amqp091-conn-1", "demo-readers", "", cursor); err != nil {
t.Fatalf("SubscribeWithCursor failed: %v", err)
}
registered := mockCl.GetRegisteredQueueConsumers()
if len(registered) != 1 {
t.Fatalf("expected 1 registered consumer, got %d", len(registered))
}
if registered[0].ProxyNodeID != "node-1" {
t.Fatalf("expected proxy node id node-1, got %q", registered[0].ProxyNodeID)
}
}
func TestPublishForwardPolicySkipsRemoteForwarding(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
@@ -1149,6 +1221,89 @@ func TestGetOrCreateQueue_CreatesEphemeral(t *testing.T) {
}
}
func TestAutoQueueFromTopic(t *testing.T) {
tests := []struct {
name string
topic string
queueName string
pattern string
}{
{
name: "queue root topic",
topic: "$queue/demo-events",
queueName: "demo-events",
pattern: "$queue/demo-events/#",
},
{
name: "queue nested topic",
topic: "$queue/demo-events/eu/images",
queueName: "demo-events",
pattern: "$queue/demo-events/#",
},
{
name: "regular topic",
topic: "sensors/temp",
queueName: "sensors/temp",
pattern: "sensors/temp",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
gotQueue, gotPattern := autoQueueFromTopic(tc.topic)
if gotQueue != tc.queueName {
t.Fatalf("expected queue name %q, got %q", tc.queueName, gotQueue)
}
if gotPattern != tc.pattern {
t.Fatalf("expected pattern %q, got %q", tc.pattern, gotPattern)
}
})
}
}
func TestPublishAutoCreateQueueFromQueueTopic(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()
manager := NewManager(
logStore,
groupStore,
func(ctx context.Context, clientID string, msg any) error { return nil },
DefaultConfig(),
slog.New(slog.NewTextHandler(io.Discard, nil)),
nil,
)
ctx := context.Background()
topic := "$queue/demo-events"
if err := manager.Publish(ctx, types.PublishRequest{
Topic: topic,
Payload: []byte("hello"),
}); err != nil {
t.Fatalf("Publish failed: %v", err)
}
if _, err := logStore.GetQueue(ctx, "demo-events"); err != nil {
t.Fatalf("expected queue demo-events to exist: %v", err)
}
if _, err := logStore.GetQueue(ctx, topic); err != storage.ErrQueueNotFound {
t.Fatalf("expected queue %q to not exist, got err=%v", topic, err)
}
msg, err := logStore.Read(ctx, "demo-events", 0)
if err != nil {
t.Fatalf("failed to read message from auto-created queue: %v", err)
}
if msg.Topic != topic {
t.Fatalf("expected stored topic %q, got %q", topic, msg.Topic)
}
if string(msg.GetPayload()) != "hello" {
t.Fatalf("expected payload hello, got %q", string(msg.GetPayload()))
}
}
func TestEphemeralQueue_DisconnectAndCleanup(t *testing.T) {
logStore := memlog.New()
groupStore := newMockGroupStore()