sync docs content

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2026-02-12 00:02:09 +05:30
parent 2aac2fa9d4
commit 8f8c52978e
49 changed files with 3484 additions and 545 deletions
@@ -0,0 +1,171 @@
---
title: Clustering
description: Embedded etcd metadata, gRPC routing, session takeover, and cluster message flow
---
# Clustering
**Last Updated:** 2026-02-07
FluxMQ clustering is designed around one idea: keep **coordination data** consistent across nodes, and keep **payload data** fast and local wherever possible.
This page explains how etcd, the inter-node transport, and the protocol brokers work together to route publishes, take over sessions, and (optionally) replicate durable queue logs.
## Two Planes: Metadata vs Data
In clustered mode there are two planes:
- **Metadata plane (etcd)**: “who owns what”, “who is subscribed”, “who is consuming”.
- **Data plane (gRPC transport)**: the actual routed messages and takeover payloads.
<Mermaid
chart="
flowchart TB
subgraph Metadata Plane
Own[Session ownership keys]
Subs[Subscription registry]
QCons[Queue consumer registry]
RIdx[Retained/will metadata]
end
subgraph Data Plane
RP[RoutePublish]
RQ[RouteQueueMessage]
TK[Takeover state transfer]
HF[Hybrid retained/will fetch]
end
Own --> TK
Subs --> RP
QCons --> RQ
RIdx --> HF"
/>
The system stays understandable if you keep this split in mind: etcd does not stream user traffic; it tells nodes where to send it.
## etcd Keyspace: What Lives Where
etcd is the **source of truth** for cluster metadata. FluxMQ maintains local in-memory caches (subscription and session-owner caches) to reduce etcd round trips; caches are kept up-to-date via etcd watches.
Key prefixes:
| Prefix | Meaning |
| --- | --- |
| `/mqtt/sessions/<client>/owner` | Session ownership (written with a lease so it expires on node death) |
| `/mqtt/subscriptions/<client>/<filter>` | Subscription registry for cross-node pub/sub routing |
| `/mqtt/queue-consumers/<queue>/<group>/<consumer>` | Queue consumer registry for cross-node queue delivery |
| `/mqtt/retained-data/*` and `/mqtt/retained-index/*` | Hybrid retained store (small payloads replicated; large payloads indexed) |
| `/mqtt/will-data/*` and `/mqtt/will-index/*` | Hybrid will store (same strategy as retained) |
| `/mqtt/leader` | Cluster leader election (used for coordination and visibility) |
## Session Ownership: Why It Exists
MQTT sessions are stateful (inflight QoS 1/2, offline queue, subscriptions, will). In a cluster, you need a single node to be “the owner” at any time so publishes, acks, and retained/will management dont split-brain.
Ownership is stored in etcd and written with a **lease**:
- If a node crashes, its lease expires and ownership keys disappear automatically.
- Nodes cache ownership locally for fast routing, but can fall back to etcd when needed.
## Session Takeover (MQTT): End-to-End Flow
Takeover happens when a client reconnects to a node that is not the current owner.
The goal: move session state from the old owner to the new owner, and guarantee that only one node continues the session.
<Mermaid
chart="
sequenceDiagram
participant C as Client
participant B as Node B (new)
participant E as etcd
participant A as Node A (old)
C->>B: CONNECT (client_id=X)
B->>E: Get /mqtt/sessions/X/owner
E-->>B: owner = A
B->>A: TakeoverRequest(X, from=A, to=B)
A->>A: Capture state (subs, inflight, offline, will)
A->>A: Close session locally
A-->>B: SessionState payload
B->>E: Put /mqtt/sessions/X/owner = B (lease)
B->>B: Restore state, resume session
B-->>C: CONNACK"
/>
Important details:
- The takeover request uses the gRPC transport, not etcd.
- Ownership is updated **after** the state transfer completes (so the new owner can safely overwrite).
- The old owner closes the session as part of preparing the state.
## Pub/Sub Routing Across Nodes
For “normal” pub/sub topics, the originating node does two things:
1. Deliver locally to matching subscriptions.
2. Forward to remote nodes that own sessions with matching subscriptions.
The routing decision is based on the subscription registry and the session-owner map.
Remote `RoutePublish` deliveries are dispatched to the local protocol broker by client ID namespace:
- MQTT clients: no protocol prefix
- AMQP 1.0 clients: `amqp:`
- AMQP 0.9.1 clients: `amqp091-`
<Mermaid
chart="
flowchart LR
P[Publish arrives on Node A] --> L[Local match + deliver]
P --> M[Match subscriptions local cache + etcd]
M --> O[Resolve owners cache + etcd]
O --> S[Send RoutePublish RPCs]
S --> B[Node B delivers locally]
S --> C[Node C delivers locally]"
/>
## Hybrid Retained and Will Storage
Retained messages and wills need to be available cluster-wide, but replicating large payloads through etcd is expensive.
FluxMQ uses a hybrid strategy (threshold configurable):
- **Small payloads**: store metadata + payload in etcd (replicated to all nodes).
- **Large payloads**: store payload in the owners local store; store metadata in etcd; fetch payload on-demand via gRPC.
<Mermaid
chart="
flowchart TB
Set[Set retained/will] --> Size{Payload < threshold?}
Size -->|yes| EtcdFull[Put data entry in etcd]
Size -->|no| Local[Write payload to local store]
Local --> EtcdMeta[Put metadata entry in etcd]
Get[Get retained/will] --> Cache[Check local store]
Cache -->|hit| Done[Return]
Cache -->|miss| Meta[Read metadata from cache]
Meta -->|replicated| EtcdFetch[Use replicated data]
Meta -->|remote large| RpcFetch[Fetch from owner via gRPC]"
/>
## Durable Queues Across Nodes
Queue consumers are registered cluster-wide so a node receiving a queue publish can find where consumption is happening.
Two distribution styles exist (configured via `cluster.raft.distribution_mode`):
- `forward`: the node that appends to the queue also delivers (or routes) messages to remote consumers.
- `replicate`: the queue log is replicated (Raft), so each node with consumers can deliver from its local log.
<Mermaid
chart="
flowchart LR
QP[Queue publish] --> A[Append locally or forward to leader]
A --> Mode{distribution_mode}
Mode -->|forward| R[RouteQueueMessage to remote nodes]
Mode -->|replicate| Raft[Replicate log to peers]
Raft --> LocalDeliver[Each node delivers locally]"
/>
## Configuration Entry Points
- Cluster basics: [Cluster configuration](/docs/configuration/clustering)
- Replication and tuning: [Configuration reference](/docs/reference/configuration-reference)
@@ -0,0 +1,10 @@
{
"title": "Architecture",
"pages": [
"overview",
"routing",
"webhooks",
"storage",
"clustering"
]
}
@@ -1,15 +1,15 @@
---
title: FluxMQ Architecture
description: Comprehensive system design overview covering layered architecture, protocol adapters, domain logic, and multi-protocol support
title: Overview
description: High-level system design overview covering core components and how they fit together
---
# FluxMQ Architecture
# Architecture Overview
**Last Updated:** 2026-02-05
**Last Updated:** 2026-02-10
## Overview
FluxMQ is a multi-protocol message broker built around a shared queue manager. MQTT transports (TCP, WebSocket, HTTP bridge, CoAP) share one MQTT broker instance, while AMQP 1.0 and AMQP 0.9.1 use dedicated brokers. Durable queues are protocol-agnostic and provide cross-protocol routing and fan-out.
FluxMQ is a multi-protocol message broker built around a shared queue manager. MQTT transports (TCP, WebSocket, HTTP bridge, CoAP) share one MQTT broker instance, while AMQP 1.0 and AMQP 0.9.1 use dedicated brokers. Queues are protocol-agnostic — all protocols route through the same queue manager, and delivery semantics depend on the [queue type](/docs/concepts/queues) (ephemeral, durable, or stream), not the protocol.
## High-Level View
@@ -36,7 +36,9 @@ FluxMQ is a multi-protocol message broker built around a shared queue manager. M
┌────────────────┐
│ Queue Manager │
│ (durable logs)
│ (ephemeral,
│ durable, │
│ stream) │
└──────┬─────────┘
┌────────────────┐
@@ -63,9 +65,10 @@ FluxMQ is a multi-protocol message broker built around a shared queue manager. M
- `server/amqp`, `server/amqp1` for AMQP listeners
- **Queue Manager**: `queue/` and `logstorage/`
- Append-only logs with consumer groups
- Queue and stream modes
- Ack/Nack/Reject support and retention policies
- Three queue types: ephemeral (in-memory, best-effort), durable (persistent work queue with PEL), stream (append-only log with cursor-based consumption)
- Shared routing layer — topic bindings, fan-out, consumer filters
- Delivery semantics depend on queue type, not protocol
- See [Queue Types](/docs/concepts/queues) for the full model
- **Storage**: `storage/` (BadgerDB and memory backends)
- Sessions, subscriptions, retained messages, offline queues
@@ -83,10 +86,25 @@ FluxMQ is a multi-protocol message broker built around a shared queue manager. M
- **Queue API (Connect/gRPC)**: `server/api/`, `server/queue/`
- Programmatic queue operations over HTTP/2 (h2c or TLS)
## Storage Overview
FluxMQ uses three storage layers, each optimized for a different job:
1. **Broker state storage** (`storage/`): Sessions, subscriptions, retained messages, wills, and offline queues. Backed by BadgerDB or in-memory for single-node mode.
2. **Queue log storage** (`logstorage/`): Append-only durable logs, consumer group state, and PEL tracking for queues.
3. **Cluster metadata** (embedded etcd): Session ownership, subscriptions, queue consumer registry, and hybrid retained/will metadata.
If you are debugging data persistence, start here:
1. `storage/` for MQTT session and retained/will state.
2. `logstorage/` for queue durability and retention behavior.
3. `cluster/etcd.go` for cross-node metadata and routing.
## Related Docs
- `docs/broker.md`
- `docs/queue.md`
- `docs/configuration.md`
- `docs/clustering.md`
- `docs/webhooks.md`
- [Routing internals](/docs/architecture/routing)
- [Webhooks](/docs/architecture/webhooks)
- [Storage internals](/docs/architecture/storage)
- [Clustering internals](/docs/architecture/clustering)
- [Durable queues](/docs/messaging/durable-queues)
- [Configuration reference](/docs/reference/configuration-reference)
@@ -0,0 +1,268 @@
---
title: Routing
description: Broker routing internals, session lifecycle, and protocol-specific state machines
---
# Routing Internals
**Last Updated:** 2026-02-07
FluxMQ ships three protocol brokers that share the same durable queue manager:
- **MQTT Broker**: `mqtt/broker/`
- **AMQP 1.0 Broker**: `amqp1/broker/`
- **AMQP 0.9.1 Broker**: `amqp/broker/`
Each broker has its own connection model and state machine, but they all converge on the same internal concepts:
- A **topic router** for pub/sub fan-out.
- A **queue manager** for durable queues and consumer groups.
- A **cluster router** for cross-node routing (when clustering is enabled).
This page focuses on the logic behind those routing decisions, and where special topic namespaces fit in.
## Topic Namespaces (Special Topics)
FluxMQ uses MQTT-style topics across protocols. Some topic namespaces have special meaning:
| Namespace | What It Means | Notes |
| --- | --- | --- |
| `$SYS/...` | Broker stats and health telemetry | Published periodically as retained messages. |
| `$queue/<name>/...` | Durable queue traffic | Routed to the queue manager (not normal pub/sub). |
| `$queue/.../$ack|$nack|$reject` | Queue acknowledgments | Consumers publish these to drive Ack/Nack/Reject. |
| `$share/<group>/<filter>` | Shared subscriptions (MQTT) | Load-balances pub/sub delivery inside a share group. |
| `$dlq/...` | Dead-letter namespace | A handler exists, but automatic DLQ routing is not fully wired yet. |
## Routing Decision (High Level)
At publish time, brokers classify traffic first, then take the appropriate path:
<Mermaid
chart="
flowchart TD
In[Incoming publish] --> Kind{Topic namespace?}
Kind -->|$queue/.../$ack| Ack[Queue Ack/Nack/Reject]
Kind -->|$queue/...| Q[Durable queue publish]
Kind -->|other| PS[Normal pub/sub publish]
Ack --> QM[Queue manager updates group state]
Q --> QM2[Queue manager appends to log]
PS --> Ret{Retain flag?}
Ret -->|yes| RStore[Retained store]
Ret -->|no| Route[Match subscriptions + deliver]
RStore --> Route
Route --> Cluster[Forward to remote nodes if clustered]"
/>
## MQTT Broker
**Responsibilities**
- Manage MQTT sessions (clean start, expiry, inflight tracking, offline queue)
- Route messages to local subscribers via the topic router
- Persist retained messages and wills
- Enforce QoS rules and MaxQoS downgrade
- Integrate with the queue manager for `$queue/` topics
- Integrate with clustering for cross-node routing and session takeover
- Emit webhook events (optional)
### MQTT Pub/Sub Routing
For “normal” topics (everything except `$queue/...`), MQTT routing is classic broker behavior:
- Subscribe registers a filter in the topic router (plus optional shared-subscription grouping).
- Publish matches filters and delivers to local sessions.
- In clustered mode, the broker forwards publishes to nodes that own matching sessions.
### Durable Queue Routing (`$queue/`)
Queue topics are not handled by the pub/sub router.
Instead, `$queue/<name>/...` is treated as a durable enqueue into the queue manager:
- The queue name is the first path segment after `$queue/`.
- The remainder of the topic is treated as the queue “routing key” and can be used for pattern filtering inside a queue.
- The queue manager stores a durable log entry and then delivers it to consumers in a consumer group.
### Queue Acknowledgments (`$ack/$nack/$reject`)
Queue deliveries include metadata that makes acknowledgments protocol-agnostic:
- `message-id` (formatted like `<queue>:<offset>`)
- `group-id` (the consumer group handling the delivery)
- `queue` (the queue name)
- `offset` (the sequence number)
To acknowledge a delivery over MQTT v5, publish to the delivery topic plus one of:
- `/$ack` to confirm processing
- `/$nack` to request redelivery
- `/$reject` to drop (DLQ handling is not fully wired yet)
The broker reads `message-id` and `group-id` from MQTT v5 user properties and applies the operation to the queue manager.
**Session Lifecycle (High Level)**
- CONNECT arrives over a transport (TCP or WebSocket)
- Broker creates or resumes the session based on `clean_start` and expiry
- Session state is restored from storage if needed
- In clustered mode, session ownership is acquired and takeover is handled
- On disconnect, offline queue is persisted (if session not expired)
### MQTT Session FSM
This is the actual session state model used by `mqtt/session/session.go`. The broker goes from `New` directly to `Connected` once a connection is attached. `StateConnecting` exists but is not currently set by the broker.
<Mermaid
chart="
stateDiagram-v2
[*] --> New
New --> Connected: CONNECT accepted
Connected --> Disconnecting: DISCONNECT or keepalive timeout
Disconnecting --> Disconnected: socket closed
Disconnected --> Connected: reconnect (resume or takeover)"
/>
**Message Routing (MQTT)**
Topic matching is trie-based. Retained messages are stored and replayed on subscribe. Shared subscriptions (`$share/...`) are grouped and then routed through the same matcher, but delivery within a share group is load-balanced.
## AMQP 1.0 Broker
**Responsibilities**
- Handle AMQP 1.0 protocol handshake (protocol header, optional SASL, OPEN)
- Manage sessions, links, and flow control windows
- Route non-queue links through the shared router
- Map queue links to the queue manager (capability-based or `$queue/` prefix)
### AMQP 1.0 Connection FSM (Broker Perspective)
<Mermaid
chart="
stateDiagram-v2
[*] --> ProtoHeader
ProtoHeader --> SASL: SASL header received
SASL --> ProtoHeader: SASL OK, read AMQP header
ProtoHeader --> Open: AMQP header exchange
Open --> Active: OPEN / OPEN-OK
Active --> Closing: CLOSE
Closing --> [*]"
/>
### AMQP 1.0 Link FSM (Simplified)
Links are the unit of subscription/publish. The broker treats a link as a queue link if the peer advertises the `queue` capability or if the address starts with `$queue/`.
<Mermaid
chart="
stateDiagram-v2
[*] --> Attached: ATTACH
Attached --> Active: FLOW (credit granted)
Active --> Active: TRANSFER / DISPOSITION
Active --> Detached: DETACH
Detached --> [*]"
/>
## AMQP 0.9.1 Broker
**Responsibilities**
- Handle AMQP 0.9.1 connection handshake and channel lifecycle
- Accumulate publish content frames (method -> header -> body)
- Route non-queue publishes through the shared router
- Map queue publishes and consumes to the queue manager
- Deliver cluster-routed messages to local AMQP 0.9.1 clients
### AMQP 0.9.1 Connection FSM (Handshake)
<Mermaid
chart="
stateDiagram-v2
[*] --> ProtocolHeader
ProtocolHeader --> Start: Connection.Start
Start --> Tune: Connection.StartOk
Tune --> Open: Connection.TuneOk
Open --> Active: Connection.Open / OpenOk
Active --> Closing: Connection.Close
Closing --> [*]"
/>
### AMQP 0.9.1 Publish Content FSM (Channel-Level)
The broker accumulates content frames in `amqp/broker/channel.go`. Any out-of-order or inconsistent frames trigger a channel close.
<Mermaid
chart="
stateDiagram-v2
[*] --> Idle
Idle --> Method: Basic.Publish
Method --> Header: ContentHeader
Header --> Body: ContentBody (one or many)
Body --> Idle: bodySize reached
Method --> Idle: header.BodySize == 0"
/>
**Queue Mapping (AMQP 0.9.1)**
AMQP 0.9.1 frames are assembled into a single publish, then routed as either:
- Pub/sub delivery (normal AMQP routing model inside FluxMQs broker adapter).
- Durable queue traffic (when the address/routing key is treated as queue-capable).
## Walkthroughs
**MQTT publish path**
<Mermaid
chart="
flowchart TD
Pub[MQTT PUBLISH] --> QueueCheck{Topic starts with $queue/ ?}
QueueCheck -->|yes| QueueMgr[Queue manager publish]
QueueCheck -->|no| Retained{Retain flag set?}
Retained -->|yes| RetainStore[Update retained store]
Retained -->|no| Route[Route to local subscribers]
RetainStore --> Route
Route --> Cluster[RoutePublish to remote nodes]"
/>
1. Broker receives PUBLISH and checks `$queue/` topics and queue acks.
2. Queue topics are forwarded to the queue manager (ack topics are handled separately).
3. Non-queue topics update retained state (if retain flag set).
4. Local router matches subscribers and delivers.
5. Cluster router forwards to remote nodes with matching subscriptions.
**AMQP 1.0 receive path (queue vs pub/sub)**
1. Client attaches a link. If it advertises the `queue` capability or uses `$queue/`, it is treated as a queue link.
2. Incoming transfers on queue links are converted into queue publishes.
3. Non-queue links are routed through the AMQP router and optionally forwarded via cluster.
**AMQP 0.9.1 publish path**
1. `Basic.Publish` sets the pending method.
2. A content header and body frames are assembled into a single message.
3. Default exchange + `$queue/` routing key goes to the queue manager.
4. Otherwise the message is routed to local AMQP subscribers.
5. In clustered mode, remote `RoutePublish` deliveries can target local AMQP 0.9.1 clients and are delivered through the same channel consumer path.
## Where To Look Next
- [Durable queues](/docs/messaging/durable-queues)
- [Storage internals](/docs/architecture/storage)
- [Clustering internals](/docs/architecture/clustering)
## Optional Subsystems
- **Auth/Authz**: pluggable interfaces in `broker/auth.go`
- **Rate limiting**: per-IP and per-client limits in `ratelimit/`
- **Webhooks**: event delivery via `broker/webhook/`
- **OTel metrics/tracing**: optional, configured via `server` settings
## Configuration Pointers
- `broker.*` for broker limits (max message size, max QoS, retry policy)
- `session.*` for session storage and offline queue limits
- `ratelimit.*` for rate limiting
- `webhook.*` for webhook delivery
See [Configuration reference](/docs/reference/configuration-reference) for full details.
@@ -0,0 +1,159 @@
---
title: Storage
description: Queue log storage internals, on-disk layout, indexes, and retention mechanics
---
# Storage Internals
**Last Updated:** 2026-02-05
FluxMQ durable queues are backed by a file-based append-only log (AOL). Each queue has a **single log** (no partitions). This doc explains where the files live, how they are laid out, and how retention truncates data.
## Where It Lives
By default, the queue log storage is created under:
```
<storage.badger_dir>/queue
```
This is wired in `cmd/main.go` when the queue manager is constructed.
## Storage Roles (How It Fits In)
FluxMQ separates responsibilities across storage layers:
1. `storage/` holds broker state (sessions, subscriptions, retained messages, wills).
2. `logstorage/` holds queue logs and consumer group state.
3. Cluster metadata (etcd) tracks ownership and routing, not payloads.
If you are only dealing with durable queues, `logstorage/` is the primary place to look.
## On-Disk Layout (Directory Tree)
```
<base>/queue/
config/
queues.json
groups/
<queue-name>/
<group-id>.json
queues/
<queue-name>/
segments/
00000000000000000000.seg
00000000000000000000.idx
00000000000000000000.tix
consumers/
<group-id>/
state.json
pel_0.json
pel_1.json
ops.log
```
<Mermaid
chart="
flowchart TB
Queue[Queue Log] --> Segments[segments/]
Segments --> Seg[**base**.seg]
Seg --> Idx[**base**.idx]
Seg --> Tix[**base**.tix]
Queue --> Consumers[consumers/_group_/]
Consumers --> State[state.json]
Consumers --> PEL[pel_**n**.json]
Consumers --> Ops[ops.log]
Meta[config/queues.json] --> Queue
GroupMeta[groups/**queue**/**group**.json] --> Queue"
/>
## Segment Files (`.seg`)
Segment files are named by **base offset** with a 20-digit, zero-padded filename:
```
00000000000000000000.seg
```
Each segment contains a sequence of **batches**. A batch header is 44 bytes:
| Field | Size | Notes |
| --- | --- | --- |
| Magic | 4 | `FLUX` (`0x464C5558`) |
| CRC | 4 | CRC over bytes after the CRC field |
| BaseOffset | 8 | First offset in batch |
| BatchLen | 4 | Length of encoded records |
| Count | 2 | Record count |
| Flags | 2 | Compression, keys, headers, timestamps |
| Compression | 1 | none, s2, zstd |
| Version | 1 | Batch format version |
| Reserved | 2 | Reserved |
| BaseTimestamp | 8 | Unix millis |
| MaxTimestamp | 8 | Unix millis |
Records are encoded as varints:
- offset delta (from batch base)
- timestamp delta (from base timestamp)
- key (length + bytes or `-1`)
- value (length + bytes or `-1`)
- headers (count + key/value pairs)
If compression is enabled and shrinks the payload, the records section is compressed (S2 by default).
## Offset Index (`.idx`)
Each segment has a sparse offset index:
- File name: `00000000000000000000.idx`
- Header (32 bytes): magic, version, base offset, entry count, index interval
- Entries (8 bytes each): relative offset (uint32) + file position (uint32)
The index is sparse and only adds entries every N bytes written (default 4KB).
## Time Index (`.tix`)
Each segment also has a time index:
- File name: `00000000000000000000.tix`
- Header (36 bytes): magic, version, base offset, entry count, min/max timestamps
- Entries (12 bytes each): timestamp (unix ms) + relative offset
Entries are only added when timestamps advance by at least 1 second. This keeps the time index small while still making time-based lookups efficient.
## Consumer State Files
For each consumer group:
- `state.json` stores ack floor, cursor, and delivery counters.
- `pel_<n>.json` stores pending entries (sharded for parallel redelivery).
- `ops.log` is an append-only operation log used for crash recovery and fast updates.
Compaction rewrites snapshots and truncates `ops.log` after a configurable number of operations.
## Queue and Group Metadata
- `config/queues.json` stores all queue configs in one versioned file.
- `groups/<queue>/<group>.json` stores the API-level group state (cursor, consumers, PEL map). The log storage adapter syncs cursor/PEL from the low-level consumer state when you read this file through the APIs.
## Retention and Truncation
Retention is enforced by the queue manager in a background loop:
1. Compute the minimum **committed** offset across queue-mode consumer groups.
2. Compute the oldest offset allowed by retention limits (time, size, message count).
3. Truncate the log to the **lowest safe offset**.
Truncation is **segment-granular**. Entire segments whose `NextOffset <= minOffset` are deleted. Partial segments are not split.
## Walkthrough: Append and Retention
1. Producer publishes to a queue topic.
2. Queue manager calls `Append` on the log store.
3. A batch is encoded and written to the active `.seg` file.
4. Sparse `.idx` and `.tix` entries are updated.
5. A retention loop computes a safe offset and deletes old segments.
## Recovery Tools
`logstorage/recovery.go` can validate or recover segments by scanning batch headers, verifying CRCs, and rebuilding `.idx`/`.tix` files. If corruption is detected at the tail, recovery truncates to the last valid batch.
@@ -0,0 +1,119 @@
---
title: Webhook System
description: Comprehensive webhook system for asynchronous event notifications with circuit breaker, retry logic, and flexible filtering
---
# Webhook System
**Last Updated:** 2026-02-05
FluxMQ can emit broker events to external HTTP endpoints using an asynchronous webhook notifier.
## Overview
- Asynchronous event queue with worker pool
- Retry with exponential backoff
- Circuit breaker per endpoint
- Filtering by event type and topic pattern
- HTTP sender only (gRPC sender not implemented)
## Architecture
```
Broker Events
Webhook Notifier (queue)
│ drop_policy: oldest/newest
Worker Pool
│ retry + circuit breaker
HTTP Sender
External Endpoints
```
## Event Types
Events are defined in `broker/events/events.go`.
- `client.connected`: `client_id`, `protocol`, `clean_start`, `keep_alive`, `remote_addr`
- `client.disconnected`: `client_id`, `reason`, `remote_addr`
- `client.session_takeover`: `client_id`, `from_node`, `to_node`
- `message.published`: `client_id`, `topic`, `qos`, `retained`, `payload_size`, `payload`
- `message.delivered`: `client_id`, `topic`, `qos`, `payload_size`
- `message.retained`: `topic`, `payload_size`, `cleared`
- `subscription.created`: `client_id`, `topic_filter`, `qos`, `subscription_id`
- `subscription.removed`: `client_id`, `topic_filter`
- `auth.success`: `client_id`, `remote_addr`
- `auth.failure`: `client_id`, `reason`, `remote_addr`
- `authz.publish_denied`: `client_id`, `topic`, `reason`
- `authz.subscribe_denied`: `client_id`, `topic_filter`, `reason`
### Payload Notes
The `message.published` payload field is defined in the event schema (base64-encoded when populated), but payload inclusion is not currently wired in the broker. `webhook.include_payload` is accepted in config, yet payloads are sent as empty strings at the moment.
## Event Envelope
```json
{
"event_type": "message.published",
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"timestamp": "2026-02-05T12:00:00Z",
"broker_id": "broker-1",
"data": {
"client_id": "publisher-1",
"topic": "sensors/temperature",
"qos": 1,
"retained": false,
"payload_size": 256
}
}
```
## Filtering
Each endpoint can filter by:
- `events`: list of event types
- `topic_filters`: MQTT-style patterns (supports `+` and `#`)
## Retry and Circuit Breaker
- Retries use exponential backoff (`initial_interval * multiplier^attempt`), capped by `max_interval`
- Circuit breaker is per endpoint and trips after `failure_threshold` consecutive failures
## Configuration
```yaml
webhook:
enabled: true
queue_size: 10000
drop_policy: "oldest" # oldest or newest
workers: 5
include_payload: false
shutdown_timeout: "30s"
defaults:
timeout: "5s"
retry:
max_attempts: 3
initial_interval: "1s"
max_interval: "30s"
multiplier: 2.0
circuit_breaker:
failure_threshold: 5
reset_timeout: "60s"
endpoints:
- name: "analytics"
type: "http"
url: "https://example.com/webhook"
events: ["message.published"]
topic_filters: ["sensors/#"]
headers:
Authorization: "Bearer token"
```
-65
View File
@@ -1,65 +0,0 @@
---
title: Broker & Message Routing
description: Internal MQTT broker architecture, session management, message routing mechanisms, topic matching, QoS handling, and cluster integration
---
# MQTT Broker Internals
**Last Updated:** 2026-02-05
This document describes the MQTT broker implementation and how it integrates with queues and clustering. The broker code lives in `mqtt/broker/` and uses shared components in `broker/` (router, webhook events, interfaces).
## Responsibilities
- Manage MQTT sessions (clean start, expiry, inflight tracking, offline queue)
- Route messages to local subscribers via the topic router
- Persist retained messages and wills
- Enforce QoS rules and MaxQoS downgrade
- Integrate with the queue manager for `$queue/` topics
- Integrate with clustering for cross-node routing and session takeover
- Emit webhook events (optional)
## Session Lifecycle (High Level)
- CONNECT arrives over a transport (TCP or WebSocket)
- Broker creates or resumes session based on `clean_start` and expiry
- Session state is restored from storage if needed
- In clustered mode, session ownership is acquired and takeover is handled
- On disconnect, offline queue is persisted (if session not expired)
## Message Routing
- Topic matching uses a trie-based router in `broker/router/`
- Shared subscriptions (MQTT 5.0) are handled by the shared subscription manager
- Retained messages are stored in the retained store and delivered on subscribe
- Queue topics (`$queue/...`) are routed to the queue manager
- Queue acks (`$queue/.../$ack|$nack|$reject`) are handled separately and do not enter normal pub/sub routing
## QoS Handling
- QoS 0, 1, and 2 are supported
- Inflight tracking is persisted for QoS 1/2 sessions
- MaxQoS is enforced by downgrading requested QoS when above server limits
## Cluster Integration
- Session ownership is coordinated via the cluster layer
- Publishes are routed to remote nodes with matching subscriptions
- Retained and will messages are backed by cluster-aware stores
- Session takeover is supported when a client reconnects to another node
## Optional Subsystems
- **Auth/Authz**: pluggable interfaces in `broker/auth.go`
- **Rate limiting**: per-IP and per-client limits in `ratelimit/`
- **Webhooks**: event delivery via `broker/webhook/`
- **OTel metrics/tracing**: optional, configured via `server` settings
## Configuration Pointers
- `broker.*` for broker limits (max message size, max QoS, retry policy)
- `session.*` for session storage and offline queue limits
- `ratelimit.*` for rate limiting
- `webhook.*` for webhook delivery
See `docs/configuration.md` for full details.
@@ -1,9 +1,9 @@
---
title: Client Library
description: Pure Go client libraries for MQTT 3.1.1/5.0 and AMQP 0.9.1 with durable queue support, auto-reconnect, and comprehensive features
title: Go Client
description: Pure Go client library for MQTT 3.1.1/5.0 and AMQP 0.9.1 with durable queue support
---
# Client Library
# Go Client
Pure Go client libraries for MQTT 3.1.1/5.0 and AMQP 0.9.1 with durable queue support.
Note: FluxMQ also exposes AMQP 1.0 server support, but this repository does not
@@ -47,7 +47,10 @@ func main() {
log.Printf("Received: %s -> %s", topic, string(payload))
})
c := client.New(opts)
c, err := client.New(opts)
if err != nil {
log.Fatal(err)
}
// Connect
if err := c.Connect(); err != nil {
@@ -230,15 +233,26 @@ The client supports durable queues with consumer groups and message acknowledgme
Reject/DLQ wiring in the broker is pending.
MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
**When to use queues instead of regular pub/sub:**
- You need at-least-once processing with explicit acknowledgments
- Multiple consumers should share the workload (consumer groups)
- Failed messages need retry logic or dead-letter handling
Use queues when you want “workers processing a durable backlog” rather than “everyone who is subscribed gets a copy”.
**Key concepts:**
- **Queue**: Persistent message buffer with ordered delivery per queue (single log)
- **Consumer Group**: Multiple consumers share messages from the same queue
- **Acknowledgment**: Confirm success (Ack), request redelivery (Nack), or reject permanently (Reject)
On the wire, the Go client maps queue operations to MQTT v5 conventions:
- Publishing to queue `orders` becomes a publish to topic `$queue/orders`.
- Subscribing to a queue sets the consumer group with the MQTT v5 `consumer-group` user property.
- Acknowledgments publish to `$queue/<queue>/$ack|$nack|$reject` and attach `message-id` and `group-id` as MQTT v5 user properties.
<Mermaid
chart="
sequenceDiagram
participant App as Your app
participant Cli as Go client
participant Br as FluxMQ
App->>Cli: PublishToQueue(orders, payload)
Cli->>Br: PUBLISH $queue/orders
Br-->>Cli: Deliver message-id + group-id
App->>Cli: msg.Ack()
Cli->>Br: PUBLISH $queue/orders/$ack (user props)"
/>
### Publishing to Queues
@@ -308,6 +322,7 @@ c.UnsubscribeFromQueue("orders")
package main
import (
"fmt"
"log"
"github.com/absmach/fluxmq/client"
)
@@ -318,7 +333,10 @@ func main() {
SetClientID("order-processor").
SetProtocolVersion(5)
c := client.New(opts)
c, err := client.New(opts)
if err != nil {
log.Fatal(err)
}
if err := c.Connect(); err != nil {
log.Fatal(err)
}
@@ -343,7 +361,10 @@ func main() {
// Publish some orders
for i := 0; i < 10; i++ {
c.PublishToQueue("orders", []byte(`{"id": "`+string(rune(i))+`"}`))
order := fmt.Sprintf(`{"id":"%d"}`, i)
if err := c.PublishToQueue("orders", []byte(order)); err != nil {
log.Printf("publish failed: %v", err)
}
}
select {} // Keep running
@@ -0,0 +1,41 @@
---
title: HTTP
description: Publish messages over the HTTP bridge
---
# HTTP
**Last Updated:** 2026-02-05
FluxMQ includes an HTTP publish bridge.
## Enable
Set `server.http.plain.addr` (or TLS/MTLS) in your config.
## Publish
`POST /publish` with JSON body:
```json
{"topic":"sensors/temp","payload":"MjIuNQ==","qos":1,"retain":false}
```
`payload` is base64-encoded in JSON.
Example:
```bash
curl -sS -X POST http://localhost:8080/publish \
-H 'Content-Type: application/json' \
-d '{"topic":"sensors/temp","payload":"MjIuNQ==","qos":1,"retain":false}'
```
## Health
`GET /health` returns a simple status payload.
## Learn More
- [Publishing messages](/docs/messaging/publishing-messages)
- [Server configuration](/docs/configuration/server)
@@ -0,0 +1,9 @@
{
"title": "Clients",
"pages": [
"mqtt",
"http",
"go-client",
"websocket"
]
}
+111
View File
@@ -0,0 +1,111 @@
---
title: MQTT
description: Connect using MQTT 3.1.1 or 5.0 over TCP
---
# MQTT
**Last Updated:** 2026-02-06
FluxMQ supports MQTT 3.1.1 and MQTT 5.0 over TCP.
## Quick Start
```bash
mosquitto_sub -h localhost -p 1883 -t "sensors/#" -v
```
In another terminal:
```bash
mosquitto_pub -h localhost -p 1883 -t "sensors/temp" -m "22.5" -q 1
```
## MQTT v5 Consumer Groups (Queue Topics)
Use MQTT v5 `SUBSCRIBE` user property `consumer-group` for queue consumers:
```bash
mosquitto_sub -V mqttv5 -h localhost -p 1883 -u usr -P pwd \
-i worker-1 -q 1 -t '$queue/orders/#' -v \
-D subscribe user-property consumer-group workers
```
Start another worker in the same group:
```bash
mosquitto_sub -V mqttv5 -h localhost -p 1883 -u usr -P pwd \
-i worker-2 -q 1 -t '$queue/orders/#' -v \
-D subscribe user-property consumer-group workers
```
Publish messages to the queue:
```bash
mosquitto_pub -V mqttv5 -h localhost -p 1883 -u usr -P pwd \
-t '$queue/orders' -m '{"id":"order-1"}' -q 1
```
## Queue ACK/NACK/REJECT with `mosquitto_pub`
`mosquitto_sub` does not automatically send queue acknowledgments.
To ack (or nack/reject), publish to queue ack topics with MQTT v5 user properties:
```bash
# Ack
mosquitto_pub -V mqttv5 -h localhost -p 1883 -u usr -P pwd \
-t '$queue/orders/$ack' -m '' \
-D publish user-property message-id 'orders:42' \
-D publish user-property group-id 'workers'
# Nack (retry)
mosquitto_pub -V mqttv5 -h localhost -p 1883 -u usr -P pwd \
-t '$queue/orders/$nack' -m '' \
-D publish user-property message-id 'orders:42' \
-D publish user-property group-id 'workers'
# Reject (no retry)
mosquitto_pub -V mqttv5 -h localhost -p 1883 -u usr -P pwd \
-t '$queue/orders/$reject' -m '' \
-D publish user-property message-id 'orders:42' \
-D publish user-property group-id 'workers' \
-D publish user-property reason 'invalid payload'
```
`message-id` format is typically `<queue>:<offset>` (for example `orders:42`).
## Special Topics
| Topic | Purpose |
|-------|---------|
| `$queue/<queue>` | Publish to a durable queue root |
| `$queue/<queue>/<routing-key>` | Publish or consume queue messages with routing keys |
| `$queue/<queue>/#` | Subscribe to all messages in a queue |
| `$queue/<queue>/$ack` | Acknowledge a queue message |
| `$queue/<queue>/$nack` | Negative-acknowledge (retry) |
| `$queue/<queue>/$reject` | Reject (no retry) |
| `$share/<group>/<filter>` | MQTT shared subscription filter |
## Notes
- Use single quotes around `$queue/...` topics in shell commands to avoid `$` expansion.
- Queue consumer groups require MQTT v5 (`-V mqttv5`) if you want explicit `consumer-group` assignment.
- Reusing the same consumer group with different queue filters creates distinct internal groups per filter (`group@pattern` in logs).
- TLS and mTLS listeners are configured via `server.tcp.tls` and `server.tcp.mtls`.
- Shared subscriptions are supported.
## CLI Tip
To inspect SUBSCRIBE/PUBLISH packets and reason codes during troubleshooting, add `-d`:
```bash
mosquitto_sub -d -V mqttv5 -h localhost -p 1883 -t '$queue/orders/#' \
-D subscribe user-property consumer-group workers
```
## Learn More
- [Server configuration](/docs/configuration/server)
- [Publishing messages](/docs/messaging/publishing-messages)
- [Consuming messages](/docs/messaging/consuming-messages)
- [Consumer groups](/docs/messaging/consumer-groups)
@@ -0,0 +1,26 @@
---
title: WebSocket
description: Connect with MQTT over WebSocket
---
# WebSocket
**Last Updated:** 2026-02-05
FluxMQ supports MQTT over WebSocket. Configure the listener under `server.websocket.*`.
## Default Path
The default WebSocket path is `/mqtt`.
Example URL:
```
ws://localhost:8083/mqtt
```
Use any MQTT-over-WebSocket client library and point it to the WebSocket URL.
## Learn More
- [Server configuration](/docs/configuration/server)
-60
View File
@@ -1,60 +0,0 @@
---
title: Clustering
description: Distributed broker clustering with embedded etcd, gRPC transport, session takeover, and high availability architecture
---
# Clustering
**Last Updated:** 2026-02-05
FluxMQ supports optional clustering for high availability and cross-node routing. Clustering is embedded and uses etcd for metadata coordination plus gRPC for inter-broker transport.
## What Clustering Provides
- **Session ownership** across nodes
- **Subscription routing** for cross-node publishes
- **Queue consumer registry** for cross-node queue delivery
- **Retained/will storage** with a hybrid metadata + payload strategy
- **Session takeover** when a client reconnects to a different node
## Core Components
- **Embedded etcd**: stores session ownership, subscriptions, queue consumers, retained metadata
- **Transport (gRPC)**: routes publishes, queue deliveries, retained/will fetches
- **Optional Raft for queues**: configurable replication for queue appends
## Configuration (Minimal)
```yaml
cluster:
enabled: true
node_id: "broker-1"
etcd:
data_dir: "/tmp/fluxmq/etcd"
bind_addr: "0.0.0.0:2380"
client_addr: "0.0.0.0:2379"
initial_cluster: "broker-1=http://0.0.0.0:2380"
bootstrap: true
transport:
bind_addr: "0.0.0.0:7948"
peers: {}
```
For full cluster options (TLS, Raft, hybrid retained settings), see `docs/configuration.md`.
## Message Routing (High Level)
- On publish, the broker routes to local subscribers and calls the cluster router to forward to remote nodes with matching subscriptions.
- Retained and will messages are stored in a cluster-aware store with metadata in etcd and payloads stored locally for larger messages.
- When a client reconnects to a different node, the new node requests session takeover from the previous owner.
## Queue Delivery Across Nodes
Queue consumers are registered in the cluster. When a queue publish occurs, the queue manager determines which nodes host matching consumers and forwards delivery to those nodes.
## Notes
- Clustering is optional; single-node mode uses in-memory or BadgerDB storage.
- Queue replication is optional and controlled via `cluster.raft`.
+58
View File
@@ -0,0 +1,58 @@
---
title: Comparison
description: Comprehensive comparison of FluxMQ against industry solutions including EMQX, HiveMQ, Artemis, Mosquitto, NATS, RabbitMQ, and Kafka
---
# Comparison Guide
**Last Updated:** 2026-02-05
This document provides a basic, evergreen comparison guide without hard claims about thirdparty products. Vendor feature sets change frequently; verify details with official docs before making a decision.
## FluxMQ Snapshot (from current codebase)
- **Protocols**: MQTT 3.1.1/5.0 (TCP, WebSocket), HTTP publish bridge, CoAP publish bridge, AMQP 1.0 and AMQP 0.9.1
- **Queues**: Durable queues, consumer groups, ack/nack/reject; stream queues supported; retention policies (time/size/message count)
- **DLQ**: Handler exists, automatic DLQ routing not wired yet
- **Clustering**: Embedded etcd coordination, gRPC routing, session takeover, hybrid retained storage, optional Raft for queue appends
- **Storage**: BadgerDB or inmemory; pluggable interfaces
- **Security**: TLS/mTLS, WebSocket origin validation, rate limiting
- **Observability**: OpenTelemetry metrics/tracing (OTLP); no native Prometheus endpoint
- **Clients**: Go MQTT client, Go AMQP 0.9.1 client (AMQP 1.0 client not provided)
## How to Compare Systems (by category)
### MQTTNative Brokers
Best fit when:
- MQTT 5.0 features and device connectivity are the primary focus
- Edge/IoT workloads need lightweight clients and topicbased routing
### QueueCentric Brokers
Best fit when:
- Workqueue semantics and explicit acknowledgments are primary
- Finegrained retry/DLQ handling is critical
### LogCentric Brokers
Best fit when:
- Longterm event retention and replay are primary
- Stream processing and analytics require strong log semantics
### MultiProtocol Brokers
Best fit when:
- You need multiple client protocols in a single deployment
- Crossprotocol routing and shared durability are important
## Decision Checklist
- Protocols you must support (MQTT, AMQP, HTTP, CoAP)
- Delivery guarantees required (QoS 0/1/2, atleastonce, exactlyonce)
- Durability and retention needs (time/size retention, replay)
- Operational complexity you can tolerate (single binary vs external dependencies)
- Cluster behavior (session takeover, routing, replication strategy)
- Observability requirements (OTLP, Prometheus, tracing)
- Client library availability and ecosystem fit
## Notes
- FluxMQ is still evolving. For current status of features and limitations, see [Durable queues](/docs/messaging/durable-queues), [Configuration reference](/docs/reference/configuration-reference), and [Roadmap](/docs/roadmap).
- Benchmarking should be done on your target hardware using `benchmarks/`.
+3 -55
View File
@@ -1,58 +1,6 @@
---
title: Comparison
description: Comprehensive comparison of FluxMQ against industry solutions including EMQX, HiveMQ, Artemis, Mosquitto, NATS, RabbitMQ, and Kafka
title: Competition
description: This page has moved
---
# Comparison Guide
**Last Updated:** 2026-02-05
This document provides a basic, evergreen comparison guide without hard claims about thirdparty products. Vendor feature sets change frequently; verify details with official docs before making a decision.
## FluxMQ Snapshot (from current codebase)
- **Protocols**: MQTT 3.1.1/5.0 (TCP, WebSocket), HTTP publish bridge, CoAP publish bridge, AMQP 1.0 and AMQP 0.9.1
- **Queues**: Durable queues, consumer groups, ack/nack/reject; stream queues supported; retention policies (time/size/message count)
- **DLQ**: Handler exists, automatic DLQ routing not wired yet
- **Clustering**: Embedded etcd coordination, gRPC routing, session takeover, hybrid retained storage, optional Raft for queue appends
- **Storage**: BadgerDB or inmemory; pluggable interfaces
- **Security**: TLS/mTLS, WebSocket origin validation, rate limiting
- **Observability**: OpenTelemetry metrics/tracing (OTLP); no native Prometheus endpoint
- **Clients**: Go MQTT client, Go AMQP 0.9.1 client (AMQP 1.0 client not provided)
## How to Compare Systems (by category)
### MQTTNative Brokers
Best fit when:
- MQTT 5.0 features and device connectivity are the primary focus
- Edge/IoT workloads need lightweight clients and topicbased routing
### QueueCentric Brokers
Best fit when:
- Workqueue semantics and explicit acknowledgments are primary
- Finegrained retry/DLQ handling is critical
### LogCentric Brokers
Best fit when:
- Longterm event retention and replay are primary
- Stream processing and analytics require strong log semantics
### MultiProtocol Brokers
Best fit when:
- You need multiple client protocols in a single deployment
- Crossprotocol routing and shared durability are important
## Decision Checklist
- Protocols you must support (MQTT, AMQP, HTTP, CoAP)
- Delivery guarantees required (QoS 0/1/2, atleastonce, exactlyonce)
- Durability and retention needs (time/size retention, replay)
- Operational complexity you can tolerate (single binary vs external dependencies)
- Cluster behavior (session takeover, routing, replication strategy)
- Observability requirements (OTLP, Prometheus, tracing)
- Client library availability and ecosystem fit
## Notes
- FluxMQ is still evolving. For current status of features and limitations, see `docs/queue.md`, `docs/configuration.md`, and `docs/roadmap.md`.
- Benchmarking should be done on your target hardware using `benchmarks/`.
This page has moved to [Comparison](/docs/comparison).
@@ -0,0 +1,33 @@
---
title: Broker
description: What the broker does in FluxMQ and how protocol brokers fit together
---
# Broker
**Last Updated:** 2026-02-10
FluxMQ runs multiple protocol brokers that share the same queue manager:
- **MQTT broker** for MQTT 3.1.1/5.0 over TCP and WebSocket
- **AMQP 1.0 broker**
- **AMQP 0.9.1 broker**
Each broker owns its protocol state machine, but all queue-capable traffic flows into the shared queue manager. Protocol adapters translate protocol-specific concepts (AMQP exchanges/bindings, MQTT shared subscriptions, AMQP 1.0 link capabilities) into FluxMQ queue primitives without fabricating behavior that the underlying queue type doesn't support.
Delivery semantics depend on the [queue type](/docs/concepts/queues), not the protocol. An MQTT client and an AMQP 0.9.1 client consuming from the same durable queue get the same ack/nack/reject behavior. An AMQP 1.0 client consuming from a stream queue gets cursor-based semantics regardless of the protocol's native disposition model.
## What the Broker Handles
- Session lifecycle (connect, resume, takeover)
- Topic routing and shared subscriptions
- Retained messages and wills
- QoS enforcement and retry policies
- Queue integration for `$queue/` topics
- Protocol-to-queue translation (adapting protocol semantics to queue primitives)
## Learn More
- [Queue Types](/docs/concepts/queues)
- [Architecture overview](/docs/architecture/overview)
- [Routing internals](/docs/architecture/routing)
@@ -0,0 +1,22 @@
---
title: Clustering
description: High availability and cross-node routing basics
---
# Clustering
**Last Updated:** 2026-02-05
Clustering enables high availability and cross-node routing. FluxMQ uses embedded etcd for metadata coordination and a gRPC transport for inter-node message delivery.
## What You Get
- Session ownership and takeover
- Cross-node subscription routing
- Queue consumer registry across nodes
- Optional Raft replication for queues
## Learn More
- [Running a cluster](/docs/deployment/running-cluster)
- [Clustering internals](/docs/architecture/clustering)
@@ -0,0 +1,84 @@
---
title: Consumer Groups
description: Load-balanced queue consumption and acknowledgment behavior
---
# Consumer Groups
**Last Updated:** 2026-02-10
Consumer groups are how FluxMQ turns a queue log into a scalable worker pool. They apply to both durable and stream queues (ephemeral queues do not use consumer groups — delivery is best-effort with no progress tracking).
At a glance:
- One queue can have many groups (each group has independent progress).
- One group can have many consumers (messages are load-balanced within the group).
- Groups track progress using offsets (cursor/committed) and, in classic mode, a pending list (PEL).
- The group **mode** (classic or stream) determines acknowledgment semantics — see below.
## Group Modes: Classic vs Stream
FluxMQ supports two consumer group modes:
- **Classic (work queue)**: deliveries are claimed and tracked in a PEL; acks control safe progress.
- **Stream**: consumers read sequentially from an append-only log and advance a cursor.
## Progress Tracking (Cursor, Committed, PEL)
Each group keeps two offsets:
- `cursor`: the next offset to deliver
- `committed`: the safe floor used for retention/truncation
Classic groups also keep a **PEL** (Pending Entry List): messages that were delivered but not yet acknowledged.
Intuition:
- Claiming work moves the cursor forward and adds entries to the PEL.
- Acknowledging work removes entries from the PEL.
- The committed offset advances when the earliest pending work is cleared.
## Group Names in Logs (`group@pattern`)
FluxMQ stores queue consumer groups internally as:
- `<consumer-group>@<subscription-pattern>` when a pattern exists
- `<consumer-group>` when no pattern exists
This lets one logical group keep separate progress for different filters on the same queue.
Examples:
- Subscribe to `$queue/demo-events/#` with `consumer-group=demo-workers` -> internal group `demo-workers@#`
- Subscribe to `$queue/demo-events/orders/+` with `consumer-group=demo-workers` -> internal group `demo-workers@orders/+`
- Subscribe to `$queue/demo-events` (no trailing filter) with `consumer-group=demo-workers` -> internal group `demo-workers`
So seeing `group=demo-events@#` in logs means:
- effective group id is `demo-events`
- subscription pattern is `#` for that queue
## Acknowledgment Semantics
Ack/nack/reject mean different things depending on the queue type. Do not assume they are interchangeable.
### Durable Queue (Classic Mode)
- **Ack**: removes the message from the PEL and advances the committed offset. The message becomes eligible for log truncation.
- **Nack**: resets the PEL entry so the message is redelivered (immediately stealable by other consumers).
- **Reject**: removes from PEL without redelivery. The message is discarded (future: routes to DLQ).
### Stream Queue (Stream Mode)
- **Ack**: advances the committed offset (checkpoint). The message remains in the log — retention is policy-based, not ack-based.
- **Nack**: no-op. Stream consumers replay by seeking the cursor to a previous offset, not by nacking individual messages.
- **Reject**: advances the cursor past the message to prevent an infinite redelivery loop. The message stays in the log.
### Ephemeral Queue
Ephemeral queues do not use consumer groups or acknowledgments. Delivery is best-effort — messages are pushed to connected consumers and not tracked.
## Learn More
- [Consumer groups](/docs/messaging/consumer-groups)
- [Durable queues](/docs/messaging/durable-queues)
@@ -0,0 +1,11 @@
{
"title": "Concepts",
"pages": [
"broker",
"topics",
"queues",
"consumer-groups",
"storage",
"clustering"
]
}
@@ -0,0 +1,102 @@
---
title: Queue Types
description: Three queue types with distinct semantics — ephemeral, durable, and stream
---
# Queue Types
**Last Updated:** 2026-02-10
FluxMQ supports three distinct queue types. They share the same routing layer and topic namespace (`$queue/`), but differ in persistence, delivery guarantees, and acknowledgment semantics.
**Do not try to unify them.** Each type exists because it solves a different problem. If a use case needs both task semantics and replay, use two queues — one durable, one stream.
## Overview
| | Ephemeral | Durable | Stream |
| --------------------------- | --------------------------- | --------------------------------- | ------------------------------ |
| **Storage** | In-memory only | Persistent log | Persistent append-only log |
| **Delivery model** | Best-effort push | Push with ack/nack/reject | Cursor-based pull |
| **Message lifecycle** | Fire-and-forget | Removed on ack | Immutable, retained by policy |
| **Ack means** | N/A | "Message processed, remove it" | "Checkpoint my read position" |
| **Nack means** | N/A | "Retry this message" | No-op (replay via cursor seek) |
| **Persistence** | None | Yes | Yes |
| **Replay** | No | No | Yes (seek to any offset) |
| **Retention** | None (lost on disconnect) | Truncate after ack | Time/size/count-based |
| **Optimized for** | Low latency, transient data | Task processing, failure handling | Event streams, replay, audit |
| **Message loss acceptable** | Yes | No | No |
## Ephemeral Queue
Ephemeral queues are auto-created when a publish targets a `$queue/` topic that doesn't match any configured queue. They are in-memory, best-effort, and cleaned up after the last consumer disconnects (plus a short grace period).
- No persistence — messages are lost on broker restart
- No replay — once delivered, messages are gone
- Useful as a development convenience or for transient data where loss is acceptable
- Production deployments should define queues explicitly in config
Ephemeral queues are a safety net, not a primary design target. They prevent "publish to nowhere" from silently dropping data, but they don't provide durability or delivery guarantees.
## Durable Queue
Durable queues are persistent work queues optimized for task processing:
- Messages are appended to a durable log and delivered to consumer groups
- Consumers **ack** to confirm processing — the message is then eligible for truncation
- Consumers **nack** to request redelivery (with backoff and retry limits)
- Consumers **reject** to discard a message (future: route to dead-letter queue)
- Unacknowledged messages are tracked in a **Pending Entry List (PEL)** with visibility timeouts
- **Work stealing** rebalances pending work from slow consumers to idle ones
- Retention is driven by acknowledgment state — once all groups have acked past an offset, the log can be truncated
Use durable queues when every message must be processed at least once, failures must be retried, and message loss is not acceptable.
See [Durable Queues](/docs/messaging/durable-queues) for configuration, addressing, and protocol-specific usage.
## Stream Queue
Stream queues are persistent append-only logs optimized for event streaming:
- Messages are **immutable** — they are never modified or deleted by consumers
- Consumers track progress with a **cursor** (read position in the log)
- **Ack** means "checkpoint" — advance the committed offset, not delete the message
- **Nack** is a no-op — replay is done by seeking the cursor, not by redelivering individual messages
- Multiple independent consumer groups can read the same stream at different positions
- Consumers can seek to any offset or timestamp to replay history
- Retention is policy-based (time, size, message count) — independent of consumer progress
Use stream queues for event sourcing, audit logs, change data capture, analytics pipelines, and any scenario where you need replay, backfill, or multiple independent readers over the same data.
See [Durable Queues — Stream](/docs/messaging/durable-queues#stream) for configuration, cursor positioning, and commit control.
## Shared Routing Layer
All three queue types use the same routing infrastructure:
- **Topic namespace**: `$queue/<name>/...` with MQTT wildcard matching (`+`, `#`)
- **Fan-out**: A single publish can match multiple queues based on their topic bindings
- **Consumer filters**: Subscribers can apply sub-patterns within a queue for fine-grained routing
- **Protocol-agnostic**: MQTT, AMQP 1.0, and AMQP 0.9.1 all route through the same queue manager
Routing determines **which queue** a message is stored in. Delivery semantics depend on **queue type**, not protocol. An MQTT client and an AMQP client consuming from the same durable queue get the same ack/nack/reject behavior.
See [Topics](/docs/concepts/topics) for topic structure and wildcard rules.
## Design Rules
These rules guide FluxMQ's queue design:
1. **Do not assume streams replace queues.** Streams and durable queues solve different problems. A task processor needs ack-based lifecycle control. An event consumer needs cursor-based replay. Using streams for task processing (or durable queues for event replay) will fight the semantics.
2. **Do not overload ack semantics across queue types.** "Ack" means "processed, remove it" in a durable queue and "checkpoint my position" in a stream. These are fundamentally different operations and the broker treats them accordingly.
3. **Prefer correctness and explicit trade-offs over abstraction purity.** Each queue type has its own code path for delivery, acknowledgment, and retention. This is intentional — collapsing them into a single abstraction would hide important behavioral differences.
4. **If a use case needs both task semantics and replay, use two queues.** Publish the same messages to a durable queue (for processing) and a stream queue (for audit/replay). The shared routing layer makes this a one-line config change via overlapping topic bindings.
## Learn More
- [Durable Queues](/docs/messaging/durable-queues) — configuration, addressing, acknowledgments, and protocol-specific usage
- [Consumer Groups](/docs/concepts/consumer-groups) — group modes, progress tracking, and ack semantics per queue type
- [Topics](/docs/concepts/topics) — topic structure, wildcards, and special namespaces
- [Storage](/docs/concepts/storage) — storage layers for broker state, queue logs, and cluster metadata
@@ -0,0 +1,19 @@
---
title: Storage
description: Storage layers for broker state, queue logs, and cluster metadata
---
# Storage
**Last Updated:** 2026-02-05
FluxMQ separates storage concerns into three layers:
1. **Broker state** (`storage/`): sessions, subscriptions, retained messages, wills, offline queues.
2. **Queue logs** (`logstorage/`): append-only logs for durable queues and consumer groups.
3. **Cluster metadata** (embedded etcd): ownership and routing metadata.
## Learn More
- [Storage internals](/docs/architecture/storage)
- [Clustering internals](/docs/architecture/clustering)
@@ -0,0 +1,54 @@
---
title: Topics
description: Topic names, filters, and wildcard matching rules
---
# Topics
**Last Updated:** 2026-02-05
FluxMQ uses MQTT-style topics across protocols. Topics are hierarchical strings separated by `/`.
## Topic Examples
- `sensors/temperature`
- `orders/created`
- `$queue/orders`
- `$SYS/broker/clients/connected`
- `$share/workers/sensors/#`
## Topic Filters (Subscriptions)
MQTT wildcard rules apply:
- `+` matches a single level (`sensors/+`)
- `#` matches multiple levels (`sensors/#`)
These same patterns are used by queue topic bindings as well.
## Special Namespaces
Some prefixes are reserved for broker features:
| Prefix | Purpose |
| --- | --- |
| `$SYS/...` | Broker stats topics (published periodically, retained). |
| `$queue/<name>/...` | Durable queue traffic (stored in queue logs, delivered via consumer groups). |
| `$share/<group>/<filter>` | Shared subscriptions (MQTT): load-balanced pub/sub delivery. |
## Queue Topics (`$queue/`)
Queue traffic uses the `$queue/` prefix. For example:
- `$queue/orders` (publish)
- `$queue/orders/#` (subscribe)
- `$queue/orders/$ack` (ack a delivery)
FluxMQ treats `$queue/<name>/...` as:
- `name`: the durable queue name
- everything after `name/`: the queue routing key (used for pattern matching inside the queue)
For MQTT, durable queue behavior is triggered by the `$queue/` prefix. A publish to a non-`$queue/` topic uses normal pub/sub routing.
Learn more in [Durable queues](/docs/messaging/durable-queues).
@@ -0,0 +1,101 @@
---
title: Clustering
description: Configure embedded etcd, transport, and optional Raft replication
---
# Clustering Configuration
**Last Updated:** 2026-02-05
Clustering has three building blocks:
- **Embedded etcd**: cluster metadata (ownership, subscriptions, consumer registry).
- **Inter-node transport (gRPC)**: routing publishes, queue deliveries, and session takeover.
- **Optional Raft**: replicates durable queue operations.
```yaml
cluster:
enabled: true
node_id: "broker-1"
etcd:
data_dir: "/tmp/fluxmq/etcd"
bind_addr: "0.0.0.0:2380"
client_addr: "0.0.0.0:2379"
initial_cluster: "broker-1=http://0.0.0.0:2380"
bootstrap: true
hybrid_retained_size_threshold: 1024
transport:
bind_addr: "0.0.0.0:7948"
peers: {}
raft:
enabled: false
write_policy: "forward"
distribution_mode: "forward"
```
## etcd (Metadata Plane)
`cluster.etcd` configures the embedded etcd member running inside each broker node.
`hybrid_retained_size_threshold` controls the retained/will hybrid strategy:
- Messages smaller than the threshold are replicated via etcd (metadata + payload).
- Larger messages store only metadata in etcd; the payload stays on the owner node and is fetched via transport when needed.
## Transport (Data Plane)
`cluster.transport` configures the gRPC transport used for:
- Pub/sub routing across nodes.
- Queue distribution (forwarding publishes and routing queue deliveries).
- Session takeover state transfer.
- Hybrid retained/will payload fetch (large payloads).
## Raft (Queue Replication)
Raft affects durable queues only. If `cluster.raft.enabled` is true, FluxMQ starts a single Raft group that replicates queue operations (appends and consumer-group state changes).
Two settings determine cluster queue behavior:
### `write_policy`
Controls what happens when a node that is *not* the Raft leader receives a queue publish:
- `forward`: forward the publish to the leader (recommended).
- `reject`: reject the publish on followers (clients must retry against the leader).
- `local`: append locally without redirect (no durability guarantees across nodes).
### `distribution_mode`
Controls how messages reach consumers across nodes:
- `forward`: append on one node, then route deliveries to remote consumers via transport.
- `replicate`: replicate the queue log via Raft so nodes with consumers can deliver from local storage.
<Mermaid
chart="
flowchart LR
Pub[Queue publish arrives] --> LeaderCheck{Raft enabled + leader?}
LeaderCheck -->|leader| Append[Append via Raft apply]
LeaderCheck -->|follower| Policy{write_policy}
Policy -->|forward| Fwd[Forward to leader]
Policy -->|reject| Err[Return error]
Policy -->|local| Local[Append locally]
Append --> Dist{distribution_mode}
Dist -->|forward| Route[Route to remote consumers]
Dist -->|replicate| Repl[Replicate log to peers]"
/>
Notes on current behavior:
- Raft membership is derived from the configured peer list (and the local node). `replication_factor` is accepted in config, but does not currently limit membership.
- `min_in_sync_replicas` is accepted in config, but Raft quorum rules still ultimately govern commit behavior.
## Learn More
- [Running a cluster](/docs/deployment/running-cluster)
- [Clustering internals](/docs/architecture/clustering)
- [Configuration reference](/docs/reference/configuration-reference)
@@ -0,0 +1,9 @@
{
"title": "Configuration",
"pages": [
"server",
"storage",
"clustering",
"security"
]
}
@@ -0,0 +1,57 @@
---
title: Security
description: TLS/mTLS listeners, inter-broker TLS, and rate limiting
---
# Security Configuration
**Last Updated:** 2026-02-05
## TLS and mTLS
Listeners share TLS fields across `tls` and `mtls` blocks.
```yaml
server:
tcp:
tls:
addr: ":8883"
cert_file: "/path/server.crt"
key_file: "/path/server.key"
mtls:
addr: ":8884"
cert_file: "/path/server.crt"
key_file: "/path/server.key"
ca_file: "/path/clients-ca.crt"
client_auth: "require"
```
## Inter-Broker TLS
```yaml
cluster:
transport:
tls_enabled: true
tls_cert_file: "/path/transport.crt"
tls_key_file: "/path/transport.key"
tls_ca_file: "/path/transport-ca.crt"
```
## Rate Limiting
```yaml
ratelimit:
enabled: true
connection:
enabled: true
rate: 50
burst: 200
message:
enabled: true
rate: 500
burst: 2000
```
## Learn More
- [Configuration reference](/docs/reference/configuration-reference)
@@ -0,0 +1,44 @@
---
title: Server
description: Configure listeners, WebSocket path, health checks, and OpenTelemetry
---
# Server Configuration
**Last Updated:** 2026-02-05
`server` controls network listeners and telemetry endpoints. Example:
```yaml
server:
tcp:
plain:
addr: ":1883"
websocket:
plain:
addr: ":8083"
path: "/mqtt"
http:
plain:
addr: ":8080"
amqp:
plain:
addr: ":5672"
amqp091:
plain:
addr: ":5682"
health_enabled: true
health_addr: ":8081"
metrics_enabled: false
metrics_addr: "localhost:4317"
otel_metrics_enabled: true
otel_traces_enabled: false
otel_trace_sample_rate: 0.1
```
## Learn More
- [Configuration reference](/docs/reference/configuration-reference)
@@ -0,0 +1,28 @@
---
title: Storage
description: Configure broker storage backend and BadgerDB settings
---
# Storage Configuration
**Last Updated:** 2026-02-05
Broker state (sessions, retained messages, offline queues) is stored in the backend defined by `storage`.
```yaml
storage:
type: "badger" # "badger" or "memory"
badger_dir: "/tmp/fluxmq/data"
sync_writes: false
```
Queue logs are stored under:
```
<storage.badger_dir>/queue
```
## Learn More
- [Storage internals](/docs/architecture/storage)
- [Configuration reference](/docs/reference/configuration-reference)
@@ -0,0 +1,7 @@
{
"title": "Deployment",
"pages": [
"running-cluster",
"running-in-production"
]
}
@@ -0,0 +1,38 @@
---
title: Running Cluster
description: Start a multi-node FluxMQ cluster with embedded etcd and transport peers
---
# Running Cluster
**Last Updated:** 2026-02-05
FluxMQ clustering uses embedded etcd for metadata and gRPC transport for routing. The repo includes working 3-node examples.
## Use the Example Configs
From the repo root:
```bash
./build/fluxmq --config examples/node1.yaml
./build/fluxmq --config examples/node2.yaml
./build/fluxmq --config examples/node3.yaml
```
## Key Cluster Settings
- `cluster.enabled`: turn clustering on
- `cluster.node_id`: unique node identifier
- `cluster.etcd.*`: embedded etcd configuration
- `cluster.transport.*`: gRPC transport for routing
- `cluster.raft.*`: optional queue replication
## Bootstrap Rules
- Set `cluster.etcd.bootstrap: true` only on the first node.
- Other nodes should set `bootstrap: false` and share the same `initial_cluster` map.
## Learn More
- [Clustering internals](/docs/architecture/clustering)
- [Configuration reference](/docs/reference/configuration-reference)
@@ -1,13 +1,13 @@
---
title: Scaling & Performance
description: Comprehensive scaling guide covering capacity analysis, performance optimizations, benchmarks, topic sharding, and architecture for 100M+ clients
title: Running in Production
description: Production checklist covering performance, scaling, observability, and operational safety
---
# Scaling & Performance
# Running in Production
**Last Updated:** 2026-02-05
Performance and scaling are workload-dependent. Use the benchmark suites in `benchmarks/` and validate on your target hardware and network before making production commitments.
Production readiness is workload-dependent. Use the benchmark suites in `benchmarks/` and validate on your target hardware and network before making production commitments.
## Benchmarking
@@ -35,4 +35,15 @@ Performance and scaling are workload-dependent. Use the benchmark suites in `ben
- Use multiple nodes to improve availability and distribute connections.
- Queue replication is optional and configured via `cluster.raft`.
For configuration details, see `docs/configuration.md`.
## Security Basics
- Prefer TLS/mTLS listeners in `server.*.tls` and `server.*.mtls`.
- If you enable inter-broker transport TLS, configure `cluster.transport.tls_*`.
- Use `ratelimit.*` to protect against connection and message floods.
## Observability
- Enable OpenTelemetry metrics via `server.metrics_enabled` and `server.metrics_addr`.
- Consider enabling traces only for debugging (`server.otel_traces_enabled`).
For configuration details, see [Configuration reference](/docs/reference/configuration-reference).
@@ -0,0 +1,39 @@
---
title: First Durable Queue
description: Send a message to a durable queue and consume it with a consumer group
---
# First Durable Queue
**Last Updated:** 2026-02-05
Durable queues use the `$queue/` topic prefix. The broker stores the message in the queue log and delivers it to consumers in a group.
## 1. Publish to a Queue
```bash
mosquitto_pub -p 1883 -t "$queue/orders" -m "order-1" -q 1
```
## 2. Consume with a Consumer Group
Choose a client that can set a consumer group:
- **MQTT v5**: set the `consumer-group` user property on SUBSCRIBE.
- **AMQP 1.0**: set `consumer-group` in attach properties.
- **AMQP 0.9.1**: set `x-consumer-group` on `basic.consume`.
## 3. Acknowledge
Acks are required for durable queues. For MQTT v5, publish to:
- `$queue/<queue>/$ack`
- `$queue/<queue>/$nack`
- `$queue/<queue>/$reject`
Include `message-id` and `group-id` as user properties.
## Next Steps
- [Durable queues](/docs/messaging/durable-queues)
- [Consumer groups](/docs/concepts/consumer-groups)
@@ -0,0 +1,29 @@
---
title: First Message
description: Publish and subscribe to your first MQTT topic
---
# First Message
**Last Updated:** 2026-02-05
This example uses `mosquitto_pub`/`mosquitto_sub` against the default MQTT TCP listener.
## 1. Subscribe
```bash
mosquitto_sub -p 1883 -t "test/#" -v
```
## 2. Publish
```bash
mosquitto_pub -p 1883 -t "test/hello" -m "Hello FluxMQ"
```
You should see the message appear in the subscriber terminal.
## Next Steps
- [Publishing messages](/docs/messaging/publishing-messages)
- [Consuming messages](/docs/messaging/consuming-messages)
@@ -0,0 +1,8 @@
{
"title": "Getting Started",
"pages": [
"quick-start-docker",
"first-message",
"first-durable-queue"
]
}
@@ -0,0 +1,44 @@
---
title: Quick Start (Docker)
description: Run FluxMQ with Docker Compose or docker run using the provided config
---
# Quick Start (Docker)
**Last Updated:** 2026-02-05
## Option 1: Docker Compose (Recommended)
Run from the repo root so paths resolve correctly:
```bash
docker compose -f docker/compose.yaml up -d
```
Use a custom config (example):
```bash
FLUXMQ_CONFIG=../examples/no-cluster.yaml \
docker compose -f docker/compose.yaml up -d
```
## Option 2: Docker Run
```bash
docker run --rm \
-p 1883:1883 \
-p 8083:8083 \
-p 8080:8080 \
-p 5672:5672 \
-p 5682:5682 \
-p 8081:8081 \
-v "$(pwd)/docker/config.yaml:/etc/fluxmq/config.yaml:ro" \
-v fluxmq-data:/var/lib/fluxmq \
ghcr.io/absmach/fluxmq:latest \
--config /etc/fluxmq/config.yaml
```
## Next Steps
- [First message](/docs/getting-started/first-message)
- [First durable queue](/docs/getting-started/first-durable-queue)
+36 -27
View File
@@ -10,14 +10,16 @@ A high-performance, multi-protocol message broker written in Go designed for sca
## Features
- **Multi-Protocol Support** - MQTT 3.1.1/5.0, WebSocket, HTTP-MQTT Bridge, CoAP Bridge
- **High Performance** - 300K-500K msg/s per node with zero-copy packet parsing
- **Performance-focused** - zero-copy packet parsing and pooled allocations for high-throughput workloads
- **Clustering** - Embedded etcd for distributed coordination, no external dependencies
- **Durable Queues** - Consumer groups with ack/nack/reject semantics
- **Security** - TLS/mTLS, rate limiting, authentication plugins
- **Security** - TLS/mTLS, rate limiting, pluggable auth/authz
- **Observability** - OpenTelemetry metrics, structured logging, webhook notifications
## Quick Start
Prefer Docker? See [Quick Start (Docker)](/docs/getting-started/quick-start-docker).
### Build & Run
```bash
@@ -82,46 +84,53 @@ FluxMQ uses a clean layered architecture:
## Documentation
<Cards>
<Card
title="Getting Started"
href="/docs/getting-started/quick-start-docker"
description="Quick start, first message, first queue"
/>
<Card
title="Concepts"
href="/docs/concepts/broker"
description="Broker, topics, queues, consumer groups"
/>
<Card
title="Architecture"
href="/docs/architecture"
description="System design and component overview"
href="/docs/architecture/overview"
description="Overview, routing, storage, clustering internals"
/>
<Card
title="Messaging"
href="/docs/messaging/publishing-messages"
description="Publishing, consuming, queues, streams, and protocol interop"
/>
<Card
title="Configuration"
href="/docs/configuration"
description="Complete configuration reference"
href="/docs/configuration/server"
description="Server, storage, clustering, security"
/>
<Card
title="Clustering"
href="/docs/clustering"
description="Distributed broker setup"
title="Clients"
href="/docs/clients/mqtt"
description="MQTT, HTTP, Go client, WebSocket"
/>
<Card
title="Client Libraries"
href="/docs/client"
description="Go MQTT and AMQP clients"
/>
<Card
title="Durable Queues"
href="/docs/queue"
description="Queue system with consumer groups"
/>
<Card
title="Webhooks"
href="/docs/webhooks"
description="Event notification system"
title="Reference"
href="/docs/reference/configuration-reference"
description="Configuration, CLI, protocol summary"
/>
</Cards>
## Performance
## Performance Notes
| Metric | Value |
| -------------------------- | ------------------------ |
| **Concurrent Connections** | 500K+ per node |
| **Message Throughput** | 300K-500K msg/s per node |
| **Latency (local)** | \<10ms |
| **Session Takeover** | \<100ms |
| **Concurrent Connections** | Workload and hardware dependent |
| **Message Throughput** | Workload and hardware dependent |
| **Latency (local)** | Depends on QoS, fan-out, and persistence settings |
| **Session Takeover** | Fast-path optimized; benchmark in your target environment |
See `benchmarks/README.md` in the repository for reproducible benchmark commands.
## Use Cases
@@ -0,0 +1,396 @@
---
title: Consumer Groups
description: Configure consumer groups and acknowledgments across MQTT and AMQP
---
# Consumer Groups
**Last Updated:** 2026-02-06
Consumer groups provide load-balanced, fault-tolerant consumption for durable queues.
## Overview
A consumer group is a logical consumer that may be backed by many physical processes:
- **Load balancing**: Each message is delivered to exactly one member of the group
- **Progress tracking**: The group tracks cursor/committed offsets
- **Fault tolerance**: In classic mode, unacknowledged messages are redelivered after timeout
- **Work stealing**: Idle consumers can claim work from overloaded peers
```
┌─────────────────────────────────────────────────────────┐
│ Queue: orders │
│ [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] [msg7] ... │
└─────────────────────────────────────────────────────────┘
┌─────────────┴─────────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Group: workers│ │ Group: audit │
│ (3 consumers)│ │ (1 consumer) │
└───────┬───────┘ └───────┬───────┘
│ │
┌───────┼───────┐ │
▼ ▼ ▼ ▼
[C1] [C2] [C3] [C4]
C1 gets msg1 │ C4 gets msg1
C2 gets msg2 │ load-balanced C4 gets msg2
C3 gets msg3 │ C4 gets msg3
C1 gets msg4 ▼ ...all messages
```
## Key Concepts
### Multiple Groups = Fan-Out
Multiple consumer groups on the same queue each receive **all messages** independently:
- Group "workers" processes messages for order fulfillment
- Group "audit" logs all messages for compliance
- Group "analytics" aggregates metrics
Each group maintains its own cursor and acknowledgment state.
### Multiple Consumers in a Group = Load Balancing
Multiple consumers within a single group share the workload:
- Messages are distributed round-robin across available consumers
- Each message goes to exactly one consumer in the group
- If a consumer disconnects, its pending messages are redelivered to others
### Pattern-Based Group Isolation
When consumers subscribe with different filter patterns, FluxMQ tracks them as separate logical groups internally:
```
Subscribe: $queue/orders/eu/# → group: "workers@eu/#"
Subscribe: $queue/orders/us/# → group: "workers@us/#"
```
Even with the same consumer group name "workers", these are independent because the filter pattern differs.
## Setting a Consumer Group
### MQTT v5
Use the `consumer-group` user property on SUBSCRIBE:
```bash
# Join consumer group "workers"
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group workers
```
Multiple clients with the same `consumer-group` value form a load-balanced pool.
### MQTT v3
MQTT v3 does not support user properties. The consumer group defaults to the client ID prefix (everything before the first `-`):
```
client ID: worker-1 → group: "worker"
client ID: worker-2 → group: "worker" (same group, load balanced)
client ID: audit-1 → group: "audit" (different group)
```
Note: Acknowledgments require MQTT v5 user properties, so MQTT v3 is limited to auto-ack scenarios.
### AMQP 0.9.1
Use the `x-consumer-group` argument on `basic.consume`:
```go
deliveries, err := ch.Consume(
"$queue/orders/#", // queue
"consumer-tag", // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "workers",
},
)
```
### AMQP 1.0
Use `consumer-group` in attach properties:
```
attach {
name: "orders-consumer",
source: { address: "$queue/orders" },
properties: { "consumer-group": "workers" }
}
```
## Consumer Group Modes
### Classic Mode (Work Queue)
Default mode optimized for "process once" semantics:
- Messages are **claimed** and must be acknowledged
- `ack` marks the message as processed
- `nack` triggers redelivery
- `reject` removes the message (future: moves to DLQ)
- **Visibility timeout**: Unacknowledged messages are redelivered after timeout
- **Work stealing**: Idle consumers can claim work from overloaded peers
### Stream Mode
Optimized for "replay and observe" semantics:
- Cursor-based consumption (read position in append-only log)
- Messages are read, not claimed - no per-message acknowledgment tracking
- Multiple consumers can read the same messages independently
- Supports seeking to specific offsets or timestamps
- Optional manual commit for checkpoint control
```go
// AMQP 0.9.1: Subscribe to stream with cursor positioning
ch.Consume("my-stream", "", false, false, false, false, amqp091.Table{
"x-consumer-group": "readers",
"x-stream-offset": "first", // start from beginning
"x-auto-commit": false, // manual commit
})
```
See [Queue types](/docs/messaging/durable-queues#queue-types) for detailed explanation of how classic and stream modes differ internally (PEL tracking, cursor management, etc.).
## Acknowledgments
Queue deliveries include metadata for acknowledgment:
| Property | Description |
|----------|-------------|
| `message-id` | Unique identifier (format: `queueName:offset`) |
| `group-id` | Consumer group that received the message |
| `queue` | Queue name |
| `offset` | Message sequence number |
### MQTT v5 Acknowledgments
Publish to ack/nack/reject topics with required properties:
```bash
# Acknowledge
mosquitto_pub -p 1883 -t '$queue/orders/$ack' -m '' \
-D publish user-property message-id "orders:42" \
-D publish user-property group-id "workers"
# Negative acknowledge (retry)
mosquitto_pub -p 1883 -t '$queue/orders/$nack' -m '' \
-D publish user-property message-id "orders:42" \
-D publish user-property group-id "workers"
# Reject (no retry, future: DLQ)
mosquitto_pub -p 1883 -t '$queue/orders/$reject' -m '' \
-D publish user-property message-id "orders:42" \
-D publish user-property group-id "workers" \
-D publish user-property reason "invalid payload"
```
### AMQP 0.9.1 Acknowledgments
Standard AMQP acknowledgment methods are mapped:
```go
// Acknowledge - message processed successfully
delivery.Ack(false) // multiple=false
// Negative acknowledge - retry the message
delivery.Nack(false, true) // multiple=false, requeue=true
// Reject - no retry (future: moves to DLQ)
delivery.Reject(false) // requeue=false
```
### AMQP 1.0 Acknowledgments
AMQP 1.0 dispositions are mapped to queue acknowledgments:
| Disposition | Queue Action |
|-------------|--------------|
| `Accepted` | Ack |
| `Released` | Nack (retry) |
| `Rejected` | Reject (DLQ) |
## Visibility Timeout and Work Stealing
In classic mode, FluxMQ provides automatic fault recovery:
### Visibility Timeout
When a message is delivered, it becomes "invisible" to other consumers for a configurable timeout (default: 30s). If the consumer doesn't acknowledge within this window:
1. The message becomes visible again
2. It can be claimed by any consumer in the group
3. The delivery count is incremented
This handles consumer crashes, network issues, and slow processing.
### Work Stealing
Idle consumers can proactively claim pending messages from overloaded peers:
1. Consumer A has 10 pending messages
2. Consumer B is idle (no pending work)
3. Consumer B "steals" some of A's pending messages
4. This rebalances load without waiting for visibility timeout
Work stealing is enabled by default and runs periodically.
## Examples
### Example 1: Simple Worker Pool
Three workers process orders with load balancing:
```bash
# Terminal 1: Worker 1
mosquitto_sub -p 1883 -i worker-1 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group workers
# Terminal 2: Worker 2
mosquitto_sub -p 1883 -i worker-2 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group workers
# Terminal 3: Worker 3
mosquitto_sub -p 1883 -i worker-3 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group workers
# Terminal 4: Publish orders
mosquitto_pub -p 1883 -t '$queue/orders' -m 'order-1'
mosquitto_pub -p 1883 -t '$queue/orders' -m 'order-2'
mosquitto_pub -p 1883 -t '$queue/orders' -m 'order-3'
# Each worker receives one order
```
### Example 2: Fan-Out to Multiple Groups
Same messages go to different groups for different purposes:
```bash
# Group 1: Process orders
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group processors
# Group 2: Audit logging
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group audit
# Group 3: Analytics
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group analytics
# Publish - all three groups receive this message
mosquitto_pub -p 1883 -t '$queue/orders' -m 'order-1'
```
### Example 3: Filtered Consumption
Different consumers handle different message types:
```bash
# EU order processors
mosquitto_sub -p 1883 -t '$queue/orders/eu/#' -q 1 \
-D subscribe user-property consumer-group eu-processors
# US order processors
mosquitto_sub -p 1883 -t '$queue/orders/us/#' -q 1 \
-D subscribe user-property consumer-group us-processors
# Image processors (any region)
mosquitto_sub -p 1883 -t '$queue/orders/+/images/#' -q 1 \
-D subscribe user-property consumer-group image-processors
# Publish
mosquitto_pub -p 1883 -t '$queue/orders/eu/images/resize' -m 'photo.png'
# → eu-processors receives it (matches eu/#)
# → image-processors receives it (matches +/images/#)
# → us-processors does NOT receive it
```
### Example 4: AMQP 0.9.1 Worker Pool (Go)
```go
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, _ := amqp.Dial("amqp://localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
// Set prefetch for fair dispatch
ch.Qos(1, 0, false)
// Subscribe with consumer group
deliveries, _ := ch.Consume(
"$queue/tasks/#",
"", // auto-generated tag
false, // manual ack
false, // not exclusive
false, // no-local
false, // no-wait
amqp.Table{
"x-consumer-group": "workers",
},
)
for d := range deliveries {
log.Printf("Processing: %s", d.Body)
// Simulate work
// ...
// Acknowledge completion
d.Ack(false)
}
}
```
## Configuration
Queue manager settings that affect consumer groups:
```yaml
queue_manager:
visibility_timeout: "30s" # Time before unacked message is redelivered
max_delivery_count: 5 # Max retries before giving up
auto_commit_interval: "5s" # Cursor commit frequency (stream mode)
consumer_timeout: "2m" # Disconnect stale consumers
steal_enabled: true # Enable work stealing
steal_interval: "5s" # Work stealing check interval
```
## Best Practices
1. **Use meaningful group names** that reflect the consumer's purpose (e.g., "order-processors", "audit-logger")
2. **Set appropriate prefetch/QoS** to control how many messages each consumer handles concurrently
3. **Keep processing time under visibility timeout** or extend the timeout for long-running tasks
4. **Use filters to partition work** when different consumers need different message subsets
5. **Monitor consumer lag** to detect slow consumers or processing bottlenecks
6. **Handle acknowledgments promptly** - don't batch acks for too long or you risk redelivery
## Learn More
- [Durable queues](/docs/messaging/durable-queues) - Queue configuration and lifecycle
- [Publishing messages](/docs/messaging/publishing-messages) - How to publish to queues
- [Consuming messages](/docs/messaging/consuming-messages) - Subscription patterns
@@ -0,0 +1,135 @@
---
title: Consuming Messages
description: Subscribe with MQTT and receive messages from topics or queues
---
# Consuming Messages
**Last Updated:** 2026-02-05
## MQTT Subscribe
```bash
mosquitto_sub -p 1883 -t "sensors/#" -v
```
Use QoS 1 or 2 when you need delivery guarantees:
```bash
mosquitto_sub -p 1883 -t "sensors/#" -q 1 -v
```
## Queue Consumption
Queue consumers subscribe to `$queue/<queue>/...` and set a consumer group (protocol-specific).
### Basic Queue Subscription
```bash
# Subscribe to all messages in the "orders" queue
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 -v
```
### Filtered Queue Subscription
Use wildcards to filter messages within a queue:
```bash
# Only receive messages with routing key starting with "images/"
mosquitto_sub -p 1883 -t '$queue/orders/images/#' -q 1 -v
# Only receive messages matching +/images/# (e.g., eu/images/resize, us/images/png)
mosquitto_sub -p 1883 -t '$queue/orders/+/images/#' -q 1 -v
```
### With Consumer Group (MQTT v5)
```bash
# Join consumer group "workers" for load balancing
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 -v \
-D subscribe user-property consumer-group workers
```
## AMQP 0.9.1 Queue Consumption
### Basic Queue Subscription (Go)
```go
// Subscribe to queue with consumer group
deliveries, err := ch.Consume(
"$queue/orders/#", // queue filter
"", // consumer tag (auto-generated)
false, // auto-ack (manual ack for durability)
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "workers",
},
)
for d := range deliveries {
// Process message
fmt.Printf("Received: %s\n", d.Body)
// Acknowledge
d.Ack(false)
}
```
### Filtered Queue Subscription
```go
// Only receive messages matching +/images/#
deliveries, _ := ch.Consume(
"$queue/orders/+/images/#",
"", false, false, false, false,
amqp091.Table{"x-consumer-group": "image-processors"},
)
```
### Stream Queue Consumption
```go
// Declare stream queue first
ch.QueueDeclare("events", true, false, false, false, amqp091.Table{
"x-queue-type": "stream",
"x-max-age": "24h",
})
// Consume from beginning
deliveries, _ := ch.Consume(
"events", "", false, false, false, false,
amqp091.Table{
"x-consumer-group": "replay-consumer",
"x-stream-offset": "first",
},
)
for d := range deliveries {
// Access stream metadata from headers
offset := d.Headers["x-stream-offset"]
timestamp := d.Headers["x-stream-timestamp"]
fmt.Printf("Offset %v at %v: %s\n", offset, timestamp, d.Body)
d.Ack(false)
}
```
### Acknowledgments
```go
// Acknowledge successful processing
d.Ack(false)
// Negative acknowledgment - retry the message
d.Nack(false, true) // multiple=false, requeue=true
// Reject - send to DLQ (no retry)
d.Reject(false) // requeue=false
```
See:
- [Consumer groups](/docs/messaging/consumer-groups)
- [Durable queues](/docs/messaging/durable-queues)
@@ -0,0 +1,641 @@
---
title: Durable Queues
description: Persistent queue system for MQTT and AMQP — durable work queues and stream queues with consumer groups, acknowledgments, and append-only log storage
---
# Durable Queues
**Last Updated:** 2026-02-10
This page covers FluxMQ's two persistent queue types: **durable queues** (work queue semantics) and **stream queues** (append-only log semantics). Both are shared across MQTT, AMQP 1.0, and AMQP 0.9.1.
For an overview of all three queue types (including ephemeral queues), see [Queue Types](/docs/concepts/queues).
## Overview
Persistent queues are built around a simple model:
- Producers publish to `$queue/<queue-name>/...`.
- The broker appends the message to the queue's log (durable storage).
- Consumers join a **consumer group** and receive messages from that log.
- What happens next depends on the queue type:
| | Durable (Classic) | Stream |
| -------------- | -------------------------------------------------- | ------------------------------------------ |
| **Ack** | Remove from pending list, advance truncation point | Checkpoint cursor position |
| **Nack** | Redeliver the message (with backoff) | No-op (replay via cursor seek) |
| **Reject** | Discard message (future: DLQ) | Advance cursor past message |
| **Retention** | Truncate after all groups ack | Time/size/count policy |
| **Redelivery** | Automatic (visibility timeout + work stealing) | Manual (consumer restarts from checkpoint) |
> **Design rule:** Do not overload ack semantics. "Ack" means "processed, remove it" in a durable queue and "checkpoint my position" in a stream. If you need both task processing and replay for the same data, use two queues with overlapping topic bindings.
## Do MQTT Producers Need `$queue/`?
Yes. In FluxMQ, MQTT publishes are routed to the queue manager only when the topic starts with `$queue/`.
- Publish to `sensors/temp` goes through normal pub/sub routing (subscriptions, retained, cluster pub/sub).
- Publish to `$queue/orders/...` goes through the queue manager (log storage + consumer groups + acks).
The `$queue/` prefix is what activates queue semantics — whether the target is an ephemeral, durable, or stream queue. If you configured a queue with topic bindings that match a non-`$queue/` topic, MQTT publishes to that topic will still *not* be enqueued, because the MQTT broker does not route non-`$queue/` publishes into the queue manager.
## Architecture
```
┌──────────────┐ ┌──────────────┐ ┌───────────────┐
│ MQTT Broker │ │ AMQP Broker │ │ AMQP091 Broker│
│ (TCP/WS/ │ │ (AMQP 1.0) │ │ (AMQP 0.9.1) │
│ HTTP/CoAP) │ │ │ │ │
└──────┬───────┘ └───────┬──────┘ └──────┬────────┘
│ │ │
└──────────────────┼────────────────┘
┌─────────────────────────┐
│ Shared Queue Manager │
│ - Topic bindings │
│ - Consumer groups │
│ - Retention loop │
└───────────┬─────────────┘
┌─────────────────────────┐
│ Log Storage (AOL) │
└─────────────────────────┘
```
## Queue Name, Topic, and Routing Key
Queue topics follow this convention:
- Queue root: `$queue/<name>`
- Routing key: everything after the root (optional)
Examples:
- `$queue/orders` (root only)
- `$queue/orders/created` (routing key `created`)
- `$queue/orders/images/png` (routing key `images/png`)
When you subscribe to `$queue/orders/images/#`, FluxMQ treats:
- queue name: `orders`
- group filter pattern: `images/#`
That pattern is applied inside the queue manager (not in the normal pub/sub router).
## Wildcard Patterns and Routing Key Filtering
Queue subscriptions support MQTT-style wildcards (`+` for single level, `#` for multiple levels). However, the wildcard handling is split into two stages:
1. **Queue binding**: The queue itself is bound to `$queue/<name>/#` (captures all messages for that queue)
2. **Consumer filter**: The pattern after the queue name filters messages during delivery
### Example 1: Simple Queue
```
Subscribe: $queue/orders
Publish: $queue/orders
```
- Queue name: `orders`
- Consumer filter: (none)
- All messages to `$queue/orders` are delivered
### Example 2: Single-Level Routing Key
```
Subscribe: $queue/orders/images
Publish: $queue/orders/images
```
- Queue name: `orders`
- Consumer filter: `images`
- Only messages with routing key `images` are delivered
### Example 3: Multi-Level Wildcard
```
Subscribe: $queue/orders/images/#
Publish: $queue/orders/images/png
Publish: $queue/orders/images/resize/thumbnail
```
- Queue name: `orders`
- Consumer filter: `images/#`
- Queue binding: `$queue/orders/#` (captures everything)
- During delivery, filter `images/#` matches `images/png` and `images/resize/thumbnail`
### Example 4: Single-Level Wildcard
```
Subscribe: $queue/orders/+/images/#
Publish: $queue/orders/eu/images/resize
Publish: $queue/orders/us/images/png
```
- Queue name: `orders`
- Consumer filter: `+/images/#`
Flow for publish to `$queue/orders/eu/images/resize`:
1. `FindMatchingQueues("$queue/orders/eu/images/resize")` matches queue `orders` (bound to `$queue/orders/#`)
2. Message stored in queue with topic `$queue/orders/eu/images/resize`
3. During delivery, routing key is extracted: `eu/images/resize`
4. Filter `+/images/#` matches:
- `+` matches `eu`
- `images` matches `images`
- `#` matches `resize`
5. Message delivered to consumer
### Example 5: Multiple Consumer Groups with Different Filters
One queue can have multiple consumer groups with different filter patterns:
```
# Group A: processes all EU orders
Subscribe: $queue/orders/eu/#
(consumer-group: "eu-processors")
# Group B: processes all image-related orders
Subscribe: $queue/orders/+/images/#
(consumer-group: "image-processors")
# Publish
Publish: $queue/orders/eu/images/resize
```
Both groups receive the message because:
- Group A filter `eu/#` matches `eu/images/resize`
- Group B filter `+/images/#` matches `eu/images/resize`
### Key Points
- The **publisher must always use the `$queue/` prefix** for messages to be routed through the queue manager
- The queue is bound to `$queue/<name>/#` regardless of the subscription pattern
- Wildcard filtering happens at **delivery time**, not at publish time
- Multiple consumer groups can have different filters on the same queue
## Routing Model (How Messages Find Queues)
FluxMQ uses **topic patterns on queues** to decide where a publish should be stored. This is a fan-out model: one publish can land in multiple queues if multiple patterns match.
- Each queue has one or more topic patterns (MQTT wildcard syntax).
- On publish, the queue manager calls `FindMatchingQueues(topic)` and appends the message to every matching queue.
- If **no queue matches**, FluxMQ creates an **[ephemeral queue](/docs/concepts/queues#ephemeral-queue)** whose name and pattern equal the topic, then appends. Ephemeral queues are in-memory, best-effort, and cleaned up after the last consumer disconnects.
- If no queues are configured at all, FluxMQ creates a reserved `mqtt` queue that matches `$queue/#` so queue publishes always have a landing zone.
In production, prefer explicit `queues:` configuration so queue names, types, and bindings are stable. Ephemeral queues are a development convenience — they prevent silent message loss but don't provide durability or delivery guarantees. See [Queue Types](/docs/concepts/queues) for when to use each type.
<Mermaid
chart="
flowchart LR
P[Publish $queue/... or queue-capable AMQP] --> QM[Queue Manager]
QM --> Match[Match topic against queue patterns]
Match --> Q1[Queue A log]
Match --> Q2[Queue B log]
Match --> QN[Queue N log]
QM --> Deliver[Delivery loop]
Deliver --> Local[Local consumers MQTT/AMQP]
Deliver --> Remote[Remote consumers via cluster]"
/>
## Walkthrough: Queue Message Lifecycle
This is the “classic” (work queue) lifecycle:
1. Producer publishes to a `$queue/<name>/...` topic (or a queue-capable AMQP address).
2. Queue manager matches the topic against queue bindings and appends to the queue log(s).
3. A consumer group claims a message at some offset; the message becomes pending (tracked in the PEL).
4. The broker delivers the message to the chosen consumer.
5. The consumer acks/nacks/rejects; the queue manager updates group state and may advance the safe truncation point.
<Mermaid
chart="
sequenceDiagram
participant P as Producer
participant B as Broker
participant Q as Queue Manager
participant C as Consumer
P->>B: PUBLISH $queue/orders/...
B->>Q: Publish(topic, payload, properties)
Q->>Q: Append to log (offset N)
Q-->>C: Deliver (message-id=orders:N, group-id=...)
C->>B: PUBLISH .../$ack (message-id, group-id)
B->>Q: Ack(queue=orders, message-id, group-id)
Q->>Q: Remove from PEL, advance committed offset"
/>
## Queue Addressing
Queue topics use `$queue/<queue-name>/...`.
### MQTT
- Publish: `$queue/orders`
- Publish with routing key: `$queue/orders/eu/images/resize`
- Subscribe to a pattern: `$queue/orders/#` or `$queue/orders/+/images/#`
- Ack: `$queue/orders/$ack`
- Nack: `$queue/orders/$nack`
- Reject: `$queue/orders/$reject`
### AMQP 0.9.1
AMQP 0.9.1 clients can interact with queues in multiple ways:
**Direct queue publish** (default exchange with `$queue/` routing key):
```
exchange: "" (default)
routing_key: "$queue/orders"
routing_key: "$queue/orders/eu/images/resize" # with routing key
```
**Stream queue publish** (declare queue with `x-queue-type: stream`):
```
exchange: "" (default)
routing_key: "my-stream" # queue name without $queue/ prefix
```
**Exchange-based routing** (bind queue to exchange):
```
queue.bind(queue="orders", exchange="my-exchange", routing_key="orders.#")
publish(exchange="my-exchange", routing_key="orders.eu.images")
```
**Consume from queue**:
```
basic.consume(queue="$queue/orders/#")
basic.consume(queue="$queue/orders/+/images/#")
basic.consume(queue="my-stream") # stream queue by name
```
**Acknowledgments**:
- `basic.ack` → Ack
- `basic.nack(requeue=true)` → Nack (retry)
- `basic.reject(requeue=false)` → Reject (DLQ)
#### Exchanges and Bindings Are Per-Connection
> **Important difference from RabbitMQ:** In FluxMQ, AMQP 0.9.1 exchanges and bindings are **per-connection routing state**. They are not shared across connections, not visible to other clients, and not persisted to disk — even when declared as `durable`.
FluxMQ's actual routing layer is topic-based: queues are matched by topic patterns, not by server-side exchange state. When an AMQP 0.9.1 client declares an exchange and binds a queue to it, the channel stores that binding locally. On publish, the channel consults its local binding table to translate `exchange + routing_key` into a `$queue/<queue>/<routing-key>` topic, then routes through the shared queue manager like any other protocol.
This means:
- **Client A declares an exchange and bindings → client B cannot see them.** Each connection maintains its own exchange/binding state.
- **Exchanges don't survive reconnection.** Clients must re-declare exchanges and bindings on every new connection (which is already standard practice in most AMQP 0.9.1 client libraries).
- **`durable: true` on exchange.declare is accepted but does not persist.** The flag is not rejected, but it has no effect — exchange state lives only in connection memory.
- **Passive exchange.declare checks local state only.** It reports whether *this connection* has declared the exchange, not whether it exists server-wide.
**Why this design**: FluxMQ routes messages through a shared topic index on queues. Exchanges are a compatibility layer that translates AMQP 0.9.1 routing concepts into FluxMQ topic semantics. Building a second server-wide routing layer for exchanges would duplicate what the queue topic index already provides.
**Practical impact**: For most AMQP 0.9.1 usage patterns, this is transparent — clients typically declare exchanges and bindings on connection setup before publishing. If your application depends on exchanges being shared across connections (e.g., one client declares, another publishes), use direct `$queue/` routing keys with the default exchange instead.
## Consumer Groups
Consumer groups are what turn a queue log into a scalable worker pool:
- A single queue can have many groups (independent progress).
- A single group can have many consumers (load balancing).
- Each group has its own cursor state and (in classic mode) pending list.
How to set the group ID depends on the protocol:
- MQTT v5: `consumer-group` user property on SUBSCRIBE.
- MQTT v3: falls back to client ID (acks require MQTT v5 user properties).
- AMQP 1.0: `consumer-group` in attach properties.
- AMQP 0.9.1: `x-consumer-group` argument on `basic.consume`.
### MQTT v5 Example
```bash
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 \
-D subscribe user-property consumer-group workers
```
### AMQP 0.9.1 Example (Go)
```go
ch.Consume(
"$queue/orders/#", // queue
"consumer-tag", // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "workers",
},
)
```
## Message Properties
Queue deliveries include properties that make acknowledgments and observability consistent across protocols:
- `message-id` (required for ack/nack/reject)
- `group-id` (consumer group name)
- `queue` (queue name)
- `offset` (sequence number)
Stream deliveries also include:
- `x-stream-offset`
- `x-stream-timestamp` (unix millis)
- `x-work-committed-offset` (if primary group is configured)
- `x-work-acked` (true when below committed offset)
- `x-work-group` (primary work group name)
## Acknowledgments
### MQTT
Ack/Nack/Reject are implemented by publishing to:
- `$queue/<queue>/$ack`
- `$queue/<queue>/$nack`
- `$queue/<queue>/$reject`
If you include a routing key (for example `$queue/orders/images/$ack`), the broker still derives the queue name from the first segment after `$queue/`.
MQTT v5 user properties required:
- `message-id`
- `group-id`
MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
### AMQP 1.0
AMQP dispositions are mapped to queue acknowledgments:
- Accepted → Ack
- Released → Nack
- Rejected → Reject
### AMQP 0.9.1
- `basic.ack`, `basic.nack`, `basic.reject` map to Ack/Nack/Reject
#### Stream Commit (AMQP 0.9.1)
If a stream group has auto-commit disabled, AMQP 0.9.1 can explicitly commit offsets by publishing to:
- `$queue/<queue>/$commit`
Headers:
- `x-group-id`
- `x-offset`
## Queue Types
### Classic (Work Queue)
Classic queues are optimized for "do work once" semantics:
- Messages are **claimed** and tracked in a Pending Entry List (PEL)
- `ack` removes a message from the PEL; `nack` makes it eligible for redelivery; `reject` removes it (DLQ wiring is incomplete)
- A visibility timeout plus work stealing prevents stuck consumers from stalling progress
- The safe truncation point is derived from group state (see cursor/committed below)
#### Classic Mode Delivery Flow
1. Consumer requests messages via `ClaimBatch`
2. Manager reads message from queue log at cursor position
3. **Creates PEL entry**: `{ offset, consumerID, claimedAt, deliveryCount }`
4. Advances cursor to next position
5. Delivers message to consumer
6. On `ack`: removes PEL entry, advances committed offset
7. On `nack`: resets PEL entry for redelivery
8. On timeout: message becomes stealable by other consumers
```
Classic Mode State:
Queue Log: [0] [1] [2] [3] [4] [5] [6] [7] ...
▲ ▲
│ └── cursor (next to deliver)
└── committed (safe to truncate)
PEL: {
offset=3: { consumer: "c1", claimedAt: t1, deliveries: 1 }
offset=4: { consumer: "c2", claimedAt: t2, deliveries: 2 }
offset=5: { consumer: "c1", claimedAt: t3, deliveries: 1 }
}
```
### Stream
Stream queues are optimized for "replay and progress" semantics:
- Consumption is cursor-based (read position in an append-only log)
- **No PEL tracking** - messages are simply read, not "claimed"
- Groups can auto-commit progress (default) or require explicit commits (AMQP 0.9.1)
#### Stream Mode Delivery Flow
1. Consumer requests messages via `ClaimBatchStream`
2. Manager reads messages from queue log starting at cursor position
3. **No PEL entry created** - message is simply read
4. Advances cursor to next position
5. Delivers messages to consumer
6. If `autoCommit: true`: committed offset is updated periodically
7. If `autoCommit: false`: consumer must explicitly commit via `$queue/<name>/$commit`
```
Stream Mode State:
Queue Log: [0] [1] [2] [3] [4] [5] [6] [7] ...
▲ ▲
│ └── cursor (next to read)
└── committed (checkpoint)
No PEL - just two offsets to track!
```
#### Why No PEL for Streams?
Stream mode is designed for different use cases than classic work queues:
| Aspect | Classic (PEL) | Stream (No PEL) |
| ---------------------- | ----------------------------- | ------------------------------ |
| **Semantics** | Process once | Read/replay many times |
| **Delivery guarantee** | At-least-once with ack | At-least-once with cursor |
| **Redelivery trigger** | Visibility timeout | Consumer restart/seek |
| **Work stealing** | Yes (steal from slow peers) | No (each consumer independent) |
| **Memory overhead** | PEL entry per pending message | Only cursor position |
| **Replayability** | No (acked = done) | Yes (seek to any offset) |
**Classic mode** answers: "Has this message been successfully processed?"
**Stream mode** answers: "Where is my read position in the log?"
In stream mode, the cursor itself serves as the progress indicator. If a consumer crashes:
- It restarts from the last committed cursor position
- Messages between committed and actual progress are re-read
- No per-message tracking needed - just resume from checkpoint
This makes streams more efficient for high-throughput scenarios where:
- Messages are idempotent or replayable
- Multiple independent readers need the same data
- You want Kafka-like log semantics
#### AMQP 0.9.1 Stream Queues
Declare a stream queue with retention settings:
```go
ch.QueueDeclare(
"my-stream", // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
amqp091.Table{
"x-queue-type": "stream",
"x-max-age": "7D", // retain 7 days
"x-max-length-bytes": 1073741824, // 1GB max
"x-max-length": 1000000, // 1M messages max
},
)
```
Consume with cursor positioning:
```go
ch.Consume(
"my-stream", // queue name (not $queue/ prefix)
"consumer-tag",
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "readers",
"x-stream-offset": "first", // "first", "last", "next", offset number, or "timestamp=..."
"x-auto-commit": false, // require manual commit
},
)
```
Stream offset options:
- `"first"` - Start from the beginning of the log
- `"last"` or `"next"` - Start from the end (new messages only)
- `123` (number) - Start from specific offset
- `"offset=123"` - Same as above, string format
- `"timestamp=1706745600000"` - Start from timestamp (unix millis)
Manual commit (when `x-auto-commit: false`):
```go
// Publish to special commit topic
ch.Publish("", "$queue/my-stream/$commit", false, false, amqp091.Publishing{
Headers: amqp091.Table{
"x-group-id": "readers",
"x-offset": int64(lastProcessedOffset),
},
})
```
## Cursors, Pending Lists, and Safe Truncation
Each consumer group tracks progress using two offsets:
- `cursor`: the next offset that will be delivered (read position)
- `committed`: the oldest offset considered safe for retention/truncation (durability floor)
In classic mode, `committed` is driven by the minimum pending offset in the PEL.
<Mermaid
chart="
flowchart LR
subgraph Queue Log
O0((0)) --> O1((1)) --> O2((2)) --> O3((3)) --> O4((4)) --> O5((5))
end
subgraph Group State classic
Cur[cursor = next to claim]
PEL[PEL = delivered, not acked]
Com[committed = min pending or cursor]
end"
/>
Practical intuition:
- Cursor moves forward when the group claims work.
- Committed moves forward when the group acks (or otherwise clears) the earliest pending work.
### Example: Classic Group Progress
Imagine a queue log with offsets `0..5`.
| Event | Cursor | PEL | Committed |
| ------------------ | ------ | ----- | --------- |
| Group starts | 0 | empty | 0 |
| Claims offsets 0,1 | 2 | {0,1} | 0 |
| Acks offset 0 | 2 | {1} | 1 |
| Acks offset 1 | 2 | empty | 2 |
This is why committed is the “safe truncation floor”: once everything below an offset is fully processed, the log can be truncated without breaking group semantics.
## Retention
Retention policies can be configured per queue:
- `max_age` (time-based)
- `max_length_bytes`
- `max_length_messages`
A background retention loop computes a **safe truncation offset** and truncates the queue log:
- Start from the minimum committed offset across queue-mode consumer groups.
- Apply retention limits (time/size/message count) to compute the oldest offset that should be kept.
- Truncate the log to the lowest safe offset (segment-granular in log storage).
See [Storage internals](/docs/architecture/storage) for on-disk format and retention behavior details.
## Clustering and Replication Notes
In clustered deployments, queue behavior depends on `cluster.raft.*`:
- `write_policy` controls how followers handle incoming queue publishes.
- `distribution_mode` controls whether deliveries are routed (`forward`) or logs are replicated (`replicate`).
See [Cluster configuration](/docs/configuration/clustering) and [Clustering internals](/docs/architecture/clustering) for the full picture.
## DLQ Status
A DLQ handler exists in `queue/consumer/dlq.go`, but the main delivery path does not automatically move rejected or expired messages into a DLQ yet. `Reject` currently removes the message from the pending list without pushing it to a DLQ.
## Configuration
Queues are configured under `queues` in the main config file:
```yaml
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics: ["$queue/orders/#"]
type: "classic" # classic or stream
primary_group: "" # optional for stream status
limits:
max_message_size: 10485760
max_depth: 100000
message_ttl: "168h"
retry:
max_retries: 10
initial_backoff: "5s"
max_backoff: "5m"
multiplier: 2.0
dlq:
enabled: true
topic: "" # optional override
retention:
max_age: "168h"
max_length_bytes: 0
max_length_messages: 0
```
@@ -0,0 +1,162 @@
---
title: Example
description: Runnable Go example demonstrating MQTT pub/sub, cross-protocol queue interop, and stream consumption
---
# Messaging Example
A single Go program that demonstrates FluxMQ's messaging capabilities across three scenarios:
1. **MQTT pub/sub** — standard topic-based publish and subscribe
2. **MQTT → AMQP 0.9.1 queue** — publish via MQTT, consume via AMQP 0.9.1 on a durable queue
3. **AMQP 0.9.1 stream queue** — declare a stream, publish events, and replay from the beginning
The example uses only third-party MQTT and AMQP client libraries — no FluxMQ-specific SDK is required.
## Prerequisites
- **Go 1.24+** installed
- **FluxMQ running locally** with default ports:
- MQTT: `1883`
- AMQP 0.9.1: `5682`
Start FluxMQ with Docker or from source:
```bash
# Docker
docker compose -f docker/compose.yaml up -d
# From source
go run ./cmd --config examples/no-cluster.yaml
```
See [Quick Start (Docker)](/docs/getting-started/quick-start-docker) for details.
## Running the Example
```bash
go run ./examples/messaging/
```
Custom addresses:
```bash
go run ./examples/messaging/ \
-mqtt localhost:1883 \
-amqp091 localhost:5682
```
## What It Does
### Scenario 1: MQTT Pub/Sub
Standard MQTT messaging — a publisher sends a JSON payload to `demo/sensors/temperature`, and a subscriber on the same topic receives it. This is vanilla MQTT with no queues involved.
```go
// Publisher
pub.Publish("demo/sensors/temperature", 1, false, `{"sensor":"temp-1","value":22.5}`)
// Subscriber
sub.Subscribe("demo/sensors/temperature", 1, func(_ mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received: %s\n", msg.Payload())
})
```
### Scenario 2: MQTT → AMQP 0.9.1 (Durable Queue)
Demonstrates **cross-protocol interoperability**. An MQTT client publishes orders to the `$queue/demo-orders` topic. An AMQP 0.9.1 consumer reads from the same durable queue — messages traverse the protocol boundary transparently.
The AMQP 0.9.1 consumer joins a consumer group and manually acknowledges each message:
```go
deliveries, _ := ch.Consume(
"$queue/demo-orders/#",
"demo-consumer",
false, false, false, false,
amqp091.Table{
"x-consumer-group": "demo-workers",
},
)
for d := range deliveries {
fmt.Printf("Received: %s\n", d.Body)
d.Ack(false)
}
```
Key points:
- The `$queue/` prefix routes messages through the durable queue manager
- `x-consumer-group` sets the consumer group for load balancing
- Manual acks ensure at-least-once delivery
### Scenario 3: AMQP 0.9.1 Stream Queue
Demonstrates **stream consumption** — a replayable, append-only log. The example declares a stream queue, publishes events, then consumes them **from the beginning** using `x-stream-offset: "first"`.
```go
// Declare stream queue
ch.QueueDeclare("demo-events", true, false, false, false, amqp091.Table{
"x-queue-type": "stream",
"x-max-age": "1h",
})
// Publish to stream
ch.PublishWithContext(ctx, "", "demo-events", false, false, amqp091.Publishing{
Body: []byte(`{"event":"user.action","seq":1}`),
})
// Consume from beginning (replay)
deliveries, _ := ch.Consume("demo-events", "stream-reader", false, false, false, false,
amqp091.Table{
"x-consumer-group": "demo-readers",
"x-stream-offset": "first",
},
)
```
Key points:
- Stream queues use the queue name directly — no `$queue/` prefix
- `x-stream-offset` controls where consumption starts: `"first"`, `"last"`, `"next"`, or a specific offset number
- Each delivered message includes an `x-stream-offset` header with its position in the log
- Stream queues retain messages based on `x-max-age`, unlike classic queues which remove messages after acknowledgment
## Expected Output
```
=== Scenario 1: MQTT Pub/Sub ===
[MQTT sub] Subscribed to demo/sensors/temperature
[MQTT pub] Publishing to demo/sensors/temperature: {"sensor":"temp-1","value":22.5}
[MQTT sub] Received on demo/sensors/temperature: {"sensor":"temp-1","value":22.5}
[OK] MQTT pub/sub round-trip successful
=== Scenario 2: MQTT → AMQP 0.9.1 (Durable Queue) ===
[AMQP 0.9.1] Consuming from queue 'demo-orders' in group 'demo-workers'
[MQTT pub] Publishing to $queue/demo-orders: {"order_id":"order-1",...}
[AMQP 0.9.1] Received: {"order_id":"order-1",...}
...
[OK] All 5 messages published via MQTT, consumed via AMQP 0.9.1
=== Scenario 3: AMQP 0.9.1 Stream Queue ===
[AMQP 0.9.1] Declared stream queue 'demo-events'
[AMQP 0.9.1] Published to stream: {"event":"user.action","seq":1}
...
[AMQP 0.9.1] Consuming stream 'demo-events' from offset 'first'
[AMQP 0.9.1] Stream message (offset=1): {"event":"user.action","seq":1}
...
[OK] Published 5 events, replayed all 5 from stream
All scenarios completed.
```
## Libraries Used
| Library | Protocol | Purpose |
| --- | --- | --- |
| [eclipse/paho.mqtt.golang](https://github.com/eclipse/paho.mqtt.golang) | MQTT 3.1.1 | Publish and subscribe |
| [rabbitmq/amqp091-go](https://github.com/rabbitmq/amqp091-go) | AMQP 0.9.1 | Queue consume, stream declare/publish/consume |
## Next Steps
- [Durable Queues](/docs/messaging/durable-queues) — queue types, routing keys, retention, and acknowledgment semantics
- [Consumer Groups](/docs/messaging/consumer-groups) — fan-out, load balancing, and group configuration
- [Queue Client Example](https://github.com/absmach/fluxmq/tree/main/examples/queue-client) — a more advanced order-processing pipeline with multiple consumer groups across all three protocols
@@ -0,0 +1,10 @@
{
"title": "Messaging",
"pages": [
"publishing-messages",
"consuming-messages",
"durable-queues",
"consumer-groups",
"example"
]
}
@@ -0,0 +1,123 @@
---
title: Publishing Messages
description: Publish via MQTT or HTTP, including queue and retained messages
---
# Publishing Messages
**Last Updated:** 2026-02-05
## MQTT Publish
```bash
mosquitto_pub -p 1883 -t "sensors/temp" -m "22.5" -q 1
```
Retained message:
```bash
mosquitto_pub -p 1883 -t "sensors/last" -m "22.5" -r
```
## HTTP Publish (Bridge)
Enable the HTTP bridge by setting `server.http.plain.addr` in your config. The `/publish` endpoint accepts JSON with `topic`, `payload`, `qos`, and `retain`.
Note: `payload` is a base64-encoded string in JSON.
```bash
curl -sS -X POST http://localhost:8080/publish \
-H 'Content-Type: application/json' \
-d '{"topic":"sensors/temp","payload":"MjIuNQ==","qos":1,"retain":false}'
```
## Publishing to Queues
Use the `$queue/` prefix to publish to durable queues:
```bash
mosquitto_pub -p 1883 -t '$queue/orders' -m '{"id": "order-1"}' -q 1
```
### With Routing Keys
Add routing keys after the queue name to enable filtered consumption:
```bash
# Routing key: images/png
mosquitto_pub -p 1883 -t '$queue/orders/images/png' -m '{"file": "photo.png"}' -q 1
# Routing key: eu/images/resize
mosquitto_pub -p 1883 -t '$queue/orders/eu/images/resize' -m '{"file": "photo.png"}' -q 1
```
Consumers subscribed to `$queue/orders/images/#` will receive the first message.
Consumers subscribed to `$queue/orders/+/images/#` will receive both messages.
## AMQP 0.9.1 Publishing
### Basic Publish
```go
ch.Publish(
"", // exchange (default)
"sensors/temp", // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "application/json",
Body: []byte(`{"temp": 22.5}`),
},
)
```
### Publishing to Queues
Use the `$queue/` prefix with the default exchange:
```go
// Simple queue publish
ch.Publish("", "$queue/orders", false, false, amqp091.Publishing{
Body: []byte(`{"id": "order-1"}`),
})
// With routing key for filtered consumers
ch.Publish("", "$queue/orders/eu/images/resize", false, false, amqp091.Publishing{
Body: []byte(`{"file": "photo.png"}`),
})
```
### Publishing to Stream Queues
Stream queues use the queue name directly (without `$queue/` prefix):
```go
// Declare stream queue first
ch.QueueDeclare("events", true, false, false, false, amqp091.Table{
"x-queue-type": "stream",
})
// Publish to stream
ch.Publish("", "events", false, false, amqp091.Publishing{
Body: []byte(`{"event": "user.created"}`),
})
```
### Exchange-Based Routing
Bind queues to exchanges for flexible routing:
```go
// Declare exchange
ch.ExchangeDeclare("orders-exchange", "topic", true, false, false, false, nil)
// Bind queue to exchange with pattern
ch.QueueBind("orders", "orders.#", "orders-exchange", false, nil)
// Publish via exchange - routes to bound queues
ch.Publish("orders-exchange", "orders.eu.created", false, false, amqp091.Publishing{
Body: []byte(`{"id": "order-1"}`),
})
```
See [Durable queues](/docs/messaging/durable-queues) for wildcard patterns, acknowledgments, and consumer groups.
+16
View File
@@ -0,0 +1,16 @@
{
"title": "Docs",
"pages": [
"index",
"getting-started",
"concepts",
"architecture",
"messaging",
"configuration",
"clients",
"deployment",
"comparison",
"roadmap",
"reference"
]
}
-176
View File
@@ -1,176 +0,0 @@
---
title: Durable Queues
description: Shared queue system for MQTT and AMQP with consumer groups, acknowledgments, stream queues, and append-only log storage
---
# Durable Queues
**Last Updated:** 2026-02-05
FluxMQ provides durable queues shared across MQTT, AMQP 1.0, and AMQP 0.9.1. The queue manager is append-only with consumer groups and supports both classic work-queue semantics and stream-style consumption.
## Overview
- **Queue topics** use the `$queue/` prefix
- **Consumer groups** enable load-balanced processing
- **Ack/Nack/Reject** are supported across protocols
- **Retention** can be configured per queue (time/size/message count)
- **Stream queues** are supported via queue type `stream`
- **DLQ handler exists**, but automatic DLQ routing is not wired yet
## Architecture
```
┌──────────────┐ ┌──────────────┐ ┌───────────────┐
│ MQTT Broker │ │ AMQP Broker │ │ AMQP091 Broker│
│ (TCP/WS/ │ │ (AMQP 1.0) │ │ (AMQP 0.9.1) │
│ HTTP/CoAP) │ │ │ │ │
└──────┬───────┘ └───────┬──────┘ └──────┬────────┘
│ │ │
└──────────────────┼────────────────┘
┌─────────────────────────┐
│ Shared Queue Manager │
│ - Topic bindings │
│ - Consumer groups │
│ - Retention loop │
└───────────┬─────────────┘
┌─────────────────────────┐
│ Log Storage (AOL) │
└─────────────────────────┘
```
## Queue Addressing
Queue topics use `$queue/<queue-name>/...`.
Examples:
- Publish: `$queue/orders`
- Subscribe to a pattern: `$queue/orders/#`
- Ack: `$queue/orders/$ack`
- Nack: `$queue/orders/$nack`
- Reject: `$queue/orders/$reject`
## Consumer Groups
- **MQTT v5**: provide `consumer-group` as a user property on SUBSCRIBE
- **MQTT v3**: consumer group falls back to client ID (acks require MQTT v5)
- **AMQP 1.0**: provide `consumer-group` in attach properties
- **AMQP 0.9.1**: provide `x-consumer-group` in `basic.consume`
## Message Properties
Queue deliveries include these properties:
- `message-id` (required for ack/nack/reject)
- `group-id` (consumer group name)
- `queue` (queue name)
- `offset` (sequence number)
Stream deliveries also include:
- `x-stream-offset`
- `x-stream-timestamp` (unix millis)
- `x-work-committed-offset` (if primary group is configured)
- `x-work-acked` (true when below committed offset)
- `x-work-group` (primary work group name)
## Acknowledgments
### MQTT
Ack/Nack/Reject are implemented by publishing to `$queue/<queue>/$ack|$nack|$reject` with MQTT v5 user properties:
- `message-id`
- `group-id`
MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
### AMQP 1.0
AMQP dispositions are mapped to queue acknowledgments:
- Accepted → Ack
- Released → Nack
- Rejected → Reject
### AMQP 0.9.1
- `basic.ack`, `basic.nack`, `basic.reject` map to Ack/Nack/Reject
#### Stream Commit (AMQP 0.9.1)
Stream consumers can explicitly commit offsets by publishing to:
- `$queue/<queue>/$commit`
Headers:
- `x-group-id`
- `x-offset`
## Queue Types
### Classic (Work Queue)
- Ack/Nack/Reject semantics
- Pending entry tracking per consumer group
- Redelivery uses visibility timeouts and work stealing
- Retry backoff settings are accepted in config but not yet enforced in delivery timing
### Stream
- Append-only log semantics
- Cursor-based consumption
- Optional manual commit
## Retention
Retention policies can be configured per queue:
- `max_age` (time-based)
- `max_length_bytes`
- `max_length_messages`
A background retention loop truncates logs to the safe offset based on configured limits.
## DLQ Status
A DLQ handler exists in `queue/consumer/dlq.go`, but the main delivery path does not automatically move rejected or expired messages into a DLQ yet. `Reject` currently removes the message from the pending list without pushing it to a DLQ.
## Configuration
Queues are configured under `queues` in the main config file:
```yaml
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics: ["$queue/orders/#"]
type: "classic" # classic or stream
primary_group: "" # optional for stream status
limits:
max_message_size: 10485760
max_depth: 100000
message_ttl: "168h"
retry:
max_retries: 10
initial_backoff: "5s"
max_backoff: "5m"
multiplier: 2.0
dlq:
enabled: true
topic: "" # optional override
retention:
max_age: "168h"
max_length_bytes: 0
max_length_messages: 0
```
@@ -0,0 +1,25 @@
---
title: CLI Reference
description: Command-line flags for starting FluxMQ
---
# CLI Reference
**Last Updated:** 2026-02-05
## fluxmq
```bash
./build/fluxmq [--config /path/to/config.yaml]
```
### Flags
- `--config` Path to a YAML configuration file. If omitted or the file is missing, defaults are used.
## Examples
```bash
./build/fluxmq
./build/fluxmq --config examples/no-cluster.yaml
```
@@ -1,11 +1,11 @@
---
title: Configuration Guide
description: Comprehensive configuration reference for server transports, broker settings, clustering, storage, security, and operational features
title: Configuration Reference
description: Comprehensive YAML configuration reference for server, broker, storage, clustering, and operational settings
---
# Configuration Guide
# Configuration Reference
**Last Updated:** 2026-02-05
**Last Updated:** 2026-02-07
FluxMQ uses a single YAML configuration file. Start the broker with:
@@ -15,6 +15,13 @@ FluxMQ uses a single YAML configuration file. Start the broker with:
If `--config` is omitted, defaults are used (see `config.Default()` in `config/config.go`).
Looking for a guided walkthrough? See:
- [Server configuration](/docs/configuration/server)
- [Storage configuration](/docs/configuration/storage)
- [Cluster configuration](/docs/configuration/clustering)
- [Security configuration](/docs/configuration/security)
## Configuration Overview
Top-level keys:
@@ -180,6 +187,14 @@ storage:
## Cluster
Clustering combines:
- **Embedded etcd** (`cluster.etcd`): metadata coordination (session ownership, subscriptions, queue consumers, retained/will metadata).
- **gRPC transport** (`cluster.transport`): cross-node routing (publishes, queue messages, session takeover, hybrid payload fetch), including delivery to local MQTT, AMQP 1.0, and AMQP 0.9.1 clients.
- **Optional Raft** (`cluster.raft`): replicates durable queue operations.
For a “how it works” deep dive, see [Clustering internals](/docs/architecture/clustering).
```yaml
cluster:
enabled: false
@@ -218,6 +233,25 @@ cluster:
snapshot_threshold: 8192
```
### Raft Behavior (What The Knobs Mean)
Two Raft fields control most real-world behavior for queues:
- `cluster.raft.write_policy`: follower behavior when receiving a queue publish (`forward` is usually the best default).
- `cluster.raft.distribution_mode`: cross-node delivery strategy (`forward` routes deliveries; `replicate` relies on the replicated log).
Other fields affect durability and timing:
- `sync_mode`: if true, a queue publish waits for the Raft apply to complete (bounded by `ack_timeout`).
- `ack_timeout`: how long the leader waits for an apply to finish in sync mode.
- `heartbeat_timeout`, `election_timeout`: Raft stability knobs (failover sensitivity vs churn).
- `snapshot_interval`, `snapshot_threshold`: storage/compaction knobs for the Raft log.
Notes on current implementation:
- FluxMQ currently runs a single Raft group for all queues. Group membership comes from the configured peer list (and the local node).
- `replication_factor` and `min_in_sync_replicas` are accepted in config but do not currently limit membership or override Raft quorum rules.
## Webhooks
```yaml
@@ -0,0 +1,8 @@
{
"title": "Reference",
"pages": [
"configuration-reference",
"cli-reference",
"protocol-reference"
]
}
@@ -0,0 +1,95 @@
---
title: Protocol Reference
description: Supported protocols, transport options, and how each protocol adapter maps to FluxMQ queue semantics
---
# Protocol Reference
**Last Updated:** 2026-02-10
FluxMQ supports multiple protocols and transports. Each protocol adapter translates protocol-specific concepts into FluxMQ's shared queue primitives. Delivery semantics depend on the [queue type](/docs/concepts/queues), not the protocol.
## Supported Protocols
| Protocol | Transport | Queue Publish | Queue Consume | Ack Support |
| -------------- | -------------- | ------------------------------------------ | ---------------------------------- | --------------------------------- |
| **MQTT 3.1.1** | TCP, WebSocket | `$queue/` prefix | `$queue/` prefix | No (no user properties) |
| **MQTT 5.0** | TCP, WebSocket | `$queue/` prefix | `$queue/` prefix + user properties | Yes (`$ack/$nack/$reject` topics) |
| **AMQP 0.9.1** | TCP | `$queue/` routing key or exchange bindings | `basic.consume` | Yes (`basic.ack/nack/reject`) |
| **AMQP 1.0** | TCP | Link to `$queue/` address | Link from `$queue/` address | Yes (dispositions) |
| **HTTP** | HTTP POST | `/publish` endpoint | No | No |
| **CoAP** | UDP | CoAP POST | No | No |
Protocol listeners are configured under `server.*` in the YAML config. See [Server configuration](/docs/configuration/server).
## Protocol Adapter Design
Protocol adapters are **translators, not abstractions**. Each adapter maps its native concepts to FluxMQ queue primitives without fabricating behavior that the underlying queue type doesn't support.
Key principles:
- **Routing is shared.** All protocols route through the same queue manager and topic index. An MQTT publish and an AMQP 0.9.1 publish to the same `$queue/` topic land in the same queue log.
- **Delivery semantics come from the queue type.** A durable queue delivers with PEL tracking and ack/nack/reject regardless of whether the consumer is MQTT or AMQP. A stream queue delivers with cursor semantics regardless of protocol.
- **Protocol-specific features are scoped to the adapter.** AMQP 0.9.1 exchanges exist only as per-connection routing tables. MQTT shared subscriptions use a separate code path from queue consumer groups. AMQP 1.0 link capabilities map to queue properties.
## MQTT (3.1.1 / 5.0)
- Queue traffic is activated by the `$queue/` topic prefix
- Consumer groups are set via `consumer-group` user property on SUBSCRIBE (MQTT v5 only)
- MQTT v3 clients can publish and subscribe to queue topics, but cannot set consumer groups or send acknowledgments (requires user properties)
- Acknowledgments use special topics: `$queue/<queue>/$ack`, `$queue/<queue>/$nack`, `$queue/<queue>/$reject`
- Shared subscriptions (`$share/<group>/<filter>`) are a separate pub/sub feature — they are not queue consumer groups
See [MQTT client](/docs/clients/mqtt) for usage examples.
## AMQP 0.9.1
- Direct queue publish: use `$queue/` prefix as routing key with the default exchange
- Stream queue publish: use queue name as routing key (without `$queue/` prefix) after declaring with `x-queue-type: stream`
- Exchange-based routing: declare an exchange, bind a queue, and publish via the exchange — the channel translates this to `$queue/<queue>/<routing-key>`
- Consumer groups: set via `x-consumer-group` argument on `basic.consume`
- Stream cursor positioning: set via `x-stream-offset` argument on `basic.consume`
- Acknowledgments: `basic.ack`, `basic.nack`, `basic.reject` map directly to queue Ack/Nack/Reject
### Exchanges and Bindings
**Exchanges and bindings are per-connection state.** They are not shared across connections, not visible to other clients, and not persisted — even when declared as `durable`. This is a deliberate design choice: FluxMQ's routing layer is topic-based on queues, and exchanges are a compatibility shim that translates AMQP 0.9.1 routing concepts into FluxMQ topic semantics.
Clients must re-declare exchanges and bindings on every new connection. If your application depends on exchanges being shared across connections, use direct `$queue/` routing keys with the default exchange instead.
See [Durable Queues — AMQP 0.9.1](/docs/messaging/durable-queues#amqp-091) for full details.
## AMQP 1.0
- Queue addressing: link to `$queue/<name>` address, or use `queue` capability in source/target
- Consumer groups: set via `consumer-group` in link properties
- Cursor positioning: set via `cursor` link property (`earliest`, `latest`, or a specific offset)
- Settle modes: settled-on-send (fire-and-forget) vs settled-on-ack (at-least-once)
- Dispositions map to queue acknowledgments:
| AMQP 1.0 Disposition | Queue Action |
| -------------------- | ------------ |
| `Accepted` | Ack |
| `Released` | Nack (retry) |
| `Rejected` | Reject (DLQ) |
## HTTP Bridge
- Publish-only: `POST /publish` with JSON body (`topic`, `payload`, `qos`, `retain`)
- No subscription or acknowledgment support
- Useful for integrating HTTP services that need to push messages into queues
See [HTTP client](/docs/clients/http) for usage.
## CoAP Bridge
- Publish-only: CoAP POST to topic path
- No subscription or acknowledgment support
## Learn More
- [Queue Types](/docs/concepts/queues)
- [Server configuration](/docs/configuration/server)
- [MQTT client](/docs/clients/mqtt)
- [WebSocket client](/docs/clients/websocket)
- [HTTP client](/docs/clients/http)
+9 -9
View File
@@ -13,14 +13,14 @@ This roadmap highlights nearterm focus areas. Ordering may change as issues a
## Focus Areas
- Tests
- Benchmarks
- Optimizations
- Architecture revision after the above
- Scaling and recovery tests
- Performance optimization and code cleanup
- Dashboards and a basic UI with metrics
- Improved and faster logging and telemetry
- Extensive storage tests
- 🧪 Tests
- 📊 Benchmarks
- ⚙️ Optimizations
- 🏗️ Architecture revision after the above
- 🔁 Scaling and recovery tests
- 🧹 Performance optimization and code cleanup
- 🖥️ Dashboards and a basic UI with metrics
- 📡 Improved and faster logging and telemetry
- 💾 Extensive storage tests
For daytoday progress, track open issues and PRs.
+3 -116
View File
@@ -1,119 +1,6 @@
---
title: Webhook System
description: Comprehensive webhook system for asynchronous event notifications with circuit breaker, retry logic, and flexible filtering
title: Webhooks
description: This page has moved
---
# Webhook System
**Last Updated:** 2026-02-05
FluxMQ can emit broker events to external HTTP endpoints using an asynchronous webhook notifier.
## Overview
- Asynchronous event queue with worker pool
- Retry with exponential backoff
- Circuit breaker per endpoint
- Filtering by event type and topic pattern
- HTTP sender only (gRPC sender not implemented)
## Architecture
```
Broker Events
Webhook Notifier (queue)
│ drop_policy: oldest/newest
Worker Pool
│ retry + circuit breaker
HTTP Sender
External Endpoints
```
## Event Types
Events are defined in `broker/events/events.go`.
- `client.connected`: `client_id`, `protocol`, `clean_start`, `keep_alive`, `remote_addr`
- `client.disconnected`: `client_id`, `reason`, `remote_addr`
- `client.session_takeover`: `client_id`, `from_node`, `to_node`
- `message.published`: `client_id`, `topic`, `qos`, `retained`, `payload_size`, `payload`
- `message.delivered`: `client_id`, `topic`, `qos`, `payload_size`
- `message.retained`: `topic`, `payload_size`, `cleared`
- `subscription.created`: `client_id`, `topic_filter`, `qos`, `subscription_id`
- `subscription.removed`: `client_id`, `topic_filter`
- `auth.success`: `client_id`, `remote_addr`
- `auth.failure`: `client_id`, `reason`, `remote_addr`
- `authz.publish_denied`: `client_id`, `topic`, `reason`
- `authz.subscribe_denied`: `client_id`, `topic_filter`, `reason`
### Payload Notes
The `message.published` payload field is defined in the event schema (base64-encoded when populated), but payload inclusion is not currently wired in the broker. `webhook.include_payload` is accepted in config, yet payloads are sent as empty strings at the moment.
## Event Envelope
```json
{
"event_type": "message.published",
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"timestamp": "2026-02-05T12:00:00Z",
"broker_id": "broker-1",
"data": {
"client_id": "publisher-1",
"topic": "sensors/temperature",
"qos": 1,
"retained": false,
"payload_size": 256
}
}
```
## Filtering
Each endpoint can filter by:
- `events`: list of event types
- `topic_filters`: MQTT-style patterns (supports `+` and `#`)
## Retry and Circuit Breaker
- Retries use exponential backoff (`initial_interval * multiplier^attempt`), capped by `max_interval`
- Circuit breaker is per endpoint and trips after `failure_threshold` consecutive failures
## Configuration
```yaml
webhook:
enabled: true
queue_size: 10000
drop_policy: "oldest" # oldest or newest
workers: 5
include_payload: false
shutdown_timeout: "30s"
defaults:
timeout: "5s"
retry:
max_attempts: 3
initial_interval: "1s"
max_interval: "30s"
multiplier: 2.0
circuit_breaker:
failure_threshold: 5
reset_timeout: "60s"
endpoints:
- name: "analytics"
type: "http"
url: "https://example.com/webhook"
events: ["message.published"]
topic_filters: ["sensors/#"]
headers:
Authorization: "Bearer token"
```
This page has moved to [Webhooks](/docs/architecture/webhooks).