Update MQTT with properties

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-05 18:27:31 +01:00
parent 8fb1ab8a76
commit 910e013a45
4 changed files with 140 additions and 42 deletions
+21 -21
View File
@@ -4,13 +4,13 @@ This directory contains example configurations and client applications for the M
## Configuration Examples
| File | Description |
|------|-------------|
| `config.yaml` | Full broker configuration with all options |
| `no-cluster.yaml` | Single-node mode (no clustering) |
| `node1.yaml`, `node2.yaml`, `node3.yaml` | 3-node cluster setup |
| `single-node-cluster.yaml` | Single-node with cluster features enabled |
| `tls-server.yaml` | TLS/SSL configuration |
| File | Description |
| ---------------------------------------- | ------------------------------------------ |
| `config.yaml` | Full broker configuration with all options |
| `no-cluster.yaml` | Single-node mode (no clustering) |
| `node1.yaml`, `node2.yaml`, `node3.yaml` | 3-node cluster setup |
| `single-node-cluster.yaml` | Single-node with cluster features enabled |
| `tls-server.yaml` | TLS/SSL configuration |
## Client Examples
@@ -29,7 +29,7 @@ A queue named `tasks/orders` receives orders from multiple MQTT publishers. Thre
│ [MQTT] │ │ [MQTT] │ │ [MQTT] │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ QoS 2 PUBLISH (exactly-once)
│ QoS 1/2 PUBLISH
└───────────────────┼───────────────────┘
┌───────────────────┐
@@ -46,24 +46,24 @@ A queue named `tasks/orders` receives orders from multiple MQTT publishers. Thre
│ (Consumer Group 1) │ │ (Consumer Group 2) │
│ [MQTT] │ │ [AMQP 1.0] │
├─────────────────────┤ ├─────────────────────┤
│ ┌─────┐ ┌─────┐ │ │ ┌─────┐ │
│ │ C1 │ │ C2 │ │ │ │ C1 │ │
│ └─────┘ └─────┘ │ │ └─────┘ │
│ ┌─────┐ ┌─────┐ │ │ ┌─────┐ │
│ │ C1 │ │ C2 │ │ │ │ C1 │ │
│ └─────┘ └─────┘ │ │ └─────┘ │
│ Load balanced │ │ Single consumer │
└─────────────────────┘ └─────────────────────┘
```
#### Key Concepts Demonstrated
| Concept | Description |
|---------|-------------|
| **Cross-Protocol Interop** | MQTT publishers, AMQP 1.0 and AMQP 0.9.1 consumers on the same queue |
| **QoS 2 Publishing** | Exactly-once delivery from publisher to broker |
| **Consumer Groups** | Both groups receive ALL messages (fan-out pattern) |
| **Load Balancing** | Messages distributed across consumers within a group |
| **AMQP Dispositions** | `AcceptMessage`, `ReleaseMessage`, `RejectMessage` for ack/nack/reject |
| **AMQP 0.9.1 Acks** | `Ack`, `Nack`, `Reject` for processing control |
| **MQTT Acknowledgments** | `Ack()`, `Nack()`, `Reject()` for processing control |
| Concept | Description |
| -------------------------- | ---------------------------------------------------------------------- |
| **Cross-Protocol Interop** | MQTT publishers, AMQP 1.0 and AMQP 0.9.1 consumers on the same queue |
| **QoS 1/2 Publishing** | Reliable publish with QoS guarantees |
| **Consumer Groups** | Both groups receive ALL messages (fan-out pattern) |
| **Load Balancing** | Messages distributed across consumers within a group |
| **AMQP Dispositions** | `AcceptMessage`, `ReleaseMessage`, `RejectMessage` for ack/nack/reject |
| **AMQP 0.9.1 Acks** | `Ack`, `Nack`, `Reject` for processing control |
| **MQTT Acknowledgments** | `Ack()`, `Nack()`, `Reject()` for processing control |
#### AMQP 1.0 Consumer
@@ -114,7 +114,7 @@ The consumer group is passed via the `x-consumer-group` argument on `basic.consu
1. Start the broker:
```bash
go run ./cmd/broker/ -config examples/no-cluster.yaml
go run ./cmd/ --config examples/no-cluster.yaml
```
2. Run the queue client:
+67
View File
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"log/slog"
"strconv"
"time"
"github.com/absmach/fluxmq/broker/events"
@@ -128,6 +129,7 @@ func (b *Broker) DeliverMessage(s *session.Session, msg *storage.Message) error
p.TopicName = msg.Topic
p.Payload = msg.GetPayload()
p.ID = msg.PacketID
applyPublishProperties(p.Properties, msg)
pub = p
default:
@@ -149,6 +151,71 @@ func (b *Broker) DeliverMessage(s *session.Session, msg *storage.Message) error
return s.WritePacket(pub)
}
func applyPublishProperties(props *v5.PublishProperties, msg *storage.Message) {
if props == nil || msg == nil {
return
}
// Prefer explicit fields; fall back to mapped properties.
if msg.ContentType != "" {
props.ContentType = msg.ContentType
} else if v := msg.Properties["content-type"]; v != "" {
props.ContentType = v
}
if msg.ResponseTopic != "" {
props.ResponseTopic = msg.ResponseTopic
} else if v := msg.Properties["response-topic"]; v != "" {
props.ResponseTopic = v
}
if len(msg.CorrelationData) > 0 {
props.CorrelationData = msg.CorrelationData
} else if v := msg.Properties["correlation-id"]; v != "" {
props.CorrelationData = []byte(v)
}
if msg.PayloadFormat != nil {
props.PayloadFormat = msg.PayloadFormat
} else if v := msg.Properties["payload-format"]; v != "" {
if n, err := strconv.ParseUint(v, 10, 8); err == nil {
pf := byte(n)
props.PayloadFormat = &pf
}
}
userProps := make(map[string]string)
if msg.Properties != nil {
for k, v := range msg.Properties {
if isReservedUserPropertyKey(k) {
continue
}
userProps[k] = v
}
}
if msg.UserProperties != nil {
for k, v := range msg.UserProperties {
userProps[k] = v
}
}
if len(userProps) > 0 {
props.User = make([]v5.User, 0, len(userProps))
for k, v := range userProps {
props.User = append(props.User, v5.User{Key: k, Value: v})
}
}
}
func isReservedUserPropertyKey(key string) bool {
switch key {
case "content-type", "response-topic", "correlation-id", "payload-format":
return true
default:
return false
}
}
// DeliverToClient implements cluster.MessageHandler.DeliverToClient.
func (b *Broker) DeliverToClient(ctx context.Context, clientID string, msg *cluster.Message) error {
s := b.Get(clientID)
+9
View File
@@ -4,6 +4,7 @@
package broker
import (
"strconv"
"strings"
v5 "github.com/absmach/fluxmq/mqtt/packets/v5"
@@ -92,6 +93,10 @@ func extractAllProperties(props *v5.PublishProperties) map[string]string {
}
// Add other MQTT v5 properties if present
if props.ContentType != "" {
result["content-type"] = props.ContentType
}
if props.ResponseTopic != "" {
result["response-topic"] = props.ResponseTopic
}
@@ -100,5 +105,9 @@ func extractAllProperties(props *v5.PublishProperties) map[string]string {
result["correlation-id"] = string(props.CorrelationData)
}
if props.PayloadFormat != nil {
result["payload-format"] = strconv.FormatUint(uint64(*props.PayloadFormat), 10)
}
return result
}
+43 -21
View File
@@ -221,11 +221,21 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
var messageExpiry *uint32
var expiryTime time.Time
publishTime := time.Now()
var payloadFormat *byte
var contentType string
var responseTopic string
var correlationData []byte
if p.Properties != nil && p.Properties.MessageExpiry != nil {
messageExpiry = p.Properties.MessageExpiry
expiryTime = publishTime.Add(time.Duration(*messageExpiry) * time.Second)
}
if p.Properties != nil {
payloadFormat = p.Properties.PayloadFormat
contentType = p.Properties.ContentType
responseTopic = p.Properties.ResponseTopic
correlationData = p.Properties.CorrelationData
}
// Extract MQTT v5 properties for queue functionality
properties := extractAllProperties(p.Properties)
@@ -235,13 +245,17 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
// Zero-copy: Create ref-counted buffer from payload
buf := core.GetBufferWithData(payload)
msg := &storage.Message{
Topic: topic,
QoS: qos,
Retain: retain,
MessageExpiry: messageExpiry,
Expiry: expiryTime,
PublishTime: publishTime,
Properties: properties,
Topic: topic,
QoS: qos,
Retain: retain,
MessageExpiry: messageExpiry,
Expiry: expiryTime,
PublishTime: publishTime,
Properties: properties,
PayloadFormat: payloadFormat,
ContentType: contentType,
ResponseTopic: responseTopic,
CorrelationData: correlationData,
}
msg.SetPayloadFromBuffer(buf)
err := h.broker.Publish(msg)
@@ -256,13 +270,17 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
// Zero-copy: Create ref-counted buffer from payload
buf := core.GetBufferWithData(payload)
msg := &storage.Message{
Topic: topic,
QoS: qos,
Retain: retain,
MessageExpiry: messageExpiry,
Expiry: expiryTime,
PublishTime: publishTime,
Properties: properties,
Topic: topic,
QoS: qos,
Retain: retain,
MessageExpiry: messageExpiry,
Expiry: expiryTime,
PublishTime: publishTime,
Properties: properties,
PayloadFormat: payloadFormat,
ContentType: contentType,
ResponseTopic: responseTopic,
CorrelationData: correlationData,
}
msg.SetPayloadFromBuffer(buf)
if err := h.broker.Publish(msg); err != nil {
@@ -286,13 +304,17 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
// Publish message immediately (distribution to subscribers)
msg := &storage.Message{
Topic: topic,
QoS: qos,
Retain: retain,
MessageExpiry: messageExpiry,
Expiry: expiryTime,
PublishTime: publishTime,
Properties: properties,
Topic: topic,
QoS: qos,
Retain: retain,
MessageExpiry: messageExpiry,
Expiry: expiryTime,
PublishTime: publishTime,
Properties: properties,
PayloadFormat: payloadFormat,
ContentType: contentType,
ResponseTopic: responseTopic,
CorrelationData: correlationData,
}
msg.SetPayloadFromBuffer(buf)
if err := h.broker.Publish(msg); err != nil {