mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:20:18 +00:00
+13
-4
@@ -29,7 +29,8 @@ func (b *Broker) DeliverToSession(s *session.Session, msg *storage.Message) (uin
|
||||
slog.String("client_id", s.ID),
|
||||
slog.String("topic", msg.Topic),
|
||||
slog.Time("expiry", msg.Expiry))
|
||||
msg.ReleasePayload() // Drop expired message - release buffer
|
||||
msg.ReleasePayload()
|
||||
storage.ReleaseMessage(msg)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
@@ -69,7 +70,8 @@ func (b *Broker) DeliverToSession(s *session.Session, msg *storage.Message) (uin
|
||||
msg.PacketID = packetID
|
||||
// Inflight storage takes ownership - it will release when message is ACK'd or expires
|
||||
if err := s.Inflight().Add(packetID, msg, messages.Outbound); err != nil {
|
||||
msg.ReleasePayload() // Failed to store - release buffer
|
||||
msg.ReleasePayload()
|
||||
storage.ReleaseMessage(msg)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@@ -91,9 +93,16 @@ func (b *Broker) DeliverToSession(s *session.Session, msg *storage.Message) (uin
|
||||
return packetID, nil
|
||||
}
|
||||
|
||||
// AckMessage acknowledges a message by packet ID.
|
||||
// AckMessage acknowledges a message by packet ID and releases the buffer.
|
||||
func (b *Broker) AckMessage(s *session.Session, packetID uint16) error {
|
||||
s.Inflight().Ack(packetID)
|
||||
msg, err := s.Inflight().Ack(packetID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if msg != nil {
|
||||
msg.ReleasePayload()
|
||||
storage.ReleaseMessage(msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -82,8 +82,7 @@ func (b *Broker) Publish(msg *storage.Message) error {
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// Set retained message - need to retain buffer for storage
|
||||
msg.RetainPayload()
|
||||
// Set retained message - CopyMessage internally retains the buffer
|
||||
retainedMsg := storage.CopyMessage(msg)
|
||||
retainedMsg.Retain = true
|
||||
if err := b.retained.Set(ctx, msg.Topic, retainedMsg); err != nil {
|
||||
@@ -229,10 +228,8 @@ func (b *Broker) distribute(msg *storage.Message) error {
|
||||
deliverMsg.Properties = msg.Properties
|
||||
deliverMsg.SetPayloadFromBuffer(msg.PayloadBuf)
|
||||
|
||||
// DeliverToSession takes ownership of the buffer and message
|
||||
// DeliverToSession takes full ownership of the message
|
||||
if _, err := b.DeliverToSession(s, deliverMsg); err != nil {
|
||||
// Failed to deliver - release message back to pool
|
||||
storage.ReleaseMessage(deliverMsg)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
@@ -256,10 +253,8 @@ func (b *Broker) distribute(msg *storage.Message) error {
|
||||
deliverMsg.Properties = msg.Properties
|
||||
deliverMsg.SetPayloadFromBuffer(msg.PayloadBuf)
|
||||
|
||||
// DeliverToSession takes ownership of the buffer and message
|
||||
// DeliverToSession takes full ownership of the message
|
||||
if _, err := b.DeliverToSession(s, deliverMsg); err != nil {
|
||||
// Failed to deliver - release message back to pool
|
||||
storage.ReleaseMessage(deliverMsg)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -226,7 +226,9 @@ func (h *V3Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
|
||||
// Zero-copy: Create ref-counted buffer from payload
|
||||
buf := core.GetBufferWithData(payload)
|
||||
|
||||
// Publish message immediately (distribution to subscribers)
|
||||
// Retain buffer before Publish consumes it (for QoS 2 inflight tracking)
|
||||
buf.Retain()
|
||||
|
||||
msg := &storage.Message{
|
||||
Topic: topic,
|
||||
QoS: qos,
|
||||
@@ -234,19 +236,20 @@ func (h *V3Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
|
||||
}
|
||||
msg.SetPayloadFromBuffer(buf)
|
||||
if err := h.broker.Publish(msg); err != nil {
|
||||
buf.Release()
|
||||
return err
|
||||
}
|
||||
|
||||
// Store for QoS 2 flow tracking - retain buffer for second message
|
||||
msg.PayloadBuf.Retain()
|
||||
// Store for QoS 2 flow tracking using the retained buffer reference
|
||||
storeMsg := &storage.Message{
|
||||
Topic: topic,
|
||||
QoS: 2,
|
||||
Retain: retain,
|
||||
PacketID: packetID,
|
||||
}
|
||||
storeMsg.SetPayloadFromBuffer(msg.PayloadBuf)
|
||||
storeMsg.SetPayloadFromBuffer(buf)
|
||||
if err := s.Inflight().Add(packetID, storeMsg, messages.Inbound); err != nil {
|
||||
storeMsg.ReleasePayload()
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -302,7 +302,9 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
|
||||
// Zero-copy: Create ref-counted buffer from payload
|
||||
buf := core.GetBufferWithData(payload)
|
||||
|
||||
// Publish message immediately (distribution to subscribers)
|
||||
// Retain buffer before Publish consumes it (for QoS 2 inflight tracking)
|
||||
buf.Retain()
|
||||
|
||||
msg := &storage.Message{
|
||||
Topic: topic,
|
||||
QoS: qos,
|
||||
@@ -318,11 +320,11 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
|
||||
}
|
||||
msg.SetPayloadFromBuffer(buf)
|
||||
if err := h.broker.Publish(msg); err != nil {
|
||||
buf.Release()
|
||||
return sendV5PubRec(s, packetID, v5.PubRecUnspecifiedError, "Publish failed")
|
||||
}
|
||||
|
||||
// Store for QoS 2 flow tracking - retain buffer for second message
|
||||
msg.PayloadBuf.Retain()
|
||||
// Store for QoS 2 flow tracking using the retained buffer reference
|
||||
storeMsg := &storage.Message{
|
||||
Topic: topic,
|
||||
QoS: 2,
|
||||
@@ -332,8 +334,9 @@ func (h *V5Handler) HandlePublish(s *session.Session, pkt packets.ControlPacket)
|
||||
Expiry: expiryTime,
|
||||
PublishTime: publishTime,
|
||||
}
|
||||
storeMsg.SetPayloadFromBuffer(msg.PayloadBuf)
|
||||
storeMsg.SetPayloadFromBuffer(buf)
|
||||
if err := s.Inflight().Add(packetID, storeMsg, messages.Inbound); err != nil {
|
||||
storeMsg.ReleasePayload()
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user