Use message DTO instead of parameters

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-08 20:23:27 +01:00
parent 245fe8260c
commit c03e3fbb31
20 changed files with 386 additions and 162 deletions
+2 -1
View File
@@ -14,6 +14,7 @@ import (
"github.com/absmach/fluxmq/amqp/codec"
"github.com/absmach/fluxmq/cluster"
qtypes "github.com/absmach/fluxmq/queue/types"
)
func readBrokerFramesFrom(t *testing.T, buf *bytes.Buffer, start int) []*codec.Frame {
@@ -61,7 +62,7 @@ func TestDeliverToClusterMessage(t *testing.T) {
msg := &cluster.Message{
Topic: "telemetry/room1",
Payload: []byte("hello"),
Properties: map[string]string{"message-id": "m1"},
Properties: map[string]string{qtypes.PropMessageID: "m1"},
}
if err := b.DeliverToClusterMessage(context.Background(), PrefixedClientID(c.connID), msg); err != nil {
t.Fatalf("DeliverToClusterMessage failed: %v", err)
+11 -11
View File
@@ -316,7 +316,7 @@ func (ch *Channel) completePublish() {
props["reply-to"] = header.Properties.ReplyTo
}
if header.Properties.MessageID != "" {
props["message-id"] = header.Properties.MessageID
props[qtypes.PropMessageID] = header.Properties.MessageID
}
if header.Properties.Type != "" {
props["type"] = header.Properties.Type
@@ -342,11 +342,11 @@ func (ch *Channel) completePublish() {
ch.conn.logger.Warn("queue commit missing queue name", "routing_key", routingKey)
} else {
headers := header.Properties.Headers
groupID, ok := parseStringArg(headers["x-group-id"])
groupID, ok := parseStringArg(headers[qtypes.PropCommitGroupID])
if !ok || groupID == "" {
ch.conn.logger.Warn("queue commit missing group id", "queue", queueName)
} else {
offsetVal, ok := headers["x-offset"]
offsetVal, ok := headers[qtypes.PropCommitOffset]
if !ok {
ch.conn.logger.Warn("queue commit missing offset", "queue", queueName, "group", groupID)
} else if n, ok := parseInt64Arg(offsetVal); !ok || n < 0 {
@@ -593,8 +593,8 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
deliveryTag: deliveryTag,
routingKey: topic,
queueName: cons.queueName,
messageID: props["message-id"],
groupID: props["group-id"],
messageID: props[qtypes.PropMessageID],
groupID: props[qtypes.PropGroupID],
}
ch.unackedMu.Unlock()
}
@@ -623,21 +623,21 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
headers := make(map[string]interface{})
for k, v := range props {
switch k {
case "content-type", "content-encoding", "correlation-id", "reply-to", "message-id", "type":
case "content-type", "content-encoding", "correlation-id", "reply-to", qtypes.PropMessageID, "type":
continue
case "x-stream-offset", "x-work-committed-offset":
case qtypes.PropStreamOffset, qtypes.PropWorkCommittedOffset:
if n, err := strconv.ParseUint(v, 10, 64); err == nil {
headers[k] = int64(n) // AMQP uses signed integers
} else {
headers[k] = v
}
case "x-stream-timestamp":
case qtypes.PropStreamTimestamp:
if n, err := strconv.ParseInt(v, 10, 64); err == nil {
headers[k] = n
} else {
headers[k] = v
}
case "x-work-acked":
case qtypes.PropWorkAcked:
headers[k] = v == "true"
default:
headers[k] = v
@@ -651,7 +651,7 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
ContentType: props["content-type"],
CorrelationID: props["correlation-id"],
ReplyTo: props["reply-to"],
MessageID: props["message-id"],
MessageID: props[qtypes.PropMessageID],
Type: props["type"],
Headers: headers,
}
@@ -1433,7 +1433,7 @@ func extractStreamOffset(args map[string]interface{}) (*qtypes.CursorOption, boo
if len(args) == 0 {
return nil, false
}
val, ok := args["x-stream-offset"]
val, ok := args[qtypes.PropStreamOffset]
if !ok {
return nil, false
}
+1 -1
View File
@@ -184,7 +184,7 @@ func TestPrefetchBuffering(t *testing.T) {
noAck: false,
}
props := map[string]string{"message-id": "m1"}
props := map[string]string{qtypes.PropMessageID: "m1"}
ch.deliverMessage("q", []byte("one"), props)
ch.deliverMessage("q", []byte("two"), props)
+2 -1
View File
@@ -16,6 +16,7 @@ import (
"github.com/absmach/fluxmq/broker"
"github.com/absmach/fluxmq/broker/router"
"github.com/absmach/fluxmq/cluster"
qtypes "github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/storage"
)
@@ -146,7 +147,7 @@ func (b *Broker) DeliverToClient(ctx context.Context, clientID string, msg any)
amqpMsg.ApplicationProperties[k] = v
}
}
if msgID, ok := props["message-id"]; ok {
if msgID, ok := props[qtypes.PropMessageID]; ok {
amqpMsg.Properties.MessageID = msgID
}
+6 -7
View File
@@ -382,10 +382,10 @@ func (l *Link) sendMessage(topic string, payload []byte, props map[string]string
if !settled {
l.pendingMu.Lock()
pd := &pendingDelivery{deliveryID: deliveryID}
if msgID, ok := props["message-id"]; ok {
if msgID, ok := props[qtypes.PropMessageID]; ok {
pd.messageID = msgID
pd.queueName, _ = props["queue"]
pd.groupID, _ = props["group-id"]
pd.queueName, _ = props[qtypes.PropQueueName]
pd.groupID, _ = props[qtypes.PropGroupID]
}
l.pending[deliveryID] = pd
l.pendingMu.Unlock()
@@ -442,13 +442,13 @@ func (l *Link) sendAMQPMessage(msg interface{}, qos byte) {
if !settled && amqpMsg.ApplicationProperties != nil {
l.pendingMu.Lock()
pd := &pendingDelivery{deliveryID: deliveryID}
if msgID, ok := amqpMsg.ApplicationProperties["message-id"]; ok {
if msgID, ok := amqpMsg.ApplicationProperties[qtypes.PropMessageID]; ok {
pd.messageID, _ = msgID.(string)
}
if qn, ok := amqpMsg.ApplicationProperties["queue"]; ok {
if qn, ok := amqpMsg.ApplicationProperties[qtypes.PropQueueName]; ok {
pd.queueName, _ = qn.(string)
}
if gid, ok := amqpMsg.ApplicationProperties["group-id"]; ok {
if gid, ok := amqpMsg.ApplicationProperties[qtypes.PropGroupID]; ok {
pd.groupID, _ = gid.(string)
}
l.pending[deliveryID] = pd
@@ -529,7 +529,6 @@ func (l *Link) handleManagementTransfer(transfer *performatives.Transfer, msg *m
replyLink.sendAMQPMessage(resp, 0)
}
func uint32ToBytes(v uint32) []byte {
return []byte{byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)}
}
+1 -1
View File
@@ -154,7 +154,7 @@ type Cluster interface {
// RouteQueueMessage sends a queue message to a remote consumer.
// This is called in proxy mode when the worker needs to deliver a message
// to a consumer connected to a different node.
RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error
RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *QueueMessage) error
}
// MessageHandler handles message delivery and session management for the cluster.
+2 -2
View File
@@ -203,10 +203,10 @@ func TestCrossNode_StreamReplayFromFirstOffsetAfterLateConsumer(t *testing.T) {
if d.ClientID != consumer.ClientID || d.Message == nil {
continue
}
if d.Message.Properties["queue"] != queueName {
if d.Message.Properties[qtypes.PropQueueName] != queueName {
continue
}
offset := d.Message.Properties["x-stream-offset"]
offset := d.Message.Properties[qtypes.PropStreamOffset]
if offset != "" {
offsets = append(offsets, offset)
}
+2 -2
View File
@@ -1122,11 +1122,11 @@ func (c *EtcdCluster) EnqueueRemote(ctx context.Context, nodeID, queueName strin
}
// RouteQueueMessage sends a queue message to a remote consumer.
func (c *EtcdCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
func (c *EtcdCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *QueueMessage) error {
if c.transport == nil {
return ErrTransportNotConfigured
}
return c.transport.SendRouteQueueMessage(ctx, nodeID, clientID, queueName, messageID, payload, properties, sequence)
return c.transport.SendRouteQueueMessage(ctx, nodeID, clientID, queueName, msg)
}
// SetQueueHandler sets the queue handler for queue distribution operations.
+20
View File
@@ -13,3 +13,23 @@ type Message struct {
Dup bool
Properties map[string]string
}
// QueueMessage is a typed envelope for cross-node queue delivery.
// It separates queue metadata from user-defined message properties.
type QueueMessage struct {
MessageID string
QueueName string
GroupID string
Payload []byte
Sequence int64
UserProperties map[string]string
Stream bool
StreamOffset int64
StreamTimestamp int64 // Unix milliseconds
HasWorkCommitted bool
WorkCommittedOffset int64
WorkAcked bool
WorkGroup string
}
+1 -1
View File
@@ -142,7 +142,7 @@ func (n *NoopCluster) EnqueueRemote(ctx context.Context, nodeID, queueName strin
return "", ErrClusterNotEnabled
}
func (n *NoopCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
func (n *NoopCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *QueueMessage) error {
// Single-node: no remote nodes to route to
return ErrClusterNotEnabled
}
+115 -11
View File
@@ -12,6 +12,7 @@ import (
"net"
"net/http"
"os"
"strconv"
"sync"
"time"
@@ -29,7 +30,7 @@ type QueueHandler interface {
EnqueueLocal(ctx context.Context, queueName string, payload []byte, properties map[string]string) (string, error)
// DeliverQueueMessage delivers a queue message to a local consumer.
DeliverQueueMessage(ctx context.Context, clientID string, msg any) error
DeliverQueueMessage(ctx context.Context, clientID string, msg *QueueMessage) error
// HandleQueuePublish handles a publish with the given mode.
HandleQueuePublish(ctx context.Context, publish queueTypes.PublishRequest, mode queueTypes.PublishMode) error
@@ -444,12 +445,59 @@ func (t *Transport) RouteQueueMessage(ctx context.Context, req *RouteQueueMessag
}), nil
}
msg := map[string]interface{}{
"id": req.Msg.MessageId,
"queueName": req.Msg.QueueName,
"payload": req.Msg.Payload,
"properties": req.Msg.Properties,
"sequence": req.Msg.Sequence,
rawProps := make(map[string]string, len(req.Msg.Properties))
for k, v := range req.Msg.Properties {
rawProps[k] = v
}
msg := &QueueMessage{
MessageID: req.Msg.MessageId,
QueueName: req.Msg.QueueName,
Payload: req.Msg.Payload,
Sequence: req.Msg.Sequence,
UserProperties: make(map[string]string, len(rawProps)),
}
if msg.MessageID == "" {
msg.MessageID = rawProps[queueTypes.PropMessageID]
}
if groupID := rawProps[queueTypes.PropGroupID]; groupID != "" {
msg.GroupID = groupID
}
if msg.QueueName == "" {
if queueName := rawProps[queueTypes.PropQueueName]; queueName != "" {
msg.QueueName = queueName
}
}
if offset, ok := parseInt64Property(rawProps, queueTypes.PropOffset); ok {
msg.Sequence = offset
}
if streamOffset, ok := parseInt64Property(rawProps, queueTypes.PropStreamOffset); ok {
msg.Stream = true
msg.StreamOffset = streamOffset
}
if streamTs, ok := parseInt64Property(rawProps, queueTypes.PropStreamTimestamp); ok {
msg.Stream = true
msg.StreamTimestamp = streamTs
}
if committed, ok := parseInt64Property(rawProps, queueTypes.PropWorkCommittedOffset); ok {
msg.HasWorkCommitted = true
msg.WorkCommittedOffset = committed
}
if workAcked, ok := parseBoolProperty(rawProps, queueTypes.PropWorkAcked); ok {
msg.HasWorkCommitted = true
msg.WorkAcked = workAcked
}
if workGroup := rawProps[queueTypes.PropWorkGroup]; workGroup != "" {
msg.HasWorkCommitted = true
msg.WorkGroup = workGroup
}
for k, v := range rawProps {
if queueTypes.IsReservedQueueDeliveryProperty(k) {
continue
}
msg.UserProperties[k] = v
}
err := handler.DeliverQueueMessage(ctx, req.Msg.ClientId, msg)
@@ -658,20 +706,52 @@ func (t *Transport) SendEnqueueRemote(ctx context.Context, nodeID, queueName str
}
// SendRouteQueueMessage sends a queue message delivery request to a peer node with retry and circuit breaker.
func (t *Transport) SendRouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
func (t *Transport) SendRouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *QueueMessage) error {
return retryWithBreaker(ctx, t.breakers, nodeID, func() error {
client, err := t.GetPeerClient(nodeID)
if err != nil {
return err
}
if msg == nil {
return fmt.Errorf("queue message is nil")
}
properties := make(map[string]string, len(msg.UserProperties)+8)
for k, v := range msg.UserProperties {
properties[k] = v
}
if msg.MessageID != "" {
properties[queueTypes.PropMessageID] = msg.MessageID
}
if msg.GroupID != "" {
properties[queueTypes.PropGroupID] = msg.GroupID
}
if queueName != "" {
properties[queueTypes.PropQueueName] = queueName
}
properties[queueTypes.PropOffset] = fmt.Sprintf("%d", msg.Sequence)
if msg.Stream {
properties[queueTypes.PropStreamOffset] = fmt.Sprintf("%d", msg.StreamOffset)
if msg.StreamTimestamp != 0 {
properties[queueTypes.PropStreamTimestamp] = fmt.Sprintf("%d", msg.StreamTimestamp)
}
}
if msg.HasWorkCommitted {
properties[queueTypes.PropWorkCommittedOffset] = fmt.Sprintf("%d", msg.WorkCommittedOffset)
properties[queueTypes.PropWorkAcked] = strconv.FormatBool(msg.WorkAcked)
if msg.WorkGroup != "" {
properties[queueTypes.PropWorkGroup] = msg.WorkGroup
}
}
req := connect.NewRequest(&clusterv1.RouteQueueMessageRequest{
ClientId: clientID,
QueueName: queueName,
MessageId: messageID,
Payload: payload,
MessageId: msg.MessageID,
Payload: msg.Payload,
Properties: properties,
Sequence: sequence,
Sequence: msg.Sequence,
})
resp, err := client.RouteQueueMessage(ctx, req)
@@ -686,3 +766,27 @@ func (t *Transport) SendRouteQueueMessage(ctx context.Context, nodeID, clientID,
return nil
})
}
func parseInt64Property(props map[string]string, key string) (int64, bool) {
raw, ok := props[key]
if !ok || raw == "" {
return 0, false
}
val, err := strconv.ParseInt(raw, 10, 64)
if err != nil {
return 0, false
}
return val, true
}
func parseBoolProperty(props map[string]string, key string) (bool, bool) {
raw, ok := props[key]
if !ok || raw == "" {
return false, false
}
val, err := strconv.ParseBool(raw)
if err != nil {
return false, false
}
return val, true
}
+4 -4
View File
@@ -293,8 +293,8 @@ func (b *Broker) handleQueueAck(msg *storage.Message) error {
// Extract message ID and group ID from properties
if msg.Properties != nil {
messageID = msg.Properties["message-id"]
groupID = msg.Properties["group-id"]
messageID = msg.Properties[types.PropMessageID]
groupID = msg.Properties[types.PropGroupID]
}
if messageID == "" {
@@ -318,8 +318,8 @@ func (b *Broker) handleQueueAck(msg *storage.Message) error {
return b.queueManager.Nack(ctx, queueName, messageID, groupID)
} else if strings.HasSuffix(msg.Topic, "/$reject") {
reason := "rejected by consumer"
if msg.Properties != nil && msg.Properties["reason"] != "" {
reason = msg.Properties["reason"]
if msg.Properties != nil && msg.Properties[types.PropRejectReason] != "" {
reason = msg.Properties[types.PropRejectReason]
}
b.logOp("queue_reject", slog.String("queue", queueName), slog.String("message_id", messageID), slog.String("group_id", groupID), slog.String("reason", reason))
return b.queueManager.Reject(ctx, queueName, messageID, groupID, reason)
+6 -6
View File
@@ -78,8 +78,8 @@ func TestHandleQueueAck_UsesParsedQueueName(t *testing.T) {
msg := &storage.Message{
Topic: "$queue/orders/$ack",
Properties: map[string]string{
"message-id": "orders:42",
"group-id": "workers",
qtypes.PropMessageID: "orders:42",
qtypes.PropGroupID: "workers",
},
}
@@ -100,8 +100,8 @@ func TestHandleQueueAck_IgnoresRoutingKeyInAckTopic(t *testing.T) {
msg := &storage.Message{
Topic: "$queue/orders/images/$nack",
Properties: map[string]string{
"message-id": "orders:1",
"group-id": "workers@images/#",
qtypes.PropMessageID: "orders:1",
qtypes.PropGroupID: "workers@images/#",
},
}
@@ -120,8 +120,8 @@ func TestHandleQueueAck_InvalidQueueTopic(t *testing.T) {
msg := &storage.Message{
Topic: "$queue/$ack",
Properties: map[string]string{
"message-id": "orders:1",
"group-id": "workers",
qtypes.PropMessageID: "orders:1",
qtypes.PropGroupID: "workers",
},
}
+52 -49
View File
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
@@ -1198,21 +1199,22 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
// Route each message to the remote node
for _, msg := range msgs {
payload := msg.GetPayload()
properties := m.createRouteProperties(msg, groupID, config.Name)
if group.Mode == types.GroupModeStream {
m.decorateStreamProperties(properties, msg, workCommitted, hasWorkCommitted, config.PrimaryGroup)
}
routeMsg := m.createRoutedQueueMessage(
msg,
groupID,
config.Name,
group.Mode == types.GroupModeStream,
workCommitted,
hasWorkCommitted,
config.PrimaryGroup,
)
err := m.cluster.RouteQueueMessage(
ctx,
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,
config.Name,
properties["message-id"],
payload,
properties,
int64(msg.Sequence),
routeMsg,
)
if err != nil {
m.logger.Warn("remote queue message delivery failed",
@@ -1293,19 +1295,21 @@ func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig,
// Check if consumer is on a remote node
if m.cluster != nil && consumerInfo.ProxyNodeID != "" && consumerInfo.ProxyNodeID != m.localNodeID {
// Route to remote node
properties := m.createRouteProperties(msg, group.ID, config.Name)
if group.Mode == types.GroupModeStream {
m.decorateStreamProperties(properties, msg, workCommitted, hasWorkCommitted, config.PrimaryGroup)
}
routeMsg := m.createRoutedQueueMessage(
msg,
group.ID,
config.Name,
group.Mode == types.GroupModeStream,
workCommitted,
hasWorkCommitted,
config.PrimaryGroup,
)
err := m.cluster.RouteQueueMessage(
ctx,
consumerInfo.ProxyNodeID,
consumerInfo.ClientID,
config.Name,
properties["message-id"],
msg.GetPayload(),
properties,
int64(msg.Sequence),
routeMsg,
)
if err != nil {
m.logger.Warn("queue message remote routing failed",
@@ -1585,54 +1589,53 @@ func (m *Manager) EnqueueLocal(ctx context.Context, topic string, payload []byte
return "", err
}
if properties != nil && properties["message-id"] != "" {
return properties["message-id"], nil
if properties != nil && properties[types.PropMessageID] != "" {
return properties[types.PropMessageID], nil
}
return generateMessageID(), nil
}
// DeliverQueueMessage implements cluster.QueueHandler.DeliverQueueMessage.
func (m *Manager) DeliverQueueMessage(ctx context.Context, clientID string, msg any) error {
func (m *Manager) DeliverQueueMessage(ctx context.Context, clientID string, msg *cluster.QueueMessage) error {
if m.deliverFn == nil {
return fmt.Errorf("no delivery function configured")
}
msgMap, ok := msg.(map[string]any)
if !ok {
return fmt.Errorf("invalid message type: expected map[string]any")
if msg == nil {
return fmt.Errorf("queue message is nil")
}
queueName, _ := msgMap["queueName"].(string)
payload, _ := msgMap["payload"].([]byte)
properties, _ := msgMap["properties"].(map[string]string)
messageID, _ := msgMap["id"].(string)
var sequence int64
switch v := msgMap["sequence"].(type) {
case int64:
sequence = v
case uint64:
sequence = int64(v)
case int:
sequence = int64(v)
case float64:
sequence = int64(v)
}
props := make(map[string]string, len(properties)+4)
for k, v := range properties {
queueName := msg.QueueName
props := make(map[string]string, len(msg.UserProperties)+8)
for k, v := range msg.UserProperties {
props[k] = v
}
messageID := msg.MessageID
if messageID == "" {
if props["message-id"] != "" {
messageID = props["message-id"]
} else {
messageID = fmt.Sprintf("%s:%d", queueName, sequence)
messageID = fmt.Sprintf("%s:%d", queueName, msg.Sequence)
}
props[types.PropMessageID] = messageID
props[types.PropGroupID] = msg.GroupID
props[types.PropQueueName] = queueName
props[types.PropOffset] = fmt.Sprintf("%d", msg.Sequence)
if msg.Stream {
props[types.PropStreamOffset] = fmt.Sprintf("%d", msg.StreamOffset)
if msg.StreamTimestamp != 0 {
props[types.PropStreamTimestamp] = fmt.Sprintf("%d", msg.StreamTimestamp)
}
}
if msg.HasWorkCommitted {
props[types.PropWorkCommittedOffset] = fmt.Sprintf("%d", msg.WorkCommittedOffset)
props[types.PropWorkAcked] = strconv.FormatBool(msg.WorkAcked)
if msg.WorkGroup != "" {
props[types.PropWorkGroup] = msg.WorkGroup
}
}
props["message-id"] = messageID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", sequence)
topic := queueName
if topic != "" && !strings.HasPrefix(topic, "$queue/") {
@@ -1644,7 +1647,7 @@ func (m *Manager) DeliverQueueMessage(ctx context.Context, clientID string, msg
QoS: 1,
Properties: props,
}
deliveryMsg.SetPayloadFromBytes(payload)
deliveryMsg.SetPayloadFromBytes(msg.Payload)
return m.deliverFn(ctx, clientID, deliveryMsg)
}
+2 -2
View File
@@ -31,8 +31,8 @@ func benchmarkQueueDeliveryPath(b *testing.B, queueCount int, fullSweep bool) {
return nil
}
if deliveryMsg.Properties != nil {
lastMessageID = deliveryMsg.Properties["message-id"]
lastGroupID = deliveryMsg.Properties["group-id"]
lastMessageID = deliveryMsg.Properties[types.PropMessageID]
lastGroupID = deliveryMsg.Properties[types.PropGroupID]
}
return nil
}
+68 -52
View File
@@ -217,7 +217,7 @@ func (s *mockGroupStore) RegisterConsumer(ctx context.Context, queueName, groupI
}
group := s.groups[queueName][groupID]
group.Consumers[consumer.ID] = consumer
group.SetConsumer(consumer.ID, consumer)
return nil
}
@@ -230,8 +230,8 @@ func (s *mockGroupStore) UnregisterConsumer(ctx context.Context, queueName, grou
}
group := s.groups[queueName][groupID]
delete(group.Consumers, consumerID)
delete(group.PEL, consumerID)
group.DeleteConsumer(consumerID)
group.DeleteConsumerPEL(consumerID)
return nil
}
@@ -245,9 +245,10 @@ func (s *mockGroupStore) ListConsumers(ctx context.Context, queueName, groupID s
group := s.groups[queueName][groupID]
var result []*types.ConsumerInfo
for _, c := range group.Consumers {
group.ForEachConsumer(func(_ string, c *types.ConsumerInfo) bool {
result = append(result, c)
}
return true
})
return result, nil
}
@@ -298,10 +299,11 @@ func TestWildcardQueueSubscription(t *testing.T) {
groups, _ := groupStore.ListConsumerGroups(ctx, queueName)
t.Logf("Groups after subscribe: %v", len(groups))
for _, g := range groups {
t.Logf(" Group: %s (pattern=%s, consumers=%d)", g.ID, g.Pattern, len(g.Consumers))
for cid, ci := range g.Consumers {
t.Logf(" Group: %s (pattern=%s, consumers=%d)", g.ID, g.Pattern, g.ConsumerCount())
g.ForEachConsumer(func(cid string, ci *types.ConsumerInfo) bool {
t.Logf(" Consumer: %s (clientID=%s)", cid, ci.ClientID)
}
return true
})
}
publishTopic := "$queue/topic/test"
@@ -374,7 +376,7 @@ func TestStreamGroupDeliversWithoutPEL(t *testing.T) {
select {
case msg := <-delivered:
if got := msg.Properties["x-stream-offset"]; got != "0" {
if got := msg.Properties[types.PropStreamOffset]; got != "0" {
t.Fatalf("expected stream offset 0, got %q", got)
}
case <-time.After(2 * time.Second):
@@ -724,13 +726,10 @@ type mockCluster struct {
}
type routedMessage struct {
nodeID string
clientID string
queueName string
messageID string
payload []byte
properties map[string]string
sequence int64
nodeID string
clientID string
queueName string
message *cluster.QueueMessage
}
type forwardPublishCall struct {
@@ -751,21 +750,27 @@ func newMockCluster(nodeID string) *mockCluster {
func (c *mockCluster) NodeID() string { return c.nodeID }
func (c *mockCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
propsCopy := make(map[string]string, len(properties))
for k, v := range properties {
propsCopy[k] = v
func (c *mockCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error {
var msgCopy *cluster.QueueMessage
if msg != nil {
userProps := make(map[string]string, len(msg.UserProperties))
for k, v := range msg.UserProperties {
userProps[k] = v
}
payloadCopy := make([]byte, len(msg.Payload))
copy(payloadCopy, msg.Payload)
copied := *msg
copied.UserProperties = userProps
copied.Payload = payloadCopy
msgCopy = &copied
}
c.routedMessagesMu.Lock()
defer c.routedMessagesMu.Unlock()
c.routedMessages = append(c.routedMessages, routedMessage{
nodeID: nodeID,
clientID: clientID,
queueName: queueName,
messageID: messageID,
payload: payload,
properties: propsCopy,
sequence: sequence,
nodeID: nodeID,
clientID: clientID,
queueName: queueName,
message: msgCopy,
})
return nil
}
@@ -1005,11 +1010,15 @@ func TestCrossNodeMessageRouting(t *testing.T) {
if rm.clientID != remoteClientID {
t.Errorf("Expected routed message to client %s, got %s", remoteClientID, rm.clientID)
}
if rm.messageID == "" {
if rm.message == nil {
t.Error("Expected routed message payload to be set")
continue
}
if rm.message.MessageID == "" {
t.Error("Expected routed message-id to be set")
}
if rm.properties["group-id"] == "" {
t.Error("Expected routed message to include group-id property")
if rm.message.GroupID == "" {
t.Error("Expected routed message to include group-id")
}
}
}
@@ -1073,20 +1082,23 @@ func TestRemoteRoutingIncludesAckMetadata(t *testing.T) {
}
msg := routed[0]
if msg.messageID != "tasks:0" {
t.Fatalf("expected message-id tasks:0, got %q", msg.messageID)
if msg.message == nil {
t.Fatal("expected routed queue message payload")
}
if got := msg.properties["message-id"]; got != "tasks:0" {
t.Fatalf("expected properties message-id tasks:0, got %q", got)
if msg.message.MessageID != "tasks:0" {
t.Fatalf("expected message-id tasks:0, got %q", msg.message.MessageID)
}
if got := msg.properties["group-id"]; got != "workers" {
t.Fatalf("expected properties group-id workers, got %q", got)
if got := msg.message.GroupID; got != "workers" {
t.Fatalf("expected group-id workers, got %q", got)
}
if got := msg.properties["queue"]; got != "tasks" {
t.Fatalf("expected properties queue tasks, got %q", got)
if got := msg.message.QueueName; got != "tasks" {
t.Fatalf("expected queue tasks, got %q", got)
}
if got := msg.properties["offset"]; got != "0" {
t.Fatalf("expected properties offset 0, got %q", got)
if got := msg.message.Sequence; got != 0 {
t.Fatalf("expected sequence 0, got %d", got)
}
if got := msg.message.UserProperties["custom"]; got != "value" {
t.Fatalf("expected user property custom=value, got %q", got)
}
}
@@ -1145,8 +1157,11 @@ func TestRemoteStreamBacklogDeliveredByFallbackSweep(t *testing.T) {
for {
routed := mockCl.GetRoutedMessages()
if len(routed) > 0 {
if got := routed[0].properties["x-stream-offset"]; got != "0" {
t.Fatalf("expected x-stream-offset=0, got %q", got)
if routed[0].message == nil {
t.Fatal("expected routed stream message payload")
}
if got := routed[0].message.StreamOffset; got != 0 {
t.Fatalf("expected stream offset=0, got %d", got)
}
return
}
@@ -1304,12 +1319,13 @@ func TestDeliverQueueMessage(t *testing.T) {
ctx := context.Background()
msg := map[string]interface{}{
"id": "msg-123",
"queueName": "test",
"payload": []byte("routed payload"),
"properties": map[string]string{"custom": "prop"},
"sequence": int64(42),
msg := &cluster.QueueMessage{
MessageID: "msg-123",
QueueName: "test",
GroupID: "workers",
Payload: []byte("routed payload"),
Sequence: 42,
UserProperties: map[string]string{"custom": "prop"},
}
err := manager.DeliverQueueMessage(ctx, "target-client", msg)
@@ -1336,11 +1352,11 @@ func TestDeliverQueueMessage(t *testing.T) {
t.Errorf("Expected payload 'routed payload', got '%s'", string(deliveredMsg.GetPayload()))
}
if deliveredMsg.Properties["message-id"] != "msg-123" {
t.Errorf("Expected message-id 'msg-123', got '%s'", deliveredMsg.Properties["message-id"])
if deliveredMsg.Properties[types.PropMessageID] != "msg-123" {
t.Errorf("Expected message-id 'msg-123', got '%s'", deliveredMsg.Properties[types.PropMessageID])
}
if deliveredMsg.Properties["queue"] != "test" {
t.Errorf("Expected queue 'test', got '%s'", deliveredMsg.Properties["queue"])
if deliveredMsg.Properties[types.PropQueueName] != "test" {
t.Errorf("Expected queue 'test', got '%s'", deliveredMsg.Properties[types.PropQueueName])
}
}
+42 -9
View File
@@ -8,6 +8,7 @@ import (
"strconv"
"time"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
)
@@ -41,29 +42,61 @@ func (m *Manager) createRouteProperties(msg *types.Message, groupID, queueName s
for k, v := range msg.Properties {
props[k] = v
}
props["message-id"] = fmt.Sprintf("%s:%d", queueName, msg.Sequence)
props["group-id"] = groupID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", msg.Sequence)
props[types.PropMessageID] = fmt.Sprintf("%s:%d", queueName, msg.Sequence)
props[types.PropGroupID] = groupID
props[types.PropQueueName] = queueName
props[types.PropOffset] = fmt.Sprintf("%d", msg.Sequence)
return props
}
func (m *Manager) createRoutedQueueMessage(msg *types.Message, groupID, queueName string, stream bool, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) *cluster.QueueMessage {
userProps := make(map[string]string, len(msg.Properties))
for k, v := range msg.Properties {
userProps[k] = v
}
routeMsg := &cluster.QueueMessage{
MessageID: fmt.Sprintf("%s:%d", queueName, msg.Sequence),
QueueName: queueName,
GroupID: groupID,
Payload: msg.GetPayload(),
Sequence: int64(msg.Sequence),
UserProperties: userProps,
Stream: stream,
}
if stream {
routeMsg.StreamOffset = int64(msg.Sequence)
if !msg.CreatedAt.IsZero() {
routeMsg.StreamTimestamp = msg.CreatedAt.UnixMilli()
}
if hasWorkCommitted {
routeMsg.HasWorkCommitted = true
routeMsg.WorkCommittedOffset = int64(workCommitted)
routeMsg.WorkAcked = msg.Sequence < workCommitted
routeMsg.WorkGroup = primaryGroup
}
}
return routeMsg
}
func (m *Manager) decorateStreamProperties(properties map[string]string, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if properties == nil || msg == nil {
return
}
properties["x-stream-offset"] = fmt.Sprintf("%d", msg.Sequence)
properties[types.PropStreamOffset] = fmt.Sprintf("%d", msg.Sequence)
if !msg.CreatedAt.IsZero() {
properties["x-stream-timestamp"] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
properties[types.PropStreamTimestamp] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
}
if hasWorkCommitted {
properties["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
properties["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
properties[types.PropWorkCommittedOffset] = fmt.Sprintf("%d", workCommitted)
properties[types.PropWorkAcked] = strconv.FormatBool(msg.Sequence < workCommitted)
if primaryGroup != "" {
properties["x-work-group"] = primaryGroup
properties[types.PropWorkGroup] = primaryGroup
}
}
}
+8 -1
View File
@@ -110,7 +110,9 @@ func (s *Store) GetQueue(ctx context.Context, queueName string) (*types.QueueCon
}
sl := val.(*log)
sl.mu.RLock()
configCopy := sl.config
sl.mu.RUnlock()
return &configCopy, nil
}
@@ -119,7 +121,10 @@ func (s *Store) DeleteQueue(ctx context.Context, queueName string) error {
val, exists := s.logs.Load(queueName)
if exists {
sl := val.(*log)
if sl.config.Reserved {
sl.mu.RLock()
isReserved := sl.config.Reserved
sl.mu.RUnlock()
if isReserved {
return storage.ErrQueueNotFound // Cannot delete reserved queue
}
}
@@ -137,7 +142,9 @@ func (s *Store) ListQueues(ctx context.Context) ([]types.QueueConfig, error) {
s.logs.Range(func(key, value interface{}) bool {
sl := value.(*log)
sl.mu.RLock()
configs = append(configs, sl.config)
sl.mu.RUnlock()
return true
})
+40
View File
@@ -0,0 +1,40 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package types
const (
// Queue delivery metadata properties.
PropMessageID = "message-id"
PropGroupID = "group-id"
PropQueueName = "queue"
PropOffset = "offset"
// Stream delivery metadata properties.
PropStreamOffset = "x-stream-offset"
PropStreamTimestamp = "x-stream-timestamp"
// Work stealing metadata properties.
PropWorkCommittedOffset = "x-work-committed-offset"
PropWorkAcked = "x-work-acked"
PropWorkGroup = "x-work-group"
// Queue commit headers/properties.
PropCommitGroupID = "x-group-id"
PropCommitOffset = "x-offset"
// Queue reject metadata.
PropRejectReason = "reason"
)
// IsReservedQueueDeliveryProperty returns true for keys managed by queue routing.
func IsReservedQueueDeliveryProperty(key string) bool {
switch key {
case PropMessageID, PropGroupID, PropQueueName, PropOffset,
PropStreamOffset, PropStreamTimestamp,
PropWorkCommittedOffset, PropWorkAcked, PropWorkGroup:
return true
default:
return false
}
}
+1 -1
View File
@@ -105,7 +105,7 @@ func (m *mockCluster) TakeoverSession(ctx context.Context, clientID, fromNode, t
return nil, nil
}
func (m *mockCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
func (m *mockCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName string, msg *cluster.QueueMessage) error {
return nil
}