SMQ-2888 - Change internal message topic format (#2889)

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2025-06-05 13:18:56 +05:30
committed by GitHub
parent d64a74b1be
commit 21c7b1019e
41 changed files with 842 additions and 543 deletions
+2 -2
View File
@@ -148,8 +148,8 @@ func main() {
}
defer mpub.Close()
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
fwd := mqtt.NewForwarder(brokers.SubjectAllMessages, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllMessages)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
+1 -1
View File
@@ -76,5 +76,5 @@ Setting `SMQ_CLIENTS_GRPC_CLIENT_CERT` and `SMQ_CLIENTS_GRPC_CLIENT_KEY` will en
## Usage
If CoAP adapter is running locally (on default 5683 port), a valid URL would be: `coap://localhost/channels/<channel_id>/messages?auth=<client_auth_key>`.
If CoAP adapter is running locally (on default 5683 port), a valid URL would be: `coap://localhost/m/<domain_id>/c/<channel_id>/<subtopic>?auth=<client_auth_key>`.
Since CoAP protocol does not support `Authorization` header (option) and options have limited size, in order to send CoAP messages, valid `auth` value (a valid Client key) must be present in `Uri-Query` option.
+6 -19
View File
@@ -8,7 +8,6 @@ package coap
import (
"context"
"fmt"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
@@ -21,8 +20,6 @@ import (
var errFailedToDisconnectClient = errors.New("failed to disconnect client")
const chansPrefix = "channels"
// Service specifies CoAP service API.
type Service interface {
// Publish publishes message to specified channel.
@@ -37,7 +34,7 @@ type Service interface {
Unsubscribe(ctx context.Context, key, domainID, chanID, subptopic, token string) error
// DisconnectHandler method is used to disconnected the client
DisconnectHandler(ctx context.Context, chanID, subptopic, token string) error
DisconnectHandler(ctx context.Context, domainID, chanID, subptopic, token string) error
}
var _ Service = (*adapterService)(nil)
@@ -87,7 +84,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi
msg.Publisher = authnRes.GetId()
return svc.pubsub.Publish(ctx, msg.GetChannel(), msg)
return svc.pubsub.Publish(ctx, messaging.EncodeMessageTopic(msg), msg)
}
func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID, subtopic string, c Client) error {
@@ -116,11 +113,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID,
return svcerr.ErrAuthorization
}
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
authzc := newAuthzClient(clientID, domainID, chanID, subtopic, svc.channels, c)
subCfg := messaging.SubscriberConfig{
ID: c.Token(),
@@ -156,19 +149,13 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, domainID, chanI
return svcerr.ErrAuthorization
}
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
return svc.pubsub.Unsubscribe(ctx, token, subject)
}
func (svc *adapterService) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) error {
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
func (svc *adapterService) DisconnectHandler(ctx context.Context, domainID, chanID, subtopic, token string) error {
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
return svc.pubsub.Unsubscribe(ctx, token, subject)
}
+3 -2
View File
@@ -97,10 +97,11 @@ func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, domainID, cha
// DisconnectHandler logs the disconnect handler. It logs the channel ID, subtopic (if any) and the time it took to complete the request.
// If the request fails, it logs the error.
func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) (err error) {
func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, domainID, chanID, subtopic, token string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("domain_id", domainID),
slog.String("channel_id", chanID),
slog.String("token", token),
}
@@ -115,5 +116,5 @@ func (lm *loggingMiddleware) DisconnectHandler(ctx context.Context, chanID, subt
lm.logger.Info("Unsubscribe completed successfully", args...)
}(time.Now())
return lm.svc.DisconnectHandler(ctx, chanID, subtopic, token)
return lm.svc.DisconnectHandler(ctx, domainID, chanID, subtopic, token)
}
+2 -2
View File
@@ -62,11 +62,11 @@ func (mm *metricsMiddleware) Unsubscribe(ctx context.Context, key, domainID, cha
}
// DisconnectHandler instruments DisconnectHandler method with metrics.
func (mm *metricsMiddleware) DisconnectHandler(ctx context.Context, chanID, subtopic, token string) error {
func (mm *metricsMiddleware) DisconnectHandler(ctx context.Context, domainID, chanID, subtopic, token string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_handler").Add(1)
mm.latency.With("method", "disconnect_handler").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.DisconnectHandler(ctx, chanID, subtopic, token)
return mm.svc.DisconnectHandler(ctx, domainID, chanID, subtopic, token)
}
+14 -51
View File
@@ -9,8 +9,6 @@ import (
"io"
"log/slog"
"net/http"
"net/url"
"regexp"
"strings"
"time"
@@ -33,18 +31,9 @@ const (
startObserve = 0 // observe option value that indicates start of observation
)
var channelPartRegExp = regexp.MustCompile(`^/m/([\w\-]+)/c/([\w\-]+)(/[^?]*)?(\?.*)?$`)
const (
numGroups = 4 // entire expression+ domain group + channel group + subtopic group
domainGroup = 1 // domain group is first in channel regexp
channelGroup = 3 // channel group is third in channel regexp
)
var (
errMalformedSubtopic = errors.New("malformed subtopic")
errBadOptions = errors.New("bad options")
errMethodNotAllowed = errors.New("method not allowed")
errBadOptions = errors.New("bad options")
errMethodNotAllowed = errors.New("method not allowed")
)
var (
@@ -133,7 +122,7 @@ func handleGet(m *mux.Message, w mux.ResponseWriter, msg *messaging.Message, key
if obs == startObserve {
c := coap.NewClient(w.Conn(), m.Token(), logger)
w.Conn().AddOnClose(func() {
_ = service.DisconnectHandler(context.Background(), msg.GetChannel(), msg.GetSubtopic(), c.Token())
_ = service.DisconnectHandler(context.Background(), msg.GetDomain(), msg.GetChannel(), msg.GetSubtopic(), c.Token())
})
return service.Subscribe(w.Conn().Context(), key, msg.GetDomain(), msg.GetChannel(), msg.GetSubtopic(), c)
}
@@ -148,20 +137,23 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
if err != nil {
return &messaging.Message{}, err
}
channelParts := channelPartRegExp.FindStringSubmatch(path)
if len(channelParts) < numGroups {
return &messaging.Message{}, errMalformedSubtopic
}
st, err := parseSubtopic(channelParts[channelGroup])
var domainID, channelID, subTopic string
switch msg.Code() {
case codes.GET:
domainID, channelID, subTopic, err = messaging.ParseSubscribeTopic(path)
case codes.POST:
domainID, channelID, subTopic, err = messaging.ParsePublishTopic(path)
}
if err != nil {
return &messaging.Message{}, err
}
ret := &messaging.Message{
Protocol: protocol,
Domain: channelParts[domainGroup],
Channel: channelParts[2],
Subtopic: st,
Domain: domainID,
Channel: channelID,
Subtopic: subTopic,
Payload: []byte{},
Created: time.Now().UnixNano(),
}
@@ -187,32 +179,3 @@ func parseKey(msg *mux.Message) (string, error) {
}
return vars[1], nil
}
func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}
+3 -2
View File
@@ -66,11 +66,12 @@ func (tm *tracingServiceMiddleware) Unsubscribe(ctx context.Context, key, domain
}
// DisconnectHandler traces a CoAP disconnect operation.
func (tm *tracingServiceMiddleware) DisconnectHandler(ctx context.Context, chanID, subptopic, token string) error {
func (tm *tracingServiceMiddleware) DisconnectHandler(ctx context.Context, domainID, chanID, subptopic, token string) error {
ctx, span := tm.tracer.Start(ctx, disconnectHandlerOp, trace.WithAttributes(
attribute.String("domain_id", domainID),
attribute.String("channel_id", chanID),
attribute.String("subtopic", subptopic),
))
defer span.End()
return tm.svc.DisconnectHandler(ctx, chanID, subptopic, token)
return tm.svc.DisconnectHandler(ctx, domainID, chanID, subptopic, token)
}
+1 -1
View File
@@ -123,7 +123,7 @@ type config struct {
func loadConfig(configPath string) (config, error) {
cfg := config{
SubscriberCfg: subscriberConfig{
Subjects: []string{brokers.SubjectAllChannels},
Subjects: []string{brokers.SubjectAllMessages},
},
TransformerCfg: transformerConfig{
Format: defFormat,
+2 -1
View File
@@ -28,6 +28,7 @@ import (
smqauthn "github.com/absmach/supermq/pkg/authn"
authnMocks "github.com/absmach/supermq/pkg/authn/mocks"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/messaging"
pubsub "github.com/absmach/supermq/pkg/messaging/mocks"
"github.com/absmach/supermq/pkg/policies"
"github.com/stretchr/testify/assert"
@@ -257,7 +258,7 @@ func TestPublish(t *testing.T) {
ClientType: policies.ClientType,
Type: uint32(connections.Publish),
}).Return(tc.authzRes, tc.authzErr)
svcCall := pub.On("Publish", mock.Anything, tc.chanID, mock.Anything).Return(nil)
svcCall := pub.On("Publish", mock.Anything, messaging.EncodeTopicSuffix(tc.domainID, tc.chanID, ""), mock.Anything).Return(nil)
req := testRequest{
client: ts.Client(),
method: http.MethodPost,
+6 -61
View File
@@ -8,8 +8,6 @@ import (
"fmt"
"log/slog"
"net/http"
"net/url"
"regexp"
"strings"
"time"
@@ -49,14 +47,10 @@ var (
errClientNotInitialized = errors.New("client is not initialized")
errFailedPublish = errors.New("failed to publish")
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
errMalformedSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed subtopic"))
errMalformedTopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed topic"))
errMissingTopicPub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to publish due to missing topic"))
errFailedParseSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to parse subtopic"))
)
var channelRegExp = regexp.MustCompile(`^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
// Event implements events.Event interface.
type handler struct {
publisher messaging.Publisher
@@ -125,6 +119,11 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return errors.Wrap(errFailedPublish, errClientNotInitialized)
}
domainID, chanID, subtopic, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return errors.Wrap(errMalformedTopic, err)
}
var clientID, clientType string
switch {
case strings.HasPrefix(string(s.Password), "Client"):
@@ -153,11 +152,6 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return mgate.NewHTTPProxyError(http.StatusUnauthorized, svcerr.ErrAuthentication)
}
domainID, chanID, subtopic, err := parseTopic(*topic)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, err)
}
msg := messaging.Message{
Protocol: protocol,
Domain: domainID,
@@ -186,7 +180,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
msg.Publisher = clientID
}
if err := h.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
if err := h.publisher.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil {
return errors.Wrap(errFailedPublishToMsgBroker, err)
}
@@ -209,52 +203,3 @@ func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error {
func (h *handler) Disconnect(ctx context.Context) error {
return nil
}
func parseTopic(topic string) (string, string, string, error) {
// Topics are in the format:
// m/<domain_id>/c/<channel_id>/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 3 {
return "", "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
}
domainID := channelParts[1]
chanID := channelParts[2]
subtopic := channelParts[3]
subtopic, err := parseSubtopic(subtopic)
if err != nil {
return "", "", "", errors.Wrap(errFailedParseSubtopic, err)
}
return domainID, chanID, subtopic, nil
}
func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}
+8 -5
View File
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net/http"
"strings"
"testing"
mghttp "github.com/absmach/mgate/pkg/http"
@@ -53,10 +54,8 @@ var (
validToken = "token"
validID = testsutil.GenerateUUID(&testing.T{})
errClientNotInitialized = errors.New("client is not initialized")
errFailedPublish = errors.New("failed to publish")
errMissingTopicPub = errors.New("failed to publish due to missing topic")
errMalformedTopic = errors.New("malformed topic")
errFailedParseSubtopic = errors.New("failed to parse subtopic")
errMalformedSubtopic = errors.New("malformed subtopic")
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
)
@@ -225,7 +224,7 @@ func TestPublish(t *testing.T) {
session: &clientKeySession,
authNRes: &grpcClientsV1.AuthnRes{Id: clientID, Authenticated: true},
authNErr: nil,
err: errors.Wrap(errFailedPublish, errMalformedTopic),
err: errMalformedTopic,
},
{
desc: "publish with malformwd subtopic",
@@ -235,7 +234,7 @@ func TestPublish(t *testing.T) {
session: &clientKeySession,
authNRes: &grpcClientsV1.AuthnRes{Id: clientID, Authenticated: true},
authNErr: nil,
err: errors.Wrap(errFailedParseSubtopic, errMalformedSubtopic),
err: errMalformedSubtopic,
},
{
desc: "publish with empty password",
@@ -333,10 +332,14 @@ func TestPublish(t *testing.T) {
if tc.session != nil {
ctx = session.NewContext(ctx, tc.session)
}
var internalTopic string
if tc.topic != nil {
internalTopic = strings.TrimPrefix(strings.ReplaceAll(*tc.topic, "/", "."), ".m.")
}
clientsCall := clients.On("Authenticate", ctx, &grpcClientsV1.AuthnReq{ClientSecret: tc.password}).Return(tc.authNRes, tc.authNErr)
authCall := authn.On("Authenticate", ctx, mock.Anything).Return(tc.authNRes1, tc.authNErr)
channelsCall := channels.On("Authorize", ctx, mock.Anything).Return(tc.authZRes, tc.authZErr)
repoCall := publisher.On("Publish", ctx, tc.channelID, mock.Anything).Return(tc.publishErr)
repoCall := publisher.On("Publish", ctx, internalTopic, mock.Anything).Return(tc.publishErr)
err := handler.Publish(ctx, tc.topic, tc.payload)
hpe, ok := err.(mghttp.HTTPProxyError)
if ok {
+2
View File
@@ -37,6 +37,7 @@ type subscribeEvent struct {
operation string
clientID string
subscriberID string
domainID string
channelID string
subtopic string
}
@@ -46,6 +47,7 @@ func (se subscribeEvent) Encode() (map[string]interface{}, error) {
"operation": se.operation,
"client_id": se.clientID,
"subscriber_id": se.subscriberID,
"domainID": se.domainID,
"channel_id": se.channelID,
"subtopic": se.subtopic,
}, nil
+5 -48
View File
@@ -5,14 +5,12 @@ package events
import (
"context"
"net/url"
"regexp"
"strings"
"github.com/absmach/mgate/pkg/session"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/events/store"
"github.com/absmach/supermq/pkg/messaging"
)
const (
@@ -22,11 +20,7 @@ const (
disconnectStream = supermqPrefix + clientDisconnect
)
var (
errFailedSession = errors.New("failed to obtain session from context")
errMalformedTopic = errors.New("malformed topic")
channelRegExp = regexp.MustCompile(`^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
)
var errFailedSession = errors.New("failed to obtain session from context")
// EventStore is a struct used to store event streams in Redis.
type eventStore struct {
@@ -96,16 +90,17 @@ func (es *eventStore) Subscribe(ctx context.Context, topics *[]string) error {
}
for _, topic := range *topics {
channelID, subtopic, err := parseTopic(topic)
domainID, channelID, subTopic, err := messaging.ParseSubscribeTopic(topic)
if err != nil {
return err
}
ev := subscribeEvent{
operation: clientSubscribe,
clientID: s.Username,
domainID: domainID,
channelID: channelID,
subtopic: subTopic,
subscriberID: s.ID,
subtopic: subtopic,
}
if err := es.ep.Publish(ctx, subscribeStream, ev); err != nil {
@@ -139,41 +134,3 @@ func (es *eventStore) Disconnect(ctx context.Context) error {
return es.ep.Publish(ctx, disconnectStream, ev)
}
func parseTopic(topic string) (string, string, error) {
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 3 {
return "", "", errMalformedTopic
}
chanID := channelParts[2]
subtopic := channelParts[3]
if subtopic == "" {
return chanID, subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", "", errMalformedTopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", "", errMalformedTopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return chanID, subtopic, nil
}
+2 -7
View File
@@ -7,7 +7,6 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/absmach/supermq/pkg/messaging"
)
@@ -47,12 +46,8 @@ func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) h
if msg.GetProtocol() == protocol {
return nil
}
// Use concatenation instead of fmt.Sprintf for the
// sake of simplicity and performance.
topic := "m/" + msg.GetDomain() + "/c/" + msg.GetChannel()
if msg.GetSubtopic() != "" {
topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/")
}
topic := messaging.EncodeMessageMQTTTopic(msg)
go func() {
if err := pub.Publish(ctx, topic, msg); err != nil {
+17 -70
View File
@@ -7,8 +7,6 @@ import (
"context"
"fmt"
"log/slog"
"net/url"
"regexp"
"strings"
"time"
@@ -37,9 +35,7 @@ const (
// Error wrappers for MQTT errors.
var (
ErrMalformedSubtopic = errors.New("malformed subtopic")
ErrClientNotInitialized = errors.New("client is not initialized")
ErrMalformedTopic = errors.New("malformed topic")
ErrMissingClientID = errors.New("client_id not found")
ErrMissingTopicPub = errors.New("failed to publish due to missing topic")
ErrMissingTopicSub = errors.New("failed to subscribe due to missing topic")
@@ -49,15 +45,11 @@ var (
ErrFailedPublish = errors.New("failed to publish")
ErrFailedDisconnect = errors.New("failed to disconnect")
ErrFailedPublishDisconnectEvent = errors.New("failed to publish disconnect event")
ErrFailedParseSubtopic = errors.New("failed to parse subtopic")
ErrFailedPublishConnectEvent = errors.New("failed to publish connect event")
ErrFailedSubscribeEvent = errors.New("failed to publish subscribe event")
ErrFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
)
var (
errInvalidUserId = errors.New("invalid user id")
channelRegExp = regexp.MustCompile(`^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
)
// Event implements events.Event interface.
@@ -118,7 +110,12 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt
return ErrClientNotInitialized
}
return h.authAccess(ctx, string(s.Username), *topic, connections.Publish)
domainID, chanID, _, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return err
}
return h.authAccess(ctx, string(s.Username), domainID, chanID, connections.Publish)
}
// AuthSubscribe is called on device subscribe,
@@ -133,7 +130,12 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
}
for _, topic := range *topics {
if err := h.authAccess(ctx, string(s.Username), topic, connections.Subscribe); err != nil {
domainID, chanID, _, err := messaging.ParseSubscribeTopic(topic)
if err != nil {
return err
}
if err := h.authAccess(ctx, string(s.Username), domainID, chanID, connections.Subscribe); err != nil {
return err
}
}
@@ -159,34 +161,22 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
}
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
// Topics are in the format:
// m/<domain_id>/c/<channel_id>/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 3 {
return errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
}
domainID := channelParts[1]
chanID := channelParts[2]
subtopic := channelParts[3]
subtopic, err := parseSubtopic(subtopic)
domainID, chanID, subTopic, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return errors.Wrap(ErrFailedParseSubtopic, err)
return errors.Wrap(ErrFailedPublish, err)
}
msg := messaging.Message{
Protocol: protocol,
Domain: domainID,
Channel: chanID,
Subtopic: subtopic,
Subtopic: subTopic,
Publisher: s.Username,
Payload: *payload,
Created: time.Now().UnixNano(),
}
if err := h.publisher.Publish(ctx, msg.GetChannel(), &msg); err != nil {
if err := h.publisher.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil {
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
}
@@ -226,21 +216,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
return nil
}
func (h *handler) authAccess(ctx context.Context, clientID, topic string, msgType connections.ConnType) error {
// Topics are in the format:
// m/<domain_id>/c/<channel_id>/<subtopic>/.../ct/<content_type>
if !channelRegExp.MatchString(topic) {
return ErrMalformedTopic
}
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 3 {
return ErrMalformedTopic
}
domainID := channelParts[1]
chanID := channelParts[2]
func (h *handler) authAccess(ctx context.Context, clientID, domainID, chanID string, msgType connections.ConnType) error {
ar := &grpcChannelsV1.AuthzReq{
Type: uint32(msgType),
ClientId: clientID,
@@ -258,32 +234,3 @@ func (h *handler) authAccess(ctx context.Context, clientID, topic string, msgTyp
return nil
}
func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", ErrMalformedSubtopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", ErrMalformedSubtopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}
+7 -6
View File
@@ -21,6 +21,7 @@ import (
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/mocks"
"github.com/absmach/supermq/pkg/policies"
"github.com/stretchr/testify/assert"
@@ -192,7 +193,7 @@ func TestAuthPublish(t *testing.T) {
{
desc: "publish with malformed topic",
session: &sessionClient,
err: mqtt.ErrMalformedTopic,
err: messaging.ErrMalformedTopic,
topic: &invalidTopic,
payload: payload,
},
@@ -254,7 +255,7 @@ func TestAuthSubscribe(t *testing.T) {
{
desc: "subscribe with invalid topics",
session: &sessionClient,
err: mqtt.ErrMalformedTopic,
err: messaging.ErrMalformedTopic,
topic: &invalidTopics,
},
{
@@ -366,28 +367,28 @@ func TestPublish(t *testing.T) {
topic: invalidTopic,
payload: payload,
logMsg: fmt.Sprintf(mqtt.LogInfoPublished, clientID, invalidTopic),
err: errors.Wrap(mqtt.ErrFailedPublish, mqtt.ErrMalformedTopic),
err: errors.Wrap(mqtt.ErrFailedPublish, messaging.ErrMalformedTopic),
},
{
desc: "publish with invalid channel ID",
session: &sessionClient,
topic: invalidChannelIDTopic,
payload: payload,
err: errors.Wrap(mqtt.ErrFailedPublish, mqtt.ErrMalformedTopic),
err: errors.Wrap(mqtt.ErrFailedPublish, messaging.ErrMalformedTopic),
},
{
desc: "publish with malformed subtopic",
session: &sessionClient,
topic: malformedSubtopics,
payload: payload,
err: errors.Wrap(mqtt.ErrFailedParseSubtopic, mqtt.ErrMalformedSubtopic),
err: errors.Wrap(mqtt.ErrFailedPublish, errors.Wrap(messaging.ErrMalformedTopic, errors.Wrap(messaging.ErrMalformedSubtopic, errors.New("invalid URL escape \"%\"")))),
},
{
desc: "publish with subtopic containing wrong character",
session: &sessionClient,
topic: wrongCharSubtopics,
payload: payload,
err: errors.Wrap(mqtt.ErrFailedParseSubtopic, mqtt.ErrMalformedSubtopic),
err: errors.Wrap(mqtt.ErrFailedPublish, errors.Wrap(messaging.ErrMalformedTopic, messaging.ErrMalformedSubtopic)),
},
{
desc: "publish with subtopic",
+2 -3
View File
@@ -37,15 +37,14 @@ func New(config server.Config, tracer trace.Tracer, forwarder mqtt.Forwarder, to
// Forward traces mqtt forward operations.
func (fm *forwarderMiddleware) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error {
subject := fmt.Sprintf("channels.%s.messages", fm.topic)
spanName := fmt.Sprintf("%s %s", subject, forwardOP)
spanName := fmt.Sprintf("%s %s", fm.topic, forwardOP)
ctx, span := fm.tracer.Start(ctx,
spanName,
trace.WithAttributes(
attribute.String("messaging.system", "mqtt"),
attribute.Bool("messaging.destination.anonymous", false),
attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"),
attribute.String("messaging.destination.template", "m/{domainID}/c/{channelID}/*"),
attribute.Bool("messaging.destination.temporary", true),
attribute.String("network.protocol.name", "mqtt"),
attribute.String("network.protocol.version", "3.1.1"),
+2 -2
View File
@@ -15,8 +15,8 @@ import (
"github.com/absmach/supermq/pkg/messaging/nats"
)
// SubjectAllChannels represents subject to subscribe for all the channels.
const SubjectAllChannels = "channels.>"
// SubjectAllMessages represents subject to subscribe for all the messages.
const SubjectAllMessages = messaging.MsgTopicPrefix + ".>"
func init() {
log.Println("The binary was build using Nats as the message broker")
+2 -2
View File
@@ -15,8 +15,8 @@ import (
"github.com/absmach/supermq/pkg/messaging/rabbitmq"
)
// SubjectAllChannels represents subject to subscribe for all the channels.
const SubjectAllChannels = "channels.#"
// SubjectAllMessages represents subject to subscribe for all the messages.
const SubjectAllMessages = messaging.MsgTopicPrefix + ".#"
func init() {
log.Println("The binary was build using RabbitMQ as the message broker")
@@ -15,9 +15,6 @@ import (
"go.opentelemetry.io/otel/trace"
)
// SubjectAllChannels represents subject to subscribe for all the channels.
const SubjectAllChannels = "channels.>"
func init() {
log.Println("The binary was build using Nats as the message broker")
}
@@ -15,9 +15,6 @@ import (
"go.opentelemetry.io/otel/trace"
)
// SubjectAllChannels represents subject to subscribe for all the channels.
const SubjectAllChannels = "channels.#"
func init() {
log.Println("The binary was build using RabbitMQ as the message broker")
}
+14 -14
View File
@@ -19,7 +19,7 @@ import (
const (
topic = "topic"
chansPrefix = "channels"
msgPrefix = "m"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
tokenTimeout = 100 * time.Millisecond
@@ -317,7 +317,7 @@ func TestUnsubscribe(t *testing.T) {
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: true,
@@ -325,7 +325,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: true,
@@ -333,7 +333,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: false,
@@ -341,7 +341,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from same topic with different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: false,
@@ -357,7 +357,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
@@ -365,7 +365,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: true,
@@ -373,7 +373,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: false,
@@ -381,7 +381,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
@@ -397,7 +397,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with empty ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "",
err: mqttpubsub.ErrEmptyID,
subscribe: false,
@@ -405,7 +405,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic+"2"),
clientID: "clientid55",
err: nil,
subscribe: true,
@@ -413,7 +413,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic+"2"),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
@@ -421,7 +421,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to a new topic with subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: nil,
subscribe: true,
@@ -429,7 +429,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with subtopic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
+4 -4
View File
@@ -16,9 +16,9 @@ var (
ErrInvalidType = errors.New("invalid type")
jsStreamConfig = jetstream.StreamConfig{
Name: "channels",
Name: "m",
Description: "SuperMQ stream for sending and receiving messages in between SuperMQ channels",
Subjects: []string{"channels.>"},
Subjects: []string{"m.>"},
Retention: jetstream.LimitsPolicy,
MaxMsgsPerSubject: 1e6,
MaxAge: time.Hour * 24,
@@ -28,7 +28,7 @@ var (
}
)
const chansPrefix = "channels"
const msgPrefix = "m"
type options struct {
prefix string
@@ -37,7 +37,7 @@ type options struct {
func defaultOptions() options {
return options{
prefix: chansPrefix,
prefix: msgPrefix,
jsStreamConfig: jsStreamConfig,
}
}
+3 -5
View File
@@ -74,13 +74,11 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.
}
subject := fmt.Sprintf("%s.%s", pub.prefix, topic)
if msg.GetSubtopic() != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.GetSubtopic())
if _, err = pub.js.Publish(ctx, subject, data); err != nil {
return err
}
_, err = pub.js.Publish(ctx, subject, data)
return err
return nil
}
func (pub *publisher) Close() error {
+21 -21
View File
@@ -15,11 +15,11 @@ import (
)
const (
topic = "topic"
chansPrefix = "channels"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
topic = "topic"
msgPrefix = "m"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
)
var (
@@ -37,7 +37,7 @@ var (
func TestPublisher(t *testing.T) {
subCfg := messaging.SubscriberConfig{
ID: clientID,
Topic: fmt.Sprintf("%s.>", chansPrefix),
Topic: fmt.Sprintf("%s.>", msgPrefix),
Handler: handler{},
}
err := pubsub.Subscribe(context.TODO(), subCfg)
@@ -116,7 +116,7 @@ func TestPubsub(t *testing.T) {
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -124,7 +124,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe using malformed topic and ID",
topic: fmt.Sprintf("%s.>", chansPrefix),
topic: fmt.Sprintf("%s.>", msgPrefix),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -132,7 +132,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe using malformed topic and ID",
topic: fmt.Sprintf("%s.*", chansPrefix),
topic: fmt.Sprintf("%s.*", msgPrefix),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -140,7 +140,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
@@ -148,7 +148,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -156,7 +156,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
@@ -172,7 +172,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientidd2",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -180,7 +180,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientidd3",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -188,7 +188,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -196,7 +196,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
@@ -204,7 +204,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
@@ -212,7 +212,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
@@ -220,7 +220,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -244,7 +244,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: true,
@@ -252,7 +252,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with empty id",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: false,
+2 -2
View File
@@ -14,7 +14,7 @@ var ErrInvalidType = errors.New("invalid type")
const (
exchangeName = "messages"
chansPrefix = "channels"
msgPrefix = "m"
)
type options struct {
@@ -24,7 +24,7 @@ type options struct {
func defaultOptions() options {
return options{
prefix: chansPrefix,
prefix: msgPrefix,
exchange: exchangeName,
}
}
-9
View File
@@ -6,7 +6,6 @@ package rabbitmq
import (
"context"
"fmt"
"strings"
"github.com/absmach/supermq/pkg/messaging"
amqp "github.com/rabbitmq/amqp091-go"
@@ -61,10 +60,6 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.
}
subject := fmt.Sprintf("%s.%s", pub.prefix, topic)
if msg.GetSubtopic() != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.GetSubtopic())
}
subject = formatTopic(subject)
err = pub.channel.PublishWithContext(
ctx,
@@ -88,7 +83,3 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.
func (pub *publisher) Close() error {
return pub.conn.Close()
}
func formatTopic(topic string) string {
return strings.ReplaceAll(topic, ">", "#")
}
+5
View File
@@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"sync"
"github.com/absmach/supermq/pkg/messaging"
@@ -180,3 +181,7 @@ func (ps *pubsub) handle(deliveries <-chan amqp.Delivery, h messaging.MessageHan
}
}
}
func formatTopic(topic string) string {
return strings.ReplaceAll(topic, ">", "#")
}
+18 -18
View File
@@ -18,7 +18,7 @@ import (
const (
topic = "topic"
chansPrefix = "channels"
msgPrefix = "m"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
@@ -37,8 +37,8 @@ func TestPublisher(t *testing.T) {
conn, ch, err := newConn()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
topicChan := subscribe(t, ch, fmt.Sprintf("%s.%s", chansPrefix, topic))
subtopicChan := subscribe(t, ch, fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic))
topicChan := subscribe(t, ch, fmt.Sprintf("%s.%s", msgPrefix, topic))
subtopicChan := subscribe(t, ch, fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic))
go rabbitHandler(topicChan, handler{})
go rabbitHandler(subtopicChan, handler{})
@@ -224,7 +224,7 @@ func TestUnsubscribe(t *testing.T) {
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: true,
@@ -232,7 +232,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: true,
@@ -240,7 +240,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: false,
@@ -248,7 +248,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from same topic with different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: false,
@@ -264,7 +264,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "clientid4",
err: rabbitmq.ErrNotSubscribed,
subscribe: false,
@@ -272,7 +272,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: true,
@@ -280,7 +280,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: false,
@@ -288,7 +288,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
clientID: "clientid4",
err: rabbitmq.ErrNotSubscribed,
subscribe: false,
@@ -304,7 +304,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with empty ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
clientID: "",
err: rabbitmq.ErrEmptyID,
subscribe: false,
@@ -312,7 +312,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic+"2"),
clientID: "clientid55",
err: nil,
subscribe: true,
@@ -320,7 +320,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
topic: fmt.Sprintf("%s.%s", msgPrefix, topic+"2"),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
@@ -328,7 +328,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Subscribe to a new topic with subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: nil,
subscribe: true,
@@ -336,7 +336,7 @@ func TestUnsubscribe(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with subtopic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
@@ -408,7 +408,7 @@ func TestPubSub(t *testing.T) {
for _, tc := range cases {
subject := ""
if tc.topic != "" {
subject = fmt.Sprintf("%s.%s", chansPrefix, tc.topic)
subject = fmt.Sprintf("%s.%s", msgPrefix, tc.topic)
}
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
@@ -432,7 +432,7 @@ func TestPubSub(t *testing.T) {
assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
err = pubsub.Unsubscribe(context.TODO(), tc.clientID, fmt.Sprintf("%s.%s", chansPrefix, tc.topic))
err = pubsub.Unsubscribe(context.TODO(), tc.clientID, fmt.Sprintf("%s.%s", msgPrefix, tc.topic))
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", tc.desc, err))
default:
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err))
+156
View File
@@ -0,0 +1,156 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package messaging
import (
"fmt"
"net/url"
"regexp"
"strings"
"github.com/absmach/supermq/pkg/errors"
)
const (
MsgTopicPrefix = "m"
ChannelTopicPrefix = "c"
numGroups = 4 // entire expression + domain group + channel group + subtopic group
domainGroup = 1 // domain group is first in msg topic regexp
channelGroup = 2 // channel group is second in msg topic regexp
subtopicGroup = 3 // subtopic group is third in msg topic regexp
)
var (
ErrMalformedTopic = errors.New("malformed topic")
ErrMalformedSubtopic = errors.New("malformed subtopic")
// Regex to group topic in format m.<domain_id>.c.<channel_id>.<sub_topic> `^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`.
topicRegExp = regexp.MustCompile(`^\/?` + MsgTopicPrefix + `\/([\w\-]+)\/` + ChannelTopicPrefix + `\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
mqWildcards = "+#"
wildcards = "*>"
subtopicInvalidChars = " #+"
wildcardsReplacer = strings.NewReplacer("+", "*", "#", ">")
pathReplacer = strings.NewReplacer("/", ".")
)
func ParsePublishTopic(topic string) (domainID, chanID, subtopic string, err error) {
msgParts := topicRegExp.FindStringSubmatch(topic)
if len(msgParts) < numGroups {
return "", "", "", ErrMalformedTopic
}
domainID = msgParts[domainGroup]
chanID = msgParts[channelGroup]
subtopic = msgParts[subtopicGroup]
subtopic, err = ParsePublishSubtopic(subtopic)
if err != nil {
return "", "", "", errors.Wrap(ErrMalformedTopic, err)
}
return domainID, chanID, subtopic, nil
}
func ParsePublishSubtopic(subtopic string) (parseSubTopic string, err error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err = formatSubtopic(subtopic)
if err != nil {
return "", errors.Wrap(ErrMalformedSubtopic, err)
}
if strings.ContainsAny(subtopic, subtopicInvalidChars+wildcards) {
return "", ErrMalformedSubtopic
}
if strings.Contains(subtopic, "..") {
return "", ErrMalformedSubtopic
}
return subtopic, nil
}
func ParseSubscribeTopic(topic string) (domainID string, chanID string, subtopic string, err error) {
msgParts := topicRegExp.FindStringSubmatch(topic)
if len(msgParts) < numGroups {
return "", "", "", ErrMalformedTopic
}
domainID = msgParts[domainGroup]
chanID = msgParts[channelGroup]
subtopic = msgParts[subtopicGroup]
subtopic, err = ParseSubscribeSubtopic(subtopic)
if err != nil {
return "", "", "", errors.Wrap(ErrMalformedTopic, err)
}
return domainID, chanID, subtopic, nil
}
func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error) {
if subtopic == "" {
return "", nil
}
if strings.ContainsAny(subtopic, mqWildcards) {
subtopic = wildcardsReplacer.Replace(subtopic)
}
subtopic, err = formatSubtopic(subtopic)
if err != nil {
return "", errors.Wrap(ErrMalformedSubtopic, err)
}
if strings.ContainsAny(subtopic, subtopicInvalidChars) {
return "", ErrMalformedSubtopic
}
if strings.Contains(subtopic, "..") {
return "", ErrMalformedSubtopic
}
for _, elem := range strings.Split(subtopic, ".") {
if len(elem) > 1 && strings.ContainsAny(elem, wildcards) {
return "", ErrMalformedSubtopic
}
}
return subtopic, nil
}
func formatSubtopic(subtopic string) (string, error) {
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", err
}
subtopic = strings.TrimPrefix(subtopic, "/")
subtopic = strings.TrimSuffix(subtopic, "/")
subtopic = strings.TrimSpace(subtopic)
subtopic = pathReplacer.Replace(subtopic)
return subtopic, nil
}
func EncodeTopic(domainID string, channelID string, subtopic string) string {
return fmt.Sprintf("%s.%s", MsgTopicPrefix, EncodeTopicSuffix(domainID, channelID, subtopic))
}
func EncodeTopicSuffix(domainID string, channelID string, subtopic string) string {
subject := fmt.Sprintf("%s.%s.%s", domainID, ChannelTopicPrefix, channelID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
return subject
}
func EncodeMessageTopic(m *Message) string {
return EncodeTopicSuffix(m.GetDomain(), m.GetChannel(), m.GetSubtopic())
}
func EncodeMessageMQTTTopic(m *Message) string {
topic := fmt.Sprintf("%s/%s/%s/%s", MsgTopicPrefix, m.GetDomain(), ChannelTopicPrefix, m.GetChannel())
if m.GetSubtopic() != "" {
topic = topic + "/" + strings.ReplaceAll(m.GetSubtopic(), ".", "/")
}
return topic
}
+422
View File
@@ -0,0 +1,422 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package messaging_test
import (
"testing"
"github.com/absmach/supermq/pkg/messaging"
"github.com/stretchr/testify/assert"
)
var ParsePublisherTopicTestCases = []struct {
desc string
topic string
domainID string
channelID string
subtopic string
expectErr bool
}{
{
desc: "valid topic with subtopic /m/domain123/c/channel456/devices/temp",
topic: "/m/domain123/c/channel456/devices/temp",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp",
},
{
desc: "valid topic with URL encoded subtopic /m/domain123/c/channel456/devices%2Ftemp%2Fdata",
topic: "/m/domain123/c/channel456/devices%2Ftemp%2Fdata",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp.data",
},
{
desc: "valid topic with subtopic /m/domain/c/channel/extra/extra2",
topic: "/m/domain/c/channel/extra/extra2",
domainID: "domain",
channelID: "channel",
subtopic: "extra.extra2",
},
{
desc: "valid topic without subtopic /m/domain123/c/channel456",
topic: "/m/domain123/c/channel456",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
},
{
desc: "invalid topic format (missing parts) /m/domain123/c/",
topic: "/m/domain123/c/",
domainID: "domain123",
channelID: "",
subtopic: "",
expectErr: true,
},
{
desc: "invalid topic format (missing domain) /m//c/channel123",
topic: "/m//c/channel123",
domainID: "",
channelID: "",
subtopic: "",
expectErr: true,
},
{
desc: "topic with wildcards + and # /m/domain123/c/channel456/devices/+/temp/#",
topic: "/m/domain123/c/channel456/devices/+/temp/#",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid domain name m/domain*123/c/channel456/devices/+/temp/#",
topic: "m/domain*123/c/channel456/devices/+/temp/#",
domainID: "",
channelID: "channel456",
subtopic: "devices.*.temp.>",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a*b/topic",
topic: "/m/domain123/c/channel456/sub/a*b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a>b/topic",
topic: "/m/domain123/c/channel456/sub/a>b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a#b/topic",
topic: "/m/domain123/c/channel456/sub/a#b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a+b/topic",
topic: "/m/domain123/c/channel456/sub/a+b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a//b/topic",
topic: "/m/domain123/c/channel456/sub/a//b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid topic regex \"not-a-topic\"",
topic: "not-a-topic",
domainID: "",
channelID: "",
subtopic: "",
expectErr: true,
},
}
func TestParsePublishTopic(t *testing.T) {
for _, tc := range ParsePublisherTopicTestCases {
t.Run(tc.desc, func(t *testing.T) {
domainID, channelID, subtopic, err := messaging.ParsePublishTopic(tc.topic)
if tc.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.domainID, domainID)
assert.Equal(t, tc.channelID, channelID)
assert.Equal(t, tc.subtopic, subtopic)
}
})
}
}
func BenchmarkParsePublisherTopic(b *testing.B) {
for _, tc := range ParsePublisherTopicTestCases {
b.Run(tc.desc, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _, _, _ = messaging.ParsePublishTopic(tc.topic)
}
})
}
}
var ParseSubscribeTestCases = []struct {
desc string
topic string
domainID string
channelID string
subtopic string
expectErr bool
}{
{
desc: "valid topic with subtopic /m/domain123/c/channel456/devices/temp",
topic: "/m/domain123/c/channel456/devices/temp",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp",
},
{
desc: "topic with wildcards + and # /m/domain123/c/channel456/devices/+/temp/#",
topic: "/m/domain123/c/channel456/devices/+/temp/#",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.*.temp.>",
},
{
desc: "valid topic without subtopic /m/domain123/c/channel456",
topic: "/m/domain123/c/channel456",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
},
{
desc: "invalid topic format (missing channel) /m/domain123/c/",
topic: "/m/domain123/c/",
domainID: "domain123",
channelID: "",
subtopic: "",
expectErr: true,
},
{
desc: "invalid topic format (missing domain) /m//c/channel123",
topic: "/m//c/channel123",
domainID: "",
channelID: "",
subtopic: "",
expectErr: true,
},
{
desc: "invalid domain name m/domain*123/c/channel456/devices/+/temp/#",
topic: "m/domain*123/c/channel456/devices/+/temp/#",
domainID: "",
channelID: "channel456",
subtopic: "devices.*.temp.>",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a*b/topic",
topic: "/m/domain123/c/channel456/sub/a*b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a>b/topic",
topic: "/m/domain123/c/channel456/sub/a>b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a#b/topic",
topic: "/m/domain123/c/channel456/sub/a#b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a+b/topic",
topic: "/m/domain123/c/channel456/sub/a+b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a//b/topic",
topic: "/m/domain123/c/channel456/sub/a//b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "invalid subtopic /m/domain123/c/channel456/sub/a/ /b/topic",
topic: "/m/domain123/c/channel456/sub/a/ /b/topic",
domainID: "domain123",
channelID: "channel456",
subtopic: "",
expectErr: true,
},
{
desc: "completely invalid topic \"invalid-topic\"",
topic: "invalid-topic",
domainID: "",
channelID: "",
subtopic: "",
expectErr: true,
},
}
func TestParseSubscribeTopic(t *testing.T) {
for _, tc := range ParseSubscribeTestCases {
t.Run(tc.desc, func(t *testing.T) {
domainID, channelID, subtopic, err := messaging.ParseSubscribeTopic(tc.topic)
if tc.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.domainID, domainID)
assert.Equal(t, tc.channelID, channelID)
assert.Equal(t, tc.subtopic, subtopic)
}
})
}
}
func BenchmarkParseSubscribeTopic(b *testing.B) {
for _, tc := range ParseSubscribeTestCases {
b.Run(tc.desc, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _, _, _ = messaging.ParseSubscribeTopic(tc.topic)
}
})
}
}
func TestEncodeTopic(t *testing.T) {
cases := []struct {
desc string
domainID string
channelID string
subtopic string
expected string
}{
{
desc: "with subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "dev.sensor.temp",
expected: "m.domain1.c.chan1.dev.sensor.temp",
},
{
desc: "without subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "",
expected: "m.domain1.c.chan1",
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
actual := messaging.EncodeTopic(tc.domainID, tc.channelID, tc.subtopic)
assert.Equal(t, tc.expected, actual)
})
}
}
func TestEncodeTopicSuffix(t *testing.T) {
cases := []struct {
desc string
domainID string
channelID string
subtopic string
expected string
}{
{
desc: "with subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "dev.sensor.temp",
expected: "domain1.c.chan1.dev.sensor.temp",
},
{
desc: "without subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "",
expected: "domain1.c.chan1",
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
actual := messaging.EncodeTopicSuffix(tc.domainID, tc.channelID, tc.subtopic)
assert.Equal(t, tc.expected, actual)
})
}
}
func TestMessage_EncodeTopicSuffix(t *testing.T) {
cases := []struct {
desc string
message *messaging.Message
expected string
}{
{
desc: "with subtopic",
message: &messaging.Message{
Domain: "domainX",
Channel: "chanX",
Subtopic: "device.123.status",
},
expected: "domainX.c.chanX.device.123.status",
},
{
desc: "without subtopic",
message: &messaging.Message{
Domain: "domainY",
Channel: "chanY",
},
expected: "domainY.c.chanY",
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
actual := messaging.EncodeMessageTopic(tc.message)
assert.Equal(t, tc.expected, actual)
})
}
}
func TestMessage_EncodeToMQTTTopic(t *testing.T) {
cases := []struct {
desc string
message *messaging.Message
expected string
}{
{
desc: "with subtopic",
message: &messaging.Message{
Domain: "domainA",
Channel: "chanA",
Subtopic: "dev.1.temp",
},
expected: "m/domainA/c/chanA/dev/1/temp",
},
{
desc: "without subtopic",
message: &messaging.Message{
Domain: "domainB",
Channel: "chanB",
},
expected: "m/domainB/c/chanB",
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
actual := messaging.EncodeMessageMQTTTopic(tc.message)
assert.Equal(t, tc.expected, actual)
})
}
}
+1 -1
View File
@@ -13,7 +13,7 @@ import (
var defaultAttributes = []attribute.KeyValue{
attribute.Bool("messaging.destination.anonymous", false),
attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"),
attribute.String("messaging.destination.template", "m/{domainID}/c/{channelID}/*"),
attribute.Bool("messaging.destination.temporary", true),
attribute.String("network.transport", "tcp"),
attribute.String("network.type", "ipv4"),
+4 -2
View File
@@ -10,6 +10,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"github.com/absmach/mgate"
@@ -173,14 +174,15 @@ func TestSendMessage(t *testing.T) {
},
}
for _, tc := range cases {
internalTopic := tc.domainID + ".c." + strings.ReplaceAll(tc.topic, "/", ".")
t.Run(tc.desc, func(t *testing.T) {
authzCall := clientsGRPCClient.On("Authenticate", mock.Anything, mock.Anything).Return(tc.authRes, tc.authErr)
authnCall := channelsGRPCClient.On("Authorize", mock.Anything, mock.Anything).Return(&grpcChannelsV1.AuthzRes{Authorized: true}, nil)
svcCall := pub.On("Publish", mock.Anything, channelID, mock.Anything).Return(tc.svcErr)
svcCall := pub.On("Publish", mock.Anything, internalTopic, mock.Anything).Return(tc.svcErr)
err := mgsdk.SendMessage(context.Background(), tc.domainID, tc.topic, tc.msg, tc.secret)
assert.Equal(t, tc.err, err)
if tc.err == nil {
ok := svcCall.Parent.AssertCalled(t, "Publish", mock.Anything, channelID, mock.Anything)
ok := svcCall.Parent.AssertCalled(t, "Publish", mock.Anything, internalTopic, mock.Anything)
assert.True(t, ok)
}
svcCall.Unset()
+2 -2
View File
@@ -42,7 +42,7 @@ will be transformed to:
The message format is stored in *the subtopic*. It's the last part of the subtopic. In the example:
```
http://localhost:8008/channels/<channelID>/messages/home/temperature/myFormat
http://localhost:8008/m/<domain_id>/c/<channelID>/home/temperature/myFormat
```
the message format is `myFormat`. It can be any valid subtopic name, JSON transformer is format-agnostic. The format is used by the JSON message consumers so that they can process the message properly. If the format is not present (i.e. message subtopic is empty), JSON Transformer will report an error. Since the Transformer is agnostic to the format, having format in the subtopic does not prevent the publisher to send the content of different formats to the same subtopic. It's up to the consumer to handle this kind of issue. Message writers, for example, will store the message(s) in the table/collection/measurement (depending on the underlying database) with the name of the format (which in the example is `myFormat`). SuperMQ writers will try to save any format received (whether it will be successful depends on the writer implementation and the underlying database), but it's recommended that the publisher takes care not to send different formats to the same subtopic.
@@ -50,5 +50,5 @@ the message format is `myFormat`. It can be any valid subtopic name, JSON transf
Having a message format in the subtopic means that the subscriber has an option to subscribe to only one message format. This is a nice feature because message subscribers know what's the expected format of the message so that they can process it. If the message format is not important, wildcard subtopic can always be used to subscribe to any message format:
```
http://localhost:8185/channels/<channelID>/messages/home/temperature/*
http://localhost:8185/m/<domain_id>/c/<channelID>/home/temperature/*
```
+4 -12
View File
@@ -5,7 +5,6 @@ package ws
import (
"context"
"fmt"
"strings"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
@@ -17,8 +16,6 @@ import (
"github.com/absmach/supermq/pkg/policies"
)
const chansPrefix = "channels"
var (
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
@@ -67,11 +64,9 @@ func (svc *adapterService) Subscribe(ctx context.Context, sessionID, clientKey,
return svcerr.ErrAuthorization
}
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
c.id = clientID
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
subCfg := messaging.SubscriberConfig{
ID: sessionID,
ClientID: clientID,
@@ -79,17 +74,14 @@ func (svc *adapterService) Subscribe(ctx context.Context, sessionID, clientKey,
Handler: c,
}
if err := svc.pubsub.Subscribe(ctx, subCfg); err != nil {
return ErrFailedSubscription
return errors.Wrap(ErrFailedSubscription, err)
}
return nil
}
func (svc *adapterService) Unsubscribe(ctx context.Context, sessionID, domainID, chanID, subtopic string) error {
topic := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
topic = fmt.Sprintf("%s.%s", topic, subtopic)
}
topic := messaging.EncodeTopic(domainID, chanID, subtopic)
if err := svc.pubsub.Unsubscribe(ctx, sessionID, topic); err != nil {
return errors.Wrap(ErrFailedSubscribe, err)
+3 -2
View File
@@ -16,6 +16,7 @@ import (
climocks "github.com/absmach/supermq/clients/mocks"
"github.com/absmach/supermq/internal/testsutil"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/mocks"
@@ -185,7 +186,7 @@ func TestSubscribe(t *testing.T) {
for _, tc := range cases {
subConfig := messaging.SubscriberConfig{
ID: sessionID,
Topic: "channels." + tc.chanID + "." + subTopic,
Topic: "m." + tc.domainID + ".c." + tc.chanID + "." + subTopic,
ClientID: clientID,
Handler: c,
}
@@ -203,7 +204,7 @@ func TestSubscribe(t *testing.T) {
}).Return(tc.authZRes, tc.authZErr)
repocall := pubsub.On("Subscribe", mock.Anything, subConfig).Return(tc.subErr)
err := svc.Subscribe(context.Background(), sessionID, tc.clientKey, tc.domainID, tc.chanID, tc.subtopic, c)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repocall.Unset()
clientsCall.Unset()
channelsCall.Unset()
+2 -2
View File
@@ -208,7 +208,7 @@ func TestHandshake(t *testing.T) {
subtopic: "",
header: true,
clientKey: clientKey,
status: http.StatusBadRequest,
status: http.StatusUnauthorized,
msg: []byte{},
},
{
@@ -228,7 +228,7 @@ func TestHandshake(t *testing.T) {
subtopic: "sub/a*b/topic",
header: true,
clientKey: clientKey,
status: http.StatusBadGateway,
status: http.StatusUnauthorized,
msg: msg,
},
}
+11 -51
View File
@@ -10,20 +10,14 @@ import (
"fmt"
"log/slog"
"net/http"
"net/url"
"regexp"
"strings"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/ws"
"github.com/go-chi/chi/v5"
)
var (
channelPartRegExp = regexp.MustCompile(`^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
errGenSessionID = errors.New("failed to generate session id")
)
var errGenSessionID = errors.New("failed to generate session id")
func generateSessionID() (string, error) {
b := make([]byte, 32)
@@ -35,7 +29,7 @@ func generateSessionID() (string, error) {
func handshake(ctx context.Context, svc ws.Service, logger *slog.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
req, err := decodeRequest(r)
req, err := decodeRequest(r, logger)
if err != nil {
encodeError(w, err)
return
@@ -71,7 +65,7 @@ func handshake(ctx context.Context, svc ws.Service, logger *slog.Logger) http.Ha
}
}
func decodeRequest(r *http.Request) (connReq, error) {
func decodeRequest(r *http.Request, logger *slog.Logger) (connReq, error) {
authKey := r.Header.Get("Authorization")
if authKey == "" {
authKeys := r.URL.Query()["authorization"]
@@ -91,53 +85,19 @@ func decodeRequest(r *http.Request) (connReq, error) {
domainID: domainID,
}
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 3 {
logger.Warn("Empty channel id or malformed url")
return connReq{}, errors.ErrMalformedEntity
}
subTopic := chi.URLParam(r, "*")
subtopic, err := parseSubTopic(channelParts[3])
if err != nil {
return connReq{}, err
if subTopic != "" {
subTopic, err := messaging.ParseSubscribeSubtopic(subTopic)
if err != nil {
return connReq{}, err
}
req.subtopic = subTopic
}
req.subtopic = subtopic
return req, nil
}
func parseSubTopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}
func encodeError(w http.ResponseWriter, err error) {
var statusCode int
-1
View File
@@ -45,6 +45,5 @@ func MakeHandler(ctx context.Context, svc ws.Service, l *slog.Logger, instanceID
mux.Get("/health", supermq.Health(service, instanceID))
mux.Handle("/metrics", promhttp.Handler())
return mux
}
+49 -21
View File
@@ -35,10 +35,11 @@ const (
// Client handles messaging and websocket connection.
type Client struct {
logger *slog.Logger
conn *websocket.Conn
id string
msg chan *messaging.Message
logger *slog.Logger
conn *websocket.Conn
id string
msg chan *messaging.Message
handledClose bool
}
// NewClient returns a new websocket client.
@@ -60,6 +61,20 @@ func (c *Client) Cancel() error {
return c.conn.Close()
}
// Close handles the websocket connection after unsubscribing.
func (c *Client) Close() error {
err := c.conn.Close()
if err != nil {
c.logger.Debug("failed to close websocket client", slog.String("session_id", c.id), slog.String("error", err.Error()))
}
ch := c.conn.CloseHandler()
err = ch(0, "")
if err != nil {
c.logger.Debug("failed to execute websocket connection close handler", slog.String("session_id", c.id), slog.String("error", err.Error()))
}
return nil
}
// Handle handles the sending and receiving of messages via the broker.
func (c *Client) Handle(msg *messaging.Message) error {
select {
@@ -79,38 +94,48 @@ func (c *Client) readPump(ctx context.Context, cancel context.CancelFunc) error
}
return nil
})
errCh := make(chan error, 1)
go func() {
errCh <- c.readMessage()
}()
for {
select {
case <-ctx.Done():
c.logger.Debug("read_pump: received context Done")
return nil
default:
msgType, msg, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
c.logger.Debug("read_pump: unexpected close error", slog.String("error", err.Error()))
return nil
}
return errors.Wrap(errReadMsg, err)
}
c.logger.Debug("read_pump: received message ", slog.Int("message_type", msgType), slog.String("message", string(msg)))
case err := <-errCh:
return err
}
}
}
func (c *Client) readMessage() error {
for {
msgType, msg, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
c.logger.Debug("read_pump: unexpected close error", slog.String("error", err.Error()))
return nil
}
return errors.Wrap(errReadMsg, err)
}
c.logger.Debug("read_pump: received message ", slog.Int("message_type", msgType), slog.String("message", string(msg)))
}
}
func (c *Client) writePump(ctx context.Context, cancel context.CancelFunc) error {
defer cancel()
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
return err
}
for {
select {
case <-ctx.Done():
c.logger.Debug("write_pump: received context Done ")
return nil
case msg, ok := <-c.msg:
_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
if err := c.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
return errors.Wrap(errHandlerClosedMsgChan, err)
@@ -121,7 +146,7 @@ func (c *Client) writePump(ctx context.Context, cancel context.CancelFunc) error
return errors.Wrap(errFailedToWriteMsg, err)
}
case <-ticker.C:
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
if err := c.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)); err != nil {
return errors.Wrap(errFailedToWritePing, err)
}
}
@@ -131,15 +156,18 @@ func (c *Client) writePump(ctx context.Context, cancel context.CancelFunc) error
// SetCloseHandler sets a close handler for the WebSocket connection.
func (c *Client) SetCloseHandler(handler func(code int, text string) error) {
c.conn.SetCloseHandler(func(code int, text string) error {
c.logger.Debug("WebSocket closed", slog.String("session_id", c.id), slog.Int("code", code), slog.String("text", text))
if err := handler(code, text); err != nil {
c.logger.Warn("Error in close handler", slog.String("error", err.Error()))
if !c.handledClose {
if err := handler(code, text); err != nil {
c.logger.Warn("Error in close handler", slog.String("error", err.Error()))
}
c.handledClose = true
}
return nil
})
}
func (c *Client) Start(ctx context.Context) {
defer c.Close()
ctx, cancel := context.WithCancel(ctx)
g, ctx := errgroup.WithContext(ctx)
+34 -85
View File
@@ -8,8 +8,6 @@ import (
"fmt"
"log/slog"
"net/http"
"net/url"
"regexp"
"strings"
"time"
@@ -40,13 +38,10 @@ const (
// Error wrappers for MQTT errors.
var (
channelRegExp = regexp.MustCompile(`^\/?m\/([\w\-]+)\/c\/([\w\-]+)(\/[^?]*)?(\?.*)?$`)
errMalformedSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed subtopic"))
errClientNotInitialized = mgate.NewHTTPProxyError(http.StatusInternalServerError, errors.New("client is not initialized"))
errMalformedTopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed topic"))
errMissingTopicPub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to publish due to missing topic"))
errMissingTopicSub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to subscribe due to missing topic"))
errClientNotInitialized = errors.New("client is not initialized")
errMissingTopicPub = errors.New("failed to publish due to missing topic")
errMissingTopicSub = errors.New("failed to subscribe due to missing topic")
errFailedPublish = errors.New("failed to publish")
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
)
@@ -95,9 +90,21 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt
token = string(s.Password)
}
_, _, err := h.authAccess(ctx, token, *topic, connections.Publish)
domainID, chanID, _, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return err
}
return err
clientID, clientType, err := h.authAccess(ctx, token, domainID, chanID, connections.Publish)
if err != nil {
return err
}
if s.Username == "" && clientType == policies.ClientType {
s.Username = clientID
}
return nil
}
// AuthSubscribe is called on device publish,
@@ -112,7 +119,11 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
}
for _, topic := range *topics {
if _, _, err := h.authAccess(ctx, string(s.Password), topic, connections.Subscribe); err != nil {
domainID, chanID, _, err := messaging.ParseSubscribeTopic(topic)
if err != nil {
return err
}
if _, _, err := h.authAccess(ctx, string(s.Password), domainID, chanID, connections.Subscribe); err != nil {
return err
}
}
@@ -137,41 +148,22 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return nil
}
// Topics are in the format:
// m/<domain_id>/c/<channel_id>/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 3 {
return errMalformedTopic
}
domainID := channelParts[1]
chanID := channelParts[2]
subtopic := channelParts[3]
subtopic, err := parseSubtopic(subtopic)
domainID, chanID, subtopic, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return err
}
clientID, clientType, err := h.authAccess(ctx, string(s.Password), *topic, connections.Publish)
if err != nil {
return err
return errors.Wrap(errFailedPublish, err)
}
msg := messaging.Message{
Protocol: protocol,
Domain: domainID,
Channel: chanID,
Subtopic: subtopic,
Payload: *payload,
Created: time.Now().UnixNano(),
Protocol: protocol,
Domain: domainID,
Channel: chanID,
Subtopic: subtopic,
Payload: *payload,
Publisher: s.Username,
Created: time.Now().UnixNano(),
}
if clientType == policies.ClientType {
msg.Publisher = clientID
}
if err := h.pubsub.Publish(ctx, msg.GetChannel(), &msg); err != nil {
if err := h.pubsub.Publish(ctx, messaging.EncodeMessageTopic(&msg), &msg); err != nil {
return mgate.NewHTTPProxyError(http.StatusInternalServerError, errors.Wrap(errFailedPublishToMsgBroker, err))
}
@@ -200,7 +192,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
return nil
}
func (h *handler) authAccess(ctx context.Context, token, topic string, msgType connections.ConnType) (string, string, mgate.HTTPProxyError) {
func (h *handler) authAccess(ctx context.Context, token, domainID, chanID string, msgType connections.ConnType) (string, string, error) {
authnReq := &grpcClientsV1.AuthnReq{
ClientSecret: token,
}
@@ -218,20 +210,6 @@ func (h *handler) authAccess(ctx context.Context, token, topic string, msgType c
clientType := policies.ClientType
clientID := authnRes.GetId()
// Topics are in the format:
// m/<domain_id>/c/<channel_id>/<subtopic>/.../ct/<content_type>
if !channelRegExp.MatchString(topic) {
return "", "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedTopic)
}
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 3 {
return "", "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedTopic)
}
domainID := channelParts[1]
chanID := channelParts[2]
ar := &grpcChannelsV1.AuthzReq{
Type: uint32(msgType),
ClientId: clientID,
@@ -250,35 +228,6 @@ func (h *handler) authAccess(ctx context.Context, token, topic string, msgType c
return clientID, clientType, nil
}
func parseSubtopic(subtopic string) (string, mgate.HTTPProxyError) {
if subtopic == "" {
return subtopic, nil
}
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
for _, elem := range elems {
if elem == "" {
continue
}
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
return "", errMalformedSubtopic
}
filteredElems = append(filteredElems, elem)
}
subtopic = strings.Join(filteredElems, ".")
return subtopic, nil
}
// extractClientSecret returns value of the client secret. If there is no client key - an empty value is returned.
func extractClientSecret(token string) string {
if !strings.HasPrefix(token, apiutil.ClientPrefix) {