Remove unused code

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-08 14:46:16 +01:00
parent 72650fed37
commit 97fe8cd4c0
21 changed files with 55 additions and 803 deletions
-1
View File
@@ -106,7 +106,6 @@ type Channel struct {
type unackedDelivery struct {
deliveryTag uint64
exchange string
routingKey string
queueName string
messageID string
-9
View File
@@ -405,15 +405,6 @@ func (c *Connection) sendClose(amqpErr *performatives.Error) error {
return nil
}
// sendEnd sends an End frame with an error on the given channel.
func (c *Connection) sendEnd(ch uint16, amqpErr *performatives.Error) error {
resp := &performatives.End{Error: amqpErr}
body, err := resp.Encode()
if err != nil {
return err
}
return c.conn.WritePerformative(ch, body)
}
func (c *Connection) handleEnd(ch uint16, end *performatives.End) error {
c.sessionsMu.Lock()
-20
View File
@@ -12,7 +12,6 @@ import (
"github.com/absmach/fluxmq/amqp1/message"
"github.com/absmach/fluxmq/amqp1/performatives"
amqptypes "github.com/absmach/fluxmq/amqp1/types"
qtypes "github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/storage"
)
@@ -530,25 +529,6 @@ func (l *Link) handleManagementTransfer(transfer *performatives.Transfer, msg *m
replyLink.sendAMQPMessage(resp, 0)
}
// sendDetachError sends a detach with an error condition back to the client.
func (l *Link) sendDetachError(condition amqptypes.Symbol, description string) {
resp := &performatives.Detach{
Handle: l.handle,
Closed: true,
Error: &performatives.Error{
Condition: condition,
Description: description,
},
}
body, err := resp.Encode()
if err != nil {
l.logger.Error("failed to encode detach error", "error", err)
return
}
if err := l.session.conn.conn.WritePerformative(l.session.localCh, body); err != nil {
l.logger.Error("failed to send detach error", "error", err)
}
}
func uint32ToBytes(v uint32) []byte {
return []byte{byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)}
-78
View File
@@ -20,10 +20,6 @@ const (
TypeRetainedMessageSet = "message.retained"
TypeSubscriptionCreated = "subscription.created"
TypeSubscriptionRemoved = "subscription.removed"
TypeAuthSuccess = "auth.success"
TypeAuthFailure = "auth.failure"
TypePublishDenied = "authz.publish_denied"
TypeSubscribeDenied = "authz.subscribe_denied"
)
// Event is the common interface for all webhook events.
@@ -210,77 +206,3 @@ func (e SubscriptionRemoved) Wrap(brokerID string) *Envelope {
}
}
// AuthSuccess is emitted when a client successfully authenticates.
type AuthSuccess struct {
ClientID string `json:"client_id"`
RemoteAddr string `json:"remote_addr"`
}
func (e AuthSuccess) Type() string { return TypeAuthSuccess }
func (e AuthSuccess) Topic() string { return "" }
func (e AuthSuccess) Wrap(brokerID string) *Envelope {
return &Envelope{
EventType: e.Type(),
EventID: uuid.New().String(),
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
BrokerID: brokerID,
Data: e,
}
}
// AuthFailure is emitted when client authentication fails.
type AuthFailure struct {
ClientID string `json:"client_id"`
Reason string `json:"reason"`
RemoteAddr string `json:"remote_addr"`
}
func (e AuthFailure) Type() string { return TypeAuthFailure }
func (e AuthFailure) Topic() string { return "" }
func (e AuthFailure) Wrap(brokerID string) *Envelope {
return &Envelope{
EventType: e.Type(),
EventID: uuid.New().String(),
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
BrokerID: brokerID,
Data: e,
}
}
// PublishDenied is emitted when a publish is denied by authorization.
type PublishDenied struct {
ClientID string `json:"client_id"`
MessageTopic string `json:"topic"`
Reason string `json:"reason"`
}
func (e PublishDenied) Type() string { return TypePublishDenied }
func (e PublishDenied) Topic() string { return e.MessageTopic }
func (e PublishDenied) Wrap(brokerID string) *Envelope {
return &Envelope{
EventType: e.Type(),
EventID: uuid.New().String(),
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
BrokerID: brokerID,
Data: e,
}
}
// SubscribeDenied is emitted when a subscribe is denied by authorization.
type SubscribeDenied struct {
ClientID string `json:"client_id"`
TopicFilter string `json:"topic_filter"`
Reason string `json:"reason"`
}
func (e SubscribeDenied) Type() string { return TypeSubscribeDenied }
func (e SubscribeDenied) Topic() string { return e.TopicFilter }
func (e SubscribeDenied) Wrap(brokerID string) *Envelope {
return &Envelope{
EventType: e.Type(),
EventID: uuid.New().String(),
Timestamp: time.Now().UTC().Format(time.RFC3339Nano),
BrokerID: brokerID,
Data: e,
}
}
-174
View File
@@ -1,174 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package broker
import (
"sync/atomic"
"time"
)
// Stats tracks detailed broker statistics.
type Stats struct {
startTime time.Time
// Connection stats
totalConnections atomic.Uint64
currentConnections atomic.Uint64
disconnections atomic.Uint64
// Message stats
messagesReceived atomic.Uint64
messagesSent atomic.Uint64
publishReceived atomic.Uint64
publishSent atomic.Uint64
// Byte stats
bytesReceived atomic.Uint64
bytesSent atomic.Uint64
// Subscription stats
subscriptions atomic.Uint64
unsubscriptions atomic.Uint64
// Retained message stats
retainedMessages atomic.Uint64
// Error stats
protocolErrors atomic.Uint64
authErrors atomic.Uint64
authzErrors atomic.Uint64
packetErrors atomic.Uint64
}
// NewStats creates a new Stats instance.
func NewStats() *Stats {
return &Stats{
startTime: time.Now(),
}
}
func (s *Stats) IncrementConnections() {
s.totalConnections.Add(1)
s.currentConnections.Add(1)
}
func (s *Stats) DecrementConnections() {
s.currentConnections.Add(^uint64(0))
s.disconnections.Add(1)
}
func (s *Stats) GetTotalConnections() uint64 {
return s.totalConnections.Load()
}
func (s *Stats) GetCurrentConnections() uint64 {
return s.currentConnections.Load()
}
func (s *Stats) GetDisconnections() uint64 {
return s.disconnections.Load()
}
func (s *Stats) IncrementMessagesReceived() {
s.messagesReceived.Add(1)
}
func (s *Stats) IncrementMessagesSent() {
s.messagesSent.Add(1)
}
func (s *Stats) IncrementPublishReceived() {
s.publishReceived.Add(1)
s.messagesReceived.Add(1)
}
func (s *Stats) IncrementPublishSent() {
s.publishSent.Add(1)
s.messagesSent.Add(1)
}
func (s *Stats) GetMessagesReceived() uint64 {
return s.messagesReceived.Load()
}
func (s *Stats) GetMessagesSent() uint64 {
return s.messagesSent.Load()
}
func (s *Stats) GetPublishReceived() uint64 {
return s.publishReceived.Load()
}
func (s *Stats) GetPublishSent() uint64 {
return s.publishSent.Load()
}
func (s *Stats) AddBytesReceived(n uint64) {
s.bytesReceived.Add(n)
}
func (s *Stats) AddBytesSent(n uint64) {
s.bytesSent.Add(n)
}
func (s *Stats) GetBytesReceived() uint64 {
return s.bytesReceived.Load()
}
func (s *Stats) GetBytesSent() uint64 {
return s.bytesSent.Load()
}
func (s *Stats) IncrementSubscriptions() {
s.subscriptions.Add(1)
}
func (s *Stats) DecrementSubscriptions() {
s.subscriptions.Add(^uint64(0))
s.unsubscriptions.Add(1)
}
func (s *Stats) GetSubscriptions() uint64 {
return s.subscriptions.Load()
}
func (s *Stats) GetRetainedMessages() uint64 {
return s.retainedMessages.Load()
}
func (s *Stats) IncrementProtocolErrors() {
s.protocolErrors.Add(1)
}
func (s *Stats) IncrementAuthErrors() {
s.authErrors.Add(1)
}
func (s *Stats) IncrementAuthzErrors() {
s.authzErrors.Add(1)
}
func (s *Stats) IncrementPacketErrors() {
s.packetErrors.Add(1)
}
func (s *Stats) GetProtocolErrors() uint64 {
return s.protocolErrors.Load()
}
func (s *Stats) GetAuthErrors() uint64 {
return s.authErrors.Load()
}
func (s *Stats) GetAuthzErrors() uint64 {
return s.authzErrors.Load()
}
func (s *Stats) GetPacketErrors() uint64 {
return s.packetErrors.Load()
}
func (s *Stats) GetUptime() time.Duration {
return time.Since(s.startTime)
}
+8 -2
View File
@@ -6,6 +6,7 @@ package webhook
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
@@ -17,6 +18,11 @@ import (
"github.com/sony/gobreaker"
)
var (
ErrSenderCannotBeNil = errors.New("sender cannot be nil")
ErrEventMustImplementEventInterface = errors.New("event must implement events.Event interface")
)
// GenericNotifier implements webhook notifications with worker pool and circuit breaker.
type GenericNotifier struct {
cfg config.WebhookConfig
@@ -55,7 +61,7 @@ func NewNotifier(cfg config.WebhookConfig, brokerID string, sender Sender, logge
}
if sender == nil {
return nil, fmt.Errorf("sender cannot be nil")
return nil, ErrSenderCannotBeNil
}
ctx, cancel := context.WithCancel(context.Background())
@@ -146,7 +152,7 @@ func (n *GenericNotifier) Notify(ctx context.Context, event interface{}) error {
// Cast to events.Event
ev, ok := event.(events.Event)
if !ok {
return fmt.Errorf("event must implement events.Event interface")
return ErrEventMustImplementEventInterface
}
// Filter endpoints and queue jobs
-1
View File
@@ -56,7 +56,6 @@ type Client struct {
reconnMu sync.Mutex
// Keep-alive
pingTimer *time.Timer
pingStop chan struct{}
lastActivity time.Time
activityMu sync.Mutex
+18 -11
View File
@@ -32,7 +32,14 @@ const (
urlPrefix = "http://"
)
var _ Cluster = (*EtcdCluster)(nil)
var (
_ Cluster = (*EtcdCluster)(nil)
ErrEtcdServerStartTimeout = errors.New("etcd server took too long to start")
ErrTransportNotConfigured = errors.New("transport not configured")
ErrNoMessageHandlerConfigured = errors.New("no message handler configured")
ErrNoLocalStoreConfigured = errors.New("no local store configured")
)
// EtcdCluster implements the Cluster interface using embedded etcd.
type EtcdCluster struct {
@@ -165,7 +172,7 @@ func NewEtcdCluster(cfg *EtcdConfig, localStore storage.Store, logger *slog.Logg
logger.Info("etcd server is ready", slog.String("node_id", cfg.NodeID))
case <-time.After(60 * time.Second):
e.Server.Stop()
return nil, fmt.Errorf("etcd server took too long to start")
return nil, ErrEtcdServerStartTimeout
}
// Create etcd client
@@ -1109,7 +1116,7 @@ func (c *EtcdCluster) TakeoverSession(ctx context.Context, clientID, fromNode, t
// EnqueueRemote sends an enqueue request to a remote node.
func (c *EtcdCluster) EnqueueRemote(ctx context.Context, nodeID, queueName string, payload []byte, properties map[string]string) (string, error) {
if c.transport == nil {
return "", fmt.Errorf("transport not configured")
return "", ErrTransportNotConfigured
}
return c.transport.SendEnqueueRemote(ctx, nodeID, queueName, payload, properties, false, false)
}
@@ -1117,7 +1124,7 @@ func (c *EtcdCluster) EnqueueRemote(ctx context.Context, nodeID, queueName strin
// RouteQueueMessage sends a queue message to a remote consumer.
func (c *EtcdCluster) RouteQueueMessage(ctx context.Context, nodeID, clientID, queueName, messageID string, payload []byte, properties map[string]string, sequence int64) error {
if c.transport == nil {
return fmt.Errorf("transport not configured")
return ErrTransportNotConfigured
}
return c.transport.SendRouteQueueMessage(ctx, nodeID, clientID, queueName, messageID, payload, properties, sequence)
}
@@ -1137,7 +1144,7 @@ func (c *EtcdCluster) SetQueueHandler(handler QueueHandler) {
// Delegates to the broker to deliver a message to a local client.
func (c *EtcdCluster) DeliverToClient(ctx context.Context, clientID string, msg *Message) error {
if c.msgHandler == nil {
return fmt.Errorf("no message handler configured")
return ErrNoMessageHandlerConfigured
}
return c.msgHandler.DeliverToClient(ctx, clientID, msg)
}
@@ -1146,7 +1153,7 @@ func (c *EtcdCluster) DeliverToClient(ctx context.Context, clientID string, msg
// Delegates to the broker to capture session state and close the session.
func (c *EtcdCluster) GetSessionStateAndClose(ctx context.Context, clientID string) (*clusterv1.SessionState, error) {
if c.msgHandler == nil {
return nil, fmt.Errorf("no message handler configured")
return nil, ErrNoMessageHandlerConfigured
}
return c.msgHandler.GetSessionStateAndClose(ctx, clientID)
}
@@ -1155,7 +1162,7 @@ func (c *EtcdCluster) GetSessionStateAndClose(ctx context.Context, clientID stri
// Fetches a retained message from the local BadgerDB store.
func (c *EtcdCluster) GetRetainedMessage(ctx context.Context, topic string) (*storage.Message, error) {
if c.localStore == nil {
return nil, fmt.Errorf("no local store configured")
return nil, ErrNoLocalStoreConfigured
}
return c.localStore.Retained().Get(ctx, topic)
}
@@ -1164,7 +1171,7 @@ func (c *EtcdCluster) GetRetainedMessage(ctx context.Context, topic string) (*st
// Fetches a will message from the local BadgerDB store.
func (c *EtcdCluster) GetWillMessage(ctx context.Context, clientID string) (*storage.WillMessage, error) {
if c.localStore == nil {
return nil, fmt.Errorf("no local store configured")
return nil, ErrNoLocalStoreConfigured
}
return c.localStore.Wills().Get(ctx, clientID)
}
@@ -1173,7 +1180,7 @@ func (c *EtcdCluster) GetWillMessage(ctx context.Context, clientID string) (*sto
// Called when another broker routes a PUBLISH message to this node.
func (c *EtcdCluster) HandlePublish(ctx context.Context, clientID, topic string, payload []byte, qos byte, retain, dup bool, properties map[string]string) error {
if c.msgHandler == nil {
return fmt.Errorf("no message handler configured")
return ErrNoMessageHandlerConfigured
}
msg := &Message{
@@ -1303,7 +1310,7 @@ func (c *EtcdCluster) ListAllQueueConsumers(ctx context.Context) ([]*QueueConsum
// ForwardQueuePublish forwards a queue publish to a remote node.
func (c *EtcdCluster) ForwardQueuePublish(ctx context.Context, nodeID, topic string, payload []byte, properties map[string]string, forwardToLeader bool) error {
if c.transport == nil {
return fmt.Errorf("transport not configured")
return ErrTransportNotConfigured
}
// Use SendEnqueueRemote with topic in queueName field
@@ -1321,7 +1328,7 @@ func (c *EtcdCluster) HandleTakeover(ctx context.Context, clientID, fromNode, to
// Check if we have a message handler
if c.msgHandler == nil {
return nil, fmt.Errorf("no message handler configured")
return nil, ErrNoMessageHandlerConfigured
}
// Get session state and close the session
-5
View File
@@ -9,7 +9,6 @@ import (
"github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/types"
"github.com/absmach/fluxmq/topics"
)
var (
@@ -629,7 +628,3 @@ func pendingEntryToTypes(entry *PendingEntry) *types.PendingEntry {
}
}
// matchTopic checks if a topic matches a topic pattern using MQTT-style wildcards.
func matchTopic(pattern, topic string) bool {
return topics.TopicMatch(pattern, topic)
}
-33
View File
@@ -154,39 +154,6 @@ func groupKey(queueName, groupID string) string {
return queueName + "/" + groupID
}
// loadGroup loads a consumer group state from disk.
func (s *ConsumerGroupStateStore) loadGroup(queueName, groupID string) (*types.ConsumerGroupState, error) {
data, err := os.ReadFile(s.groupPath(queueName, groupID))
if err != nil {
return nil, err
}
state, hasAutoCommit, err := decodeConsumerGroupState(data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal consumer group state: %w", err)
}
if state == nil {
return nil, fmt.Errorf("consumer group state is empty")
}
// Ensure maps are initialized
if state.Cursor == nil {
state.Cursor = &types.QueueCursor{}
}
if state.Mode == "" {
state.Mode = types.GroupModeQueue
}
if !hasAutoCommit {
state.AutoCommit = true
}
if state.PEL == nil {
state.PEL = make(map[string][]*types.PendingEntry)
}
if state.Consumers == nil {
state.Consumers = make(map[string]*types.ConsumerInfo)
}
return state, nil
}
// Save persists a consumer group state.
func (s *ConsumerGroupStateStore) Save(state *types.ConsumerGroupState) error {
-6
View File
@@ -73,12 +73,6 @@ func (c *StoreConfig) Apply(opts ...Option) {
}
}
// NewStoreWithOptions creates a new store with the given options.
func NewStoreWithOptions(baseDir string, opts ...Option) (*Store, error) {
config := DefaultStoreConfig()
config.Apply(opts...)
return NewStore(baseDir, config)
}
// Compression presets
-156
View File
@@ -202,159 +202,3 @@ func RecoverSegments(dir string) (*RecoveryResult, error) {
return result, nil
}
// ValidateSegment validates a segment without modifying it.
func ValidateSegment(dir string, baseOffset uint64) (bool, []error) {
var errors []error
segPath := filepath.Join(dir, FormatSegmentName(baseOffset))
file, err := os.Open(segPath)
if err != nil {
return false, []error{err}
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return false, []error{err}
}
var pos int64 = 0
header := make([]byte, BatchHeaderSize)
batchCount := 0
for pos < info.Size() {
n, err := file.ReadAt(header, pos)
if err == io.EOF {
break
}
if err != nil || n < BatchHeaderSize {
errors = append(errors, fmt.Errorf("failed to read header at position %d", pos))
break
}
magic := GetUint32(header[0:4])
if magic != SegmentMagic {
errors = append(errors, fmt.Errorf("invalid magic at position %d", pos))
break
}
storedCRC := GetUint32(header[4:8])
batchLen := GetUint32(header[12:16])
totalSize := BatchHeaderSize + int(batchLen)
if pos+int64(totalSize) > info.Size() {
errors = append(errors, fmt.Errorf("incomplete batch at position %d", pos))
break
}
batchData := make([]byte, totalSize)
_, err = file.ReadAt(batchData, pos)
if err != nil {
errors = append(errors, fmt.Errorf("failed to read batch at position %d: %w", pos, err))
break
}
computedCRC := Checksum(batchData[8:])
if storedCRC != computedCRC {
errors = append(errors, fmt.Errorf("CRC mismatch at position %d: stored=%x computed=%x", pos, storedCRC, computedCRC))
break
}
pos += int64(totalSize)
batchCount++
}
return len(errors) == 0, errors
}
// ValidateIndex validates an index file against its segment.
func ValidateIndex(dir string, baseOffset uint64) (bool, []error) {
var errors []error
// Open index
indexPath := filepath.Join(dir, FormatIndexName(baseOffset))
index, err := OpenIndex(indexPath, baseOffset, true)
if err != nil {
return false, []error{fmt.Errorf("failed to open index: %w", err)}
}
defer index.Close()
// Open segment
segPath := filepath.Join(dir, FormatSegmentName(baseOffset))
file, err := os.Open(segPath)
if err != nil {
return false, []error{fmt.Errorf("failed to open segment: %w", err)}
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return false, []error{err}
}
// Validate each index entry
entries := index.Entries()
for i, entry := range entries {
if int64(entry.FilePosition) >= info.Size() {
errors = append(errors, fmt.Errorf("entry %d: position %d beyond segment size %d", i, entry.FilePosition, info.Size()))
continue
}
// Read and validate magic at position
magic := make([]byte, 4)
_, err := file.ReadAt(magic, int64(entry.FilePosition))
if err != nil {
errors = append(errors, fmt.Errorf("entry %d: failed to read at position %d: %w", i, entry.FilePosition, err))
continue
}
if GetUint32(magic) != SegmentMagic {
errors = append(errors, fmt.Errorf("entry %d: invalid magic at position %d", i, entry.FilePosition))
}
}
return len(errors) == 0, errors
}
// CheckpointFile writes a checkpoint file with the current state.
type Checkpoint struct {
HeadOffset uint64
TailOffset uint64
Timestamp int64
}
// WriteCheckpoint writes a checkpoint file.
func WriteCheckpoint(dir string, cp Checkpoint) error {
path := filepath.Join(dir, "checkpoint")
tempPath := path + TempExtension
data := make([]byte, 24)
PutUint64(data[0:8], cp.HeadOffset)
PutUint64(data[8:16], cp.TailOffset)
PutUint64(data[16:24], uint64(cp.Timestamp))
if err := os.WriteFile(tempPath, data, 0o644); err != nil {
return err
}
return os.Rename(tempPath, path)
}
// ReadCheckpoint reads a checkpoint file.
func ReadCheckpoint(dir string) (Checkpoint, error) {
path := filepath.Join(dir, "checkpoint")
data, err := os.ReadFile(path)
if err != nil {
return Checkpoint{}, err
}
if len(data) < 24 {
return Checkpoint{}, fmt.Errorf("invalid checkpoint file")
}
return Checkpoint{
HeadOffset: GetUint64(data[0:8]),
TailOffset: GetUint64(data[8:16]),
Timestamp: int64(GetUint64(data[16:24])),
}, nil
}
-1
View File
@@ -33,7 +33,6 @@ type Segment struct {
// Write buffer for batching small writes
writeBuf []byte
writePos int
// Cached batch positions for efficient lookup
batchPositions []batchPosition
+8 -3
View File
@@ -15,7 +15,12 @@ import (
v5 "github.com/absmach/fluxmq/mqtt/packets/v5"
)
var _ Connection = (*connection)(nil)
var (
_ Connection = (*connection)(nil)
ErrUnsupportedProtocolVersion = errors.New("unsupported MQTT protocol version")
ErrCannotEncodeNilPacket = errors.New("cannot encode nil packet")
)
// Connection represents a network connection that can read/write MQTT packets.
// It also manages connection state and keep-alive.
@@ -84,7 +89,7 @@ func (c *connection) ReadPacket() (packets.ControlPacket, error) {
// v4 is MQTT 3.1.1, v3 is MQTT 3.1
pkt, err = v3.ReadPacket(c.reader)
default:
err = errors.New("unsupported MQTT protocol version")
err = ErrUnsupportedProtocolVersion
}
if err != nil {
@@ -95,7 +100,7 @@ func (c *connection) ReadPacket() (packets.ControlPacket, error) {
func (c *connection) WritePacket(pkt packets.ControlPacket) error {
if pkt == nil {
return errors.New("cannot encode nil packet")
return ErrCannotEncodeNilPacket
}
return pkt.Pack(c.conn)
}
-278
View File
@@ -1,278 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"context"
"fmt"
"time"
"github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/types"
)
// DLQHandler handles moving messages to the dead-letter queue
// when they exceed the maximum delivery count.
type DLQHandler struct {
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
config DLQConfig
metrics *Metrics
}
// DLQConfig defines DLQ handler configuration.
type DLQConfig struct {
// MaxDeliveryCount is the maximum number of delivery attempts before DLQ.
MaxDeliveryCount int
// DLQTopicPrefix is the prefix for DLQ topics.
// Default: "$dlq/"
DLQTopicPrefix string
// IncludeMetadata adds failure metadata to DLQ messages.
IncludeMetadata bool
}
// DefaultDLQConfig returns default DLQ configuration.
func DefaultDLQConfig() DLQConfig {
return DLQConfig{
MaxDeliveryCount: 5,
DLQTopicPrefix: "$dlq/",
IncludeMetadata: true,
}
}
// NewDLQHandler creates a new DLQ handler.
func NewDLQHandler(queueStore storage.QueueStore, groupStore storage.ConsumerGroupStore, config DLQConfig, metrics *Metrics) *DLQHandler {
return &DLQHandler{
queueStore: queueStore,
groupStore: groupStore,
config: config,
metrics: metrics,
}
}
// ShouldMoveToDLQ returns true if the message should be moved to DLQ.
func (h *DLQHandler) ShouldMoveToDLQ(deliveryCount int) bool {
return deliveryCount >= h.config.MaxDeliveryCount
}
// MoveToDLQ moves a message to the dead-letter queue.
func (h *DLQHandler) MoveToDLQ(ctx context.Context, queueName, groupID string, entry *types.PendingEntry, reason string) error {
// Read the original message
msg, err := h.queueStore.Read(ctx, queueName, entry.Offset)
if err != nil {
return fmt.Errorf("failed to read message for DLQ: %w", err)
}
// Create DLQ queue name
dlq := h.getDLQ(queueName)
// Ensure DLQ exists
if err := h.ensureDLQ(ctx, dlq); err != nil {
return fmt.Errorf("failed to ensure DLQ: %w", err)
}
// Create DLQ message with metadata
dlqMsg := h.createDLQMessage(msg, queueName, groupID, entry, reason)
// Append to DLQ
_, err = h.queueStore.Append(ctx, dlq, dlqMsg)
if err != nil {
return fmt.Errorf("failed to append to DLQ: %w", err)
}
// Remove from original PEL
if err := h.groupStore.RemovePendingEntry(ctx, queueName, groupID, entry.ConsumerID, entry.Offset); err != nil {
return fmt.Errorf("failed to remove from PEL: %w", err)
}
// Record metrics
if h.metrics != nil {
h.metrics.RecordDLQ()
}
return nil
}
// ProcessExpiredEntries scans for entries that have exceeded max delivery count
// and moves them to the DLQ.
func (h *DLQHandler) ProcessExpiredEntries(ctx context.Context, queueName, groupID string) (int, error) {
group, err := h.groupStore.GetConsumerGroup(ctx, queueName, groupID)
if err != nil {
return 0, err
}
moved := 0
// Scan all PEL entries across all consumers
for consumerID, entries := range group.PEL {
// Process in reverse to safely remove during iteration
for i := len(entries) - 1; i >= 0; i-- {
entry := entries[i]
if entry.DeliveryCount >= h.config.MaxDeliveryCount {
reason := fmt.Sprintf("exceeded max delivery count (%d/%d)", entry.DeliveryCount, h.config.MaxDeliveryCount)
// Create a copy of entry with correct consumer ID
entryCopy := *entry
entryCopy.ConsumerID = consumerID
if err := h.MoveToDLQ(ctx, queueName, groupID, &entryCopy, reason); err != nil {
// Log error but continue processing
continue
}
moved++
}
}
}
return moved, nil
}
// getDLQ returns the DLQ queue name for a queue.
func (h *DLQHandler) getDLQ(queueName string) string {
return h.config.DLQTopicPrefix + queueName
}
// ensureDLQ creates the DLQ queue if it doesn't exist.
func (h *DLQHandler) ensureDLQ(ctx context.Context, dlq string) error {
_, err := h.queueStore.GetQueue(ctx, dlq)
if err == nil {
return nil // queue exists
}
if err != storage.ErrQueueNotFound {
return err
}
// Create DLQ with minimal config
config := types.QueueConfig{
Name: dlq,
Topics: []string{dlq}, // DLQ matches its own name
Reserved: false,
MaxMessageSize: 1024 * 1024, // 1MB
MaxDepth: 1000000, // 1M messages
MessageTTL: 30 * 24 * time.Hour, // 30 days retention
DeliveryTimeout: 30 * time.Second,
BatchSize: 100,
HeartbeatTimeout: 2 * time.Minute,
}
return h.queueStore.CreateQueue(ctx, config)
}
// createDLQMessage creates a message for the DLQ with failure metadata.
func (h *DLQHandler) createDLQMessage(original *types.Message, queueName, groupID string, entry *types.PendingEntry, reason string) *types.Message {
now := time.Now()
dlqMsg := &types.Message{
ID: original.ID + "-dlq",
Payload: original.Payload,
Topic: original.Topic,
Properties: make(map[string]string),
State: types.StateDLQ,
CreatedAt: now,
FailureReason: reason,
FirstAttempt: entry.ClaimedAt,
LastAttempt: now,
MovedToDLQAt: now,
}
// Copy original properties
for k, v := range original.Properties {
dlqMsg.Properties[k] = v
}
// Add DLQ metadata
if h.config.IncludeMetadata {
dlqMsg.Properties["dlq-original-queue"] = queueName
dlqMsg.Properties["dlq-original-offset"] = fmt.Sprintf("%d", entry.Offset)
dlqMsg.Properties["dlq-consumer-group"] = groupID
dlqMsg.Properties["dlq-consumer-id"] = entry.ConsumerID
dlqMsg.Properties["dlq-delivery-count"] = fmt.Sprintf("%d", entry.DeliveryCount)
dlqMsg.Properties["dlq-failure-reason"] = reason
dlqMsg.Properties["dlq-moved-at"] = now.Format(time.RFC3339)
}
return dlqMsg
}
// ReplayFromDLQ moves a message from DLQ back to the original queue.
func (h *DLQHandler) ReplayFromDLQ(ctx context.Context, dlq string, offset uint64) error {
// Read DLQ message
msg, err := h.queueStore.Read(ctx, dlq, offset)
if err != nil {
return fmt.Errorf("failed to read DLQ message: %w", err)
}
// Get original queue from metadata
originalQueue, ok := msg.Properties["dlq-original-queue"]
if !ok {
return fmt.Errorf("DLQ message missing original queue metadata")
}
// Reset message state
replayMsg := &types.Message{
ID: msg.ID + "-replay",
Payload: msg.Payload,
Topic: msg.Topic,
Properties: make(map[string]string),
State: types.StateQueued,
CreatedAt: time.Now(),
}
// Copy properties except DLQ metadata
for k, v := range msg.Properties {
if len(k) < 4 || k[:4] != "dlq-" {
replayMsg.Properties[k] = v
}
}
// Add replay metadata
replayMsg.Properties["replayed-from-dlq"] = dlq
replayMsg.Properties["replayed-at"] = time.Now().Format(time.RFC3339)
// Append to original queue
_, err = h.queueStore.Append(ctx, originalQueue, replayMsg)
return err
}
// ListDLQMessages lists messages in the DLQ for a queue.
func (h *DLQHandler) ListDLQMessages(ctx context.Context, queueName string, limit int) ([]*types.Message, error) {
dlq := h.getDLQ(queueName)
// Check if DLQ exists
_, err := h.queueStore.GetQueue(ctx, dlq)
if err != nil {
if err == storage.ErrQueueNotFound {
return []*types.Message{}, nil
}
return nil, err
}
// Read from head
head, err := h.queueStore.Head(ctx, dlq)
if err != nil {
return nil, err
}
return h.queueStore.ReadBatch(ctx, dlq, head, limit)
}
// GetDLQCount returns the number of messages in the DLQ.
func (h *DLQHandler) GetDLQCount(ctx context.Context, queueName string) (uint64, error) {
dlq := h.getDLQ(queueName)
_, err := h.queueStore.GetQueue(ctx, dlq)
if err != nil {
if err == storage.ErrQueueNotFound {
return 0, nil
}
return 0, err
}
return h.queueStore.Count(ctx, dlq)
}
-1
View File
@@ -191,7 +191,6 @@ func (h *PendingHeap) Clear() {
// There's only one log per queue.
type QueueHeapManager struct {
heap *PendingHeap
mu sync.RWMutex
}
// NewQueueHeapManager creates a new queue heap manager.
+8 -7
View File
@@ -16,12 +16,13 @@ import (
// Manager errors.
var (
ErrNoMessages = errors.New("no messages available")
ErrGroupNotFound = errors.New("consumer group not found")
ErrConsumerNotFound = errors.New("consumer not found")
ErrMessageNotPending = errors.New("message not in pending list")
ErrInvalidOffset = errors.New("invalid offset")
ErrGroupModeMismatch = errors.New("consumer group mode mismatch")
ErrNoMessages = errors.New("no messages available")
ErrGroupNotFound = errors.New("consumer group not found")
ErrConsumerNotFound = errors.New("consumer not found")
ErrMessageNotPending = errors.New("message not in pending list")
ErrInvalidOffset = errors.New("invalid offset")
ErrGroupModeMismatch = errors.New("consumer group mode mismatch")
ErrCommitOffsetOnlyForStreamMode = errors.New("commit offset only supported for stream groups")
)
// Manager handles consumer group operations including claiming,
@@ -552,7 +553,7 @@ func (m *Manager) CommitOffset(ctx context.Context, queueName, groupID string, o
}
if group.Mode != types.GroupModeStream {
return errors.New("commit offset only supported for stream groups")
return ErrCommitOffsetOnlyForStreamMode
}
cursor := group.GetCursor()
-2
View File
@@ -48,7 +48,6 @@ type Manager struct {
subscriptionsMu sync.RWMutex
subscriptions map[string]map[string]*subscriptionRef // clientID -> refKey -> ref
mu sync.RWMutex
stopCh chan struct{}
stopOnce sync.Once
wg sync.WaitGroup
@@ -1781,7 +1780,6 @@ type DeliveryMessage struct {
Topic string
Properties map[string]string
GroupID string
queueName string
Offset uint64
DeliveredAt time.Time
AckTopic string
-3
View File
@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"log/slog"
"sync"
"time"
"github.com/absmach/fluxmq/queue/storage"
@@ -88,8 +87,6 @@ type LogFSM struct {
queueStore storage.QueueStore
groupStore storage.ConsumerGroupStore
logger *slog.Logger
mu sync.RWMutex
}
// NewLogFSM creates a new FSM for queue operations.
-7
View File
@@ -15,10 +15,3 @@ type Consumer struct {
LastHeartbeat time.Time
ProxyNodeID string // For cluster routing
}
// ConsumerGroup represents a group of consumers.
type ConsumerGroup struct {
ID string
QueueName string
Consumers map[string]*Consumer
}
+13 -5
View File
@@ -25,6 +25,14 @@ import (
"github.com/gorilla/websocket"
)
var (
ErrExpectedBinaryMessage = errors.New("expected binary message")
ErrUnsupportedProtocolVersion = errors.New("unsupported MQTT protocol version")
ErrCannotEncodeNilPacket = errors.New("cannot encode nil packet")
ErrReadNotSupported = errors.New("Read not supported on WebSocket connection")
ErrWriteNotSupported = errors.New("Write not supported on WebSocket connection")
)
// IPRateLimiter is the interface for IP-based rate limiting.
type IPRateLimiter interface {
Allow(addr net.Addr) bool
@@ -242,7 +250,7 @@ func (c *wsConnection) ReadPacket() (packets.ControlPacket, error) {
}
if messageType != websocket.BinaryMessage {
return nil, errors.New("expected binary message")
return nil, ErrExpectedBinaryMessage
}
reader := bytes.NewReader(data)
@@ -265,7 +273,7 @@ func (c *wsConnection) ReadPacket() (packets.ControlPacket, error) {
case 3, 4:
pkt, err = v3.ReadPacket(c.reader)
default:
err = errors.New("unsupported MQTT protocol version")
err = ErrUnsupportedProtocolVersion
}
if err != nil {
@@ -276,7 +284,7 @@ func (c *wsConnection) ReadPacket() (packets.ControlPacket, error) {
func (c *wsConnection) WritePacket(pkt packets.ControlPacket) error {
if pkt == nil {
return errors.New("cannot encode nil packet")
return ErrCannotEncodeNilPacket
}
buf := &bytes.Buffer{}
@@ -288,11 +296,11 @@ func (c *wsConnection) WritePacket(pkt packets.ControlPacket) error {
}
func (c *wsConnection) Read(b []byte) (n int, err error) {
return 0, errors.New("Read not supported on WebSocket connection")
return 0, ErrReadNotSupported
}
func (c *wsConnection) Write(b []byte) (n int, err error) {
return 0, errors.New("Write not supported on WebSocket connection")
return 0, ErrWriteNotSupported
}
func (c *wsConnection) Close() error {