mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
+129
@@ -0,0 +1,129 @@
|
||||
# Copyright (c) Abstract Machines
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
version: "2"
|
||||
run:
|
||||
timeout: 10m
|
||||
build-tags:
|
||||
- nats
|
||||
linters:
|
||||
default: none
|
||||
enable:
|
||||
- asasalint
|
||||
- asciicheck
|
||||
- bidichk
|
||||
- contextcheck
|
||||
- copyloopvar
|
||||
- decorder
|
||||
- dogsled
|
||||
- dupword
|
||||
- errcheck
|
||||
- errchkjson
|
||||
- errname
|
||||
- ginkgolinter
|
||||
- gocheckcompilerdirectives
|
||||
- goconst
|
||||
- gocritic
|
||||
- godot
|
||||
- godox
|
||||
- goheader
|
||||
- goprintffuncname
|
||||
- govet
|
||||
- importas
|
||||
- ineffassign
|
||||
- loggercheck
|
||||
- makezero
|
||||
- mirror
|
||||
- misspell
|
||||
- nakedret
|
||||
- staticcheck
|
||||
- unused
|
||||
- whitespace
|
||||
settings:
|
||||
gocritic:
|
||||
enabled-checks:
|
||||
- importShadow
|
||||
- httpNoBody
|
||||
- paramTypeCombine
|
||||
- emptyStringTest
|
||||
- builtinShadow
|
||||
- exposedSyncMutex
|
||||
disabled-checks:
|
||||
- appendAssign
|
||||
enabled-tags:
|
||||
- diagnostic
|
||||
disabled-tags:
|
||||
- performance
|
||||
- style
|
||||
- experimental
|
||||
- opinionated
|
||||
goheader:
|
||||
template: |-
|
||||
Copyright (c) Abstract Machines
|
||||
SPDX-License-Identifier: Apache-2.0
|
||||
importas:
|
||||
alias:
|
||||
- pkg: github.com/absmach/callhome/pkg/client
|
||||
alias: chclient
|
||||
- pkg: github.com/absmach/supermq/logger
|
||||
alias: smqlog
|
||||
- pkg: github.com/absmach/supermq/pkg/errors/service
|
||||
alias: svcerr
|
||||
- pkg: github.com/absmach/supermq/pkg/errors/repository
|
||||
alias: repoerr
|
||||
- pkg: github.com/absmach/supermq/pkg/sdk/mocks
|
||||
alias: sdkmocks
|
||||
- pkg: github.com/absmach/supermq/api/http/util
|
||||
alias: apiutil
|
||||
- pkg: github.com/absmach/supermq/api/http
|
||||
alias: api
|
||||
no-unaliased: true
|
||||
no-extra-aliases: false
|
||||
misspell:
|
||||
ignore-rules:
|
||||
- mosquitto
|
||||
staticcheck:
|
||||
checks:
|
||||
- -ST1000
|
||||
- -ST1003
|
||||
- -ST1020
|
||||
- -ST1021
|
||||
- -ST1022
|
||||
exclusions:
|
||||
generated: lax
|
||||
presets:
|
||||
- comments
|
||||
- common-false-positives
|
||||
- legacy
|
||||
- std-error-handling
|
||||
rules:
|
||||
- linters:
|
||||
- godot
|
||||
path: cli/commands_test.go
|
||||
- path: (.+)\.go$
|
||||
text: |-
|
||||
string `Usage:
|
||||
` has (\d+) occurrences, make it a constant
|
||||
- path: (.+)\.go$
|
||||
text: |-
|
||||
string `For example:
|
||||
` has (\d+) occurrences, make it a constant
|
||||
paths:
|
||||
- third_party$
|
||||
- builtin$
|
||||
- examples$
|
||||
issues:
|
||||
max-issues-per-linter: 100
|
||||
max-same-issues: 100
|
||||
formatters:
|
||||
enable:
|
||||
- gci
|
||||
- gofmt
|
||||
- gofumpt
|
||||
- goimports
|
||||
exclusions:
|
||||
generated: lax
|
||||
paths:
|
||||
- third_party$
|
||||
- builtin$
|
||||
- examples$
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
// AuthEngine handles authentication and authorization checks.
|
||||
|
||||
+3
-1
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
@@ -167,7 +170,6 @@ func (b *Broker) CreateSession(clientID string, opts SessionOptions) (*session.S
|
||||
nodeID := b.cluster.NodeID()
|
||||
if err := b.cluster.AcquireSession(ctx, clientID, nodeID); err != nil {
|
||||
b.logError("cluster_acquire_session", err, slog.String("client_id", clientID))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
@@ -10,13 +13,13 @@ import (
|
||||
v5 "github.com/absmach/mqtt/core/packets/v5"
|
||||
)
|
||||
|
||||
// mockAddr implements net.Addr for testing
|
||||
// mockAddr implements net.Addr for testing.
|
||||
type mockAddr struct{}
|
||||
|
||||
func (m *mockAddr) Network() string { return "tcp" }
|
||||
func (m *mockAddr) String() string { return "127.0.0.1:1883" }
|
||||
|
||||
// mockConnection implements core.Connection for testing
|
||||
// mockConnection implements core.Connection for testing.
|
||||
type mockConnection struct {
|
||||
net.Conn
|
||||
packets []packets.ControlPacket
|
||||
|
||||
+4
-1
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
@@ -7,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
// GenerateClientID generates a random client ID.
|
||||
// Format: auto-<16-char-hex>
|
||||
// Format: auto-<16-char-hex>.
|
||||
func GenerateClientID() (string, error) {
|
||||
b := make([]byte, 8)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
|
||||
+9
-6
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
@@ -45,7 +48,7 @@ func NewStats() *Stats {
|
||||
}
|
||||
}
|
||||
|
||||
// Connection tracking
|
||||
// Connection tracking.
|
||||
func (s *Stats) IncrementConnections() {
|
||||
s.totalConnections.Add(1)
|
||||
s.currentConnections.Add(1)
|
||||
@@ -68,7 +71,7 @@ func (s *Stats) GetDisconnections() uint64 {
|
||||
return s.disconnections.Load()
|
||||
}
|
||||
|
||||
// Message tracking
|
||||
// Message tracking.
|
||||
func (s *Stats) IncrementMessagesReceived() {
|
||||
s.messagesReceived.Add(1)
|
||||
}
|
||||
@@ -103,7 +106,7 @@ func (s *Stats) GetPublishSent() uint64 {
|
||||
return s.publishSent.Load()
|
||||
}
|
||||
|
||||
// Byte tracking
|
||||
// Byte tracking.
|
||||
func (s *Stats) AddBytesReceived(n uint64) {
|
||||
s.bytesReceived.Add(n)
|
||||
}
|
||||
@@ -120,7 +123,7 @@ func (s *Stats) GetBytesSent() uint64 {
|
||||
return s.bytesSent.Load()
|
||||
}
|
||||
|
||||
// Subscription tracking
|
||||
// Subscription tracking.
|
||||
func (s *Stats) IncrementSubscriptions() {
|
||||
s.subscriptions.Add(1)
|
||||
}
|
||||
@@ -140,7 +143,7 @@ func (s *Stats) GetRetainedMessages() uint64 {
|
||||
return s.retainedMessages.Load()
|
||||
}
|
||||
|
||||
// Error tracking
|
||||
// Error tracking.
|
||||
func (s *Stats) IncrementProtocolErrors() {
|
||||
s.protocolErrors.Add(1)
|
||||
}
|
||||
@@ -173,7 +176,7 @@ func (s *Stats) GetPacketErrors() uint64 {
|
||||
return s.packetErrors.Load()
|
||||
}
|
||||
|
||||
// Uptime
|
||||
// Uptime.
|
||||
func (s *Stats) GetUptime() time.Duration {
|
||||
return time.Since(s.startTime)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
// Message represents a protocol-agnostic MQTT message.
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package broker
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
|
||||
+3
-1
@@ -1,4 +1,6 @@
|
||||
// Package cluster provides distributed coordination for MQTT broker clustering.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
|
||||
+5
-4
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
@@ -7,10 +10,8 @@ import (
|
||||
"github.com/absmach/mqtt/storage"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrClusterNotEnabled is returned when cluster operations are called on a non-clustered broker.
|
||||
ErrClusterNotEnabled = errors.New("clustering is not enabled")
|
||||
)
|
||||
// ErrClusterNotEnabled is returned when cluster operations are called on a non-clustered broker.
|
||||
var ErrClusterNotEnabled = errors.New("clustering is not enabled")
|
||||
|
||||
// NoopCluster is a no-op cluster implementation for single-node mode.
|
||||
// It satisfies the Cluster interface but returns appropriate errors
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
@@ -148,7 +151,6 @@ func (t *Transport) RoutePublish(ctx context.Context, req *PublishRequest) (*Pub
|
||||
req.Dup,
|
||||
req.Properties,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return &PublishResponse{
|
||||
Success: false,
|
||||
@@ -178,7 +180,6 @@ func (t *Transport) TakeoverSession(ctx context.Context, req *TakeoverRequest) (
|
||||
req.ToNode,
|
||||
req.SessionState,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return &TakeoverResponse{
|
||||
Success: false,
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package cluster
|
||||
|
||||
import "strings"
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
|
||||
+18
-15
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package codec_test
|
||||
|
||||
import (
|
||||
@@ -11,16 +14,16 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
testString = "test string"
|
||||
testBytes = []byte("test bytes")
|
||||
maxUint16 = uint16(65535)
|
||||
maxUint32 = uint32(4294967295)
|
||||
maxVBI = 268435455 // Maximum VBI value per MQTT spec
|
||||
emptyString = ""
|
||||
emptyBytes = []byte{}
|
||||
longString = string(make([]byte, 65535))
|
||||
utf8String = "Hello 世界 🌍"
|
||||
utf8Bytes = []byte(utf8String)
|
||||
testString = "test string"
|
||||
testBytes = []byte("test bytes")
|
||||
maxUint16 = uint16(65535)
|
||||
maxUint32 = uint32(4294967295)
|
||||
maxVBI = 268435455 // Maximum VBI value per MQTT spec
|
||||
emptyString = ""
|
||||
emptyBytes = []byte{}
|
||||
longString = string(make([]byte, 65535))
|
||||
utf8String = "Hello 世界 🌍"
|
||||
utf8Bytes = []byte(utf8String)
|
||||
)
|
||||
|
||||
func TestEncodeDecodeBytes(t *testing.T) {
|
||||
@@ -228,10 +231,10 @@ func TestEncodeDecodeUint32(t *testing.T) {
|
||||
|
||||
func TestEncodeDecodeVBI(t *testing.T) {
|
||||
cases := []struct {
|
||||
desc string
|
||||
input int
|
||||
expectedLen int
|
||||
err error
|
||||
desc string
|
||||
input int
|
||||
expectedLen int
|
||||
err error
|
||||
}{
|
||||
{
|
||||
desc: "encode and decode zero",
|
||||
@@ -530,7 +533,7 @@ func TestEncodeDecodeBinaryData(t *testing.T) {
|
||||
input []byte
|
||||
}{
|
||||
{
|
||||
desc: "encode and decode all byte values",
|
||||
desc: "encode and decode all byte values",
|
||||
input: func() []byte {
|
||||
data := make([]byte, 256)
|
||||
for i := 0; i < 256; i++ {
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package codec
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package codec
|
||||
|
||||
// Encode methods rewrite some of bigEndian methods
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package codec
|
||||
|
||||
import (
|
||||
@@ -5,11 +8,11 @@ import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
// Errors for zero-copy decoding
|
||||
// Errors for zero-copy decoding.
|
||||
var (
|
||||
ErrBufferTooShort = errors.New("buffer too short")
|
||||
ErrMalformedVBI = errors.New("malformed variable byte integer")
|
||||
ErrStringTooLong = errors.New("string exceeds buffer")
|
||||
ErrBufferTooShort = errors.New("buffer too short")
|
||||
ErrMalformedVBI = errors.New("malformed variable byte integer")
|
||||
ErrStringTooLong = errors.New("string exceeds buffer")
|
||||
)
|
||||
|
||||
// ZeroCopyReader provides zero-copy reading from a byte slice.
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package core
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package core_test
|
||||
|
||||
import (
|
||||
@@ -8,7 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/mqtt/core"
|
||||
"github.com/absmach/mqtt/core/packets/v3"
|
||||
v3 "github.com/absmach/mqtt/core/packets/v3"
|
||||
v5 "github.com/absmach/mqtt/core/packets/v5"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// package packets provides shared constants and interfaces for MQTT packet handling.
|
||||
// Version-specific implementations are in the v3 and v5 packages.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package packets
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package packets
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package packets_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,2 +1,4 @@
|
||||
// package v3 contains MQTT v3.1.1 packet definitions.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package pool
|
||||
|
||||
import "sync"
|
||||
|
||||
// Buffer size classes for different packet sizes
|
||||
// Buffer size classes for different packet sizes.
|
||||
const (
|
||||
SmallBufferSize = 256 // Small packets (PINGREQ, PUBACK, etc.)
|
||||
MediumBufferSize = 4096 // Medium packets (typical PUBLISH)
|
||||
|
||||
@@ -1,13 +1,6 @@
|
||||
// Package pool provides sync.Pool-based allocators for MQTT packets.
|
||||
// Use this package in high-throughput scenarios (brokers) to reduce GC pressure.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// pkt := pool.AcquirePublish()
|
||||
// defer pool.ReleasePublish(pkt)
|
||||
// // use pkt...
|
||||
//
|
||||
// Important: Never use a packet after releasing it back to the pool.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package pool
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package pool
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3_test
|
||||
|
||||
import (
|
||||
@@ -12,14 +15,14 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
validClientID = "test-client-123"
|
||||
longClientID = strings.Repeat("a", 65535)
|
||||
maxPacketID = uint16(65535)
|
||||
testTopic = "test/topic"
|
||||
validPayload = []byte("test payload")
|
||||
maxPayload = make([]byte, 65535)
|
||||
emptyPayload = []byte{}
|
||||
wildcardTopics = []string{"test/+/topic", "test/#", "+/+/+", "#"}
|
||||
validClientID = "test-client-123"
|
||||
longClientID = strings.Repeat("a", 65535)
|
||||
maxPacketID = uint16(65535)
|
||||
testTopic = "test/topic"
|
||||
validPayload = []byte("test payload")
|
||||
maxPayload = make([]byte, 65535)
|
||||
emptyPayload = []byte{}
|
||||
wildcardTopics = []string{"test/+/topic", "test/#", "+/+/+", "#"}
|
||||
)
|
||||
|
||||
func TestConnectProtocolCompliance(t *testing.T) {
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import "github.com/absmach/mqtt/core/packets"
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
@@ -8,7 +11,7 @@ import (
|
||||
"github.com/absmach/mqtt/core/packets"
|
||||
)
|
||||
|
||||
// Re-export common types
|
||||
// Re-export common types.
|
||||
type (
|
||||
FixedHeader = packets.FixedHeader
|
||||
Details = packets.Details
|
||||
@@ -17,7 +20,7 @@ type (
|
||||
Resetter = packets.Resetter
|
||||
)
|
||||
|
||||
// Re-export packet constants
|
||||
// Re-export packet constants.
|
||||
const (
|
||||
ConnectType = packets.ConnectType
|
||||
ConnAckType = packets.ConnAckType
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3
|
||||
|
||||
import "github.com/absmach/mqtt/core/codec"
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v3_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -86,7 +89,7 @@ func (p *AuthProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
default:
|
||||
return fmt.Errorf("invalid property type %d for connect packet", prop)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -10,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
// ConnackReturnCodes is a map of the error codes constants for Connect()
|
||||
// to a string representation of the error
|
||||
// to a string representation of the error.
|
||||
var ConnackReturnCodes = map[uint8]string{
|
||||
0: "Connection Accepted",
|
||||
1: "Connection Refused: Bad Protocol Version",
|
||||
@@ -151,7 +154,7 @@ func (p *ConnAckProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
case WildcardSubAvailableProp:
|
||||
wsa, err := codec.DecodeByte(r)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -9,7 +12,7 @@ import (
|
||||
"github.com/absmach/mqtt/core/packets"
|
||||
)
|
||||
|
||||
// Error codes returned by Connect()
|
||||
// Error codes returned by Connect().
|
||||
const (
|
||||
Accepted = 0x00
|
||||
ErrRefusedBadProtocolVersion = 0x01
|
||||
@@ -143,7 +146,7 @@ func (p *ConnectProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
case AuthMethodProp:
|
||||
p.AuthMethod, err = codec.DecodeString(r)
|
||||
if err != nil {
|
||||
@@ -282,7 +285,7 @@ func (p *WillProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
default:
|
||||
return fmt.Errorf("invalid will property type %d", prop)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -9,7 +12,7 @@ import (
|
||||
"github.com/absmach/mqtt/core/packets"
|
||||
)
|
||||
|
||||
// Disconnect is an internal representation of the fields of the DISCONNECT MQTT packet
|
||||
// Disconnect is an internal representation of the fields of the DISCONNECT MQTT packet.
|
||||
type Disconnect struct {
|
||||
packets.FixedHeader
|
||||
// Variable Header
|
||||
@@ -61,7 +64,7 @@ func (p *DisconnectProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
case ServerReferenceProp:
|
||||
p.ServerReference, err = codec.DecodeString(r)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,2 +1,4 @@
|
||||
// package v5 contains MQTT packets definitions.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5_test
|
||||
|
||||
import (
|
||||
@@ -8,7 +11,7 @@ import (
|
||||
. "github.com/absmach/mqtt/core/packets/v5"
|
||||
)
|
||||
|
||||
// Helper to create pointer to value
|
||||
// Helper to create pointer to value.
|
||||
func ptr[T any](v T) *T {
|
||||
return &v
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -6,7 +9,7 @@ import (
|
||||
"github.com/absmach/mqtt/core/packets"
|
||||
)
|
||||
|
||||
// PingReq is an internal representation of the fields of the PINGREQ MQTT packet
|
||||
// PingReq is an internal representation of the fields of the PINGREQ MQTT packet.
|
||||
type PingReq struct {
|
||||
packets.FixedHeader
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package pool
|
||||
|
||||
import "sync"
|
||||
|
||||
// Buffer size classes for different packet sizes
|
||||
// Buffer size classes for different packet sizes.
|
||||
const (
|
||||
SmallBufferSize = 256 // Small packets (PINGREQ, PUBACK, etc.)
|
||||
MediumBufferSize = 4096 // Medium packets (typical PUBLISH)
|
||||
|
||||
@@ -1,13 +1,6 @@
|
||||
// Package pool provides sync.Pool-based allocators for MQTT packets.
|
||||
// Use this package in high-throughput scenarios (brokers) to reduce GC pressure.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// pkt := pool.AcquirePublish()
|
||||
// defer pool.ReleasePublish(pkt)
|
||||
// // use pkt...
|
||||
//
|
||||
// Important: Never use a packet after releasing it back to the pool.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package pool
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package pool
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -8,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
// PropPayloadFormat, etc are the list of property codes for the
|
||||
// MQTT packet properties
|
||||
// MQTT packet properties.
|
||||
const (
|
||||
PayloadFormatProp byte = 1
|
||||
MessageExpiryProp byte = 2
|
||||
@@ -78,7 +81,7 @@ func (p *BasicProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
default:
|
||||
return fmt.Errorf("invalid property type %d", prop)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -54,7 +57,7 @@ func (pkt *PubComp) Pack(w io.Writer) error {
|
||||
}
|
||||
|
||||
// Unpack decodes the details of a ControlPacket after the fixed
|
||||
// header has been read
|
||||
// header has been read.
|
||||
func (pkt *PubComp) Unpack(r io.Reader) error {
|
||||
var err error
|
||||
pkt.ID, err = codec.DecodeUint16(r)
|
||||
@@ -89,7 +92,7 @@ func (pkt *PubComp) Unpack(r io.Reader) error {
|
||||
}
|
||||
|
||||
// Details returns a Details struct containing the Qos and
|
||||
// ID of this ControlPacket
|
||||
// ID of this ControlPacket.
|
||||
func (pkt *PubComp) Details() Details {
|
||||
return Details{Type: PubCompType, ID: pkt.ID, QoS: pkt.QoS}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -100,7 +103,7 @@ func (p *PublishProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
case SubscriptionIdentifierProp:
|
||||
si, err := codec.DecodeVBI(r)
|
||||
if err != nil {
|
||||
@@ -226,7 +229,7 @@ func (pkt *Publish) Unpack(r io.Reader) error {
|
||||
// Copy creates a new PublishPacket with the same topic and payload
|
||||
// but an empty fixed header, useful for when you want to deliver
|
||||
// a message with different properties such as Qos but the same
|
||||
// content
|
||||
// content.
|
||||
func (pkt *Publish) Copy() *Publish {
|
||||
newP := NewControlPacket(PublishType).(*Publish)
|
||||
newP.TopicName = pkt.TopicName
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
// Reset clears all fields in the Connect packet for reuse.
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -9,7 +12,7 @@ import (
|
||||
"github.com/absmach/mqtt/core/packets"
|
||||
)
|
||||
|
||||
// Subscribe is an internal representation of the fields of the SUBSCRIBE MQTT packet
|
||||
// Subscribe is an internal representation of the fields of the SUBSCRIBE MQTT packet.
|
||||
type Subscribe struct {
|
||||
packets.FixedHeader
|
||||
// Variable Header
|
||||
@@ -113,7 +116,7 @@ func (p *SubscribeProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
default:
|
||||
return fmt.Errorf("invalid property type %d for subscribe packet", prop)
|
||||
}
|
||||
@@ -211,7 +214,7 @@ func (pkt *Subscribe) Unpack(r io.Reader) error {
|
||||
}
|
||||
|
||||
// Details returns a Details struct containing the Qos and
|
||||
// ID of this ControlPacket
|
||||
// ID of this ControlPacket.
|
||||
func (pkt *Subscribe) Details() Details {
|
||||
return Details{Type: SubscribeType, ID: pkt.ID, QoS: 1}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -12,7 +15,7 @@ import (
|
||||
// It embeds packets.FixedHeader to allow v5-specific methods.
|
||||
type FixedHeader = packets.FixedHeader
|
||||
|
||||
// Re-export other types from parent package for convenience
|
||||
// Re-export other types from parent package for convenience.
|
||||
type (
|
||||
Details = packets.Details
|
||||
ControlPacket = packets.ControlPacket
|
||||
@@ -21,7 +24,7 @@ type (
|
||||
Resetter = packets.Resetter
|
||||
)
|
||||
|
||||
// Re-export packet type constants
|
||||
// Re-export packet type constants.
|
||||
const (
|
||||
ConnectType = packets.ConnectType
|
||||
ConnAckType = packets.ConnAckType
|
||||
@@ -40,14 +43,14 @@ const (
|
||||
AuthType = packets.AuthType
|
||||
)
|
||||
|
||||
// Re-export protocol version constants
|
||||
// Re-export protocol version constants.
|
||||
const (
|
||||
V31 = packets.V31 // MQTT 3.1
|
||||
V311 = packets.V311 // MQTT 3.1.1
|
||||
V5 = packets.V5 // MQTT 5.0
|
||||
)
|
||||
|
||||
// PacketNames maps packet type constants to string names
|
||||
// PacketNames maps packet type constants to string names.
|
||||
var PacketNames = packets.PacketNames
|
||||
|
||||
// NewControlPacket creates a new MQTT 5.0 packet of the specified type.
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import (
|
||||
@@ -43,7 +46,7 @@ func (p *UnsubscribeProperties) Unpack(r io.Reader) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
default:
|
||||
return fmt.Errorf("invalid property type %d for unsubscribe packet", prop)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5
|
||||
|
||||
import "github.com/absmach/mqtt/core/codec"
|
||||
@@ -103,7 +106,7 @@ func (p *PublishProperties) unpackBytes(data []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
case SubscriptionIdentifierProp:
|
||||
si, err := r.ReadVBI()
|
||||
if err != nil {
|
||||
@@ -279,7 +282,7 @@ func (p *ConnectProperties) unpackBytes(data []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
case AuthMethodProp:
|
||||
p.AuthMethod, err = r.ReadString()
|
||||
if err != nil {
|
||||
@@ -351,7 +354,7 @@ func (p *WillProperties) unpackBytes(data []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -435,7 +438,7 @@ func (p *SubscribeProperties) unpackBytes(data []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.User = append(p.User, User{k, v})
|
||||
p.User = append(p.User, User{Key: k, Value: v})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package v5_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package packets
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package packets_test
|
||||
|
||||
import (
|
||||
|
||||
@@ -246,7 +246,6 @@ func (s *Server) gracefulShutdown(listener net.Listener, acceptDone <-chan struc
|
||||
|
||||
// configureTCPConn sets TCP socket options for optimal performance and resilience.
|
||||
func (s *Server) configureTCPConn(conn *net.TCPConn) error {
|
||||
|
||||
if s.config.TCPKeepAlive > 0 {
|
||||
if err := conn.SetKeepAlive(true); err != nil {
|
||||
return fmt.Errorf("failed to enable keepalive: %w", err)
|
||||
|
||||
@@ -110,7 +110,7 @@ func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
broker.HandleConnection(s.broker, conn)
|
||||
}
|
||||
|
||||
// wsConnection implements core.Connection for WebSocket transport
|
||||
// wsConnection implements core.Connection for WebSocket transport.
|
||||
type wsConnection struct {
|
||||
ws *websocket.Conn
|
||||
remoteAddr string
|
||||
@@ -249,7 +249,7 @@ func (c *wsConnection) Touch() {
|
||||
c.lastActivity = time.Now()
|
||||
}
|
||||
|
||||
// wsAddr implements net.Addr for WebSocket connections
|
||||
// wsAddr implements net.Addr for WebSocket connections.
|
||||
type wsAddr struct {
|
||||
addr string
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package session
|
||||
|
||||
import "sync"
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package session
|
||||
|
||||
import "errors"
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package session
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package badger
|
||||
|
||||
import (
|
||||
@@ -55,7 +58,6 @@ func (m *MessageStore) Get(key string) (*storage.Message, error) {
|
||||
return json.Unmarshal(val, msg)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -91,7 +93,6 @@ func (m *MessageStore) List(prefix string) ([]*storage.Message, error) {
|
||||
messages = append(messages, &msg)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal message: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package badger
|
||||
|
||||
import (
|
||||
@@ -12,7 +15,7 @@ var _ storage.RetainedStore = (*RetainedStore)(nil)
|
||||
|
||||
// RetainedStore implements storage.RetainedStore using BadgerDB.
|
||||
//
|
||||
// Key format: retained:{topic}
|
||||
// Key format: retained:{topic}.
|
||||
type RetainedStore struct {
|
||||
db *badger.DB
|
||||
}
|
||||
@@ -60,7 +63,6 @@ func (r *RetainedStore) Get(topic string) (*storage.Message, error) {
|
||||
return json.Unmarshal(val, msg)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -104,7 +106,6 @@ func (r *RetainedStore) Match(filter string) ([]*storage.Message, error) {
|
||||
matched = append(matched, &msg)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal retained message: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package badger
|
||||
|
||||
import (
|
||||
@@ -40,7 +43,6 @@ func (s *SessionStore) Get(clientID string) (*storage.Session, error) {
|
||||
return json.Unmarshal(val, session)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,7 +112,6 @@ func (s *SessionStore) GetExpired(before time.Time) ([]string, error) {
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -143,7 +144,6 @@ func (s *SessionStore) List() ([]*storage.Session, error) {
|
||||
sessions = append(sessions, &session)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal session: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
// Package badger provides a BadgerDB-backed implementation of the storage interfaces.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package badger
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package badger
|
||||
|
||||
import (
|
||||
@@ -14,7 +17,7 @@ var _ storage.SubscriptionStore = (*SubscriptionStore)(nil)
|
||||
|
||||
// SubscriptionStore implements storage.SubscriptionStore using BadgerDB.
|
||||
//
|
||||
// Key format: sub:{clientID}:{filter}
|
||||
// Key format: sub:{clientID}:{filter}.
|
||||
type SubscriptionStore struct {
|
||||
db *badger.DB
|
||||
count atomic.Int64 // Cached subscription count
|
||||
@@ -125,7 +128,6 @@ func (s *SubscriptionStore) GetForClient(clientID string) ([]*storage.Subscripti
|
||||
subs = append(subs, &sub)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal subscription: %w", err)
|
||||
}
|
||||
@@ -164,7 +166,6 @@ func (s *SubscriptionStore) Match(topic string) ([]*storage.Subscription, error)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal subscription: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package badger
|
||||
|
||||
import (
|
||||
@@ -13,14 +16,14 @@ var _ storage.WillStore = (*WillStore)(nil)
|
||||
|
||||
// WillStore implements storage.WillStore using BadgerDB.
|
||||
//
|
||||
// Key format: will:{clientID}
|
||||
// Key format: will:{clientID}.
|
||||
type WillStore struct {
|
||||
db *badger.DB
|
||||
}
|
||||
|
||||
// willEntry wraps a will message with disconnect timestamp for delay calculation.
|
||||
type willEntry struct {
|
||||
Will *storage.WillMessage `json:"will"`
|
||||
Will *storage.WillMessage `json:"will"`
|
||||
DisconnectedAt time.Time `json:"disconnected_at"`
|
||||
}
|
||||
|
||||
@@ -34,7 +37,7 @@ func (w *WillStore) Set(clientID string, will *storage.WillMessage) error {
|
||||
key := []byte("will:" + clientID)
|
||||
|
||||
entry := &willEntry{
|
||||
Will: will,
|
||||
Will: will,
|
||||
DisconnectedAt: time.Now(), // Mark as disconnected now
|
||||
}
|
||||
|
||||
@@ -67,7 +70,6 @@ func (w *WillStore) Get(clientID string) (*storage.WillMessage, error) {
|
||||
return json.Unmarshal(val, entry)
|
||||
})
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -85,7 +87,7 @@ func (w *WillStore) Delete(clientID string) error {
|
||||
}
|
||||
|
||||
// GetPending returns will messages that should be triggered.
|
||||
// (will delay elapsed and client still disconnected)
|
||||
// (will delay elapsed and client still disconnected).
|
||||
func (w *WillStore) GetPending(before time.Time) ([]*storage.WillMessage, error) {
|
||||
var pending []*storage.WillMessage
|
||||
|
||||
@@ -117,7 +119,6 @@ func (w *WillStore) GetPending(before time.Time) ([]*storage.WillMessage, error)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal will entry: %w", err)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package memory
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package memory
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package memory
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
// Package memory provides an in-memory implementation of the store interfaces.
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package memory
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package memory
|
||||
|
||||
import (
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package memory
|
||||
|
||||
import (
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user