Fix commit handling

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-04 13:55:13 +01:00
parent faa59e69a4
commit 723bdcff9b
17 changed files with 238 additions and 89 deletions
+3
View File
@@ -65,6 +65,9 @@ A high-performance, multi-protocol message broker written in Go designed for sca
**Recommended configuration for EDA:**
```yaml
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics: ["orders/#", "$queue/orders/#"]
+1
View File
@@ -43,6 +43,7 @@ type QueueManager interface {
Ack(ctx context.Context, queueName, messageID, groupID string) error
Nack(ctx context.Context, queueName, messageID, groupID string) error
Reject(ctx context.Context, queueName, messageID, groupID, reason string) error
CommitOffset(ctx context.Context, queueName, groupID string, offset uint64) error
}
// Broker is the core AMQP 0.9.1 broker.
+43 -1
View File
@@ -331,6 +331,38 @@ func (ch *Channel) completePublish() {
topic = exchangeName + "/" + routingKey
}
// Stream commit: publish to $queue/<queue>/$commit with x-group-id and x-offset headers.
if exchangeName == "" && strings.HasPrefix(routingKey, "$queue/") && strings.HasSuffix(routingKey, "/$commit") {
qm := ch.conn.broker.getQueueManager()
if qm == nil {
ch.conn.logger.Warn("queue commit ignored: queue manager not configured", "routing_key", routingKey)
} else {
queueName := strings.TrimSuffix(strings.TrimPrefix(routingKey, "$queue/"), "/$commit")
if queueName == "" {
ch.conn.logger.Warn("queue commit missing queue name", "routing_key", routingKey)
} else {
headers := header.Properties.Headers
groupID, ok := parseStringArg(headers["x-group-id"])
if !ok || groupID == "" {
ch.conn.logger.Warn("queue commit missing group id", "queue", queueName)
} else {
offsetVal, ok := headers["x-offset"]
if !ok {
ch.conn.logger.Warn("queue commit missing offset", "queue", queueName, "group", groupID)
} else if n, ok := parseInt64Arg(offsetVal); !ok || n < 0 {
ch.conn.logger.Warn("queue commit invalid offset", "queue", queueName, "group", groupID)
} else if err := qm.CommitOffset(context.Background(), queueName, groupID, uint64(n)); err != nil {
ch.conn.logger.Warn("queue commit failed", "queue", queueName, "group", groupID, "error", err)
}
}
}
}
if ch.confirmMode {
ch.sendPublisherAck()
}
return
}
// Direct queue publish: use $queue/ prefix on routing key with default exchange.
if exchangeName == "" && strings.HasPrefix(routingKey, "$queue/") {
qm := ch.conn.broker.getQueueManager()
@@ -591,7 +623,7 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
} else {
headers[k] = v
}
case "x-primary-group-processed":
case "x-work-acked":
headers[k] = v == "true"
default:
headers[k] = v
@@ -1488,6 +1520,16 @@ func parseInt64Arg(val any) (int64, bool) {
return 0, false
}
func parseStringArg(val any) (string, bool) {
switch v := val.(type) {
case string:
return strings.TrimSpace(v), true
case []byte:
return strings.TrimSpace(string(v)), true
}
return "", false
}
func extractAutoCommit(args map[string]interface{}) *bool {
if len(args) == 0 {
return nil
-9
View File
@@ -89,17 +89,8 @@ func (qm *QueueMessage) StreamTimestamp() (int64, bool) {
return 0, false
}
// PrimaryGroupProcessed reports whether the primary group has processed this offset.
func (qm *QueueMessage) PrimaryGroupProcessed() (bool, bool) {
return headerBool(qm.Headers, "x-primary-group-processed")
}
// WorkAcked reports whether the primary work group has acknowledged this offset.
// Deprecated: Use PrimaryGroupProcessed instead.
func (qm *QueueMessage) WorkAcked() (bool, bool) {
if v, ok := headerBool(qm.Headers, "x-primary-group-processed"); ok {
return v, ok
}
return headerBool(qm.Headers, "x-work-acked")
}
+1
View File
@@ -334,6 +334,7 @@ func main() {
// Convert queue configs from main config to queue types
queueCfg := queue.DefaultConfig()
queueCfg.AutoCommitInterval = cfg.QueueManager.AutoCommitInterval
for _, qc := range cfg.Queues {
queueCfg.QueueConfigs = append(queueCfg.QueueConfigs, queueTypes.FromInput(queueTypes.QueueConfigInput{
Name: qc.Name,
+24 -9
View File
@@ -15,15 +15,16 @@ import (
// Config holds all configuration for the MQTT broker.
type Config struct {
Server ServerConfig `yaml:"server"`
Broker BrokerConfig `yaml:"broker"`
Session SessionConfig `yaml:"session"`
Log LogConfig `yaml:"log"`
Storage StorageConfig `yaml:"storage"`
Cluster ClusterConfig `yaml:"cluster"`
Webhook WebhookConfig `yaml:"webhook"`
RateLimit RateLimitConfig `yaml:"ratelimit"`
Queues []QueueConfig `yaml:"queues"`
Server ServerConfig `yaml:"server"`
Broker BrokerConfig `yaml:"broker"`
Session SessionConfig `yaml:"session"`
Log LogConfig `yaml:"log"`
Storage StorageConfig `yaml:"storage"`
Cluster ClusterConfig `yaml:"cluster"`
Webhook WebhookConfig `yaml:"webhook"`
RateLimit RateLimitConfig `yaml:"ratelimit"`
QueueManager QueueManagerConfig `yaml:"queue_manager"`
Queues []QueueConfig `yaml:"queues"`
}
// QueueConfig defines configuration for a persistent queue.
@@ -67,6 +68,13 @@ type QueueRetention struct {
MaxLengthMessages int64 `yaml:"max_length_messages"`
}
// QueueManagerConfig defines runtime behavior for the queue manager.
type QueueManagerConfig struct {
// AutoCommitInterval controls how often stream groups auto-commit offsets.
// Zero means commit on every delivery batch.
AutoCommitInterval time.Duration `yaml:"auto_commit_interval"`
}
// RateLimitConfig holds rate limiting configuration.
type RateLimitConfig struct {
Enabled bool `yaml:"enabled"`
@@ -536,6 +544,9 @@ func Default() *Config {
Burst: 10,
},
},
QueueManager: QueueManagerConfig{
AutoCommitInterval: 5 * time.Second,
},
Queues: []QueueConfig{
{
Name: "mqtt",
@@ -894,6 +905,10 @@ func (c *Config) Validate() error {
}
}
if c.QueueManager.AutoCommitInterval < 0 {
return fmt.Errorf("queue_manager.auto_commit_interval must be >= 0")
}
// Queue validation
seenQueues := make(map[string]bool)
for i, q := range c.Queues {
+20 -20
View File
@@ -562,38 +562,38 @@ if err := c.PublishToStream("events", []byte("hello"), nil); err != nil {
Stream deliveries include:
- `x-stream-offset`
- `x-stream-timestamp`
- `x-primary-group-processed` / `x-work-committed-offset`
- `x-work-acked` / `x-work-committed-offset`
The `x-work-*` fields report the configured primary work group's committed offset.
`x-work-acked` is `true` when this message's offset is below the committed offset,
which can lag slightly due to auto-commit interval batching.
Convenience accessors are available on `QueueMessage`:
`StreamOffset()`, `StreamTimestamp()`, `PrimaryGroupProcessed()`, `WorkCommittedOffset()`, `WorkGroup()`.
`WorkAcked()` is deprecated but still supported for backward compatibility.
`StreamOffset()`, `StreamTimestamp()`, `WorkAcked()`, `WorkCommittedOffset()`, `WorkGroup()`.
### Manual Commit Mode
By default, stream consumers auto-commit offsets as messages are delivered. For
exactly-once processing, disable auto-commit:
By default, stream consumers auto-commit offsets as messages are delivered
(similar to Kafka's `enable.auto.commit=true`). For exactly-once processing,
disable auto-commit and commit explicitly.
Auto-commit is rate-limited by the server setting
`queue_manager.auto_commit_interval` (default: `5s`).
Minimal example:
```go
autoCommit := false
err = c.SubscribeToStream(&amqp091.StreamConsumeOptions{
QueueName: "events",
AutoCommit: &autoCommit,
Offset: "first",
}, func(msg *amqp091.QueueMessage) {
if off, ok := msg.StreamOffset(); ok {
log.Printf("offset=%d payload=%s", off, string(msg.Body))
}
// Process message...
// Explicitly commit after successful processing
})
_ = c.SubscribeToStream(&amqp091.StreamConsumeOptions{
QueueName: "events",
ConsumerGroup: "my-group",
AutoCommit: &autoCommit,
}, handler)
// After processing a batch, commit the last processed offset
if err := c.CommitOffset("events", "my-group", lastProcessedOffset); err != nil {
log.Printf("commit failed: %v", err)
}
_ = c.CommitOffset("events", "my-group", lastProcessedOffset)
```
Use the same consumer group name in both calls.
With manual commit:
- Messages are delivered but the committed offset doesn't advance automatically
- On reconnect, delivery resumes from the last committed offset
+18
View File
@@ -511,6 +511,21 @@ Controls what happens when the offline queue is full.
## Queue Configuration
Queue runtime settings live under `queue_manager`:
```yaml
queue_manager:
auto_commit_interval: "5s"
```
### auto_commit_interval
How often stream consumer groups auto-commit offsets when auto-commit is enabled.
Notes:
- Default: `5s` (Kafka-like).
- `0s` commits on every delivery batch (lowest latency, more write pressure).
Queues are defined under `queues:` and bind **topic patterns** to durable queues.
Topics use MQTT-style wildcards (`+`, `#`). A publish routed through the queue
manager is enqueued into every queue whose bindings match the topic.
@@ -519,6 +534,9 @@ If no queues are configured, a default reserved queue named `mqtt` is created
with topics `["$queue/#"]`, which preserves the `$queue/*` behavior out of the box.
```yaml
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics:
+22 -10
View File
@@ -102,7 +102,7 @@ Queue deliveries include properties:
Stream deliveries also include:
- `x-stream-offset`
- `x-stream-timestamp`
- `x-primary-group-processed` (based on the primary work group's committed offset)
- `x-work-acked` (true when this message is below the primary work group's committed offset; may lag by the auto-commit interval)
- `x-work-committed-offset`
- `x-work-group`
@@ -155,12 +155,13 @@ truncation point for queue-mode consumers.
- `timestamp=<unix-seconds|unix-millis>`
FluxMQ extensions for stream consumers:
- `x-primary-group-processed` and `x-work-committed-offset` to report delivery status for the
- `x-work-acked` and `x-work-committed-offset` to report delivery status for the
configured primary work group.
- `x-work-group` to identify the group used for status.
- Optional `x-consumer-group` on `basic.consume` to persist a shared cursor.
If omitted, the consumer tag is used as the stream group ID.
- Optional `x-auto-commit=false` to disable automatic offset commits (manual commit mode).
- Optional `x-auto-commit=false` to disable automatic offset commits
(default is enabled, similar to Kafka's `enable.auto.commit=true`).
Primary work group is configured per queue (see configuration section) and is
used only for delivery status reporting; it does not affect routing.
@@ -187,20 +188,26 @@ If a consumer joins a group with a different mode, the broker returns `ErrGroupM
### Manual Commit for Stream Consumers
By default, stream consumers auto-commit offsets as messages are delivered. For exactly-once processing, disable auto-commit:
By default, stream consumers auto-commit offsets as messages are delivered.
Commits are rate-limited by `queue_manager.auto_commit_interval` (default: `5s`),
so offsets are updated at most once per interval. For exactly-once processing,
disable auto-commit and commit explicitly.
Minimal example:
```go
autoCommit := false
err := client.SubscribeToStream(&StreamConsumeOptions{
QueueName: "events",
AutoCommit: &autoCommit,
Offset: "first",
_ = client.SubscribeToStream(&StreamConsumeOptions{
QueueName: "events",
ConsumerGroup: "my-group",
AutoCommit: &autoCommit,
}, handler)
// After processing, explicitly commit
client.CommitOffset("events", "my-group", lastProcessedOffset)
_ = client.CommitOffset("events", "my-group", lastProcessedOffset)
```
Use the same consumer group name in both calls.
With manual commit:
- Messages are delivered but committed offset doesn't advance automatically
- On reconnect, delivery resumes from last committed offset
@@ -301,6 +308,9 @@ Zero-copy delivery for queue messages is planned.
Queue bindings live under `queues` in the main config:
```yaml
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics:
@@ -324,6 +334,8 @@ queues:
Notes:
- If no queues are configured, a default reserved queue `mqtt` is created with topic `$queue/#`.
- Auto-created queues are **ephemeral** and expire after the last consumer disconnects.
- `queue_manager.auto_commit_interval` controls how often stream offsets are auto-committed.
Use `0s` to commit every delivery batch.
- `message_ttl` is stored in message metadata; automatic expiration is not enforced yet.
- `limits` and `retry` are parsed into queue configs but not enforced at runtime yet.
- `dlq` configuration is parsed, but reject/DLQ wiring is not active in the main delivery path.
+3
View File
@@ -177,6 +177,9 @@ webhook:
# Queue Configuration
# Queues are used for durable, persistent message delivery.
# Messages published to topics starting with $queue/ are routed to matching queues.
queue_manager:
auto_commit_interval: 5s
queues:
# Default MQTT queue - captures all $queue/* messages
- name: "mqtt"
+3
View File
@@ -60,6 +60,9 @@ log:
format: "text"
# Queue Configuration - persistent message queues
queue_manager:
auto_commit_interval: 5s
queues:
- name: "mqtt"
topics:
+3
View File
@@ -105,3 +105,6 @@ cluster:
log:
level: "debug"
format: "text"
queue_manager:
auto_commit_interval: 5s
+3
View File
@@ -105,3 +105,6 @@ cluster:
log:
level: "debug"
format: "text"
queue_manager:
auto_commit_interval: 5s
+3
View File
@@ -105,3 +105,6 @@ cluster:
log:
level: "debug"
format: "text"
queue_manager:
auto_commit_interval: 5s
+54 -25
View File
@@ -4,6 +4,7 @@
package logstorage
import (
"bytes"
"encoding/json"
"fmt"
"os"
@@ -25,6 +26,12 @@ type ConsumerGroupStateStore struct {
const consumerGroupVersion uint8 = 2
type consumerGroupWrapper struct {
Version uint8 `json:"version"`
State json.RawMessage `json:"state"`
SavedAt int64 `json:"saved_at"`
}
// NewConsumerGroupStateStore creates or opens a consumer group state store.
func NewConsumerGroupStateStore(baseDir string) (*ConsumerGroupStateStore, error) {
dir := filepath.Join(baseDir, "groups")
@@ -45,6 +52,40 @@ func NewConsumerGroupStateStore(baseDir string) (*ConsumerGroupStateStore, error
return store, nil
}
func decodeConsumerGroupState(data []byte) (*types.ConsumerGroupState, bool, error) {
var wrapper consumerGroupWrapper
if err := json.Unmarshal(data, &wrapper); err != nil {
return nil, false, err
}
if wrapper.Version > consumerGroupVersion {
return nil, false, fmt.Errorf("unsupported consumer group version: %d", wrapper.Version)
}
rawState := bytes.TrimSpace(wrapper.State)
if len(rawState) == 0 || bytes.Equal(rawState, []byte("null")) {
return nil, false, nil
}
var state types.ConsumerGroupState
if err := json.Unmarshal(rawState, &state); err != nil {
return nil, false, err
}
hasAutoCommit := false
var fields map[string]json.RawMessage
if err := json.Unmarshal(rawState, &fields); err == nil {
if _, ok := fields["AutoCommit"]; ok {
hasAutoCommit = true
} else if _, ok := fields["autoCommit"]; ok {
hasAutoCommit = true
} else if _, ok := fields["auto_commit"]; ok {
hasAutoCommit = true
}
}
return &state, hasAutoCommit, nil
}
// loadAll loads all consumer group states from disk.
func (s *ConsumerGroupStateStore) loadAll() error {
err := filepath.Walk(s.dir, func(path string, info os.FileInfo, err error) error {
@@ -60,23 +101,14 @@ func (s *ConsumerGroupStateStore) loadAll() error {
if err != nil {
return nil
}
var wrapper struct {
Version uint8 `json:"version"`
State *types.ConsumerGroupState `json:"state"`
SavedAt int64 `json:"saved_at"`
}
if err := json.Unmarshal(data, &wrapper); err != nil {
state, hasAutoCommit, err := decodeConsumerGroupState(data)
if err != nil {
return nil
}
if wrapper.State == nil {
if state == nil {
return nil
}
state := wrapper.State
// Ensure maps are initialized
if state.Cursor == nil {
state.Cursor = &types.QueueCursor{}
@@ -84,6 +116,9 @@ func (s *ConsumerGroupStateStore) loadAll() error {
if state.Mode == "" {
state.Mode = types.GroupModeQueue
}
if !hasAutoCommit {
state.AutoCommit = true
}
if state.PEL == nil {
state.PEL = make(map[string][]*types.PendingEntry)
}
@@ -125,23 +160,14 @@ func (s *ConsumerGroupStateStore) loadGroup(queueName, groupID string) (*types.C
if err != nil {
return nil, err
}
var wrapper struct {
Version uint8 `json:"version"`
State *types.ConsumerGroupState `json:"state"`
SavedAt int64 `json:"saved_at"`
}
if err := json.Unmarshal(data, &wrapper); err != nil {
state, hasAutoCommit, err := decodeConsumerGroupState(data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal consumer group state: %w", err)
}
if wrapper.Version > consumerGroupVersion {
return nil, fmt.Errorf("unsupported consumer group version: %d", wrapper.Version)
if state == nil {
return nil, fmt.Errorf("consumer group state is empty")
}
state := wrapper.State
// Ensure maps are initialized
if state.Cursor == nil {
state.Cursor = &types.QueueCursor{}
@@ -149,6 +175,9 @@ func (s *ConsumerGroupStateStore) loadGroup(queueName, groupID string) (*types.C
if state.Mode == "" {
state.Mode = types.GroupModeQueue
}
if !hasAutoCommit {
state.AutoCommit = true
}
if state.PEL == nil {
state.PEL = make(map[string][]*types.PendingEntry)
}
+25 -6
View File
@@ -30,6 +30,7 @@ type Manager struct {
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
config Config
lastCommit map[string]time.Time
mu sync.RWMutex
}
@@ -47,15 +48,20 @@ type Config struct {
// StealBatchSize is the maximum number of messages to steal at once.
StealBatchSize int
// AutoCommitInterval controls how often stream groups auto-commit offsets.
// Zero means commit on every delivery batch.
AutoCommitInterval time.Duration
}
// DefaultConfig returns default manager configuration.
func DefaultConfig() Config {
return Config{
VisibilityTimeout: 30 * time.Second,
MaxDeliveryCount: 5,
ClaimBatchSize: 10,
StealBatchSize: 5,
VisibilityTimeout: 30 * time.Second,
MaxDeliveryCount: 5,
ClaimBatchSize: 10,
StealBatchSize: 5,
AutoCommitInterval: 5 * time.Second,
}
}
@@ -65,6 +71,7 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
queueStore: queueStore,
groupStore: groupStore,
config: config,
lastCommit: make(map[string]time.Time),
}
}
@@ -254,8 +261,20 @@ func (m *Manager) ClaimBatchStream(ctx context.Context, queueName, groupID, cons
}
// Only auto-commit if the group has AutoCommit enabled.
if group.AutoCommit {
if err := m.groupStore.UpdateCommitted(ctx, group.QueueName, group.ID, newCursor); err != nil {
return nil, err
if m.config.AutoCommitInterval <= 0 {
if err := m.groupStore.UpdateCommitted(ctx, group.QueueName, group.ID, newCursor); err != nil {
return nil, err
}
} else {
key := group.QueueName + "/" + group.ID
now := time.Now()
last, ok := m.lastCommit[key]
if !ok || now.Sub(last) >= m.config.AutoCommitInterval {
if err := m.groupStore.UpdateCommitted(ctx, group.QueueName, group.ID, newCursor); err != nil {
return nil, err
}
m.lastCommit[key] = now
}
}
}
}
+12 -9
View File
@@ -59,9 +59,10 @@ type Manager struct {
// Config holds configuration for the queue-based queue manager.
type Config struct {
// Consumer configuration
VisibilityTimeout time.Duration
MaxDeliveryCount int
ClaimBatchSize int
VisibilityTimeout time.Duration
MaxDeliveryCount int
ClaimBatchSize int
AutoCommitInterval time.Duration
// Delivery configuration
DeliveryInterval time.Duration
@@ -89,6 +90,7 @@ func DefaultConfig() Config {
VisibilityTimeout: 30 * time.Second,
MaxDeliveryCount: 5,
ClaimBatchSize: 10,
AutoCommitInterval: 5 * time.Second,
DeliveryInterval: 10 * time.Millisecond,
DeliveryBatchSize: 100,
HeartbeatInterval: 10 * time.Second,
@@ -110,10 +112,11 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
metrics := consumer.NewMetrics()
consumerCfg := consumer.Config{
VisibilityTimeout: config.VisibilityTimeout,
MaxDeliveryCount: config.MaxDeliveryCount,
ClaimBatchSize: config.ClaimBatchSize,
StealBatchSize: 5,
VisibilityTimeout: config.VisibilityTimeout,
MaxDeliveryCount: config.MaxDeliveryCount,
ClaimBatchSize: config.ClaimBatchSize,
StealBatchSize: 5,
AutoCommitInterval: config.AutoCommitInterval,
}
consumerMgr := consumer.NewManager(queueStore, groupStore, consumerCfg)
@@ -990,7 +993,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
}
if hasWorkCommitted {
propsCopy["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
propsCopy["x-primary-group-processed"] = strconv.FormatBool(msg.Sequence < workCommitted)
propsCopy["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
propsCopy["x-work-group"] = config.PrimaryGroup
}
properties = propsCopy
@@ -1403,7 +1406,7 @@ func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *t
if hasWorkCommitted {
delivery.Properties["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
delivery.Properties["x-primary-group-processed"] = strconv.FormatBool(msg.Sequence < workCommitted)
delivery.Properties["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
if primaryGroup != "" {
delivery.Properties["x-work-group"] = primaryGroup
}