mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:30:25 +00:00
Fix map ordering in consumers
Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
+33
-5
@@ -227,6 +227,9 @@ type Group struct {
|
||||
id string
|
||||
queueName string
|
||||
consumers map[string]*types.Consumer
|
||||
sortedIDs []string
|
||||
idsDirty bool
|
||||
rrIndex int
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -250,6 +253,7 @@ func (cg *Group) AddConsumer(consumer *types.Consumer) {
|
||||
defer cg.mu.Unlock()
|
||||
|
||||
cg.consumers[consumer.ID] = consumer
|
||||
cg.idsDirty = true
|
||||
}
|
||||
|
||||
// RemoveConsumer removes a consumer from the group.
|
||||
@@ -258,6 +262,11 @@ func (cg *Group) RemoveConsumer(consumerID string) {
|
||||
defer cg.mu.Unlock()
|
||||
|
||||
delete(cg.consumers, consumerID)
|
||||
cg.idsDirty = true
|
||||
if len(cg.consumers) == 0 {
|
||||
cg.sortedIDs = nil
|
||||
cg.rrIndex = 0
|
||||
}
|
||||
}
|
||||
|
||||
// GetConsumer returns a consumer by ID.
|
||||
@@ -291,16 +300,35 @@ func (cg *Group) Size() int {
|
||||
|
||||
// GetNextConsumer returns the next consumer for round-robin message delivery.
|
||||
func (cg *Group) GetNextConsumer() (*types.Consumer, bool) {
|
||||
cg.mu.RLock()
|
||||
defer cg.mu.RUnlock()
|
||||
cg.mu.Lock()
|
||||
defer cg.mu.Unlock()
|
||||
|
||||
if len(cg.consumers) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Simple round-robin: return first consumer
|
||||
// For proper round-robin with state, use an external counter
|
||||
for _, consumer := range cg.consumers {
|
||||
if cg.idsDirty {
|
||||
ids := make([]string, 0, len(cg.consumers))
|
||||
for id := range cg.consumers {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
sort.Strings(ids)
|
||||
cg.sortedIDs = ids
|
||||
cg.idsDirty = false
|
||||
}
|
||||
|
||||
if cg.rrIndex >= len(cg.sortedIDs) {
|
||||
cg.rrIndex = 0
|
||||
}
|
||||
|
||||
id := cg.sortedIDs[cg.rrIndex]
|
||||
cg.rrIndex++
|
||||
if cg.rrIndex >= len(cg.sortedIDs) {
|
||||
cg.rrIndex = 0
|
||||
}
|
||||
|
||||
consumer, exists := cg.consumers[id]
|
||||
if exists {
|
||||
return consumer, true
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user