Fix delivery bug for exactly-once

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-09 19:07:27 +01:00
parent 4a1c56e348
commit b732100647
4 changed files with 130 additions and 11 deletions
+43 -9
View File
@@ -6,6 +6,7 @@ package broker
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"math"
@@ -310,7 +311,7 @@ func (ch *Channel) completePublish() {
props["content-encoding"] = header.Properties.ContentEncoding
}
if header.Properties.CorrelationID != "" {
props["correlation-id"] = header.Properties.CorrelationID
props["correlation-id"] = base64.StdEncoding.EncodeToString([]byte(header.Properties.CorrelationID))
}
if header.Properties.ReplyTo != "" {
props["reply-to"] = header.Properties.ReplyTo
@@ -367,15 +368,20 @@ func (ch *Channel) completePublish() {
if exchangeName == "" && strings.HasPrefix(routingKey, "$queue/") {
qm := ch.conn.broker.getQueueManager()
if qm != nil {
if err := qm.Publish(context.Background(), qtypes.PublishRequest{
err := qm.Publish(context.Background(), qtypes.PublishRequest{
Topic: routingKey,
Payload: body,
Properties: props,
}); err != nil {
})
if err != nil {
ch.conn.logger.Error("queue publish failed", "queue", routingKey, "error", err)
}
if ch.confirmMode {
ch.sendPublisherAck()
if err != nil {
ch.sendPublisherNack()
} else {
ch.sendPublisherAck()
}
}
return
}
@@ -386,15 +392,20 @@ func (ch *Channel) completePublish() {
qm := ch.conn.broker.getQueueManager()
if qm != nil {
queueTopic := "$queue/" + routingKey
if err := qm.Publish(context.Background(), qtypes.PublishRequest{
err := qm.Publish(context.Background(), qtypes.PublishRequest{
Topic: queueTopic,
Payload: body,
Properties: props,
}); err != nil {
})
if err != nil {
ch.conn.logger.Error("queue publish failed", "queue", routingKey, "error", err)
}
if ch.confirmMode {
ch.sendPublisherAck()
if err != nil {
ch.sendPublisherNack()
} else {
ch.sendPublisherAck()
}
}
return
}
@@ -402,6 +413,7 @@ func (ch *Channel) completePublish() {
// Check if this targets a queue via exchange bindings
isQueuePublish := false
var publishFailed bool
ch.exchangeMu.RLock()
bindings := make([]binding, 0, len(ch.bindings))
for _, b := range ch.bindings {
@@ -426,6 +438,7 @@ func (ch *Channel) completePublish() {
Properties: props,
}); err != nil {
ch.conn.logger.Error("queue publish failed", "queue", b.queue, "error", err)
publishFailed = true
}
}
isQueuePublish = true
@@ -452,7 +465,11 @@ func (ch *Channel) completePublish() {
// Publisher confirms
if ch.confirmMode {
ch.sendPublisherAck()
if publishFailed {
ch.sendPublisherNack()
} else {
ch.sendPublisherAck()
}
}
}
@@ -647,9 +664,14 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
headers = nil
}
correlationID := props["correlation-id"]
if decoded, err := base64.StdEncoding.DecodeString(correlationID); err == nil {
correlationID = string(decoded)
}
properties := codec.BasicProperties{
ContentType: props["content-type"],
CorrelationID: props["correlation-id"],
CorrelationID: correlationID,
ReplyTo: props["reply-to"],
MessageID: props[qtypes.PropMessageID],
Type: props["type"],
@@ -714,6 +736,18 @@ func (ch *Channel) sendPublisherAck() {
}
}
func (ch *Channel) sendPublisherNack() {
seq := ch.publishSeq.Add(1)
nack := &codec.BasicNack{
DeliveryTag: seq,
Multiple: false,
Requeue: false,
}
if err := ch.conn.writeMethod(ch.id, nack); err != nil {
ch.conn.logger.Error("failed to write publisher nack", "error", err)
}
}
func buildMethodFrame(buf *bytes.Buffer, channel uint16, method interface{ Write(io.Writer) error }) (*codec.Frame, error) {
buf.Reset()
if err := method.Write(buf); err != nil {
+6 -1
View File
@@ -5,6 +5,7 @@ package broker
import (
"context"
"encoding/base64"
"fmt"
"log/slog"
"strconv"
@@ -181,7 +182,11 @@ func applyPublishProperties(props *v5.PublishProperties, msg *storage.Message) {
if len(msg.CorrelationData) > 0 {
props.CorrelationData = msg.CorrelationData
} else if v := msg.Properties["correlation-id"]; v != "" {
props.CorrelationData = []byte(v)
if decoded, err := base64.StdEncoding.DecodeString(v); err == nil {
props.CorrelationData = decoded
} else {
props.CorrelationData = []byte(v)
}
}
if msg.PayloadFormat != nil {
+2 -1
View File
@@ -4,6 +4,7 @@
package broker
import (
"encoding/base64"
"strconv"
"strings"
@@ -102,7 +103,7 @@ func extractAllProperties(props *v5.PublishProperties) map[string]string {
}
if props.CorrelationData != nil {
result["correlation-id"] = string(props.CorrelationData)
result["correlation-id"] = base64.StdEncoding.EncodeToString(props.CorrelationData)
}
if props.PayloadFormat != nil {
+79
View File
@@ -0,0 +1,79 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package broker
import (
"encoding/base64"
"testing"
v5 "github.com/absmach/fluxmq/mqtt/packets/v5"
)
func TestExtractAllProperties_CorrelationDataBase64(t *testing.T) {
tests := []struct {
name string
correlationData []byte
}{
{
name: "ascii text",
correlationData: []byte("request-123"),
},
{
name: "binary protobuf-like data",
correlationData: []byte{0x08, 0x96, 0x01, 0x12, 0x07, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67},
},
{
name: "data with null bytes",
correlationData: []byte{0x00, 0x01, 0x02, 0xff, 0xfe, 0xfd},
},
{
name: "non-UTF-8 bytes",
correlationData: []byte{0x80, 0x81, 0xfe, 0xff},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
props := &v5.PublishProperties{
CorrelationData: tt.correlationData,
}
result := extractAllProperties(props)
encoded := result["correlation-id"]
if encoded == "" {
t.Fatal("correlation-id not set in result")
}
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
t.Fatalf("correlation-id is not valid base64: %v", err)
}
if string(decoded) != string(tt.correlationData) {
t.Errorf("round-trip failed: got %v, want %v", decoded, tt.correlationData)
}
})
}
}
func TestExtractAllProperties_NilCorrelationData(t *testing.T) {
props := &v5.PublishProperties{
ContentType: "application/json",
}
result := extractAllProperties(props)
if _, ok := result["correlation-id"]; ok {
t.Error("correlation-id should not be set when CorrelationData is nil")
}
}
func TestExtractAllProperties_NilProps(t *testing.T) {
result := extractAllProperties(nil)
if len(result) != 0 {
t.Errorf("expected empty map for nil props, got %v", result)
}
}