Update client and README

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-05 18:06:45 +01:00
parent 41734e4cc2
commit 8fb1ab8a76
4 changed files with 143 additions and 106 deletions
+20 -74
View File
@@ -23,13 +23,13 @@ A high-performance, multi-protocol message broker written in Go designed for sca
- **Event backbone for microservices** - Reliable, ordered event distribution between services with at-least-once or exactly-once delivery (QoS 1/2)
- **CQRS systems** - Durable queues for command/event distribution with per-queue FIFO ordering
- **Asynchronous workflows** - Decouple services with persistent message queues and ack/nack-based redelivery
- **Real-time event processing** - High throughput (300K-500K msg/s per node) with low latency (<10ms local, ~50ms cross-node)
- **Real-time event processing** - Low-latency pub/sub with durable queues and ordering
**Why choose this over for EDA:**
**Why choose this for EDA:**
- ✅ Simple operations - single binary with embedded storage, no Zookeeper/KRaft
- ✅ Multi-protocol - same broker handles MQTT, HTTP, WebSocket, CoAP
- ✅ Per-queue FIFO ordering (single-log queues)
- ✅ Retention via committed-offset truncation (time/size retention planned)
- ✅ Retention policies for queue logs (time/size/message count)
- ✅ Optional Raft layer for queue appends (WIP)
**IoT & Real-Time Systems**
@@ -39,20 +39,9 @@ A high-performance, multi-protocol message broker written in Go designed for sca
- **Constrained devices** - CoAP bridge for resource-limited IoT hardware
**High-Availability Systems**
- **Clustered deployments** - 3-5 node clusters with automatic failover (sub-100ms session takeover)
- **Clustered deployments** - Automatic session takeover with embedded coordination
- **Geographic distribution** - gRPC-based cross-node routing with embedded etcd coordination
- **Scalability** - Cluster support (3-node cluster: 1-2M msg/s, 5-node cluster: 2-4M msg/s)
## ⚠️ Not Recommended For
**Long-term Event Storage**
- ❌ Event sourcing as permanent source of truth - compaction/deletion/retention policies are allowed
- ❌ Compliance/audit trails requiring immutability - use purpose-built event stores (EventStoreDB)
- ❌ Time-travel debugging or temporal queries - no time-range indexing
**Complex Event Processing**
- ❌ Advanced queries over events - no indexing beyond topic and offset
- ❌ Built-in stream processing - no Kafka Streams equivalent. Consumers do event processing
- **Scalability** - Horizontal scaling with multi-node clusters
### Event-Driven Architecture Pattern
@@ -110,7 +99,7 @@ FluxMQ is optimized for event-driven systems that need ordered delivery, durable
- FIFO per queue and per consumer group (single cursor)
- DLQ handler present (delivery path wiring pending)
- Optional Raft layer for queue appends (WIP)
- Retention via committed-offset truncation (time/size retention planned)
- Retention policies (time/size/message count)
- **Persistent Storage**
- BadgerDB for session state and offline queues
@@ -199,68 +188,25 @@ Defaults in `examples/no-cluster.yaml`:
Configuration is YAML-based. See `examples/` for starter files and `docs/configuration.md` for the full reference.
## Performance
## Benchmarks
| Metric | Value |
| -------------------------- | ------------------------ |
| **Concurrent Connections** | 500K+ per node |
| **Message Throughput** | 300K-500K msg/s per node |
| **Latency (local)** | <10ms |
| **Latency (cross-node)** | ~5ms |
| **Session Takeover** | <100ms |
**With clustering and topic sharding:**
- 3-node cluster: 1-2M msg/s
- 5-node cluster: 2-4M msg/s
See [Scaling & Performance](docs/scaling.md) for detailed benchmarks.
Benchmark results are workload- and hardware-dependent. For reproducible numbers,
run the benchmark scripts in `benchmarks/` and capture results on your target
hardware. See `benchmarks/README.md` for commands and guidance.
## Documentation
| Document | Description |
| ---------------------------------------- | ------------------------------------------- |
| [Architecture](docs/architecture.md) | Detailed system design |
| [Scaling & Performance](docs/scaling.md) | Capacity analysis, benchmarks, optimization |
| [Clustering](docs/clustering.md) | Distributed broker design |
| Document | Description |
| ---------------------------------------- | ------------------------------------------------- |
| [Architecture](docs/architecture.md) | Detailed system design |
| [Scaling & Performance](docs/scaling.md) | Benchmarking and tuning guidance |
| [Clustering](docs/clustering.md) | Distributed broker design |
| [Client Library](docs/client.md) | Go MQTT and AMQP 0.9.1 clients with queue support |
| [Broker Internals](docs/broker.md) | Message routing, sessions |
| [Durable Queues](docs/queue.md) | Queue configuration, consumer groups |
| [Configuration](docs/configuration.md) | Complete config reference |
| [Webhooks](docs/webhooks.md) | Webhook event system |
| [Roadmap](docs/roadmap.md) | Development plan |
## Roadmap
### Completed ✅
- MQTT 3.1.1 and 5.0 support
- TCP, WebSocket, HTTP transports
- QoS 0/1/2, retained messages, will messages
- Clustering with embedded etcd
- gRPC inter-broker communication (mTLS supported)
- BadgerDB persistent storage
- Durable queues with consumer groups
- Raft layer for queue appends (WIP)
- Committed-offset truncation for queues (time/size retention planned)
- TLS/mTLS for client and inter-broker connections
- WebSocket origin validation
- Shared subscriptions (MQTT 5.0)
- MaxQoS enforcement (MQTT 5.0)
- Performance optimization (3.3x throughput, zero-copy buffers)
- Rate limiting (per-IP connections, per-client messages/subscriptions)
- CoAP with UDP and DTLS/mDTLS support
### In Progress 🚧
- Secure default ACL and Auth integrations
### Planned 📋
- Management dashboard
- Prometheus metrics endpoint
- Distributed tracing instrumentation
- Hot configuration reload
- Load tests and benchmarks
- Performance optimizations and fine-tuning
See [Roadmap](docs/roadmap.md) for details.
| [Broker Internals](docs/broker.md) | Message routing, sessions |
| [Durable Queues](docs/queue.md) | Queue configuration, consumer groups |
| [Configuration](docs/configuration.md) | Complete config reference |
| [Webhooks](docs/webhooks.md) | Webhook event system |
| [Roadmap](docs/roadmap.md) | Project planning notes |
## Contributing
+79
View File
@@ -527,6 +527,53 @@ func (c *Client) Publish(topic string, payload []byte, qos byte, retain bool) er
return op.wait(c.opts.AckTimeout)
}
// PublishMessage sends a message with optional MQTT 5.0 publish properties.
// For MQTT 3.1.1, publish properties are ignored.
func (c *Client) PublishMessage(msg *Message) error {
if msg == nil {
return ErrInvalidMessage
}
if !c.state.isConnected() {
return ErrNotConnected
}
if msg.QoS > 2 {
return ErrInvalidQoS
}
if msg.Topic == "" {
return ErrInvalidTopic
}
if msg.QoS == 0 {
return c.sendPublish(msg, 0)
}
packetID := c.pending.nextPacketID()
if packetID == 0 {
return ErrMaxInflight
}
publishMsg := msg.Copy()
publishMsg.PacketID = packetID
if err := c.store.StoreOutbound(packetID, publishMsg); err != nil {
return err
}
op, err := c.pending.add(packetID, pendingPublish, publishMsg)
if err != nil {
c.store.DeleteOutbound(packetID)
return err
}
if err := c.sendPublish(publishMsg, packetID); err != nil {
c.pending.remove(packetID)
c.store.DeleteOutbound(packetID)
return err
}
return op.wait(c.opts.AckTimeout)
}
func (c *Client) sendPublish(msg *Message, packetID uint16) error {
c.connMu.RLock()
conn := c.conn
@@ -552,6 +599,38 @@ func (c *Client) sendPublish(msg *Message, packetID uint16) error {
ID: packetID,
}
if msg.PayloadFormat != nil || msg.MessageExpiry != nil || msg.ContentType != "" || msg.ResponseTopic != "" || len(msg.CorrelationData) > 0 || len(msg.UserProperties) > 0 {
if pkt.Properties == nil {
pkt.Properties = &v5.PublishProperties{}
}
}
if msg.PayloadFormat != nil {
pkt.Properties.PayloadFormat = msg.PayloadFormat
}
if msg.MessageExpiry != nil {
pkt.Properties.MessageExpiry = msg.MessageExpiry
}
if msg.ContentType != "" {
pkt.Properties.ContentType = msg.ContentType
}
if msg.ResponseTopic != "" {
pkt.Properties.ResponseTopic = msg.ResponseTopic
}
if len(msg.CorrelationData) > 0 {
pkt.Properties.CorrelationData = msg.CorrelationData
}
if len(msg.UserProperties) > 0 {
if pkt.Properties == nil {
pkt.Properties = &v5.PublishProperties{}
}
pkt.Properties.User = make([]v5.User, 0, len(msg.UserProperties))
for k, v := range msg.UserProperties {
pkt.Properties.User = append(pkt.Properties.User, v5.User{Key: k, Value: v})
}
}
// Apply topic alias if available
if c.topicAliases != nil {
if alias, isNew, ok := c.topicAliases.getOrAssignOutbound(msg.Topic); ok {
+1
View File
@@ -24,6 +24,7 @@ var (
ErrMaxInflight = errors.New("maximum inflight messages exceeded")
ErrConnectionLost = errors.New("connection lost")
ErrClientClosed = errors.New("client has been closed")
ErrInvalidMessage = errors.New("invalid message")
ErrInvalidQoS = errors.New("invalid QoS level (must be 0, 1, or 2)")
ErrInvalidTopic = errors.New("invalid topic")
ErrSubscribeFailed = errors.New("subscription failed")
+43 -32
View File
@@ -10,11 +10,11 @@ currently include a dedicated AMQP 1.0 client library.
- **Protocol Support:** MQTT 3.1.1 (v4) and MQTT 5.0 (v5)
- **Auto-Reconnect:** Exponential backoff with configurable limits
- **QoS Levels:** Full QoS 0/1/2 support with message persistence
- **QoS Levels:** Full QoS 0/1/2 support with pluggable in-flight store (memory by default)
- **TLS/SSL:** Secure connections with custom certificates
- **Session Persistence:** Configurable session expiry
- **Durable Queues:** Consumer groups and acknowledgments (DLQ wiring pending)
- **MQTT 5.0 Features:** Topic aliases, user properties, flow control
- **MQTT 5.0 Features:** Topic aliases, user properties (publish/receive/will), flow control
---
@@ -135,20 +135,24 @@ c.Publish("topic", []byte("payload"), 2, false)
c.Publish("config/device", []byte("settings"), 1, true)
```
### Publish with MQTT 5.0 Properties
### MQTT 5.0 Publish Properties
Use `PublishMessage` to set publish properties such as content type, response
topic, correlation data, and user properties (MQTT 5.0 only).
```go
msg := &client.Message{
Topic: "sensors/temp",
Payload: []byte("22.5"),
QoS: 1,
UserProperties: map[string]string{"unit": "celsius"},
ContentType: "text/plain",
ResponseTopic: "responses/temp",
CorrelationData: []byte("req-123"),
MessageExpiry: 3600, // Expires in 1 hour
Topic: "sensors/temp",
Payload: []byte("22.5"),
QoS: 1,
ContentType: "text/plain",
ResponseTopic: "responses/temp",
CorrelationData: []byte("req-123"),
UserProperties: map[string]string{"unit": "celsius"},
}
if err := c.PublishMessage(msg); err != nil {
log.Fatal(err)
}
c.PublishMessage(msg)
```
---
@@ -219,6 +223,7 @@ opts.SetOnMessageV2(func(msg *client.Message) {
The client supports durable queues with consumer groups and message acknowledgment.
Reject/DLQ wiring in the broker is pending.
MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
**When to use queues instead of regular pub/sub:**
- You need at-least-once processing with explicit acknowledgments
@@ -252,18 +257,16 @@ c.PublishToQueueWithOptions(&client.QueuePublishOptions{
err := c.SubscribeToQueue("orders", "order-processors", func(msg *client.QueueMessage) {
log.Printf("Processing order: %s", msg.Payload)
log.Printf("Message ID: %s", msg.MessageID)
if msg.UserProperties != nil {
log.Printf("Offset: %s", msg.UserProperties["offset"])
log.Printf("Group: %s", msg.UserProperties["group-id"])
}
log.Printf("Group: %s", msg.GroupID)
log.Printf("Offset: %d", msg.Offset)
// Process message...
if processedOK {
msg.Ack() // Message removed from queue
} else if shouldRetry {
msg.Nack() // Redelivery eligible immediately (no backoff enforcement yet)
msg.Nack() // Redelivery eligible (subject to broker delivery/visibility timing)
} else {
msg.Reject() // Rejects message; DLQ wiring pending
msg.Reject() // Removes from pending; DLQ routing not wired yet
}
})
```
@@ -274,7 +277,7 @@ err := c.SubscribeToQueue("orders", "order-processors", func(msg *client.QueueMe
| -------------- | -------------------------------------------------- |
| `msg.Ack()` | Message processed successfully, removed from queue |
| `msg.Nack()` | Processing failed, make eligible for redelivery |
| `msg.Reject()` | Permanent failure, DLQ wiring pending |
| `msg.Reject()` | Remove from pending; DLQ routing not wired yet |
### Direct Acknowledgment
@@ -284,9 +287,9 @@ c.AckWithGroup("orders", "msg-12345", "processors")
c.NackWithGroup("orders", "msg-12345", "processors")
c.RejectWithGroup("orders", "msg-12345", "processors")
```
Note: the broker expects `message-id` and `group-id` user properties on acks
(MQTT v5). `QueueMessage.Ack()` sends both. For direct acks, use the `*WithGroup`
helpers or provide `group-id` manually.
Note: MQTT queue acknowledgments require MQTT v5 and the broker expects
`message-id` and `group-id` user properties on ack messages. `QueueMessage.Ack()`
sends both when they are present on incoming messages.
### Unsubscribe from Queue
@@ -376,7 +379,7 @@ SetOnServerCapabilities(func(caps *client.ServerCapabilities) {
c.Disconnect()
// With reason (MQTT 5.0)
c.DisconnectWithReason(0x04, "Going offline")
c.DisconnectWithReason(0x04, 0, "Going offline")
```
---
@@ -415,8 +418,12 @@ opts.Will = &client.WillMessage{
| `ErrNoServers` | No broker addresses configured |
| `ErrEmptyClientID` | ClientID not set |
| `ErrInvalidProtocol` | Protocol version must be 4 or 5 |
| `ErrInvalidQoS` | QoS must be 0, 1, or 2 |
| `ErrInvalidTopic` | Empty or invalid topic string |
| `ErrInvalidMessage` | Message is nil or invalid |
| `ErrMaxInflight` | Too many pending messages |
| `ErrQueueAckRequiresV5` | Queue acks require MQTT v5 user properties |
| `ErrQueueAckMissingGroup` | `group-id` missing for queue ack |
### Handling Connection Errors
@@ -437,16 +444,17 @@ if err := c.Connect(); err != nil {
## Message Store
For QoS 1/2 message persistence:
For QoS 1/2 in-flight storage:
```go
store := client.NewFileStore("/path/to/store")
store := client.NewMemoryStore()
opts.SetStore(store)
```
Built-in stores:
- **MemoryStore** (default): In-memory, lost on restart
- **FileStore**: Persistent to disk
You can implement the `MessageStore` interface to persist QoS 1/2 in-flight data.
---
@@ -458,7 +466,9 @@ Built-in stores:
| ConnectTimeout | 10 seconds |
| WriteTimeout | 5 seconds |
| AckTimeout | 10 seconds |
| PingTimeout | 5 seconds |
| MaxInflight | 100 |
| MessageChanSize | 256 |
| AutoReconnect | true |
| ReconnectBackoff | 1 second |
| MaxReconnectWait | 2 minutes |
@@ -524,11 +534,12 @@ func main() {
Stream queues provide log-style consumption with cursor offsets.
Stream queue names follow RabbitMQ conventions (no `$queue/` prefix).
Supported offsets: `first`, `last`, `next`, `offset=<n>`, `timestamp=<unix>`.
Offsets are passed as `x-stream-offset` strings; values like `first`, `last`,
`next`, `offset=<n>`, `timestamp=<unix>` are interpreted by the broker.
```go
// Declare a stream queue
qName, err := c.DeclareStreamQueue(&amqp091.StreamQueueOptions{
qName, err := c.DeclareStreamQueue(&amqp.StreamQueueOptions{
Name: "events",
Durable: true,
MaxAge: "7D",
@@ -540,10 +551,10 @@ if err != nil {
log.Printf("stream queue: %s", qName)
// Consume from the beginning
err = c.SubscribeToStream(&amqp091.StreamConsumeOptions{
err = c.SubscribeToStream(&amqp.StreamConsumeOptions{
QueueName: "events",
Offset: "first",
}, func(msg *amqp091.QueueMessage) {
}, func(msg *amqp.QueueMessage) {
if off, ok := msg.StreamOffset(); ok {
log.Printf("offset=%d payload=%s", off, string(msg.Body))
}
@@ -583,7 +594,7 @@ Minimal example:
```go
autoCommit := false
_ = c.SubscribeToStream(&amqp091.StreamConsumeOptions{
_ = c.SubscribeToStream(&amqp.StreamConsumeOptions{
QueueName: "events",
ConsumerGroup: "my-group",
AutoCommit: &autoCommit,
@@ -602,7 +613,7 @@ With manual commit:
### Pub/Sub
```go
_ = c.Subscribe("sensors/#", func(msg *amqp091.Message) {
_ = c.Subscribe("sensors/#", func(msg *amqp.Message) {
log.Printf("Topic: %s Payload: %s", msg.Topic, string(msg.Body))
})