Use buffer pools for AMQP

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-07 00:43:30 +01:00
parent f763cbe669
commit d9518cbf21
12 changed files with 404 additions and 223 deletions
+21 -12
View File
@@ -16,6 +16,7 @@ import (
"time"
"github.com/absmach/fluxmq/amqp/codec"
"github.com/absmach/fluxmq/internal/bufpool"
qtypes "github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/storage"
"github.com/absmach/fluxmq/topics"
@@ -132,7 +133,7 @@ func newChannel(c *Connection, id uint16) *Channel {
}
}
func (ch *Channel) handleMethod(decoded interface{}) error {
func (ch *Channel) handleMethod(decoded any) error {
switch m := decoded.(type) {
// Channel
case *codec.ChannelFlow:
@@ -613,7 +614,9 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
Exchange: exchange,
RoutingKey: routingKey,
}
methodFrame, err := buildMethodFrame(ch.id, deliver)
methodBuf := bufpool.Get()
defer bufpool.Put(methodBuf)
methodFrame, err := buildMethodFrame(methodBuf, ch.id, deliver)
if err != nil {
return err
}
@@ -654,7 +657,9 @@ func (ch *Channel) sendDelivery(cons *consumer, topic string, payload []byte, pr
Headers: headers,
}
headerFrame, err := buildContentHeaderFrame(ch.id, uint64(len(payload)), properties)
headerBuf := bufpool.Get()
defer bufpool.Put(headerBuf)
headerFrame, err := buildContentHeaderFrame(headerBuf, ch.id, uint64(len(payload)), properties)
if err != nil {
return err
}
@@ -677,12 +682,16 @@ func (ch *Channel) sendBasicReturn(method *codec.BasicPublish, header *codec.Con
Exchange: normalizeExchange(method.Exchange),
RoutingKey: method.RoutingKey,
}
methodFrame, err := buildMethodFrame(ch.id, ret)
methodBuf := bufpool.Get()
defer bufpool.Put(methodBuf)
methodFrame, err := buildMethodFrame(methodBuf, ch.id, ret)
if err != nil {
ch.conn.logger.Error("failed to write basic.return", "error", err)
return
}
headerFrame, err := buildContentHeaderFrame(ch.id, uint64(len(body)), header.Properties)
headerBuf := bufpool.Get()
defer bufpool.Put(headerBuf)
headerFrame, err := buildContentHeaderFrame(headerBuf, ch.id, uint64(len(body)), header.Properties)
if err != nil {
ch.conn.logger.Error("failed to write return header", "error", err)
return
@@ -706,9 +715,9 @@ func (ch *Channel) sendPublisherAck() {
}
}
func buildMethodFrame(channel uint16, method interface{ Write(io.Writer) error }) (*codec.Frame, error) {
var buf bytes.Buffer
if err := method.Write(&buf); err != nil {
func buildMethodFrame(buf *bytes.Buffer, channel uint16, method interface{ Write(io.Writer) error }) (*codec.Frame, error) {
buf.Reset()
if err := method.Write(buf); err != nil {
return nil, err
}
return &codec.Frame{
@@ -718,21 +727,21 @@ func buildMethodFrame(channel uint16, method interface{ Write(io.Writer) error }
}, nil
}
func buildContentHeaderFrame(channel uint16, bodySize uint64, props codec.BasicProperties) (*codec.Frame, error) {
func buildContentHeaderFrame(buf *bytes.Buffer, channel uint16, bodySize uint64, props codec.BasicProperties) (*codec.Frame, error) {
buf.Reset()
header := &codec.ContentHeader{
ClassID: codec.ClassBasic,
Weight: 0,
BodySize: bodySize,
Properties: props,
}
var headerBuf bytes.Buffer
if err := header.WriteContentHeader(&headerBuf); err != nil {
if err := header.WriteContentHeader(buf); err != nil {
return nil, err
}
return &codec.Frame{
Type: codec.FrameHeader,
Channel: channel,
Payload: headerBuf.Bytes(),
Payload: buf.Bytes(),
}, nil
}
+4 -2
View File
@@ -15,6 +15,7 @@ import (
"time"
"github.com/absmach/fluxmq/amqp/codec"
"github.com/absmach/fluxmq/internal/bufpool"
)
// AMQP 0.9.1 protocol header: "AMQP" followed by 0, 0, 9, 1.
@@ -338,8 +339,9 @@ func (c *Connection) deliverMessage(topic string, payload []byte, props map[stri
// writeMethod serializes a method and sends it as a FrameMethod.
func (c *Connection) writeMethod(channel uint16, method interface{ Write(io.Writer) error }) error {
var buf bytes.Buffer
if err := method.Write(&buf); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := method.Write(buf); err != nil {
return err
}
return c.writeFrame(&codec.Frame{
+6 -2
View File
@@ -8,6 +8,8 @@ import (
"encoding/binary"
"io"
"math"
"github.com/absmach/fluxmq/internal/bufpool"
)
// Constants for AMQP frame types
@@ -172,7 +174,8 @@ func ReadTable(r io.Reader) (map[string]interface{}, error) {
// WriteTable writes a field-table to the writer.
func WriteTable(w io.Writer, table map[string]interface{}) error {
b := new(bytes.Buffer)
b := bufpool.Get()
defer bufpool.Put(b)
for key, value := range table {
if err := WriteShortStr(b, key); err != nil {
return err
@@ -385,7 +388,8 @@ func ReadArray(r io.Reader) ([]interface{}, error) {
// WriteArray writes a field-array to the writer.
func WriteArray(w io.Writer, arr []interface{}) error {
b := new(bytes.Buffer)
b := bufpool.Get()
defer bufpool.Put(b)
for _, value := range arr {
if err := WriteFieldValue(b, value); err != nil {
return err
+52 -17
View File
@@ -16,6 +16,26 @@ import (
"github.com/absmach/fluxmq/amqp1/types"
)
var frameBufPool = sync.Pool{New: func() any { s := make([]byte, 0, 8192); return &s }}
const maxPooledFrameBuf = 256 * 1024
func getFrameBuf(size int) *[]byte {
bp := frameBufPool.Get().(*[]byte)
if cap(*bp) < size {
*bp = make([]byte, size)
}
return bp
}
func putFrameBuf(bp *[]byte) {
if cap(*bp) > maxPooledFrameBuf {
return
}
*bp = (*bp)[:0]
frameBufPool.Put(bp)
}
// Connection wraps a net.Conn for AMQP 1.0 frame I/O.
type Connection struct {
conn net.Conn
@@ -99,12 +119,15 @@ func (c *Connection) WriteTransfer(channel uint16, transfer *performatives.Trans
combined := len(perfBody) + len(payload)
if maxBody <= 0 || combined <= maxBody {
buf := make([]byte, combined)
copy(buf, perfBody)
copy(buf[len(perfBody):], payload)
buf := getFrameBuf(combined)
*buf = (*buf)[:combined]
copy(*buf, perfBody)
copy((*buf)[len(perfBody):], payload)
c.mu.Lock()
defer c.mu.Unlock()
return frames.WriteFrame(c.conn, frames.FrameTypeAMQP, channel, buf)
err := frames.WriteFrame(c.conn, frames.FrameTypeAMQP, channel, *buf)
c.mu.Unlock()
putFrameBuf(buf)
return err
}
// Multi-frame: re-encode first transfer with More=true
@@ -142,12 +165,16 @@ func (c *Connection) WriteTransfer(channel uint16, transfer *performatives.Trans
defer c.mu.Unlock()
// First frame
buf := make([]byte, len(perfBody)+firstChunk)
copy(buf, perfBody)
copy(buf[len(perfBody):], payload[:firstChunk])
if err := frames.WriteFrame(c.conn, frames.FrameTypeAMQP, channel, buf); err != nil {
size := len(perfBody) + firstChunk
buf := getFrameBuf(size)
*buf = (*buf)[:size]
copy(*buf, perfBody)
copy((*buf)[len(perfBody):], payload[:firstChunk])
if err := frames.WriteFrame(c.conn, frames.FrameTypeAMQP, channel, *buf); err != nil {
putFrameBuf(buf)
return err
}
putFrameBuf(buf)
offset := firstChunk
remaining := len(payload) - offset
@@ -167,12 +194,16 @@ func (c *Connection) WriteTransfer(channel uint16, transfer *performatives.Trans
}
}
buf := make([]byte, len(framePerf)+chunkSize)
copy(buf, framePerf)
copy(buf[len(framePerf):], payload[offset:offset+chunkSize])
if err := frames.WriteFrame(c.conn, frames.FrameTypeAMQP, channel, buf); err != nil {
size := len(framePerf) + chunkSize
fbuf := getFrameBuf(size)
*fbuf = (*fbuf)[:size]
copy(*fbuf, framePerf)
copy((*fbuf)[len(framePerf):], payload[offset:offset+chunkSize])
if err := frames.WriteFrame(c.conn, frames.FrameTypeAMQP, channel, *fbuf); err != nil {
putFrameBuf(fbuf)
return err
}
putFrameBuf(fbuf)
offset += chunkSize
remaining -= chunkSize
@@ -184,10 +215,14 @@ func (c *Connection) WriteTransfer(channel uint16, transfer *performatives.Trans
// WriteTransferRaw writes a pre-encoded transfer frame with payload.
// Does NOT handle multi-frame splitting — use WriteTransfer for that.
func (c *Connection) WriteTransferRaw(channel uint16, perfBody []byte, payload []byte) error {
combined := make([]byte, len(perfBody)+len(payload))
copy(combined, perfBody)
copy(combined[len(perfBody):], payload)
return c.WriteFrame(frames.FrameTypeAMQP, channel, combined)
size := len(perfBody) + len(payload)
buf := getFrameBuf(size)
*buf = (*buf)[:size]
copy(*buf, perfBody)
copy((*buf)[len(perfBody):], payload)
err := c.WriteFrame(frames.FrameTypeAMQP, channel, *buf)
putFrameBuf(buf)
return err
}
// WriteSASLFrame writes a SASL frame.
+47 -39
View File
@@ -8,6 +8,7 @@ import (
"fmt"
"github.com/absmach/fluxmq/amqp1/types"
"github.com/absmach/fluxmq/internal/bufpool"
)
// Message section descriptors.
@@ -62,51 +63,54 @@ type Message struct {
// Encode serializes the message into wire format (concatenated described sections).
func (m *Message) Encode() ([]byte, error) {
var buf bytes.Buffer
buf := bufpool.Get()
defer bufpool.Put(buf)
if m.Header != nil {
if err := encodeHeader(&buf, m.Header); err != nil {
if err := encodeHeader(buf, m.Header); err != nil {
return nil, err
}
}
if len(m.MessageAnnotations) > 0 {
if err := encodeAnnotations(&buf, DescriptorMessageAnnotations, m.MessageAnnotations); err != nil {
if err := encodeAnnotations(buf, DescriptorMessageAnnotations, m.MessageAnnotations); err != nil {
return nil, err
}
}
if m.Properties != nil {
if err := encodeProperties(&buf, m.Properties); err != nil {
if err := encodeProperties(buf, m.Properties); err != nil {
return nil, err
}
}
if len(m.ApplicationProperties) > 0 {
if err := encodeAppProperties(&buf, m.ApplicationProperties); err != nil {
if err := encodeAppProperties(buf, m.ApplicationProperties); err != nil {
return nil, err
}
}
for _, data := range m.Data {
if err := types.WriteDescriptor(&buf, DescriptorData); err != nil {
if err := types.WriteDescriptor(buf, DescriptorData); err != nil {
return nil, err
}
if err := types.WriteBinary(&buf, data); err != nil {
if err := types.WriteBinary(buf, data); err != nil {
return nil, err
}
}
if m.Value != nil {
if err := types.WriteDescriptor(&buf, DescriptorAMQPValue); err != nil {
if err := types.WriteDescriptor(buf, DescriptorAMQPValue); err != nil {
return nil, err
}
if err := types.WriteAny(&buf, m.Value); err != nil {
if err := types.WriteAny(buf, m.Value); err != nil {
return nil, err
}
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// Decode parses message sections from the wire format payload.
@@ -149,26 +153,27 @@ func Decode(payload []byte) (*Message, error) {
}
func encodeHeader(buf *bytes.Buffer, h *Header) error {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if err := types.WriteBool(&fields, h.Durable); err != nil {
if err := types.WriteBool(fields, h.Durable); err != nil {
return err
}
count++
if err := types.WriteUbyte(&fields, h.Priority); err != nil {
if err := types.WriteUbyte(fields, h.Priority); err != nil {
return err
}
count++
if err := types.WriteUint(&fields, h.TTL); err != nil {
if err := types.WriteUint(fields, h.TTL); err != nil {
return err
}
count++
if err := types.WriteBool(&fields, h.FirstAcquirer); err != nil {
if err := types.WriteBool(fields, h.FirstAcquirer); err != nil {
return err
}
count++
if err := types.WriteUint(&fields, h.DeliveryCount); err != nil {
if err := types.WriteUint(fields, h.DeliveryCount); err != nil {
return err
}
count++
@@ -180,16 +185,17 @@ func encodeHeader(buf *bytes.Buffer, h *Header) error {
}
func encodeProperties(buf *bytes.Buffer, p *Properties) error {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
// message-id (0)
if p.MessageID != nil {
if err := types.WriteAny(&fields, p.MessageID); err != nil {
if err := types.WriteAny(fields, p.MessageID); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -197,11 +203,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// user-id (1)
if p.UserID != nil {
if err := types.WriteBinary(&fields, p.UserID); err != nil {
if err := types.WriteBinary(fields, p.UserID); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -209,11 +215,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// to (2)
if p.To != "" {
if err := types.WriteString(&fields, p.To); err != nil {
if err := types.WriteString(fields, p.To); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -221,11 +227,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// subject (3)
if p.Subject != "" {
if err := types.WriteString(&fields, p.Subject); err != nil {
if err := types.WriteString(fields, p.Subject); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -233,11 +239,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// reply-to (4)
if p.ReplyTo != "" {
if err := types.WriteString(&fields, p.ReplyTo); err != nil {
if err := types.WriteString(fields, p.ReplyTo); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -245,11 +251,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// correlation-id (5)
if p.CorrelationID != nil {
if err := types.WriteAny(&fields, p.CorrelationID); err != nil {
if err := types.WriteAny(fields, p.CorrelationID); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -257,11 +263,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// content-type (6)
if p.ContentType != "" {
if err := types.WriteSymbol(&fields, p.ContentType); err != nil {
if err := types.WriteSymbol(fields, p.ContentType); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -269,11 +275,11 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
// content-encoding (7)
if p.ContentEncoding != "" {
if err := types.WriteSymbol(&fields, p.ContentEncoding); err != nil {
if err := types.WriteSymbol(fields, p.ContentEncoding); err != nil {
return err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return err
}
}
@@ -286,12 +292,13 @@ func encodeProperties(buf *bytes.Buffer, p *Properties) error {
}
func encodeAnnotations(buf *bytes.Buffer, descriptor uint64, ann map[types.Symbol]any) error {
var pairs bytes.Buffer
pairs := bufpool.Get()
defer bufpool.Put(pairs)
for k, v := range ann {
if err := types.WriteSymbol(&pairs, k); err != nil {
if err := types.WriteSymbol(pairs, k); err != nil {
return err
}
if err := types.WriteAny(&pairs, v); err != nil {
if err := types.WriteAny(pairs, v); err != nil {
return err
}
}
@@ -302,12 +309,13 @@ func encodeAnnotations(buf *bytes.Buffer, descriptor uint64, ann map[types.Symbo
}
func encodeAppProperties(buf *bytes.Buffer, props map[string]any) error {
var pairs bytes.Buffer
pairs := bufpool.Get()
defer bufpool.Put(pairs)
for k, v := range props {
if err := types.WriteString(&pairs, k); err != nil {
if err := types.WriteString(pairs, k); err != nil {
return err
}
if err := types.WriteAny(&pairs, v); err != nil {
if err := types.WriteAny(pairs, v); err != nil {
return err
}
}
+21 -15
View File
@@ -4,9 +4,10 @@
package performatives
import (
"bytes"
"io"
"github.com/absmach/fluxmq/amqp1/types"
"github.com/absmach/fluxmq/internal/bufpool"
)
// AMQP error descriptor
@@ -56,37 +57,41 @@ type Error struct {
// Encode serializes the error as a described list.
func (e *Error) Encode() ([]byte, error) {
var fields bytes.Buffer
if err := types.WriteSymbol(&fields, e.Condition); err != nil {
fields := bufpool.Get()
defer bufpool.Put(fields)
if err := types.WriteSymbol(fields, e.Condition); err != nil {
return nil, err
}
if e.Description != "" {
if err := types.WriteString(&fields, e.Description); err != nil {
if err := types.WriteString(fields, e.Description); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
if len(e.Info) > 0 {
if err := writeSymbolAnyMap(&fields, e.Info); err != nil {
if err := writeSymbolAnyMap(fields, e.Info); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorError); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorError); err != nil {
return nil, err
}
if err := types.WriteList(&buf, fields.Bytes(), 3); err != nil {
if err := types.WriteList(buf, fields.Bytes(), 3); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// DecodeError decodes an AMQP error from list fields.
@@ -104,13 +109,14 @@ func DecodeError(fields []any) *Error {
return e
}
func writeSymbolAnyMap(w *bytes.Buffer, m map[types.Symbol]any) error {
var pairs bytes.Buffer
func writeSymbolAnyMap(w io.Writer, m map[types.Symbol]any) error {
pairs := bufpool.Get()
defer bufpool.Put(pairs)
for k, v := range m {
if err := types.WriteSymbol(&pairs, k); err != nil {
if err := types.WriteSymbol(pairs, k); err != nil {
return err
}
if err := types.WriteAny(&pairs, v); err != nil {
if err := types.WriteAny(pairs, v); err != nil {
return err
}
}
+36 -23
View File
@@ -4,9 +4,8 @@
package performatives
import (
"bytes"
"github.com/absmach/fluxmq/amqp1/types"
"github.com/absmach/fluxmq/internal/bufpool"
)
// Outcome descriptors.
@@ -21,14 +20,17 @@ const (
type Accepted struct{}
func (a *Accepted) Encode() ([]byte, error) {
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorAccepted); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorAccepted); err != nil {
return nil, err
}
if err := types.WriteList(&buf, nil, 0); err != nil {
if err := types.WriteList(buf, nil, 0); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// Rejected outcome with optional error.
@@ -37,7 +39,8 @@ type Rejected struct {
}
func (r *Rejected) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
if r.Error != nil {
errBytes, err := r.Error.Encode()
if err != nil {
@@ -45,33 +48,39 @@ func (r *Rejected) Encode() ([]byte, error) {
}
fields.Write(errBytes)
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorRejected); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorRejected); err != nil {
return nil, err
}
if err := types.WriteList(&buf, fields.Bytes(), 1); err != nil {
if err := types.WriteList(buf, fields.Bytes(), 1); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// Released outcome.
type Released struct{}
func (r *Released) Encode() ([]byte, error) {
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorReleased); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorReleased); err != nil {
return nil, err
}
if err := types.WriteList(&buf, nil, 0); err != nil {
if err := types.WriteList(buf, nil, 0); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// Modified outcome.
@@ -81,22 +90,26 @@ type Modified struct {
}
func (m *Modified) Encode() ([]byte, error) {
var fields bytes.Buffer
if err := types.WriteBool(&fields, m.DeliveryFailed); err != nil {
fields := bufpool.Get()
defer bufpool.Put(fields)
if err := types.WriteBool(fields, m.DeliveryFailed); err != nil {
return nil, err
}
if err := types.WriteBool(&fields, m.UndeliverableHere); err != nil {
if err := types.WriteBool(fields, m.UndeliverableHere); err != nil {
return nil, err
}
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorModified); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorModified); err != nil {
return nil, err
}
if err := types.WriteList(&buf, fields.Bytes(), 2); err != nil {
if err := types.WriteList(buf, fields.Bytes(), 2); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// DecodeOutcome decodes a disposition state from a described type.
+85 -72
View File
@@ -8,6 +8,7 @@ import (
"fmt"
"github.com/absmach/fluxmq/amqp1/types"
"github.com/absmach/fluxmq/internal/bufpool"
)
// Performative descriptors.
@@ -40,20 +41,21 @@ type Open struct {
}
func (o *Open) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if err := types.WriteString(&fields, o.ContainerID); err != nil {
if err := types.WriteString(fields, o.ContainerID); err != nil {
return nil, err
}
count++
if o.Hostname != "" {
if err := types.WriteString(&fields, o.Hostname); err != nil {
if err := types.WriteString(fields, o.Hostname); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -63,7 +65,7 @@ func (o *Open) Encode() ([]byte, error) {
if maxFrame == 0 {
maxFrame = uint32(DefaultMaxFrameSize)
}
if err := types.WriteUint(&fields, maxFrame); err != nil {
if err := types.WriteUint(fields, maxFrame); err != nil {
return nil, err
}
count++
@@ -72,28 +74,28 @@ func (o *Open) Encode() ([]byte, error) {
if channelMax == 0 {
channelMax = 65535
}
if err := types.WriteUshort(&fields, channelMax); err != nil {
if err := types.WriteUshort(fields, channelMax); err != nil {
return nil, err
}
count++
if o.IdleTimeOut > 0 {
if err := types.WriteUint(&fields, o.IdleTimeOut); err != nil {
if err := types.WriteUint(fields, o.IdleTimeOut); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if len(o.Properties) > 0 {
if err := writeSymbolAnyMap(&fields, o.Properties); err != nil {
if err := writeSymbolAnyMap(fields, o.Properties); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -135,31 +137,32 @@ type Begin struct {
}
func (b *Begin) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if b.RemoteChannel != nil {
if err := types.WriteUshort(&fields, *b.RemoteChannel); err != nil {
if err := types.WriteUshort(fields, *b.RemoteChannel); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if err := types.WriteUint(&fields, b.NextOutgoingID); err != nil {
if err := types.WriteUint(fields, b.NextOutgoingID); err != nil {
return nil, err
}
count++
if err := types.WriteUint(&fields, b.IncomingWindow); err != nil {
if err := types.WriteUint(fields, b.IncomingWindow); err != nil {
return nil, err
}
count++
if err := types.WriteUint(&fields, b.OutgoingWindow); err != nil {
if err := types.WriteUint(fields, b.OutgoingWindow); err != nil {
return nil, err
}
count++
@@ -168,7 +171,7 @@ func (b *Begin) Encode() ([]byte, error) {
if handleMax == 0 {
handleMax = 4294967295
}
if err := types.WriteUint(&fields, handleMax); err != nil {
if err := types.WriteUint(fields, handleMax); err != nil {
return nil, err
}
count++
@@ -212,34 +215,35 @@ type Attach struct {
}
func (a *Attach) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
// Name (0)
if err := types.WriteString(&fields, a.Name); err != nil {
if err := types.WriteString(fields, a.Name); err != nil {
return nil, err
}
count++
// Handle (1)
if err := types.WriteUint(&fields, a.Handle); err != nil {
if err := types.WriteUint(fields, a.Handle); err != nil {
return nil, err
}
count++
// Role (2)
if err := types.WriteBool(&fields, a.Role); err != nil {
if err := types.WriteBool(fields, a.Role); err != nil {
return nil, err
}
count++
// SndSettleMode (3)
if a.SndSettleMode != nil {
if err := types.WriteUbyte(&fields, *a.SndSettleMode); err != nil {
if err := types.WriteUbyte(fields, *a.SndSettleMode); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -247,11 +251,11 @@ func (a *Attach) Encode() ([]byte, error) {
// RcvSettleMode (4)
if a.RcvSettleMode != nil {
if err := types.WriteUbyte(&fields, *a.RcvSettleMode); err != nil {
if err := types.WriteUbyte(fields, *a.RcvSettleMode); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -265,7 +269,7 @@ func (a *Attach) Encode() ([]byte, error) {
}
fields.Write(src)
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -279,31 +283,31 @@ func (a *Attach) Encode() ([]byte, error) {
}
fields.Write(tgt)
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
// Unsettled (7) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// IncompleteUnsettled (8) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// InitialDeliveryCount (9) - required for sender role
if !a.Role { // sender
if err := types.WriteUint(&fields, a.InitialDeliveryCount); err != nil {
if err := types.WriteUint(fields, a.InitialDeliveryCount); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -311,11 +315,11 @@ func (a *Attach) Encode() ([]byte, error) {
// MaxMessageSize (10)
if a.MaxMessageSize > 0 {
if err := types.WriteUlong(&fields, a.MaxMessageSize); err != nil {
if err := types.WriteUlong(fields, a.MaxMessageSize); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -324,19 +328,19 @@ func (a *Attach) Encode() ([]byte, error) {
// Only encode fields 11-13 if Properties is set
if len(a.Properties) > 0 {
// OfferedCapabilities (11) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// DesiredCapabilities (12) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// Properties (13)
if err := types.WriteStringAnyMap(&fields, a.Properties); err != nil {
if err := types.WriteStringAnyMap(fields, a.Properties); err != nil {
return nil, err
}
count++
@@ -412,14 +416,15 @@ type Flow struct {
}
func (f *Flow) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
writeOptUint32 := func(v *uint32) error {
if v != nil {
return types.WriteUint(&fields, *v)
return types.WriteUint(fields, *v)
}
return types.WriteNull(&fields)
return types.WriteNull(fields)
}
if err := writeOptUint32(f.NextIncomingID); err != nil {
@@ -427,17 +432,17 @@ func (f *Flow) Encode() ([]byte, error) {
}
count++
if err := types.WriteUint(&fields, f.IncomingWindow); err != nil {
if err := types.WriteUint(fields, f.IncomingWindow); err != nil {
return nil, err
}
count++
if err := types.WriteUint(&fields, f.NextOutgoingID); err != nil {
if err := types.WriteUint(fields, f.NextOutgoingID); err != nil {
return nil, err
}
count++
if err := types.WriteUint(&fields, f.OutgoingWindow); err != nil {
if err := types.WriteUint(fields, f.OutgoingWindow); err != nil {
return nil, err
}
count++
@@ -462,12 +467,12 @@ func (f *Flow) Encode() ([]byte, error) {
}
count++
if err := types.WriteBool(&fields, f.Drain); err != nil {
if err := types.WriteBool(fields, f.Drain); err != nil {
return nil, err
}
count++
if err := types.WriteBool(&fields, f.Echo); err != nil {
if err := types.WriteBool(fields, f.Echo); err != nil {
return nil, err
}
count++
@@ -529,53 +534,54 @@ type Transfer struct {
}
func (t *Transfer) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if err := types.WriteUint(&fields, t.Handle); err != nil {
if err := types.WriteUint(fields, t.Handle); err != nil {
return nil, err
}
count++
if t.DeliveryID != nil {
if err := types.WriteUint(&fields, *t.DeliveryID); err != nil {
if err := types.WriteUint(fields, *t.DeliveryID); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if t.DeliveryTag != nil {
if err := types.WriteBinary(&fields, t.DeliveryTag); err != nil {
if err := types.WriteBinary(fields, t.DeliveryTag); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if t.MessageFormat != nil {
if err := types.WriteUint(&fields, *t.MessageFormat); err != nil {
if err := types.WriteUint(fields, *t.MessageFormat); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if err := types.WriteBool(&fields, t.Settled); err != nil {
if err := types.WriteBool(fields, t.Settled); err != nil {
return nil, err
}
count++
if err := types.WriteBool(&fields, t.More); err != nil {
if err := types.WriteBool(fields, t.More); err != nil {
return nil, err
}
count++
@@ -623,31 +629,32 @@ type Disposition struct {
}
func (d *Disposition) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if err := types.WriteBool(&fields, d.Role); err != nil {
if err := types.WriteBool(fields, d.Role); err != nil {
return nil, err
}
count++
if err := types.WriteUint(&fields, d.First); err != nil {
if err := types.WriteUint(fields, d.First); err != nil {
return nil, err
}
count++
if d.Last != nil {
if err := types.WriteUint(&fields, *d.Last); err != nil {
if err := types.WriteUint(fields, *d.Last); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if err := types.WriteBool(&fields, d.Settled); err != nil {
if err := types.WriteBool(fields, d.Settled); err != nil {
return nil, err
}
count++
@@ -659,13 +666,13 @@ func (d *Disposition) Encode() ([]byte, error) {
}
fields.Write(stateBytes)
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if err := types.WriteBool(&fields, d.Batchable); err != nil {
if err := types.WriteBool(fields, d.Batchable); err != nil {
return nil, err
}
count++
@@ -707,15 +714,16 @@ type Detach struct {
}
func (d *Detach) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if err := types.WriteUint(&fields, d.Handle); err != nil {
if err := types.WriteUint(fields, d.Handle); err != nil {
return nil, err
}
count++
if err := types.WriteBool(&fields, d.Closed); err != nil {
if err := types.WriteBool(fields, d.Closed); err != nil {
return nil, err
}
count++
@@ -727,7 +735,7 @@ func (d *Detach) Encode() ([]byte, error) {
}
fields.Write(errBytes)
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
@@ -760,7 +768,8 @@ type End struct {
}
func (e *End) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if e.Error != nil {
@@ -793,7 +802,8 @@ type Close struct {
}
func (c *Close) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if c.Error != nil {
@@ -853,14 +863,17 @@ func DecodePerformative(body []byte) (uint64, any, error) {
}
func encodePerformative(descriptor uint64, fields []byte, count int) ([]byte, error) {
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, descriptor); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, descriptor); err != nil {
return nil, err
}
if err := types.WriteList(&buf, fields, count); err != nil {
if err := types.WriteList(buf, fields, count); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
func encodeOutcome(state any) ([]byte, error) {
+41 -34
View File
@@ -4,9 +4,8 @@
package performatives
import (
"bytes"
"github.com/absmach/fluxmq/amqp1/types"
"github.com/absmach/fluxmq/internal/bufpool"
)
// Descriptors for Source and Target.
@@ -31,97 +30,101 @@ type Source struct {
// Encode serializes the Source as a described list.
func (s *Source) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
// Address (index 0)
if s.Address != "" {
if err := types.WriteString(&fields, s.Address); err != nil {
if err := types.WriteString(fields, s.Address); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
// Durable (index 1)
if err := types.WriteUint(&fields, s.Durable); err != nil {
if err := types.WriteUint(fields, s.Durable); err != nil {
return nil, err
}
count++
// ExpiryPolicy (index 2)
if s.ExpiryPolicy != "" {
if err := types.WriteSymbol(&fields, s.ExpiryPolicy); err != nil {
if err := types.WriteSymbol(fields, s.ExpiryPolicy); err != nil {
return nil, err
}
} else {
if err := types.WriteSymbol(&fields, "session-end"); err != nil {
if err := types.WriteSymbol(fields, "session-end"); err != nil {
return nil, err
}
}
count++
// Timeout (index 3)
if err := types.WriteUint(&fields, s.Timeout); err != nil {
if err := types.WriteUint(fields, s.Timeout); err != nil {
return nil, err
}
count++
// Dynamic (index 4)
if err := types.WriteBool(&fields, s.Dynamic); err != nil {
if err := types.WriteBool(fields, s.Dynamic); err != nil {
return nil, err
}
count++
if len(s.Capabilities) > 0 {
// dynamic-node-properties (5) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// distribution-mode (6) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// filter (7) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// default-outcome (8) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// outcomes (9) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// capabilities (10)
if err := types.WriteSymbolArray(&fields, s.Capabilities); err != nil {
if err := types.WriteSymbolArray(fields, s.Capabilities); err != nil {
return nil, err
}
count++
}
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorSource); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorSource); err != nil {
return nil, err
}
if err := types.WriteList(&buf, fields.Bytes(), count); err != nil {
if err := types.WriteList(buf, fields.Bytes(), count); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// DecodeSource decodes a Source from list fields.
@@ -163,68 +166,72 @@ type Target struct {
// Encode serializes the Target as a described list.
func (t *Target) Encode() ([]byte, error) {
var fields bytes.Buffer
fields := bufpool.Get()
defer bufpool.Put(fields)
count := 0
if t.Address != "" {
if err := types.WriteString(&fields, t.Address); err != nil {
if err := types.WriteString(fields, t.Address); err != nil {
return nil, err
}
} else {
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
}
count++
if err := types.WriteUint(&fields, t.Durable); err != nil {
if err := types.WriteUint(fields, t.Durable); err != nil {
return nil, err
}
count++
if t.ExpiryPolicy != "" {
if err := types.WriteSymbol(&fields, t.ExpiryPolicy); err != nil {
if err := types.WriteSymbol(fields, t.ExpiryPolicy); err != nil {
return nil, err
}
} else {
if err := types.WriteSymbol(&fields, "session-end"); err != nil {
if err := types.WriteSymbol(fields, "session-end"); err != nil {
return nil, err
}
}
count++
if err := types.WriteUint(&fields, t.Timeout); err != nil {
if err := types.WriteUint(fields, t.Timeout); err != nil {
return nil, err
}
count++
if err := types.WriteBool(&fields, t.Dynamic); err != nil {
if err := types.WriteBool(fields, t.Dynamic); err != nil {
return nil, err
}
count++
if len(t.Capabilities) > 0 {
// dynamic-node-properties (5) - null
if err := types.WriteNull(&fields); err != nil {
if err := types.WriteNull(fields); err != nil {
return nil, err
}
count++
// capabilities (6)
if err := types.WriteSymbolArray(&fields, t.Capabilities); err != nil {
if err := types.WriteSymbolArray(fields, t.Capabilities); err != nil {
return nil, err
}
count++
}
var buf bytes.Buffer
if err := types.WriteDescriptor(&buf, DescriptorTarget); err != nil {
buf := bufpool.Get()
defer bufpool.Put(buf)
if err := types.WriteDescriptor(buf, DescriptorTarget); err != nil {
return nil, err
}
if err := types.WriteList(&buf, fields.Bytes(), count); err != nil {
if err := types.WriteList(buf, fields.Bytes(), count); err != nil {
return nil, err
}
return buf.Bytes(), nil
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// DecodeTarget decodes a Target from list fields.
+8 -7
View File
@@ -4,11 +4,12 @@
package types
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math"
"github.com/absmach/fluxmq/internal/bufpool"
)
// WriteNull writes a null value.
@@ -324,7 +325,8 @@ func WriteSymbolArray(w io.Writer, symbols []Symbol) error {
if len(symbols) == 1 {
return WriteSymbol(w, symbols[0])
}
var elems bytes.Buffer
elems := bufpool.Get()
defer bufpool.Put(elems)
for _, s := range symbols {
b := []byte(s)
if len(b) <= 255 {
@@ -337,8 +339,6 @@ func WriteSymbolArray(w io.Writer, symbols []Symbol) error {
elems.Write(b)
}
}
// Use sym8 element type for short symbols, sym32 for long
// For simplicity, determine based on whether all symbols fit in sym8
allShort := true
for _, s := range symbols {
if len([]byte(s)) > 255 {
@@ -355,12 +355,13 @@ func WriteSymbolArray(w io.Writer, symbols []Symbol) error {
// WriteStringAnyMap writes a map with string keys and any values.
func WriteStringAnyMap(w io.Writer, m map[string]any) error {
var pairs bytes.Buffer
pairs := bufpool.Get()
defer bufpool.Put(pairs)
for k, v := range m {
if err := WriteString(&pairs, k); err != nil {
if err := WriteString(pairs, k); err != nil {
return err
}
if err := WriteAny(&pairs, v); err != nil {
if err := WriteAny(pairs, v); err != nil {
return err
}
}
+26
View File
@@ -0,0 +1,26 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package bufpool
import (
"bytes"
"sync"
)
const maxPooledCap = 64 * 1024
var pool = sync.Pool{New: func() any { return new(bytes.Buffer) }}
func Get() *bytes.Buffer {
b := pool.Get().(*bytes.Buffer)
b.Reset()
return b
}
func Put(b *bytes.Buffer) {
if b.Cap() > maxPooledCap {
return
}
pool.Put(b)
}
+57
View File
@@ -0,0 +1,57 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package bufpool
import (
"sync"
"testing"
)
func TestGetReturnsResetBuffer(t *testing.T) {
b := Get()
b.WriteString("hello")
Put(b)
b2 := Get()
if b2.Len() != 0 {
t.Fatalf("expected empty buffer, got %d bytes", b2.Len())
}
Put(b2)
}
func TestPutDiscardsOversizedBuffer(t *testing.T) {
b := Get()
b.Grow(maxPooledCap + 1)
Put(b) // should be discarded, not panic
}
func TestConcurrentGetPut(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
b := Get()
b.WriteString("concurrent test data")
Put(b)
}()
}
wg.Wait()
}
func TestGetReturnsUsableBuffer(t *testing.T) {
b := Get()
defer Put(b)
n, err := b.WriteString("test")
if err != nil {
t.Fatal(err)
}
if n != 4 {
t.Fatalf("expected 4 bytes written, got %d", n)
}
if b.String() != "test" {
t.Fatalf("expected %q, got %q", "test", b.String())
}
}