Split Manager code to more maintainable units

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-08 18:17:49 +01:00
parent 1b2b6a8104
commit 245fe8260c
6 changed files with 387 additions and 325 deletions
-322
View File
@@ -7,7 +7,6 @@ import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
@@ -60,19 +59,6 @@ type Manager struct {
metrics *consumer.Metrics
}
type subscriptionRef struct {
queueName string
groupID string
refCount int
lastSeen time.Time
}
type subscriptionTarget struct {
key string
queueName string
groupID string
}
// Config holds configuration for the queue-based queue manager.
type Config struct {
// Consumer configuration
@@ -1568,314 +1554,6 @@ func (m *Manager) runEphemeralCleanupLoop() {
}
}
// --- Helper Functions ---
func (m *Manager) subscriptionRefKey(queueName, groupID string) string {
return queueName + "\x00" + groupID
}
func (m *Manager) trackSubscription(clientID, queueName, groupID string) {
if clientID == "" || queueName == "" || groupID == "" {
return
}
key := m.subscriptionRefKey(queueName, groupID)
now := time.Now()
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
refs = make(map[string]*subscriptionRef)
m.subscriptions[clientID] = refs
}
if ref, ok := refs[key]; ok {
ref.refCount++
ref.lastSeen = now
return
}
refs[key] = &subscriptionRef{
queueName: queueName,
groupID: groupID,
refCount: 1,
lastSeen: now,
}
}
func (m *Manager) untrackSubscription(clientID, queueName, groupID string) {
if clientID == "" || queueName == "" || groupID == "" {
return
}
key := m.subscriptionRefKey(queueName, groupID)
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
ref, ok := refs[key]
if !ok {
return
}
ref.refCount--
if ref.refCount <= 0 {
delete(refs, key)
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
func (m *Manager) getSubscriptionTargets(clientID string) []subscriptionTarget {
m.subscriptionsMu.RLock()
defer m.subscriptionsMu.RUnlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return nil
}
targets := make([]subscriptionTarget, 0, len(refs))
for key, ref := range refs {
targets = append(targets, subscriptionTarget{
key: key,
queueName: ref.queueName,
groupID: ref.groupID,
})
}
return targets
}
func (m *Manager) touchSubscription(clientID, key string, ts time.Time) {
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
ref, ok := refs[key]
if !ok {
return
}
ref.lastSeen = ts
}
func (m *Manager) removeSubscriptionKeys(clientID string, keys []string) {
if len(keys) == 0 {
return
}
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
for _, key := range keys {
delete(refs, key)
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
func (m *Manager) pruneStaleSubscriptions() {
maxIdle := m.config.ConsumerTimeout * 2
if maxIdle <= 0 {
maxIdle = 5 * time.Minute
}
cutoff := time.Now().Add(-maxIdle)
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
for clientID, refs := range m.subscriptions {
for key, ref := range refs {
if ref.lastSeen.Before(cutoff) {
delete(refs, key)
}
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
}
func (m *Manager) createDeliveryMessage(msg *types.Message, groupID string, queueName string) *brokerstorage.Message {
props := m.createRouteProperties(msg, groupID, queueName)
deliveryMsg := &brokerstorage.Message{
Topic: msg.Topic,
QoS: 1, // queue messages use QoS 1 by default
Properties: props,
}
deliveryMsg.SetPayloadFromBytes(msg.GetPayload())
return deliveryMsg
}
func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *types.Message, _ *types.ConsumerGroup, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if delivery == nil || msg == nil {
return
}
if delivery.Properties == nil {
delivery.Properties = make(map[string]string)
}
m.decorateStreamProperties(delivery.Properties, msg, workCommitted, hasWorkCommitted, primaryGroup)
}
func (m *Manager) createRouteProperties(msg *types.Message, groupID, queueName string) map[string]string {
props := make(map[string]string, len(msg.Properties)+4)
for k, v := range msg.Properties {
props[k] = v
}
props["message-id"] = fmt.Sprintf("%s:%d", queueName, msg.Sequence)
props["group-id"] = groupID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", msg.Sequence)
return props
}
func (m *Manager) decorateStreamProperties(properties map[string]string, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if properties == nil || msg == nil {
return
}
properties["x-stream-offset"] = fmt.Sprintf("%d", msg.Sequence)
if !msg.CreatedAt.IsZero() {
properties["x-stream-timestamp"] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
}
if hasWorkCommitted {
properties["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
properties["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
if primaryGroup != "" {
properties["x-work-group"] = primaryGroup
}
}
}
// DeliveryMessage is the internal message format for queue delivery tracking.
type DeliveryMessage struct {
ID string
Payload []byte
Topic string
Properties map[string]string
GroupID string
Offset uint64
DeliveredAt time.Time
AckTopic string
NackTopic string
RejectTopic string
}
func extractGroupFromClientID(clientID string) string {
for i, c := range clientID {
if c == '-' {
return clientID[:i]
}
}
return clientID
}
func generateMessageID() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
func parseMessageID(messageID string) (uint64, error) {
var offset uint64
// Format: queueName:offset (we only need the offset)
for i := len(messageID) - 1; i >= 0; i-- {
if messageID[i] == ':' {
_, err := fmt.Sscanf(messageID[i+1:], "%d", &offset)
return offset, err
}
}
// Try parsing as just an offset
_, err := fmt.Sscanf(messageID, "%d", &offset)
return offset, err
}
func (m *Manager) offsetByTime(ctx context.Context, queueName string, ts time.Time) (uint64, error) {
if provider, ok := m.queueStore.(storage.TimeOffsetProvider); ok {
return provider.OffsetByTime(ctx, queueName, ts)
}
return m.queueStore.Head(ctx, queueName)
}
func (m *Manager) offsetBySize(ctx context.Context, queueName string, retentionBytes int64) (uint64, error) {
if provider, ok := m.queueStore.(storage.SizeOffsetProvider); ok {
return provider.OffsetBySize(ctx, queueName, retentionBytes)
}
return m.queueStore.Head(ctx, queueName)
}
func (m *Manager) computeRetentionOffset(ctx context.Context, config *types.QueueConfig) (uint64, bool) {
if config == nil {
return 0, false
}
var offset uint64
hasRetention := false
if config.Retention.RetentionTime > 0 {
cutoff := time.Now().Add(-config.Retention.RetentionTime)
if off, err := m.offsetByTime(ctx, config.Name, cutoff); err == nil {
if off > offset {
offset = off
}
hasRetention = true
}
}
if config.Retention.RetentionBytes > 0 {
if off, err := m.offsetBySize(ctx, config.Name, config.Retention.RetentionBytes); err == nil {
if off > offset {
offset = off
}
hasRetention = true
}
}
if config.Retention.RetentionMessages > 0 {
head, err := m.queueStore.Head(ctx, config.Name)
if err == nil {
tail, err := m.queueStore.Tail(ctx, config.Name)
if err == nil {
if tail > head+uint64(config.Retention.RetentionMessages) {
msgOffset := tail - uint64(config.Retention.RetentionMessages)
if msgOffset > offset {
offset = msgOffset
}
} else if head > offset {
offset = head
}
hasRetention = true
}
}
}
return offset, hasRetention
}
// --- Metrics ---
// GetMetrics returns the current metrics snapshot.
+110
View File
@@ -0,0 +1,110 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"fmt"
"strconv"
"time"
"github.com/absmach/fluxmq/queue/types"
brokerstorage "github.com/absmach/fluxmq/storage"
)
func (m *Manager) createDeliveryMessage(msg *types.Message, groupID string, queueName string) *brokerstorage.Message {
props := m.createRouteProperties(msg, groupID, queueName)
deliveryMsg := &brokerstorage.Message{
Topic: msg.Topic,
QoS: 1, // queue messages use QoS 1 by default
Properties: props,
}
deliveryMsg.SetPayloadFromBytes(msg.GetPayload())
return deliveryMsg
}
func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *types.Message, _ *types.ConsumerGroup, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if delivery == nil || msg == nil {
return
}
if delivery.Properties == nil {
delivery.Properties = make(map[string]string)
}
m.decorateStreamProperties(delivery.Properties, msg, workCommitted, hasWorkCommitted, primaryGroup)
}
func (m *Manager) createRouteProperties(msg *types.Message, groupID, queueName string) map[string]string {
props := make(map[string]string, len(msg.Properties)+4)
for k, v := range msg.Properties {
props[k] = v
}
props["message-id"] = fmt.Sprintf("%s:%d", queueName, msg.Sequence)
props["group-id"] = groupID
props["queue"] = queueName
props["offset"] = fmt.Sprintf("%d", msg.Sequence)
return props
}
func (m *Manager) decorateStreamProperties(properties map[string]string, msg *types.Message, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if properties == nil || msg == nil {
return
}
properties["x-stream-offset"] = fmt.Sprintf("%d", msg.Sequence)
if !msg.CreatedAt.IsZero() {
properties["x-stream-timestamp"] = fmt.Sprintf("%d", msg.CreatedAt.UnixMilli())
}
if hasWorkCommitted {
properties["x-work-committed-offset"] = fmt.Sprintf("%d", workCommitted)
properties["x-work-acked"] = strconv.FormatBool(msg.Sequence < workCommitted)
if primaryGroup != "" {
properties["x-work-group"] = primaryGroup
}
}
}
// DeliveryMessage is the internal message format for queue delivery tracking.
type DeliveryMessage struct {
ID string
Payload []byte
Topic string
Properties map[string]string
GroupID string
Offset uint64
DeliveredAt time.Time
AckTopic string
NackTopic string
RejectTopic string
}
func extractGroupFromClientID(clientID string) string {
for i, c := range clientID {
if c == '-' {
return clientID[:i]
}
}
return clientID
}
func generateMessageID() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
func parseMessageID(messageID string) (uint64, error) {
var offset uint64
// Format: queueName:offset (we only need the offset)
for i := len(messageID) - 1; i >= 0; i-- {
if messageID[i] == ':' {
_, err := fmt.Sscanf(messageID[i+1:], "%d", &offset)
return offset, err
}
}
// Try parsing as just an offset
_, err := fmt.Sscanf(messageID, "%d", &offset)
return offset, err
}
+74
View File
@@ -0,0 +1,74 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"time"
"github.com/absmach/fluxmq/queue/storage"
"github.com/absmach/fluxmq/queue/types"
)
func (m *Manager) offsetByTime(ctx context.Context, queueName string, ts time.Time) (uint64, error) {
if provider, ok := m.queueStore.(storage.TimeOffsetProvider); ok {
return provider.OffsetByTime(ctx, queueName, ts)
}
return m.queueStore.Head(ctx, queueName)
}
func (m *Manager) offsetBySize(ctx context.Context, queueName string, retentionBytes int64) (uint64, error) {
if provider, ok := m.queueStore.(storage.SizeOffsetProvider); ok {
return provider.OffsetBySize(ctx, queueName, retentionBytes)
}
return m.queueStore.Head(ctx, queueName)
}
func (m *Manager) computeRetentionOffset(ctx context.Context, config *types.QueueConfig) (uint64, bool) {
if config == nil {
return 0, false
}
var offset uint64
hasRetention := false
if config.Retention.RetentionTime > 0 {
cutoff := time.Now().Add(-config.Retention.RetentionTime)
if off, err := m.offsetByTime(ctx, config.Name, cutoff); err == nil {
if off > offset {
offset = off
}
hasRetention = true
}
}
if config.Retention.RetentionBytes > 0 {
if off, err := m.offsetBySize(ctx, config.Name, config.Retention.RetentionBytes); err == nil {
if off > offset {
offset = off
}
hasRetention = true
}
}
if config.Retention.RetentionMessages > 0 {
head, err := m.queueStore.Head(ctx, config.Name)
if err == nil {
tail, err := m.queueStore.Tail(ctx, config.Name)
if err == nil {
if tail > head+uint64(config.Retention.RetentionMessages) {
msgOffset := tail - uint64(config.Retention.RetentionMessages)
if msgOffset > offset {
offset = msgOffset
}
} else if head > offset {
offset = head
}
hasRetention = true
}
}
}
return offset, hasRetention
}
+33
View File
@@ -0,0 +1,33 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import (
"context"
"github.com/absmach/fluxmq/broker"
"github.com/absmach/fluxmq/cluster"
"github.com/absmach/fluxmq/queue/consumer"
)
var _ Service = (*Manager)(nil)
// StreamCommitter exposes explicit commit control for stream consumer groups.
type StreamCommitter interface {
CommitOffset(ctx context.Context, queueName, groupID string, offset uint64) error
}
// MetricsProvider exposes read-only queue delivery metrics.
type MetricsProvider interface {
GetMetrics() consumer.Metrics
GetLag(ctx context.Context, queueName, groupID string) (uint64, error)
}
// Service captures the queue manager contracts used by protocol brokers and cluster transport.
type Service interface {
broker.QueueManager
cluster.QueueHandler
StreamCommitter
MetricsProvider
}
+166
View File
@@ -0,0 +1,166 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package queue
import "time"
type subscriptionRef struct {
queueName string
groupID string
refCount int
lastSeen time.Time
}
type subscriptionTarget struct {
key string
queueName string
groupID string
}
func (m *Manager) subscriptionRefKey(queueName, groupID string) string {
return queueName + "\x00" + groupID
}
func (m *Manager) trackSubscription(clientID, queueName, groupID string) {
if clientID == "" || queueName == "" || groupID == "" {
return
}
key := m.subscriptionRefKey(queueName, groupID)
now := time.Now()
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
refs = make(map[string]*subscriptionRef)
m.subscriptions[clientID] = refs
}
if ref, ok := refs[key]; ok {
ref.refCount++
ref.lastSeen = now
return
}
refs[key] = &subscriptionRef{
queueName: queueName,
groupID: groupID,
refCount: 1,
lastSeen: now,
}
}
func (m *Manager) untrackSubscription(clientID, queueName, groupID string) {
if clientID == "" || queueName == "" || groupID == "" {
return
}
key := m.subscriptionRefKey(queueName, groupID)
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
ref, ok := refs[key]
if !ok {
return
}
ref.refCount--
if ref.refCount <= 0 {
delete(refs, key)
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
func (m *Manager) getSubscriptionTargets(clientID string) []subscriptionTarget {
m.subscriptionsMu.RLock()
defer m.subscriptionsMu.RUnlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return nil
}
targets := make([]subscriptionTarget, 0, len(refs))
for key, ref := range refs {
targets = append(targets, subscriptionTarget{
key: key,
queueName: ref.queueName,
groupID: ref.groupID,
})
}
return targets
}
func (m *Manager) touchSubscription(clientID, key string, ts time.Time) {
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
ref, ok := refs[key]
if !ok {
return
}
ref.lastSeen = ts
}
func (m *Manager) removeSubscriptionKeys(clientID string, keys []string) {
if len(keys) == 0 {
return
}
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
refs, ok := m.subscriptions[clientID]
if !ok {
return
}
for _, key := range keys {
delete(refs, key)
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
func (m *Manager) pruneStaleSubscriptions() {
maxIdle := m.config.ConsumerTimeout * 2
if maxIdle <= 0 {
maxIdle = 5 * time.Minute
}
cutoff := time.Now().Add(-maxIdle)
m.subscriptionsMu.Lock()
defer m.subscriptionsMu.Unlock()
for clientID, refs := range m.subscriptions {
for key, ref := range refs {
if ref.lastSeen.Before(cutoff) {
delete(refs, key)
}
}
if len(refs) == 0 {
delete(m.subscriptions, clientID)
}
}
}
+4 -3
View File
@@ -5,9 +5,10 @@ package types
// PublishRequest encapsulates publish data for queue routing.
type PublishRequest struct {
Topic string
Payload []byte
Properties map[string]string
PublisherID string
Topic string
Payload []byte
Properties map[string]string
}
// PublishMode controls how the queue manager should handle a publish.