Update docs and client code

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-04 01:04:32 +01:00
parent e3f28fa41a
commit 60b6d958b5
17 changed files with 786 additions and 1581 deletions
+11 -11
View File
@@ -57,52 +57,52 @@ bench:
# Run benchmarks for broker only
.PHONY: bench-broker
bench-broker:
$(GO) test -bench=. -benchmem -run=^$$ ./broker
$(GO) test -bench=. -benchmem -run=^$$ ./mqtt/broker
# Run zero-copy comparison benchmarks
.PHONY: bench-zerocopy
bench-zerocopy:
$(GO) test -bench=BenchmarkMessageCopy -benchmem -run=^$$ ./broker
$(GO) test -bench=BenchmarkMessageCopy -benchmem -run=^$$ ./mqtt/broker
# Run stress tests (all)
.PHONY: stress
stress:
$(GO) test -v -run=TestStress -timeout=30m ./broker
$(GO) test -v -run=TestStress -timeout=30m ./mqtt/broker
# Run specific stress tests
.PHONY: stress-throughput
stress-throughput:
$(GO) test -v -run=TestStress_HighThroughputPublish -timeout=5m ./broker
$(GO) test -v -run=TestStress_HighThroughputPublish -timeout=5m ./mqtt/broker
.PHONY: stress-concurrent
stress-concurrent:
$(GO) test -v -run=TestStress_ConcurrentPublishers -timeout=5m ./broker
$(GO) test -v -run=TestStress_ConcurrentPublishers -timeout=5m ./mqtt/broker
.PHONY: stress-memory
stress-memory:
$(GO) test -v -run=TestStress_MemoryPressure -timeout=5m ./broker
$(GO) test -v -run=TestStress_MemoryPressure -timeout=5m ./mqtt/broker
.PHONY: stress-sustained
stress-sustained:
$(GO) test -v -run=TestStress_SustainedLoad -timeout=5m ./broker
$(GO) test -v -run=TestStress_SustainedLoad -timeout=5m ./mqtt/broker
.PHONY: stress-pool
stress-pool:
$(GO) test -v -run=TestStress_BufferPoolExhaustion -timeout=5m ./broker
$(GO) test -v -run=TestStress_BufferPoolExhaustion -timeout=5m ./mqtt/broker
.PHONY: stress-fanout
stress-fanout:
$(GO) test -v -run=TestStress_FanOutExtreme -timeout=5m ./broker
$(GO) test -v -run=TestStress_FanOutExtreme -timeout=5m ./mqtt/broker
.PHONY: stress-churn
stress-churn:
$(GO) test -v -run=TestStress_RapidSubscribeUnsubscribe -timeout=5m ./broker
$(GO) test -v -run=TestStress_RapidSubscribeUnsubscribe -timeout=5m ./mqtt/broker
# Generate benchmark report
.PHONY: bench-report
bench-report:
@mkdir -p $(BUILD_DIR)
$(GO) test -bench=. -benchmem -run=^$$ ./broker | tee $(BUILD_DIR)/benchmark-results.txt
$(GO) test -bench=. -benchmem -run=^$$ ./mqtt/broker | tee $(BUILD_DIR)/benchmark-results.txt
@echo ""
@echo "Benchmark results saved to $(BUILD_DIR)/benchmark-results.txt"
+30 -25
View File
@@ -11,16 +11,16 @@ A high-performance, multi-protocol message broker written in Go designed for sca
**Event-Driven Architectures**
- **Event backbone for microservices** - Reliable, ordered event distribution between services with at-least-once or exactly-once delivery (QoS 1/2)
- **CQRS systems** - Durable queues for command/event distribution with partition-based ordering per aggregate
- **Asynchronous workflows** - Decouple services with persistent message queues and automatic retries
- **CQRS systems** - Durable queues for command/event distribution with per-queue FIFO ordering
- **Asynchronous workflows** - Decouple services with persistent message queues and ack/nack-based redelivery
- **Real-time event processing** - High throughput (300K-500K msg/s per node) with low latency (<10ms local, ~5ms cross-node)
**Why choose this over Kafka for EDA:**
- ✅ Simpler operations - single binary with embedded storage, no Zookeeper/KRaft
- ✅ Multi-protocol - same broker handles MQTT, HTTP, WebSocket, CoAP
- ✅ Partition-based ordering with sequence numbers (perfect for aggregate-based event streams)
-Configurable retention (hours to days) for event replay during deployments/failures
-Raft replication with quorum writes ensures no lost events
- ✅ Per-queue FIFO ordering (single-log queues)
- ⚠️ Retention via committed-offset truncation (time/size retention planned)
- ⚠️ Optional Raft layer for queue appends (WIP)
**IoT & Real-Time Systems**
- **Device communication** - MQTT 3.1.1/5.0 with QoS levels for reliable delivery over unreliable networks
@@ -41,7 +41,7 @@ A high-performance, multi-protocol message broker written in Go designed for sca
- ❌ Time-travel debugging or temporal queries - no time-range indexing
**Complex Event Processing**
- ❌ Advanced queries over events - no indexing beyond partition+sequence
- ❌ Advanced queries over events - no indexing beyond topic and offset
- ❌ Built-in stream processing - no Kafka Streams equivalent (process events in consumers)
**Large Payloads**
@@ -54,9 +54,9 @@ A high-performance, multi-protocol message broker written in Go designed for sca
│ Service A │────────>│ MQTT Broker │────────>│ Service B │
│ (Producer) │ events │ (Event Bus) │ events │ (Consumer) │
└─────────────┘ │ │ └─────────────┘
│ │ • Retention: 7d │
│ │ • Replication:3x│
▼ │ • Ordering: Yes │
│ │ • Retention: committed offset
│ │ • Replication: WIP (Raft)
▼ │ • Ordering: FIFO per queue
┌─────────────┐ └──────────────────┘ ┌─────────────┐
│ Database │ │ Database │
│ (State) │ Broker = Durable Pipe │ (State) │
@@ -65,17 +65,22 @@ A high-performance, multi-protocol message broker written in Go designed for sca
**Recommended configuration for EDA:**
```yaml
queue:
ordering: partition # FIFO per aggregate/entity
partitions: 50-100 # Balance parallelism vs overhead
retention:
retention_time: 168h # 7 days for replay
replication:
enabled: true
replication_factor: 3 # Survive node failures
mode: sync # Don't lose events
min_in_sync_replicas: 2 # Quorum writes
queues:
- name: "orders"
topics: ["orders/#", "$queue/orders/#"]
limits:
max_message_size: 1048576
max_depth: 100000
message_ttl: 168h
retry:
max_retries: 10
initial_backoff: 5s
max_backoff: 5m
multiplier: 2.0
dlq:
enabled: true
```
Note: the queue `limits`, `retry`, and `dlq` settings are parsed from the configuration, but they are not fully enforced yet.
## Features
@@ -125,10 +130,10 @@ queue:
- **Durable Queues**
- Persistent message queues with consumer groups
- Ack/Nack/Reject message acknowledgment
- Partitioning with ordered delivery
- Dead-letter queue support
- Raft-based replication (3x, automatic failover)
- Kafka-style retention (time, size, log compaction)
- FIFO per queue and per consumer group (single cursor)
- DLQ handler present (delivery path wiring pending)
- Optional Raft layer for queue appends (WIP)
- Retention via committed-offset truncation (time/size retention planned)
- **Persistent Storage**
- BadgerDB for session state and offline queues
@@ -349,8 +354,8 @@ See [Scaling & Performance](docs/scaling.md) for detailed benchmarks.
- gRPC inter-broker communication (mTLS supported)
- BadgerDB persistent storage
- Durable queues with consumer groups
- Queue replication with Raft (3x replication, automatic failover)
- Kafka-style retention policies (time, size, log compaction)
- Raft layer for queue appends (WIP)
- Committed-offset truncation for queues (time/size retention planned)
- TLS/mTLS for client and inter-broker connections
- WebSocket origin validation
- Shared subscriptions (MQTT 5.0)
+9 -20
View File
@@ -18,14 +18,13 @@ This directory contains benchmark suites and performance analysis results for th
```bash
# Message publishing benchmarks
go test -bench=BenchmarkMessagePublish -benchtime=5s ./broker
go test -bench=BenchmarkMessagePublish -benchtime=5s ./mqtt/broker
# Router benchmarks
go test -bench=BenchmarkRouter -benchtime=3s ./broker/router
# Queue benchmarks
go test -bench=BenchmarkEnqueue -benchtime=3s ./queue
go test -bench=BenchmarkDequeue -benchtime=3s ./queue
# Queue benchmarks are not currently present in-tree.
```
## Performance Results
@@ -36,11 +35,11 @@ go test -bench=BenchmarkDequeue -benchtime=3s ./queue
- **Memory:** 177 B/op
- **Allocations:** 4 allocs/op
**See:** [`docs/PERFORMANCE_RESULTS.md`](../docs/PERFORMANCE_RESULTS.md) for comprehensive results.
Results produced by the scripts are stored under `benchmarks/results/`.
## Benchmark Suite
### Broker Benchmarks (`broker/message_bench_test.go`)
### Broker Benchmarks (`mqtt/broker/message_bench_test.go`)
**Message Publishing:**
- `BenchmarkMessagePublish_SingleSubscriber` - Single subscriber performance
@@ -65,19 +64,9 @@ go test -bench=BenchmarkDequeue -benchtime=3s ./queue
- `BenchmarkRouter_Unsubscribe` - Subscription removal
- `BenchmarkRouter_Mixed_*` - Mixed read/write workloads
### Queue Benchmarks (`queue/*_bench_test.go`)
### Queue Benchmarks
**Enqueue:**
- `BenchmarkEnqueue_SinglePartition` - Single partition enqueue
- `BenchmarkEnqueue_MultiplePartitions` - Multi-partition enqueue
- `BenchmarkEnqueue_SmallPayload` - Small message performance
- `BenchmarkEnqueue_LargePayload` - Large message performance
- `BenchmarkEnqueue_Parallel` - Concurrent enqueue
**Dequeue:**
- `BenchmarkDequeue_SingleConsumer` - Single consumer dequeue
- `BenchmarkDequeue_MultipleConsumers` - Multi-consumer scaling
- `BenchmarkDequeue_PartitionScanning` - Partition scanning overhead
Queue benchmarks are not currently present in-tree.
## Profiling
@@ -86,15 +75,15 @@ go test -bench=BenchmarkDequeue -benchtime=3s ./queue
```bash
# CPU profile
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 \
-cpuprofile=cpu.prof -benchtime=30s ./broker
-cpuprofile=cpu.prof -benchtime=30s ./mqtt/broker
# Memory profile
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 \
-memprofile=mem.prof -benchtime=30s ./broker
-memprofile=mem.prof -benchtime=30s ./mqtt/broker
# Mutex contention profile
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 \
-mutexprofile=mutex.prof -benchtime=30s ./broker
-mutexprofile=mutex.prof -benchtime=30s ./mqtt/broker
```
### Analyze Profiles
+25 -11
View File
@@ -37,7 +37,8 @@ type Client struct {
topicAliases *topicAliasManager
// Queue subscriptions
queueSubs *queueSubscriptions
queueSubs *queueSubscriptions
queueAckCache *queueAckCache
// Pending operations
pending *pendingStore
@@ -76,12 +77,13 @@ func New(opts *Options) (*Client, error) {
}
return &Client{
opts: opts,
state: newStateManager(),
pending: newPendingStore(opts.MaxInflight),
store: store,
qos2Incoming: make(map[uint16]*Message),
queueSubs: newQueueSubscriptions(),
opts: opts,
state: newStateManager(),
pending: newPendingStore(opts.MaxInflight),
store: store,
qos2Incoming: make(map[uint16]*Message),
queueSubs: newQueueSubscriptions(),
queueAckCache: newQueueAckCache(5 * time.Minute),
}, nil
}
@@ -984,22 +986,34 @@ func (c *Client) handleQueueMessage(msg *Message) {
// Extract queue message metadata from user properties
var messageID string
var sequence uint64
var groupID string
var offset uint64
if msg.UserProperties != nil {
if msgID, ok := msg.UserProperties["message-id"]; ok {
messageID = msgID
}
if seq, ok := msg.UserProperties["sequence"]; ok {
fmt.Sscanf(seq, "%d", &sequence)
if gid, ok := msg.UserProperties["group-id"]; ok {
groupID = gid
}
if off, ok := msg.UserProperties["offset"]; ok {
fmt.Sscanf(off, "%d", &offset)
} else if seq, ok := msg.UserProperties["sequence"]; ok {
fmt.Sscanf(seq, "%d", &offset)
}
}
if c.queueAckCache != nil {
c.queueAckCache.set(messageID, groupID)
}
// Create queue message with ack/nack/reject methods
queueMsg := &QueueMessage{
Message: msg,
MessageID: messageID,
Sequence: sequence,
GroupID: groupID,
Offset: offset,
Sequence: offset,
client: c,
queueName: sub.queueName,
}
+9 -7
View File
@@ -20,13 +20,15 @@ var (
ErrConnectTimeout = errors.New("connection timeout")
// Operation errors.
ErrTimeout = errors.New("operation timed out")
ErrMaxInflight = errors.New("maximum inflight messages exceeded")
ErrConnectionLost = errors.New("connection lost")
ErrClientClosed = errors.New("client has been closed")
ErrInvalidQoS = errors.New("invalid QoS level (must be 0, 1, or 2)")
ErrInvalidTopic = errors.New("invalid topic")
ErrSubscribeFailed = errors.New("subscription failed")
ErrTimeout = errors.New("operation timed out")
ErrMaxInflight = errors.New("maximum inflight messages exceeded")
ErrConnectionLost = errors.New("connection lost")
ErrClientClosed = errors.New("client has been closed")
ErrInvalidQoS = errors.New("invalid QoS level (must be 0, 1, or 2)")
ErrInvalidTopic = errors.New("invalid topic")
ErrSubscribeFailed = errors.New("subscription failed")
ErrQueueAckRequiresV5 = errors.New("queue acknowledgments require MQTT v5 user properties")
ErrQueueAckMissingGroup = errors.New("group-id required for queue acknowledgment")
// Protocol errors.
ErrUnexpectedPacket = errors.New("unexpected packet type")
+111 -16
View File
@@ -28,7 +28,9 @@ type QueueMessage struct {
*Message // Embedded standard MQTT message
MessageID string // Unique message ID for acknowledgment
Sequence uint64 // Message sequence number
GroupID string // Consumer group ID for acknowledgment
Offset uint64 // Queue offset
Sequence uint64 // Legacy alias for Offset
// Internal fields for acknowledgment
client *Client
@@ -41,7 +43,7 @@ func (qm *QueueMessage) Ack() error {
if qm.client == nil {
return fmt.Errorf("cannot acknowledge: client not set")
}
return qm.client.Ack(qm.queueName, qm.MessageID)
return qm.client.AckWithGroup(qm.queueName, qm.MessageID, qm.GroupID)
}
// Nack negatively acknowledges the message, triggering a retry.
@@ -50,7 +52,7 @@ func (qm *QueueMessage) Nack() error {
if qm.client == nil {
return fmt.Errorf("cannot nack: client not set")
}
return qm.client.Nack(qm.queueName, qm.MessageID)
return qm.client.NackWithGroup(qm.queueName, qm.MessageID, qm.GroupID)
}
// Reject rejects the message, sending it to the dead-letter queue.
@@ -59,7 +61,7 @@ func (qm *QueueMessage) Reject() error {
if qm.client == nil {
return fmt.Errorf("cannot reject: client not set")
}
return qm.client.Reject(qm.queueName, qm.MessageID)
return qm.client.RejectWithGroup(qm.queueName, qm.MessageID, qm.GroupID)
}
// queueSubscription tracks a queue subscription and its handler.
@@ -100,6 +102,55 @@ func (qs *queueSubscriptions) remove(queueTopic string) {
delete(qs.subs, queueTopic)
}
type queueAckInfo struct {
groupID string
expires time.Time
}
type queueAckCache struct {
mu sync.Mutex
ttl time.Duration
entries map[string]queueAckInfo
}
func newQueueAckCache(ttl time.Duration) *queueAckCache {
return &queueAckCache{
ttl: ttl,
entries: make(map[string]queueAckInfo),
}
}
func (c *queueAckCache) set(messageID, groupID string) {
if messageID == "" || groupID == "" {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.entries[messageID] = queueAckInfo{
groupID: groupID,
expires: time.Now().Add(c.ttl),
}
}
func (c *queueAckCache) get(messageID string) (string, bool) {
if messageID == "" {
return "", false
}
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[messageID]
if !ok {
return "", false
}
if !entry.expires.IsZero() && entry.expires.Before(time.Now()) {
delete(c.entries, messageID)
return "", false
}
return entry.groupID, true
}
// PublishToQueue publishes a message to a durable queue.
// The queueName should NOT include the "$queue/" prefix - it will be added automatically.
func (c *Client) PublishToQueue(queueName string, payload []byte) error {
@@ -345,39 +396,83 @@ func (c *Client) UnsubscribeFromQueue(queueName string) error {
// Ack acknowledges successful processing of a queue message.
func (c *Client) Ack(queueName, messageID string) error {
return c.sendQueueAck(queueName, messageID, "$ack")
return c.sendQueueAck(queueName, messageID, "", "$ack")
}
// Nack negatively acknowledges a queue message, triggering retry.
func (c *Client) Nack(queueName, messageID string) error {
return c.sendQueueAck(queueName, messageID, "$nack")
return c.sendQueueAck(queueName, messageID, "", "$nack")
}
// Reject rejects a queue message, sending it to the dead-letter queue.
func (c *Client) Reject(queueName, messageID string) error {
return c.sendQueueAck(queueName, messageID, "$reject")
return c.sendQueueAck(queueName, messageID, "", "$reject")
}
// AckWithGroup reports successful processing of the queue message messageID
// on behalf of the consumer group groupID. It delegates to sendQueueAck with
// the "$ack" action; an empty groupID leaves group resolution to sendQueueAck.
func (c *Client) AckWithGroup(queueName, messageID, groupID string) error {
	const action = "$ack"
	return c.sendQueueAck(queueName, messageID, groupID, action)
}
// NackWithGroup negatively acknowledges the queue message messageID on behalf
// of the consumer group groupID. It delegates to sendQueueAck with the "$nack"
// action; an empty groupID leaves group resolution to sendQueueAck.
func (c *Client) NackWithGroup(queueName, messageID, groupID string) error {
	const action = "$nack"
	return c.sendQueueAck(queueName, messageID, groupID, action)
}
// RejectWithGroup rejects the queue message messageID on behalf of the
// consumer group groupID. It delegates to sendQueueAck with the "$reject"
// action; an empty groupID leaves group resolution to sendQueueAck.
func (c *Client) RejectWithGroup(queueName, messageID, groupID string) error {
	const action = "$reject"
	return c.sendQueueAck(queueName, messageID, groupID, action)
}
// sendQueueAck sends an acknowledgment for a queue message.
func (c *Client) sendQueueAck(queueName, messageID, ackType string) error {
func (c *Client) sendQueueAck(queueName, messageID, groupID, ackType string) error {
if !c.state.isConnected() {
return ErrNotConnected
}
if c.opts.ProtocolVersion != 5 {
return ErrQueueAckRequiresV5
}
if groupID == "" && c.queueAckCache != nil {
if cachedGroup, ok := c.queueAckCache.get(messageID); ok {
groupID = cachedGroup
}
}
if groupID == "" {
queueTopic := "$queue/" + queueName
if sub, ok := c.queueSubs.get(queueTopic); ok {
if sub.consumerGroup != "" {
groupID = sub.consumerGroup
} else if c.opts.ClientID != "" {
groupID = defaultGroupID(c.opts.ClientID)
}
}
}
if groupID == "" {
return ErrQueueAckMissingGroup
}
// Build ack topic: $queue/{queueName}/{$ack|$nack|$reject}
ackTopic := "$queue/" + queueName + "/" + ackType
// Send with message-id user property (MQTT v5)
if c.opts.ProtocolVersion == 5 {
userProps := map[string]string{
"message-id": messageID,
}
return c.publishWithUserProperties(ackTopic, nil, 1, false, userProps)
userProps := map[string]string{
"message-id": messageID,
"group-id": groupID,
}
return c.publishWithUserProperties(ackTopic, nil, 1, false, userProps)
}
// For MQTT v3, we can't send the message ID, so just publish empty message
// The broker would need to track inflight by client ID + topic
return c.Publish(ackTopic, nil, 1, false)
// defaultGroupID derives a consumer group name from a client ID: it returns
// the part of clientID that precedes the first '-', or the whole clientID when
// it contains no '-'.
func defaultGroupID(clientID string) string {
	// '-' is a single-byte ASCII character and never occurs inside a
	// multi-byte UTF-8 sequence, so a byte scan finds the same offset as a
	// rune scan.
	for pos := 0; pos < len(clientID); pos++ {
		if clientID[pos] == '-' {
			return clientID[:pos]
		}
	}
	return clientID
}
// isQueueTopic returns true if the topic is a queue topic.
+8 -6
View File
@@ -171,9 +171,9 @@ func TestClient_HandleQueueMessage(t *testing.T) {
Topic: "$queue/test",
Payload: []byte("test payload"),
UserProperties: map[string]string{
"message-id": "msg-123",
"partition-id": "5",
"sequence": "42",
"message-id": "msg-123",
"group-id": "workers",
"offset": "42",
},
}
@@ -209,9 +209,9 @@ func TestClient_HandleQueueMessage_WithHandler(t *testing.T) {
Topic: "$queue/test",
Payload: []byte("test payload"),
UserProperties: map[string]string{
"message-id": "msg-123",
"partition-id": "5",
"sequence": "42",
"message-id": "msg-123",
"group-id": "workers",
"offset": "42",
},
}
@@ -233,6 +233,8 @@ func TestClient_HandleQueueMessage_WithHandler(t *testing.T) {
require.NotNil(t, receivedQueueMsg)
assert.Equal(t, "msg-123", receivedQueueMsg.MessageID)
assert.Equal(t, "workers", receivedQueueMsg.GroupID)
assert.Equal(t, uint64(42), receivedQueueMsg.Offset)
assert.Equal(t, uint64(42), receivedQueueMsg.Sequence)
assert.Equal(t, "test", receivedQueueMsg.queueName)
assert.Equal(t, client, receivedQueueMsg.client)
+38 -7
View File
@@ -30,9 +30,20 @@ type Broker struct {
retained storage.RetainedStore // Retained messages
wills storage.WillStore // Will messages
cluster cluster.Cluster // Cluster coordination (nil if single-node)
cluster cluster.Cluster // Cluster coordination (nil if single-node)
queueManager QueueManager // Durable queue manager (nil if disabled)
auth *AuthEngine // Authentication
rateLimiter ClientRateLimiter // Rate limiting (nil if disabled)
webhooks Notifier // Webhook notifier (nil if disabled)
metrics *otel.Metrics // OTel metrics (nil if disabled)
tracer trace.Tracer // OTel tracing (nil if disabled)
sharedSubs *SharedSubscriptionManager
maxQoS byte
maxOfflineQueueSize int
offlineQueueEvict bool
auth *AuthEngine // Authentication
logger *slog.Logger // Structured logging
stats *Stats // Metrics collection
@@ -51,7 +62,16 @@ type Broker struct {
### Initialization
```go
func NewBroker(store storage.Store, clust cluster.Cluster, logger *slog.Logger, stats *Stats) *Broker {
func NewBroker(
store storage.Store,
clust cluster.Cluster,
logger *slog.Logger,
stats *Stats,
webhooks Notifier,
metrics *otel.Metrics,
tracer trace.Tracer,
sessionCfg config.SessionConfig,
) *Broker {
if store == nil {
store = memory.New() // Default to in-memory
}
@@ -67,6 +87,13 @@ func NewBroker(store storage.Store, clust cluster.Cluster, logger *slog.Logger,
cluster: clust, // Can be nil for single-node
logger: logger,
stats: stats,
webhooks: webhooks,
metrics: metrics,
tracer: tracer,
sharedSubs: NewSharedSubscriptionManager(),
maxQoS: 2,
maxOfflineQueueSize: sessionCfg.MaxOfflineQueueSize,
offlineQueueEvict: sessionCfg.OfflineQueuePolicy == "evict",
stopCh: make(chan struct{}),
}
@@ -83,6 +110,9 @@ func NewBroker(store storage.Store, clust cluster.Cluster, logger *slog.Logger,
- `clust`: Cluster coordination (nil, NoopCluster, or EtcdCluster)
- `logger`: Structured logging (slog)
- `stats`: Metrics collection
- `webhooks`: Optional webhook notifier (nil disables)
- `metrics` / `tracer`: Optional OpenTelemetry instrumentation
- `sessionCfg`: Session defaults for offline queue behavior
### Domain Methods
@@ -90,12 +120,12 @@ The broker exposes clean, protocol-agnostic methods:
```go
// Session Management
CreateSession(clientID string, opts SessionOptions) (*session.Session, bool, error)
CreateSession(clientID string, version byte, opts session.Options) (*session.Session, bool, error)
DestroySession(clientID string) error
Get(clientID string) *session.Session
// Publishing
Publish(msg Message) error
Publish(msg *storage.Message) error
PublishWill(clientID string) error
// Subscribing
@@ -103,8 +133,9 @@ Subscribe(clientID string, filter string, qos byte, opts storage.SubscribeOption
Unsubscribe(clientID string, filter string) error
// Delivery
DeliverToSession(s *session.Session, msg Message) (uint16, error)
AckMessage(clientID string, packetID uint16, qos byte) error
DeliverToSession(s *session.Session, msg *storage.Message) (uint16, error)
DeliverToSessionByID(ctx context.Context, clientID string, msg interface{}) error
AckMessage(s *session.Session, packetID uint16) error
// Cluster Integration (implements cluster.MessageHandler)
DeliverToClient(ctx context.Context, clientID, topic string, ...) error
+23 -21
View File
@@ -13,7 +13,7 @@ currently include a dedicated AMQP 1.0 client library.
- **QoS Levels:** Full QoS 0/1/2 support with message persistence
- **TLS/SSL:** Secure connections with custom certificates
- **Session Persistence:** Configurable session expiry
- **Durable Queues:** Consumer groups, acknowledgments, dead-letter queues
- **Durable Queues:** Consumer groups and acknowledgments (DLQ wiring pending)
- **MQTT 5.0 Features:** Topic aliases, user properties, flow control
---
@@ -217,33 +217,29 @@ opts.SetOnMessageV2(func(msg *client.Message) {
## Durable Queues
The client supports durable queues with consumer groups, message acknowledgment, and dead-letter queues.
The client supports durable queues with consumer groups and message acknowledgment.
Reject/DLQ wiring in the broker is pending.
**When to use queues instead of regular pub/sub:**
- You need guaranteed message processing (exactly-once semantics)
- You need at-least-once processing with explicit acknowledgments
- Multiple consumers should share the workload (consumer groups)
- Failed messages need retry logic or dead-letter handling
**Key concepts:**
- **Queue**: Persistent message buffer with ordered delivery per partition
- **Queue**: Persistent message buffer with ordered delivery per queue (single log)
- **Consumer Group**: Multiple consumers share messages from the same queue
- **Partition Key**: Messages with the same key are processed in order
- **Acknowledgment**: Confirm successful processing (Ack), request retry (Nack), or reject permanently (Reject)
- **Acknowledgment**: Confirm success (Ack), request redelivery (Nack), or reject permanently (Reject)
### Publishing to Queues
```go
// Simple queue publish
c.PublishToQueue("orders", []byte(`{"item": "widget"}`), "customer-123")
// With partition key for ordered processing
c.PublishToQueue("transactions", []byte("txn-data"), "account-456")
c.PublishToQueue("orders", []byte(`{"item": "widget"}`))
// Full control
c.PublishToQueueWithOptions(&client.QueuePublishOptions{
QueueName: "events",
Payload: []byte("event-data"),
PartitionKey: "user-789",
Properties: map[string]string{"priority": "high"},
QoS: 1,
})
@@ -256,15 +252,18 @@ c.PublishToQueueWithOptions(&client.QueuePublishOptions{
err := c.SubscribeToQueue("orders", "order-processors", func(msg *client.QueueMessage) {
log.Printf("Processing order: %s", msg.Payload)
log.Printf("Message ID: %s", msg.MessageID)
log.Printf("Partition: %d, Sequence: %d", msg.PartitionID, msg.Sequence)
if msg.UserProperties != nil {
log.Printf("Offset: %s", msg.UserProperties["offset"])
log.Printf("Group: %s", msg.UserProperties["group-id"])
}
// Process message...
if processedOK {
msg.Ack() // Message removed from queue
} else if shouldRetry {
msg.Nack() // Redelivered with backoff
msg.Nack() // Redelivery eligible immediately (no backoff enforcement yet)
} else {
msg.Reject() // Sent to dead-letter queue
msg.Reject() // Rejects message; DLQ wiring pending
}
})
```
@@ -274,17 +273,20 @@ err := c.SubscribeToQueue("orders", "order-processors", func(msg *client.QueueMe
| Method | Effect |
| -------------- | -------------------------------------------------- |
| `msg.Ack()` | Message processed successfully, removed from queue |
| `msg.Nack()` | Processing failed, retry with backoff |
| `msg.Reject()` | Permanent failure, sent to dead-letter queue |
| `msg.Nack()` | Processing failed, make eligible for redelivery |
| `msg.Reject()` | Permanent failure, DLQ wiring pending |
### Direct Acknowledgment
```go
// Acknowledge by message ID
c.Ack("orders", "msg-12345")
c.Nack("orders", "msg-12345")
c.Reject("orders", "msg-12345")
// Acknowledge by message ID (explicit group)
c.AckWithGroup("orders", "msg-12345", "processors")
c.NackWithGroup("orders", "msg-12345", "processors")
c.RejectWithGroup("orders", "msg-12345", "processors")
```
Note: the broker expects `message-id` and `group-id` user properties on acks
(MQTT v5). `QueueMessage.Ack()` sends both. For direct acks, use the `*WithGroup`
helpers or provide `group-id` manually.
### Unsubscribe from Queue
@@ -333,7 +335,7 @@ func main() {
// Publish some orders
for i := 0; i < 10; i++ {
c.PublishToQueue("orders", []byte(`{"id": "`+string(rune(i))+`"}`), "customer-1")
c.PublishToQueue("orders", []byte(`{"id": "`+string(rune(i))+`"}`))
}
select {} // Keep running
+7 -7
View File
@@ -1,6 +1,6 @@
# Competitive Analysis: Absmach MQTT vs Industry Solutions
**Last Updated:** 2026-01-12
**Last Updated:** 2026-02-03
This document provides a comprehensive comparison of the Absmach MQTT broker against major messaging solutions in the market.
@@ -22,8 +22,8 @@ This document provides a comprehensive comparison of the Absmach MQTT broker aga
| Session Migration | ✅ | ✅ | ✅ | N/A | ✅ | ❌ | N/A |
| **Persistence** |
| Message Persistence | ✅ (BadgerDB) | ✅ (RocksDB) | ✅ | ✅ (SQLite) | ✅ (JetStream) | ✅ | ✅ |
| Queue Replication | ✅ (Raft) | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
| Retention Policies | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
| Queue Replication | ⚠️ Append-only Raft (WIP) | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
| Retention Policies | ⚠️ Committed-offset truncation only | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
| **Performance** |
| Zero-Copy | ✅ | ✅ | ✅ | ❌ | ✅ | ❌ | ✅ |
| Buffer Pooling | ✅ | ✅ | ✅ | ❌ | ✅ | ❌ | ✅ |
@@ -32,7 +32,7 @@ This document provides a comprehensive comparison of the Absmach MQTT broker aga
| TLS | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| mTLS | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Auth Plugins | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| Rate Limiting | | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
| Rate Limiting | | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
| RBAC | ⚠️ Interface only | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| **Observability** |
| Prometheus Metrics | ❌ (OTLP only) | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
@@ -81,7 +81,7 @@ This document provides a comprehensive comparison of the Absmach MQTT broker aga
- In-memory ring buffers for hot data
- BadgerDB (LSM-tree) for durability
- Raft replication for queue partitions
- Optional Raft layer for queue appends (WIP)
---
@@ -199,8 +199,8 @@ This document provides a comprehensive comparison of the Absmach MQTT broker aga
| ------------- | ---------------- | --------------- |
| Protocol | MQTT | Kafka Protocol |
| Message Model | Pub/Sub + Queues | Log-based |
| Ordering | Per-topic | Per-partition |
| Retention | Configurable | Log compaction |
| Ordering | Per-queue (single log) | Per-partition |
| Retention | Committed-offset truncation (time/size planned) | Log compaction |
| Use Case | IoT/Real-time | Event streaming |
**When to choose Absmach MQTT:** IoT devices, bidirectional communication, MQTT protocol.
+249 -8
View File
@@ -11,6 +11,8 @@ This document provides a comprehensive guide to configuring the MQTT broker for
- [Queue Configuration](#queue-configuration)
- [Storage Configuration](#storage-configuration)
- [Cluster Configuration](#cluster-configuration)
- [Rate Limiting Configuration](#rate-limiting-configuration)
- [Webhook Configuration](#webhook-configuration)
- [Logging Configuration](#logging-configuration)
- [Example Configurations](#example-configurations)
- [Best Practices](#best-practices)
@@ -37,12 +39,25 @@ server:
addr: ":5672"
amqp091:
plain:
addr: ":5673"
addr: ":5682"
# ...
health_enabled: true
health_addr: ":8081"
metrics_enabled: false
metrics_addr: "localhost:4317"
otel_service_name: "fluxmq"
otel_service_version: "1.0.0"
otel_metrics_enabled: true
otel_traces_enabled: false
otel_trace_sample_rate: 0.1
api_enabled: false
api_addr: ":9090"
broker:
# Broker behavior settings
max_message_size: 1048576
max_qos: 2
# ...
session:
@@ -65,6 +80,12 @@ cluster:
enabled: true
# ...
ratelimit:
enabled: false
webhook:
enabled: false
log:
# Logging settings
level: "info"
@@ -75,10 +96,10 @@ log:
```bash
# Use default configuration
./build/mqttd
./build/fluxmq
# Load from file
./build/mqttd --config /path/to/config.yaml
./build/fluxmq --config /path/to/config.yaml
```
## Server Configuration
@@ -119,6 +140,9 @@ server:
plain:
addr: ":8083"
path: "/mqtt"
allowed_origins:
- "https://app.example.com"
- "*.example.com"
tls:
addr: ":8084"
path: "/mqtt"
@@ -161,15 +185,26 @@ server:
# AMQP 0.9.1
amqp091:
plain:
addr: ":5673"
addr: ":5682"
max_connections: 10000
tls:
addr: ":5674"
addr: ":5681"
cert_file: "/path/to/server.crt"
key_file: "/path/to/server.key"
mtls: {}
shutdown_timeout: "30s"
health_enabled: true
health_addr: ":8081"
metrics_enabled: false
metrics_addr: "localhost:4317"
otel_service_name: "fluxmq"
otel_service_version: "1.0.0"
otel_metrics_enabled: true
otel_traces_enabled: false
otel_trace_sample_rate: 0.1
api_enabled: false
api_addr: ":9090"
```
AMQP listeners use the same schema as TCP (addr, max_connections, and TLS fields).
@@ -250,6 +285,9 @@ server:
plain:
addr: ":8083"
path: "/mqtt"
allowed_origins:
- "https://app.example.com"
- "*.example.com"
```
**Client Connection**:
@@ -258,6 +296,12 @@ server:
const client = mqtt.connect('ws://localhost:8083/mqtt');
```
**Origin Validation**:
- `allowed_origins` empty: allow all origins (development mode, warns in logs)
- `allowed_origins` set: only listed origins allowed
- `"*"`: explicit wildcard for all origins
- Supports wildcard subdomains like `"*.example.com"`
### HTTP Bridge Configuration
```yaml
@@ -279,6 +323,50 @@ curl -X POST http://localhost:8080/publish \
}'
```
### Health and Readiness
```yaml
server:
health_enabled: true
health_addr: ":8081"
```
Endpoints:
- `GET /health`: liveness
- `GET /ready`: readiness
- `GET /cluster/status`: cluster status (single node returns `cluster_mode=false`)
### OpenTelemetry / Metrics
OpenTelemetry is enabled when `server.metrics_enabled` is `true`. Metrics and
traces are exported via OTLP/gRPC to `server.metrics_addr`.
```yaml
server:
metrics_enabled: false
metrics_addr: "localhost:4317"
otel_service_name: "fluxmq"
otel_service_version: "1.0.0"
otel_metrics_enabled: true
otel_traces_enabled: false
otel_trace_sample_rate: 0.1
```
Notes:
- `metrics_enabled` controls provider initialization.
- `otel_metrics_enabled` and `otel_traces_enabled` control what is emitted.
- `otel_trace_sample_rate` must be between 0.0 and 1.0.
### Queue API Server (Connect/gRPC)
The queue API server exposes Connect/HTTP and h2c endpoints for queue management.
```yaml
server:
api_enabled: false
api_addr: ":9090"
```
## Broker Configuration
Controls broker behavior and message handling.
@@ -289,6 +377,7 @@ broker:
max_retained_messages: 10000 # Max retained messages
retry_interval: "20s" # QoS retry interval
max_retries: 0 # 0 = infinite retries
max_qos: 2 # Max supported QoS (0-2)
```
### max_message_size
@@ -318,6 +407,8 @@ Maximum number of retained messages stored.
- Small deployments: 1000-10000
- Large deployments: 100000+
**Note**: This limit is not enforced in the current broker implementation.
### retry_interval
How often to retry unacknowledged QoS 1/2 messages.
@@ -327,6 +418,8 @@ How often to retry unacknowledged QoS 1/2 messages.
- Too long: Delayed delivery
- Typical: 10-30 seconds
**Note**: The broker currently uses a fixed 20s retry timeout; config wiring is planned.
### max_retries
Maximum retry attempts before giving up.
@@ -335,6 +428,13 @@ Maximum retry attempts before giving up.
- `0`: Infinite retries (recommended for reliability)
- `N`: Give up after N attempts
**Note**: Max retries are not currently enforced.
### max_qos
Maximum QoS level supported by the broker (0, 1, or 2). Client publishes with a
higher QoS are downgraded to this level.
## Session Configuration
Controls session lifecycle and queuing.
@@ -345,6 +445,7 @@ session:
default_expiry_interval: 300 # 5 minutes default expiry
max_offline_queue_size: 1000 # Max queued messages per session
max_inflight_messages: 100 # Max inflight QoS 1/2 per session
offline_queue_policy: "evict" # "evict" or "reject"
```
### max_sessions
@@ -378,7 +479,7 @@ Default session expiry for clients that don't specify.
Maximum messages queued for disconnected client.
**Behavior**:
- When limit reached, oldest messages discarded (FIFO)
- When limit reached, behavior is controlled by `offline_queue_policy`
- Prevents memory exhaustion from offline clients
- Only applies to QoS > 0 messages
@@ -400,6 +501,14 @@ Maximum unacknowledged QoS 1/2 messages per session.
- Standard: 1000
- Aggressive: 10000
### offline_queue_policy
Controls what happens when the offline queue is full.
**Values**:
- `evict`: Drop oldest messages and enqueue new ones
- `reject`: Reject new messages while preserving existing queue
## Queue Configuration
Queues are defined under `queues:` and bind **topic patterns** to durable queues.
@@ -450,6 +559,10 @@ Marks a queue as system-reserved (cannot be deleted via management APIs).
Per-queue limits, retry policy, and dead-letter behavior. If `dlq.topic` is
empty, it defaults to `$dlq/<queue-name>`.
**Note**: Queue limits, retry policy, and DLQ behavior are parsed into queue
configs but are not fully enforced at runtime yet. `message_ttl` is stored on
messages, but automatic expiry is not implemented.
## Storage Configuration
Selects the storage backend for persistence.
@@ -527,12 +640,31 @@ cluster:
client_addr: "127.0.0.1:2379" # etcd client address
initial_cluster: "node1=http://127.0.0.1:2380,node2=http://127.0.0.1:2480,node3=http://127.0.0.1:2580"
bootstrap: true # Bootstrap new cluster
hybrid_retained_size_threshold: 1024 # Bytes; smaller retained messages replicated via etcd
transport:
bind_addr: "127.0.0.1:7948" # gRPC listen address
peers: # Peer node addresses
node2: "127.0.0.1:7949"
node3: "127.0.0.1:7950"
tls_enabled: false
tls_cert_file: "/path/to/transport.crt"
tls_key_file: "/path/to/transport.key"
tls_ca_file: "/path/to/ca.crt"
raft:
enabled: false
replication_factor: 3
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/raft"
peers: {}
heartbeat_timeout: "1s"
election_timeout: "3s"
snapshot_interval: "5m"
snapshot_threshold: 8192
```
### enabled
@@ -540,8 +672,8 @@ cluster:
Enable or disable clustering.
**Values**:
- `false`: Single-node mode (default)
- `true`: Cluster mode
- `false`: Single-node mode
- `true`: Cluster mode (default in `config.Default()`)
### node_id
@@ -642,6 +774,12 @@ node3: bootstrap: true
# 2. Start new node with bootstrap: false
```
#### hybrid_retained_size_threshold
Threshold in bytes for hybrid retained storage:
- Messages smaller than this are replicated via etcd.
- Larger messages are stored on the owner node and fetched on-demand.
### transport Configuration
#### bind_addr
@@ -698,6 +836,109 @@ transport:
node3: "192.168.1.12:7948"
```
#### transport TLS
Enable mTLS for inter-broker transport:
```yaml
transport:
tls_enabled: true
tls_cert_file: "/path/to/transport.crt"
tls_key_file: "/path/to/transport.key"
tls_ca_file: "/path/to/ca.crt"
```
All three files are required when `tls_enabled` is `true`.
### Raft Configuration
Raft-based replication for **queue appends** is optional and disabled by default.
Consumer state and ack/nack/reject paths are not replicated yet.
```yaml
raft:
enabled: false
replication_factor: 3
sync_mode: true
min_in_sync_replicas: 2
ack_timeout: "5s"
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/raft"
peers:
node2: "127.0.0.1:7101"
node3: "127.0.0.1:7102"
heartbeat_timeout: "1s"
election_timeout: "3s"
snapshot_interval: "5m"
snapshot_threshold: 8192
```
## Rate Limiting Configuration
Rate limiting can be enabled per IP and per client.
```yaml
ratelimit:
enabled: false
connection:
enabled: true
rate: 1.6667 # connections per second per IP (100/min)
burst: 20
cleanup_interval: "5m"
message:
enabled: true
rate: 1000 # messages per second per client
burst: 100
subscribe:
enabled: true
rate: 100 # subscriptions per second per client
burst: 10
```
Behavior:
- Connection limits are enforced before MQTT handshake.
- Message limits return MQTT 5 `QuotaExceeded` for QoS > 0 and drop QoS 0.
- Subscribe limits return MQTT 5 `SubAckQuotaExceeded` or MQTT 3 `SubAckFailure`.
## Webhook Configuration
Webhook notifications are optional and disabled by default.
```yaml
webhook:
enabled: false
queue_size: 10000
drop_policy: "oldest"
workers: 5
include_payload: false
shutdown_timeout: "30s"
defaults:
timeout: "5s"
retry:
max_attempts: 3
initial_interval: "1s"
max_interval: "30s"
multiplier: 2.0
circuit_breaker:
failure_threshold: 5
reset_timeout: "60s"
endpoints:
- name: "analytics-service"
type: "http"
url: "https://analytics.example.com/mqtt/events"
events: ["message.published", "client.connected"]
topic_filters: ["sensors/#"]
headers:
Authorization: "Bearer token"
timeout: "10s"
retry:
max_attempts: 5
```
Notes:
- `include_payload` is accepted by config but payload inclusion is not yet wired in the notifier.
- Supported endpoint `type` is `"http"` only.
## Logging Configuration
Controls logging output.
+182 -1228
View File
File diff suppressed because it is too large Load Diff
+39 -120
View File
@@ -1,6 +1,6 @@
# MQTT Broker Development Roadmap
**Last Updated:** 2026-01-19
**Last Updated:** 2026-02-03
**Current Phase:** Phase Q - Queue Architecture Redesign (TOP PRIORITY)
---
@@ -11,14 +11,14 @@ Production-ready MQTT broker with focus on high performance, durability, and sca
### Current Status
- 🚨 **Phase Q: Queue Architecture Redesign** - TOP PRIORITY (Log-based model with work stealing)
- 🚨 **Phase Q: Queue Architecture Redesign** - IN PROGRESS (log-based queues in tree; retention/DLQ wiring and admin API pending)
- ⏸️ **Phase 0: Production Hardening** - PAUSED (Resume after Phase Q)
- ✅ **Phase 1: Performance Optimization** - COMPLETE (3.27x faster)
- **Phase 2: Queue Replication** - COMPLETE (~98%)
- Phase 2.1: Raft Infrastructure - COMPLETE
- Phase 2.2: Queue Integration - COMPLETE
- Phase 2.3: Testing & Optimization - COMPLETE (core tests done, benchmarks done)
- Phase 2.4: Retention Policies - COMPLETE (time/size retention + log compaction)
- **Phase 2: Queue Replication** - PARTIAL (Raft manager exists; append-only replication; consumer state not replicated)
- Phase 2.1: Raft Infrastructure - Implemented, not fully integrated
- Phase 2.2: Queue Integration - Append replication only
- Phase 2.3: Testing & Optimization - Not present in-tree
- Phase 2.4: Retention Policies - Not wired into runtime
- 📋 Phase 2.5: Observability & Migration - PLANNED (deferred)
- ⏳ **Phase 3: E2E Cluster Testing** - PLANNED (after Phase 2)
- 📋 **Phase 4: Custom Raft** - FUTURE (50M+ clients only)
@@ -35,32 +35,10 @@ Phase Q redesigns the queue system from delete-on-ack to a log-based model inspi
Phase 0 must be completed before any production deployment. It addresses critical security vulnerabilities and operational gaps identified during the code audit.
**Completed (2026-01-13):**
- ✅ Fixed Raft storage initialization (BadgerStableStore contract)
- ✅ Fixed partition port conflicts (per-partition bind addresses)
- ✅ Fixed message pool corruption (copy message for Raft FSM)
- ✅ Fixed MessageTTL=0 causing immediate expiration
-`TestReplication_BasicEnqueueDequeue` now passing
- ✅ Created comprehensive performance benchmarks (`replication_bench_test.go`)
-**Phase 2.3 Core Tests Complete:**
-`TestFailover_LeaderElection` - Leader failover <5s (2.4s achieved)
-`TestFailover_MessageDurability` - Messages survive leader failure
-`TestReplication_ISRTracking` - ISR tracking and quorum maintenance
-`TestFailover_FollowerCatchup` - Follower lag behavior (100 msg delta)
-**Phase 2.4 Retention Policies - COMPLETE:**
- ✅ RetentionPolicy schema added to QueueConfig
- ✅ RetentionManager core logic (400+ lines)
- ✅ Raft OpRetentionDelete operation
- ✅ MessageStore interface extended (5 new methods including ListAllMessages)
- ✅ Full retention implementations in memory and badger stores
- ✅ Manager integration (async size checks on enqueue)
- ✅ Partition worker integration (background time-based cleanup)
- ✅ Leader-only execution for replicated queues
-**Retention unit tests** (`retention_test.go`, `retention_manager_test.go`)
-**Retention integration tests** (`retention_integration_test.go`)
-**Log compaction implementation** (Kafka-style, keeps latest per key)
-**Compaction unit tests** (4 tests: basic, lag, no-key, not-configured)
-**Compaction integration tests** (2 tests: replication, leader-only)
**Status Note (2026-02-03):**
Queue replication and retention are partial in-tree (append-only Raft, committed-offset
truncation only). The detailed completion checklist previously listed here was
removed to avoid stale status. See `docs/queue.md` for current implementation notes.
**Completed (2026-01-16):**
-**Rate Limiting - COMPLETE:**
@@ -94,10 +72,11 @@ Remaining production hardening tasks:
## Phase Q: Queue Architecture Redesign 🚨 TOP PRIORITY
**Status:** PLANNING
**Status:** IN PROGRESS
**Goal:** Redesign queue system to log-based model with cursors, PEL, and work stealing
This phase fundamentally changes how queues work, moving from a delete-on-ack model to an append-only log model inspired by Kafka and Redis Streams. This enables wildcard queue subscriptions and improves performance and reliability.
Core log-based queue behavior exists in-tree; remaining work includes retention wiring, DLQ routing, and queue admin APIs.
---
@@ -881,55 +860,38 @@ func (b *Broker) setupSignalHandler() {
---
## Phase 2: Queue Replication ✅ COMPLETE
## Phase 2: Queue Replication ⏳ PARTIAL
**Status:** Completed 2026-01-14 | **Progress:** 98% (Phase 2.5 deferred)
**Status:** In progress | **Scope in tree:** Append-only Raft replication for queue appends; consumer state not replicated; retention policies not wired.
### Summary
### Current Implementation Notes
Raft-based per-partition replication with automatic failover:
- Each queue partition = independent Raft group (leader + replicas)
- Configurable replication factor (default: 3), sync/async modes
- ISR (In-Sync Replicas) tracking, leader-only delivery
- Kafka-style retention policies (time, size, log compaction)
### Benchmark Results
| Metric | Result | Target |
|--------|--------|--------|
| Sync mode throughput | >5K enqueues/sec | ✅ Met |
| Async mode throughput | >50K enqueues/sec | ✅ Met |
| Leader failover | 2.4s | ✅ <5s |
| P99 latency (sync) | <50ms | ✅ Met |
| P99 latency (async) | <10ms | ✅ Met |
- Single Raft group shared by all queues (`queue/raft/manager.go`)
- Append operations go through Raft only on the leader; non-leader appends are local
- Ack/Nack/Reject, cursor/PEL updates, and retention truncation are not replicated
- Benchmarks and failover tests for the Raft layer are not present in-tree
### Configuration Example
```yaml
queue:
replication:
cluster:
enabled: true
node_id: "node1"
raft:
enabled: true
replication_factor: 3
mode: sync # "sync" or "async"
sync_mode: true
min_in_sync_replicas: 2
retention:
time: 168h # 7 days
bytes: 10737418240 # 10GB max
compaction_enabled: false
compaction_key: "entity_id"
ack_timeout: 5s
bind_addr: "127.0.0.1:7100"
data_dir: "/tmp/fluxmq/raft"
peers:
node1: "127.0.0.1:7100"
node2: "127.0.0.1:7200"
node3: "127.0.0.1:7300"
```
### Sub-Phase Summary
| Phase | Status | Key Deliverable |
|-------|--------|-----------------|
| 2.1: Raft Infrastructure | ✅ | Core Raft + FSM + BadgerDB storage |
| 2.2: Queue Integration | ✅ | Replicated enqueue, leader-only delivery |
| 2.3: Testing | ✅ | Failover tests, benchmarks |
| 2.4: Retention | ✅ | Time/size retention, log compaction |
| 2.5: Observability | 📋 | Deferred (metrics, migration tooling) |
📖 **Details:** [Scaling & Performance Guide](scaling.md#queue-replication-benchmarks)
📖 **Details:** [Scaling & Performance Guide](scaling.md#queue-replication)
---
@@ -940,8 +902,8 @@ queue:
**Status:** Planned to start after Phase 2 completion
**Prerequisites:**
- Phase 2.3 queue replication tests complete
- Phase 2.4 observability metrics in place
- Queue Raft benchmarks and failover tests in tree
- Queue observability metrics in place
### Scope
@@ -1218,56 +1180,13 @@ Use **hashicorp/raft** library + **BadgerDB** storage:
## Overall Progress Summary
| Phase | Duration | Completion | Status |
|-------|----------|------------|--------|
| **Phase Q: Queue Architecture Redesign** | 6-10 weeks | 0% | 🚨 **TOP PRIORITY** |
| └─ Q.1: Storage Layer Changes | 1-2 weeks | 0% | 📋 Planned |
| └─ Q.2: Consumer Group Redesign | 1-2 weeks | 0% | 📋 Planned |
| └─ Q.3: Work Stealing | 1 week | 0% | 📋 Planned |
| └─ Q.4: Wildcard Subscriptions | 1 week | 0% | 📋 Planned |
| └─ Q.5: Delivery Integration | 1-2 weeks | 0% | 📋 Planned |
| └─ Q.6: Migration & Testing | 1-2 weeks | 0% | 📋 Planned |
| **Phase 0: Production Hardening** | 3-4 weeks | 50% | ⏸️ PAUSED |
| └─ 0.1: Critical Security Fixes | 1 week | 67% | ⏸️ Paused (2/3 complete) |
| └─ 0.2: Rate Limiting | 1 week | 100% | ✅ Complete |
| └─ 0.3: Observability Completion | 3-5 days | 0% | 📋 Planned (P2) |
| └─ 0.4: Protocol Compliance | 3-5 days | 100% | ✅ Complete |
| └─ 0.5: Management Dashboard | 2-3 weeks | 0% | 📋 Planned (P3) |
| └─ 0.6: Operational Readiness | 1 week | 0% | 📋 Planned (P3) |
| **Phase 1: Performance Optimization** | 2 weeks | 100% | ✅ Complete |
| **Phase 2: Queue Replication** | 6 weeks | 98% | ✅ **COMPLETE** |
| └─ 2.1: Raft Infrastructure | 1 week | 100% | ✅ Complete |
| └─ 2.2: Queue Integration | 1 week | 100% | ✅ Complete |
| └─ 2.3: Testing & Optimization | 1 week | 100% | ✅ Complete |
| └─ 2.4: Retention Policies | 2 weeks | 100% | ✅ **COMPLETE** |
| └─ 2.5: Observability & Migration | 2-3 weeks | 0% | 📋 Planned (deferred) |
| **Phase 3: E2E Cluster Testing** | 2-3 weeks | 0% | ⏳ Planned |
| **Phase 4: Custom Raft** | 20 weeks | N/A | 📋 Future (50M+ only) |
Status snapshot (2026-02-03):
- Phase Q: Log-based queues are in tree (consumer groups, PEL, work stealing, wildcard patterns). Remaining: retention wiring, DLQ routing, queue admin API, Raft state replication.
- Phase 2: Raft manager exists; append-only replication on leader; consumer state not replicated; tests/benchmarks not in tree.
- Phase 0.2 Rate limiting and CoAP DTLS/mDTLS are implemented (see sections above).
**Current Sprint:** Phase Q - Queue Architecture Redesign (TOP PRIORITY)
**Key Metrics:**
- ✅ 2,800+ lines of queue replication code complete
- ✅ 17+ unit tests passing (retention + compaction)
- ✅ 4/4 core failover tests passing (leader election, durability, ISR, catch-up)
- ✅ Performance benchmarks complete (sync/async modes, latency, concurrency)
-**Phase 0.2 Rate Limiting - COMPLETE:**
- ✅ Per-IP connection rate limiting (TCP, WebSocket)
- ✅ Per-client message/subscription rate limiting
- ✅ 12 unit tests (`ratelimit/ratelimit_test.go`)
-**CoAP with DTLS/mDTLS - COMPLETE:**
- ✅ Full UDP CoAP implementation (go-coap v3)
- ✅ DTLS with mutual TLS support (pion/dtls v3)
- ✅ 5 unit tests (`server/coap/server_test.go`)
-**Phase 2.4 Retention Policies - COMPLETE:**
- ✅ Retention infrastructure complete (schema, manager, Raft ops)
- ✅ Full storage implementations (memory & badger stores)
- ✅ Manager & partition worker integration complete
- ✅ Leader-only execution & async size checks
- ✅ Retention unit & integration tests (11 tests)
- ✅ Log compaction (Kafka-style, keeps latest per key)
- ✅ Compaction unit & integration tests (6 tests)
---
## Contributing
+32 -71
View File
@@ -227,11 +227,11 @@ The zero-copy optimization using reference-counted buffers provides significant
| **Throughput** | 4.6K msg/s | 14.9K msg/s | **3.24x increase** |
| **GC CPU Time** | 75% | ~40% | **35% reduction** |
📁 **Test file:** [broker/message_bench_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/broker/message_bench_test.go)
📁 **Test file:** [mqtt/broker/message_bench_test.go](mqtt/broker/message_bench_test.go)
```bash
# Run this benchmark
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 -benchmem -benchtime=10s ./broker
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 -benchmem -benchtime=10s ./mqtt/broker
```
### Zero-Copy vs Legacy Performance
@@ -245,13 +245,13 @@ go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 -benchmem -bench
**Key insight:** Larger messages see dramatically better improvements.
📁 **Test file:** [broker/message_bench_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/broker/message_bench_test.go)
📁 **Test file:** [mqtt/broker/message_bench_test.go](mqtt/broker/message_bench_test.go)
```bash
# Run zero-copy comparison
make bench-zerocopy
# Or directly:
go test -bench=BenchmarkMessageCopy -benchmem ./broker
go test -bench=BenchmarkMessageCopy -benchmem ./mqtt/broker
```
### Single Subscriber Performance
@@ -266,7 +266,7 @@ go test -bench=BenchmarkMessageCopy -benchmem ./broker
**Key insight:** Memory usage is constant (~600 bytes) regardless of message size.
```bash
go test -bench=BenchmarkMessagePublish_SingleSubscriber -benchmem ./broker
go test -bench=BenchmarkMessagePublish_SingleSubscriber -benchmem ./mqtt/broker
```
### Multiple Subscribers Scalability
@@ -284,7 +284,7 @@ go test -bench=BenchmarkMessagePublish_SingleSubscriber -benchmem ./broker
- Zero-copy prevents O(N*MessageSize) memory usage
```bash
go test -bench=BenchmarkMessagePublish_MultipleSubscribers -benchmem ./broker
go test -bench=BenchmarkMessagePublish_MultipleSubscribers -benchmem ./mqtt/broker
```
### Fan-Out Pattern (1:N)
@@ -299,7 +299,7 @@ Publishing 256-byte messages (typical sensor data) to many subscribers:
| 1,000 | 253,930 | 3,940 | 424,370 |
```bash
go test -bench=BenchmarkMessagePublish_FanOut -benchmem ./broker
go test -bench=BenchmarkMessagePublish_FanOut -benchmem ./mqtt/broker
```
### Buffer Pool Performance
@@ -317,10 +317,10 @@ go test -bench=BenchmarkMessagePublish_FanOut -benchmem ./broker
- Pool provides consistent performance regardless of buffer size
- Zero allocations when pool is warm
📁 **Test file:** [mqtt/refbuffer_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/mqtt/refbuffer_test.go)
📁 **Test file:** [mqtt/refbuffer_test.go](mqtt/refbuffer_test.go)
```bash
go test -bench=BenchmarkBufferPool -benchmem ./core
go test -bench=BenchmarkRefCountedBuffer -benchmem ./mqtt
```
### Router Performance
@@ -332,7 +332,7 @@ go test -bench=BenchmarkBufferPool -benchmem ./core
| Unsubscribe | 21.7 μs/op | 109 B/op | 4 |
| Wildcard matching | 212 ns/op | 199 B/op | 6 |
📁 **Test file:** [broker/router/router_bench_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/broker/router/router_bench_test.go)
📁 **Test file:** [broker/router/router_bench_test.go](broker/router/router_bench_test.go)
```bash
go test -bench=. -benchmem ./broker/router
@@ -348,78 +348,39 @@ go test -bench=. -benchmem ./broker/router
| **Memory Pressure** | 64KB-1MB messages to 20 subs | <100MB total |
| **Extreme Fan-Out** | 1K msgs to 5K subs | 5M deliveries, <100 B/delivery |
📁 **Test file:** [broker/message_stress_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/broker/message_stress_test.go)
📁 **Test file:** [mqtt/broker/message_stress_test.go](mqtt/broker/message_stress_test.go)
```bash
# Run all stress tests
make stress
# Run specific stress test
go test -v -run=TestStress_BufferPoolExhaustion ./broker
go test -v -run=TestStress_HighThroughput ./broker
go test -v -run=TestStress_ConcurrentPublishers ./broker
go test -v -run=TestStress_MemoryPressure ./broker
go test -v -run=TestStress_ExtremeFanOut ./broker
go test -v -run=TestStress_BufferPoolExhaustion ./mqtt/broker
go test -v -run=TestStress_HighThroughput ./mqtt/broker
go test -v -run=TestStress_ConcurrentPublishers ./mqtt/broker
go test -v -run=TestStress_MemoryPressure ./mqtt/broker
go test -v -run=TestStress_FanOutExtreme ./mqtt/broker
# With race detector
go test -race -run=TestStress_ConcurrentPublishers ./broker
go test -race -run=TestStress_ConcurrentPublishers ./mqtt/broker
```
### Queue Performance
| Operation | Time | Memory | Allocs |
|-----------------------|------------|----------|--------|
| Enqueue (1 partition) | 1,459 ns/op | 776 B/op | 5 |
| Enqueue (10 partitions)| 1,519 ns/op | 795 B/op | 5 |
| Dequeue (1 consumer) | 337 μs/op | 5,645 B/op | 9 |
📁 **Test files:**
- [queue/enqueue_bench_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/queue/enqueue_bench_test.go)
- [queue/dequeue_bench_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/queue/dequeue_bench_test.go)
Queue-specific benchmarks are not yet included in the repository.
When benchmarks are added under `queue/` or `logstorage/`, this section should
be updated to reference them.
```bash
go test -bench=. -benchmem ./queue
go test ./queue ./logstorage
```
### Queue Replication Benchmarks
### Queue Replication
**Raft-based per-partition replication** with automatic failover (3 replicas per partition).
| Metric | Sync Mode | Async Mode | Notes |
|--------|-----------|------------|-------|
| **Throughput** | >5K enqueue/sec | >50K enqueue/sec | Per partition |
| **P99 Latency** | <50ms | <10ms | Write acknowledgment |
| **Leader Failover** | 2.4s | 2.4s | Target: <5s |
| **ISR Convergence** | <1s | <1s | After leader change |
**Failover Test Results:**
| Test | Result | Notes |
|------|--------|-------|
| `TestFailover_LeaderElection` | ✅ 2.4s | New leader elected, clients reconnect |
| `TestFailover_MessageDurability` | ✅ | Zero message loss after leader failure |
| `TestReplication_ISRTracking` | ✅ | Quorum maintained during failures |
| `TestFailover_FollowerCatchup` | ✅ | 100 msg delta caught up correctly |
**Retention Performance:**
| Operation | Time | Impact |
|-----------|------|--------|
| Size check (per enqueue) | <2ms | Async, no blocking |
| Time-based cleanup | <100ms | Per partition, 5min interval |
| Log compaction | <500ms | Per partition, 10min interval |
📁 **Test files:**
- [queue/replication_bench_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/queue/replication_bench_test.go) - Performance benchmarks
- [queue/failover_test.go](file:///home/dusan/go/src/github.com/absmach/fluxmq/queue/failover_test.go) - Failover tests
```bash
# Run replication benchmarks
go test -bench=BenchmarkReplication -benchmem ./queue
# Run failover tests
go test -v -run=TestFailover ./queue
```
Raft replication is implemented for **queue appends only** and is configurable,
but consumer state (cursor/PEL) and ack/nack/reject paths are not replicated yet.
Non-leader nodes currently append locally when Raft is enabled. Benchmarks and
failover tests for the queue Raft layer are not currently present in-tree.
---
@@ -571,22 +532,22 @@ make stress
```bash
# All benchmarks
go test -bench=. -benchmem ./broker
go test -bench=. -benchmem ./mqtt/broker
# Specific benchmark
go test -bench=BenchmarkMessagePublish_FanOut -benchmem ./broker
go test -bench=BenchmarkMessagePublish_FanOut -benchmem ./mqtt/broker
# With specific subscriber count
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 -benchtime=10s ./broker
go test -bench=BenchmarkMessagePublish_MultipleSubscribers/1000 -benchtime=10s ./mqtt/broker
# Stress tests
go test -v -run=TestStress -timeout=30m ./broker
go test -v -run=TestStress -timeout=30m ./mqtt/broker
# Specific stress test
go test -v -run=TestStress_BufferPoolExhaustion ./broker
go test -v -run=TestStress_BufferPoolExhaustion ./mqtt/broker
# With race detector
go test -race -run=TestStress_ConcurrentPublishers ./broker
go test -race -run=TestStress_ConcurrentPublishers ./mqtt/broker
```
### Interpreting Results
+6 -3
View File
@@ -99,6 +99,9 @@ The MQTT broker provides a comprehensive webhook system for asynchronous event n
| `authz.publish_denied` | Publish authorization denied | clientID, topic, reason |
| `authz.subscribe_denied` | Subscribe authorization denied | clientID, topicFilter, reason |
Note: Message payloads are not currently included in webhook events. The
`payload` field is omitted regardless of `include_payload`.
**Event Envelope (Common Wrapper):**
```json
{
@@ -137,7 +140,7 @@ webhook:
queue_size: 10000
drop_policy: "oldest" # or "newest"
workers: 5
include_payload: false # Exclude message payloads by default
include_payload: false # Accepted by config, payload inclusion not yet wired
shutdown_timeout: 30s
```
@@ -386,7 +389,7 @@ webhook:
queue_size: 10000
drop_policy: "oldest"
workers: 5
include_payload: false
include_payload: false # Accepted by config, payload inclusion not yet wired
shutdown_timeout: 30s
defaults:
@@ -505,7 +508,7 @@ Empty `topic_filters` array = all topics.
| `queue_size` | int | 10000 | Max events in memory queue |
| `drop_policy` | string | "oldest" | "oldest" or "newest" when queue is full |
| `workers` | int | 5 | Number of concurrent worker goroutines |
| `include_payload` | bool | false | Include message payloads in events (base64 encoded) |
| `include_payload` | bool | false | Accepted by config, payload inclusion not yet wired |
| `shutdown_timeout` | duration | 30s | Graceful shutdown timeout |
### Default Settings
+3 -10
View File
@@ -22,7 +22,7 @@ echo ""
# Run key benchmarks with shorter time
echo -e "${YELLOW}1. Message Distribution (1000 subscribers)${NC}"
go test -bench="BenchmarkMessagePublish_MultipleSubscribers/1000" \
-benchtime=3s -benchmem ./broker 2>/dev/null | grep -E "Benchmark|ns/op"
-benchtime=3s -benchmem ./mqtt/broker 2>/dev/null | grep -E "Benchmark|ns/op"
echo ""
echo -e "${YELLOW}2. Router Performance${NC}"
@@ -30,16 +30,9 @@ go test -bench="BenchmarkRouter" \
-benchtime=3s -benchmem ./broker/router 2>/dev/null | grep -E "Benchmark|ns/op"
echo ""
echo -e "${YELLOW}3. Queue Operations${NC}"
go test -bench="BenchmarkEnqueue" \
-benchtime=3s -benchmem ./queue 2>/dev/null | grep -E "Benchmark|ns/op"
go test -bench="BenchmarkDequeue" \
-benchtime=3s -benchmem ./queue 2>/dev/null | grep -E "Benchmark|ns/op"
echo ""
echo -e "${YELLOW}4. Zero-Copy vs Legacy${NC}"
echo -e "${YELLOW}3. Zero-Copy vs Legacy${NC}"
go test -bench="BenchmarkMessageCopy" \
-benchtime=3s -benchmem ./broker 2>/dev/null | grep -E "Benchmark|ns/op"
-benchtime=3s -benchmem ./mqtt/broker 2>/dev/null | grep -E "Benchmark|ns/op"
echo ""
echo -e "${GREEN}✓ Quick benchmark complete${NC}"
+4 -10
View File
@@ -96,13 +96,9 @@ run_package_benchmarks "./mqtt/packets/v5" "MQTT v5 Packet Encoding/Decoding"
run_package_benchmarks "./broker/router" "Topic Router Performance"
# 3. Message distribution benchmarks
run_package_benchmarks "./broker" "Message Distribution & Broker Core"
run_package_benchmarks "./mqtt/broker" "Message Distribution & Broker Core"
# 4. Queue benchmarks
echo -e "${BLUE}Running Queue Benchmarks${NC}"
run_package_benchmarks "./queue" "Queue Operations"
# 5. Storage benchmarks (if they exist)
# 4. Storage benchmarks (if they exist)
if [ -d "$PROJECT_ROOT/storage/badger" ]; then
run_package_benchmarks "./storage/badger" "BadgerDB Storage Backend"
fi
@@ -118,11 +114,9 @@ echo -e "Running Profiled Benchmarks"
echo -e "======================================${NC}"
echo ""
run_with_profiling "./broker" "BenchmarkMessagePublish_MultipleSubscribers/1000" "message_fanout_1000"
run_with_profiling "./broker" "BenchmarkMessagePublish_FanOut" "message_fanout"
run_with_profiling "./mqtt/broker" "BenchmarkMessagePublish_MultipleSubscribers/1000" "message_fanout_1000"
run_with_profiling "./mqtt/broker" "BenchmarkMessagePublish_FanOut" "message_fanout"
run_with_profiling "./broker/router" "BenchmarkRouter" "router"
run_with_profiling "./queue" "BenchmarkEnqueue" "queue_enqueue"
run_with_profiling "./queue" "BenchmarkDequeue" "queue_dequeue"
# Generate summary
echo ""