SMQ-2703 - Shorten channel and message prefix in message topics (#2764)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-03-21 00:29:15 +03:00
committed by GitHub
parent f6b3938f96
commit 3b675a7ab3
17 changed files with 30 additions and 30 deletions
+1 -1
View File
@@ -37,7 +37,7 @@ servers:
- user-password: []
channels:
channels/{channelID}/messages/{subtopic}:
ch/{channelID}/msg/{subtopic}:
parameters:
channelID:
$ref: '#/components/parameters/channelID'
+1 -1
View File
@@ -32,7 +32,7 @@ servers:
default: '8186'
channels:
'channels/{channelID}/messages/{subtopic}':
'ch/{channelID}/msg/{subtopic}':
parameters:
channelID:
$ref: '#/components/parameters/channelID'
+1 -1
View File
@@ -27,7 +27,7 @@ tags:
url: https://docs.supermq.abstractmachines.fr/
paths:
/channels/{id}/messages:
/ch/{id}/msg:
post:
summary: Sends message to the communication channel
description: |
+1 -1
View File
@@ -33,7 +33,7 @@ const (
startObserve = 0 // observe option value that indicates start of observation
)
var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
var channelPartRegExp = regexp.MustCompile(`^/ch/([\w\-]+)/msg(/[^?]*)?(\?.*)?$`)
const (
numGroups = 3 // entire expression + channel group + subtopic group
+1 -1
View File
@@ -227,7 +227,7 @@ func TestPublish(t *testing.T) {
req := testRequest{
client: ts.Client(),
method: http.MethodPost,
url: fmt.Sprintf("%s/channels/%s/messages", ts.URL, tc.chanID),
url: fmt.Sprintf("%s/ch/%s/msg", ts.URL, tc.chanID),
contentType: tc.contentType,
token: tc.key,
body: strings.NewReader(tc.msg),
+2 -2
View File
@@ -33,14 +33,14 @@ func MakeHandler(logger *slog.Logger, instanceID string) http.Handler {
}
r := chi.NewRouter()
r.Post("/channels/{chanID}/messages", otelhttp.NewHandler(kithttp.NewServer(
r.Post("/ch/{chanID}/msg", otelhttp.NewHandler(kithttp.NewServer(
sendMessageEndpoint(),
decodeRequest,
api.EncodeResponse,
opts...,
), "publish").ServeHTTP)
r.Post("/channels/{chanID}/messages/*", otelhttp.NewHandler(kithttp.NewServer(
r.Post("/ch/{chanID}/msg/*", otelhttp.NewHandler(kithttp.NewServer(
sendMessageEndpoint(),
decodeRequest,
api.EncodeResponse,
+2 -2
View File
@@ -55,7 +55,7 @@ var (
errFailedParseSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to parse subtopic"))
)
var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
var channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
// Event implements events.Event interface.
type handler struct {
@@ -210,7 +210,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
func parseTopic(topic string) (string, string, error) {
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(topic)
if len(channelParts) < 2 {
return "", "", errors.Wrap(errFailedPublish, errMalformedTopic)
+3 -3
View File
@@ -35,12 +35,12 @@ const (
chanID = "123e4567-e89b-12d3-a456-000000000001"
invalidID = "invalidID"
invalidValue = "invalidValue"
invalidChannelIDTopic = "channels/**/messages"
invalidChannelIDTopic = "ch/**/msg"
)
var (
topicMsg = "channels/%s/messages"
subtopicMsg = "channels/%s/messages/subtopic"
topicMsg = "ch/%s/msg"
subtopicMsg = "ch/%s/msg/subtopic"
topic = fmt.Sprintf(topicMsg, chanID)
subtopic = fmt.Sprintf(subtopicMsg, chanID)
invalidTopic = invalidValue
+1 -1
View File
@@ -20,7 +20,7 @@ const streamID = "supermq.mqtt"
var (
errFailedSession = errors.New("failed to obtain session from context")
errMalformedTopic = errors.New("malformed topic")
channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
)
// EventStore is a struct used to store event streams in Redis.
+1 -1
View File
@@ -49,7 +49,7 @@ func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) h
}
// Use concatenation instead of fmt.Sprintf for the
// sake of simplicity and performance.
topic := "channels/" + msg.GetChannel() + "/messages"
topic := "ch/" + msg.GetChannel() + "/msg"
if msg.GetSubtopic() != "" {
topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/")
}
+3 -3
View File
@@ -57,7 +57,7 @@ var (
var (
errInvalidUserId = errors.New("invalid user id")
channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
)
// Event implements events.Event interface.
@@ -159,7 +159,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
}
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 2 {
@@ -225,7 +225,7 @@ func (h *handler) Disconnect(ctx context.Context) error {
func (h *handler) authAccess(ctx context.Context, clientID, topic string, msgType connections.ConnType) error {
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
if !channelRegExp.MatchString(topic) {
return ErrMalformedTopic
}
+2 -2
View File
@@ -36,11 +36,11 @@ const (
clientID = "clientID"
clientID1 = "clientID1"
subtopic = "testSubtopic"
invalidChannelIDTopic = "channels/**/messages"
invalidChannelIDTopic = "ch/**/msg"
)
var (
topicMsg = "channels/%s/messages"
topicMsg = "ch/%s/msg"
topic = fmt.Sprintf(topicMsg, chanID)
invalidTopic = invalidValue
payload = []byte("[{'n':'test-name', 'v': 1.2}]")
+1 -1
View File
@@ -22,7 +22,7 @@ func (sdk mgSDK) SendMessage(chanName, msg, key string) errors.SDKError {
subtopicPart = fmt.Sprintf("/%s", strings.ReplaceAll(chanNameParts[1], ".", "/"))
}
reqURL := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart)
reqURL := fmt.Sprintf("%s/ch/%s/msg%s", sdk.httpAdapterURL, chanID, subtopicPart)
_, _, err := sdk.processRequest(http.MethodPost, reqURL, ClientPrefix+key, []byte(msg), nil, http.StatusAccepted)
+4 -4
View File
@@ -65,9 +65,9 @@ func makeURL(tsURL, chanID, subtopic, clientKey string, header bool) (string, er
if chanID == "0" || chanID == "" {
if header {
return fmt.Sprintf("%s/channels/%s/messages", u, chanID), fmt.Errorf("invalid channel id")
return fmt.Sprintf("%s/ch/%s/msg", u, chanID), fmt.Errorf("invalid channel id")
}
return fmt.Sprintf("%s/channels/%s/messages?authorization=%s", u, chanID, clientKey), fmt.Errorf("invalid channel id")
return fmt.Sprintf("%s/ch/%s/msg?authorization=%s", u, chanID, clientKey), fmt.Errorf("invalid channel id")
}
subtopicPart := ""
@@ -75,10 +75,10 @@ func makeURL(tsURL, chanID, subtopic, clientKey string, header bool) (string, er
subtopicPart = fmt.Sprintf("/%s", subtopic)
}
if header {
return fmt.Sprintf("%s/channels/%s/messages%s", u, chanID, subtopicPart), nil
return fmt.Sprintf("%s/ch/%s/msg%s", u, chanID, subtopicPart), nil
}
return fmt.Sprintf("%s/channels/%s/messages%s?authorization=%s", u, chanID, subtopicPart, clientKey), nil
return fmt.Sprintf("%s/ch/%s/msg%s?authorization=%s", u, chanID, subtopicPart, clientKey), nil
}
func handshake(tsURL, chanID, subtopic, clientKey string, addHeader bool) (*websocket.Conn, *http.Response, error) {
+1 -1
View File
@@ -16,7 +16,7 @@ import (
"github.com/go-chi/chi/v5"
)
var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
var channelPartRegExp = regexp.MustCompile(`^/ch/([\w\-]+)/msg(/[^?]*)?(\?.*)?$`)
func handshake(ctx context.Context, svc ws.Service) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
+2 -2
View File
@@ -40,8 +40,8 @@ func MakeHandler(ctx context.Context, svc ws.Service, l *slog.Logger, instanceID
logger = l
mux := chi.NewRouter()
mux.Get("/channels/{chanID}/messages", handshake(ctx, svc))
mux.Get("/channels/{chanID}/messages/*", handshake(ctx, svc))
mux.Get("/ch/{chanID}/msg", handshake(ctx, svc))
mux.Get("/ch/{chanID}/msg/*", handshake(ctx, svc))
mux.Get("/health", supermq.Health(service, instanceID))
mux.Handle("/metrics", promhttp.Handler())
+3 -3
View File
@@ -50,7 +50,7 @@ var (
errFailedPublishToMsgBroker = errors.New("failed to publish to supermq message broker")
)
var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
var channelRegExp = regexp.MustCompile(`^\/?ch\/([\w\-]+)\/msg(\/[^?]*)?(\?.*)?$`)
// Event implements events.Event interface.
type handler struct {
@@ -139,7 +139,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
}
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
channelParts := channelRegExp.FindStringSubmatch(*topic)
if len(channelParts) < 2 {
return errors.Wrap(errFailedPublish, errMalformedTopic)
@@ -224,7 +224,7 @@ func (h *handler) authAccess(ctx context.Context, token, topic string, msgType c
clientID := authnRes.GetId()
// Topics are in the format:
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
// ch/<channel_id>/msg/<subtopic>/.../ct/<content_type>
if !channelRegExp.MatchString(topic) {
return "", "", errMalformedTopic
}