mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:50:18 +00:00
@@ -12,6 +12,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/absmach/fluxmq/broker/router"
|
||||
"github.com/absmach/fluxmq/cluster"
|
||||
"github.com/absmach/fluxmq/storage"
|
||||
|
||||
qtypes "github.com/absmach/fluxmq/queue/types"
|
||||
@@ -140,6 +141,20 @@ func (b *Broker) DeliverToClient(ctx context.Context, clientID string, msg any)
|
||||
}
|
||||
}
|
||||
|
||||
// DeliverToClusterMessage delivers a message routed from another cluster node to a local AMQP 0.9.1 client.
|
||||
func (b *Broker) DeliverToClusterMessage(ctx context.Context, clientID string, msg *cluster.Message) error {
|
||||
connID := strings.TrimPrefix(clientID, amqp091Prefix)
|
||||
|
||||
val, ok := b.connections.Load(connID)
|
||||
if !ok {
|
||||
return fmt.Errorf("AMQP 0.9.1 client not found: %s", connID)
|
||||
}
|
||||
|
||||
c := val.(*Connection)
|
||||
c.deliverMessage(msg.Topic, msg.Payload, msg.Properties)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close gracefully shuts down the broker.
|
||||
func (b *Broker) Close() error {
|
||||
b.connections.Range(func(key, val any) bool {
|
||||
|
||||
@@ -0,0 +1,112 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/absmach/fluxmq/amqp/codec"
|
||||
"github.com/absmach/fluxmq/cluster"
|
||||
)
|
||||
|
||||
func readBrokerFramesFrom(t *testing.T, buf *bytes.Buffer, start int) []*codec.Frame {
|
||||
t.Helper()
|
||||
|
||||
data := buf.Bytes()
|
||||
if start > len(data) {
|
||||
t.Fatalf("start offset beyond buffer length")
|
||||
}
|
||||
|
||||
r := bytes.NewReader(data[start:])
|
||||
var frames []*codec.Frame
|
||||
for r.Len() > 0 {
|
||||
frame, err := codec.ReadFrame(r)
|
||||
if err != nil {
|
||||
t.Fatalf("ReadFrame failed: %v", err)
|
||||
}
|
||||
frames = append(frames, frame)
|
||||
}
|
||||
return frames
|
||||
}
|
||||
|
||||
func TestDeliverToClusterMessage(t *testing.T) {
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
b := New(nil, logger)
|
||||
buf := &bytes.Buffer{}
|
||||
|
||||
c := &Connection{
|
||||
broker: b,
|
||||
writer: bufio.NewWriter(buf),
|
||||
frameMax: defaultFrameMax,
|
||||
logger: logger,
|
||||
connID: "conn-1",
|
||||
channels: make(map[uint16]*Channel),
|
||||
}
|
||||
ch := newChannel(c, 1)
|
||||
ch.consumers["ctag"] = &consumer{
|
||||
tag: "ctag",
|
||||
queue: "telemetry/#",
|
||||
noAck: true,
|
||||
}
|
||||
c.channels[1] = ch
|
||||
b.connections.Store(c.connID, c)
|
||||
|
||||
msg := &cluster.Message{
|
||||
Topic: "telemetry/room1",
|
||||
Payload: []byte("hello"),
|
||||
Properties: map[string]string{"message-id": "m1"},
|
||||
}
|
||||
if err := b.DeliverToClusterMessage(context.Background(), PrefixedClientID(c.connID), msg); err != nil {
|
||||
t.Fatalf("DeliverToClusterMessage failed: %v", err)
|
||||
}
|
||||
|
||||
frames := readBrokerFramesFrom(t, buf, 0)
|
||||
if len(frames) != 3 {
|
||||
t.Fatalf("expected 3 frames, got %d", len(frames))
|
||||
}
|
||||
|
||||
decoded, err := frames[0].Decode()
|
||||
if err != nil {
|
||||
t.Fatalf("Decode failed: %v", err)
|
||||
}
|
||||
deliver, ok := decoded.(*codec.BasicDeliver)
|
||||
if !ok {
|
||||
t.Fatalf("expected BasicDeliver, got %T", decoded)
|
||||
}
|
||||
if deliver.ConsumerTag != "ctag" {
|
||||
t.Fatalf("expected consumer tag ctag, got %q", deliver.ConsumerTag)
|
||||
}
|
||||
|
||||
if frames[1].Type != codec.FrameHeader {
|
||||
t.Fatalf("expected header frame, got %d", frames[1].Type)
|
||||
}
|
||||
if frames[2].Type != codec.FrameBody {
|
||||
t.Fatalf("expected body frame, got %d", frames[2].Type)
|
||||
}
|
||||
if string(frames[2].Payload) != "hello" {
|
||||
t.Fatalf("expected payload hello, got %q", string(frames[2].Payload))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeliverToClusterMessageClientNotFound(t *testing.T) {
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
b := New(nil, logger)
|
||||
|
||||
err := b.DeliverToClusterMessage(context.Background(), PrefixedClientID("missing"), &cluster.Message{
|
||||
Topic: "telemetry/room1",
|
||||
Payload: []byte("hello"),
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "client not found") {
|
||||
t.Fatalf("expected client not found error, got %v", err)
|
||||
}
|
||||
}
|
||||
+1
-2
@@ -57,8 +57,7 @@ func (d *messageDispatcher) DeliverToClient(ctx context.Context, clientID string
|
||||
return d.amqp.DeliverToClusterMessage(ctx, clientID, msg)
|
||||
}
|
||||
if amqpbroker.IsAMQP091Client(clientID) {
|
||||
// This method needs to be implemented in amqp091broker
|
||||
// return d.amqp091.DeliverToClusterMessage(ctx, clientID, msg)
|
||||
return d.amqp091.DeliverToClusterMessage(ctx, clientID, msg)
|
||||
}
|
||||
return d.mqtt.DeliverToClient(ctx, clientID, msg)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Absmach MQTT Broker Configuration Example
|
||||
# FluxMQ Broker Configuration Example
|
||||
|
||||
# Server Configuration
|
||||
server:
|
||||
|
||||
@@ -20,7 +20,11 @@ Demonstrates cross-protocol queue interop between MQTT, AMQP 1.0, and AMQP 0.9.1
|
||||
|
||||
#### Scenario: Order Processing Pipeline
|
||||
|
||||
A queue named `tasks/orders` receives orders from multiple MQTT publishers. Three independent consumer groups process every message — two using MQTT, one using AMQP 1.0, and one using AMQP 0.9.1:
|
||||
A queue named `tasks/orders` receives orders from multiple MQTT publishers. Three independent consumer groups process every message:
|
||||
|
||||
- `order-validators` (MQTT, 2 consumers)
|
||||
- `order-fulfillment` (AMQP 1.0, 1 consumer)
|
||||
- `order-shipper` (AMQP 0.9.1, 1 consumer)
|
||||
|
||||
```
|
||||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||||
@@ -38,19 +42,16 @@ A queue named `tasks/orders` receives orders from multiple MQTT publishers. Thre
|
||||
│ (Durable Queue) │
|
||||
└─────────┬─────────┘
|
||||
│
|
||||
┌───────────────┴───────────────┐
|
||||
│ Fan-out to consumer groups │
|
||||
▼ ▼
|
||||
┌─────────────────────┐ ┌─────────────────────┐
|
||||
│ order-validators │ │ order-fulfillment │
|
||||
│ (Consumer Group 1) │ │ (Consumer Group 2) │
|
||||
│ [MQTT] │ │ [AMQP 1.0] │
|
||||
├─────────────────────┤ ├─────────────────────┤
|
||||
│ ┌─────┐ ┌─────┐ │ │ ┌─────┐ │
|
||||
│ │ C1 │ │ C2 │ │ │ │ C1 │ │
|
||||
│ └─────┘ └─────┘ │ │ └─────┘ │
|
||||
│ Load balanced │ │ Single consumer │
|
||||
└─────────────────────┘ └─────────────────────┘
|
||||
Fan-out to consumer groups
|
||||
┌──────────────┬───────────────┬───────────────┐
|
||||
▼ ▼ ▼
|
||||
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
|
||||
│ order-validators │ │ order-fulfillment│ │ order-shipper │
|
||||
│ [MQTT] │ │ [AMQP 1.0] │ │ [AMQP 0.9.1] │
|
||||
├──────────────────┤ ├──────────────────┤ ├──────────────────┤
|
||||
│ C1, C2 │ │ C1 │ │ C1 │
|
||||
│ load-balanced │ │ single consumer │ │ single consumer │
|
||||
└──────────────────┘ └──────────────────┘ └──────────────────┘
|
||||
```
|
||||
|
||||
#### Key Concepts Demonstrated
|
||||
@@ -59,7 +60,7 @@ A queue named `tasks/orders` receives orders from multiple MQTT publishers. Thre
|
||||
| -------------------------- | ---------------------------------------------------------------------- |
|
||||
| **Cross-Protocol Interop** | MQTT publishers, AMQP 1.0 and AMQP 0.9.1 consumers on the same queue |
|
||||
| **QoS 1/2 Publishing** | Reliable publish with QoS guarantees |
|
||||
| **Consumer Groups** | Both groups receive ALL messages (fan-out pattern) |
|
||||
| **Consumer Groups** | All three groups receive ALL messages (fan-out pattern) |
|
||||
| **Load Balancing** | Messages distributed across consumers within a group |
|
||||
| **AMQP Dispositions** | `AcceptMessage`, `ReleaseMessage`, `RejectMessage` for ack/nack/reject |
|
||||
| **AMQP 0.9.1 Acks** | `Ack`, `Nack`, `Reject` for processing control |
|
||||
@@ -114,7 +115,7 @@ The consumer group is passed via the `x-consumer-group` argument on `basic.consu
|
||||
|
||||
1. Start the broker:
|
||||
```bash
|
||||
go run ./cmd/ --config examples/no-cluster.yaml
|
||||
go run ./cmd --config examples/no-cluster.yaml
|
||||
```
|
||||
|
||||
2. Run the queue client:
|
||||
@@ -150,11 +151,13 @@ Starting publishers...
|
||||
|
||||
=== Statistics ===
|
||||
Messages published: 30
|
||||
Validator group processed (MQTT): 30
|
||||
Fulfillment group processed (AMQP): 30
|
||||
Validator group processed (MQTT): 30
|
||||
Fulfillment group processed (AMQP 1.0): 30-40 (depends on retries)
|
||||
Shipper group processed (AMQP 0.9.1): 30
|
||||
```
|
||||
|
||||
Note: AMQP 1.0 in this demo intentionally performs occasional `ReleaseMessage`/`RejectMessage` calls to demonstrate dispositions, so its processed count may differ from published count due to redelivery.
|
||||
|
||||
#### Architecture Notes
|
||||
|
||||
- **MQTT v5 Required**: Consumer groups and partition keys use MQTT v5 user properties
|
||||
|
||||
+138
-7
@@ -7,6 +7,9 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
@@ -82,13 +85,51 @@ func (h *Handler) ListQueues(ctx context.Context, req *connect.Request[queuev1.L
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
|
||||
queues := make([]*queuev1.Queue, len(configs))
|
||||
for i := range configs {
|
||||
queues[i] = h.queueToProto(&configs[i])
|
||||
filtered := make([]types.QueueConfig, 0, len(configs))
|
||||
prefix := req.Msg.Prefix
|
||||
for _, cfg := range configs {
|
||||
if prefix != "" && !strings.HasPrefix(cfg.Name, prefix) {
|
||||
continue
|
||||
}
|
||||
filtered = append(filtered, cfg)
|
||||
}
|
||||
|
||||
sort.Slice(filtered, func(i, j int) bool {
|
||||
return filtered[i].Name < filtered[j].Name
|
||||
})
|
||||
|
||||
start := 0
|
||||
pageToken := req.Msg.PageToken
|
||||
if pageToken != "" {
|
||||
for i, cfg := range filtered {
|
||||
if cfg.Name > pageToken {
|
||||
start = i
|
||||
break
|
||||
}
|
||||
start = len(filtered)
|
||||
}
|
||||
}
|
||||
|
||||
end := len(filtered)
|
||||
limit := int(req.Msg.Limit)
|
||||
if limit > 0 && start+limit < end {
|
||||
end = start + limit
|
||||
}
|
||||
|
||||
page := filtered[start:end]
|
||||
queues := make([]*queuev1.Queue, len(page))
|
||||
for i := range page {
|
||||
queues[i] = h.queueToProto(&page[i])
|
||||
}
|
||||
|
||||
nextPageToken := ""
|
||||
if end < len(filtered) && len(page) > 0 {
|
||||
nextPageToken = page[len(page)-1].Name
|
||||
}
|
||||
|
||||
return connect.NewResponse(&queuev1.ListQueuesResponse{
|
||||
Queues: queues,
|
||||
Queues: queues,
|
||||
NextPageToken: nextPageToken,
|
||||
}), nil
|
||||
}
|
||||
|
||||
@@ -112,7 +153,35 @@ func (h *Handler) UpdateQueue(ctx context.Context, req *connect.Request[queuev1.
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
|
||||
return connect.NewResponse(h.queueToProto(config)), nil
|
||||
updated := *config
|
||||
if req.Msg.Config != nil {
|
||||
cfg := req.Msg.Config
|
||||
if cfg.Retention != nil {
|
||||
if cfg.Retention.MaxAge != nil {
|
||||
maxAge := cfg.Retention.MaxAge.AsDuration()
|
||||
updated.MessageTTL = maxAge
|
||||
updated.Retention.RetentionTime = maxAge
|
||||
}
|
||||
if cfg.Retention.MaxBytes > 0 {
|
||||
updated.Retention.RetentionBytes = int64(cfg.Retention.MaxBytes)
|
||||
}
|
||||
if cfg.Retention.MinMessages > 0 {
|
||||
updated.Retention.RetentionMessages = int64(cfg.Retention.MinMessages)
|
||||
}
|
||||
}
|
||||
if cfg.MaxMessageSize > 0 {
|
||||
updated.MaxMessageSize = int64(cfg.MaxMessageSize)
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.queueStore.UpdateQueue(ctx, updated); err != nil {
|
||||
if err == storage.ErrQueueNotFound {
|
||||
return nil, connect.NewError(connect.CodeNotFound, err)
|
||||
}
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
|
||||
return connect.NewResponse(h.queueToProto(&updated)), nil
|
||||
}
|
||||
|
||||
// --- Append Operations ---
|
||||
@@ -336,8 +405,45 @@ func (h *Handler) SeekToTimestamp(ctx context.Context, req *connect.Request[queu
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
|
||||
tail, err := h.queueStore.Tail(ctx, msg.QueueName)
|
||||
if err != nil {
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
|
||||
if msg.Timestamp == nil {
|
||||
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("timestamp is required"))
|
||||
}
|
||||
|
||||
target := msg.Timestamp.AsTime()
|
||||
offset := head
|
||||
for offset < tail {
|
||||
batch, err := h.queueStore.ReadBatch(ctx, msg.QueueName, offset, 128)
|
||||
if err != nil {
|
||||
if err == storage.ErrOffsetOutOfRange {
|
||||
break
|
||||
}
|
||||
return nil, connect.NewError(connect.CodeInternal, err)
|
||||
}
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, m := range batch {
|
||||
if !m.CreatedAt.Before(target) {
|
||||
return connect.NewResponse(&queuev1.SeekResponse{
|
||||
Offset: m.Sequence,
|
||||
Timestamp: timestamppb.New(m.CreatedAt),
|
||||
ExactMatch: m.CreatedAt.Equal(target),
|
||||
}), nil
|
||||
}
|
||||
}
|
||||
|
||||
offset = batch[len(batch)-1].Sequence + 1
|
||||
}
|
||||
|
||||
return connect.NewResponse(&queuev1.SeekResponse{
|
||||
Offset: head,
|
||||
Offset: tail,
|
||||
Timestamp: timestamppb.New(target),
|
||||
}), nil
|
||||
}
|
||||
|
||||
@@ -725,17 +831,42 @@ func (h *Handler) Truncate(ctx context.Context, req *connect.Request[queuev1.Tru
|
||||
// --- Helper Functions ---
|
||||
|
||||
func (h *Handler) queueToProto(config *types.QueueConfig) *queuev1.Queue {
|
||||
retentionMaxAge := config.Retention.RetentionTime
|
||||
if retentionMaxAge == 0 {
|
||||
retentionMaxAge = config.MessageTTL
|
||||
}
|
||||
|
||||
return &queuev1.Queue{
|
||||
Name: config.Name,
|
||||
Topics: config.Topics,
|
||||
Config: &queuev1.QueueConfig{
|
||||
Retention: &queuev1.RetentionConfig{
|
||||
MaxAge: durationpb.New(config.MessageTTL),
|
||||
MaxAge: durationpb.New(retentionMaxAge),
|
||||
MaxBytes: clampInt64ToUint64(config.Retention.RetentionBytes),
|
||||
MinMessages: clampInt64ToUint64(config.Retention.RetentionMessages),
|
||||
},
|
||||
MaxMessageSize: clampInt64ToUint32(config.MaxMessageSize),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func clampInt64ToUint64(value int64) uint64 {
|
||||
if value <= 0 {
|
||||
return 0
|
||||
}
|
||||
return uint64(value)
|
||||
}
|
||||
|
||||
func clampInt64ToUint32(value int64) uint32 {
|
||||
if value <= 0 {
|
||||
return 0
|
||||
}
|
||||
if value > math.MaxUint32 {
|
||||
return math.MaxUint32
|
||||
}
|
||||
return uint32(value)
|
||||
}
|
||||
|
||||
func (h *Handler) messageToProto(msg *types.Message) *queuev1.Message {
|
||||
protoMsg := &queuev1.Message{
|
||||
Offset: msg.Sequence,
|
||||
|
||||
@@ -0,0 +1,207 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
queuev1 "github.com/absmach/fluxmq/pkg/proto/queue/v1"
|
||||
memlog "github.com/absmach/fluxmq/queue/storage/memory/log"
|
||||
"github.com/absmach/fluxmq/queue/types"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func TestListQueuesFilteringAndPagination(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
store := memlog.New()
|
||||
h := NewHandler(nil, store, nil, nil)
|
||||
|
||||
for _, name := range []string{"alpha", "beta", "delta", "gamma"} {
|
||||
cfg := types.DefaultQueueConfig(name, "$queue/"+name+"/#")
|
||||
if err := store.CreateQueue(ctx, cfg); err != nil {
|
||||
t.Fatalf("create queue %q: %v", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
filteredResp, err := h.ListQueues(ctx, connect.NewRequest(&queuev1.ListQueuesRequest{
|
||||
Prefix: "g",
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("list filtered queues: %v", err)
|
||||
}
|
||||
if len(filteredResp.Msg.Queues) != 1 || filteredResp.Msg.Queues[0].Name != "gamma" {
|
||||
t.Fatalf("unexpected filtered queues: %#v", filteredResp.Msg.Queues)
|
||||
}
|
||||
|
||||
page1Resp, err := h.ListQueues(ctx, connect.NewRequest(&queuev1.ListQueuesRequest{
|
||||
Limit: 2,
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("list queues page 1: %v", err)
|
||||
}
|
||||
if got := len(page1Resp.Msg.Queues); got != 2 {
|
||||
t.Fatalf("unexpected page 1 size: %d", got)
|
||||
}
|
||||
page1Names := []string{page1Resp.Msg.Queues[0].Name, page1Resp.Msg.Queues[1].Name}
|
||||
if !sort.StringsAreSorted(page1Names) {
|
||||
t.Fatalf("page 1 not sorted: %#v", page1Names)
|
||||
}
|
||||
if page1Resp.Msg.NextPageToken == "" {
|
||||
t.Fatalf("expected next_page_token on first page")
|
||||
}
|
||||
|
||||
page2Resp, err := h.ListQueues(ctx, connect.NewRequest(&queuev1.ListQueuesRequest{
|
||||
Limit: 2,
|
||||
PageToken: page1Resp.Msg.NextPageToken,
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("list queues page 2: %v", err)
|
||||
}
|
||||
if got := len(page2Resp.Msg.Queues); got != 2 {
|
||||
t.Fatalf("unexpected page 2 size: %d", got)
|
||||
}
|
||||
if page2Resp.Msg.NextPageToken != "" {
|
||||
t.Fatalf("expected empty next_page_token on last page, got %q", page2Resp.Msg.NextPageToken)
|
||||
}
|
||||
page2Names := []string{page2Resp.Msg.Queues[0].Name, page2Resp.Msg.Queues[1].Name}
|
||||
if !sort.StringsAreSorted(page2Names) {
|
||||
t.Fatalf("page 2 not sorted: %#v", page2Names)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateQueueAppliesConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
store := memlog.New()
|
||||
h := NewHandler(nil, store, nil, nil)
|
||||
|
||||
cfg := types.DefaultQueueConfig("orders", "$queue/orders/#")
|
||||
if err := store.CreateQueue(ctx, cfg); err != nil {
|
||||
t.Fatalf("create queue: %v", err)
|
||||
}
|
||||
|
||||
retention := 2 * time.Hour
|
||||
updateResp, err := h.UpdateQueue(ctx, connect.NewRequest(&queuev1.UpdateQueueRequest{
|
||||
Name: "orders",
|
||||
Config: &queuev1.QueueConfig{
|
||||
Retention: &queuev1.RetentionConfig{
|
||||
MaxAge: durationpb.New(retention),
|
||||
MaxBytes: 2048,
|
||||
MinMessages: 10,
|
||||
},
|
||||
MaxMessageSize: 4096,
|
||||
},
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("update queue: %v", err)
|
||||
}
|
||||
|
||||
updated, err := store.GetQueue(ctx, "orders")
|
||||
if err != nil {
|
||||
t.Fatalf("read updated queue: %v", err)
|
||||
}
|
||||
|
||||
if updated.MessageTTL != retention {
|
||||
t.Fatalf("unexpected message ttl: got %v want %v", updated.MessageTTL, retention)
|
||||
}
|
||||
if updated.Retention.RetentionTime != retention {
|
||||
t.Fatalf("unexpected retention time: got %v want %v", updated.Retention.RetentionTime, retention)
|
||||
}
|
||||
if updated.Retention.RetentionBytes != 2048 {
|
||||
t.Fatalf("unexpected retention bytes: got %d", updated.Retention.RetentionBytes)
|
||||
}
|
||||
if updated.Retention.RetentionMessages != 10 {
|
||||
t.Fatalf("unexpected retention messages: got %d", updated.Retention.RetentionMessages)
|
||||
}
|
||||
if updated.MaxMessageSize != 4096 {
|
||||
t.Fatalf("unexpected max message size: got %d", updated.MaxMessageSize)
|
||||
}
|
||||
|
||||
if got := updateResp.Msg.Config.GetRetention().GetMaxAge().AsDuration(); got != retention {
|
||||
t.Fatalf("response retention mismatch: got %v want %v", got, retention)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeekToTimestamp(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
store := memlog.New()
|
||||
h := NewHandler(nil, store, nil, nil)
|
||||
|
||||
cfg := types.DefaultQueueConfig("events", "$queue/events/#")
|
||||
if err := store.CreateQueue(ctx, cfg); err != nil {
|
||||
t.Fatalf("create queue: %v", err)
|
||||
}
|
||||
|
||||
base := time.Unix(1700000000, 0).UTC()
|
||||
points := []time.Time{
|
||||
base,
|
||||
base.Add(1 * time.Minute),
|
||||
base.Add(2 * time.Minute),
|
||||
}
|
||||
for i, ts := range points {
|
||||
if _, err := store.Append(ctx, "events", &types.Message{
|
||||
ID: "m",
|
||||
Topic: "events",
|
||||
Payload: []byte{byte(i)},
|
||||
CreatedAt: ts,
|
||||
}); err != nil {
|
||||
t.Fatalf("append message %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
exactResp, err := h.SeekToTimestamp(ctx, connect.NewRequest(&queuev1.SeekToTimestampRequest{
|
||||
QueueName: "events",
|
||||
Timestamp: timestamppb.New(points[1]),
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("seek exact: %v", err)
|
||||
}
|
||||
if exactResp.Msg.Offset != 1 || !exactResp.Msg.ExactMatch {
|
||||
t.Fatalf("unexpected exact seek response: %+v", exactResp.Msg)
|
||||
}
|
||||
|
||||
between := points[1].Add(30 * time.Second)
|
||||
betweenResp, err := h.SeekToTimestamp(ctx, connect.NewRequest(&queuev1.SeekToTimestampRequest{
|
||||
QueueName: "events",
|
||||
Timestamp: timestamppb.New(between),
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("seek in-between: %v", err)
|
||||
}
|
||||
if betweenResp.Msg.Offset != 2 || betweenResp.Msg.ExactMatch {
|
||||
t.Fatalf("unexpected in-between seek response: %+v", betweenResp.Msg)
|
||||
}
|
||||
|
||||
afterLast := points[2].Add(1 * time.Second)
|
||||
afterResp, err := h.SeekToTimestamp(ctx, connect.NewRequest(&queuev1.SeekToTimestampRequest{
|
||||
QueueName: "events",
|
||||
Timestamp: timestamppb.New(afterLast),
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("seek after last: %v", err)
|
||||
}
|
||||
if afterResp.Msg.Offset != 3 || afterResp.Msg.ExactMatch {
|
||||
t.Fatalf("unexpected tail seek response: %+v", afterResp.Msg)
|
||||
}
|
||||
if got := afterResp.Msg.Timestamp.AsTime(); !got.Equal(afterLast) {
|
||||
t.Fatalf("unexpected tail seek timestamp: got %v want %v", got, afterLast)
|
||||
}
|
||||
|
||||
_, err = h.SeekToTimestamp(ctx, connect.NewRequest(&queuev1.SeekToTimestampRequest{
|
||||
QueueName: "events",
|
||||
}))
|
||||
if got := connect.CodeOf(err); got != connect.CodeInvalidArgument {
|
||||
t.Fatalf("unexpected code for missing timestamp: got %s want %s", got, connect.CodeInvalidArgument)
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,7 @@ description: Embedded etcd metadata, gRPC routing, session takeover, and cluster
|
||||
|
||||
# Clustering
|
||||
|
||||
**Last Updated:** 2026-02-05
|
||||
**Last Updated:** 2026-02-07
|
||||
|
||||
FluxMQ clustering is designed around one idea: keep **coordination data** consistent across nodes, and keep **payload data** fast and local wherever possible.
|
||||
|
||||
@@ -106,6 +106,12 @@ For “normal” pub/sub topics, the originating node does two things:
|
||||
|
||||
The routing decision is based on the subscription registry and the session-owner map.
|
||||
|
||||
Remote `RoutePublish` deliveries are dispatched to the local protocol broker by client ID namespace:
|
||||
|
||||
- MQTT clients: no protocol prefix
|
||||
- AMQP 1.0 clients: `amqp:`
|
||||
- AMQP 0.9.1 clients: `amqp091-`
|
||||
|
||||
<Mermaid
|
||||
chart="
|
||||
flowchart LR
|
||||
|
||||
@@ -102,6 +102,6 @@ If you are debugging data persistence, start here:
|
||||
- [Routing internals](/docs/architecture/routing)
|
||||
- [Webhooks](/docs/architecture/webhooks)
|
||||
- [Storage internals](/docs/architecture/storage)
|
||||
- [Clustering internals](/docs/architecture/clustering-internals)
|
||||
- [Clustering internals](/docs/architecture/clustering)
|
||||
- [Durable queues](/docs/messaging/durable-queues)
|
||||
- [Configuration reference](/docs/reference/configuration-reference)
|
||||
|
||||
@@ -5,7 +5,7 @@ description: Broker routing internals, session lifecycle, and protocol-specific
|
||||
|
||||
# Routing Internals
|
||||
|
||||
**Last Updated:** 2026-02-05
|
||||
**Last Updated:** 2026-02-07
|
||||
|
||||
FluxMQ ships three protocol brokers that share the same durable queue manager:
|
||||
|
||||
@@ -171,6 +171,7 @@ stateDiagram-v2
|
||||
- Accumulate publish content frames (method -> header -> body)
|
||||
- Route non-queue publishes through the shared router
|
||||
- Map queue publishes and consumes to the queue manager
|
||||
- Deliver cluster-routed messages to local AMQP 0.9.1 clients
|
||||
|
||||
### AMQP 0.9.1 Connection FSM (Handshake)
|
||||
|
||||
@@ -242,12 +243,13 @@ flowchart TD
|
||||
2. A content header and body frames are assembled into a single message.
|
||||
3. Default exchange + `$queue/` routing key goes to the queue manager.
|
||||
4. Otherwise the message is routed to local AMQP subscribers.
|
||||
5. In clustered mode, remote `RoutePublish` deliveries can target local AMQP 0.9.1 clients and are delivered through the same channel consumer path.
|
||||
|
||||
## Where To Look Next
|
||||
|
||||
- [Durable queues](/docs/messaging/durable-queues)
|
||||
- [Storage internals](/docs/architecture/storage)
|
||||
- [Clustering internals](/docs/architecture/clustering-internals)
|
||||
- [Clustering internals](/docs/architecture/clustering)
|
||||
|
||||
## Optional Subsystems
|
||||
|
||||
|
||||
@@ -47,7 +47,10 @@ func main() {
|
||||
log.Printf("Received: %s -> %s", topic, string(payload))
|
||||
})
|
||||
|
||||
c := client.New(opts)
|
||||
c, err := client.New(opts)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Connect
|
||||
if err := c.Connect(); err != nil {
|
||||
@@ -319,6 +322,7 @@ c.UnsubscribeFromQueue("orders")
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"github.com/absmach/fluxmq/client"
|
||||
)
|
||||
@@ -329,7 +333,10 @@ func main() {
|
||||
SetClientID("order-processor").
|
||||
SetProtocolVersion(5)
|
||||
|
||||
c := client.New(opts)
|
||||
c, err := client.New(opts)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := c.Connect(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -354,7 +361,10 @@ func main() {
|
||||
|
||||
// Publish some orders
|
||||
for i := 0; i < 10; i++ {
|
||||
c.PublishToQueue("orders", []byte(`{"id": "`+string(rune(i))+`"}`))
|
||||
order := fmt.Sprintf(`{"id":"%d"}`, i)
|
||||
if err := c.PublishToQueue("orders", []byte(order)); err != nil {
|
||||
log.Printf("publish failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
select {} // Keep running
|
||||
|
||||
@@ -19,4 +19,4 @@ Clustering enables high availability and cross-node routing. FluxMQ uses embedde
|
||||
## Learn More
|
||||
|
||||
- [Running a cluster](/docs/deployment/running-cluster)
|
||||
- [Clustering internals](/docs/architecture/clustering-internals)
|
||||
- [Clustering internals](/docs/architecture/clustering)
|
||||
|
||||
@@ -16,4 +16,4 @@ FluxMQ separates storage concerns into three layers:
|
||||
## Learn More
|
||||
|
||||
- [Storage internals](/docs/architecture/storage)
|
||||
- [Clustering internals](/docs/architecture/clustering-internals)
|
||||
- [Clustering internals](/docs/architecture/clustering)
|
||||
|
||||
@@ -97,5 +97,5 @@ Notes on current behavior:
|
||||
## Learn More
|
||||
|
||||
- [Running a cluster](/docs/deployment/running-cluster)
|
||||
- [Clustering internals](/docs/architecture/clustering-internals)
|
||||
- [Clustering internals](/docs/architecture/clustering)
|
||||
- [Configuration reference](/docs/reference/configuration-reference)
|
||||
|
||||
@@ -34,5 +34,5 @@ From the repo root:
|
||||
|
||||
## Learn More
|
||||
|
||||
- [Clustering internals](/docs/architecture/clustering-internals)
|
||||
- [Clustering internals](/docs/architecture/clustering)
|
||||
- [Configuration reference](/docs/reference/configuration-reference)
|
||||
|
||||
@@ -10,7 +10,7 @@ A high-performance, multi-protocol message broker written in Go designed for sca
|
||||
## Features
|
||||
|
||||
- **Multi-Protocol Support** - MQTT 3.1.1/5.0, WebSocket, HTTP-MQTT Bridge, CoAP Bridge
|
||||
- **High Performance** - 300K-500K msg/s per node with zero-copy packet parsing
|
||||
- **Performance-focused** - zero-copy packet parsing and pooled allocations for high-throughput workloads
|
||||
- **Clustering** - Embedded etcd for distributed coordination, no external dependencies
|
||||
- **Durable Queues** - Consumer groups with ack/nack/reject semantics
|
||||
- **Security** - TLS/mTLS, rate limiting, pluggable auth/authz
|
||||
@@ -121,14 +121,16 @@ FluxMQ uses a clean layered architecture:
|
||||
/>
|
||||
</Cards>
|
||||
|
||||
## Performance
|
||||
## Performance Notes
|
||||
|
||||
| Metric | Value |
|
||||
| -------------------------- | ------------------------ |
|
||||
| **Concurrent Connections** | 500K+ per node |
|
||||
| **Message Throughput** | 300K-500K msg/s per node |
|
||||
| **Latency (local)** | \<10ms |
|
||||
| **Session Takeover** | \<100ms |
|
||||
| **Concurrent Connections** | Workload and hardware dependent |
|
||||
| **Message Throughput** | Workload and hardware dependent |
|
||||
| **Latency (local)** | Depends on QoS, fan-out, and persistence settings |
|
||||
| **Session Takeover** | Fast-path optimized; benchmark in your target environment |
|
||||
|
||||
See `benchmarks/README.md` in the repository for reproducible benchmark commands.
|
||||
|
||||
## Use Cases
|
||||
|
||||
|
||||
@@ -575,7 +575,7 @@ In clustered deployments, queue behavior depends on `cluster.raft.*`:
|
||||
- `write_policy` controls how followers handle incoming queue publishes.
|
||||
- `distribution_mode` controls whether deliveries are routed (`forward`) or logs are replicated (`replicate`).
|
||||
|
||||
See [Cluster configuration](/docs/configuration/clustering) and [Clustering internals](/docs/architecture/clustering-internals) for the full picture.
|
||||
See [Cluster configuration](/docs/configuration/clustering) and [Clustering internals](/docs/architecture/clustering) for the full picture.
|
||||
|
||||
## DLQ Status
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ The example uses only third-party MQTT and AMQP client libraries — no FluxMQ-s
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- **Go 1.21+** installed
|
||||
- **Go 1.24+** installed
|
||||
- **FluxMQ running locally** with default ports:
|
||||
- MQTT: `1883`
|
||||
- AMQP 0.9.1: `5682`
|
||||
@@ -27,7 +27,7 @@ Start FluxMQ with Docker or from source:
|
||||
docker compose -f docker/compose.yaml up -d
|
||||
|
||||
# From source
|
||||
go run ./cmd/ --config examples/no-cluster.yaml
|
||||
go run ./cmd --config examples/no-cluster.yaml
|
||||
```
|
||||
|
||||
See [Quick Start (Docker)](/docs/getting-started/quick-start-docker) for details.
|
||||
|
||||
@@ -5,7 +5,7 @@ description: Comprehensive YAML configuration reference for server, broker, stor
|
||||
|
||||
# Configuration Reference
|
||||
|
||||
**Last Updated:** 2026-02-05
|
||||
**Last Updated:** 2026-02-07
|
||||
|
||||
FluxMQ uses a single YAML configuration file. Start the broker with:
|
||||
|
||||
@@ -190,10 +190,10 @@ storage:
|
||||
Clustering combines:
|
||||
|
||||
- **Embedded etcd** (`cluster.etcd`): metadata coordination (session ownership, subscriptions, queue consumers, retained/will metadata).
|
||||
- **gRPC transport** (`cluster.transport`): cross-node routing (publishes, queue messages, session takeover, hybrid payload fetch).
|
||||
- **gRPC transport** (`cluster.transport`): cross-node routing (publishes, queue messages, session takeover, hybrid payload fetch), including delivery to local MQTT, AMQP 1.0, and AMQP 0.9.1 clients.
|
||||
- **Optional Raft** (`cluster.raft`): replicates durable queue operations.
|
||||
|
||||
For a “how it works” deep dive, see [Clustering internals](/docs/architecture/clustering-internals).
|
||||
For a “how it works” deep dive, see [Clustering internals](/docs/architecture/clustering).
|
||||
|
||||
```yaml
|
||||
cluster:
|
||||
|
||||
Reference in New Issue
Block a user