mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:40:19 +00:00
Don't enqueue retained messages
Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
+64
-49
@@ -28,6 +28,25 @@ func (b *Broker) Publish(msg *storage.Message) error {
|
||||
b.metrics.RecordMessageReceived(msg.QoS, int64(payloadLen))
|
||||
}
|
||||
|
||||
// Handle retained messages before routing — ensures queue topics also
|
||||
// store retained state so new subscribers receive the last known value.
|
||||
if msg.Retain {
|
||||
if err := b.handleRetained(msg, payloadLen); err != nil {
|
||||
if isQueueTopic(msg.Topic) {
|
||||
b.logError("retained_store_failed", err, slog.String("topic", msg.Topic))
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// For queue topics, retained messages are stored only in the retained
|
||||
// store (for last-known-value delivery on subscribe). They are NOT
|
||||
// enqueued — the queue handles the ordered stream of non-retained
|
||||
// messages separately, avoiding duplicates on subscribe.
|
||||
if isQueueTopic(msg.Topic) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Route queue topics and ack topics to queue manager
|
||||
if b.queueManager != nil {
|
||||
if isQueueAckTopic(msg.Topic) {
|
||||
@@ -36,7 +55,6 @@ func (b *Broker) Publish(msg *storage.Message) error {
|
||||
}
|
||||
|
||||
if isQueueTopic(msg.Topic) {
|
||||
// Route to queue manager - use existing properties or nil (avoid allocation)
|
||||
return b.queueManager.Publish(context.Background(), types.PublishRequest{
|
||||
Topic: msg.Topic,
|
||||
Payload: msg.GetPayload(),
|
||||
@@ -60,54 +78,6 @@ func (b *Broker) Publish(msg *storage.Message) error {
|
||||
})
|
||||
}
|
||||
|
||||
if msg.Retain {
|
||||
ctx := context.Background()
|
||||
if payloadLen == 0 {
|
||||
// Clear retained message
|
||||
if err := b.retained.Delete(ctx, msg.Topic); err != nil {
|
||||
return err
|
||||
}
|
||||
// Also delete from cluster
|
||||
if b.cluster != nil {
|
||||
if err := b.cluster.Retained().Delete(ctx, msg.Topic); err != nil {
|
||||
b.logError("cluster_delete_retained", err, slog.String("topic", msg.Topic))
|
||||
}
|
||||
}
|
||||
// Webhook: retained message cleared
|
||||
if b.webhooks != nil {
|
||||
b.webhooks.Notify(context.Background(), events.RetainedMessageSet{
|
||||
MessageTopic: msg.Topic,
|
||||
PayloadSize: 0,
|
||||
Cleared: true,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Set retained message - CopyMessage internally retains the buffer
|
||||
retainedMsg := storage.CopyMessage(msg)
|
||||
retainedMsg.Retain = true
|
||||
if err := b.retained.Set(ctx, msg.Topic, retainedMsg); err != nil {
|
||||
retainedMsg.ReleasePayload()
|
||||
return err
|
||||
}
|
||||
// Also store in cluster
|
||||
if b.cluster != nil {
|
||||
retainedMsg.RetainPayload()
|
||||
if err := b.cluster.Retained().Set(ctx, msg.Topic, retainedMsg); err != nil {
|
||||
retainedMsg.ReleasePayload()
|
||||
b.logError("cluster_set_retained", err, slog.String("topic", msg.Topic))
|
||||
}
|
||||
}
|
||||
// Webhook: retained message set
|
||||
if b.webhooks != nil {
|
||||
b.webhooks.Notify(context.Background(), events.RetainedMessageSet{
|
||||
MessageTopic: msg.Topic,
|
||||
PayloadSize: payloadLen,
|
||||
Cleared: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Distribute message to subscribers (this will retain the buffer as needed)
|
||||
err := b.distribute(msg)
|
||||
|
||||
@@ -152,6 +122,51 @@ func (b *Broker) PublishWill(clientID string) error {
|
||||
return b.wills.Delete(ctx, clientID)
|
||||
}
|
||||
|
||||
// handleRetained stores or clears a retained message.
|
||||
func (b *Broker) handleRetained(msg *storage.Message, payloadLen int) error {
|
||||
ctx := context.Background()
|
||||
if payloadLen == 0 {
|
||||
if err := b.retained.Delete(ctx, msg.Topic); err != nil {
|
||||
return err
|
||||
}
|
||||
if b.cluster != nil {
|
||||
if err := b.cluster.Retained().Delete(ctx, msg.Topic); err != nil {
|
||||
b.logError("cluster_delete_retained", err, slog.String("topic", msg.Topic))
|
||||
}
|
||||
}
|
||||
if b.webhooks != nil {
|
||||
b.webhooks.Notify(ctx, events.RetainedMessageSet{
|
||||
MessageTopic: msg.Topic,
|
||||
PayloadSize: 0,
|
||||
Cleared: true,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
retainedMsg := storage.CopyMessage(msg)
|
||||
retainedMsg.Retain = true
|
||||
if err := b.retained.Set(ctx, msg.Topic, retainedMsg); err != nil {
|
||||
retainedMsg.ReleasePayload()
|
||||
return err
|
||||
}
|
||||
if b.cluster != nil {
|
||||
retainedMsg.RetainPayload()
|
||||
if err := b.cluster.Retained().Set(ctx, msg.Topic, retainedMsg); err != nil {
|
||||
retainedMsg.ReleasePayload()
|
||||
b.logError("cluster_set_retained", err, slog.String("topic", msg.Topic))
|
||||
}
|
||||
}
|
||||
if b.webhooks != nil {
|
||||
b.webhooks.Notify(ctx, events.RetainedMessageSet{
|
||||
MessageTopic: msg.Topic,
|
||||
PayloadSize: payloadLen,
|
||||
Cleared: false,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Distribute distributes a message to all matching subscribers (implements Service interface).
|
||||
func (b *Broker) Distribute(topic string, payload []byte, qos byte, retain bool, props map[string]string) error {
|
||||
msg := &storage.Message{
|
||||
|
||||
Reference in New Issue
Block a user