Rename consumer group state to consumer group

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
dusan
2026-02-08 14:48:23 +01:00
parent 97fe8cd4c0
commit db3fcb8469
11 changed files with 77 additions and 79 deletions
+6 -7
View File
@@ -320,7 +320,7 @@ func (a *Adapter) TotalCount(ctx context.Context, queueName string) (uint64, err
// ConsumerGroupStore interface implementation
// CreateConsumerGroup creates a new consumer group for a queue.
func (a *Adapter) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error {
func (a *Adapter) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error {
existing, _ := a.groupStore.Get(group.QueueName, group.ID)
if existing != nil {
return storage.ErrConsumerGroupExists
@@ -330,7 +330,7 @@ func (a *Adapter) CreateConsumerGroup(ctx context.Context, group *types.Consumer
}
// GetConsumerGroup retrieves a consumer group's state.
func (a *Adapter) GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroupState, error) {
func (a *Adapter) GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroup, error) {
group, err := a.groupStore.Get(queueName, groupID)
if err != nil {
return nil, err
@@ -350,7 +350,7 @@ func (a *Adapter) GetConsumerGroup(ctx context.Context, queueName, groupID strin
}
// UpdateConsumerGroup updates a consumer group's state.
func (a *Adapter) UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error {
func (a *Adapter) UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error {
group.UpdatedAt = time.Now()
return a.groupStore.Save(group)
}
@@ -361,7 +361,7 @@ func (a *Adapter) DeleteConsumerGroup(ctx context.Context, queueName, groupID st
}
// ListConsumerGroups lists all consumer groups for a queue.
func (a *Adapter) ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroupState, error) {
func (a *Adapter) ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroup, error) {
return a.groupStore.List(queueName)
}
@@ -554,7 +554,7 @@ func (a *Adapter) Sync() error {
// Helper functions
// syncCursorsFromStore syncs cursor state from the log store to the group state.
func (a *Adapter) syncCursorsFromStore(queueName, groupID string, group *types.ConsumerGroupState) {
func (a *Adapter) syncCursorsFromStore(queueName, groupID string, group *types.ConsumerGroup) {
cursorState, err := a.store.GetCursorState(queueName, groupID)
if err != nil {
return
@@ -566,7 +566,7 @@ func (a *Adapter) syncCursorsFromStore(queueName, groupID string, group *types.C
}
// syncPELFromStore syncs PEL state from the log store to the group state.
func (a *Adapter) syncPELFromStore(queueName, groupID string, group *types.ConsumerGroupState) {
func (a *Adapter) syncPELFromStore(queueName, groupID string, group *types.ConsumerGroup) {
allEntries, err := a.store.GetAllPending(queueName, groupID)
if err != nil {
return
@@ -627,4 +627,3 @@ func pendingEntryToTypes(entry *PendingEntry) *types.PendingEntry {
DeliveryCount: int(entry.DeliveryCount),
}
}
+20 -21
View File
@@ -20,8 +20,8 @@ type ConsumerGroupStateStore struct {
mu sync.RWMutex
dir string
groups map[string]map[string]*types.ConsumerGroupState // queueName -> groupID -> state
dirty map[string]bool // groupKey -> dirty flag
groups map[string]map[string]*types.ConsumerGroup // queueName -> groupID -> state
dirty map[string]bool // groupKey -> dirty flag
}
const consumerGroupVersion uint8 = 2
@@ -41,7 +41,7 @@ func NewConsumerGroupStateStore(baseDir string) (*ConsumerGroupStateStore, error
store := &ConsumerGroupStateStore{
dir: dir,
groups: make(map[string]map[string]*types.ConsumerGroupState),
groups: make(map[string]map[string]*types.ConsumerGroup),
dirty: make(map[string]bool),
}
@@ -52,7 +52,7 @@ func NewConsumerGroupStateStore(baseDir string) (*ConsumerGroupStateStore, error
return store, nil
}
func decodeConsumerGroupState(data []byte) (*types.ConsumerGroupState, bool, error) {
func decodeConsumerGroupState(data []byte) (*types.ConsumerGroup, bool, error) {
var wrapper consumerGroupWrapper
if err := json.Unmarshal(data, &wrapper); err != nil {
return nil, false, err
@@ -66,7 +66,7 @@ func decodeConsumerGroupState(data []byte) (*types.ConsumerGroupState, bool, err
return nil, false, nil
}
var state types.ConsumerGroupState
var state types.ConsumerGroup
if err := json.Unmarshal(rawState, &state); err != nil {
return nil, false, err
}
@@ -129,7 +129,7 @@ func (s *ConsumerGroupStateStore) loadAll() error {
// Add to memory map
groups, ok := s.groups[state.QueueName]
if !ok {
groups = make(map[string]*types.ConsumerGroupState)
groups = make(map[string]*types.ConsumerGroup)
s.groups[state.QueueName] = groups
}
groups[state.ID] = state
@@ -154,15 +154,14 @@ func groupKey(queueName, groupID string) string {
return queueName + "/" + groupID
}
// Save persists a consumer group state.
func (s *ConsumerGroupStateStore) Save(state *types.ConsumerGroupState) error {
func (s *ConsumerGroupStateStore) Save(state *types.ConsumerGroup) error {
s.mu.Lock()
defer s.mu.Unlock()
groups, ok := s.groups[state.QueueName]
if !ok {
groups = make(map[string]*types.ConsumerGroupState)
groups = make(map[string]*types.ConsumerGroup)
s.groups[state.QueueName] = groups
}
@@ -173,7 +172,7 @@ func (s *ConsumerGroupStateStore) Save(state *types.ConsumerGroupState) error {
}
// saveGroup saves a single group to disk (must hold lock).
func (s *ConsumerGroupStateStore) saveGroup(state *types.ConsumerGroupState) error {
func (s *ConsumerGroupStateStore) saveGroup(state *types.ConsumerGroup) error {
path := s.groupPath(state.QueueName, state.ID)
dir := filepath.Dir(path)
@@ -182,9 +181,9 @@ func (s *ConsumerGroupStateStore) saveGroup(state *types.ConsumerGroupState) err
}
wrapper := struct {
Version uint8 `json:"version"`
State *types.ConsumerGroupState `json:"state"`
SavedAt int64 `json:"saved_at"`
Version uint8 `json:"version"`
State *types.ConsumerGroup `json:"state"`
SavedAt int64 `json:"saved_at"`
}{
Version: consumerGroupVersion,
State: state,
@@ -213,7 +212,7 @@ func (s *ConsumerGroupStateStore) saveGroup(state *types.ConsumerGroupState) err
}
// Get retrieves a consumer group state.
func (s *ConsumerGroupStateStore) Get(queueName, groupID string) (*types.ConsumerGroupState, error) {
func (s *ConsumerGroupStateStore) Get(queueName, groupID string) (*types.ConsumerGroup, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -261,16 +260,16 @@ func (s *ConsumerGroupStateStore) Delete(queueName, groupID string) error {
}
// List returns all consumer groups for a queue.
func (s *ConsumerGroupStateStore) List(queueName string) ([]*types.ConsumerGroupState, error) {
func (s *ConsumerGroupStateStore) List(queueName string) ([]*types.ConsumerGroup, error) {
s.mu.RLock()
defer s.mu.RUnlock()
groups, ok := s.groups[queueName]
if !ok {
return []*types.ConsumerGroupState{}, nil
return []*types.ConsumerGroup{}, nil
}
result := make([]*types.ConsumerGroupState, 0, len(groups))
result := make([]*types.ConsumerGroup, 0, len(groups))
for _, state := range groups {
result = append(result, state)
}
@@ -279,11 +278,11 @@ func (s *ConsumerGroupStateStore) List(queueName string) ([]*types.ConsumerGroup
}
// ListAll returns all consumer groups across all queues.
func (s *ConsumerGroupStateStore) ListAll() ([]*types.ConsumerGroupState, error) {
func (s *ConsumerGroupStateStore) ListAll() ([]*types.ConsumerGroup, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var result []*types.ConsumerGroupState
var result []*types.ConsumerGroup
for _, groups := range s.groups {
for _, state := range groups {
result = append(result, state)
@@ -334,13 +333,13 @@ func (s *ConsumerGroupStateStore) Exists(queueName, groupID string) bool {
}
// CreateIfNotExists creates a consumer group if it doesn't exist.
func (s *ConsumerGroupStateStore) CreateIfNotExists(state *types.ConsumerGroupState) error {
func (s *ConsumerGroupStateStore) CreateIfNotExists(state *types.ConsumerGroup) error {
s.mu.Lock()
defer s.mu.Unlock()
groups, ok := s.groups[state.QueueName]
if !ok {
groups = make(map[string]*types.ConsumerGroupState)
groups = make(map[string]*types.ConsumerGroup)
s.groups[state.QueueName] = groups
}
+4 -4
View File
@@ -77,7 +77,7 @@ func NewManager(queueStore storage.QueueStore, groupStore storage.ConsumerGroupS
}
// GetOrCreateGroup retrieves or creates a consumer group.
func (m *Manager) GetOrCreateGroup(ctx context.Context, queueName, groupID, pattern string, mode types.ConsumerGroupMode, autoCommit bool) (*types.ConsumerGroupState, error) {
func (m *Manager) GetOrCreateGroup(ctx context.Context, queueName, groupID, pattern string, mode types.ConsumerGroupMode, autoCommit bool) (*types.ConsumerGroup, error) {
// Try to get existing group
group, err := m.groupStore.GetConsumerGroup(ctx, queueName, groupID)
if err == nil {
@@ -284,7 +284,7 @@ func (m *Manager) ClaimBatchStream(ctx context.Context, queueName, groupID, cons
}
// claimFromCursor tries to claim a message from the cursor position.
func (m *Manager) claimFromCursor(ctx context.Context, group *types.ConsumerGroupState, consumerID string, filter *Filter) (*types.Message, error) {
func (m *Manager) claimFromCursor(ctx context.Context, group *types.ConsumerGroup, consumerID string, filter *Filter) (*types.Message, error) {
cursor := group.GetCursor()
// Get log tail
@@ -340,7 +340,7 @@ func (m *Manager) claimFromCursor(ctx context.Context, group *types.ConsumerGrou
}
// stealWork tries to steal a message from another consumer's PEL.
func (m *Manager) stealWork(ctx context.Context, group *types.ConsumerGroupState, consumerID string, filter *Filter) (*types.Message, error) {
func (m *Manager) stealWork(ctx context.Context, group *types.ConsumerGroup, consumerID string, filter *Filter) (*types.Message, error) {
// Get stealable entries
stealable := group.StealableEntries(m.config.VisibilityTimeout, consumerID)
if len(stealable) == 0 {
@@ -481,7 +481,7 @@ func (m *Manager) Reject(ctx context.Context, queueName, groupID, consumerID str
}
// advanceCommitted updates the committed offset to the minimum pending offset.
func (m *Manager) advanceCommitted(ctx context.Context, group *types.ConsumerGroupState) error {
func (m *Manager) advanceCommitted(ctx context.Context, group *types.ConsumerGroup) error {
cursor := group.GetCursor()
// Find minimum pending offset
+2 -2
View File
@@ -1248,7 +1248,7 @@ func (m *Manager) deliverToRemoteConsumers(ctx context.Context, config *types.Qu
return delivered
}
func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig, group *types.ConsumerGroupState, primaryCommitted func(pattern string) (uint64, bool)) bool {
func (m *Manager) deliverToGroup(ctx context.Context, config *types.QueueConfig, group *types.ConsumerGroup, primaryCommitted func(pattern string) (uint64, bool)) bool {
if group.ConsumerCount() == 0 {
return false
}
@@ -1730,7 +1730,7 @@ func (m *Manager) createDeliveryMessage(msg *types.Message, groupID string, queu
return deliveryMsg
}
func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *types.Message, _ *types.ConsumerGroupState, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
func (m *Manager) decorateStreamDelivery(delivery *brokerstorage.Message, msg *types.Message, _ *types.ConsumerGroup, workCommitted uint64, hasWorkCommitted bool, primaryGroup string) {
if delivery == nil || msg == nil {
return
}
+8 -8
View File
@@ -26,21 +26,21 @@ import (
// mockGroupStore implements storage.ConsumerGroupStore for testing.
type mockGroupStore struct {
mu sync.RWMutex
groups map[string]map[string]*types.ConsumerGroupState // queueName -> groupID -> state
groups map[string]map[string]*types.ConsumerGroup // queueName -> groupID -> state
}
func newMockGroupStore() *mockGroupStore {
return &mockGroupStore{
groups: make(map[string]map[string]*types.ConsumerGroupState),
groups: make(map[string]map[string]*types.ConsumerGroup),
}
}
func (s *mockGroupStore) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error {
func (s *mockGroupStore) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.groups[group.QueueName] == nil {
s.groups[group.QueueName] = make(map[string]*types.ConsumerGroupState)
s.groups[group.QueueName] = make(map[string]*types.ConsumerGroup)
}
if _, exists := s.groups[group.QueueName][group.ID]; exists {
@@ -51,7 +51,7 @@ func (s *mockGroupStore) CreateConsumerGroup(ctx context.Context, group *types.C
return nil
}
func (s *mockGroupStore) GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroupState, error) {
func (s *mockGroupStore) GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroup, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -67,7 +67,7 @@ func (s *mockGroupStore) GetConsumerGroup(ctx context.Context, queueName, groupI
return group, nil
}
func (s *mockGroupStore) UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error {
func (s *mockGroupStore) UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -89,11 +89,11 @@ func (s *mockGroupStore) DeleteConsumerGroup(ctx context.Context, queueName, gro
return nil
}
func (s *mockGroupStore) ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroupState, error) {
func (s *mockGroupStore) ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroup, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var groups []*types.ConsumerGroupState
var groups []*types.ConsumerGroup
if s.groups[queueName] != nil {
for _, group := range s.groups[queueName] {
groups = append(groups, group)
+4 -4
View File
@@ -70,8 +70,8 @@ type Operation struct {
ConsumerInfo *types.ConsumerInfo `json:"consumer_info,omitempty"`
// For OpCreateGroup
GroupState *types.ConsumerGroupState `json:"group_state,omitempty"`
Pattern string `json:"pattern,omitempty"`
GroupState *types.ConsumerGroup `json:"group_state,omitempty"`
Pattern string `json:"pattern,omitempty"`
}
// ApplyResult holds the result of an FSM apply operation.
@@ -441,8 +441,8 @@ func (f *LogFSM) Restore(rc io.ReadCloser) error {
// QueueSnapshotData holds snapshot data for a single queue.
type QueueSnapshotData struct {
QueueName string `json:"queue_name"`
Groups []*types.ConsumerGroupState `json:"groups"`
QueueName string `json:"queue_name"`
Groups []*types.ConsumerGroup `json:"groups"`
}
// GlobalSnapshotData represents the serialized snapshot data for all queues.
+1 -1
View File
@@ -435,7 +435,7 @@ func (m *Manager) ApplyTruncate(ctx context.Context, queueName string, minOffset
}
// ApplyCreateGroup submits a create consumer group operation to Raft.
func (m *Manager) ApplyCreateGroup(ctx context.Context, queueName string, group *types.ConsumerGroupState) error {
func (m *Manager) ApplyCreateGroup(ctx context.Context, queueName string, group *types.ConsumerGroup) error {
if !m.IsEnabled() {
return nil
}
+4 -4
View File
@@ -73,19 +73,19 @@ type QueueStore interface {
// ConsumerGroupStore manages cursor-based consumer groups with PEL tracking.
type ConsumerGroupStore interface {
// CreateConsumerGroup creates a new consumer group for a queue.
CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error
CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error
// GetConsumerGroup retrieves a consumer group's state.
GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroupState, error)
GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroup, error)
// UpdateConsumerGroup updates a consumer group's state (cursor, PEL).
UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error
UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error
// DeleteConsumerGroup removes a consumer group.
DeleteConsumerGroup(ctx context.Context, queueName, groupID string) error
// ListConsumerGroups lists all consumer groups for a queue.
ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroupState, error)
ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroup, error)
// AddPendingEntry adds an entry to a consumer's PEL.
AddPendingEntry(ctx context.Context, queueName, groupID string, entry *types.PendingEntry) error
+7 -7
View File
@@ -405,7 +405,7 @@ func (s *Store) getQueueLog(queueName string) (*log, error) {
// --- ConsumerGroupStore Implementation ---
// CreateConsumerGroup creates a new consumer group for a queue.
func (s *Store) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error {
func (s *Store) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error {
groupsVal, exists := s.groups.Load(group.QueueName)
if !exists {
return storage.ErrQueueNotFound
@@ -422,7 +422,7 @@ func (s *Store) CreateConsumerGroup(ctx context.Context, group *types.ConsumerGr
}
// GetConsumerGroup retrieves a consumer group's state.
func (s *Store) GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroupState, error) {
func (s *Store) GetConsumerGroup(ctx context.Context, queueName, groupID string) (*types.ConsumerGroup, error) {
groupsVal, exists := s.groups.Load(queueName)
if !exists {
return nil, storage.ErrQueueNotFound
@@ -435,11 +435,11 @@ func (s *Store) GetConsumerGroup(ctx context.Context, queueName, groupID string)
return nil, storage.ErrConsumerNotFound
}
return val.(*types.ConsumerGroupState), nil
return val.(*types.ConsumerGroup), nil
}
// UpdateConsumerGroup updates a consumer group's state.
func (s *Store) UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroupState) error {
func (s *Store) UpdateConsumerGroup(ctx context.Context, group *types.ConsumerGroup) error {
groupsVal, exists := s.groups.Load(group.QueueName)
if !exists {
return storage.ErrQueueNotFound
@@ -463,17 +463,17 @@ func (s *Store) DeleteConsumerGroup(ctx context.Context, queueName, groupID stri
}
// ListConsumerGroups lists all consumer groups for a queue.
func (s *Store) ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroupState, error) {
func (s *Store) ListConsumerGroups(ctx context.Context, queueName string) ([]*types.ConsumerGroup, error) {
groupsVal, exists := s.groups.Load(queueName)
if !exists {
return nil, storage.ErrQueueNotFound
}
groups := groupsVal.(*sync.Map)
var result []*types.ConsumerGroupState
var result []*types.ConsumerGroup
groups.Range(func(key, value interface{}) bool {
result = append(result, value.(*types.ConsumerGroupState))
result = append(result, value.(*types.ConsumerGroup))
return true
})
+20 -20
View File
@@ -33,10 +33,10 @@ type ConsumerInfo struct {
LastHeartbeat time.Time // Last activity timestamp
}
// ConsumerGroupState represents the complete state of a consumer group.
// ConsumerGroup represents the complete state of a consumer group.
// This includes cursor, PEL, and consumer membership.
// All map access is protected by an internal mutex for thread safety.
type ConsumerGroupState struct {
type ConsumerGroup struct {
mu sync.RWMutex `json:"-"`
// Identity
@@ -73,9 +73,9 @@ const (
)
// NewConsumerGroupState creates a new consumer group state.
func NewConsumerGroupState(queueName, groupID, pattern string) *ConsumerGroupState {
func NewConsumerGroupState(queueName, groupID, pattern string) *ConsumerGroup {
now := time.Now()
return &ConsumerGroupState{
return &ConsumerGroup{
ID: groupID,
QueueName: queueName,
Pattern: pattern,
@@ -93,7 +93,7 @@ func NewConsumerGroupState(queueName, groupID, pattern string) *ConsumerGroupSta
}
// GetCursor returns the queue cursor, creating if needed.
func (g *ConsumerGroupState) GetCursor() *QueueCursor {
func (g *ConsumerGroup) GetCursor() *QueueCursor {
g.mu.Lock()
defer g.mu.Unlock()
@@ -107,7 +107,7 @@ func (g *ConsumerGroupState) GetCursor() *QueueCursor {
}
// ReplacePEL atomically replaces the entire PEL map.
func (g *ConsumerGroupState) ReplacePEL(pel map[string][]*PendingEntry) {
func (g *ConsumerGroup) ReplacePEL(pel map[string][]*PendingEntry) {
g.mu.Lock()
defer g.mu.Unlock()
@@ -116,7 +116,7 @@ func (g *ConsumerGroupState) ReplacePEL(pel map[string][]*PendingEntry) {
}
// AddPending adds a pending entry for a consumer.
func (g *ConsumerGroupState) AddPending(consumerID string, entry *PendingEntry) {
func (g *ConsumerGroup) AddPending(consumerID string, entry *PendingEntry) {
g.mu.Lock()
defer g.mu.Unlock()
@@ -125,7 +125,7 @@ func (g *ConsumerGroupState) AddPending(consumerID string, entry *PendingEntry)
}
// RemovePending removes a pending entry for a consumer by offset.
func (g *ConsumerGroupState) RemovePending(consumerID string, offset uint64) bool {
func (g *ConsumerGroup) RemovePending(consumerID string, offset uint64) bool {
g.mu.Lock()
defer g.mu.Unlock()
@@ -145,7 +145,7 @@ func (g *ConsumerGroupState) RemovePending(consumerID string, offset uint64) boo
}
// DeleteConsumerPEL removes all pending entries for a consumer.
func (g *ConsumerGroupState) DeleteConsumerPEL(consumerID string) {
func (g *ConsumerGroup) DeleteConsumerPEL(consumerID string) {
g.mu.Lock()
defer g.mu.Unlock()
@@ -154,7 +154,7 @@ func (g *ConsumerGroupState) DeleteConsumerPEL(consumerID string) {
}
// FindPending finds a pending entry by offset across all consumers.
func (g *ConsumerGroupState) FindPending(offset uint64) (*PendingEntry, string) {
func (g *ConsumerGroup) FindPending(offset uint64) (*PendingEntry, string) {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -169,7 +169,7 @@ func (g *ConsumerGroupState) FindPending(offset uint64) (*PendingEntry, string)
}
// TransferPending moves a pending entry from one consumer to another.
func (g *ConsumerGroupState) TransferPending(offset uint64, fromConsumer, toConsumer string) bool {
func (g *ConsumerGroup) TransferPending(offset uint64, fromConsumer, toConsumer string) bool {
g.mu.Lock()
defer g.mu.Unlock()
@@ -194,7 +194,7 @@ func (g *ConsumerGroupState) TransferPending(offset uint64, fromConsumer, toCons
// MinPendingOffset returns the minimum offset across all PEL entries.
// This is used to calculate the committed offset.
func (g *ConsumerGroupState) MinPendingOffset() (uint64, bool) {
func (g *ConsumerGroup) MinPendingOffset() (uint64, bool) {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -214,7 +214,7 @@ func (g *ConsumerGroupState) MinPendingOffset() (uint64, bool) {
}
// PendingCount returns the total number of pending entries.
func (g *ConsumerGroupState) PendingCount() int {
func (g *ConsumerGroup) PendingCount() int {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -226,7 +226,7 @@ func (g *ConsumerGroupState) PendingCount() int {
}
// StealableEntries returns entries that are older than the visibility timeout.
func (g *ConsumerGroupState) StealableEntries(visibilityTimeout time.Duration, excludeConsumer string) []*PendingEntry {
func (g *ConsumerGroup) StealableEntries(visibilityTimeout time.Duration, excludeConsumer string) []*PendingEntry {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -248,7 +248,7 @@ func (g *ConsumerGroupState) StealableEntries(visibilityTimeout time.Duration, e
}
// GetConsumer returns a consumer by ID, or nil if not found.
func (g *ConsumerGroupState) GetConsumer(consumerID string) *ConsumerInfo {
func (g *ConsumerGroup) GetConsumer(consumerID string) *ConsumerInfo {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -256,7 +256,7 @@ func (g *ConsumerGroupState) GetConsumer(consumerID string) *ConsumerInfo {
}
// SetConsumer adds or updates a consumer.
func (g *ConsumerGroupState) SetConsumer(consumerID string, info *ConsumerInfo) {
func (g *ConsumerGroup) SetConsumer(consumerID string, info *ConsumerInfo) {
g.mu.Lock()
defer g.mu.Unlock()
@@ -265,7 +265,7 @@ func (g *ConsumerGroupState) SetConsumer(consumerID string, info *ConsumerInfo)
}
// DeleteConsumer removes a consumer by ID.
func (g *ConsumerGroupState) DeleteConsumer(consumerID string) {
func (g *ConsumerGroup) DeleteConsumer(consumerID string) {
g.mu.Lock()
defer g.mu.Unlock()
@@ -274,7 +274,7 @@ func (g *ConsumerGroupState) DeleteConsumer(consumerID string) {
}
// ConsumerCount returns the number of consumers in the group.
func (g *ConsumerGroupState) ConsumerCount() int {
func (g *ConsumerGroup) ConsumerCount() int {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -282,7 +282,7 @@ func (g *ConsumerGroupState) ConsumerCount() int {
}
// ConsumerIDs returns a slice of all consumer IDs.
func (g *ConsumerGroupState) ConsumerIDs() []string {
func (g *ConsumerGroup) ConsumerIDs() []string {
g.mu.RLock()
defer g.mu.RUnlock()
@@ -295,7 +295,7 @@ func (g *ConsumerGroupState) ConsumerIDs() []string {
// ForEachConsumer iterates over all consumers with the lock held.
// Return false from fn to stop iteration.
func (g *ConsumerGroupState) ForEachConsumer(fn func(id string, info *ConsumerInfo) bool) {
func (g *ConsumerGroup) ForEachConsumer(fn func(id string, info *ConsumerInfo) bool) {
g.mu.RLock()
defer g.mu.RUnlock()
+1 -1
View File
@@ -884,7 +884,7 @@ func (h *Handler) messageToProto(msg *types.Message) *queuev1.Message {
return protoMsg
}
func (h *Handler) groupToProto(group *types.ConsumerGroupState) *queuev1.ConsumerGroup {
func (h *Handler) groupToProto(group *types.ConsumerGroup) *queuev1.ConsumerGroup {
consumers := make([]*queuev1.ConsumerInfo, 0, len(group.Consumers))
for _, c := range group.Consumers {
consumers = append(consumers, &queuev1.ConsumerInfo{