mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:40:19 +00:00
Use queue name for ack, not topic
Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
+13
-6
@@ -286,9 +286,16 @@ func (b *Broker) handleQueueAck(msg *storage.Message) error {
|
||||
|
||||
// Extract queue topic and message ID
|
||||
queueTopic := extractQueueTopicFromAck(msg.Topic)
|
||||
queueName, _ := parseQueueFilter(queueTopic)
|
||||
messageID := ""
|
||||
groupID := ""
|
||||
|
||||
if queueName == "" {
|
||||
b.logError("queue_ack_invalid_queue_topic", fmt.Errorf("invalid queue topic %q", queueTopic),
|
||||
slog.String("topic", msg.Topic))
|
||||
return fmt.Errorf("invalid queue topic: %s", queueTopic)
|
||||
}
|
||||
|
||||
// Extract message ID and group ID from properties
|
||||
if msg.Properties != nil {
|
||||
messageID = msg.Properties["message-id"]
|
||||
@@ -309,18 +316,18 @@ func (b *Broker) handleQueueAck(msg *storage.Message) error {
|
||||
|
||||
// Route to appropriate ack method
|
||||
if strings.HasSuffix(msg.Topic, "/$ack") {
|
||||
b.logOp("queue_ack", slog.String("queue", queueTopic), slog.String("message_id", messageID), slog.String("group_id", groupID))
|
||||
return b.queueManager.Ack(ctx, queueTopic, messageID, groupID)
|
||||
b.logOp("queue_ack", slog.String("queue", queueName), slog.String("message_id", messageID), slog.String("group_id", groupID))
|
||||
return b.queueManager.Ack(ctx, queueName, messageID, groupID)
|
||||
} else if strings.HasSuffix(msg.Topic, "/$nack") {
|
||||
b.logOp("queue_nack", slog.String("queue", queueTopic), slog.String("message_id", messageID), slog.String("group_id", groupID))
|
||||
return b.queueManager.Nack(ctx, queueTopic, messageID, groupID)
|
||||
b.logOp("queue_nack", slog.String("queue", queueName), slog.String("message_id", messageID), slog.String("group_id", groupID))
|
||||
return b.queueManager.Nack(ctx, queueName, messageID, groupID)
|
||||
} else if strings.HasSuffix(msg.Topic, "/$reject") {
|
||||
reason := "rejected by consumer"
|
||||
if msg.Properties != nil && msg.Properties["reason"] != "" {
|
||||
reason = msg.Properties["reason"]
|
||||
}
|
||||
b.logOp("queue_reject", slog.String("queue", queueTopic), slog.String("message_id", messageID), slog.String("group_id", groupID), slog.String("reason", reason))
|
||||
return b.queueManager.Reject(ctx, queueTopic, messageID, groupID, reason)
|
||||
b.logOp("queue_reject", slog.String("queue", queueName), slog.String("message_id", messageID), slog.String("group_id", groupID), slog.String("reason", reason))
|
||||
return b.queueManager.Reject(ctx, queueName, messageID, groupID, reason)
|
||||
}
|
||||
|
||||
return fmt.Errorf("invalid queue ack topic: %s", msg.Topic)
|
||||
|
||||
@@ -0,0 +1,129 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"testing"
|
||||
|
||||
qtypes "github.com/absmach/fluxmq/queue/types"
|
||||
"github.com/absmach/fluxmq/storage"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockQueueManager struct {
|
||||
ackCalls []ackCall
|
||||
nackCalls []ackCall
|
||||
rejectCalls []rejectCall
|
||||
}
|
||||
|
||||
type ackCall struct {
|
||||
queueName string
|
||||
messageID string
|
||||
groupID string
|
||||
}
|
||||
|
||||
type rejectCall struct {
|
||||
queueName string
|
||||
messageID string
|
||||
groupID string
|
||||
reason string
|
||||
}
|
||||
|
||||
func (m *mockQueueManager) Start(ctx context.Context) error { return nil }
|
||||
func (m *mockQueueManager) Stop() error { return nil }
|
||||
func (m *mockQueueManager) Publish(ctx context.Context, publish qtypes.PublishRequest) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) Subscribe(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) SubscribeWithCursor(ctx context.Context, queueName, pattern, clientID, groupID, proxyNodeID string, cursor *qtypes.CursorOption) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) Unsubscribe(ctx context.Context, queueName, pattern, clientID, groupID string) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) Ack(ctx context.Context, queueName, messageID, groupID string) error {
|
||||
m.ackCalls = append(m.ackCalls, ackCall{queueName: queueName, messageID: messageID, groupID: groupID})
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) Nack(ctx context.Context, queueName, messageID, groupID string) error {
|
||||
m.nackCalls = append(m.nackCalls, ackCall{queueName: queueName, messageID: messageID, groupID: groupID})
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) Reject(ctx context.Context, queueName, messageID, groupID, reason string) error {
|
||||
m.rejectCalls = append(m.rejectCalls, rejectCall{queueName: queueName, messageID: messageID, groupID: groupID, reason: reason})
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) UpdateHeartbeat(ctx context.Context, clientID string) error { return nil }
|
||||
func (m *mockQueueManager) CreateQueue(ctx context.Context, config qtypes.QueueConfig) error {
|
||||
return nil
|
||||
}
|
||||
func (m *mockQueueManager) DeleteQueue(ctx context.Context, queueName string) error { return nil }
|
||||
func (m *mockQueueManager) GetQueue(ctx context.Context, queueName string) (*qtypes.QueueConfig, error) {
|
||||
return nil, storage.ErrNotFound
|
||||
}
|
||||
func (m *mockQueueManager) ListQueues(ctx context.Context) ([]qtypes.QueueConfig, error) { return nil, nil }
|
||||
|
||||
func TestHandleQueueAck_UsesParsedQueueName(t *testing.T) {
|
||||
qm := &mockQueueManager{}
|
||||
b := &Broker{
|
||||
queueManager: qm,
|
||||
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
|
||||
}
|
||||
|
||||
msg := &storage.Message{
|
||||
Topic: "$queue/orders/$ack",
|
||||
Properties: map[string]string{
|
||||
"message-id": "orders:42",
|
||||
"group-id": "workers",
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, b.handleQueueAck(msg))
|
||||
require.Len(t, qm.ackCalls, 1)
|
||||
require.Equal(t, "orders", qm.ackCalls[0].queueName)
|
||||
require.Equal(t, "orders:42", qm.ackCalls[0].messageID)
|
||||
require.Equal(t, "workers", qm.ackCalls[0].groupID)
|
||||
}
|
||||
|
||||
func TestHandleQueueAck_IgnoresRoutingKeyInAckTopic(t *testing.T) {
|
||||
qm := &mockQueueManager{}
|
||||
b := &Broker{
|
||||
queueManager: qm,
|
||||
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
|
||||
}
|
||||
|
||||
msg := &storage.Message{
|
||||
Topic: "$queue/orders/images/$nack",
|
||||
Properties: map[string]string{
|
||||
"message-id": "orders:1",
|
||||
"group-id": "workers@images/#",
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, b.handleQueueAck(msg))
|
||||
require.Len(t, qm.nackCalls, 1)
|
||||
require.Equal(t, "orders", qm.nackCalls[0].queueName)
|
||||
}
|
||||
|
||||
func TestHandleQueueAck_InvalidQueueTopic(t *testing.T) {
|
||||
qm := &mockQueueManager{}
|
||||
b := &Broker{
|
||||
queueManager: qm,
|
||||
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
|
||||
}
|
||||
|
||||
msg := &storage.Message{
|
||||
Topic: "$queue/$ack",
|
||||
Properties: map[string]string{
|
||||
"message-id": "orders:1",
|
||||
"group-id": "workers",
|
||||
},
|
||||
}
|
||||
|
||||
require.Error(t, b.handleQueueAck(msg))
|
||||
require.Empty(t, qm.ackCalls)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user