mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
SMQ-2703 - Shorten channel and message prefix in message topics (#2787)
Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
@@ -37,7 +37,7 @@ servers:
|
||||
- user-password: []
|
||||
|
||||
channels:
|
||||
ch/{channelID}/msg/{subtopic}:
|
||||
c/{channelID}/m/{subtopic}:
|
||||
parameters:
|
||||
channelID:
|
||||
$ref: '#/components/parameters/channelID'
|
||||
|
||||
@@ -32,7 +32,7 @@ servers:
|
||||
default: '8186'
|
||||
|
||||
channels:
|
||||
'ch/{channelID}/msg/{subtopic}':
|
||||
'c/{channelID}/m/{subtopic}':
|
||||
parameters:
|
||||
channelID:
|
||||
$ref: '#/components/parameters/channelID'
|
||||
|
||||
@@ -27,7 +27,7 @@ tags:
|
||||
url: https://docs.supermq.abstractmachines.fr/
|
||||
|
||||
paths:
|
||||
/ch/{id}/msg:
|
||||
/c/{id}/m:
|
||||
post:
|
||||
summary: Sends message to the communication channel
|
||||
description: |
|
||||
|
||||
@@ -33,7 +33,7 @@ const (
|
||||
startObserve = 0 // observe option value that indicates start of observation
|
||||
)
|
||||
|
||||
var channelPartRegExp = regexp.MustCompile(`^/ch/([\w\-]+)/msg(/[^?]*)?(\?.*)?$`)
|
||||
var channelPartRegExp = regexp.MustCompile(`^/c/([\w\-]+)/m(/[^?]*)?(\?.*)?$`)
|
||||
|
||||
const (
|
||||
numGroups = 3 // entire expression + channel group + subtopic group
|
||||
|
||||
@@ -227,7 +227,7 @@ func TestPublish(t *testing.T) {
|
||||
req := testRequest{
|
||||
client: ts.Client(),
|
||||
method: http.MethodPost,
|
||||
url: fmt.Sprintf("%s/ch/%s/msg", ts.URL, tc.chanID),
|
||||
url: fmt.Sprintf("%s/c/%s/m", ts.URL, tc.chanID),
|
||||
contentType: tc.contentType,
|
||||
token: tc.key,
|
||||
body: strings.NewReader(tc.msg),
|
||||
|
||||
@@ -33,14 +33,14 @@ func MakeHandler(logger *slog.Logger, instanceID string) http.Handler {
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Post("/ch/{chanID}/msg", otelhttp.NewHandler(kithttp.NewServer(
|
||||
r.Post("/c/{chanID}/m", otelhttp.NewHandler(kithttp.NewServer(
|
||||
sendMessageEndpoint(),
|
||||
decodeRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "publish").ServeHTTP)
|
||||
|
||||
r.Post("/ch/{chanID}/msg/*", otelhttp.NewHandler(kithttp.NewServer(
|
||||
r.Post("/c/{chanID}/m/*", otelhttp.NewHandler(kithttp.NewServer(
|
||||
sendMessageEndpoint(),
|
||||
decodeRequest,
|
||||
api.EncodeResponse,
|
||||
|
||||
+2
-2
@@ -55,7 +55,7 @@ var (
|
||||
errFailedParseSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to parse subtopic"))
|
||||
)
|
||||
|
||||
var channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
|
||||
var channelRegExp = regexp.MustCompile(`^\/?c\/([\w\-]+)\/m(\/[^?]*)?(\?.*)?$`)
|
||||
|
||||
// Event implements events.Event interface.
|
||||
type handler struct {
|
||||
@@ -210,7 +210,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
|
||||
|
||||
func parseTopic(topic string) (string, string, error) {
|
||||
// Topics are in the format:
|
||||
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
|
||||
// c/<channel_id>/m/<subtopic>/.../ct/<content_type>
|
||||
channelParts := channelRegExp.FindStringSubmatch(topic)
|
||||
if len(channelParts) < 2 {
|
||||
return "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
|
||||
|
||||
@@ -35,12 +35,12 @@ const (
|
||||
chanID = "123e4567-e89b-12d3-a456-000000000001"
|
||||
invalidID = "invalidID"
|
||||
invalidValue = "invalidValue"
|
||||
invalidChannelIDTopic = "ch/**/msg"
|
||||
invalidChannelIDTopic = "c/**/m"
|
||||
)
|
||||
|
||||
var (
|
||||
topicMsg = "ch/%s/msg"
|
||||
subtopicMsg = "ch/%s/msg/subtopic"
|
||||
topicMsg = "c/%s/m"
|
||||
subtopicMsg = "c/%s/m/subtopic"
|
||||
topic = fmt.Sprintf(topicMsg, chanID)
|
||||
subtopic = fmt.Sprintf(subtopicMsg, chanID)
|
||||
invalidTopic = invalidValue
|
||||
|
||||
@@ -25,7 +25,7 @@ const (
|
||||
var (
|
||||
errFailedSession = errors.New("failed to obtain session from context")
|
||||
errMalformedTopic = errors.New("malformed topic")
|
||||
channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
|
||||
channelRegExp = regexp.MustCompile(`^\/?c\/([\w\-]+)\/m(\/[^?]*)?(\?.*)?$`)
|
||||
)
|
||||
|
||||
// EventStore is a struct used to store event streams in Redis.
|
||||
|
||||
+1
-1
@@ -49,7 +49,7 @@ func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) h
|
||||
}
|
||||
// Use concatenation instead of fmt.Sprintf for the
|
||||
// sake of simplicity and performance.
|
||||
topic := "ch/" + msg.GetChannel() + "/msg"
|
||||
topic := "c/" + msg.GetChannel() + "/m"
|
||||
if msg.GetSubtopic() != "" {
|
||||
topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/")
|
||||
}
|
||||
|
||||
+3
-3
@@ -57,7 +57,7 @@ var (
|
||||
|
||||
var (
|
||||
errInvalidUserId = errors.New("invalid user id")
|
||||
channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
|
||||
channelRegExp = regexp.MustCompile(`^\/?c\/([\w\-]+)\/m(\/[^?]*)?(\?.*)?$`)
|
||||
)
|
||||
|
||||
// Event implements events.Event interface.
|
||||
@@ -159,7 +159,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
}
|
||||
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
|
||||
// Topics are in the format:
|
||||
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
|
||||
// c/<channel_id>/m/<subtopic>/.../ct/<content_type>
|
||||
|
||||
channelParts := channelRegExp.FindStringSubmatch(*topic)
|
||||
if len(channelParts) < 2 {
|
||||
@@ -225,7 +225,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
|
||||
|
||||
func (h *handler) authAccess(ctx context.Context, clientID, topic string, msgType connections.ConnType) error {
|
||||
// Topics are in the format:
|
||||
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
|
||||
// c/<channel_id>/m/<subtopic>/.../ct/<content_type>
|
||||
if !channelRegExp.MatchString(topic) {
|
||||
return ErrMalformedTopic
|
||||
}
|
||||
|
||||
@@ -36,11 +36,11 @@ const (
|
||||
clientID = "clientID"
|
||||
clientID1 = "clientID1"
|
||||
subtopic = "testSubtopic"
|
||||
invalidChannelIDTopic = "ch/**/msg"
|
||||
invalidChannelIDTopic = "c/**/m"
|
||||
)
|
||||
|
||||
var (
|
||||
topicMsg = "ch/%s/msg"
|
||||
topicMsg = "c/%s/m"
|
||||
topic = fmt.Sprintf(topicMsg, chanID)
|
||||
invalidTopic = invalidValue
|
||||
payload = []byte("[{'n':'test-name', 'v': 1.2}]")
|
||||
|
||||
+1
-1
@@ -23,7 +23,7 @@ func (sdk mgSDK) SendMessage(ctx context.Context, chanName, msg, key string) err
|
||||
subtopicPart = fmt.Sprintf("/%s", strings.ReplaceAll(chanNameParts[1], ".", "/"))
|
||||
}
|
||||
|
||||
reqURL := fmt.Sprintf("%s/ch/%s/msg%s", sdk.httpAdapterURL, chanID, subtopicPart)
|
||||
reqURL := fmt.Sprintf("%s/c/%s/m%s", sdk.httpAdapterURL, chanID, subtopicPart)
|
||||
|
||||
_, _, err := sdk.processRequest(ctx, http.MethodPost, reqURL, ClientPrefix+key, []byte(msg), nil, http.StatusAccepted)
|
||||
|
||||
|
||||
@@ -65,9 +65,9 @@ func makeURL(tsURL, chanID, subtopic, clientKey string, header bool) (string, er
|
||||
|
||||
if chanID == "0" || chanID == "" {
|
||||
if header {
|
||||
return fmt.Sprintf("%s/ch/%s/msg", u, chanID), fmt.Errorf("invalid channel id")
|
||||
return fmt.Sprintf("%s/c/%s/m", u, chanID), fmt.Errorf("invalid channel id")
|
||||
}
|
||||
return fmt.Sprintf("%s/ch/%s/msg?authorization=%s", u, chanID, clientKey), fmt.Errorf("invalid channel id")
|
||||
return fmt.Sprintf("%s/c/%s/m?authorization=%s", u, chanID, clientKey), fmt.Errorf("invalid channel id")
|
||||
}
|
||||
|
||||
subtopicPart := ""
|
||||
@@ -75,10 +75,10 @@ func makeURL(tsURL, chanID, subtopic, clientKey string, header bool) (string, er
|
||||
subtopicPart = fmt.Sprintf("/%s", subtopic)
|
||||
}
|
||||
if header {
|
||||
return fmt.Sprintf("%s/ch/%s/msg%s", u, chanID, subtopicPart), nil
|
||||
return fmt.Sprintf("%s/c/%s/m%s", u, chanID, subtopicPart), nil
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s/ch/%s/msg%s?authorization=%s", u, chanID, subtopicPart, clientKey), nil
|
||||
return fmt.Sprintf("%s/c/%s/m%s?authorization=%s", u, chanID, subtopicPart, clientKey), nil
|
||||
}
|
||||
|
||||
func handshake(tsURL, chanID, subtopic, clientKey string, addHeader bool) (*websocket.Conn, *http.Response, error) {
|
||||
|
||||
+1
-1
@@ -16,7 +16,7 @@ import (
|
||||
"github.com/go-chi/chi/v5"
|
||||
)
|
||||
|
||||
var channelPartRegExp = regexp.MustCompile(`^/ch/([\w\-]+)/msg(/[^?]*)?(\?.*)?$`)
|
||||
var channelPartRegExp = regexp.MustCompile(`^/c/([\w\-]+)/m(/[^?]*)?(\?.*)?$`)
|
||||
|
||||
func handshake(ctx context.Context, svc ws.Service) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
+2
-2
@@ -40,8 +40,8 @@ func MakeHandler(ctx context.Context, svc ws.Service, l *slog.Logger, instanceID
|
||||
logger = l
|
||||
|
||||
mux := chi.NewRouter()
|
||||
mux.Get("/ch/{chanID}/msg", handshake(ctx, svc))
|
||||
mux.Get("/ch/{chanID}/msg/*", handshake(ctx, svc))
|
||||
mux.Get("/c/{chanID}/m", handshake(ctx, svc))
|
||||
mux.Get("/c/{chanID}/m/*", handshake(ctx, svc))
|
||||
|
||||
mux.Get("/health", supermq.Health(service, instanceID))
|
||||
mux.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
+3
-3
@@ -50,7 +50,7 @@ var (
|
||||
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
|
||||
)
|
||||
|
||||
var channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
|
||||
var channelRegExp = regexp.MustCompile(`^\/?c\/([\w\-]+)\/m(\/[^?]*)?(\?.*)?$`)
|
||||
|
||||
// Event implements events.Event interface.
|
||||
type handler struct {
|
||||
@@ -139,7 +139,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
}
|
||||
|
||||
// Topics are in the format:
|
||||
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
|
||||
// c/<channel_id>/m/<subtopic>/.../ct/<content_type>
|
||||
channelParts := channelRegExp.FindStringSubmatch(*topic)
|
||||
if len(channelParts) < 2 {
|
||||
return errors.Wrap(errFailedPublish, errMalformedTopic)
|
||||
@@ -224,7 +224,7 @@ func (h *handler) authAccess(ctx context.Context, token, topic string, msgType c
|
||||
clientID := authnRes.GetId()
|
||||
|
||||
// Topics are in the format:
|
||||
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
|
||||
// c/<channel_id>/m/<subtopic>/.../ct/<content_type>
|
||||
if !channelRegExp.MatchString(topic) {
|
||||
return "", "", errMalformedTopic
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user