mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:50:18 +00:00
+24
-27
@@ -14,25 +14,25 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
MethodConnectionStart = 10
|
||||
MethodConnectionStartOk = 11
|
||||
MethodConnectionSecure = 20
|
||||
MethodConnectionSecureOk= 21
|
||||
MethodConnectionTune = 30
|
||||
MethodConnectionTuneOk = 31
|
||||
MethodConnectionOpen = 40
|
||||
MethodConnectionOpenOk = 41
|
||||
MethodConnectionClose = 50
|
||||
MethodConnectionCloseOk = 51
|
||||
MethodConnectionStart = 10
|
||||
MethodConnectionStartOk = 11
|
||||
MethodConnectionSecure = 20
|
||||
MethodConnectionSecureOk = 21
|
||||
MethodConnectionTune = 30
|
||||
MethodConnectionTuneOk = 31
|
||||
MethodConnectionOpen = 40
|
||||
MethodConnectionOpenOk = 41
|
||||
MethodConnectionClose = 50
|
||||
MethodConnectionCloseOk = 51
|
||||
)
|
||||
|
||||
// ConnectionStart is the initial method sent by the server.
|
||||
type ConnectionStart struct {
|
||||
VersionMajor byte
|
||||
VersionMinor byte
|
||||
VersionMajor byte
|
||||
VersionMinor byte
|
||||
ServerProperties map[string]interface{} // Actually a table
|
||||
Mechanisms string // Long string
|
||||
Locales string // Long string
|
||||
Mechanisms string // Long string
|
||||
Locales string // Long string
|
||||
}
|
||||
|
||||
func (m *ConnectionStart) Read(r *bytes.Reader) (err error) {
|
||||
@@ -79,9 +79,9 @@ func (m *ConnectionStart) Write(w io.Writer) (err error) {
|
||||
// ConnectionStartOk is sent by the client in response to ConnectionStart.
|
||||
type ConnectionStartOk struct {
|
||||
ClientProperties map[string]interface{} // Actually a table
|
||||
Mechanism string // Short string
|
||||
Response string // Long string
|
||||
Locale string // Short string
|
||||
Mechanism string // Short string
|
||||
Response string // Long string
|
||||
Locale string // Short string
|
||||
}
|
||||
|
||||
func (m *ConnectionStartOk) Read(r *bytes.Reader) (err error) {
|
||||
@@ -119,8 +119,6 @@ func (m *ConnectionStartOk) Write(w io.Writer) (err error) {
|
||||
return WriteShortStr(w, m.Locale)
|
||||
}
|
||||
|
||||
|
||||
|
||||
// ConnectionSecure is sent by the server to challenge the client.
|
||||
type ConnectionSecure struct {
|
||||
Challenge string // Long string
|
||||
@@ -239,7 +237,7 @@ func (m *ConnectionTuneOk) Write(w io.Writer) (err error) {
|
||||
|
||||
// ConnectionOpen is sent by the client to open a virtual host.
|
||||
type ConnectionOpen struct {
|
||||
VirtualHost string // Short string
|
||||
VirtualHost string // Short string
|
||||
Capabilities string // Short string - deprecated
|
||||
Insist bool // Bit
|
||||
}
|
||||
@@ -366,12 +364,12 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
MethodChannelOpen = 10
|
||||
MethodChannelOpenOk = 11
|
||||
MethodChannelFlow = 20
|
||||
MethodChannelFlowOk = 21
|
||||
MethodChannelClose = 40
|
||||
MethodChannelCloseOk = 41
|
||||
MethodChannelOpen = 10
|
||||
MethodChannelOpenOk = 11
|
||||
MethodChannelFlow = 20
|
||||
MethodChannelFlowOk = 21
|
||||
MethodChannelClose = 40
|
||||
MethodChannelCloseOk = 41
|
||||
)
|
||||
|
||||
type ChannelOpen struct {
|
||||
@@ -2081,4 +2079,3 @@ func (m *TxRollbackOk) Write(w io.Writer) (err error) {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
+3
-5
@@ -12,9 +12,9 @@ import (
|
||||
|
||||
// Constants for AMQP frame types
|
||||
const (
|
||||
FrameMethod byte = 1
|
||||
FrameHeader byte = 2
|
||||
FrameBody byte = 3
|
||||
FrameMethod byte = 1
|
||||
FrameHeader byte = 2
|
||||
FrameBody byte = 3
|
||||
FrameHeartbeat byte = 8
|
||||
)
|
||||
|
||||
@@ -397,5 +397,3 @@ func WriteArray(w io.Writer, arr []interface{}) error {
|
||||
_, err := w.Write(b.Bytes())
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -42,13 +42,13 @@ const (
|
||||
TypeSymbolLong byte = 0xb3
|
||||
|
||||
// Compound types
|
||||
TypeList0 byte = 0x45
|
||||
TypeList8 byte = 0xc0
|
||||
TypeList32 byte = 0xd0
|
||||
TypeMap8 byte = 0xc1
|
||||
TypeMap32 byte = 0xd1
|
||||
TypeArray8 byte = 0xe0
|
||||
TypeArray32 byte = 0xf0
|
||||
TypeList0 byte = 0x45
|
||||
TypeList8 byte = 0xc0
|
||||
TypeList32 byte = 0xd0
|
||||
TypeMap8 byte = 0xc1
|
||||
TypeMap32 byte = 0xd1
|
||||
TypeArray8 byte = 0xe0
|
||||
TypeArray32 byte = 0xf0
|
||||
|
||||
// Described type constructor
|
||||
TypeDescriptor byte = 0x00
|
||||
|
||||
@@ -200,8 +200,8 @@ func (cm *ConsumerManager) Stats() ConsumerManagerStats {
|
||||
defer cm.mu.RUnlock()
|
||||
|
||||
stats := ConsumerManagerStats{
|
||||
GroupCount: len(cm.groups),
|
||||
GroupStats: make(map[string]ConsumerStateStats),
|
||||
GroupCount: len(cm.groups),
|
||||
GroupStats: make(map[string]ConsumerStateStats),
|
||||
}
|
||||
|
||||
for groupID, state := range cm.groups {
|
||||
@@ -333,9 +333,9 @@ func (cm *ConsumerManager) GetDeadLetterBatches(maxDeliveries int, maxPerGroup i
|
||||
|
||||
// ShardAssignment describes which shards a node is responsible for.
|
||||
type ShardAssignment struct {
|
||||
NodeID string
|
||||
GroupID string
|
||||
ShardIDs []int
|
||||
NodeID string
|
||||
GroupID string
|
||||
ShardIDs []int
|
||||
AssignedAt time.Time
|
||||
}
|
||||
|
||||
|
||||
@@ -45,8 +45,8 @@ type ConsumerStateConfig struct {
|
||||
NumPELShards int // Number of PEL shards (default: 8)
|
||||
|
||||
// Batching
|
||||
MaxBatchSize int // Max entries per batch operation
|
||||
BatchFlushDelay time.Duration // Max delay before flushing batch
|
||||
MaxBatchSize int // Max entries per batch operation
|
||||
BatchFlushDelay time.Duration // Max delay before flushing batch
|
||||
|
||||
// Compaction
|
||||
CompactThreshold int // Operations before compaction (default: 10000)
|
||||
@@ -185,10 +185,10 @@ type StateSnapshot struct {
|
||||
|
||||
// PELSnapshot is the serialized PEL shard.
|
||||
type PELSnapshot struct {
|
||||
Version uint64 `json:"version"`
|
||||
ShardID int `json:"shard_id"`
|
||||
Entries map[uint64]*PendingEntry `json:"entries"`
|
||||
SavedAt int64 `json:"saved_at"`
|
||||
Version uint64 `json:"version"`
|
||||
ShardID int `json:"shard_id"`
|
||||
Entries map[uint64]*PendingEntry `json:"entries"`
|
||||
SavedAt int64 `json:"saved_at"`
|
||||
}
|
||||
|
||||
// load loads state from snapshots.
|
||||
|
||||
@@ -18,16 +18,16 @@ type mockQueueManager struct {
|
||||
}
|
||||
|
||||
type ackCall struct {
|
||||
queueName string
|
||||
messageID string
|
||||
groupID string
|
||||
queueName string
|
||||
messageID string
|
||||
groupID string
|
||||
}
|
||||
|
||||
type rejectCall struct {
|
||||
queueName string
|
||||
messageID string
|
||||
groupID string
|
||||
reason string
|
||||
queueName string
|
||||
messageID string
|
||||
groupID string
|
||||
reason string
|
||||
}
|
||||
|
||||
func (m *mockQueueManager) Start(ctx context.Context) error { return nil }
|
||||
@@ -64,7 +64,9 @@ func (m *mockQueueManager) DeleteQueue(ctx context.Context, queueName string) er
|
||||
func (m *mockQueueManager) GetQueue(ctx context.Context, queueName string) (*qtypes.QueueConfig, error) {
|
||||
return nil, storage.ErrNotFound
|
||||
}
|
||||
func (m *mockQueueManager) ListQueues(ctx context.Context) ([]qtypes.QueueConfig, error) { return nil, nil }
|
||||
func (m *mockQueueManager) ListQueues(ctx context.Context) ([]qtypes.QueueConfig, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestHandleQueueAck_UsesParsedQueueName(t *testing.T) {
|
||||
qm := &mockQueueManager{}
|
||||
@@ -126,4 +128,3 @@ func TestHandleQueueAck_InvalidQueueTopic(t *testing.T) {
|
||||
require.Error(t, b.handleQueueAck(msg))
|
||||
require.Empty(t, qm.ackCalls)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/fluxmq/broker/events"
|
||||
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
|
||||
"github.com/absmach/fluxmq/mqtt/session"
|
||||
clusterv1 "github.com/absmach/fluxmq/pkg/proto/cluster/v1"
|
||||
"github.com/absmach/fluxmq/storage"
|
||||
"github.com/absmach/fluxmq/storage/messages"
|
||||
"github.com/absmach/fluxmq/topics"
|
||||
|
||||
@@ -13,10 +13,10 @@ import (
|
||||
// Filter represents a compiled routing key filter.
|
||||
// It matches routing keys against a subscription pattern.
|
||||
type Filter struct {
|
||||
pattern string // Original pattern (e.g., "images/+/png")
|
||||
regex *regexp.Regexp // Compiled regex for matching
|
||||
isExact bool // True if pattern has no wildcards
|
||||
matchAll bool // True if pattern matches everything (# or empty)
|
||||
pattern string // Original pattern (e.g., "images/+/png")
|
||||
regex *regexp.Regexp // Compiled regex for matching
|
||||
isExact bool // True if pattern has no wildcards
|
||||
matchAll bool // True if pattern matches everything (# or empty)
|
||||
}
|
||||
|
||||
// NewFilter creates a new filter from a subscription pattern.
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
//go:build ignore
|
||||
|
||||
// TODO: enable when queue/storage/badger packages are implemented
|
||||
|
||||
package queue
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
//go:build ignore
|
||||
|
||||
// TODO: enable when queue/storage/badger packages are implemented
|
||||
|
||||
package raft
|
||||
|
||||
@@ -164,8 +164,8 @@ func TestQueueConfig_Validate(t *testing.T) {
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "valid ephemeral config",
|
||||
config: DefaultEphemeralQueueConfig("$queue/test"),
|
||||
name: "valid ephemeral config",
|
||||
config: DefaultEphemeralQueueConfig("$queue/test"),
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
|
||||
@@ -22,10 +22,10 @@ type Queue interface {
|
||||
|
||||
// queue is a queue for offline messages (QoS > 0).
|
||||
type queue struct {
|
||||
mu sync.Mutex
|
||||
messages []*storage.Message
|
||||
maxSize int
|
||||
evictOnFull bool
|
||||
mu sync.Mutex
|
||||
messages []*storage.Message
|
||||
maxSize int
|
||||
evictOnFull bool
|
||||
}
|
||||
|
||||
// NewMessageQueue creates a new message queue.
|
||||
|
||||
Reference in New Issue
Block a user